Files
ailine/backend/app/graph/react_nodes.py
root e3adb45454
Some checks failed
构建并部署 AI Agent 服务 / deploy (push) Failing after 6m15s
feat: 实现 React 模式循环推理,带超时重试和结构化错误处理
- 更新 intent.py 为 React 模式推理器
- 新增 react_nodes.py: React 模式节点
- 新增 retry_utils.py: 超时和重试工具
- 更新 state.py: 支持循环步数和错误记录
- 重写 subgraph_builder.py: 完整 React 循环流程
- 结构化错误输出,符合 Agent 执行循环最佳实践
- 限制最大推理步数 ≤40,防止无限循环
- RAG 检索带重试和超时保护
- 子图错误可传递给主图处理
2026-04-26 11:14:04 +08:00

389 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
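ReAct-mode graph node module - with timeout and retry handling.
Contains:
- init_state_node: initialises per-run state at the start of the flow
- react_reason_node: runs one reasoning step using intent.py
- rag_retrieve_node: RAG retrieval node (with retries)
- error_handling_node: error handling node
- final_response_node: final answer node
- route_by_reasoning: conditional routing function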
"""
import sys
import time
from typing import Dict, Any, Optional
from datetime import datetime
from functools import wraps
# 导入我们的 intent.py
from ..agent_subgraphs.common.intent import (
react_reason,
get_route_by_reasoning,
ReasoningAction,
RetrievalConfig,
ReasoningResult
)
from ..agent_subgraphs.common.state_base import StateUtils
from .state import MainGraphState, ErrorRecord, ErrorSeverity
from .retry_utils import (
RetryConfig,
RetryResult,
with_retry,
create_retry_wrapper_for_node,
RAG_RETRY_CONFIG,
SUBGRAPH_RETRY_CONFIG
)
def get_rag_tool():
    """Return the RAG tool (lazy import, to avoid a circular dependency).

    The RAG factory is imported inside the function rather than at module
    load. No tool is built here yet: ``None`` is returned whether or not the
    import succeeds, and the real tool is meant to be injected later.
    """
    try:
        # Probe the existing RAG tool factory; the imported name is unused.
        from ..rag.tools import create_rag_tool_sync
    except Exception:
        # An import failure is deliberately non-fatal.
        tool = None
    else:
        # Simplified for now: real usage should fetch the already-initialised
        # tool from a global registry. Return None until it is injected.
        tool = None
    return tool
# ========== 1. React 推理节点 ==========
def react_reason_node(state: MainGraphState) -> MainGraphState:
    """Run one ReAct reasoning step and record the chosen action.

    The step counter is incremented first. If it then exceeds
    ``state.max_steps`` the node stops with a failure message and does not
    call the reasoner. Otherwise ``react_reason`` is called with the user
    query plus a context dict, and its result is written to
    ``reasoning_history``, ``debug_info`` and ``last_action``.

    Returns:
        The same ``state`` object, updated in place.
    """
    state.current_phase = "react_reasoning"
    state.reasoning_step += 1

    # Guard: stop once the step budget is exhausted.
    if state.reasoning_step > state.max_steps:
        executed = state.reasoning_step - 1
        state.current_phase = "max_steps_exceeded"
        state.final_result = (
            f"❌ 推理步数超过限制(最大 {state.max_steps} 步),"
            f"已执行 {executed} 步。"
            "请简化您的问题或分批提问。"
        )
        state.success = False
        return state

    # Context handed to the reasoner.
    reasoning_context = {
        "retrieved_docs": state.rag_docs,
        "previous_actions": [
            entry.get("action") for entry in state.reasoning_history
        ],
        "messages": state.messages,
        "errors": state.errors,
    }

    # Delegate the decision to intent.py.
    outcome: ReasoningResult = react_reason(state.user_query, reasoning_context)
    action_name = outcome.action.name

    summary = {
        "action": action_name,
        "confidence": outcome.confidence,
        "reasoning": outcome.reasoning,
    }

    # The history entry is the summary plus step number and timestamp.
    state.reasoning_history.append(
        {
            "step": state.reasoning_step,
            **summary,
            "timestamp": datetime.now().isoformat(),
        }
    )

    state.debug_info["last_reasoning"] = summary
    # Keep the full result object for the conditional router.
    state.debug_info["reasoning_result"] = outcome
    state.last_action = action_name
    return state
# ========== 2. RAG 检索节点(带超时和重试) ==========
def _rag_retrieve_core(state: MainGraphState) -> MainGraphState:
    """Core RAG retrieval step (no retry logic).

    The query defaults to ``state.user_query``. It is replaced by
    ``retrieval_config.retrieval_query`` when the stored reasoning result
    carries a non-empty one. The retrieval itself is still simulated: fixed
    placeholder context and documents are written to the state.

    Returns:
        The same ``state`` object, updated in place.
    """
    query_text = state.user_query

    # Prefer the query proposed by the reasoning step, if there is one.
    reasoning: Optional[ReasoningResult] = state.debug_info.get("reasoning_result")
    config = reasoning.retrieval_config if reasoning else None
    if config and config.retrieval_query:
        query_text = config.retrieval_query

    # This is where the real RAG tool would be called once it is initialised.
    # Mock data is used for the time being.
    mock_docs = [
        {"source": "doc1.txt", "content": "示例内容1"},
        {"source": "doc2.txt", "content": "示例内容2"},
    ]
    state.rag_context = (
        f"[模拟RAG检索结果]\n"
        f"查询: {query_text}\n"
        f"这是一个来自知识库的示例回答。"
    )
    state.rag_docs = mock_docs
    state.rag_retrieved = True
    state.success = True
    return state
def rag_retrieve_node(state: MainGraphState) -> MainGraphState:
    """RAG retrieval node with retries and exponential backoff.

    Calls ``_rag_retrieve_core`` up to ``RAG_RETRY_CONFIG.max_retries + 1``
    times. Between failed attempts it sleeps for
    ``base_delay * 2 ** attempt`` seconds, capped at ``max_delay``. On
    success, attempt statistics are stored under
    ``state.debug_info["rag_retrieval"]``. When every attempt fails, an
    ``ErrorRecord`` is appended to ``state.errors``, stored as
    ``state.current_error``, and the phase is set to ``"error_handling"``.

    Note: ``RAG_RETRY_CONFIG.timeout`` is only recorded in the error context.
    This function does not enforce a per-attempt timeout.

    Returns:
        The same ``state`` object, updated in place.
    """
    state.current_phase = "rag_retrieving"

    # Monotonic clock: elapsed time must not be affected by wall-clock jumps.
    started = time.monotonic()
    last_error = None
    max_retries = RAG_RETRY_CONFIG.max_retries

    for attempt in range(max_retries + 1):
        try:
            result = _rag_retrieve_core(state)
            # NOTE(review): a state.current_error left by an earlier failure
            # is not cleared here - confirm whether routing expects that.
            state.debug_info["rag_retrieval"] = {
                "attempt": attempt + 1,
                "success": True,
                "time": time.monotonic() - started,
            }
            return result
        except Exception as exc:
            last_error = exc
            if attempt >= max_retries:
                break
            # Exponential backoff before the next attempt, capped at max_delay.
            delay = RAG_RETRY_CONFIG.base_delay * (2 ** attempt)
            time.sleep(min(delay, RAG_RETRY_CONFIG.max_delay))

    # Every attempt failed: record a structured error for the error handler.
    error_record = ErrorRecord(
        error_type="RAGRetrievalError",
        error_message=str(last_error) if last_error else "RAG 检索超时",
        severity=ErrorSeverity.WARNING,
        source="rag_retrieve_node",
        timestamp=datetime.now().isoformat(),
        # Retries are already exhausted when this record is created.
        retry_count=max_retries,
        max_retries=max_retries,
        context={
            "query": state.user_query,
            "total_time": time.monotonic() - started,
            "timeout": RAG_RETRY_CONFIG.timeout,
        },
    )
    state.errors.append(error_record)
    state.current_error = error_record
    state.current_phase = "error_handling"
    return state
# ========== 3. 错误处理节点 ==========
def error_handling_node(state: MainGraphState) -> MainGraphState:
    """Handle the error stored in ``state.current_error``.

    A structured description of the error is saved to
    ``state.debug_info["structured_error"]`` with the keys ``tool``,
    ``status``, ``error``, ``retries_exhausted``, ``retry_count``,
    ``max_retries`` and ``suggestion``. One of three strategies follows:

    1. Retry: severity is WARNING or ERROR and retries remain. The retry
       counter is bumped and the phase becomes ``"retrying"``.
    2. Degrade: the error is not FATAL. A warning message is stored as the
       final result, ``success`` is True, and the phase is ``"finalizing"``.
    3. Fatal: a failure message is stored, ``success`` is False, and the
       phase is ``"finalizing"``.

    With no current error the phase is reset to ``"react_reasoning"``.

    Returns:
        The same ``state`` object, updated in place.
    """
    # Local import: json is not among this module's top-level imports.
    import json

    state.current_phase = "error_handling"

    if not state.current_error:
        # Nothing to handle; hand control back to reasoning.
        state.current_phase = "react_reasoning"
        return state

    error = state.current_error
    state.error_message = f"{error.error_type}: {error.error_message}"

    # Structured error info (intended for LLM decision making).
    structured_error = {
        "tool": error.source,
        "status": "failed",
        "error": error.error_message,
        "retries_exhausted": error.retry_count >= error.max_retries,
        "retry_count": error.retry_count,
        "max_retries": error.max_retries,
    }

    # Add a suggestion that depends on the kind of error.
    if "RAG" in error.error_type:
        structured_error["suggestion"] = "尝试重新表述问题或直接询问,我会用现有知识回答"
    elif "subgraph" in error.source or "contact" in error.source:
        structured_error["suggestion"] = "子图执行失败,请尝试简化查询或使用其他功能"
    elif "timeout" in error.error_message.lower():
        structured_error["suggestion"] = "请求超时,请稍后再试或简化查询"
    else:
        structured_error["suggestion"] = "请尝试其他方式提问"

    state.debug_info["structured_error"] = structured_error

    # Strategy 1: retry while the budget allows it.
    can_retry = (
        error.severity in [ErrorSeverity.WARNING, ErrorSeverity.ERROR]
        and error.retry_count < error.max_retries
    )
    if can_retry:
        error.retry_count += 1
        state.retry_action = error.source
        state.debug_info["retry_count"] = error.retry_count
        if "RAG" in error.error_type:
            state.last_action = "RE_RETRIEVE_RAG"
        elif "subgraph" in error.source:
            state.last_action = "DIRECT_RESPONSE"
        else:
            state.last_action = "REASON"
        state.current_phase = "retrying"
        return state

    # The message wraps this payload in a ```json fence, so emit real JSON
    # rather than a Python dict repr. default=str keeps the error handler
    # from failing on an unexpected non-serialisable value.
    error_json = json.dumps(
        structured_error, ensure_ascii=False, indent=2, default=str
    )

    # Strategy 2: cannot retry; degrade to a direct answer.
    if error.severity != ErrorSeverity.FATAL:
        state.final_result = (
            f"⚠️ 遇到一些问题:\n"
            f"```json\n{error_json}\n```\n"
            f"但我会尽力用现有信息回答您。"
        )
        state.success = True
        state.current_phase = "finalizing"
        return state

    # Strategy 3: fatal error; cannot continue.
    state.final_result = (
        f"❌ 服务暂时不可用,请稍后再试。\n"
        f"```json\n{error_json}\n```"
    )
    state.success = False
    state.current_phase = "finalizing"
    return state
# ========== 4. 最终回答节点 ==========
def final_response_node(state: MainGraphState) -> MainGraphState:
    """Assemble the final answer and mark the run as done.

    If ``state.final_result`` is already set (for example by the error
    handler or the step-limit guard) it is kept unchanged, and so is
    ``state.success``. Otherwise the answer is built from the RAG context
    followed by any sub-graph results, with a default reply when none exist.

    ``state.end_time`` is stamped on every path, so a run that ends with a
    pre-set result also gets an end time.

    Returns:
        The same ``state`` object, updated in place.
    """
    state.current_phase = "finalizing"

    if not state.final_result:
        parts = []

        # RAG context first, followed by a separator.
        if state.rag_context:
            parts.append(state.rag_context)
            parts.append("---")

        # Then each sub-graph result that produced a final answer.
        subgraph_results = (
            state.contact_result,
            state.dictionary_result,
            state.news_result,
        )
        for sub_result in subgraph_results:
            if sub_result and sub_result.get("final_result"):
                parts.append(sub_result["final_result"])

        # Nothing available: fall back to the default reply.
        if not parts:
            parts.append(f"我理解了您的问题:{state.user_query}")

        state.final_result = "\n".join(parts)
        state.success = True

    state.current_phase = "done"
    state.end_time = datetime.now().isoformat()
    return state
# ========== 5. 初始化状态节点 ==========
def init_state_node(state: MainGraphState) -> MainGraphState:
    """Set the initial values at the start of the flow.

    Resets the reasoning step counter, records the start time and, when
    ``state.user_query`` is empty, takes the query from the last entry in
    ``state.messages``. That entry may be an object with a ``content``
    attribute, a dict with a ``"content"`` key, or any other value, which is
    converted with ``str()``.

    Returns:
        The same ``state`` object, updated in place.
    """
    state.current_phase = "initializing"
    state.reasoning_step = 0
    state.start_time = datetime.now().isoformat()

    # Take the user query from messages only if user_query is empty.
    if not state.user_query and state.messages:
        last_msg = state.messages[-1]
        if isinstance(last_msg, dict) and "content" in last_msg:
            # Dict-shaped messages have no .content attribute; without this
            # branch the query would become the dict's repr.
            state.user_query = last_msg["content"]
        else:
            state.user_query = getattr(last_msg, "content", str(last_msg))
    return state
# ========== 6. 条件路由函数 ==========
def route_by_reasoning(state: MainGraphState) -> str:
    """Pick the next node name from the current phase and reasoning result.

    Explicit phase decisions ("max_steps_exceeded", "finalizing", "done",
    "retrying") are checked before the generic error check. The error handler
    sets "retrying" or "finalizing" while ``state.current_error`` is still
    set, so checking the error first would send every such state back to
    "handle_error" and make those branches unreachable.

    Returns:
        One of "final_response", "handle_error", "rag_retrieve",
        "react_reason", "contact_subgraph", "dictionary_subgraph" or
        "news_analysis_subgraph".
    """
    phase = state.current_phase

    # Terminal phases go straight to the final answer.
    if phase in ("max_steps_exceeded", "finalizing", "done"):
        return "final_response"

    # A retry requested by the error handler.
    if phase == "retrying":
        retry_action = state.retry_action
        if retry_action and "rag" in retry_action.lower():
            return "rag_retrieve"
        return "react_reason"

    # Any other state with a pending error goes to the error handler.
    if phase == "error_handling" or state.current_error:
        return "handle_error"

    reasoning_result: Optional[ReasoningResult] = state.debug_info.get("reasoning_result")
    if not reasoning_result:
        # No reasoning result: finish.
        return "final_response"

    # Use the routing function provided by intent.py.
    route = get_route_by_reasoning(reasoning_result)

    # Map intent routes onto this graph's node names.
    route_mapping = {
        "direct_response": "final_response",
        "retrieve_rag": "rag_retrieve",
        "re_retrieve_rag": "rag_retrieve",
        "clarify": "final_response",  # simplified: answer and let the user add detail
        "call_tool": "final_response",  # simplified: tool calls not implemented yet
        "contact": "contact_subgraph",
        "dictionary": "dictionary_subgraph",
        "news_analysis": "news_analysis_subgraph",
    }
    return route_mapping.get(route, "final_response")