refactor: 重写 intent.py,使用真实 LLM 服务进行 React 模式推理
Some checks failed
构建并部署 AI Agent 服务 / deploy (push) Failing after 6m38s

- 重写 intent.py,整合 chat_services.py
- 支持 LLM 推理 + 规则降级策略
- 支持子图路由(contact/dictionary/news_analysis)
- 保持与现有 react_nodes.py 兼容
- 更新 react_nodes.py 以更好地处理新的接口
This commit is contained in:
2026-04-26 23:27:22 +08:00
parent 338fda188a
commit 26f872f975
2 changed files with 344 additions and 332 deletions

View File

@@ -6,64 +6,89 @@ Intent Understanding & Reasoning Module (React Pattern)
1. 理解用户的查询意图
2. 判断是否需要调用 RAG 检索
3. 判断是否需要重新检索
4. 决定下一步的
5. 支持条件路由扩展
4. 决定下一步的动作(路由到子图、直接回答等)
核心组件
- ReasoningAction: 推理动作枚举
- ReasoningResult: 推理结果数据类
- ReactIntentReasoner: React 模式意图推理器
核心设计
- 使用项目已有的 chat_services.py 进行 LLM 调用
- 保持与现有架构一致(服务层模式)
- 支持降级策略(LLM 失败时回退到规则)
- 与 react_nodes.py 无缝集成
"""
import re
from typing import Dict, Any, Optional, List, Set, Tuple
import json
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, field
from enum import Enum, auto
from abc import ABC, abstractmethod
# ========== 1. 核心数据类型 ==========
class ReasoningAction(Enum):
    """Reasoning action enum - decides what the agent does next.

    NOTE(review): the scraped diff contained duplicate member names
    (RE_RETRIEVE_RAG and ROUTE_SUBGRAPH each appeared twice); duplicate
    names in an Enum body raise TypeError at class creation, so the
    duplicates are merged here. RERIEVE_RAG (legacy misspelling) is kept
    as a distinct member for backward compatibility with old callers.
    """
    DIRECT_RESPONSE = auto()   # answer directly, no extra information needed
    RETRIEVE_RAG = auto()      # needs a RAG knowledge-base retrieval
    RERIEVE_RAG = auto()       # legacy misspelling, kept for compatibility
    RE_RETRIEVE_RAG = auto()   # needs re-retrieval (more / better results)
    CALL_TOOL = auto()         # needs to call another tool
    ROUTE_SUBGRAPH = auto()    # route to a subgraph (contact/dictionary/news_analysis)
    CLARIFY = auto()           # needs clarification from the user
    UNKNOWN = auto()           # unknown action
@dataclass
class RetrievalConfig:
    """Retrieval configuration produced by intent reasoning.

    NOTE(review): the scraped diff interleaved the old and new field lists
    (need_retrieval / need_re_retrieval / retrieval_query appeared twice);
    fields are deduplicated here as the union of both versions, keeping
    score_threshold from the old version for backward compatibility.
    """
    need_retrieval: bool = False           # whether a retrieval is needed at all
    need_re_retrieval: bool = False        # whether this is a re-retrieval pass
    retrieval_query: Optional[str] = None  # optimized query to send to the retriever
    target_subgraph: Optional[str] = None  # subgraph name when routing (contact/...)
    collection_name: Optional[str] = None  # vector-store collection to search
    k: int = 5                             # number of results to return
    score_threshold: float = 0.3           # similarity threshold (legacy field)
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class ReasoningResult:
    """Result of one intent-reasoning pass.

    NOTE(review): the scraped diff duplicated the action/confidence/reasoning
    annotations; duplicates are merged here (the second annotation of a name
    silently overrides the first in a dataclass, so only dead lines are removed).
    """
    action: ReasoningAction = ReasoningAction.UNKNOWN  # decided next action
    confidence: float = 0.0                            # confidence in [0, 1]
    reasoning: str = ""                                # human-readable rationale
    retrieval_config: RetrievalConfig = field(default_factory=RetrievalConfig)
    extracted_entities: Dict[str, Any] = field(default_factory=dict)  # extracted entities
    next_hints: List[str] = field(default_factory=list)               # hints for the next step
    original_query: str = ""                                          # original user query
    metadata: Dict[str, Any] = field(default_factory=dict)            # e.g. target_subgraph
class BaseIntentReasoner(ABC):
"""意图推理器基类"""
# ========== 2. React 推理器 ==========
@abstractmethod
def reason(
class ReactIntentReasoner:
"""
React 模式意图推理器
核心功能:
1. 使用 LLM 分析用户意图
2. 决定是否需要 RAG 检索/重新检索
3. 决定是否需要路由到子图
4. 提供降级策略(规则匹配)
"""
def __init__(self):
    """Initialize the reasoner; the LLM service is lazy-loaded on first use."""
    # Created on demand by _get_llm_service() to avoid import cycles at module load.
    self._llm_service = None
    # Keyword triggers used by the rule-based fallback to route a query
    # to one of the dedicated subgraphs.
    self._subgraph_keywords = {
        "contact": ["通讯录", "联系人", "contact", "email", "邮件", "邮箱"],
        "dictionary": ["词典", "单词", "翻译", "dictionary", "translate", "生词"],
        "news_analysis": ["资讯", "新闻", "分析", "news", "report", "热点"]
    }
def _get_llm_service(self):
    """Return the chat/LLM service, creating it on first call.

    The import is deliberately inside the method to avoid a circular
    import between this module and app.model_services.chat_services.
    """
    if self._llm_service is None:
        from app.model_services.chat_services import get_chat_service
        self._llm_service = get_chat_service()
    return self._llm_service
async def reason(
self,
query: str,
context: Optional[Dict[str, Any]] = None
@@ -73,297 +98,262 @@ class BaseIntentReasoner(ABC):
Args:
query: 用户查询
context: 上下文信息可能包括:
- messages: 对话历史
- retrieved_docs: 已检索的文档
- previous_actions: 之前的动作
- user_id: 用户ID
- etc.
context: 上下文信息可能包含已检索文档、对话历史等)
Returns:
ReasoningResult: 推理结果
"""
pass
class RuleBasedReactReasoner(BaseIntentReasoner):
"""基于规则的 React 推理器"""
def __init__(self):
# 检索触发关键词
self._retrieval_keywords = {
"什么", "怎么", "如何", "为什么", "", "", "多少",
"介绍", "解释", "说明", "资料", "文档", "查询", "搜索",
"find", "search", "what", "how", "why", "where", "who",
"tell me", "explain", "about", "information"
}
# 重新检索触发关键词
self._re_retrieval_keywords = {
"", "重新", "更多", "不够", "不足", "其他", "另外",
"没找到", "找不到", "没有", "不对", "不是",
"again", "more", "another", "other", "didn't find", "not enough"
}
# 澄清触发关键词
self._clarify_keywords = {
"?", "", "哪个", "哪些", "哪位", "什么意思",
"请问", "能详细", "具体点", "举个例子"
}
# 工具调用关键词
self._tool_keywords = {
"天气", "weather", "邮件", "email", "联系人", "contact",
"翻译", "translate", "词典", "dictionary"
}
# 子图路由关键词映射
self._subgraph_keywords = {
"contact": {"通讯录", "联系人", "contact", "email", "邮件"},
"dictionary": {"词典", "单词", "翻译", "dictionary", "translate"},
"news_analysis": {"资讯", "新闻", "分析", "news", "report"},
}
# 直接回答模式(问候、感谢等)
self._direct_response_patterns = [
(r'^(你好|您好|hi|hello|hey|早上好|下午好|晚上好|哈喽)', ReasoningAction.DIRECT_RESPONSE),
(r'^(谢谢|感谢|多谢|thanks|thank you)', ReasoningAction.DIRECT_RESPONSE),
(r'^(再见|拜拜|bye|goodbye|回见)', ReasoningAction.DIRECT_RESPONSE),
]
def reason(
self,
query: str,
context: Optional[Dict[str, Any]] = None
) -> ReasoningResult:
"""
基于规则的推理
ReasoningResult
"""
context = context or {}
query_lower = query.lower()
result = ReasoningResult(original_query=query)
# 1. 先检查是否是直接回答模式
for pattern, action in self._direct_response_patterns:
if re.match(pattern, query, re.IGNORECASE):
result.action = action
result.confidence = 0.95
result.reasoning = "检测到问候、感谢或告别语,直接回答"
# 策略1: 尝试使用 LLM 推理
try:
llm_result = await self._reason_with_llm(query, context)
if llm_result.confidence >= 0.6: # 置信度足够高,直接返回
return llm_result
except Exception as e:
print(f"[ReactReasoner] LLM 推理失败: {e}, 回退到规则")
# 策略2: LLM 失败或置信度低,使用规则匹配
return self._reason_with_rules(query, context)
async def _reason_with_llm(
    self,
    query: str,
    context: Dict[str, Any]
) -> ReasoningResult:
    """Run one LLM-based reasoning pass.

    Builds the prompt, awaits the chat service, and parses the JSON
    reply into a ReasoningResult. Propagates whatever the underlying
    service raises; the caller catches exceptions and falls back to
    rule-based reasoning.
    """
    prompt = self._build_reasoning_prompt(query, context)
    llm = self._get_llm_service()
    # assumes the service exposes a LangChain-style ainvoke() returning an
    # object with a .content attribute — TODO confirm against chat_services
    response = await llm.ainvoke(prompt)
    return self._parse_llm_response(response.content, query)
