""" 整合后的完整主图构建器 - 所有节点都直接操作 MainGraphState """ from app.main_graph.graph import StateGraph, START, END from typing import Dict, Any, Optional from langchain_core.runnables.config import RunnableConfig from app.main_graph.state import MainGraphState from app.main_graph.nodes.react_nodes import ( init_state_node, react_reason_node, web_search_node, error_handling_node, route_by_reasoning ) from app.main_graph.nodes.llm_call import create_llm_call_node from app.main_graph.nodes.rag_nodes import rag_retrieve_node from app.main_graph.nodes.retrieve_memory import create_retrieve_memory_node from app.main_graph.nodes.memory_trigger import memory_trigger_node, set_mem0_client from app.main_graph.nodes.summarize import create_summarize_node from app.main_graph.nodes.finalize import finalize_node from app.subgraphs.contact import build_contact_subgraph from app.subgraphs.dictionary import build_dictionary_subgraph from app.subgraphs.news_analysis import build_news_analysis_subgraph from app.memory.mem0_client import Mem0Client from app.logger import info, debug # ========== 检查是否需要总结 ========== def should_summarize(state: MainGraphState) -> str: """ 检查是否需要总结对话(对话足够长时) Args: state: 当前图状态 Returns: "summarize" 或 "finalize" """ if state.turns_since_last_summary >= 5: # 每5轮对话总结一次 return "summarize" else: return "finalize" # ========== 子图包装器(处理子图错误传递)========== def wrap_subgraph_for_error_handling(subgraph, name: str): """ 包装子图,使其错误能传递给主图 Args: subgraph: 编译好的子图 name: 子图名称(用于错误标识) Returns: 包装后的节点函数 """ def wrapped_node(state: MainGraphState) -> MainGraphState: try: # 调用子图 result = subgraph.invoke(state) # 更新主图状态 subgraph_result = None if name == "contact": state.contact_result = result subgraph_result = result.get("final_result", "") elif name == "dictionary": state.dictionary_result = result subgraph_result = result.get("final_result", "") elif name == "news_analysis": state.news_result = result subgraph_result = result.get("final_result", "") # 关键:设置最终结果,这样就不需要再回到 react_reason 了 if subgraph_result: state.final_result = subgraph_result else: state.final_result = "子图执行完成" # 标记成功 state.success = True state.current_phase = "done" # 标记不再需要推理,避免循环 state.reasoning_history.append({ "step": state.reasoning_step, "action": "subgraph_completed", "confidence": 1.0, "reasoning": f"{name}子图执行完成", "timestamp": datetime.now().isoformat() }) return state except Exception as e: # 捕获子图错误,传递给主图 from app.main_graph.state import ErrorRecord, ErrorSeverity from datetime import datetime error_record = ErrorRecord( error_type=f"{name}SubgraphError", error_message=str(e), severity=ErrorSeverity.WARNING, source=f"{name}_subgraph", timestamp=datetime.now().isoformat(), retry_count=0, max_retries=1, context={"user_query": state.user_query} ) state.errors.append(error_record) state.current_error = error_record state.current_phase = "error_handling" state.success = False return state return wrapped_node # ========== 主图构建 ========== def build_react_main_graph(llm=None, tools=None, mem0_client=None) -> StateGraph: """ 构建整合后的完整主图 完整流程: START ↓ retrieve_memory (从Mem0检索长期记忆) ↓ memory_trigger (记忆触发器) ↓ init_state (初始化) ↓ react_reason (推理) ←───────────────────────┐ ↓ │ 条件路由 │ ├─ rag_retrieve →─────────────────────────┤ ├─ contact_subgraph →─────────────────────┤ ├─ dictionary_subgraph →──────────────────┤ ├─ news_analysis_subgraph →───────────────┤ ├─ web_search →───────────────────────────┤ ├─ handle_error → (重试或结束) ────────────┤ └─ llm_call (大模型调用) ←────────────────┘ ↓ 检查:需要总结吗? ├─ 是 → summarize (提交给Mem0存储) └─ 否 → (跳过) ↓ finalize (发送完成事件) ↓ END """ # 创建图 graph = StateGraph(MainGraphState) # 设置全局 mem0_client if mem0_client: set_mem0_client(mem0_client) # 创建节点 llm_node = None if llm is not None: llm_node = create_llm_call_node(llm, tools or []) retrieve_memory_node = None summarize_node = None if mem0_client: retrieve_memory_node = create_retrieve_memory_node(mem0_client) summarize_node = create_summarize_node(mem0_client) # ========== 添加节点 ========== # 第一阶段:记忆检索 if retrieve_memory_node: graph.add_node("retrieve_memory", retrieve_memory_node) graph.add_node("memory_trigger", memory_trigger_node) # 第二阶段:React 循环推理 graph.add_node("init_state", init_state_node) graph.add_node("react_reason", react_reason_node) graph.add_node("rag_retrieve", rag_retrieve_node) graph.add_node("web_search", web_search_node) graph.add_node("handle_error", error_handling_node) if llm_node is not None: graph.add_node("llm_call", llm_node) # 子图节点 contact_graph = build_contact_subgraph() dictionary_graph = build_dictionary_subgraph() news_analysis_graph = build_news_analysis_subgraph() graph.add_node( "contact_subgraph", wrap_subgraph_for_error_handling(contact_graph.compile(), "contact") ) graph.add_node( "dictionary_subgraph", wrap_subgraph_for_error_handling(dictionary_graph.compile(), "dictionary") ) graph.add_node( "news_analysis_subgraph", wrap_subgraph_for_error_handling(news_analysis_graph.compile(), "news_analysis") ) # 第三阶段:完成处理 if summarize_node: graph.add_node("summarize", summarize_node) graph.add_node("finalize", finalize_node) # ========== 添加边 ========== # 第一阶段:记忆检索 if retrieve_memory_node: graph.add_edge(START, "retrieve_memory") graph.add_edge("retrieve_memory", "memory_trigger") else: graph.add_edge(START, "memory_trigger") # 进入第二阶段 graph.add_edge("memory_trigger", "init_state") graph.add_edge("init_state", "react_reason") # 第二阶段:React 循环推理 graph.add_conditional_edges( "react_reason", route_by_reasoning, { "rag_retrieve": "rag_retrieve", "web_search": "web_search", "contact_subgraph": "contact_subgraph", "dictionary_subgraph": "dictionary_subgraph", "news_analysis_subgraph": "news_analysis_subgraph", "handle_error": "handle_error", "llm_call": "llm_call" } ) # 循环边(rag、web_search、error 回到 reason) graph.add_edge("rag_retrieve", "react_reason") graph.add_edge("web_search", "react_reason") graph.add_edge("handle_error", "react_reason") # 关键修改:子图执行完后直接去 finalize(避免循环) graph.add_edge("contact_subgraph", "finalize") graph.add_edge("dictionary_subgraph", "finalize") graph.add_edge("news_analysis_subgraph", "finalize") # 第三阶段:llm_call 后进入完成处理 if llm_node is not None: if summarize_node: # 检查是否需要总结 graph.add_conditional_edges( "llm_call", should_summarize, { "summarize": "summarize", "finalize": "finalize" } ) graph.add_edge("summarize", "finalize") else: # 没有 summarize 节点,直接 finalize graph.add_edge("llm_call", "finalize") # 完成 graph.add_edge("finalize", END) info("✅ [图构建] 整合后的完整主图构建完成") return graph # ========== 兼容性:保留旧的函数名 ========== def build_main_graph() -> StateGraph: """ 兼容性函数:旧代码调用 build_main_graph() 时返回 React 版本 """ return build_react_main_graph() # ========== 导出 ========== __all__ = [ "build_react_main_graph", "build_main_graph", "wrap_subgraph_for_error_handling" ]