This commit is contained in:
242
backend/app/agent/hybrid_router.py
Normal file
242
backend/app/agent/hybrid_router.py
Normal file
@@ -0,0 +1,242 @@
|
||||
# backend/app/agent/hybrid_router.py
|
||||
|
||||
from enum import Enum
|
||||
from typing import Optional, List, Dict, Any
|
||||
from dataclasses import dataclass
|
||||
import sys
|
||||
import os
|
||||
|
||||
# 添加项目路径
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..'))
|
||||
|
||||
from app.agent.intent_classifier import IntentClassifier, IntentType, get_intent_classifier
|
||||
|
||||
|
||||
class RouterAction(Enum):
|
||||
"""路由动作"""
|
||||
FAST_RAG = "fast_rag" # 快速 RAG 路径
|
||||
FAST_TOOL = "fast_tool" # 快速工具路径
|
||||
REACT_LOOP = "react_loop" # React 循环路径
|
||||
DIRECT_ANSWER = "direct_answer" # 直接回答
|
||||
CLARIFY = "clarify" # 澄清反问
|
||||
|
||||
|
||||
@dataclass
|
||||
class RouterDecision:
|
||||
"""路由决策结果"""
|
||||
action: RouterAction
|
||||
intent: IntentType
|
||||
confidence: float
|
||||
reasoning: str
|
||||
metadata: Dict[str, Any] = None
|
||||
|
||||
|
||||
class HybridRouter:
|
||||
"""混合路由决策器"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
intent_classifier: IntentClassifier,
|
||||
rag_pipeline = None,
|
||||
tool_registry: Dict[str, Any] = None,
|
||||
react_graph = None
|
||||
):
|
||||
self.classifier = intent_classifier
|
||||
self.rag = rag_pipeline
|
||||
self.tools = tool_registry or {}
|
||||
self.react_graph = react_graph
|
||||
|
||||
async def route(self, user_input: str, context: Optional[str] = None) -> RouterDecision:
|
||||
"""
|
||||
路由决策
|
||||
|
||||
Args:
|
||||
user_input: 用户输入
|
||||
context: 对话上下文
|
||||
|
||||
Returns:
|
||||
RouterDecision
|
||||
"""
|
||||
# 1. 意图分类
|
||||
intent_result = await self.classifier.classify(user_input, context)
|
||||
|
||||
# 2. 根据意图路由
|
||||
decision = self._make_decision(intent_result, user_input)
|
||||
|
||||
return decision
|
||||
|
||||
def _make_decision(self, intent_result: IntentResult, user_input: str) -> RouterDecision:
|
||||
"""根据意图做出路由决策"""
|
||||
intent = intent_result.intent_type
|
||||
confidence = intent_result.confidence
|
||||
|
||||
# 低置信度 → 走 React 循环(更安全)
|
||||
if confidence < 0.6:
|
||||
return RouterDecision(
|
||||
action=RouterAction.REACT_LOOP,
|
||||
intent=intent,
|
||||
confidence=confidence,
|
||||
reasoning=f"置信度 {confidence:.2f} 较低,走 React 循环"
|
||||
)
|
||||
|
||||
# 根据意图路由
|
||||
routing_map = {
|
||||
IntentType.KNOWLEDGE: RouterAction.FAST_RAG,
|
||||
IntentType.REALTIME: RouterAction.FAST_TOOL,
|
||||
IntentType.ACTION: RouterAction.FAST_TOOL,
|
||||
IntentType.CHITCHAT: RouterAction.DIRECT_ANSWER,
|
||||
IntentType.CLARIFY: RouterAction.CLARIFY,
|
||||
IntentType.MIXED: RouterAction.REACT_LOOP,
|
||||
IntentType.UNKNOWN: RouterAction.REACT_LOOP,
|
||||
}
|
||||
|
||||
action = routing_map.get(intent, RouterAction.REACT_LOOP)
|
||||
|
||||
return RouterDecision(
|
||||
action=action,
|
||||
intent=intent,
|
||||
confidence=confidence,
|
||||
reasoning=intent_result.reasoning
|
||||
)
|
||||
|
||||
async def execute(self, decision: RouterDecision, user_input: str, thread_id: str) -> str:
|
||||
"""
|
||||
根据决策执行对应路径
|
||||
|
||||
Args:
|
||||
decision: 路由决策
|
||||
user_input: 用户输入
|
||||
thread_id: 线程 ID
|
||||
|
||||
Returns:
|
||||
最终答案
|
||||
"""
|
||||
if decision.action == RouterAction.FAST_RAG:
|
||||
return await self._execute_fast_rag(user_input)
|
||||
elif decision.action == RouterAction.FAST_TOOL:
|
||||
return await self._execute_fast_tool(user_input)
|
||||
elif decision.action == RouterAction.DIRECT_ANSWER:
|
||||
return await self._execute_direct_answer(user_input)
|
||||
elif decision.action == RouterAction.CLARIFY:
|
||||
return await self._execute_clarify(user_input)
|
||||
elif decision.action == RouterAction.REACT_LOOP:
|
||||
return await self._execute_react_loop(user_input, thread_id)
|
||||
else:
|
||||
return await self._execute_react_loop(user_input, thread_id)
|
||||
|
||||
async def _execute_fast_rag(self, user_input: str) -> str:
|
||||
"""快速 RAG 路径"""
|
||||
print("🚀 执行快速 RAG 路径")
|
||||
|
||||
# 1. 检索文档(如果 RAG 可用)
|
||||
docs = []
|
||||
if self.rag and hasattr(self.rag, 'aretrieve'):
|
||||
docs = await self.rag.aretrieve(user_input)
|
||||
|
||||
# 2. 格式化上下文
|
||||
context = ""
|
||||
if self.rag and hasattr(self.rag, 'format_context'):
|
||||
context = self.rag.format_context(docs)
|
||||
|
||||
# 3. 生成回答
|
||||
prompt = f"""
|
||||
请根据以下文档回答用户问题。
|
||||
|
||||
参考文档:
|
||||
{context or "(无文档)"}
|
||||
|
||||
用户问题: {user_input}
|
||||
"""
|
||||
|
||||
response = await self.classifier.llm.ainvoke(prompt)
|
||||
return response.content
|
||||
|
||||
async def _execute_fast_tool(self, user_input: str) -> str:
|
||||
"""快速工具路径"""
|
||||
print("🚀 执行快速工具路径")
|
||||
|
||||
# 这里简化处理,实际项目中:
|
||||
# 1. 解析需要调用的工具
|
||||
# 2. 生成工具参数
|
||||
# 3. 执行工具
|
||||
# 4. 生成回答
|
||||
|
||||
return "快速工具路径:功能开发中..."
|
||||
|
||||
async def _execute_direct_answer(self, user_input: str) -> str:
|
||||
"""直接回答路径"""
|
||||
print("💬 执行直接回答路径")
|
||||
|
||||
prompt = f"""
|
||||
用户说: {user_input}
|
||||
|
||||
请友好回应。
|
||||
"""
|
||||
|
||||
response = await self.classifier.llm.ainvoke(prompt)
|
||||
return response.content
|
||||
|
||||
async def _execute_clarify(self, user_input: str) -> str:
|
||||
"""澄清反问路径"""
|
||||
print("❓ 执行澄清反问路径")
|
||||
|
||||
prompt = f"""
|
||||
用户说: {user_input}
|
||||
|
||||
用户的问题不太明确,请礼貌地询问更多细节。
|
||||
"""
|
||||
|
||||
response = await self.classifier.llm.ainvoke(prompt)
|
||||
return response.content
|
||||
|
||||
async def _execute_react_loop(self, user_input: str, thread_id: str) -> str:
|
||||
"""React 循环路径"""
|
||||
print("🔄 执行 React 循环路径")
|
||||
|
||||
# 这里调用现有的完整 LangGraph 流程
|
||||
# 具体实现根据您的项目结构
|
||||
return "React 循环路径:调用现有 LangGraph..."
|
||||
|
||||
|
||||
# 便捷函数
|
||||
async def hybrid_agent_route(
|
||||
user_input: str,
|
||||
thread_id: str,
|
||||
context: Optional[str] = None
|
||||
) -> str:
|
||||
"""
|
||||
混合 Agent 路由入口函数
|
||||
|
||||
Args:
|
||||
user_input: 用户输入
|
||||
thread_id: 线程 ID
|
||||
context: 对话上下文
|
||||
|
||||
Returns:
|
||||
最终答案
|
||||
"""
|
||||
# 获取依赖(实际项目应该用依赖注入)
|
||||
classifier = get_intent_classifier()
|
||||
# rag = get_rag_pipeline()
|
||||
# tools = get_tool_registry()
|
||||
# graph = get_react_graph()
|
||||
|
||||
# 创建路由器
|
||||
router = HybridRouter(
|
||||
intent_classifier=classifier,
|
||||
rag_pipeline=None, # 实际项目中传入
|
||||
tool_registry={}, # 实际项目中传入
|
||||
react_graph=None # 实际项目中传入
|
||||
)
|
||||
|
||||
# 路由决策
|
||||
decision = await router.route(user_input, context)
|
||||
print(f"🧭 路由决策: {decision.action} (意图: {decision.intent}, 置信度: {decision.confidence:.2f})")
|
||||
print(f"📝 推理: {decision.reasoning}")
|
||||
|
||||
# 执行
|
||||
# result = await router.execute(decision, user_input, thread_id)
|
||||
# return result
|
||||
|
||||
# 临时返回
|
||||
return f"路由决策: {decision.action}"
|
||||
193
backend/app/agent/intent_classifier.py
Normal file
193
backend/app/agent/intent_classifier.py
Normal file
@@ -0,0 +1,193 @@
|
||||
# backend/app/agent/intent_classifier.py
|
||||
|
||||
from enum import Enum
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional, Dict, Any
|
||||
import sys
|
||||
import os
|
||||
|
||||
# 添加项目路径
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..'))
|
||||
|
||||
from app.model_services.chat_services import get_chat_service
|
||||
|
||||
|
||||
class IntentType(Enum):
|
||||
"""意图类型枚举"""
|
||||
KNOWLEDGE = "knowledge" # 知识查询 → RAG
|
||||
REALTIME = "realtime" # 实时数据 → 工具
|
||||
ACTION = "action" # 执行操作 → 工具
|
||||
CHITCHAT = "chitchat" # 闲聊 → 直接回答
|
||||
CLARIFY = "clarify" # 需要澄清 → 反问用户
|
||||
MIXED = "mixed" # 复杂任务 → React 循环
|
||||
UNKNOWN = "unknown" # 未知意图
|
||||
|
||||
|
||||
@dataclass
|
||||
class IntentResult:
|
||||
"""意图识别结果"""
|
||||
intent_type: IntentType
|
||||
confidence: float
|
||||
reasoning: str
|
||||
metadata: Dict[str, Any] = None
|
||||
|
||||
|
||||
class IntentClassifier:
|
||||
"""意图分类器"""
|
||||
|
||||
def __init__(self):
|
||||
self.llm = get_chat_service()
|
||||
self._intent_examples = self._build_examples()
|
||||
|
||||
def _build_examples(self) -> str:
|
||||
"""构建少样本示例"""
|
||||
return """
|
||||
<示例>
|
||||
用户: "公司的报销政策是什么?"
|
||||
意图: knowledge
|
||||
推理: 用户询问公司内部政策,需要查询知识库
|
||||
|
||||
用户: "帮我查一下订单 12345 的状态"
|
||||
意图: realtime
|
||||
推理: 需要查询实时订单数据
|
||||
|
||||
用户: "帮我申请退款,订单号 67890"
|
||||
意图: action
|
||||
推理: 需要执行退款操作
|
||||
|
||||
用户: "今天天气怎么样?"
|
||||
意图: realtime
|
||||
推理: 需要查询实时天气数据
|
||||
|
||||
用户: "帮我写一份邮件给客户,查询订单状态,然后附上退款政策"
|
||||
意图: mixed
|
||||
推理: 需要查询订单、查询政策、生成邮件,多步骤任务
|
||||
|
||||
用户: "你好"
|
||||
意图: chitchat
|
||||
推理: 简单寒暄
|
||||
|
||||
用户: "我想查点东西..."
|
||||
意图: clarify
|
||||
推理: 用户没有说清楚要查什么
|
||||
</示例>
|
||||
"""
|
||||
|
||||
async def classify(self, user_input: str, context: Optional[str] = None) -> IntentResult:
|
||||
"""
|
||||
分类用户意图
|
||||
|
||||
Args:
|
||||
user_input: 用户输入
|
||||
context: 对话上下文(可选)
|
||||
|
||||
Returns:
|
||||
IntentResult
|
||||
"""
|
||||
prompt = self._build_classification_prompt(user_input, context)
|
||||
|
||||
try:
|
||||
response = await self.llm.ainvoke(prompt)
|
||||
result = self._parse_response(response.content)
|
||||
return result
|
||||
except Exception as e:
|
||||
print(f"Intent classification error: {e}")
|
||||
# 降级策略:默认返回 mixed,走 React 循环
|
||||
return IntentResult(
|
||||
intent_type=IntentType.MIXED,
|
||||
confidence=0.5,
|
||||
reasoning="分类失败,走通用路径"
|
||||
)
|
||||
|
||||
def _build_classification_prompt(self, user_input: str, context: Optional[str]) -> str:
|
||||
"""构建分类提示词"""
|
||||
context_part = f"\n对话上下文:\n{context}" if context else ""
|
||||
|
||||
return f"""
|
||||
你是一个专业的意图识别助手。请分析用户的输入,判断其意图类型。
|
||||
|
||||
可选意图类型:
|
||||
- knowledge: 用户询问知识、政策、文档等,需要查询知识库
|
||||
- realtime: 用户需要查询实时数据(订单状态、天气、股票等)
|
||||
- action: 用户需要执行某项操作(退款、下单、发送邮件等)
|
||||
- chitchat: 用户只是闲聊、打招呼,不需要工具或检索
|
||||
- clarify: 用户的问题不明确,需要追问澄清
|
||||
- mixed: 复杂任务,需要多步骤处理(同时需要检索+工具)
|
||||
|
||||
{self._intent_examples}
|
||||
|
||||
用户输入: {user_input}
|
||||
{context_part}
|
||||
|
||||
请按以下格式输出(纯JSON):
|
||||
{{
|
||||
"intent": "knowledge|realtime|action|chitchat|clarify|mixed",
|
||||
"confidence": 0.85,
|
||||
"reasoning": "简要说明为什么这个意图"
|
||||
}}
|
||||
"""
|
||||
|
||||
def _parse_response(self, response: str) -> IntentResult:
|
||||
"""解析 LLM 响应"""
|
||||
import json
|
||||
import re
|
||||
|
||||
# 尝试提取 JSON
|
||||
json_match = re.search(r'\{[\s\S]*\}', response)
|
||||
if json_match:
|
||||
try:
|
||||
data = json.loads(json_match.group())
|
||||
return IntentResult(
|
||||
intent_type=IntentType(data['intent']),
|
||||
confidence=float(data['confidence']),
|
||||
reasoning=data['reasoning']
|
||||
)
|
||||
except:
|
||||
pass
|
||||
|
||||
# 降级策略:关键词匹配
|
||||
return self._fallback_classify(response)
|
||||
|
||||
def _fallback_classify(self, user_input: str) -> IntentResult:
|
||||
"""关键词匹配降级策略"""
|
||||
keywords = {
|
||||
IntentType.KNOWLEDGE: ['政策', '文档', '规定', '手册', '指南', '什么是', '怎么'],
|
||||
IntentType.REALTIME: ['订单', '状态', '天气', '股票', '价格', '库存'],
|
||||
IntentType.ACTION: ['退款', '取消', '发送', '申请', '修改', '删除'],
|
||||
IntentType.CHITCHAT: ['你好', 'hi', 'hello', '嗨', '早上好', '晚上好'],
|
||||
}
|
||||
|
||||
for intent_type, words in keywords.items():
|
||||
if any(word in user_input.lower() for word in words):
|
||||
return IntentResult(
|
||||
intent_type=intent_type,
|
||||
confidence=0.7,
|
||||
reasoning=f"关键词匹配: {', '.join(words)}"
|
||||
)
|
||||
|
||||
# 默认走混合路径
|
||||
return IntentResult(
|
||||
intent_type=IntentType.MIXED,
|
||||
confidence=0.5,
|
||||
reasoning="无法明确分类,走通用路径"
|
||||
)
|
||||
|
||||
async def batch_classify(self, inputs: list[str]) -> list[IntentResult]:
|
||||
"""批量分类(带缓存)"""
|
||||
# 可以添加缓存逻辑
|
||||
results = []
|
||||
for inp in inputs:
|
||||
results.append(await self.classify(inp))
|
||||
return results
|
||||
|
||||
|
||||
# 全局实例
|
||||
_classifier: Optional[IntentClassifier] = None
|
||||
|
||||
|
||||
def get_intent_classifier() -> IntentClassifier:
|
||||
"""获取意图分类器实例"""
|
||||
global _classifier
|
||||
if _classifier is None:
|
||||
_classifier = IntentClassifier()
|
||||
return _classifier
|
||||
@@ -10,6 +10,7 @@ from ..graph.graph_builder import GraphBuilder, GraphContext
|
||||
from ..graph.graph_tools import AVAILABLE_TOOLS, TOOLS_BY_NAME
|
||||
from ..model_services.chat_services import get_all_chat_services, LocalVLLMChatProvider
|
||||
from .rag_initializer import init_rag_tool
|
||||
from .intent_classifier import get_intent_classifier
|
||||
from ..logger import info, warning
|
||||
|
||||
class AIAgentService:
|
||||
@@ -18,6 +19,8 @@ class AIAgentService:
|
||||
self.graphs = {}
|
||||
self.tools = AVAILABLE_TOOLS.copy()
|
||||
self.tools_by_name = TOOLS_BY_NAME.copy()
|
||||
# 添加:意图分类器
|
||||
self.intent_classifier = get_intent_classifier()
|
||||
|
||||
async def initialize(self):
|
||||
# 1. 初始化 RAG 工具(如果需要)
|
||||
|
||||
Reference in New Issue
Block a user