def _build_reasoning_prompt(self, query: str, context: Dict[str, Any]) -> str:
    """Build the reasoning prompt sent to the LLM.

    The prompt asks the model to choose exactly one action and to reply
    with JSON only; the reply is parsed by _parse_llm_response.
    """
    # Summarize the relevant context (retrieved docs, prior actions) for the model.
    context_parts = []
    if context.get("retrieved_docs"):
        context_parts.append(f"- 已检索文档: {len(context['retrieved_docs'])} 条")
    if context.get("previous_actions"):
        context_parts.append(f"- 历史动作: {context['previous_actions']}")
    context_str = "\n".join(context_parts) if context_parts else "无"
    # NOTE: the lines below are inside a triple-quoted f-string; their leading
    # column is part of the prompt text and must stay flush-left.
    return f"""你是一个专业的意图推理助手。请分析用户的查询,决定下一步应该做什么。
可选动作:
1. DIRECT_RESPONSE - 直接回答(闲聊、打招呼、不需要额外信息)
2. RETRIEVE_RAG - 需要查询知识库(询问知识、政策、文档等)
3. RE_RETRIEVE_RAG - 需要重新检索(之前的结果不够,或者用户明确说"再查查""更多"
4. ROUTE_SUBGRAPH - 需要路由到专门的子图:
- contact: 通讯录、联系人、邮件相关
- dictionary: 词典、翻译、单词相关
- news_analysis: 资讯、新闻、热点分析相关
5. CLARIFY - 需要澄清用户的问题(问题不明确)
用户查询: {query}
当前上下文:
{context_str}
请按以下 JSON 格式输出(仅输出 JSON不要其他内容:
{{
"action": "DIRECT_RESPONSE|RETRIEVE_RAG|RE_RETRIEVE_RAG|ROUTE_SUBGRAPH|CLARIFY",
"confidence": 0.85,
"reasoning": "简要说明理由",
"target_subgraph": "contact|dictionary|news_analysis|null (仅当 action=ROUTE_SUBGRAPH 时)",
"retrieval_query": "优化后的检索查询 (可选)"
}}
"""
def _parse_llm_response(self, response: str, original_query: str) -> ReasoningResult:
    """Parse the LLM's JSON reply into a ReasoningResult.

    On any failure (no JSON found, malformed JSON, bad field types) the
    result keeps confidence 0.0 so the caller falls back to rule-based
    reasoning instead of acting on garbage.

    NOTE(review): the scraped diff had two stray lines from the old
    rule-based reasoner spliced into this body (an orphaned for-loop
    header over self._subgraph_keywords); they broke the syntax and are
    removed here.
    """
    result = ReasoningResult(original_query=original_query)
    # Extract the first JSON object; the model may wrap it in prose.
    json_match = re.search(r'\{[\s\S]*\}', response)
    if not json_match:
        # No JSON at all - signal the caller to fall back to rules.
        result.confidence = 0.0
        return result
    try:
        data = json.loads(json_match.group())
        action_str = data.get("action", "UNKNOWN")
        # Map the action name onto the enum; unknown names become UNKNOWN.
        try:
            result.action = ReasoningAction[action_str]
        except KeyError:
            result.action = ReasoningAction.UNKNOWN
        result.confidence = float(data.get("confidence", 0.5))
        result.reasoning = data.get("reasoning", "")
        # Subgraph routing target (only meaningful for ROUTE_SUBGRAPH).
        if result.action == ReasoningAction.ROUTE_SUBGRAPH:
            result.retrieval_config.target_subgraph = data.get("target_subgraph")
            result.metadata["target_subgraph"] = data.get("target_subgraph")
        # Retrieval configuration for (re-)retrieval actions.
        if result.action in [ReasoningAction.RETRIEVE_RAG, ReasoningAction.RE_RETRIEVE_RAG]:
            result.retrieval_config.need_retrieval = True
            result.retrieval_config.need_re_retrieval = (result.action == ReasoningAction.RE_RETRIEVE_RAG)
            result.retrieval_config.retrieval_query = data.get("retrieval_query", original_query)
        return result
    except Exception as e:
        print(f"[ReactReasoner] 解析 LLM 响应失败: {e}")
        result.confidence = 0.0
        return result
