Files
ailine/backend/app/main_graph/nodes/hybrid_router.py
root a5fc9cd5d8
All checks were successful
构建并部署 AI Agent 服务 / deploy (push) Successful in 6m8s
完整的混合路由优化系统
1. 双模型服务 (llm + smallLLM)
   - 增加 get_small_llm_service() 函数
   - 支持智谱/DeepSeek 小模型作为轻量级选项

2. 前置混合路由
   - 规则快速分流(无 LLM,超快速)
   - 轻量级意图分类(smallLLM)
   - 快速路径:fast_chitchat, fast_rag, fast_tool

3. 自动升级机制
   - 快速路径失败 → 自动回到 React 循环
   - SSE 事件增强:intent_classified, path_decision, fast_path_*, escalation

4. 向后兼容
   - build_react_main_graph(use_hybrid_router=True/False)
   - 可选择启用或禁用混合路由

5. 更新 intent.py
   - 支持 use_small_llm 参数
   - 保留原有完整功能供 React 循环使用
2026-05-03 16:45:46 +08:00

546 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
混合路由节点模块 - 前置路由 + 快速路径
"""
import re
import json
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, field
from datetime import datetime
from app.main_graph.state import MainGraphState
from app.logger import info, debug
from app.model_services.chat_services import get_small_llm_service, get_chat_service
from app.main_graph.nodes.rag_nodes import rag_retrieve_node
# ========== 核心数据类型 ==========
@dataclass
class HybridRouterResult:
"""混合路由结果"""
intent: str = "complex" # chitchat / knowledge / tool / complex
confidence: float = 0.0
suggested_tools: List[str] = field(default_factory=list)
path: str = "react_loop" # fast_chitchat / fast_rag / fast_tool / react_loop
reasoning: str = ""
# ========== 规则分流(无 LLM<5ms ==========
# 问候、感谢等直接返回的关键词
AL_CHITCHAT = {
"你好", "您好", "hi", "hello", "hey", "早上好", "晚上好", "下午好",
"谢谢", "感谢", "多谢", "thanks", "thank you",
"再见", "拜拜", "goodbye", "bye"
}
# 子图关键词映射
SUBGRAPH_KEYWORDS = {
"contact": ["通讯录", "联系人", "contact", "email", "邮件", "邮箱"],
"dictionary": ["词典", "单词", "翻译", "dictionary", "translate", "生词"],
"news_analysis": ["资讯", "新闻", "分析", "news", "report", "热点"]
}
def _rule_based_redirect(query: str) -> Optional[HybridRouterResult]:
"""
规则分流:处理明显不需要推理的情况(超快速)
Args:
query: 用户查询
Returns:
HybridRouterResult 或 None
"""
query_clean = query.strip().lower()
# 1. 检查闲聊
if query_clean in AL_CHITCHAT or any(keyword in query_clean for keyword in AL_CHITCHAT):
return HybridRouterResult(
intent="chitchat",
confidence=1.0,
path="fast_chitchat",
reasoning=f"规则匹配:闲聊类请求"
)
# 2. 检查子图关键词(直接调用工具)
for subgraph_name, keywords in SUBGRAPH_KEYWORDS.items():
if any(kw in query_clean for kw in keywords):
return HybridRouterResult(
intent="tool",
confidence=0.9,
suggested_tools=[subgraph_name],
path="fast_tool",
reasoning=f"规则匹配:{subgraph_name} 子图关键词"
)
# 3. 检查是否是纯问号或很短的问题(可能需要澄清)
if len(query_clean) < 3 or (query_clean.endswith("?") and len(query_clean) < 5):
return HybridRouterResult(
intent="complex",
confidence=0.3,
path="react_loop",
reasoning="规则匹配:问题过于简短或不确定"
)
return None
# ========== 轻量级 LLM 分类 ==========
async def _classify_with_small_llm(query: str) -> HybridRouterResult:
"""
使用轻量级 LLM 进行意图分类
Args:
query: 用户查询
Returns:
HybridRouterResult
"""
try:
llm = get_small_llm_service()
prompt = f"""你是一个专业的意图分类助手。请分析用户的查询,并输出 JSON 格式的结果。
意图类型4选一
- chitchat: 闲聊、问候、感谢、道别(不需要工具)
- knowledge: 知识查询(需要查询知识库)
- tool: 工具操作(需要调用通讯录/词典/新闻等子图)
- complex: 复杂任务(多步骤、不确定、或需要推理)
用户查询:
{query}
输出格式(仅 JSON不要其他内容
{{
"intent": "chitchat|knowledge|tool|complex",
"confidence": 0.0-1.0,
"reasoning": "简要说明理由",
"suggested_tools": ["contact|dictionary|news_analysis", "other"]
}}
注意如果不能100%确定意图,请选择 "complex",置信度设低一些。
"""
response = await llm.ainvoke(prompt)
content = response.content
# 解析 JSON
json_match = re.search(r'(\{[^{}]*\{[^{}]*\}[^{}]*\})|(\{[^{}]*\})', content)
if json_match:
try:
data = json.loads(json_match.group(0))
intent = data.get("intent", "complex")
confidence = float(data.get("confidence", 0.3))
reasoning = data.get("reasoning", "")
suggested_tools = data.get("suggested_tools", [])
# 置信度低于 0.5 一律走 complex
if confidence < 0.5:
intent = "complex"
path = "react_loop"
elif intent == "chitchat":
path = "fast_chitchat"
elif intent == "knowledge":
path = "fast_rag"
elif intent == "tool":
path = "fast_tool"
else:
intent = "complex"
path = "react_loop"
return HybridRouterResult(
intent=intent,
confidence=confidence,
suggested_tools=suggested_tools,
path=path,
reasoning=reasoning
)
except Exception as e:
debug(f"轻量 LLM 响应解析失败: {e}")
pass
except Exception as e:
debug(f"轻量 LLM 调用失败: {e}")
# LLM 失败,降级到规则+默认
return HybridRouterResult(
intent="complex",
confidence=0.3,
path="react_loop",
reasoning="LLM 调用失败,降级到 React 循环"
)
# ========== 路由决策 ==========
def _make_decision(classification_result: HybridRouterResult) -> HybridRouterResult:
"""
根据分类结果最终决策
Args:
classification_result: 分类结果
Returns:
最终决策结果
"""
if classification_result.confidence < 0.5:
classification_result.intent = "complex"
classification_result.path = "react_loop"
return classification_result
return classification_result
# ========== 混合路由主节点 ==========
async def hybrid_router_node(state: MainGraphState, config: Optional[Dict[str, Any]] = None) -> MainGraphState:
"""
混合路由节点:前置路由,决定走快速路径还是 React 循环
Args:
state: 当前状态
config: LangChain 配置(用于发送自定义事件)
Returns:
更新后的状态
"""
state.current_phase = "hybrid_router"
query = state.user_query or ""
info(f"[Hybrid Router] 开始路由: {query[:50]}...")
# 1. 规则分流(超快速)
rule_result = _rule_based_redirect(query)
if rule_result:
info(f"[Hybrid Router] 规则分流命中: {rule_result.path}")
decision = rule_result
else:
# 2. 轻量 LLM 分类
info(f"[Hybrid Router] 规则未命中,使用轻量 LLM 分类")
classification_result = await _classify_with_small_llm(query)
decision = _make_decision(classification_result)
# 3. 发送 SSE 事件
if config:
try:
from langchain_core.callbacks.manager import adispatch_custom_event
callbacks = config.get("callbacks")
if callbacks:
await adispatch_custom_event(
"intent_classified",
{
"intent": decision.intent,
"confidence": decision.confidence,
"reasoning": decision.reasoning,
"suggested_tools": decision.suggested_tools
},
callbacks=callbacks
)
await adispatch_custom_event(
"path_decision",
{
"path": decision.path,
"intent": decision.intent,
"reasoning": decision.reasoning
},
callbacks=callbacks
)
except Exception as e:
debug(f"[Hybrid Router] 发送 SSE 事件失败: {e}")
# 4. 更新状态
state.debug_info["hybrid_decision"] = decision
state.debug_info["hybrid_start_time"] = datetime.now().isoformat()
info(f"[Hybrid Router] 路由决策: {decision.path} (intent={decision.intent}, confidence={decision.confidence})")
return state
# ========== 快速路径:闲聊 ==========
async def fast_chitchat_node(state: MainGraphState, config: Optional[Dict[str, Any]] = None) -> MainGraphState:
"""
快速闲聊节点:直接返回回复,不走 RAG/工具/循环
Args:
state: 当前状态
config: LangChain 配置
Returns:
更新后的状态
"""
state.current_phase = "fast_chitchat"
query = state.user_query or ""
info(f"[Fast Chitchat] 处理: {query[:50]}")
# 发送 SSE 事件
if config:
try:
from langchain_core.callbacks.manager import adispatch_custom_event
callbacks = config.get("callbacks")
if callbacks:
await adispatch_custom_event(
"fast_path_start",
{"path": "fast_chitchat"},
callbacks=callbacks
)
except Exception as e:
debug(f"[Fast Chitchat] 发送事件失败: {e}")
# 快速回复(可以扩展为模板库)
query_clean = query.strip().lower()
if any(kw in query_clean for kw in ["谢谢", "感谢", "thanks", "thank you"]):
reply = "不客气!如果还有其他问题,请随时告诉我 😊"
elif any(kw in query_clean for kw in ["再见", "拜拜", "bye", "goodbye"]):
reply = "再见!期待下次为您服务 👋"
elif any(kw in query_clean for kw in ["你好", "您好", "hi", "hello", "hey", "早上好", "晚上好", "下午好"]):
reply = "你好!有什么我可以帮您的吗?"
else:
# 兜底:用轻量 LLM 生成
try:
llm = get_small_llm_service()
response = await llm.ainvoke(f"你是一个友好的助手。用户说:{query}。请简短友好地回复:")
reply = response.content
except:
reply = "你好!有什么我可以帮您的吗?"
state.final_result = reply
state.success = True
state.current_phase = "finalizing"
state.debug_info["fast_chitchat_success"] = True
# 发送 fast_path_end 事件
if config:
try:
from langchain_core.callbacks.manager import adispatch_custom_event
callbacks = config.get("callbacks")
if callbacks:
await adispatch_custom_event(
"fast_path_end",
{"path": "fast_chitchat", "success": True},
callbacks=callbacks
)
except Exception as e:
debug(f"[Fast Chitchat] 发送完成事件失败: {e}")
return state
# ========== 快速路径RAG带自动升级 ==========
async def fast_rag_node(state: MainGraphState, config: Optional[Dict[str, Any]] = None) -> MainGraphState:
"""
快速 RAG 节点:先尝试快速检索,失败自动升级到 React 循环
Args:
state: 当前状态
config: LangChain 配置
Returns:
更新后的状态
"""
state.current_phase = "fast_rag"
query = state.user_query or ""
info(f"[Fast RAG] 开始处理: {query[:50]}")
# 发送 SSE 事件
if config:
try:
from langchain_core.callbacks.manager import adispatch_custom_event
callbacks = config.get("callbacks")
if callbacks:
await adispatch_custom_event(
"fast_path_start",
{"path": "fast_rag"},
callbacks=callbacks
)
except Exception as e:
debug(f"[Fast RAG] 发送事件失败: {e}")
try:
# 先尝试 RAG 检索
state = rag_retrieve_node(state, config)
# 检查检索结果
rag_docs = getattr(state, "rag_docs", [])
rag_context = getattr(state, "rag_context", "")
# 检查是否有有效结果
has_valid_results = (rag_docs and len(rag_docs) > 0) or (rag_context and len(rag_context) > 10)
if has_valid_results:
# 快速 RAG 成功!使用小模型快速生成回答
try:
llm = get_chat_service()
prompt = f"""请根据以下信息回答用户问题:
检索到的信息:
{rag_context or str(rag_docs)[:2000]}
用户问题:{query}
请给出简洁、准确的回答:"""
response = await llm.ainvoke(prompt)
state.final_result = response.content
state.success = True
state.current_phase = "finalizing"
state.debug_info["fast_rag_success"] = True
# 发送成功事件
if config:
try:
from langchain_core.callbacks.manager import adispatch_custom_event
callbacks = config.get("callbacks")
if callbacks:
await adispatch_custom_event(
"fast_path_end",
{"path": "fast_rag", "success": True},
callbacks=callbacks
)
except Exception as e:
debug(f"[Fast RAG] 发送完成事件失败: {e}")
return state
except Exception as e:
info(f"[Fast RAG] 快速回答生成失败: {e}")
# 继续往下走,升级到 React 循环
# RAG 失败或无结果:标记升级
info(f"[Fast RAG] 无有效检索结果,升级到 React 循环")
return mark_fast_path_failed(state, reason="无有效检索结果")
except Exception as e:
info(f"[Fast RAG] 执行失败: {e}")
return mark_fast_path_failed(state, reason=str(e))
# ========== 快速路径:工具(带自动升级) ==========
async def fast_tool_node(state: MainGraphState, config: Optional[Dict[str, Any]] = None) -> MainGraphState:
"""
快速工具节点:尝试直接调用工具,失败自动升级到 React 循环
Args:
state: 当前状态
config: LangChain 配置
Returns:
更新后的状态
"""
state.current_phase = "fast_tool"
decision: HybridRouterResult = state.debug_info.get("hybrid_decision", HybridRouterResult())
suggested_tools = decision.suggested_tools or []
query = state.user_query or ""
info(f"[Fast Tool] 开始处理,建议工具: {suggested_tools}")
# 发送 SSE 事件
if config:
try:
from langchain_core.callbacks.manager import adispatch_custom_event
callbacks = config.get("callbacks")
if callbacks:
await adispatch_custom_event(
"fast_path_start",
{"path": "fast_tool", "suggested_tools": suggested_tools},
callbacks=callbacks
)
except Exception as e:
debug(f"[Fast Tool] 发送事件失败: {e}")
# 检查是否有明确的工具建议
if not suggested_tools:
info(f"[Fast Tool] 无明确工具建议,升级到 React 循环")
return mark_fast_path_failed(state, reason="无明确工具建议")
# 工具调用逻辑(这里暂时先标记升级,让 React 循环去处理)
# 后续可以扩展为直接调用子图
info(f"[Fast Tool] 快速工具调用暂未完善,升级到 React 循环")
return mark_fast_path_failed(state, reason="快速工具调用暂未完善")
# ========== 标记快速路径失败(用于自动升级) ==========
def mark_fast_path_failed(state: MainGraphState, reason: str = "") -> MainGraphState:
"""
标记快速路径失败,准备升级到 React 循环
Args:
state: 当前状态
reason: 失败原因
Returns:
更新后的状态
"""
state.debug_info["fast_path_failed"] = True
state.debug_info["fast_path_fail_reason"] = reason
state.success = False
# 发送 escalation 事件
config = state.debug_info.get("config")
if config:
try:
from langchain_core.callbacks.manager import adispatch_custom_event
callbacks = config.get("callbacks")
if callbacks:
# 这里需要在异步上下文中调用
pass
except Exception as e:
debug(f"[Fast Path] 发送升级事件失败: {e}")
info(f"[Fast Path] 标记失败,准备升级: {reason}")
return state
# ========== 快速路径检查器(自动升级机制) ==========
def route_from_hybrid_decision(state: MainGraphState) -> str:
"""
从混合路由决策获取下一步的节点名称
Args:
state: 当前状态
Returns:
节点名称
"""
decision: HybridRouterResult = state.debug_info.get("hybrid_decision", HybridRouterResult())
return decision.path
def check_fast_path_success(state: MainGraphState) -> str:
"""
检查快速路径是否成功,成功直接到 finalize失败升级到 react_reason
Args:
state: 当前状态
Returns:
"success""escalate"
"""
# 检查是否有错误标记
if state.debug_info.get("fast_path_failed"):
info(f"[Fast Path Check] 快速路径失败,升级到 React 循环")
return "escalate"
# 检查是否成功设置了 final_result
if state.final_result:
info(f"[Fast Path Check] 快速路径成功,进入 finalize")
return "success"
# 默认:认为成功(某些快速路径可能直接在节点中完成)
return "success"