""" 混合路由节点模块 - 前置路由 + 快速路径 """ import re import json from typing import Dict, Any, Optional, List from dataclasses import dataclass, field from datetime import datetime from ..state import MainGraphState from ...logger import info, debug from ...model_services.chat_services import get_small_llm_service, get_chat_service from .rag_nodes import rag_retrieve_node # ========== 核心数据类型 ========== @dataclass class HybridRouterResult: """混合路由结果""" intent: str = "complex" # chitchat / knowledge / tool / complex confidence: float = 0.0 suggested_tools: List[str] = field(default_factory=list) path: str = "react_loop" # fast_chitchat / fast_rag / fast_tool / react_loop reasoning: str = "" # ========== 规则分流(无 LLM,<5ms) ========== # 问候、感谢等直接返回的关键词 AL_CHITCHAT = { "你好", "您好", "hi", "hello", "hey", "早上好", "晚上好", "下午好", "谢谢", "感谢", "多谢", "thanks", "thank you", "再见", "拜拜", "goodbye", "bye" } # 子图关键词映射 SUBGRAPH_KEYWORDS = { "contact": ["通讯录", "联系人", "contact", "email", "邮件", "邮箱"], "dictionary": ["词典", "单词", "翻译", "dictionary", "translate", "生词"], "news_analysis": ["资讯", "新闻", "分析", "news", "report", "热点"] } def _rule_based_redirect(query: str) -> Optional[HybridRouterResult]: """ 规则分流:处理明显不需要推理的情况(超快速) Args: query: 用户查询 Returns: HybridRouterResult 或 None """ query_clean = query.strip().lower() # 1. 检查闲聊 if query_clean in AL_CHITCHAT or any(keyword in query_clean for keyword in AL_CHITCHAT): return HybridRouterResult( intent="chitchat", confidence=1.0, path="fast_chitchat", reasoning=f"规则匹配:闲聊类请求" ) # 2. 检查子图关键词(直接调用工具) for subgraph_name, keywords in SUBGRAPH_KEYWORDS.items(): if any(kw in query_clean for kw in keywords): return HybridRouterResult( intent="tool", confidence=0.9, suggested_tools=[subgraph_name], path="fast_tool", reasoning=f"规则匹配:{subgraph_name} 子图关键词" ) # 3. 检查是否是纯问号或很短的问题(可能需要澄清) if len(query_clean) < 3 or (query_clean.endswith("?") and len(query_clean) < 5): return HybridRouterResult( intent="complex", confidence=0.3, path="react_loop", reasoning="规则匹配:问题过于简短或不确定" ) return None # ========== 轻量级 LLM 分类 ========== async def _classify_with_small_llm(query: str) -> HybridRouterResult: """ 使用轻量级 LLM 进行意图分类 Args: query: 用户查询 Returns: HybridRouterResult """ try: llm = get_small_llm_service() prompt = f"""你是一个专业的意图分类助手。请分析用户的查询,并输出 JSON 格式的结果。 意图类型(4选一): - chitchat: 闲聊、问候、感谢、道别(不需要工具) - knowledge: 知识查询(需要查询知识库) - tool: 工具操作(需要调用通讯录/词典/新闻等子图) - complex: 复杂任务(多步骤、不确定、或需要推理) 用户查询: {query} 输出格式(仅 JSON,不要其他内容): {{ "intent": "chitchat|knowledge|tool|complex", "confidence": 0.0-1.0, "reasoning": "简要说明理由", "suggested_tools": ["contact|dictionary|news_analysis", "other"] }} 注意:如果不能100%确定意图,请选择 "complex",置信度设低一些。 """ response = await llm.ainvoke(prompt) content = response.content # 解析 JSON json_match = re.search(r'(\{[^{}]*\{[^{}]*\}[^{}]*\})|(\{[^{}]*\})', content) if json_match: try: data = json.loads(json_match.group(0)) intent = data.get("intent", "complex") confidence = float(data.get("confidence", 0.3)) reasoning = data.get("reasoning", "") suggested_tools = data.get("suggested_tools", []) # 置信度低于 0.5 一律走 complex if confidence < 0.5: intent = "complex" path = "react_loop" elif intent == "chitchat": path = "fast_chitchat" elif intent == "knowledge": path = "fast_rag" elif intent == "tool": path = "fast_tool" else: intent = "complex" path = "react_loop" return HybridRouterResult( intent=intent, confidence=confidence, suggested_tools=suggested_tools, path=path, reasoning=reasoning ) except Exception as e: debug(f"轻量 LLM 响应解析失败: {e}") pass except Exception as e: debug(f"轻量 LLM 调用失败: {e}") # LLM 失败,降级到规则+默认 return HybridRouterResult( intent="complex", confidence=0.3, path="react_loop", reasoning="LLM 调用失败,降级到 React 循环" ) # ========== 路由决策 ========== def _make_decision(classification_result: HybridRouterResult) -> HybridRouterResult: """ 根据分类结果最终决策 Args: classification_result: 分类结果 Returns: 最终决策结果 """ if classification_result.confidence < 0.5: classification_result.intent = "complex" classification_result.path = "react_loop" return classification_result return classification_result # ========== 混合路由主节点 ========== async def hybrid_router_node(state: MainGraphState, config: Optional[Dict[str, Any]] = None) -> MainGraphState: """ 混合路由节点:前置路由,决定走快速路径还是 React 循环 Args: state: 当前状态 config: LangChain 配置(用于发送自定义事件) Returns: 更新后的状态 """ state.current_phase = "hybrid_router" query = state.user_query or "" info(f"[Hybrid Router] 开始路由: {query[:50]}...") # 1. 规则分流(超快速) rule_result = _rule_based_redirect(query) if rule_result: info(f"[Hybrid Router] 规则分流命中: {rule_result.path}") decision = rule_result else: # 2. 轻量 LLM 分类 info(f"[Hybrid Router] 规则未命中,使用轻量 LLM 分类") classification_result = await _classify_with_small_llm(query) decision = _make_decision(classification_result) # 3. 发送 SSE 事件 if config: try: from langchain_core.callbacks.manager import adispatch_custom_event callbacks = config.get("callbacks") if callbacks: await adispatch_custom_event( "intent_classified", { "intent": decision.intent, "confidence": decision.confidence, "reasoning": decision.reasoning, "suggested_tools": decision.suggested_tools }, callbacks=callbacks ) await adispatch_custom_event( "path_decision", { "path": decision.path, "intent": decision.intent, "reasoning": decision.reasoning }, callbacks=callbacks ) except Exception as e: debug(f"[Hybrid Router] 发送 SSE 事件失败: {e}") # 4. 更新状态 state.debug_info["hybrid_decision"] = decision state.debug_info["hybrid_start_time"] = datetime.now().isoformat() info(f"[Hybrid Router] 路由决策: {decision.path} (intent={decision.intent}, confidence={decision.confidence})") return state # ========== 快速路径:闲聊 ========== async def fast_chitchat_node(state: MainGraphState, config: Optional[Dict[str, Any]] = None) -> MainGraphState: """ 快速闲聊节点:直接返回回复,不走 RAG/工具/循环 Args: state: 当前状态 config: LangChain 配置 Returns: 更新后的状态 """ state.current_phase = "fast_chitchat" query = state.user_query or "" info(f"[Fast Chitchat] 处理: {query[:50]}") # 发送 SSE 事件 if config: try: from langchain_core.callbacks.manager import adispatch_custom_event callbacks = config.get("callbacks") if callbacks: await adispatch_custom_event( "fast_path_start", {"path": "fast_chitchat"}, callbacks=callbacks ) except Exception as e: debug(f"[Fast Chitchat] 发送事件失败: {e}") # 快速回复(可以扩展为模板库) query_clean = query.strip().lower() if any(kw in query_clean for kw in ["谢谢", "感谢", "thanks", "thank you"]): reply = "不客气!如果还有其他问题,请随时告诉我 😊" elif any(kw in query_clean for kw in ["再见", "拜拜", "bye", "goodbye"]): reply = "再见!期待下次为您服务 👋" elif any(kw in query_clean for kw in ["你好", "您好", "hi", "hello", "hey", "早上好", "晚上好", "下午好"]): reply = "你好!有什么我可以帮您的吗?" else: # 兜底:用轻量 LLM 生成 try: llm = get_small_llm_service() response = await llm.ainvoke(f"你是一个友好的助手。用户说:{query}。请简短友好地回复:") reply = response.content except: reply = "你好!有什么我可以帮您的吗?" state.final_result = reply state.success = True state.current_phase = "finalizing" state.debug_info["fast_chitchat_success"] = True # 发送 fast_path_end 事件 if config: try: from langchain_core.callbacks.manager import adispatch_custom_event callbacks = config.get("callbacks") if callbacks: await adispatch_custom_event( "fast_path_end", {"path": "fast_chitchat", "success": True}, callbacks=callbacks ) except Exception as e: debug(f"[Fast Chitchat] 发送完成事件失败: {e}") return state # ========== 快速路径:RAG(带自动升级) ========== async def fast_rag_node(state: MainGraphState, config: Optional[Dict[str, Any]] = None) -> MainGraphState: """ 快速 RAG 节点:先尝试快速检索,失败自动升级到 React 循环 Args: state: 当前状态 config: LangChain 配置 Returns: 更新后的状态 """ state.current_phase = "fast_rag" query = state.user_query or "" info(f"[Fast RAG] 开始处理: {query[:50]}") # 发送 SSE 事件 if config: try: from langchain_core.callbacks.manager import adispatch_custom_event callbacks = config.get("callbacks") if callbacks: await adispatch_custom_event( "fast_path_start", {"path": "fast_rag"}, callbacks=callbacks ) except Exception as e: debug(f"[Fast RAG] 发送事件失败: {e}") try: # 先尝试 RAG 检索 - 注意:rag_retrieve_node 是异步函数,需要 await state = await rag_retrieve_node(state, config) # 检查检索结果 rag_docs = getattr(state, "rag_docs", []) rag_context = getattr(state, "rag_context", "") # 检查是否有有效结果 has_valid_results = (rag_docs and len(rag_docs) > 0) or (rag_context and len(rag_context) > 10) if has_valid_results: # 快速 RAG 成功!使用小模型快速生成回答 try: llm = get_chat_service() prompt = f"""请根据以下信息回答用户问题: 检索到的信息: {rag_context or str(rag_docs)[:2000]} 用户问题:{query} 请给出简洁、准确的回答:""" response = await llm.ainvoke(prompt) state.final_result = response.content state.success = True state.current_phase = "finalizing" state.debug_info["fast_rag_success"] = True # 发送成功事件 if config: try: from langchain_core.callbacks.manager import adispatch_custom_event callbacks = config.get("callbacks") if callbacks: await adispatch_custom_event( "fast_path_end", {"path": "fast_rag", "success": True}, callbacks=callbacks ) except Exception as e: debug(f"[Fast RAG] 发送完成事件失败: {e}") return state except Exception as e: info(f"[Fast RAG] 快速回答生成失败: {e}") # 继续往下走,升级到 React 循环 # RAG 失败或无结果:标记升级 info(f"[Fast RAG] 无有效检索结果,升级到 React 循环") return mark_fast_path_failed(state, reason="无有效检索结果") except Exception as e: info(f"[Fast RAG] 执行失败: {e}") return mark_fast_path_failed(state, reason=str(e)) # ========== 快速路径:工具(带自动升级) ========== async def fast_tool_node(state: MainGraphState, config: Optional[Dict[str, Any]] = None) -> MainGraphState: """ 快速工具节点:尝试直接调用工具,失败自动升级到 React 循环 Args: state: 当前状态 config: LangChain 配置 Returns: 更新后的状态 """ state.current_phase = "fast_tool" decision: HybridRouterResult = state.debug_info.get("hybrid_decision", HybridRouterResult()) suggested_tools = decision.suggested_tools or [] query = state.user_query or "" info(f"[Fast Tool] 开始处理,建议工具: {suggested_tools}") # 发送 SSE 事件 if config: try: from langchain_core.callbacks.manager import adispatch_custom_event callbacks = config.get("callbacks") if callbacks: await adispatch_custom_event( "fast_path_start", {"path": "fast_tool", "suggested_tools": suggested_tools}, callbacks=callbacks ) except Exception as e: debug(f"[Fast Tool] 发送事件失败: {e}") # 检查是否有明确的工具建议 if not suggested_tools: info(f"[Fast Tool] 无明确工具建议,升级到 React 循环") return mark_fast_path_failed(state, reason="无明确工具建议") # 工具调用逻辑(这里暂时先标记升级,让 React 循环去处理) # 后续可以扩展为直接调用子图 info(f"[Fast Tool] 快速工具调用暂未完善,升级到 React 循环") return mark_fast_path_failed(state, reason="快速工具调用暂未完善") # ========== 标记快速路径失败(用于自动升级) ========== def mark_fast_path_failed(state: MainGraphState, reason: str = "") -> MainGraphState: """ 标记快速路径失败,准备升级到 React 循环 Args: state: 当前状态 reason: 失败原因 Returns: 更新后的状态 """ state.debug_info["fast_path_failed"] = True state.debug_info["fast_path_fail_reason"] = reason state.success = False # 发送 escalation 事件 config = state.debug_info.get("config") if config: try: from langchain_core.callbacks.manager import adispatch_custom_event callbacks = config.get("callbacks") if callbacks: # 这里需要在异步上下文中调用 pass except Exception as e: debug(f"[Fast Path] 发送升级事件失败: {e}") info(f"[Fast Path] 标记失败,准备升级: {reason}") return state # ========== 快速路径检查器(自动升级机制) ========== def route_from_hybrid_decision(state: MainGraphState) -> str: """ 从混合路由决策获取下一步的节点名称 Args: state: 当前状态 Returns: 节点名称 """ decision: HybridRouterResult = state.debug_info.get("hybrid_decision", HybridRouterResult()) return decision.path def check_fast_path_success(state: MainGraphState) -> str: """ 检查快速路径是否成功,成功直接到 finalize,失败升级到 react_reason Args: state: 当前状态 Returns: "success" 或 "escalate" """ # 检查是否有错误标记 if state.debug_info.get("fast_path_failed"): info(f"[Fast Path Check] 快速路径失败,升级到 React 循环") return "escalate" # 检查是否成功设置了 final_result if state.final_result: info(f"[Fast Path Check] 快速路径成功,进入 finalize") return "success" # 默认:认为成功(某些快速路径可能直接在节点中完成) return "success"