def _reason_with_rules(
    self,
    query: str,
    context: Dict[str, Any]
) -> ReasoningResult:
    """Rule-based fallback reasoning (LLM unavailable or low confidence).

    Check order: subgraph routing -> re-retrieval -> RAG retrieval ->
    direct response -> clarification -> default direct response.

    NOTE(review): the scraped diff interleaved the old reasoner's body
    with this one (lines referencing an undefined `subgraph` variable and
    the removed `self._re_retrieval_keywords`); only the new-version logic
    is kept here. Several keyword lists also contained empty strings
    (lost CJK characters in the scrape); "" is a substring of every
    string and would have matched every query, so empties are removed and
    the most plausible characters restored — TODO confirm against the
    real repository.
    """
    result = ReasoningResult(original_query=query)
    query_lower = query.lower()
    # 1. Subgraph routing (highest priority).
    for subgraph_name, keywords in self._subgraph_keywords.items():
        if any(kw in query_lower for kw in keywords):
            result.action = ReasoningAction.ROUTE_SUBGRAPH
            result.confidence = 0.85
            result.reasoning = f"关键词匹配: {subgraph_name} 子图"
            result.retrieval_config.target_subgraph = subgraph_name
            result.metadata["target_subgraph"] = subgraph_name
            return result
    # 2. Re-retrieval: explicit keywords, or previous retrieval returned
    #    too few documents (< 2).
    re_retrieve_keywords = ["重新", "更多", "不够", "其他", "没找到", "找不到", "不对", "another", "again", "more"]
    has_re_retrieve = any(kw in query_lower for kw in re_retrieve_keywords)
    has_docs = context.get("retrieved_docs") and len(context["retrieved_docs"]) > 0
    if has_re_retrieve or (has_docs and len(context["retrieved_docs"]) < 2):
        result.action = ReasoningAction.RE_RETRIEVE_RAG
        result.confidence = 0.8 if has_re_retrieve else 0.65
        result.reasoning = "需要重新检索更多/更好结果"
        result.retrieval_config.need_retrieval = True
        result.retrieval_config.need_re_retrieval = True
        result.retrieval_config.retrieval_query = query
        return result
    # 3. RAG retrieval: question keywords, or any non-trivial query (> 5 chars).
    retrieve_keywords = ["什么", "怎么", "如何", "为什么", "谁", "哪", "介绍", "解释", "说明", "资料", "文档", "查询", "搜索", "what", "how", "why", "where", "who", "tell me", "explain", "about", "information"]
    has_retrieve = any(kw in query_lower for kw in retrieve_keywords)
    if has_retrieve or len(query.strip()) > 5:
        result.action = ReasoningAction.RETRIEVE_RAG
        result.confidence = 0.8 if has_retrieve else 0.6
        result.reasoning = "需要查询知识库"
        result.retrieval_config.need_retrieval = True
        result.retrieval_config.retrieval_query = query
        return result
    # 4. Direct response: greeting / thanks / goodbye.
    direct_keywords = ["你好", "您好", "hi", "hello", "hey", "早上好", "晚上好", "下午好", "谢谢", "感谢", "多谢", "thanks", "thank you", "再见", "拜拜", "goodbye", "回见"]
    if any(kw in query_lower for kw in direct_keywords):
        result.action = ReasoningAction.DIRECT_RESPONSE
        result.confidence = 0.9
        result.reasoning = "直接回答(问候/感谢/道别)"
        return result
    # 5. Clarification: very short query, or explicitly vague phrasing.
    if len(query.strip()) < 3 or any(q in query for q in ["?", "?", "哪个", "哪些", "什么意思", "能详细"]):
        result.action = ReasoningAction.CLARIFY
        result.confidence = 0.7
        result.reasoning = "需要澄清问题"
        result.next_hints = ["请提供更多细节", "您想了解什么方面的内容?", "能否具体说明一下?"]
        return result
    # 6. Default: answer directly with low confidence.
    result.action = ReasoningAction.DIRECT_RESPONSE
    result.confidence = 0.5
    result.reasoning = "默认直接回答模式"
    return result
