Files
ailine/backend/app/main_graph/nodes/routing.py
root adb25a2e22
All checks were successful
构建并部署 AI Agent 服务 / deploy (push) Successful in 5m37s
优化:重构 state,分离持久化和临时字段,完善 init_state_node
主要改动:
1. 移除无用字段 system_prompt
2. 重新组织 state,明确标注「持久化字段」和「临时字段」
3. 完善 init_state_node,重置所有临时字段
4. 解决数据残留隐患,确保每轮对话开始时状态干净
2026-05-06 15:10:33 +08:00

221 lines
7.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
路由与初始化模块
包含状态初始化节点和条件路由函数
三层统一循环防护:
1. 全局步数硬上限reasoning_step > max_steps
2. 路由模式检测A→B→A→B 交替循环)
3. 状态停滞检测(连续相同动作)
"""
from datetime import datetime
from backend.app.core.intent import get_route_by_reasoning, ReasoningAction
from ...main_graph.state import (
MainGraphState,
CurrentAction,
ReactReasoningState,
HybridRouterState,
FastPathState
)
from backend.app.logger import info
# ========== 初始化状态节点 ==========
def init_state_node(state: MainGraphState) -> MainGraphState:
"""
初始化状态节点:在流程开始时设置初始值
重置策略:
- 持久化字段(如 messages、turns_since_last_summary不重置
- 临时字段(如 rag_context、final_result重置为初始值
"""
# 持久化字段保留原样
# - messages
# - turns_since_last_summary
# - user_id
# ========== 重置临时字段 ==========
# 主图控制字段
state.user_query = ""
state.current_action = CurrentAction.NONE
state.current_model = ""
state.intent_confidence = 0.0
# React 推理专用字段
state.reasoning_step = 0
state.last_action = ""
state.reasoning_history = []
# RAG 相关字段
state.rag_context = ""
state.rag_retrieved = False
state.rag_docs = []
state.rag_confidence = 0.0
state.rag_attempts = 0
# 联网搜索相关字段
state.web_search_results = []
# 错误处理字段
state.errors = []
state.current_error = None
state.retry_action = None
state.error_message = ""
# 子图结果字段
state.news_result = None
state.dictionary_result = None
state.contact_result = None
# 执行状态
state.current_phase = "initializing"
state.final_result = ""
state.success = False
# 元数据
state.start_time = None
state.end_time = None
# 结构化状态
state.react_reasoning = ReactReasoningState()
state.hybrid_router = HybridRouterState()
state.fast_path = FastPathState()
# 统计字段
state.llm_calls = 0
state.last_token_usage = {}
state.last_elapsed_time = 0.0
state.memory_context = ""
# 向后兼容字段
state.debug_info = {}
# 设置初始值
state.current_phase = "initializing"
state.reasoning_step = 0
state.start_time = datetime.now().isoformat()
# 从 messages 中提取 user_query如果没有的话
if not state.user_query and state.messages:
last_msg = state.messages[-1]
state.user_query = getattr(last_msg, "content", str(last_msg))
return state
# ========== 条件路由函数 ==========
def route_by_reasoning(state: MainGraphState) -> str:
"""
根据推理结果决定下一步路由,带三层统一循环防护
核心逻辑:
1. DIRECT_RESPONSE → 直接返回 llm_call
2. 子图完成/已有结果 → 直接返回 llm_call
3. 步数超限 → 直接返回 llm_call
4. 其他 → 正常路由
"""
# 获取历史动作
previous_actions = [h.get("action") for h in state.reasoning_history]
info(f"[条件路由] step={state.reasoning_step}, phase={state.current_phase}, history={previous_actions}")
# ========== 获取推理结果 - 从新的结构化字段获取 ==========
reasoning_result = state.react_reasoning.reasoning_result
latest_action = reasoning_result.action.name if reasoning_result else None
# ========== 核心检查DIRECT_RESPONSE 优先 ==========
# 从 reasoning_result 检查(最新)
if latest_action == "DIRECT_RESPONSE":
info(f"[条件路由] 推理结果为 DIRECT_RESPONSE直接去 llm_call")
return "llm_call"
# 备用:从历史记录检查
if previous_actions and previous_actions[-1] == "DIRECT_RESPONSE":
info(f"[条件路由] 历史记录最新动作为 DIRECT_RESPONSE直接去 llm_call")
return "llm_call"
# ========== 子图完成/已有结果 ==========
if "subgraph_completed" in previous_actions or state.final_result:
info("[条件路由] 子图已完成或已有结果,直接终止")
return "llm_call"
# ========== 步数超限 ==========
if state.reasoning_step > state.max_steps:
info(f"[条件路由] 步数超限 ({state.reasoning_step}/{state.max_steps}),强制终止")
return "llm_call"
# ========== 特殊阶段快速通道 ==========
if state.current_phase in ("max_steps_exceeded", "finalizing", "done"):
return "llm_call"
if state.current_phase == "error_handling" or state.current_error:
return "handle_error"
# ========== 无推理结果,默认终止 ==========
if not reasoning_result:
info("[条件路由] 无推理结果,默认去 llm_call")
return "llm_call"
# ========== 计算目标路由 ==========
route = get_route_by_reasoning(reasoning_result)
route_mapping = {
"direct_response": "llm_call",
"retrieve_rag": "rag_retrieve",
"re_retrieve_rag": "rag_retrieve",
"web_search": "web_search",
"clarify": "llm_call",
"call_tool": "llm_call",
"contact": "contact_subgraph",
"dictionary": "dictionary_subgraph",
"news_analysis": "news_analysis_subgraph",
}
target = route_mapping.get(route, "llm_call")
# ========== RAG 次数硬限制 ==========
rag_attempts = getattr(state, 'rag_attempts', 0)
if target == "rag_retrieve" and rag_attempts >= 2:
info(f"[条件路由] RAG已尝试{rag_attempts}次,强制走联网搜索")
target = "web_search"
# ========== 循环防护检测 ==========
# 1. 路由模式检测A→B→A→B 交替)
if len(previous_actions) >= 4:
if (previous_actions[-4] == previous_actions[-2]
and previous_actions[-3] == previous_actions[-1]
and previous_actions[-2] != previous_actions[-1]):
info(f"[条件路由] 检测到路由循环: {previous_actions[-4:]},强制终止")
return "llm_call"
# 2. 状态停滞检测(连续相同动作 TODO本来应该是2
if len(previous_actions) >= 3 and previous_actions[-1] == previous_actions[-2] and previous_actions[-2] == previous_actions[-3]:
info(f"[条件路由] 连续相同动作 '{previous_actions[-1]}',强制终止")
return "llm_call"
# ========== 智能优化 ==========
if target == "rag_retrieve" and (state.rag_docs or state.rag_context):
info("[条件路由] RAG 结果已存在,跳过检索")
return "llm_call"
info(f"[条件路由] 动作={latest_action}, 目标={target}")
return target
# ========== 完成阶段条件路由函数 ==========
def should_summarize(state: MainGraphState) -> str:
"""
检查是否需要总结对话(对话足够长时)
Args:
state: 当前图状态
Returns:
"summarize""finalize"
"""
if state.turns_since_last_summary >= 5: # 每5轮对话总结一次
return "summarize"
else:
return "finalize"