Files
ailine/backend/app/agent_subgraphs/common/intent.py

383 lines
14 KiB
Python
Raw Normal View History

"""
意图理解与推理模块 (React 模式)
Intent Understanding & Reasoning Module (React Pattern)
这个模块实现了 React (Reasoning + Acting) 模式的意图理解节点用于
1. 理解用户的查询意图
2. 判断是否需要调用 RAG 检索
3. 判断是否需要重新检索
4. 决定下一步的动作路由到子图直接回答等
核心设计
- 使用项目已有的 chat_services.py 进行 LLM 调用
- 保持与现有架构一致服务层模式
- 支持降级策略LLM 失败时回退到规则
- react_nodes.py 无缝集成
"""
import re
import json
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, field
from enum import Enum, auto
# ========== 1. 核心数据类型 ==========
class ReasoningAction(Enum):
"""推理动作枚举 - 决定下一步做什么"""
DIRECT_RESPONSE = auto() # 直接回答,不需要额外信息
RETRIEVE_RAG = auto() # 需要调用 RAG 检索
RE_RETRIEVE_RAG = auto() # 需要重新检索(更多/更好结果)
ROUTE_SUBGRAPH = auto() # 需要路由到子图contact/dictionary/news_analysis
CLARIFY = auto() # 需要澄清用户的问题
UNKNOWN = auto() # 未知动作
@dataclass
class RetrievalConfig:
"""检索配置"""
need_retrieval: bool = False
need_re_retrieval: bool = False
retrieval_query: Optional[str] = None
target_subgraph: Optional[str] = None
collection_name: Optional[str] = None
k: int = 5
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class ReasoningResult:
"""推理结果数据类"""
action: ReasoningAction = ReasoningAction.UNKNOWN
confidence: float = 0.0
reasoning: str = ""
retrieval_config: RetrievalConfig = field(default_factory=RetrievalConfig)
extracted_entities: Dict[str, Any] = field(default_factory=dict)
next_hints: List[str] = field(default_factory=list)
original_query: str = ""
metadata: Dict[str, Any] = field(default_factory=dict)
# ========== 2. React 推理器 ==========
class ReactIntentReasoner:
"""
React 模式意图推理器
核心功能
1. 使用 LLM 分析用户意图
2. 决定是否需要 RAG 检索/重新检索
3. 决定是否需要路由到子图
4. 提供降级策略规则匹配
"""
def __init__(self):
"""初始化推理器 - 懒加载 LLM 服务"""
self._llm_service = None
self._subgraph_keywords = {
"contact": ["通讯录", "联系人", "contact", "email", "邮件", "邮箱"],
"dictionary": ["词典", "单词", "翻译", "dictionary", "translate", "生词"],
"news_analysis": ["资讯", "新闻", "分析", "news", "report", "热点"]
}
def _get_llm_service(self):
"""懒加载 LLM 服务(避免循环导入)"""
if self._llm_service is None:
from app.model_services.chat_services import get_chat_service
self._llm_service = get_chat_service()
return self._llm_service
async def reason(
self,
query: str,
context: Optional[Dict[str, Any]] = None
) -> ReasoningResult:
"""
推理意图决定下一步动作
Args:
query: 用户查询
context: 上下文信息可能包含已检索文档对话历史等
Returns:
ReasoningResult
"""
context = context or {}
result = ReasoningResult(original_query=query)
# 策略1: 尝试使用 LLM 推理
try:
llm_result = await self._reason_with_llm(query, context)
if llm_result.confidence >= 0.6: # 置信度足够高,直接返回
return llm_result
except Exception as e:
print(f"[ReactReasoner] LLM 推理失败: {e}, 回退到规则")
# 策略2: LLM 失败或置信度低,使用规则匹配
return self._reason_with_rules(query, context)
async def _reason_with_llm(
self,
query: str,
context: Dict[str, Any]
) -> ReasoningResult:
"""使用 LLM 进行推理"""
prompt = self._build_reasoning_prompt(query, context)
llm = self._get_llm_service()
response = await llm.ainvoke(prompt)
return self._parse_llm_response(response.content, query)
def _build_reasoning_prompt(self, query: str, context: Dict[str, Any]) -> str:
"""构建推理提示词"""
# 构建上下文描述
context_parts = []
if context.get("retrieved_docs"):
context_parts.append(f"- 已检索文档: {len(context['retrieved_docs'])}")
if context.get("previous_actions"):
context_parts.append(f"- 历史动作: {context['previous_actions']}")
context_str = "\n".join(context_parts) if context_parts else ""
return f"""你是一个专业的意图推理助手。请分析用户的查询,决定下一步应该做什么。
可选动作
1. DIRECT_RESPONSE - 直接回答闲聊打招呼不需要额外信息
2. RETRIEVE_RAG - 需要查询知识库询问知识政策文档等
3. RE_RETRIEVE_RAG - 需要重新检索之前的结果不够或者用户明确说"再查查""更多"
4. ROUTE_SUBGRAPH - 需要路由到专门的子图
- contact: 通讯录联系人邮件相关
- dictionary: 词典翻译单词相关
- news_analysis: 资讯新闻热点分析相关
5. CLARIFY - 需要澄清用户的问题问题不明确
用户查询: {query}
当前上下文:
{context_str}
请按以下 JSON 格式输出仅输出 JSON不要其他内容:
{{
"action": "DIRECT_RESPONSE|RETRIEVE_RAG|RE_RETRIEVE_RAG|ROUTE_SUBGRAPH|CLARIFY",
"confidence": 0.85,
"reasoning": "简要说明理由",
"target_subgraph": "contact|dictionary|news_analysis|null (仅当 action=ROUTE_SUBGRAPH 时)",
"retrieval_query": "优化后的检索查询 (可选)"
}}
"""
def _parse_llm_response(self, response: str, original_query: str) -> ReasoningResult:
"""解析 LLM 响应"""
result = ReasoningResult(original_query=original_query)
# 提取 JSON
json_match = re.search(r'\{[\s\S]*\}', response)
if not json_match:
# 没有 JSON回退到规则
result.confidence = 0.0
return result
try:
data = json.loads(json_match.group())
action_str = data.get("action", "UNKNOWN")
# 转换为枚举
try:
result.action = ReasoningAction[action_str]
except KeyError:
result.action = ReasoningAction.UNKNOWN
result.confidence = float(data.get("confidence", 0.5))
result.reasoning = data.get("reasoning", "")
# 处理子图路由
if result.action == ReasoningAction.ROUTE_SUBGRAPH:
result.retrieval_config.target_subgraph = data.get("target_subgraph")
result.metadata["target_subgraph"] = data.get("target_subgraph")
# 处理检索查询
if result.action in [ReasoningAction.RETRIEVE_RAG, ReasoningAction.RE_RETRIEVE_RAG]:
result.retrieval_config.need_retrieval = True
result.retrieval_config.need_re_retrieval = (result.action == ReasoningAction.RE_RETRIEVE_RAG)
result.retrieval_config.retrieval_query = data.get("retrieval_query", original_query)
return result
except Exception as e:
print(f"[ReactReasoner] 解析 LLM 响应失败: {e}")
result.confidence = 0.0
return result
def _reason_with_rules(
self,
query: str,
context: Dict[str, Any]
) -> ReasoningResult:
"""基于规则的降级推理"""
result = ReasoningResult(original_query=query)
query_lower = query.lower()
# 1. 检查子图路由(最高优先级)
for subgraph_name, keywords in self._subgraph_keywords.items():
if any(kw in query_lower for kw in keywords):
result.action = ReasoningAction.ROUTE_SUBGRAPH
result.confidence = 0.85
result.reasoning = f"关键词匹配: {subgraph_name} 子图"
result.retrieval_config.target_subgraph = subgraph_name
result.metadata["target_subgraph"] = subgraph_name
return result
# 2. 检查是否需要重新检索
re_retrieve_keywords = ["", "重新", "更多", "不够", "其他", "没找到", "找不到", "不对", "another", "again", "more"]
has_re_retrieve = any(kw in query_lower for kw in re_retrieve_keywords)
has_docs = context.get("retrieved_docs") and len(context["retrieved_docs"]) > 0
if has_re_retrieve or (has_docs and len(context["retrieved_docs"]) < 2):
result.action = ReasoningAction.RE_RETRIEVE_RAG
result.confidence = 0.8 if has_re_retrieve else 0.65
result.reasoning = "需要重新检索更多/更好结果"
result.retrieval_config.need_retrieval = True
result.retrieval_config.need_re_retrieval = True
result.retrieval_config.retrieval_query = query
return result
# 3. 检查是否需要 RAG 检索
retrieve_keywords = ["什么", "怎么", "如何", "为什么", "", "", "介绍", "解释", "说明", "资料", "文档", "查询", "搜索", "what", "how", "why", "where", "who", "tell me", "explain", "about", "information"]
has_retrieve = any(kw in query_lower for kw in retrieve_keywords)
if has_retrieve or len(query.strip()) > 5:
result.action = ReasoningAction.RETRIEVE_RAG
result.confidence = 0.8 if has_retrieve else 0.6
result.reasoning = "需要查询知识库"
result.retrieval_config.need_retrieval = True
result.retrieval_config.retrieval_query = query
return result
# 4. 检查直接回答
direct_keywords = ["你好", "您好", "hi", "hello", "hey", "早上好", "晚上好", "下午好", "", "谢谢", "感谢", "多谢", "thanks", "thank you", "再见", "拜拜", "goodbye", "回见"]
if any(kw in query_lower for kw in direct_keywords):
result.action = ReasoningAction.DIRECT_RESPONSE
result.confidence = 0.9
result.reasoning = "直接回答(问候/感谢/道别)"
return result
# 5. 检查是否需要澄清
if len(query.strip()) < 3 or any(q in query for q in ["?", "", "哪个", "哪些", "什么意思", "", "能详细"]):
result.action = ReasoningAction.CLARIFY
result.confidence = 0.7
result.reasoning = "需要澄清问题"
result.next_hints = ["请提供更多细节", "您想了解什么方面的内容?", "能否具体说明一下?"]
return result
# 6. 默认直接回答
result.action = ReasoningAction.DIRECT_RESPONSE
result.confidence = 0.5
result.reasoning = "默认直接回答模式"
return result
# ========== 3. 便捷函数(保持与旧代码兼容) ==========
# 全局推理器实例(懒加载)
_reasoner: Optional[ReactIntentReasoner] = None
def _get_reasoner() -> ReactIntentReasoner:
"""获取推理器实例"""
global _reasoner
if _reasoner is None:
_reasoner = ReactIntentReasoner()
return _reasoner
async def react_reason_async(
query: str,
context: Optional[Dict[str, Any]] = None
) -> ReasoningResult:
"""
便捷函数异步 React 推理推荐使用
Args:
query: 用户查询
context: 上下文
Returns:
ReasoningResult
"""
reasoner = _get_reasoner()
return await reasoner.reason(query, context)
def react_reason(
query: str,
context: Optional[Dict[str, Any]] = None
) -> ReasoningResult:
"""
便捷函数同步 React 推理保持向后兼容
注意内部会运行事件循环建议在异步环境中使用 react_reason_async
Args:
query: 用户查询
context: 上下文
Returns:
ReasoningResult
"""
import asyncio
try:
# 尝试获取现有事件循环
loop = asyncio.get_event_loop()
if loop.is_running():
# 已经在运行的循环中,创建任务
task = loop.create_task(react_reason_async(query, context))
# 注意:这里不能真正等待,会导致死锁
# 降级到规则推理
print("[ReactReasoner] 检测到运行中的事件循环,使用规则推理")
reasoner = _get_reasoner()
return reasoner._reason_with_rules(query, context or {})
except RuntimeError:
pass
# 创建新的事件循环
loop = asyncio.new_event_loop()
try:
asyncio.set_event_loop(loop)
return loop.run_until_complete(react_reason_async(query, context))
finally:
loop.close()
def get_route_by_reasoning(result: ReasoningResult) -> str:
"""
根据推理结果获取路由字符串与旧代码兼容
Args:
result: ReasoningResult
Returns:
str: 路由标识
"""
action_to_route = {
ReasoningAction.DIRECT_RESPONSE: "direct_response",
ReasoningAction.RETRIEVE_RAG: "retrieve_rag",
ReasoningAction.RE_RETRIEVE_RAG: "re_retrieve_rag",
ReasoningAction.CLARIFY: "clarify",
ReasoningAction.ROUTE_SUBGRAPH: result.metadata.get("target_subgraph", "unknown_subgraph"),
ReasoningAction.UNKNOWN: "unknown",
}
return action_to_route.get(result.action, "unknown")
# ========== 4. 导出 ==========
__all__ = [
"ReasoningAction",
"RetrievalConfig",
"ReasoningResult",
"ReactIntentReasoner",
"react_reason",
"react_reason_async",
"get_route_by_reasoning"
]