diff --git a/backend/app/agent_subgraphs/common/intent.py b/backend/app/agent_subgraphs/common/intent.py index 8eda5e6..33941cd 100644 --- a/backend/app/agent_subgraphs/common/intent.py +++ b/backend/app/agent_subgraphs/common/intent.py @@ -1,71 +1,96 @@ """ -意图理解与推理模块 (React模式) +意图理解与推理模块 (React 模式) Intent Understanding & Reasoning Module (React Pattern) 这个模块实现了 React (Reasoning + Acting) 模式的意图理解节点,用于: 1. 理解用户的查询意图 2. 判断是否需要调用 RAG 检索 3. 判断是否需要重新检索 -4. 决定下一步的行动 -5. 支持条件路由扩展 +4. 决定下一步的动作(路由到子图、直接回答等) -核心组件: -- ReasoningAction: 推理动作枚举 -- ReasoningResult: 推理结果数据类 -- ReactIntentReasoner: React 模式意图推理器 +核心设计: +- 使用项目已有的 chat_services.py 进行 LLM 调用 +- 保持与现有架构一致(服务层模式) +- 支持降级策略(LLM 失败时回退到规则) +- 与 react_nodes.py 无缝集成 """ import re -from typing import Dict, Any, Optional, List, Set, Tuple +import json +from typing import Dict, Any, Optional, List from dataclasses import dataclass, field from enum import Enum, auto -from abc import ABC, abstractmethod +# ========== 1. 核心数据类型 ========== + class ReasoningAction(Enum): """推理动作枚举 - 决定下一步做什么""" DIRECT_RESPONSE = auto() # 直接回答,不需要额外信息 RETRIEVE_RAG = auto() # 需要调用 RAG 检索 - RERIEVE_RAG = auto() # 需要重新检索 (优化前版本,兼容保留) - RE_RETRIEVE_RAG = auto() # 需要重新检索 (修正拼写) - CALL_TOOL = auto() # 需要调用其他工具 + RE_RETRIEVE_RAG = auto() # 需要重新检索(更多/更好结果) + ROUTE_SUBGRAPH = auto() # 需要路由到子图(contact/dictionary/news_analysis) CLARIFY = auto() # 需要澄清用户的问题 - ROUTE_SUBGRAPH = auto() # 需要路由到子图 UNKNOWN = auto() # 未知动作 @dataclass class RetrievalConfig: """检索配置""" - need_retrieval: bool = False # 是否需要检索 - need_re_retrieval: bool = False # 是否需要重新检索 - retrieval_query: Optional[str] = None # 优化后的检索查询 - collection_name: Optional[str] = None # 检索的集合名称 - k: int = 5 # 返回数量 - score_threshold: float = 0.3 # 相似度阈值 + need_retrieval: bool = False + need_re_retrieval: bool = False + retrieval_query: Optional[str] = None + target_subgraph: Optional[str] = None + collection_name: Optional[str] = None + k: int = 5 metadata: Dict[str, Any] = field(default_factory=dict) @dataclass class ReasoningResult: """推理结果数据类""" - action: ReasoningAction = ReasoningAction.UNKNOWN # 决定的动作 - confidence: float = 0.0 # 置信度 - reasoning: str = "" # 推理过程说明 + action: ReasoningAction = ReasoningAction.UNKNOWN + confidence: float = 0.0 + reasoning: str = "" retrieval_config: RetrievalConfig = field(default_factory=RetrievalConfig) - extracted_entities: Dict[str, Any] = field(default_factory=dict) # 提取的实体 - next_hints: List[str] = field(default_factory=list) # 下一步提示 - original_query: str = "" # 原始查询 + extracted_entities: Dict[str, Any] = field(default_factory=dict) + next_hints: List[str] = field(default_factory=list) + original_query: str = "" metadata: Dict[str, Any] = field(default_factory=dict) -class BaseIntentReasoner(ABC): - """意图推理器基类""" +# ========== 2. React 推理器 ========== + +class ReactIntentReasoner: + """ + React 模式意图推理器 - @abstractmethod - def reason( - self, - query: str, + 核心功能: + 1. 使用 LLM 分析用户意图 + 2. 决定是否需要 RAG 检索/重新检索 + 3. 决定是否需要路由到子图 + 4. 提供降级策略(规则匹配) + """ + + def __init__(self): + """初始化推理器 - 懒加载 LLM 服务""" + self._llm_service = None + self._subgraph_keywords = { + "contact": ["通讯录", "联系人", "contact", "email", "邮件", "邮箱"], + "dictionary": ["词典", "单词", "翻译", "dictionary", "translate", "生词"], + "news_analysis": ["资讯", "新闻", "分析", "news", "report", "热点"] + } + + def _get_llm_service(self): + """懒加载 LLM 服务(避免循环导入)""" + if self._llm_service is None: + from app.model_services.chat_services import get_chat_service + self._llm_service = get_chat_service() + return self._llm_service + + async def reason( + self, + query: str, context: Optional[Dict[str, Any]] = None ) -> ReasoningResult: """ @@ -73,297 +98,262 @@ class BaseIntentReasoner(ABC): Args: query: 用户查询 - context: 上下文信息,可能包括: - - messages: 对话历史 - - retrieved_docs: 已检索的文档 - - previous_actions: 之前的动作 - - user_id: 用户ID - - etc. + context: 上下文信息(可能包含已检索文档、对话历史等) Returns: - ReasoningResult: 推理结果 - """ - pass - - -class RuleBasedReactReasoner(BaseIntentReasoner): - """基于规则的 React 推理器""" - - def __init__(self): - # 检索触发关键词 - self._retrieval_keywords = { - "什么", "怎么", "如何", "为什么", "哪", "谁", "多少", - "介绍", "解释", "说明", "资料", "文档", "查询", "搜索", - "find", "search", "what", "how", "why", "where", "who", - "tell me", "explain", "about", "information" - } - - # 重新检索触发关键词 - self._re_retrieval_keywords = { - "再", "重新", "更多", "不够", "不足", "其他", "另外", - "没找到", "找不到", "没有", "不对", "不是", - "again", "more", "another", "other", "didn't find", "not enough" - } - - # 澄清触发关键词 - self._clarify_keywords = { - "?", "?", "哪个", "哪些", "哪位", "什么意思", - "请问", "能详细", "具体点", "举个例子" - } - - # 工具调用关键词 - self._tool_keywords = { - "天气", "weather", "邮件", "email", "联系人", "contact", - "翻译", "translate", "词典", "dictionary" - } - - # 子图路由关键词映射 - self._subgraph_keywords = { - "contact": {"通讯录", "联系人", "contact", "email", "邮件"}, - "dictionary": {"词典", "单词", "翻译", "dictionary", "translate"}, - "news_analysis": {"资讯", "新闻", "分析", "news", "report"}, - } - - # 直接回答模式(问候、感谢等) - self._direct_response_patterns = [ - (r'^(你好|您好|hi|hello|hey|早上好|下午好|晚上好|哈喽)', ReasoningAction.DIRECT_RESPONSE), - (r'^(谢谢|感谢|多谢|thanks|thank you)', ReasoningAction.DIRECT_RESPONSE), - (r'^(再见|拜拜|bye|goodbye|回见)', ReasoningAction.DIRECT_RESPONSE), - ] - - def reason( - self, - query: str, - context: Optional[Dict[str, Any]] = None - ) -> ReasoningResult: - """ - 基于规则的推理 + ReasoningResult """ context = context or {} - query_lower = query.lower() result = ReasoningResult(original_query=query) - - # 1. 先检查是否是直接回答模式 - for pattern, action in self._direct_response_patterns: - if re.match(pattern, query, re.IGNORECASE): - result.action = action - result.confidence = 0.95 - result.reasoning = "检测到问候、感谢或告别语,直接回答" - return result - - # 2. 检查是否需要路由到子图(优先级高于重新检索,避免"有没有"误触发) - for subgraph, keywords in self._subgraph_keywords.items(): - if any(kw in query_lower for kw in keywords): - result.action = ReasoningAction.ROUTE_SUBGRAPH - result.confidence = 0.9 - result.reasoning = f"检测到 {subgraph} 子图意图" - result.metadata["target_subgraph"] = subgraph - return result - - # 3. 检查是否需要重新检索 - has_re_retrieval = any(kw in query_lower for kw in self._re_retrieval_keywords) - # 同时检查上下文中是否有之前的检索结果但不够好 - previous_retrieval = context.get("retrieved_docs") - if has_re_retrieval or (previous_retrieval and len(previous_retrieval) < 2): - result.action = ReasoningAction.RE_RETRIEVE_RAG - result.confidence = 0.85 if has_re_retrieval else 0.7 - result.reasoning = "检测到需要重新检索的意图" - result.retrieval_config = RetrievalConfig( - need_retrieval=True, - need_re_retrieval=True, - retrieval_query=self._optimize_retrieval_query(query), - k=10 # 重新检索时返回更多结果 - ) - return result - - # 4. 检查是否需要调用工具 - has_tool = any(kw in query_lower for kw in self._tool_keywords) - if has_tool: - result.action = ReasoningAction.CALL_TOOL - result.confidence = 0.8 - result.reasoning = "检测到工具调用意图" - return result - - # 5. 检查是否需要澄清 - has_clarify = any(kw in query_lower for kw in self._clarify_keywords) - # 或者查询太短、太模糊 - if has_clarify or len(query.strip()) < 3: - result.action = ReasoningAction.CLARIFY - result.confidence = 0.75 - result.reasoning = "检测到需要澄清的意图" - result.next_hints = [ - "请提供更多细节", - "您想了解什么方面的内容?", - "能否具体说明一下?" - ] - return result - - # 6. 检查是否需要 RAG 检索 - has_retrieval = any(kw in query_lower for kw in self._retrieval_keywords) - if has_retrieval or len(query.strip()) > 5: - result.action = ReasoningAction.RETRIEVE_RAG - result.confidence = 0.85 if has_retrieval else 0.6 - result.reasoning = "检测到需要检索知识库的意图" - result.retrieval_config = RetrievalConfig( - need_retrieval=True, - retrieval_query=self._optimize_retrieval_query(query), - k=5 - ) - return result - - # 7. 默认直接回答 - result.action = ReasoningAction.DIRECT_RESPONSE - result.confidence = 0.6 - result.reasoning = "默认直接回答模式" - return result - - def _optimize_retrieval_query(self, query: str) -> str: - """优化检索查询,去掉不必要的语气词""" - # 去掉常见的前缀 - prefixes_to_remove = [ - "请告诉我", "帮我查一下", "我想知道", "能不能告诉我", - "请问", "你知道", "帮我找", "搜索一下", "查询一下" - ] - optimized = query - for prefix in prefixes_to_remove: - if optimized.startswith(prefix): - optimized = optimized[len(prefix):] - - # 去掉常见的后缀 - suffixes_to_remove = ["吗?", "呢?", "吧?", "吗", "呢", "吧", "?", "?"] - for suffix in suffixes_to_remove: - if optimized.endswith(suffix): - optimized = optimized[:-len(suffix)] - - return optimized.strip() - -class LLMReactReasoner(BaseIntentReasoner): - """ - 基于 LLM 的 React 推理器 - 使用大语言模型进行更智能的推理判断 - """ - - def __init__(self, llm_client=None): - """ - 初始化 LLM 推理器 - - Args: - llm_client: LLM 客户端,需要支持调用方法 - """ - self.llm_client = llm_client - self.rule_based = RuleBasedReactReasoner() - - def reason( - self, - query: str, - context: Optional[Dict[str, Any]] = None - ) -> ReasoningResult: - """ - 使用 LLM 进行推理,失败时回退到规则推理 - """ + # 策略1: 尝试使用 LLM 推理 try: - if self.llm_client: - return self._reason_with_llm(query, context) - except Exception: - pass - - # LLM 不可用或失败,回退到规则推理 - return self.rule_based.reason(query, context) - - def _reason_with_llm( - self, - query: str, - context: Optional[Dict[str, Any]] = None + llm_result = await self._reason_with_llm(query, context) + if llm_result.confidence >= 0.6: # 置信度足够高,直接返回 + return llm_result + except Exception as e: + print(f"[ReactReasoner] LLM 推理失败: {e}, 回退到规则") + + # 策略2: LLM 失败或置信度低,使用规则匹配 + return self._reason_with_rules(query, context) + + async def _reason_with_llm( + self, + query: str, + context: Dict[str, Any] ) -> ReasoningResult: - """ - 使用 LLM 进行推理(需要实现具体的 LLM 调用逻辑) - """ - # 这里是一个示例实现,实际项目需要连接真实的 LLM + """使用 LLM 进行推理""" prompt = self._build_reasoning_prompt(query, context) - - # 模拟 LLM 返回(实际项目中替换为真实调用) - # 这里我们还是先调用规则推理作为示例 - return self.rule_based.reason(query, context) - - def _build_reasoning_prompt(self, query: str, context: Optional[Dict[str, Any]]) -> str: + llm = self._get_llm_service() + + response = await llm.ainvoke(prompt) + return self._parse_llm_response(response.content, query) + + def _build_reasoning_prompt(self, query: str, context: Dict[str, Any]) -> str: """构建推理提示词""" - context_str = "" - if context: - context_lines = [] - if "messages" in context: - context_lines.append(f"对话历史: {len(context['messages'])} 条") - if "retrieved_docs" in context: - context_lines.append(f"已检索文档: {len(context['retrieved_docs'])} 条") - context_str = "\n".join(context_lines) - - return f"""你是一个意图推理助手,需要判断用户的查询应该如何处理。 + # 构建上下文描述 + context_parts = [] + if context.get("retrieved_docs"): + context_parts.append(f"- 已检索文档: {len(context['retrieved_docs'])} 条") + if context.get("previous_actions"): + context_parts.append(f"- 历史动作: {context['previous_actions']}") + + context_str = "\n".join(context_parts) if context_parts else "无" + + return f"""你是一个专业的意图推理助手。请分析用户的查询,决定下一步应该做什么。 + +可选动作: +1. DIRECT_RESPONSE - 直接回答(闲聊、打招呼、不需要额外信息) +2. RETRIEVE_RAG - 需要查询知识库(询问知识、政策、文档等) +3. RE_RETRIEVE_RAG - 需要重新检索(之前的结果不够,或者用户明确说"再查查"、"更多") +4. ROUTE_SUBGRAPH - 需要路由到专门的子图: + - contact: 通讯录、联系人、邮件相关 + - dictionary: 词典、翻译、单词相关 + - news_analysis: 资讯、新闻、热点分析相关 +5. CLARIFY - 需要澄清用户的问题(问题不明确) 用户查询: {query} +当前上下文: +{context_str} -上下文信息: -{context_str or '无额外上下文'} - -请判断下一步应该做什么,可选动作: -1. DIRECT_RESPONSE - 直接回答,不需要额外信息 -2. RETRIEVE_RAG - 需要调用知识库检索 -3. RE_RETRIEVE_RAG - 需要重新检索更多/更好的结果 -4. CALL_TOOL - 需要调用其他工具 -5. CLARIFY - 需要澄清用户的问题 -6. ROUTE_SUBGRAPH - 需要路由到子图 - -请以 JSON 格式输出你的判断。 +请按以下 JSON 格式输出(仅输出 JSON,不要其他内容): +{{ + "action": "DIRECT_RESPONSE|RETRIEVE_RAG|RE_RETRIEVE_RAG|ROUTE_SUBGRAPH|CLARIFY", + "confidence": 0.85, + "reasoning": "简要说明理由", + "target_subgraph": "contact|dictionary|news_analysis|null (仅当 action=ROUTE_SUBGRAPH 时)", + "retrieval_query": "优化后的检索查询 (可选)" +}} """ + def _parse_llm_response(self, response: str, original_query: str) -> ReasoningResult: + """解析 LLM 响应""" + result = ReasoningResult(original_query=original_query) -def create_react_reasoner( - use_llm: bool = False, - llm_client=None -) -> BaseIntentReasoner: - """ - 创建 React 模式意图推理器工厂函数 - - Args: - use_llm: 是否使用 LLM 推理 - llm_client: LLM 客户端实例 - - Returns: - BaseIntentReasoner: 推理器实例 - """ - if use_llm: - return LLMReactReasoner(llm_client) - return RuleBasedReactReasoner() + # 提取 JSON + json_match = re.search(r'\{[\s\S]*\}', response) + if not json_match: + # 没有 JSON,回退到规则 + result.confidence = 0.0 + return result + + try: + data = json.loads(json_match.group()) + action_str = data.get("action", "UNKNOWN") + + # 转换为枚举 + try: + result.action = ReasoningAction[action_str] + except KeyError: + result.action = ReasoningAction.UNKNOWN + + result.confidence = float(data.get("confidence", 0.5)) + result.reasoning = data.get("reasoning", "") + + # 处理子图路由 + if result.action == ReasoningAction.ROUTE_SUBGRAPH: + result.retrieval_config.target_subgraph = data.get("target_subgraph") + result.metadata["target_subgraph"] = data.get("target_subgraph") + + # 处理检索查询 + if result.action in [ReasoningAction.RETRIEVE_RAG, ReasoningAction.RE_RETRIEVE_RAG]: + result.retrieval_config.need_retrieval = True + result.retrieval_config.need_re_retrieval = (result.action == ReasoningAction.RE_RETRIEVE_RAG) + result.retrieval_config.retrieval_query = data.get("retrieval_query", original_query) + + return result + except Exception as e: + print(f"[ReactReasoner] 解析 LLM 响应失败: {e}") + result.confidence = 0.0 + return result + + def _reason_with_rules( + self, + query: str, + context: Dict[str, Any] + ) -> ReasoningResult: + """基于规则的降级推理""" + result = ReasoningResult(original_query=query) + query_lower = query.lower() + + # 1. 检查子图路由(最高优先级) + for subgraph_name, keywords in self._subgraph_keywords.items(): + if any(kw in query_lower for kw in keywords): + result.action = ReasoningAction.ROUTE_SUBGRAPH + result.confidence = 0.85 + result.reasoning = f"关键词匹配: {subgraph_name} 子图" + result.retrieval_config.target_subgraph = subgraph_name + result.metadata["target_subgraph"] = subgraph_name + return result + + # 2. 检查是否需要重新检索 + re_retrieve_keywords = ["再", "重新", "更多", "不够", "其他", "没找到", "找不到", "不对", "another", "again", "more"] + has_re_retrieve = any(kw in query_lower for kw in re_retrieve_keywords) + has_docs = context.get("retrieved_docs") and len(context["retrieved_docs"]) > 0 + + if has_re_retrieve or (has_docs and len(context["retrieved_docs"]) < 2): + result.action = ReasoningAction.RE_RETRIEVE_RAG + result.confidence = 0.8 if has_re_retrieve else 0.65 + result.reasoning = "需要重新检索更多/更好结果" + result.retrieval_config.need_retrieval = True + result.retrieval_config.need_re_retrieval = True + result.retrieval_config.retrieval_query = query + return result + + # 3. 检查是否需要 RAG 检索 + retrieve_keywords = ["什么", "怎么", "如何", "为什么", "哪", "谁", "介绍", "解释", "说明", "资料", "文档", "查询", "搜索", "what", "how", "why", "where", "who", "tell me", "explain", "about", "information"] + has_retrieve = any(kw in query_lower for kw in retrieve_keywords) + + if has_retrieve or len(query.strip()) > 5: + result.action = ReasoningAction.RETRIEVE_RAG + result.confidence = 0.8 if has_retrieve else 0.6 + result.reasoning = "需要查询知识库" + result.retrieval_config.need_retrieval = True + result.retrieval_config.retrieval_query = query + return result + + # 4. 检查直接回答 + direct_keywords = ["你好", "您好", "hi", "hello", "hey", "早上好", "晚上好", "下午好", "嗨", "谢谢", "感谢", "多谢", "thanks", "thank you", "再见", "拜拜", "goodbye", "回见"] + if any(kw in query_lower for kw in direct_keywords): + result.action = ReasoningAction.DIRECT_RESPONSE + result.confidence = 0.9 + result.reasoning = "直接回答(问候/感谢/道别)" + return result + + # 5. 检查是否需要澄清 + if len(query.strip()) < 3 or any(q in query for q in ["?", "?", "哪个", "哪些", "什么意思", "请", "能详细"]): + result.action = ReasoningAction.CLARIFY + result.confidence = 0.7 + result.reasoning = "需要澄清问题" + result.next_hints = ["请提供更多细节", "您想了解什么方面的内容?", "能否具体说明一下?"] + return result + + # 6. 默认直接回答 + result.action = ReasoningAction.DIRECT_RESPONSE + result.confidence = 0.5 + result.reasoning = "默认直接回答模式" + return result -# 便捷函数 - 直接推理 -def react_reason( +# ========== 3. 便捷函数(保持与旧代码兼容) ========== + +# 全局推理器实例(懒加载) +_reasoner: Optional[ReactIntentReasoner] = None + + +def _get_reasoner() -> ReactIntentReasoner: + """获取推理器实例""" + global _reasoner + if _reasoner is None: + _reasoner = ReactIntentReasoner() + return _reasoner + + +async def react_reason_async( query: str, - context: Optional[Dict[str, Any]] = None, - reasoner: Optional[BaseIntentReasoner] = None + context: Optional[Dict[str, Any]] = None ) -> ReasoningResult: """ - 便捷函数:直接进行 React 推理 + 便捷函数:异步 React 推理(推荐使用) Args: query: 用户查询 - context: 上下文信息 - reasoner: 可选的推理器实例 + context: 上下文 Returns: - ReasoningResult: 推理结果 + ReasoningResult """ - if reasoner is None: - reasoner = create_react_reasoner() - return reasoner.reason(query, context) + reasoner = _get_reasoner() + return await reasoner.reason(query, context) -# 条件路由辅助函数 -def get_route_by_reasoning(result: ReasoningResult) -> str: +def react_reason( + query: str, + context: Optional[Dict[str, Any]] = None +) -> ReasoningResult: """ - 根据推理结果获取路由字符串 + 便捷函数:同步 React 推理(保持向后兼容) + + 注意:内部会运行事件循环,建议在异步环境中使用 react_reason_async Args: - result: 推理结果 + query: 用户查询 + context: 上下文 + + Returns: + ReasoningResult + """ + import asyncio + + try: + # 尝试获取现有事件循环 + loop = asyncio.get_event_loop() + if loop.is_running(): + # 已经在运行的循环中,创建任务 + task = loop.create_task(react_reason_async(query, context)) + # 注意:这里不能真正等待,会导致死锁 + # 降级到规则推理 + print("[ReactReasoner] 检测到运行中的事件循环,使用规则推理") + reasoner = _get_reasoner() + return reasoner._reason_with_rules(query, context or {}) + except RuntimeError: + pass + + # 创建新的事件循环 + loop = asyncio.new_event_loop() + try: + asyncio.set_event_loop(loop) + return loop.run_until_complete(react_reason_async(query, context)) + finally: + loop.close() + + +def get_route_by_reasoning(result: ReasoningResult) -> str: + """ + 根据推理结果获取路由字符串(与旧代码兼容) + + Args: + result: ReasoningResult Returns: str: 路由标识 @@ -372,10 +362,21 @@ def get_route_by_reasoning(result: ReasoningResult) -> str: ReasoningAction.DIRECT_RESPONSE: "direct_response", ReasoningAction.RETRIEVE_RAG: "retrieve_rag", ReasoningAction.RE_RETRIEVE_RAG: "re_retrieve_rag", - ReasoningAction.RERIEVE_RAG: "re_retrieve_rag", # 兼容旧拼写 - ReasoningAction.CALL_TOOL: "call_tool", ReasoningAction.CLARIFY: "clarify", ReasoningAction.ROUTE_SUBGRAPH: result.metadata.get("target_subgraph", "unknown_subgraph"), ReasoningAction.UNKNOWN: "unknown", } return action_to_route.get(result.action, "unknown") + + +# ========== 4. 导出 ========== + +__all__ = [ + "ReasoningAction", + "RetrievalConfig", + "ReasoningResult", + "ReactIntentReasoner", + "react_reason", + "react_reason_async", + "get_route_by_reasoning" +] diff --git a/backend/app/graph/react_nodes.py b/backend/app/graph/react_nodes.py index 3a47880..386e10f 100644 --- a/backend/app/graph/react_nodes.py +++ b/backend/app/graph/react_nodes.py @@ -4,7 +4,10 @@ React 模式节点模块 - 带超时和重试功能 - react_reason_node: 使用 intent.py 进行推理 - error_handling_node: 错误处理节点 - final_response_node: 最终回答节点 -- init_state_node: 初始化节点 +- init_state_node: 初始化状态节点 + +注意:为了兼容 LangGraph 的同步接口,我们保留了同步的 react_reason 调用 +但内部会根据情况使用规则推理或尝试异步调用 """ import sys @@ -27,6 +30,7 @@ from .retry_utils import ( # ========== 1. React 推理节点 ========== + def react_reason_node(state: MainGraphState) -> MainGraphState: """ React 模式推理节点:判断下一步做什么 @@ -35,7 +39,7 @@ def react_reason_node(state: MainGraphState) -> MainGraphState: """ state.current_phase = "react_reasoning" state.reasoning_step += 1 - + # 检查是否超过最大步数 if state.reasoning_step > state.max_steps: state.current_phase = "max_steps_exceeded" @@ -46,7 +50,7 @@ def react_reason_node(state: MainGraphState) -> MainGraphState: ) state.success = False return state - + # 准备上下文 context = { "retrieved_docs": state.rag_docs, @@ -54,10 +58,11 @@ def react_reason_node(state: MainGraphState) -> MainGraphState: "messages": state.messages, "errors": state.errors } - + # 使用 intent.py 进行推理 + # 注意:这里使用同步版本,内部会根据情况处理 result: ReasoningResult = react_reason(state.user_query, context) - + # 记录推理历史 state.reasoning_history.append({ "step": state.reasoning_step, @@ -66,24 +71,25 @@ def react_reason_node(state: MainGraphState) -> MainGraphState: "reasoning": result.reasoning, "timestamp": datetime.now().isoformat() }) - + # 更新状态 state.debug_info["last_reasoning"] = { "action": result.action.name, "confidence": result.confidence, "reasoning": result.reasoning } - + # 保存推理结果到状态 state.debug_info["reasoning_result"] = result - + # 确定下一步动作 state.last_action = result.action.name - + return state # ========== 2. 错误处理节点 ========== + def error_handling_node(state: MainGraphState) -> MainGraphState: """ 错误处理节点:处理子图/工具调用错误 @@ -93,31 +99,31 @@ def error_handling_node(state: MainGraphState) -> MainGraphState: "tool/node": "...", "status": "failed", "error": "...", - "retries_exhausted": true/false, + "retries_exceeded": true/false, "suggestion": "..." } """ state.current_phase = "error_handling" - + if not state.current_error: state.current_phase = "react_reasoning" return state - + error = state.current_error - + # 更新错误状态 state.error_message = f"{error.error_type}: {error.error_message}" - + # 记录结构化错误信息 structured_error = { "tool": error.source, "status": "failed", "error": error.error_message, - "retries_exhausted": error.retry_count >= error.max_retries, + "retries_exceeded": error.retry_count >= error.max_retries, "retry_count": error.retry_count, "max_retries": error.max_retries } - + # 根据错误类型添加建议 if "RAG" in error.error_type: structured_error["suggestion"] = "尝试重新表述问题或直接询问" @@ -127,30 +133,30 @@ def error_handling_node(state: MainGraphState) -> MainGraphState: structured_error["suggestion"] = "请求超时,请稍后再试" else: structured_error["suggestion"] = "请尝试其他方式提问" - + state.debug_info["structured_error"] = structured_error - + # 策略1: 检查是否可以重试 can_retry = ( error.severity in [ErrorSeverity.WARNING, ErrorSeverity.ERROR] and error.retry_count < error.max_retries ) - + if can_retry: error.retry_count += 1 state.retry_action = error.source state.debug_info["retry_count"] = error.retry_count - + if "RAG" in error.error_type: state.last_action = "RE_RETRIEVE_RAG" elif "subgraph" in error.source: state.last_action = "DIRECT_RESPONSE" else: state.last_action = "REASON" - + state.current_phase = "retrying" return state - + # 策略2: 无法重试,尝试降级方案 if error.severity != ErrorSeverity.FATAL: state.final_result = ( @@ -161,7 +167,7 @@ def error_handling_node(state: MainGraphState) -> MainGraphState: state.success = True state.current_phase = "finalizing" return state - + # 策略3: 致命错误 state.final_result = ( f"❌ 服务暂时不可用,请稍后再试。\n" @@ -169,30 +175,31 @@ def error_handling_node(state: MainGraphState) -> MainGraphState: ) state.success = False state.current_phase = "finalizing" - + return state # ========== 3. 最终回答节点 ========== + def final_response_node(state: MainGraphState) -> MainGraphState: """ 最终回答节点:整理并生成最终回答 """ state.current_phase = "finalizing" - + # 如果已经有 final_result 了,直接返回 if state.final_result: state.current_phase = "done" return state - + # 构建最终回答 parts = [] - + # 添加 RAG 上下文(如果有) if state.rag_context: parts.append(state.rag_context) parts.append("---") - + # 添加子图结果(如果有) if state.contact_result and hasattr(state.contact_result, "get"): if state.contact_result.get("final_result"): @@ -203,20 +210,21 @@ def final_response_node(state: MainGraphState) -> MainGraphState: if state.news_result and hasattr(state.news_result, "get"): if state.news_result.get("final_result"): parts.append(state.news_result["final_result"]) - + # 如果都没有,用默认回答 if not parts: parts.append(f"我理解了您的问题:{state.user_query}") - + state.final_result = "\n".join(parts) state.success = True state.current_phase = "done" state.end_time = datetime.now().isoformat() - + return state # ========== 4. 初始化状态节点 ========== + def init_state_node(state: MainGraphState) -> MainGraphState: """ 初始化状态节点:在流程开始时设置初始值 @@ -224,21 +232,22 @@ def init_state_node(state: MainGraphState) -> MainGraphState: state.current_phase = "initializing" state.reasoning_step = 0 state.start_time = datetime.now().isoformat() - + # 从 messages 中提取用户查询 if not state.user_query and state.messages: last_msg = state.messages[-1] state.user_query = getattr(last_msg, "content", str(last_msg)) - + return state # ========== 5. 条件路由函数 ========== + def route_by_reasoning(state: MainGraphState) -> str: """ 根据推理结果决定下一步路由 - Returns: 路由字符串 + Returns: 路由标识,对应 graph_builder.py 中的边 """ # 先检查特殊情况 if state.current_phase == "max_steps_exceeded": @@ -251,32 +260,34 @@ def route_by_reasoning(state: MainGraphState) -> str: if state.retry_action and "rag" in state.retry_action.lower(): return "rag_retrieve" return "react_reason" - + # 获取推理结果 reasoning_result: Optional[ReasoningResult] = state.debug_info.get("reasoning_result") - + if not reasoning_result: return "final_response" - + # 使用 intent.py 提供的路由函数 route = get_route_by_reasoning(reasoning_result) - + # 映射到我们的节点名称 + # 注意:这些名称必须与 subgraph_builder.py 中定义的节点名称一致 route_mapping = { "direct_response": "final_response", "retrieve_rag": "rag_retrieve", "re_retrieve_rag": "rag_retrieve", "clarify": "final_response", - "call_tool": "final_response", + "call_tool": "final_response", # 暂时映射到 final_response,后续可以扩展 "contact": "contact_subgraph", "dictionary": "dictionary_subgraph", "news_analysis": "news_analysis_subgraph", } - + return route_mapping.get(route, "final_response") # ========== 导出 ========== + __all__ = [ "init_state_node", "react_reason_node",