Files
ailine/backend/app/main_graph/utils/main_graph_builder.py
root afddea61f8
All checks were successful
构建并部署 AI Agent 服务 / deploy (push) Successful in 6m37s
恢复循环推理架构,子图执行完回到react_reason
- 恢复子图→react_reason的边
- 保持intent.py中的逻辑:检测到subgraph_completed就返回DIRECT_RESPONSE
- 保持llm_call中的逻辑:检测到final_result就直接返回
2026-05-02 09:11:38 +08:00

284 lines
9.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
整合后的完整主图构建器 - 所有节点都直接操作 MainGraphState
"""
from app.main_graph.graph import StateGraph, START, END
from typing import Dict, Any, Optional
from langchain_core.runnables.config import RunnableConfig
from app.main_graph.state import MainGraphState
from app.main_graph.nodes.react_nodes import (
init_state_node,
react_reason_node,
web_search_node,
error_handling_node,
route_by_reasoning
)
from app.main_graph.nodes.llm_call import create_llm_call_node
from app.main_graph.nodes.rag_nodes import rag_retrieve_node
from app.main_graph.nodes.retrieve_memory import create_retrieve_memory_node
from app.main_graph.nodes.memory_trigger import memory_trigger_node, set_mem0_client
from app.main_graph.nodes.summarize import create_summarize_node
from app.main_graph.nodes.finalize import finalize_node
from app.subgraphs.contact import build_contact_subgraph
from app.subgraphs.dictionary import build_dictionary_subgraph
from app.subgraphs.news_analysis import build_news_analysis_subgraph
from app.memory.mem0_client import Mem0Client
from app.logger import info, debug
# ========== 检查是否需要总结 ==========
def should_summarize(state: MainGraphState) -> str:
"""
检查是否需要总结对话(对话足够长时)
Args:
state: 当前图状态
Returns:
"summarize""finalize"
"""
if state.turns_since_last_summary >= 5: # 每5轮对话总结一次
return "summarize"
else:
return "finalize"
# ========== 子图包装器(处理子图错误传递)==========
def wrap_subgraph_for_error_handling(subgraph, name: str):
"""
包装子图,使其错误能传递给主图
Args:
subgraph: 编译好的子图
name: 子图名称(用于错误标识)
Returns: 包装后的节点函数
"""
def wrapped_node(state: MainGraphState) -> MainGraphState:
try:
# 调用子图
result = subgraph.invoke(state)
# 更新主图状态
subgraph_result = None
if name == "contact":
state.contact_result = result
subgraph_result = result.get("final_result", "")
elif name == "dictionary":
state.dictionary_result = result
subgraph_result = result.get("final_result", "")
elif name == "news_analysis":
state.news_result = result
subgraph_result = result.get("final_result", "")
# 关键:设置最终结果,这样就不需要再回到 react_reason 了
if subgraph_result:
state.final_result = subgraph_result
else:
state.final_result = "子图执行完成"
# 标记成功
state.success = True
state.current_phase = "done"
# 标记不再需要推理,避免循环
state.reasoning_history.append({
"step": state.reasoning_step,
"action": "subgraph_completed",
"confidence": 1.0,
"reasoning": f"{name}子图执行完成",
"timestamp": datetime.now().isoformat()
})
return state
except Exception as e:
# 捕获子图错误,传递给主图
from app.main_graph.state import ErrorRecord, ErrorSeverity
from datetime import datetime
error_record = ErrorRecord(
error_type=f"{name}SubgraphError",
error_message=str(e),
severity=ErrorSeverity.WARNING,
source=f"{name}_subgraph",
timestamp=datetime.now().isoformat(),
retry_count=0,
max_retries=1,
context={"user_query": state.user_query}
)
state.errors.append(error_record)
state.current_error = error_record
state.current_phase = "error_handling"
state.success = False
return state
return wrapped_node
# ========== 主图构建 ==========
def build_react_main_graph(llm=None, tools=None, mem0_client=None) -> StateGraph:
"""
构建整合后的完整主图
完整流程:
START
retrieve_memory (从Mem0检索长期记忆)
memory_trigger (记忆触发器)
init_state (初始化)
react_reason (推理) ←───────────────────────┐
↓ │
条件路由 │
├─ rag_retrieve →─────────────────────────┤
├─ contact_subgraph →─────────────────────┤
├─ dictionary_subgraph →──────────────────┤
├─ news_analysis_subgraph →───────────────┤
├─ web_search →───────────────────────────┤
├─ handle_error → (重试或结束) ────────────┤
└─ llm_call (大模型调用) ←────────────────┘
检查:需要总结吗?
├─ 是 → summarize (提交给Mem0存储)
└─ 否 → (跳过)
finalize (发送完成事件)
END
"""
# 创建图
graph = StateGraph(MainGraphState)
# 设置全局 mem0_client
if mem0_client:
set_mem0_client(mem0_client)
# 创建节点
llm_node = None
if llm is not None:
llm_node = create_llm_call_node(llm, tools or [])
retrieve_memory_node = None
summarize_node = None
if mem0_client:
retrieve_memory_node = create_retrieve_memory_node(mem0_client)
summarize_node = create_summarize_node(mem0_client)
# ========== 添加节点 ==========
# 第一阶段:记忆检索
if retrieve_memory_node:
graph.add_node("retrieve_memory", retrieve_memory_node)
graph.add_node("memory_trigger", memory_trigger_node)
# 第二阶段React 循环推理
graph.add_node("init_state", init_state_node)
graph.add_node("react_reason", react_reason_node)
graph.add_node("rag_retrieve", rag_retrieve_node)
graph.add_node("web_search", web_search_node)
graph.add_node("handle_error", error_handling_node)
if llm_node is not None:
graph.add_node("llm_call", llm_node)
# 子图节点
contact_graph = build_contact_subgraph()
dictionary_graph = build_dictionary_subgraph()
news_analysis_graph = build_news_analysis_subgraph()
graph.add_node(
"contact_subgraph",
wrap_subgraph_for_error_handling(contact_graph.compile(), "contact")
)
graph.add_node(
"dictionary_subgraph",
wrap_subgraph_for_error_handling(dictionary_graph.compile(), "dictionary")
)
graph.add_node(
"news_analysis_subgraph",
wrap_subgraph_for_error_handling(news_analysis_graph.compile(), "news_analysis")
)
# 第三阶段:完成处理
if summarize_node:
graph.add_node("summarize", summarize_node)
graph.add_node("finalize", finalize_node)
# ========== 添加边 ==========
# 第一阶段:记忆检索
if retrieve_memory_node:
graph.add_edge(START, "retrieve_memory")
graph.add_edge("retrieve_memory", "memory_trigger")
else:
graph.add_edge(START, "memory_trigger")
# 进入第二阶段
graph.add_edge("memory_trigger", "init_state")
graph.add_edge("init_state", "react_reason")
# 第二阶段React 循环推理
graph.add_conditional_edges(
"react_reason",
route_by_reasoning,
{
"rag_retrieve": "rag_retrieve",
"web_search": "web_search",
"contact_subgraph": "contact_subgraph",
"dictionary_subgraph": "dictionary_subgraph",
"news_analysis_subgraph": "news_analysis_subgraph",
"handle_error": "handle_error",
"llm_call": "llm_call"
}
)
# 循环边rag、web_search、子图、error都回到reason
graph.add_edge("rag_retrieve", "react_reason")
graph.add_edge("web_search", "react_reason")
graph.add_edge("contact_subgraph", "react_reason")
graph.add_edge("dictionary_subgraph", "react_reason")
graph.add_edge("news_analysis_subgraph", "react_reason")
graph.add_edge("handle_error", "react_reason")
# 第三阶段llm_call 后进入完成处理
if llm_node is not None:
if summarize_node:
# 检查是否需要总结
graph.add_conditional_edges(
"llm_call",
should_summarize,
{
"summarize": "summarize",
"finalize": "finalize"
}
)
graph.add_edge("summarize", "finalize")
else:
# 没有 summarize 节点,直接 finalize
graph.add_edge("llm_call", "finalize")
# 完成
graph.add_edge("finalize", END)
info("✅ [图构建] 整合后的完整主图构建完成")
return graph
# ========== 兼容性:保留旧的函数名 ==========
def build_main_graph() -> StateGraph:
"""
兼容性函数:旧代码调用 build_main_graph() 时返回 React 版本
"""
return build_react_main_graph()
# ========== 导出 ==========
__all__ = [
"build_react_main_graph",
"build_main_graph",
"wrap_subgraph_for_error_handling"
]