214 lines
7.2 KiB
Python
214 lines
7.2 KiB
Python
"""
|
||
混合路由节点模块 - 前置路由决策
|
||
负责决定走快速路径还是 React 循环
|
||
|
||
复用 intent.py 的推理逻辑,保证判断一致!
|
||
"""
|
||
|
||
from typing import Optional
|
||
from dataclasses import dataclass, field
|
||
from datetime import datetime
|
||
from langchain_core.runnables.config import RunnableConfig
|
||
|
||
from ..state import MainGraphState
|
||
from backend.app.logger import info, debug
|
||
# 直接复用 intent.py 的推理逻辑!
|
||
from backend.app.core.intent import (
|
||
react_reason_async,
|
||
ReasoningResult,
|
||
ReasoningAction,
|
||
)
|
||
from ._utils import dispatch_custom_event
|
||
|
||
|
||
# ========== 核心数据类型 ==========
|
||
@dataclass
|
||
class HybridRouterResult:
|
||
"""混合路由结果"""
|
||
intent: str = "complex" # chitchat / knowledge / tool / complex
|
||
confidence: float = 0.0
|
||
suggested_tools: list = field(default_factory=list)
|
||
path: str = "react_loop" # fast_chitchat / fast_rag / fast_tool / react_loop
|
||
reasoning: str = ""
|
||
|
||
|
||
# ========== 规则配置 ==========
|
||
# 保留规则分流,保持快速响应
|
||
CHITCHAT_KEYWORDS = {
|
||
"你好", "您好", "hi", "hello", "hey", "早上好", "晚上好", "下午好",
|
||
"谢谢", "感谢", "多谢", "thanks", "thank you",
|
||
"再见", "拜拜", "goodbye", "bye"
|
||
}
|
||
|
||
SUBGRAPH_KEYWORDS = {
|
||
"contact": ["通讯录", "联系人", "contact", "email", "邮件", "邮箱"],
|
||
"dictionary": ["词典", "单词", "翻译", "dictionary", "translate", "生词"],
|
||
"news_analysis": ["资讯", "新闻", "分析", "news", "report", "热点"]
|
||
}
|
||
|
||
|
||
# ========== 从 ReasoningResult 映射到 HybridRouterResult ==========
|
||
def _map_reasoning_to_router(reasoning_result: ReasoningResult) -> HybridRouterResult:
|
||
"""将 intent.py 的推理结果映射为 hybrid_router 的结果"""
|
||
|
||
# ReasoningAction -> intent 映射
|
||
intent_map = {
|
||
ReasoningAction.DIRECT_RESPONSE: "chitchat",
|
||
ReasoningAction.RETRIEVE_RAG: "knowledge",
|
||
ReasoningAction.RE_RETRIEVE_RAG: "knowledge",
|
||
ReasoningAction.WEB_SEARCH: "complex", # WEB_SEARCH 走 React循环
|
||
ReasoningAction.ROUTE_SUBGRAPH: "tool",
|
||
ReasoningAction.CLARIFY: "chitchat",
|
||
ReasoningAction.UNKNOWN: "complex",
|
||
}
|
||
|
||
# ReasoningAction -> path 映射
|
||
path_map = {
|
||
ReasoningAction.DIRECT_RESPONSE: "fast_chitchat",
|
||
ReasoningAction.RETRIEVE_RAG: "fast_rag",
|
||
ReasoningAction.RE_RETRIEVE_RAG: "fast_rag",
|
||
ReasoningAction.WEB_SEARCH: "react_loop", # WEB_SEARCH 走 React循环
|
||
ReasoningAction.ROUTE_SUBGRAPH: "fast_tool",
|
||
ReasoningAction.CLARIFY: "fast_chitchat",
|
||
ReasoningAction.UNKNOWN: "react_loop",
|
||
}
|
||
|
||
intent = intent_map.get(reasoning_result.action, "complex")
|
||
path = path_map.get(reasoning_result.action, "react_loop")
|
||
|
||
suggested_tools = []
|
||
if reasoning_result.action == ReasoningAction.ROUTE_SUBGRAPH:
|
||
target_subgraph = reasoning_result.metadata.get("target_subgraph")
|
||
if target_subgraph:
|
||
suggested_tools = [target_subgraph]
|
||
|
||
return HybridRouterResult(
|
||
intent=intent,
|
||
confidence=reasoning_result.confidence,
|
||
suggested_tools=suggested_tools,
|
||
path=path,
|
||
reasoning=reasoning_result.reasoning
|
||
)
|
||
|
||
|
||
# ========== 规则分流(<5ms) ==========
|
||
def _rule_based_redirect(query: str) -> Optional[HybridRouterResult]:
|
||
"""规则分流:处理明显不需要推理的情况"""
|
||
query_clean = query.strip().lower()
|
||
|
||
# 1. 闲聊
|
||
if query_clean in CHITCHAT_KEYWORDS or any(kw in query_clean for kw in CHITCHAT_KEYWORDS):
|
||
return HybridRouterResult(
|
||
intent="chitchat",
|
||
confidence=1.0,
|
||
path="fast_chitchat",
|
||
reasoning="规则匹配:闲聊类请求"
|
||
)
|
||
|
||
# 2. 子图关键词
|
||
for subgraph_name, keywords in SUBGRAPH_KEYWORDS.items():
|
||
if any(kw in query_clean for kw in keywords):
|
||
return HybridRouterResult(
|
||
intent="tool",
|
||
confidence=0.9,
|
||
suggested_tools=[subgraph_name],
|
||
path="fast_tool",
|
||
reasoning=f"规则匹配:{subgraph_name} 子图关键词"
|
||
)
|
||
|
||
# 3. 短问题
|
||
if len(query_clean) < 3 or (query_clean.endswith("?") and len(query_clean) < 5):
|
||
return HybridRouterResult(
|
||
intent="complex",
|
||
confidence=0.3,
|
||
path="react_loop",
|
||
reasoning="规则匹配:问题过于简短"
|
||
)
|
||
|
||
return None
|
||
|
||
|
||
# ========== 默认结果 ==========
|
||
def _default_result() -> HybridRouterResult:
|
||
"""默认结果"""
|
||
return HybridRouterResult(
|
||
intent="complex",
|
||
confidence=0.3,
|
||
path="react_loop",
|
||
reasoning="降级到默认值,走 React 循环"
|
||
)
|
||
|
||
|
||
# ========== 主路由节点 ==========
|
||
async def hybrid_router_node(state: MainGraphState, config: Optional[RunnableConfig] = None) -> MainGraphState:
|
||
"""混合路由节点:前置路由,决定走快速路径还是 React循环"""
|
||
state.current_phase = "hybrid_router"
|
||
query = state.user_query or ""
|
||
|
||
info(f"[Hybrid Router] 开始路由: {query[:50]}...")
|
||
|
||
# 1. 规则分流
|
||
rule_result = _rule_based_redirect(query)
|
||
if rule_result:
|
||
decision = rule_result
|
||
info(f"[Hybrid Router] 规则命中: {decision.path}")
|
||
else:
|
||
# 2. 复用 intent.py 的推理逻辑!保证判断一致!
|
||
info("[Hybrid Router] 规则未命中,使用 intent.py 推理")
|
||
try:
|
||
reasoning_result = await react_reason_async(query, {})
|
||
decision = _map_reasoning_to_router(reasoning_result)
|
||
info(f"[Hybrid Router] 推理结果: action={reasoning_result.action.name}, path={decision.path}")
|
||
except Exception as e:
|
||
debug(f"[Hybrid Router] intent.py 推理失败: {e}")
|
||
decision = _default_result()
|
||
|
||
# 3. 更新状态
|
||
state.hybrid_router.decision = decision
|
||
state.hybrid_router.start_time = datetime.now().isoformat()
|
||
|
||
# 4. 发送事件
|
||
await dispatch_custom_event("intent_classified", {
|
||
"intent": decision.intent,
|
||
"confidence": decision.confidence,
|
||
"reasoning": decision.reasoning,
|
||
"suggested_tools": decision.suggested_tools
|
||
}, config)
|
||
|
||
await dispatch_custom_event("path_decision", {
|
||
"path": decision.path,
|
||
"intent": decision.intent,
|
||
"reasoning": decision.reasoning
|
||
}, config)
|
||
|
||
info(f"[Hybrid Router] 路由决策: {decision.path} (intent={decision.intent}, confidence={decision.confidence})")
|
||
return state
|
||
|
||
|
||
# ========== 条件路由函数 ==========
|
||
def route_from_hybrid_decision(state: MainGraphState) -> str:
|
||
"""从混合路由决策获取下一步节点"""
|
||
decision = state.hybrid_router.decision
|
||
if decision and hasattr(decision, 'path'):
|
||
return decision.path
|
||
return "react_loop"
|
||
|
||
|
||
def check_fast_path_success(state: MainGraphState) -> str:
|
||
"""检查快速路径是否成功"""
|
||
if state.fast_path.failed:
|
||
info("[Fast Path Check] 快速路径失败,升级到 React 循环")
|
||
return "escalate"
|
||
|
||
info("[Fast Path Check] 快速路径成功,进入 llm_call")
|
||
return "llm_call"
|
||
|
||
|
||
# ========== 导出 ==========
|
||
__all__ = [
|
||
"hybrid_router_node",
|
||
"route_from_hybrid_decision",
|
||
"check_fast_path_success",
|
||
"HybridRouterResult",
|
||
]
|