def _optimize_retrieval_query(self, query: str) -> str:
"""优化检索查询,去掉不必要的语气词"""
# 去掉常见的前缀
prefixes_to_remove = [
"请告诉我", "帮我查一下", "我想知道", "能不能告诉我",
"请问", "你知道", "帮我找", "搜索一下", "查询一下"
]
optimized = query
for prefix in prefixes_to_remove:
if optimized.startswith(prefix):
optimized = optimized[len(prefix):]
# 去掉常见的后缀
suffixes_to_remove = ["吗?", "呢?", "吧?", "", "", "", "", "?"]
for suffix in suffixes_to_remove:
if optimized.endswith(suffix):
optimized = optimized[:-len(suffix)]
# ========== 3. 便捷函数(保持与旧代码兼容) ==========
return optimized.strip()
# 全局推理器实例(懒加载)
_reasoner: Optional[ReactIntentReasoner] = None
class LLMReactReasoner(BaseIntentReasoner):
"""
基于 LLM 的 React 推理器
使用大语言模型进行更智能的推理判断
"""
def _get_reasoner() -> ReactIntentReasoner:
    """Return the module-level singleton reasoner, creating it on first use."""
    global _reasoner
    if _reasoner is None:
        _reasoner = ReactIntentReasoner()
    return _reasoner
