Files
ailine/backend/app/deprecated/intent.py

548 lines
21 KiB
Python
Raw Normal View History

"""
意图理解与推理模块React 模式
核心改进
1. 使用统一的 JSON 解析器保证稳定性
2. 优化 Prompt更清晰的指令
3. 更好的错误处理和降级策略
"""
import re
import json
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, field
from enum import Enum, auto
from backend.app.core.json_parser import (
extract_and_parse_json,
safe_get,
safe_get_float,
safe_get_str,
)
# ========== 1. 核心数据类型 ==========
class ReasoningAction(Enum):
"""推理动作枚举 - 决定下一步做什么"""
DIRECT_RESPONSE = auto() # 直接回答,不需要额外信息
RETRIEVE_RAG = auto() # 需要调用 RAG 检索
RE_RETRIEVE_RAG = auto() # 需要重新检索(更多/更好结果)
WEB_SEARCH = auto() # 需要联网搜索
ROUTE_SUBGRAPH = auto() # 需要路由到子图contact/dictionary/news_analysis/research
CLARIFY = auto() # 需要澄清用户的问题
UNKNOWN = auto() # 未知动作
@dataclass
class RetrievalConfig:
"""检索配置"""
need_retrieval: bool = False
need_re_retrieval: bool = False
retrieval_query: Optional[str] = None
target_subgraph: Optional[str] = None
collection_name: Optional[str] = None
k: int = 5
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class ReasoningResult:
"""推理结果数据类"""
action: ReasoningAction = ReasoningAction.UNKNOWN
confidence: float = 0.0
reasoning: str = ""
retrieval_config: RetrievalConfig = field(default_factory=RetrievalConfig)
extracted_entities: Dict[str, Any] = field(default_factory=dict)
next_hints: List[str] = field(default_factory=list)
original_query: str = ""
metadata: Dict[str, Any] = field(default_factory=dict)
# ========== 2. React 推理器 ==========
class ReactIntentReasoner:
"""
React 模式意图推理器
核心功能
1. 使用 LLM 分析用户意图
2. 决定是否需要 RAG 检索/重新检索
3. 决定是否需要路由到子图
4. 提供降级策略规则匹配
可以选择使用大模型或小模型
"""
def __init__(self, use_small_llm: bool = False):
"""
初始化推理器
Args:
use_small_llm: 是否使用轻量级模型用于意图分类
"""
self._llm_service = None
self._use_small_llm = use_small_llm
self._subgraph_keywords = {
"contact": ["通讯录", "联系人", "contact", "email", "邮件", "邮箱"],
"dictionary": ["词典", "单词", "翻译", "dictionary", "translate", "生词"],
"news_analysis": ["资讯", "新闻", "分析", "news", "report", "热点"],
"research": ["研究", "深度分析", "报告", "引用", "溯源", "research", "analyze", "report"]
}
def _get_llm_service(self):
"""懒加载 LLM 服务(避免循环导入)"""
if self._llm_service is None:
2026-05-05 23:17:00 +08:00
from backend.app.model_services.chat_services import get_chat_service, get_small_llm_service
if self._use_small_llm:
self._llm_service = get_small_llm_service()
else:
self._llm_service = get_chat_service()
return self._llm_service
async def reason(
self,
query: str,
context: Optional[Dict[str, Any]] = None
) -> ReasoningResult:
"""
推理意图决定下一步动作
Args:
query: 用户查询
context: 上下文信息可能包含已检索文档对话历史等
Returns:
ReasoningResult
"""
context = context or {}
result = ReasoningResult(original_query=query)
# 关键修复 1检查是否已经有检索结果或子图结果如果是直接回答
previous_actions = context.get("previous_actions", [])
if "subgraph_completed" in previous_actions:
result.action = ReasoningAction.DIRECT_RESPONSE
result.confidence = 1.0
result.reasoning = "子图已执行完成,直接回答"
return result
retrieved_docs = context.get("retrieved_docs", [])
messages = context.get("messages", [])
2026-05-06 04:26:06 +08:00
# 获取 RAG 相关状态
previous_actions = context.get("previous_actions", [])
2026-05-06 04:26:06 +08:00
rag_count = previous_actions.count("RETRIEVE_RAG")
rag_attempts = context.get("rag_attempts", rag_count)
rag_confidence = context.get("rag_confidence", 0.0)
retrieved_docs = context.get("retrieved_docs", [])
2026-05-06 04:26:06 +08:00
web_search_count = previous_actions.count("web_search")
2026-05-05 00:54:04 +08:00
# 检查 RAG 是否多次失败reasoning_history 中有失败的 RAG 记录)
rag_history = context.get("reasoning_history", [])
rag_fail_count = sum(
1 for h in rag_history
if h.get("action") in ("RETRIEVE_RAG", "RE_RETRIEVE_RAG") and h.get("confidence", 1.0) == 0.0
)
2026-05-06 04:26:06 +08:00
# 如果有检索文档,根据置信度判断下一步
if retrieved_docs and len(retrieved_docs) > 0:
if rag_confidence >= 0.6:
# 置信度足够高,直接回答
result.action = ReasoningAction.DIRECT_RESPONSE
result.confidence = 0.95
result.reasoning = f"已获取检索文档,置信度={rag_confidence:.2f},直接回答"
return result
elif rag_attempts >= 2 or rag_fail_count >= 2:
# 尝试次数已够或多次失败,放弃 RAG转向联网搜索
result.action = ReasoningAction.WEB_SEARCH
result.confidence = 0.8
result.reasoning = f"RAG 置信度={rag_confidence:.2f} < 0.6,且已尝试 {rag_attempts} 次,转向联网搜索"
result.metadata["need_web_search"] = True
result.metadata["search_query"] = query
return result
else:
# 置信度不够但还有尝试机会,再查一次
result.action = ReasoningAction.RETRIEVE_RAG
result.confidence = 0.8
result.reasoning = f"已获取检索文档但置信度={rag_confidence:.2f} < 0.6,可再尝试一次"
result.retrieval_config.need_retrieval = True
result.retrieval_config.retrieval_query = query
return result
# 如果 RAG 已多次失败且无文档,直接回答(基于常识)
2026-05-05 00:54:04 +08:00
if rag_fail_count >= 2:
result.action = ReasoningAction.DIRECT_RESPONSE
result.confidence = 0.7
result.reasoning = f"RAG 已尝试 {rag_fail_count} 次均失败,知识库无相关内容,直接基于常识回答"
return result
2026-05-06 04:26:06 +08:00
# 如果 web search 已执行过,直接回答
if web_search_count >= 1:
result.action = ReasoningAction.DIRECT_RESPONSE
result.confidence = 0.95
result.reasoning = "已获取联网搜索结果,直接回答"
return result
# 策略1尝试使用 LLM 推理
try:
llm_result = await self._reason_with_llm(query, context)
if llm_result.confidence >= 0.6: # 置信度足够高,直接返回
return llm_result
except Exception as e:
print(f"[ReactReasoner] LLM 推理失败: {e}, 回退到规则")
# 策略2LLM 失败或置信度低,使用规则匹配
return self._reason_with_rules(query, context)
async def _reason_with_llm(
self,
query: str,
context: Dict[str, Any]
) -> ReasoningResult:
"""使用 LLM 进行推理"""
prompt = self._build_reasoning_prompt(query, context)
llm = self._get_llm_service()
response = await llm.ainvoke(prompt)
return self._parse_llm_response(response.content, query)
def _build_reasoning_prompt(self, query: str, context: Dict[str, Any]) -> str:
"""
构建推理提示词优化版
改进点
1. 更清晰的指令和格式要求
2. 明确要求纯 JSON 输出不要 markdown
3. 更好的示例和决策规则
"""
# 构建上下文描述
context_parts = []
if context.get("retrieved_docs"):
context_parts.append(f"- 已检索文档: {len(context['retrieved_docs'])}")
rag_confidence = context.get("rag_confidence")
if rag_confidence is not None:
context_parts.append(f"- RAG 置信度: {rag_confidence:.2f}")
rag_attempts = context.get("rag_attempts", 0)
if rag_attempts:
context_parts.append(f"- RAG 尝试次数: {rag_attempts}")
previous_actions = context.get("previous_actions", [])
if previous_actions:
context_parts.append(f"- 历史动作: {previous_actions}")
context_str = "\n".join(context_parts) if context_parts else ""
return f"""你是一个决策控制器。你需要根据当前状态决定下一步操作。
格式要求
你必须严格输出 JSON 格式不要加任何 Markdown 代码块标记 ```json
仅输出纯 JSON 字符串不要有其他解释文字
可用动作
1. DIRECT_RESPONSE - 直接回答已有足够信息不需要额外工具
2. RETRIEVE_RAG - 检索知识库需要查询相关知识
3. RE_RETRIEVE_RAG - 重新检索已有结果不够需要再次尝试
4. WEB_SEARCH - 联网搜索需要最新资讯或知识库没有的内容
5. ROUTE_SUBGRAPH - 路由到子图通讯录/词典/资讯分析
6. CLARIFY - 澄清问题问题不明确需要用户补充
动作参数说明
每个动作需要的参数
- RETRIEVE_RAG: {{"retrieval_query": "优化后的检索查询字符串"}}
- RE_RETRIEVE_RAG: {{"retrieval_query": "优化后的检索查询字符串"}}
- WEB_SEARCH: {{"search_query": "优化后的搜索查询字符串"}}
- ROUTE_SUBGRAPH: {{"target_subgraph": "contact|dictionary|news_analysis"}}
- DIRECT_RESPONSE/CLARIFY: {{}}无需参数
决策规则
1. 如果 RAG 置信度 >= 0.6 且有检索文档使用 DIRECT_RESPONSE
2. 如果 RAG 置信度 < 0.6 且尝试次数 < 2使用 RETRIEVE_RAG/RE_RETRIEVE_RAG
3. 如果 RAG 置信度 < 0.6 且尝试次数 >= 2使用 WEB_SEARCH
4. 如果已执行过联网搜索使用 DIRECT_RESPONSE
5. 如果问题涉及通讯录/词典/资讯分析使用 ROUTE_SUBGRAPH
6. 如果问题不明确使用 CLARIFY
输出格式
{{
"action": "动作名称(大写)",
"confidence": 0.85,
"reasoning": "简要说明决策理由",
"target_subgraph": "contact|dictionary|news_analysis|null",
"retrieval_query": "优化后的检索查询(可选)",
"search_query": "优化后的搜索查询(可选)"
}}
重要提示
- target_subgraph 仅在 action=ROUTE_SUBGRAPH 时提供否则设为 null 或不包含
- retrieval_query 仅在 action=RETRIEVE_RAG/RE_RETRIEVE_RAG 时提供
- search_query 仅在 action=WEB_SEARCH 时提供
- confidence 是你对当前决策的信心0.0-1.0
2026-05-06 04:26:06 +08:00
当前状态
用户查询: {query}
当前上下文:
{context_str}
现在开始
请根据以上信息输出你的决策 JSON"""
def _parse_llm_response(self, response: str, original_query: str) -> ReasoningResult:
"""
解析 LLM 响应优化版
使用统一的 JSON 解析器支持多种格式
"""
result = ReasoningResult(original_query=original_query)
# 使用新的 JSON 解析器
parse_result = extract_and_parse_json(response)
if not parse_result.success or not parse_result.data:
# 解析失败,使用规则推理降级
result.action = ReasoningAction.UNKNOWN
result.confidence = 0.0
result.reasoning = f"LLM 响应解析失败: {parse_result.error or '未知错误'}"
return result
data = parse_result.data
# 安全地提取字段
action_str = safe_get_str(data, "action", "UNKNOWN")
confidence = safe_get_float(data, "confidence", 0.5)
reasoning = safe_get_str(data, "reasoning", "")
target_subgraph = safe_get_str(data, "target_subgraph", None)
retrieval_query = safe_get_str(data, "retrieval_query", original_query)
search_query = safe_get_str(data, "search_query", original_query)
# 转换为枚举
try:
result.action = ReasoningAction[action_str]
except (KeyError, ValueError):
result.action = ReasoningAction.UNKNOWN
result.confidence = confidence
result.reasoning = reasoning
# 处理子图路由
if result.action == ReasoningAction.ROUTE_SUBGRAPH and target_subgraph:
result.retrieval_config.target_subgraph = target_subgraph
result.metadata["target_subgraph"] = target_subgraph
# 处理检索查询
if result.action in (ReasoningAction.RETRIEVE_RAG, ReasoningAction.RE_RETRIEVE_RAG):
result.retrieval_config.need_retrieval = True
result.retrieval_config.need_re_retrieval = (result.action == ReasoningAction.RE_RETRIEVE_RAG)
result.retrieval_config.retrieval_query = retrieval_query
# 处理联网搜索
if result.action == ReasoningAction.WEB_SEARCH:
result.metadata["need_web_search"] = True
result.metadata["search_query"] = search_query
return result
def _reason_with_rules(
self,
query: str,
context: Dict[str, Any]
) -> ReasoningResult:
"""基于规则的降级推理"""
result = ReasoningResult(original_query=query)
query_lower = query.lower()
# 1. 检查子图路由(最高优先级)
for subgraph_name, keywords in self._subgraph_keywords.items():
if any(kw in query_lower for kw in keywords):
result.action = ReasoningAction.ROUTE_SUBGRAPH
result.confidence = 0.85
result.reasoning = f"关键词匹配: {subgraph_name} 子图"
result.retrieval_config.target_subgraph = subgraph_name
result.metadata["target_subgraph"] = subgraph_name
return result
# 2. 检查是否需要联网搜索(谨慎触发)
# 只有用户明确要求搜索才触发
web_search_keywords = ["搜索", "搜索一下", "帮我搜", "search for", "web search", "搜索资料"]
has_web_search = any(kw in query_lower for kw in web_search_keywords)
if has_web_search:
result.action = ReasoningAction.WEB_SEARCH
result.confidence = 0.9
result.reasoning = "用户明确要求联网搜索"
result.metadata["need_web_search"] = True
result.metadata["search_query"] = query
return result
# 3. 检查是否需要重新检索
re_retrieve_keywords = ["", "重新", "更多", "不够", "其他", "没找到", "找不到", "不对", "another", "again", "more"]
has_re_retrieve = any(kw in query_lower for kw in re_retrieve_keywords)
has_docs = context.get("retrieved_docs") and len(context["retrieved_docs"]) > 0
if has_re_retrieve or (has_docs and len(context["retrieved_docs"]) < 2):
result.action = ReasoningAction.RE_RETRIEVE_RAG
result.confidence = 0.8 if has_re_retrieve else 0.65
result.reasoning = "需要重新检索更多/更好结果"
result.retrieval_config.need_retrieval = True
result.retrieval_config.need_re_retrieval = True
result.retrieval_config.retrieval_query = query
return result
# 3. 检查是否需要 RAG 检索
retrieve_keywords = ["什么", "怎么", "如何", "为什么", "", "", "介绍", "解释", "说明", "资料", "文档", "查询", "搜索", "what", "how", "why", "where", "who", "tell me", "explain", "about", "information"]
has_retrieve = any(kw in query_lower for kw in retrieve_keywords)
if has_retrieve or len(query.strip()) > 5:
result.action = ReasoningAction.RETRIEVE_RAG
result.confidence = 0.8 if has_retrieve else 0.6
result.reasoning = "需要查询知识库"
result.retrieval_config.need_retrieval = True
result.retrieval_config.retrieval_query = query
return result
# 4. 检查直接回答
direct_keywords = ["你好", "您好", "hi", "hello", "hey", "早上好", "晚上好", "下午好", "", "谢谢", "感谢", "多谢", "thanks", "thank you", "再见", "拜拜", "goodbye", "回见"]
if any(kw in query_lower for kw in direct_keywords):
result.action = ReasoningAction.DIRECT_RESPONSE
result.confidence = 0.9
result.reasoning = "直接回答(问候/感谢/道别)"
return result
# 5. 检查是否需要澄清
if len(query.strip()) < 3 or any(q in query for q in ["?", "", "哪个", "哪些", "什么意思", "", "能详细"]):
result.action = ReasoningAction.CLARIFY
result.confidence = 0.7
result.reasoning = "需要澄清问题"
result.next_hints = ["请提供更多细节", "您想了解什么方面的内容?", "能否具体说明一下?"]
return result
# 6. 默认直接回答
result.action = ReasoningAction.DIRECT_RESPONSE
result.confidence = 0.5
result.reasoning = "默认直接回答模式"
return result
# ========== 3. 便捷函数(保持与旧代码兼容) ==========
# 全局推理器实例(懒加载)
_reasoner: Optional[ReactIntentReasoner] = None
_small_reasoner: Optional[ReactIntentReasoner] = None
2026-05-05 00:54:04 +08:00
def _get_reasoner(use_small_llm: bool = True) -> ReactIntentReasoner:
"""
获取推理器实例
Args:
use_small_llm: 是否使用轻量级模型
Returns:
ReactIntentReasoner 实例
"""
global _reasoner, _small_reasoner
if use_small_llm:
if _small_reasoner is None:
_small_reasoner = ReactIntentReasoner(use_small_llm=True)
return _small_reasoner
else:
if _reasoner is None:
_reasoner = ReactIntentReasoner(use_small_llm=False)
return _reasoner
async def react_reason_async(
query: str,
context: Optional[Dict[str, Any]] = None,
2026-05-05 00:54:04 +08:00
use_small_llm: bool = True
) -> ReasoningResult:
"""
便捷函数异步 React 推理推荐使用
Args:
query: 用户查询
context: 上下文
use_small_llm: 是否使用轻量级模型
Returns:
ReasoningResult
"""
reasoner = _get_reasoner(use_small_llm=use_small_llm)
return await reasoner.reason(query, context)
def react_reason(
query: str,
context: Optional[Dict[str, Any]] = None,
use_small_llm: bool = False
) -> ReasoningResult:
"""
便捷函数同步 React 推理保持向后兼容
注意内部会运行事件循环建议在异步环境中使用 react_reason_async
Args:
query: 用户查询
context: 上下文
use_small_llm: 是否使用轻量级模型
Returns:
ReasoningResult
"""
import asyncio
try:
# 尝试获取现有事件循环
loop = asyncio.get_event_loop()
if loop.is_running():
# 已经在运行的循环中,创建任务
# 注意:这里不能真正等待,会导致死锁
# 降级到规则推理
print(f"[ReactReasoner] 检测到运行中的事件循环,使用规则推理")
reasoner = _get_reasoner(use_small_llm=use_small_llm)
return reasoner._reason_with_rules(query, context or {})
except RuntimeError:
pass
# 创建新的事件循环
loop = asyncio.new_event_loop()
try:
asyncio.set_event_loop(loop)
return loop.run_until_complete(react_reason_async(query, context, use_small_llm=use_small_llm))
finally:
loop.close()
loop.close()
def get_route_by_reasoning(result: ReasoningResult) -> str:
"""
根据推理结果获取路由字符串与旧代码兼容
Args:
result: ReasoningResult
Returns:
str: 路由标识
"""
action_to_route = {
ReasoningAction.DIRECT_RESPONSE: "direct_response",
ReasoningAction.RETRIEVE_RAG: "retrieve_rag",
ReasoningAction.RE_RETRIEVE_RAG: "re_retrieve_rag",
ReasoningAction.WEB_SEARCH: "web_search",
ReasoningAction.CLARIFY: "clarify",
ReasoningAction.ROUTE_SUBGRAPH: result.metadata.get("target_subgraph", "unknown_subgraph"),
ReasoningAction.UNKNOWN: "unknown",
}
return action_to_route.get(result.action, "unknown")
# ========== 4. 导出 ==========
__all__ = [
"ReasoningAction",
"RetrievalConfig",
"ReasoningResult",
"ReactIntentReasoner",
"react_reason",
"react_reason_async",
"get_route_by_reasoning"
]