""" 意图理解与推理模块 (React 模式) Intent Understanding & Reasoning Module (React Pattern) 这个模块实现了 React (Reasoning + Acting) 模式的意图理解节点,用于: 1. 理解用户的查询意图 2. 判断是否需要调用 RAG 检索 3. 判断是否需要重新检索 4. 决定下一步的动作(路由到子图、直接回答等) 核心设计: - 使用项目已有的 chat_services.py 进行 LLM 调用 - 保持与现有架构一致(服务层模式) - 支持降级策略(LLM 失败时回退到规则) - 与 react_nodes.py 无缝集成 """ import re import json from typing import Dict, Any, Optional, List from dataclasses import dataclass, field from enum import Enum, auto # ========== 1. 核心数据类型 ========== class ReasoningAction(Enum): """推理动作枚举 - 决定下一步做什么""" DIRECT_RESPONSE = auto() # 直接回答,不需要额外信息 RETRIEVE_RAG = auto() # 需要调用 RAG 检索 RE_RETRIEVE_RAG = auto() # 需要重新检索(更多/更好结果) ROUTE_SUBGRAPH = auto() # 需要路由到子图(contact/dictionary/news_analysis) CLARIFY = auto() # 需要澄清用户的问题 UNKNOWN = auto() # 未知动作 @dataclass class RetrievalConfig: """检索配置""" need_retrieval: bool = False need_re_retrieval: bool = False retrieval_query: Optional[str] = None target_subgraph: Optional[str] = None collection_name: Optional[str] = None k: int = 5 metadata: Dict[str, Any] = field(default_factory=dict) @dataclass class ReasoningResult: """推理结果数据类""" action: ReasoningAction = ReasoningAction.UNKNOWN confidence: float = 0.0 reasoning: str = "" retrieval_config: RetrievalConfig = field(default_factory=RetrievalConfig) extracted_entities: Dict[str, Any] = field(default_factory=dict) next_hints: List[str] = field(default_factory=list) original_query: str = "" metadata: Dict[str, Any] = field(default_factory=dict) # ========== 2. React 推理器 ========== class ReactIntentReasoner: """ React 模式意图推理器 核心功能: 1. 使用 LLM 分析用户意图 2. 决定是否需要 RAG 检索/重新检索 3. 决定是否需要路由到子图 4. 提供降级策略(规则匹配) """ def __init__(self): """初始化推理器 - 懒加载 LLM 服务""" self._llm_service = None self._subgraph_keywords = { "contact": ["通讯录", "联系人", "contact", "email", "邮件", "邮箱"], "dictionary": ["词典", "单词", "翻译", "dictionary", "translate", "生词"], "news_analysis": ["资讯", "新闻", "分析", "news", "report", "热点"] } def _get_llm_service(self): """懒加载 LLM 服务(避免循环导入)""" if self._llm_service is None: from app.model_services.chat_services import get_chat_service self._llm_service = get_chat_service() return self._llm_service async def reason( self, query: str, context: Optional[Dict[str, Any]] = None ) -> ReasoningResult: """ 推理意图,决定下一步动作 Args: query: 用户查询 context: 上下文信息(可能包含已检索文档、对话历史等) Returns: ReasoningResult """ context = context or {} result = ReasoningResult(original_query=query) # 策略1: 尝试使用 LLM 推理 try: llm_result = await self._reason_with_llm(query, context) if llm_result.confidence >= 0.6: # 置信度足够高,直接返回 return llm_result except Exception as e: print(f"[ReactReasoner] LLM 推理失败: {e}, 回退到规则") # 策略2: LLM 失败或置信度低,使用规则匹配 return self._reason_with_rules(query, context) async def _reason_with_llm( self, query: str, context: Dict[str, Any] ) -> ReasoningResult: """使用 LLM 进行推理""" prompt = self._build_reasoning_prompt(query, context) llm = self._get_llm_service() response = await llm.ainvoke(prompt) return self._parse_llm_response(response.content, query) def _build_reasoning_prompt(self, query: str, context: Dict[str, Any]) -> str: """构建推理提示词""" # 构建上下文描述 context_parts = [] if context.get("retrieved_docs"): context_parts.append(f"- 已检索文档: {len(context['retrieved_docs'])} 条") if context.get("previous_actions"): context_parts.append(f"- 历史动作: {context['previous_actions']}") context_str = "\n".join(context_parts) if context_parts else "无" return f"""你是一个专业的意图推理助手。请分析用户的查询,决定下一步应该做什么。 可选动作: 1. DIRECT_RESPONSE - 直接回答(闲聊、打招呼、不需要额外信息) 2. RETRIEVE_RAG - 需要查询知识库(询问知识、政策、文档等) 3. RE_RETRIEVE_RAG - 需要重新检索(之前的结果不够,或者用户明确说"再查查"、"更多") 4. ROUTE_SUBGRAPH - 需要路由到专门的子图: - contact: 通讯录、联系人、邮件相关 - dictionary: 词典、翻译、单词相关 - news_analysis: 资讯、新闻、热点分析相关 5. CLARIFY - 需要澄清用户的问题(问题不明确) 用户查询: {query} 当前上下文: {context_str} 请按以下 JSON 格式输出(仅输出 JSON,不要其他内容): {{ "action": "DIRECT_RESPONSE|RETRIEVE_RAG|RE_RETRIEVE_RAG|ROUTE_SUBGRAPH|CLARIFY", "confidence": 0.85, "reasoning": "简要说明理由", "target_subgraph": "contact|dictionary|news_analysis|null (仅当 action=ROUTE_SUBGRAPH 时)", "retrieval_query": "优化后的检索查询 (可选)" }} """ def _parse_llm_response(self, response: str, original_query: str) -> ReasoningResult: """解析 LLM 响应""" result = ReasoningResult(original_query=original_query) # 提取 JSON json_match = re.search(r'\{[\s\S]*\}', response) if not json_match: # 没有 JSON,回退到规则 result.confidence = 0.0 return result try: data = json.loads(json_match.group()) action_str = data.get("action", "UNKNOWN") # 转换为枚举 try: result.action = ReasoningAction[action_str] except KeyError: result.action = ReasoningAction.UNKNOWN result.confidence = float(data.get("confidence", 0.5)) result.reasoning = data.get("reasoning", "") # 处理子图路由 if result.action == ReasoningAction.ROUTE_SUBGRAPH: result.retrieval_config.target_subgraph = data.get("target_subgraph") result.metadata["target_subgraph"] = data.get("target_subgraph") # 处理检索查询 if result.action in [ReasoningAction.RETRIEVE_RAG, ReasoningAction.RE_RETRIEVE_RAG]: result.retrieval_config.need_retrieval = True result.retrieval_config.need_re_retrieval = (result.action == ReasoningAction.RE_RETRIEVE_RAG) result.retrieval_config.retrieval_query = data.get("retrieval_query", original_query) return result except Exception as e: print(f"[ReactReasoner] 解析 LLM 响应失败: {e}") result.confidence = 0.0 return result def _reason_with_rules( self, query: str, context: Dict[str, Any] ) -> ReasoningResult: """基于规则的降级推理""" result = ReasoningResult(original_query=query) query_lower = query.lower() # 1. 检查子图路由(最高优先级) for subgraph_name, keywords in self._subgraph_keywords.items(): if any(kw in query_lower for kw in keywords): result.action = ReasoningAction.ROUTE_SUBGRAPH result.confidence = 0.85 result.reasoning = f"关键词匹配: {subgraph_name} 子图" result.retrieval_config.target_subgraph = subgraph_name result.metadata["target_subgraph"] = subgraph_name return result # 2. 检查是否需要重新检索 re_retrieve_keywords = ["再", "重新", "更多", "不够", "其他", "没找到", "找不到", "不对", "another", "again", "more"] has_re_retrieve = any(kw in query_lower for kw in re_retrieve_keywords) has_docs = context.get("retrieved_docs") and len(context["retrieved_docs"]) > 0 if has_re_retrieve or (has_docs and len(context["retrieved_docs"]) < 2): result.action = ReasoningAction.RE_RETRIEVE_RAG result.confidence = 0.8 if has_re_retrieve else 0.65 result.reasoning = "需要重新检索更多/更好结果" result.retrieval_config.need_retrieval = True result.retrieval_config.need_re_retrieval = True result.retrieval_config.retrieval_query = query return result # 3. 检查是否需要 RAG 检索 retrieve_keywords = ["什么", "怎么", "如何", "为什么", "哪", "谁", "介绍", "解释", "说明", "资料", "文档", "查询", "搜索", "what", "how", "why", "where", "who", "tell me", "explain", "about", "information"] has_retrieve = any(kw in query_lower for kw in retrieve_keywords) if has_retrieve or len(query.strip()) > 5: result.action = ReasoningAction.RETRIEVE_RAG result.confidence = 0.8 if has_retrieve else 0.6 result.reasoning = "需要查询知识库" result.retrieval_config.need_retrieval = True result.retrieval_config.retrieval_query = query return result # 4. 检查直接回答 direct_keywords = ["你好", "您好", "hi", "hello", "hey", "早上好", "晚上好", "下午好", "嗨", "谢谢", "感谢", "多谢", "thanks", "thank you", "再见", "拜拜", "goodbye", "回见"] if any(kw in query_lower for kw in direct_keywords): result.action = ReasoningAction.DIRECT_RESPONSE result.confidence = 0.9 result.reasoning = "直接回答(问候/感谢/道别)" return result # 5. 检查是否需要澄清 if len(query.strip()) < 3 or any(q in query for q in ["?", "?", "哪个", "哪些", "什么意思", "请", "能详细"]): result.action = ReasoningAction.CLARIFY result.confidence = 0.7 result.reasoning = "需要澄清问题" result.next_hints = ["请提供更多细节", "您想了解什么方面的内容?", "能否具体说明一下?"] return result # 6. 默认直接回答 result.action = ReasoningAction.DIRECT_RESPONSE result.confidence = 0.5 result.reasoning = "默认直接回答模式" return result # ========== 3. 便捷函数(保持与旧代码兼容) ========== # 全局推理器实例(懒加载) _reasoner: Optional[ReactIntentReasoner] = None def _get_reasoner() -> ReactIntentReasoner: """获取推理器实例""" global _reasoner if _reasoner is None: _reasoner = ReactIntentReasoner() return _reasoner async def react_reason_async( query: str, context: Optional[Dict[str, Any]] = None ) -> ReasoningResult: """ 便捷函数:异步 React 推理(推荐使用) Args: query: 用户查询 context: 上下文 Returns: ReasoningResult """ reasoner = _get_reasoner() return await reasoner.reason(query, context) def react_reason( query: str, context: Optional[Dict[str, Any]] = None ) -> ReasoningResult: """ 便捷函数:同步 React 推理(保持向后兼容) 注意:内部会运行事件循环,建议在异步环境中使用 react_reason_async Args: query: 用户查询 context: 上下文 Returns: ReasoningResult """ import asyncio try: # 尝试获取现有事件循环 loop = asyncio.get_event_loop() if loop.is_running(): # 已经在运行的循环中,创建任务 task = loop.create_task(react_reason_async(query, context)) # 注意:这里不能真正等待,会导致死锁 # 降级到规则推理 print("[ReactReasoner] 检测到运行中的事件循环,使用规则推理") reasoner = _get_reasoner() return reasoner._reason_with_rules(query, context or {}) except RuntimeError: pass # 创建新的事件循环 loop = asyncio.new_event_loop() try: asyncio.set_event_loop(loop) return loop.run_until_complete(react_reason_async(query, context)) finally: loop.close() def get_route_by_reasoning(result: ReasoningResult) -> str: """ 根据推理结果获取路由字符串(与旧代码兼容) Args: result: ReasoningResult Returns: str: 路由标识 """ action_to_route = { ReasoningAction.DIRECT_RESPONSE: "direct_response", ReasoningAction.RETRIEVE_RAG: "retrieve_rag", ReasoningAction.RE_RETRIEVE_RAG: "re_retrieve_rag", ReasoningAction.CLARIFY: "clarify", ReasoningAction.ROUTE_SUBGRAPH: result.metadata.get("target_subgraph", "unknown_subgraph"), ReasoningAction.UNKNOWN: "unknown", } return action_to_route.get(result.action, "unknown") # ========== 4. 导出 ========== __all__ = [ "ReasoningAction", "RetrievalConfig", "ReasoningResult", "ReactIntentReasoner", "react_reason", "react_reason_async", "get_route_by_reasoning" ]