Files
ailine/backend/app/main_graph/utils/main_graph_builder.py
root 128aad0c22
All checks were successful
构建并部署 AI Agent 服务 / deploy (push) Successful in 5m31s
refactor: 重构快速路径流程,统一通过 llm_call 输出
- 重构 fast_paths.py,让 fast_chitchat 和 fast_rag 都进入 llm_call 而不是直接设置 final_result
- 修改 check_fast_path_success 函数返回 'llm_call' 而不是 'success'
- 更新 main_graph_builder.py 的条件边配置,支持路由到 llm_call
- 在快速路径节点中添加清除 state.final_result 的逻辑,避免复用旧结果
- 重构 RAG 工具初始化方式,使用模块级变量管理
- 修改 finalize.py 让它返回 final_result
- 更新 agent_service.py 的 RAG 工具注入方式
- 简化 hybrid_router.py 的代码结构
- 清理 rag_nodes.py 的全局变量相关代码
- 更新相关测试文件
2026-05-05 04:32:42 +08:00

371 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
整合后的完整主图构建器 - 所有节点都直接操作 MainGraphState
"""
from ..graph import StateGraph, START, END
from typing import Dict, Any, Optional
from langchain_core.runnables.config import RunnableConfig
from ..state import MainGraphState
from ..nodes.reasoning import react_reason_node
from ..nodes.web_search import web_search_node
from ..nodes.error_handling import error_handling_node
from ..nodes.routing import init_state_node, route_by_reasoning
from ..nodes.hybrid_router import (
hybrid_router_node,
route_from_hybrid_decision,
check_fast_path_success,
)
from ..nodes.fast_paths import (
fast_chitchat_node,
fast_rag_node,
fast_tool_node,
)
from ..nodes.llm_call import create_llm_call_node
from ..nodes.rag_nodes import rag_retrieve_node
from ..nodes.retrieve_memory import create_retrieve_memory_node
from ..nodes.memory_trigger import memory_trigger_node, set_mem0_client
from ..nodes.summarize import create_summarize_node
from ..nodes.finalize import finalize_node
from ...subgraphs.contact import build_contact_subgraph
from ...subgraphs.dictionary import build_dictionary_subgraph
from ...subgraphs.news_analysis import build_news_analysis_subgraph
from ...memory.mem0_client import Mem0Client
from ...logger import info, debug
# ========== 检查是否需要总结 ==========
def should_summarize(state: MainGraphState) -> str:
"""
检查是否需要总结对话(对话足够长时)
Args:
state: 当前图状态
Returns:
"summarize""finalize"
"""
if state.turns_since_last_summary >= 5: # 每5轮对话总结一次
return "summarize"
else:
return "finalize"
# ========== 子图包装器(处理子图错误传递)==========
def wrap_subgraph_for_error_handling(subgraph, name: str):
"""
包装子图,使其错误能传递给主图
Args:
subgraph: 编译好的子图
name: 子图名称(用于错误标识)
Returns: 包装后的节点函数
"""
async def wrapped_node(state: MainGraphState, config: Optional[Dict[str, Any]] = None) -> MainGraphState:
# 发送子图开始事件
if config:
try:
from langchain_core.callbacks.manager import adispatch_custom_event
callbacks = config.get("callbacks")
if callbacks:
await adispatch_custom_event(
"react_reasoning",
{
"step": state.reasoning_step,
"action": f"{name}_subgraph_start",
"confidence": 1.0,
"reasoning": f"开始执行 {name} 子图..."
},
callbacks=callbacks
)
except Exception as e:
info(f"[{name}_subgraph] 无法发送开始事件: {e}")
try:
# 调用子图
result = subgraph.invoke(state)
# 更新主图状态
subgraph_result = None
if name == "contact":
state.contact_result = result
subgraph_result = result.get("final_result", "")
elif name == "dictionary":
state.dictionary_result = result
subgraph_result = result.get("final_result", "")
elif name == "news_analysis":
state.news_result = result
subgraph_result = result.get("final_result", "")
# 关键:设置最终结果
if subgraph_result:
state.final_result = subgraph_result
else:
state.final_result = "子图执行完成"
# 标记成功
state.success = True
state.current_phase = "done"
# 标记不再需要推理,避免循环
state.reasoning_history.append({
"step": state.reasoning_step,
"action": "subgraph_completed",
"confidence": 1.0,
"reasoning": f"{name}子图执行完成",
"timestamp": datetime.now().isoformat()
})
# 发送子图完成事件
if config:
try:
from langchain_core.callbacks.manager import adispatch_custom_event
callbacks = config.get("callbacks")
if callbacks:
await adispatch_custom_event(
"react_reasoning",
{
"step": state.reasoning_step,
"action": f"{name}_subgraph_complete",
"confidence": 1.0,
"reasoning": f"{name} 子图执行完成"
},
callbacks=callbacks
)
except Exception as e:
info(f"[{name}_subgraph] 无法发送完成事件: {e}")
return state
except Exception as e:
# 捕获子图错误,传递给主图
from ..state import ErrorRecord, ErrorSeverity
from datetime import datetime
error_record = ErrorRecord(
error_type=f"{name}SubgraphError",
error_message=str(e),
severity=ErrorSeverity.WARNING,
source=f"{name}_subgraph",
timestamp=datetime.now().isoformat(),
retry_count=0,
max_retries=1,
context={"user_query": state.user_query}
)
state.errors.append(error_record)
state.current_error = error_record
state.current_phase = "error_handling"
state.success = False
# 发送子图错误事件
if config:
try:
from langchain_core.callbacks.manager import adispatch_custom_event
callbacks = config.get("callbacks")
if callbacks:
await adispatch_custom_event(
"react_reasoning",
{
"step": state.reasoning_step,
"action": f"{name}_subgraph_error",
"confidence": 1.0,
"reasoning": f"{name} 子图执行失败: {str(e)}"
},
callbacks=callbacks
)
except Exception as e:
info(f"[{name}_subgraph] 无法发送错误事件: {e}")
return state
return wrapped_node
# ========== 主图构建 ==========
def build_react_main_graph(llm=None, tools=None, mem0_client=None, use_hybrid_router: bool = True) -> StateGraph:
"""
构建整合后的完整主图(支持混合路由)
Args:
llm: LangChain ChatModel 实例
tools: 工具列表
mem0_client: Mem0 客户端实例
use_hybrid_router: 是否使用混合路由(快速路径 + React 循环)
Returns:
StateGraph: 构建好的图
"""
# 创建图
graph = StateGraph(MainGraphState)
# 设置全局 mem0_client
if mem0_client:
set_mem0_client(mem0_client)
# 创建节点
llm_node = None
if llm is not None:
llm_node = create_llm_call_node(llm, tools or [])
retrieve_memory_node = None
summarize_node = None
if mem0_client:
retrieve_memory_node = create_retrieve_memory_node(mem0_client)
summarize_node = create_summarize_node(mem0_client)
# ========== 添加节点 ==========
# 第一阶段:记忆检索
if retrieve_memory_node:
graph.add_node("retrieve_memory", retrieve_memory_node)
graph.add_node("memory_trigger", memory_trigger_node)
# 第二阶段:初始化
graph.add_node("init_state", init_state_node)
# ========== 混合路由节点(如果启用) ==========
if use_hybrid_router:
graph.add_node("hybrid_router", hybrid_router_node)
graph.add_node("fast_chitchat", fast_chitchat_node)
graph.add_node("fast_rag", fast_rag_node)
graph.add_node("fast_tool", fast_tool_node)
# 第三阶段React 循环推理(始终保留)
graph.add_node("react_reason", react_reason_node)
graph.add_node("rag_retrieve", rag_retrieve_node)
graph.add_node("web_search", web_search_node)
graph.add_node("handle_error", error_handling_node)
if llm_node is not None:
graph.add_node("llm_call", llm_node)
# 子图节点
contact_graph = build_contact_subgraph()
dictionary_graph = build_dictionary_subgraph()
news_analysis_graph = build_news_analysis_subgraph()
graph.add_node(
"contact_subgraph",
wrap_subgraph_for_error_handling(contact_graph.compile(), "contact")
)
graph.add_node(
"dictionary_subgraph",
wrap_subgraph_for_error_handling(dictionary_graph.compile(), "dictionary")
)
graph.add_node(
"news_analysis_subgraph",
wrap_subgraph_for_error_handling(news_analysis_graph.compile(), "news_analysis")
)
# 第四阶段:完成处理
if summarize_node:
graph.add_node("summarize", summarize_node)
graph.add_node("finalize", finalize_node)
# ========== 添加边 ==========
# 第一阶段:记忆检索
if retrieve_memory_node:
graph.add_edge(START, "retrieve_memory")
graph.add_edge("retrieve_memory", "memory_trigger")
else:
graph.add_edge(START, "memory_trigger")
# 进入初始化
graph.add_edge("memory_trigger", "init_state")
# ========== 混合路由分支(如果启用) ==========
if use_hybrid_router:
graph.add_edge("init_state", "hybrid_router")
# 从 hybrid_router 条件分支
graph.add_conditional_edges(
"hybrid_router",
route_from_hybrid_decision,
{
"fast_chitchat": "fast_chitchat",
"fast_rag": "fast_rag",
"fast_tool": "fast_tool",
"react_loop": "react_reason"
}
)
# 快速路径的完成检查
for fast_node in ["fast_chitchat", "fast_rag", "fast_tool"]:
graph.add_conditional_edges(
fast_node,
check_fast_path_success,
{
"llm_call": "llm_call",
"escalate": "react_reason"
}
)
info(f"✅ [图构建] 混合路由模式已启用")
else:
# 无混合路由,直接到 react_reason
graph.add_edge("init_state", "react_reason")
info(f"✅ [图构建] 纯 React 模式")
# ========== React 循环边(始终保留) ==========
graph.add_conditional_edges(
"react_reason",
route_by_reasoning,
{
"rag_retrieve": "rag_retrieve",
"web_search": "web_search",
"contact_subgraph": "contact_subgraph",
"dictionary_subgraph": "dictionary_subgraph",
"news_analysis_subgraph": "news_analysis_subgraph",
"handle_error": "handle_error",
"llm_call": "llm_call"
}
)
# 循环边rag、web_search、子图、error都回到 reason
graph.add_edge("rag_retrieve", "react_reason")
graph.add_edge("web_search", "react_reason")
graph.add_edge("contact_subgraph", "react_reason")
graph.add_edge("dictionary_subgraph", "react_reason")
graph.add_edge("news_analysis_subgraph", "react_reason")
graph.add_edge("handle_error", "react_reason")
# ========== 最终完成阶段 ==========
if llm_node is not None:
if summarize_node:
# 检查是否需要总结
graph.add_conditional_edges(
"llm_call",
should_summarize,
{
"summarize": "summarize",
"finalize": "finalize"
}
)
graph.add_edge("summarize", "finalize")
else:
# 没有 summarize 节点,直接 finalize
graph.add_edge("llm_call", "finalize")
# 完成
graph.add_edge("finalize", END)
info(f"✅ [图构建] 整合后的完整主图构建完成(混合路由: {use_hybrid_router}")
return graph
# ========== 兼容性:保留旧的函数名 ==========
def build_main_graph() -> StateGraph:
"""
兼容性函数:旧代码调用 build_main_graph() 时返回 React 版本
"""
return build_react_main_graph()
# ========== 导出 ==========
__all__ = [
"build_react_main_graph",
"build_main_graph",
"wrap_subgraph_for_error_handling"
]