def __init__(self, llm_client=None):
"""
初始化 LLM 推理器
Args:
llm_client: LLM 客户端,需要支持调用方法
"""
self.llm_client = llm_client
self.rule_based = RuleBasedReactReasoner()
def reason(
self,
async def react_reason_async(
query: str,
context: Optional[Dict[str, Any]] = None
) -> ReasoningResult:
"""
使用 LLM 进行推理,失败时回退到规则推理
"""
try:
if self.llm_client:
return self._reason_with_llm(query, context)
except Exception:
pass
# LLM 不可用或失败,回退到规则推理
return self.rule_based.reason(query, context)
def _reason_with_llm(
self,
query: str,
context: Optional[Dict[str, Any]] = None
) -> ReasoningResult:
"""
使用 LLM 进行推理(需要实现具体的 LLM 调用逻辑)
"""
# 这里是一个示例实现,实际项目需要连接真实的 LLM
prompt = self._build_reasoning_prompt(query, context)
# 模拟 LLM 返回(实际项目中替换为真实调用)
# 这里我们还是先调用规则推理作为示例
return self.rule_based.reason(query, context)
def _build_reasoning_prompt(self, query: str, context: Optional[Dict[str, Any]]) -> str:
"""构建推理提示词"""
context_str = ""
if context:
context_lines = []
if "messages" in context:
context_lines.append(f"对话历史: {len(context['messages'])}")
if "retrieved_docs" in context:
context_lines.append(f"已检索文档: {len(context['retrieved_docs'])}")
context_str = "\n".join(context_lines)
return f"""你是一个意图推理助手,需要判断用户的查询应该如何处理。
用户查询: {query}
上下文信息:
{context_str or '无额外上下文'}
请判断下一步应该做什么,可选动作:
1. DIRECT_RESPONSE - 直接回答,不需要额外信息
2. RETRIEVE_RAG - 需要调用知识库检索
3. RE_RETRIEVE_RAG - 需要重新检索更多/更好的结果
4. CALL_TOOL - 需要调用其他工具
5. CLARIFY - 需要澄清用户的问题
6. ROUTE_SUBGRAPH - 需要路由到子图
请以 JSON 格式输出你的判断。
"""
def create_react_reasoner(
use_llm: bool = False,
llm_client=None
) -> BaseIntentReasoner:
"""
创建 React 模式意图推理器工厂函数
Args:
use_llm: 是否使用 LLM 推理
llm_client: LLM 客户端实例
Returns:
BaseIntentReasoner: 推理器实例
"""
if use_llm:
return LLMReactReasoner(llm_client)
return RuleBasedReactReasoner()
# 便捷函数 - 直接推理
def react_reason(
query: str,
context: Optional[Dict[str, Any]] = None,
reasoner: Optional[BaseIntentReasoner] = None
) -> ReasoningResult:
"""
便捷函数:直接进行 React 推理
便捷函数:异步 React 推理(推荐使用)
Args:
query: 用户查询
context: 上下文信息
reasoner: 可选的推理器实例
context: 上下文
Returns:
ReasoningResult: 推理结果
ReasoningResult
"""
if reasoner is None:
reasoner = create_react_reasoner()
return reasoner.reason(query, context)
reasoner = _get_reasoner()
return await reasoner.reason(query, context)
# 条件路由辅助函数
def get_route_by_reasoning(result: ReasoningResult) -> str:
def react_reason(
    query: str,
    context: Optional[Dict[str, Any]] = None
) -> ReasoningResult:
    """Synchronous React reasoning (kept for backward compatibility).

    Prefer react_reason_async in asynchronous code. When called from a
    thread whose event loop is already running, this function cannot
    block on the coroutine without deadlocking, so it degrades to
    rule-based reasoning — same observable fallback as before.

    NOTE(review): the previous implementation scheduled an un-awaited
    loop.create_task(...) before falling back; that orphan task would run
    later with side effects (LLM call) and its result was dropped. It also
    used the deprecated asyncio.get_event_loop() dance. Both are fixed:
    we only probe for a running loop, and drive the coroutine with
    asyncio.run() when there is none.

    Args:
        query: user query
        context: optional context dict

    Returns:
        ReasoningResult
    """
    import asyncio
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop running in this thread: safe to drive the coroutine here.
        return asyncio.run(react_reason_async(query, context))
    # A loop is already running: awaiting here would deadlock, so fall
    # back to the rule-based path.
    print("[ReactReasoner] 检测到运行中的事件循环,使用规则推理")
    reasoner = _get_reasoner()
    return reasoner._reason_with_rules(query, context or {})
def get_route_by_reasoning(result: ReasoningResult) -> str:
"""
根据推理结果获取路由字符串(与旧代码兼容)
Args:
result: ReasoningResult
Returns:
str: 路由标识
@@ -372,10 +362,21 @@ def get_route_by_reasoning(result: ReasoningResult) -> str:
ReasoningAction.DIRECT_RESPONSE: "direct_response",
ReasoningAction.RETRIEVE_RAG: "retrieve_rag",
ReasoningAction.RE_RETRIEVE_RAG: "re_retrieve_rag",
ReasoningAction.RERIEVE_RAG: "re_retrieve_rag", # 兼容旧拼写
ReasoningAction.CALL_TOOL: "call_tool",
ReasoningAction.CLARIFY: "clarify",
ReasoningAction.ROUTE_SUBGRAPH: result.metadata.get("target_subgraph", "unknown_subgraph"),
ReasoningAction.UNKNOWN: "unknown",
}
return action_to_route.get(result.action, "unknown")
# ========== 4. Exports (public API of this module) ==========
__all__ = [
    "ReasoningAction",
    "RetrievalConfig",
    "ReasoningResult",
    "ReactIntentReasoner",
    "react_reason",
    "react_reason_async",
    "get_route_by_reasoning"
]

View File

@@ -4,7 +4,10 @@ React 模式节点模块 - 带超时和重试功能
- react_reason_node: 使用 intent.py 进行推理
- error_handling_node: 错误处理节点
- final_response_node: 最终回答节点
- init_state_node: 初始化节点
- init_state_node: 初始化状态节点
注意:为了兼容 LangGraph 的同步接口,我们保留了同步的 react_reason 调用
但内部会根据情况使用规则推理或尝试异步调用
"""
import sys
@@ -27,6 +30,7 @@ from .retry_utils import (
# ========== 1. React 推理节点 ==========
def react_reason_node(state: MainGraphState) -> MainGraphState:
"""
React 模式推理节点:判断下一步做什么
@@ -56,6 +60,7 @@ def react_reason_node(state: MainGraphState) -> MainGraphState:
}
# 使用 intent.py 进行推理
# 注意:这里使用同步版本,内部会根据情况处理
result: ReasoningResult = react_reason(state.user_query, context)
# 记录推理历史
@@ -84,6 +89,7 @@ def react_reason_node(state: MainGraphState) -> MainGraphState:
# ========== 2. 错误处理节点 ==========
def error_handling_node(state: MainGraphState) -> MainGraphState:
"""
错误处理节点:处理子图/工具调用错误
@@ -93,7 +99,7 @@ def error_handling_node(state: MainGraphState) -> MainGraphState:
"tool/node": "...",
"status": "failed",
"error": "...",
"retries_exhausted": true/false,
"retries_exceeded": true/false,
"suggestion": "..."
}
"""
@@ -113,7 +119,7 @@ def error_handling_node(state: MainGraphState) -> MainGraphState:
"tool": error.source,
"status": "failed",
"error": error.error_message,
"retries_exhausted": error.retry_count >= error.max_retries,
"retries_exceeded": error.retry_count >= error.max_retries,
"retry_count": error.retry_count,
"max_retries": error.max_retries
}
@@ -174,6 +180,7 @@ def error_handling_node(state: MainGraphState) -> MainGraphState:
# ========== 3. 最终回答节点 ==========
def final_response_node(state: MainGraphState) -> MainGraphState:
"""
最终回答节点:整理并生成最终回答
@@ -217,6 +224,7 @@ def final_response_node(state: MainGraphState) -> MainGraphState:
# ========== 4. 初始化状态节点 ==========
def init_state_node(state: MainGraphState) -> MainGraphState:
"""
初始化状态节点:在流程开始时设置初始值
@@ -234,11 +242,12 @@ def init_state_node(state: MainGraphState) -> MainGraphState:
# ========== 5. 条件路由函数 ==========
def route_by_reasoning(state: MainGraphState) -> str:
"""
根据推理结果决定下一步路由
Returns: 路由字符串
Returns: 路由标识,对应 graph_builder.py 中的边
"""
# 先检查特殊情况
if state.current_phase == "max_steps_exceeded":
@@ -262,12 +271,13 @@ def route_by_reasoning(state: MainGraphState) -> str:
route = get_route_by_reasoning(reasoning_result)
# 映射到我们的节点名称
# 注意:这些名称必须与 subgraph_builder.py 中定义的节点名称一致
route_mapping = {
"direct_response": "final_response",
"retrieve_rag": "rag_retrieve",
"re_retrieve_rag": "rag_retrieve",
"clarify": "final_response",
"call_tool": "final_response",
"call_tool": "final_response", # 暂时映射到 final_response后续可以扩展
"contact": "contact_subgraph",
"dictionary": "dictionary_subgraph",
"news_analysis": "news_analysis_subgraph",
@@ -277,6 +287,7 @@ def route_by_reasoning(state: MainGraphState) -> str:
# ========== 导出 ==========
__all__ = [
"init_state_node",
"react_reason_node",