""" 混合路由节点模块 - 前置路由 + 快速路径 """ import re import json from typing import Dict, Any, Optional, List from dataclasses import dataclass, field from datetime import datetime from app.main_graph.state import MainGraphState from app.logger import info, debug from app.model_services.chat_services import get_small_llm_service, get_chat_service from app.main_graph.nodes.rag_nodes import rag_retrieve_node # ========== 核心数据类型 ========== @dataclass class HybridRouterResult: """混合路由结果""" intent: str = "complex" # chitchat / knowledge / tool / complex confidence: float = 0.0 suggested_tools: List[str] = field(default_factory=list) path: str = "react_loop" # fast_chitchat / fast_rag / fast_tool / react_loop reasoning: str = "" # ========== 规则分流(无 LLM,<5ms) ========== # 问候、感谢等直接返回的关键词 AL_CHITCHAT = { "你好", "您好", "hi", "hello", "hey", "早上好", "晚上好", "下午好", "谢谢", "感谢", "多谢", "thanks", "thank you", "再见", "拜拜", "goodbye", "bye" } # 子图关键词映射 SUBGRAPH_KEYWORDS = { "contact": ["通讯录", "联系人", "contact", "email", "邮件", "邮箱"], "dictionary": ["词典", "单词", "翻译", "dictionary", "translate", "生词"], "news_analysis": ["资讯", "新闻", "分析", "news", "report", "热点"] } def _rule_based_redirect(query: str) -> Optional[HybridRouterResult]: """ 规则分流:处理明显不需要推理的情况(超快速) Args: query: 用户查询 Returns: HybridRouterResult 或 None """ query_clean = query.strip().lower() # 1. 检查闲聊 if query_clean in AL_CHITCHAT or any(keyword in query_clean for keyword in AL_CHITCHAT): return HybridRouterResult( intent="chitchat", confidence=1.0, path="fast_chitchat", reasoning=f"规则匹配:闲聊类请求" ) # 2. 检查子图关键词(直接调用工具) for subgraph_name, keywords in SUBGRAPH_KEYWORDS.items(): if any(kw in query_clean for kw in keywords): return HybridRouterResult( intent="tool", confidence=0.9, suggested_tools=[subgraph_name], path="fast_tool", reasoning=f"规则匹配:{subgraph_name} 子图关键词" ) # 3. 检查是否是纯问号或很短的问题(可能需要澄清) if len(query_clean) < 3 or (query_clean.endswith("?") and len(query_clean) < 5): return HybridRouterResult( intent="complex", confidence=0.3, path="react_loop", reasoning="规则匹配:问题过于简短或不确定" ) return None # ========== 轻量级 LLM 分类 ========== async def _classify_with_small_llm(query: str) -> HybridRouterResult: """ 使用轻量级 LLM 进行意图分类 Args: query: 用户查询 Returns: HybridRouterResult """ try: llm = get_small_llm_service() prompt = f"""你是一个专业的意图分类助手。请分析用户的查询,并输出 JSON 格式的结果。 意图类型(4选一): - chitchat: 闲聊、问候、感谢、道别(不需要工具) - knowledge: 知识查询(需要查询知识库) - tool: 工具操作(需要调用通讯录/词典/新闻等子图) - complex: 复杂任务(多步骤、不确定、或需要推理) 用户查询: {query} 输出格式(仅 JSON,不要其他内容): {{ "intent": "chitchat|knowledge|tool|complex", "confidence": 0.0-1.0, "reasoning": "简要说明理由", "suggested_tools": ["contact|dictionary|news_analysis", "other"] }} 注意:如果不能100%确定意图,请选择 "complex",置信度设低一些。 """ response = await llm.ainvoke(prompt) content = response.content # 解析 JSON json_match = re.search(r'(\{[^{}]*\{[^{}]*\}[^{}]*\})|(\{[^{}]*\})', content) if json_match: try: data = json.loads(json_match.group(0)) intent = data.get("intent", "complex") confidence = float(data.get("confidence", 0.3)) reasoning = data.get("reasoning", "") suggested_tools = data.get("suggested_tools", []) # 置信度低于 0.5 一律走 complex if confidence < 0.5: intent = "complex" path = "react_loop" elif intent == "chitchat": path = "fast_chitchat" elif intent == "knowledge": path = "fast_rag" elif intent == "tool": path = "fast_tool" else: intent = "complex" path = "react_loop" return HybridRouterResult( intent=intent, confidence=confidence, suggested_tools=suggested_tools, path=path, reasoning=reasoning ) except Exception as e: debug(f"轻量 LLM 响应解析失败: {e}") pass except Exception as e: debug(f"轻量 LLM 调用失败: {e}") # LLM 失败,降级到规则+默认 return HybridRouterResult( intent="complex", confidence=0.3, path="react_loop", reasoning="LLM 调用失败,降级到 React 循环" ) # ========== 路由决策 ========== def _make_decision(classification_result: HybridRouterResult) -> HybridRouterResult: """ 根据分类结果最终决策 Args: classification_result: 分类结果 Returns: 最终决策结果 """ if classification_result.confidence < 0.5: classification_result.intent = "complex" classification_result.path = "react_loop" return classification_result return classification_result # ========== 混合路由主节点 ========== async def hybrid_router_node(state: MainGraphState, config: Optional[Dict[str, Any]] = None) -> MainGraphState: """ 混合路由节点:前置路由,决定走快速路径还是 React 循环 Args: state: 当前状态 config: LangChain 配置(用于发送自定义事件) Returns: 更新后的状态 """ state.current_phase = "hybrid_router" query = state.user_query or "" info(f"[Hybrid Router] 开始路由: {query[:50]}...") # 1. 规则分流(超快速) rule_result = _rule_based_redirect(query) if rule_result: info(f"[Hybrid Router] 规则分流命中: {rule_result.path}") decision = rule_result else: # 2. 轻量 LLM 分类 info(f"[Hybrid Router] 规则未命中,使用轻量 LLM 分类") classification_result = await _classify_with_small_llm(query) decision = _make_decision(classification_result) # 3. 发送 SSE 事件 if config: try: from langchain_core.callbacks.manager import adispatch_custom_event callbacks = config.get("callbacks") if callbacks: await adispatch_custom_event( "intent_classified", { "intent": decision.intent, "confidence": decision.confidence, "reasoning": decision.reasoning, "suggested_tools": decision.suggested_tools }, callbacks=callbacks ) await adispatch_custom_event( "path_decision", { "path": decision.path, "intent": decision.intent, "reasoning": decision.reasoning }, callbacks=callbacks ) except Exception as e: debug(f"[Hybrid Router] 发送 SSE 事件失败: {e}") # 4. 更新状态 state.debug_info["hybrid_decision"] = decision state.debug_info["hybrid_start_time"] = datetime.now().isoformat() info(f"[Hybrid Router] 路由决策: {decision.path} (intent={decision.intent}, confidence={decision.confidence})") return state # ========== 快速路径:闲聊 ========== async def fast_chitchat_node(state: MainGraphState, config: Optional[Dict[str, Any]] = None) -> MainGraphState: """ 快速闲聊节点:直接返回回复,不走 RAG/工具/循环 Args: state: 当前状态 config: LangChain 配置 Returns: 更新后的状态 """ state.current_phase = "fast_chitchat" query = state.user_query or "" info(f"[Fast Chitchat] 处理: {query[:50]}") # 发送 SSE 事件 if config: try: from langchain_core.callbacks.manager import adispatch_custom_event callbacks = config.get("callbacks") if callbacks: await adispatch_custom_event( "fast_path_start", {"path": "fast_chitchat"}, callbacks=callbacks ) except Exception as e: debug(f"[Fast Chitchat] 发送事件失败: {e}") # 快速回复(可以扩展为模板库) query_clean = query.strip().lower() if any(kw in query_clean for kw in ["谢谢", "感谢", "thanks", "thank you"]): reply = "不客气!如果还有其他问题,请随时告诉我 😊" elif any(kw in query_clean for kw in ["再见", "拜拜", "bye", "goodbye"]): reply = "再见!期待下次为您服务 👋" elif any(kw in query_clean for kw in ["你好", "您好", "hi", "hello", "hey", "早上好", "晚上好", "下午好"]): reply = "你好!有什么我可以帮您的吗?" else: # 兜底:用轻量 LLM 生成 try: llm = get_small_llm_service() response = await llm.ainvoke(f"你是一个友好的助手。用户说:{query}。请简短友好地回复:") reply = response.content except: reply = "你好!有什么我可以帮您的吗?" state.final_result = reply state.success = True state.current_phase = "finalizing" state.debug_info["fast_chitchat_success"] = True # 发送 fast_path_end 事件 if config: try: from langchain_core.callbacks.manager import adispatch_custom_event callbacks = config.get("callbacks") if callbacks: await adispatch_custom_event( "fast_path_end", {"path": "fast_chitchat", "success": True}, callbacks=callbacks ) except Exception as e: debug(f"[Fast Chitchat] 发送完成事件失败: {e}") return state # ========== 快速路径:RAG(带自动升级) ========== async def fast_rag_node(state: MainGraphState, config: Optional[Dict[str, Any]] = None) -> MainGraphState: """ 快速 RAG 节点:先尝试快速检索,失败自动升级到 React 循环 Args: state: 当前状态 config: LangChain 配置 Returns: 更新后的状态 """ state.current_phase = "fast_rag" query = state.user_query or "" info(f"[Fast RAG] 开始处理: {query[:50]}") # 发送 SSE 事件 if config: try: from langchain_core.callbacks.manager import adispatch_custom_event callbacks = config.get("callbacks") if callbacks: await adispatch_custom_event( "fast_path_start", {"path": "fast_rag"}, callbacks=callbacks ) except Exception as e: debug(f"[Fast RAG] 发送事件失败: {e}") try: # 先尝试 RAG 检索 state = rag_retrieve_node(state, config) # 检查检索结果 rag_docs = getattr(state, "rag_docs", []) rag_context = getattr(state, "rag_context", "") # 检查是否有有效结果 has_valid_results = (rag_docs and len(rag_docs) > 0) or (rag_context and len(rag_context) > 10) if has_valid_results: # 快速 RAG 成功!使用小模型快速生成回答 try: llm = get_chat_service() prompt = f"""请根据以下信息回答用户问题: 检索到的信息: {rag_context or str(rag_docs)[:2000]} 用户问题:{query} 请给出简洁、准确的回答:""" response = await llm.ainvoke(prompt) state.final_result = response.content state.success = True state.current_phase = "finalizing" state.debug_info["fast_rag_success"] = True # 发送成功事件 if config: try: from langchain_core.callbacks.manager import adispatch_custom_event callbacks = config.get("callbacks") if callbacks: await adispatch_custom_event( "fast_path_end", {"path": "fast_rag", "success": True}, callbacks=callbacks ) except Exception as e: debug(f"[Fast RAG] 发送完成事件失败: {e}") return state except Exception as e: info(f"[Fast RAG] 快速回答生成失败: {e}") # 继续往下走,升级到 React 循环 # RAG 失败或无结果:标记升级 info(f"[Fast RAG] 无有效检索结果,升级到 React 循环") return mark_fast_path_failed(state, reason="无有效检索结果") except Exception as e: info(f"[Fast RAG] 执行失败: {e}") return mark_fast_path_failed(state, reason=str(e)) # ========== 快速路径:工具(带自动升级) ========== async def fast_tool_node(state: MainGraphState, config: Optional[Dict[str, Any]] = None) -> MainGraphState: """ 快速工具节点:尝试直接调用工具,失败自动升级到 React 循环 Args: state: 当前状态 config: LangChain 配置 Returns: 更新后的状态 """ state.current_phase = "fast_tool" decision: HybridRouterResult = state.debug_info.get("hybrid_decision", HybridRouterResult()) suggested_tools = decision.suggested_tools or [] query = state.user_query or "" info(f"[Fast Tool] 开始处理,建议工具: {suggested_tools}") # 发送 SSE 事件 if config: try: from langchain_core.callbacks.manager import adispatch_custom_event callbacks = config.get("callbacks") if callbacks: await adispatch_custom_event( "fast_path_start", {"path": "fast_tool", "suggested_tools": suggested_tools}, callbacks=callbacks ) except Exception as e: debug(f"[Fast Tool] 发送事件失败: {e}") # 检查是否有明确的工具建议 if not suggested_tools: info(f"[Fast Tool] 无明确工具建议,升级到 React 循环") return mark_fast_path_failed(state, reason="无明确工具建议") # 工具调用逻辑(这里暂时先标记升级,让 React 循环去处理) # 后续可以扩展为直接调用子图 info(f"[Fast Tool] 快速工具调用暂未完善,升级到 React 循环") return mark_fast_path_failed(state, reason="快速工具调用暂未完善") # ========== 标记快速路径失败(用于自动升级) ========== def mark_fast_path_failed(state: MainGraphState, reason: str = "") -> MainGraphState: """ 标记快速路径失败,准备升级到 React 循环 Args: state: 当前状态 reason: 失败原因 Returns: 更新后的状态 """ state.debug_info["fast_path_failed"] = True state.debug_info["fast_path_fail_reason"] = reason state.success = False # 发送 escalation 事件 config = state.debug_info.get("config") if config: try: from langchain_core.callbacks.manager import adispatch_custom_event callbacks = config.get("callbacks") if callbacks: # 这里需要在异步上下文中调用 pass except Exception as e: debug(f"[Fast Path] 发送升级事件失败: {e}") info(f"[Fast Path] 标记失败,准备升级: {reason}") return state # ========== 快速路径检查器(自动升级机制) ========== def route_from_hybrid_decision(state: MainGraphState) -> str: """ 从混合路由决策获取下一步的节点名称 Args: state: 当前状态 Returns: 节点名称 """ decision: HybridRouterResult = state.debug_info.get("hybrid_decision", HybridRouterResult()) return decision.path def check_fast_path_success(state: MainGraphState) -> str: """ 检查快速路径是否成功,成功直接到 finalize,失败升级到 react_reason Args: state: 当前状态 Returns: "success" 或 "escalate" """ # 检查是否有错误标记 if state.debug_info.get("fast_path_failed"): info(f"[Fast Path Check] 快速路径失败,升级到 React 循环") return "escalate" # 检查是否成功设置了 final_result if state.final_result: info(f"[Fast Path Check] 快速路径成功,进入 finalize") return "success" # 默认:认为成功(某些快速路径可能直接在节点中完成) return "success"