重构:清理废弃代码 + 优化 Agent 架构
All checks were successful
构建并部署 AI Agent 服务 / deploy (push) Successful in 5m24s
All checks were successful
构建并部署 AI Agent 服务 / deploy (push) Successful in 5m24s
主要变更: - 删除 deprecated 文件夹(intent/hybrid_router/rag_nodes 等) - 删除 intent_classifier.py(未使用) - 删除 subgraph_wrapper.py(死代码) - 重构 agent.py:简化工厂函数,支持动态模型切换 - 重构 prompts.py:添加信息获取优先级、思维链要求、工具调用约束 - 优化 tools:统一位置,rag_search 返回置信度评估 - 新增 RAG 置信度评估:embedding(25%) + rerank(25%) + LLM(50%) - 添加循环检测:防止工具无限重复调用 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -1,120 +1,151 @@
|
||||
"""
|
||||
Agent 节点:完整的 ReAct 循环 + 流式 Tool Calling 拼接
|
||||
完全参考指南实现!
|
||||
Agent 节点 - 简化版本
|
||||
直接定义 agent_node 函数,支持动态模型切换和循环检测
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
from typing import Dict, Any, Optional, List
|
||||
from langchain_core.messages import SystemMessage, AIMessage, AIMessageChunk, ToolMessage
|
||||
from langchain_core.runnables.config import RunnableConfig
|
||||
from langchain_core.messages import AIMessage, AIMessageChunk, SystemMessage, ToolMessage
|
||||
|
||||
from backend.app.main_graph.state import AgentState
|
||||
from backend.app.logger import info, warning, error
|
||||
from backend.app.agent.stream_context import get_stream_queue
|
||||
from backend.app.logger import info, error
|
||||
from backend.app.tools import ALL_TOOLS
|
||||
from backend.app.agent.stream_context import get_stream_queue
|
||||
from backend.app.agent.prompts import SYSTEM_PROMPT
|
||||
|
||||
|
||||
# 系统提示词(从 main_graph_builder.py 搬过来)
|
||||
SYSTEM_PROMPT = """你是一个智能助手,可以使用多种工具完成复杂任务。你必须用中文回复。
|
||||
|
||||
## 核心工具与能力
|
||||
你可以使用以下工具(函数),但只能在真正需要时调用,禁止无意义的测试调用或重复调用:
|
||||
1. rag_search – 从内部知识库中检索文档,输入为优化后的查询字符串。
|
||||
2. web_search – 联网搜索获取最新信息,输入为搜索关键词。
|
||||
3. contact_lookup – 查询企业通讯录,输入姓名、部门或邮箱等。
|
||||
4. dictionary_lookup – 翻译单词、查询词典或提取术语。
|
||||
5. news_analysis – 获取或分析新闻资讯。
|
||||
|
||||
## 工作流程(ReAct 决策闭环)
|
||||
你必须严格按照思考 → 行动 → 观察的闭环来处理每个请求,具体规则如下:
|
||||
|
||||
### 1. 初始决策
|
||||
- 如果用户的问题很明确且你已有足够内部知识,可以直接回答,无需调用任何工具。
|
||||
- 如果需要外部信息,请按以下优先级选择工具:
|
||||
- 优先使用 rag_search。
|
||||
- 若第一次 rag_search 返回的结果不相关或质量低,你可以改写查询关键词再次调用 rag_search(最多重复一次)。
|
||||
- 如果两次 rag_search 均无法获得满意信息,或者用户明确要求实时资讯,则必须切换为 web_search。
|
||||
- 遇到通讯录、词典、新闻类明确需求,直接调用对应的专用工具。
|
||||
|
||||
### 2. 观察与反思
|
||||
- 每次工具调用返回结果后,你必须先评估结果质量(内容是否相关、是否充分)。
|
||||
- 如果信息不足,根据上述规则决定下一步行动;如果信息足够,则直接生成最终答案,绝不再调用任何工具。
|
||||
- 在整个过程中,禁止使用工具返回的信息直接重复或编造来源,必须如实标注。
|
||||
|
||||
### 3. 结束条件
|
||||
当你认为已经拥有足够信息回答用户时,输出最终回复并停止调用工具。若连续调用工具超过 5 轮仍未解决,也必须基于当前收集到的信息给出最佳回答并说明局限性。
|
||||
|
||||
## 回答规范
|
||||
1. 来源标注:回答开头用方括号注明信息来源,如多处来源按使用顺序列出:
|
||||
- 知识库:【知识库:相关文档主题】
|
||||
- 联网搜索:【联网搜索:来源网站或摘要】
|
||||
2. 思维链:对于需要复杂推理的问题,请将推理过程放在 <think>...</think> 标签内,并置于回答最前面(来源标注之前)。
|
||||
3. 内容要求:回答应重点突出、条理清晰,优先结合用户背景信息进行个性化;若无任何可靠依据,如实说明“暂时无法回答”。
|
||||
|
||||
## 特别注意
|
||||
- 不要向用户暴露任何工具调用的技术细节(如参数、函数名)。
|
||||
- 如果用户只是闲聊、问候或道别,直接友好回复,严禁调用任何工具。
|
||||
- 所有联网搜索必须以获取帮助用户为目的,不得搜索无关内容。
|
||||
|
||||
现在,请遵循以上规则处理用户的每一次输入。记住:思考 → 行动 → 观察 → 直到完成。"""
|
||||
def _normalize_args(args: dict) -> str:
|
||||
"""标准化工具参数用于比较"""
|
||||
return str(sorted(args.items()))
|
||||
|
||||
|
||||
def create_agent_node(llm_with_tools, llm):
|
||||
"""创建 Agent 节点函数,完整 ReAct 循环"""
|
||||
def _is_similar_result(results: List[str], threshold: float = 0.8) -> bool:
|
||||
"""检测结果是否相似(简单实现:长度相似+部分内容重复)"""
|
||||
if len(results) < 2:
|
||||
return False
|
||||
|
||||
latest = results[-1]
|
||||
prev = results[-2]
|
||||
|
||||
# 长度差异太大,不算相似
|
||||
if len(latest) == 0 or len(prev) == 0:
|
||||
return len(latest) == len(prev)
|
||||
|
||||
len_ratio = min(len(latest), len(prev)) / max(len(latest), len(prev))
|
||||
if len_ratio < 0.5:
|
||||
return False
|
||||
|
||||
# 检查内容重复度(简单:前100字符)
|
||||
common_len = 0
|
||||
for a, b in zip(latest[:100], prev[:100]):
|
||||
if a == b:
|
||||
common_len += 1
|
||||
else:
|
||||
break
|
||||
|
||||
return (common_len / 100) > threshold
|
||||
|
||||
|
||||
def _should_stop_for_loop(tool_calls: List[dict], tool_results: List[str]) -> bool:
|
||||
"""
|
||||
检测是否应该停止(循环检测)
|
||||
|
||||
条件:连续2次调用相同工具 + 参数相似 + 结果相似
|
||||
"""
|
||||
if len(tool_calls) < 2:
|
||||
return False
|
||||
|
||||
# 检查最近的工具调用是否相同
|
||||
last_tc = tool_calls[-1]
|
||||
prev_tc = tool_calls[-2]
|
||||
|
||||
if last_tc["name"] != prev_tc["name"]:
|
||||
return False
|
||||
|
||||
# 参数是否相似
|
||||
last_args = _normalize_args(last_tc["args"])
|
||||
prev_args = _normalize_args(prev_tc["args"])
|
||||
|
||||
if last_args != prev_args:
|
||||
return False
|
||||
|
||||
# 结果是否相似
|
||||
if len(tool_results) >= 2:
|
||||
return _is_similar_result(tool_results[-2:])
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def create_agent_node(chat_services: dict):
|
||||
"""
|
||||
创建 Agent 节点 - 支持动态模型切换
|
||||
|
||||
简化设计:
|
||||
- 直接返回 async 函数,无需工厂包装
|
||||
- 从 config 中获取模型名称,运行时动态切换
|
||||
"""
|
||||
|
||||
async def agent_node(state: AgentState, config: Optional[RunnableConfig] = None) -> Dict[str, Any]:
|
||||
"""
|
||||
Agent 节点:完整的 ReAct 循环,带流式 token 和工具调用事件
|
||||
兼容流式和非流式两种情况!
|
||||
|
||||
Args:
|
||||
state: 当前状态
|
||||
config: 运行配置
|
||||
|
||||
Returns:
|
||||
状态更新字典
|
||||
"""
|
||||
# 获取队列
|
||||
"""Agent 节点:完整的 ReAct 循环"""
|
||||
queue = get_stream_queue()
|
||||
is_streaming = queue is not None
|
||||
|
||||
# 获取当前步数
|
||||
# 获取步数
|
||||
current_step = getattr(state, "current_step", 0)
|
||||
max_steps = getattr(state, "max_steps", 10)
|
||||
info(f"[Agent] 从第 {current_step} 步开始,最大步数: {max_steps},流式: {is_streaming}")
|
||||
|
||||
# 组装完整消息
|
||||
messages = [SystemMessage(content=SYSTEM_PROMPT)] + list(state.messages)
|
||||
turn = current_step # 轮次从当前步数开始
|
||||
# 动态获取模型
|
||||
model_name = "primary"
|
||||
if config:
|
||||
configurable = config.get("configurable", {})
|
||||
model_name = configurable.get("model", "primary")
|
||||
|
||||
llm = chat_services.get(model_name)
|
||||
if llm is None:
|
||||
llm = next(iter(chat_services.values()))
|
||||
info(f"[Agent] 模型 '{model_name}' 不可用,使用 '{type(llm).__name__}'")
|
||||
|
||||
llm_with_tools = llm.bind_tools(ALL_TOOLS)
|
||||
|
||||
# 获取记忆上下文
|
||||
memory_context = getattr(state, "memory_context", "暂无用户背景信息")
|
||||
|
||||
# 组装消息(注入记忆上下文到提示词)
|
||||
prompt_with_memory = SYSTEM_PROMPT.format(memory_context=memory_context)
|
||||
messages = [SystemMessage(content=prompt_with_memory)] + list(state.messages)
|
||||
turn = current_step
|
||||
|
||||
try:
|
||||
while turn < max_steps:
|
||||
turn += 1
|
||||
info(f"[Agent] 第 {turn} 轮思考")
|
||||
|
||||
# 告诉前端:新的一轮开始(如果流式)
|
||||
if is_streaming:
|
||||
await queue.put({
|
||||
"type": "node_start",
|
||||
"node": "agent",
|
||||
})
|
||||
await queue.put({"type": "node_start", "node": "agent"})
|
||||
|
||||
# 选择 LLM
|
||||
# 选择 LLM(最后一轮不带工具)
|
||||
if turn >= max_steps:
|
||||
info(f"[Agent] 达到步数上限,用不带工具的 LLM")
|
||||
current_llm = llm.bind_tools([])
|
||||
info(f"[Agent] 达到步数上限,使用无工具模型")
|
||||
else:
|
||||
current_llm = llm_with_tools
|
||||
|
||||
# 初始化变量
|
||||
# 初始化
|
||||
full_content = ""
|
||||
full_reasoning_content = ""
|
||||
pending_tool_calls = {} # key: index, value: {id, name, args_str}
|
||||
pending_tool_calls = {}
|
||||
final_tool_calls = []
|
||||
|
||||
# 只有流式的时候用 astream,非流式直接用 ainvoke 更快!
|
||||
# 循环检测:记录历史调用
|
||||
tool_call_history: List[dict] = []
|
||||
tool_result_history: List[str] = []
|
||||
|
||||
# 调用 LLM
|
||||
if is_streaming:
|
||||
async for chunk in current_llm.astream(messages):
|
||||
if isinstance(chunk, AIMessageChunk):
|
||||
# 1. 处理文本 token
|
||||
if chunk.content:
|
||||
full_content += chunk.content
|
||||
await queue.put({
|
||||
@@ -123,29 +154,23 @@ def create_agent_node(llm_with_tools, llm):
|
||||
"token": chunk.content,
|
||||
"reasoning_token": ""
|
||||
})
|
||||
|
||||
# 2. 处理 reasoning token
|
||||
|
||||
if hasattr(chunk, 'additional_kwargs') and chunk.additional_kwargs:
|
||||
reasoning_content = chunk.additional_kwargs.get("reasoning_content", "")
|
||||
if reasoning_content:
|
||||
full_reasoning_content += reasoning_content
|
||||
reasoning = chunk.additional_kwargs.get("reasoning_content", "")
|
||||
if reasoning:
|
||||
full_reasoning_content += reasoning
|
||||
await queue.put({
|
||||
"type": "llm_token",
|
||||
"node": "agent",
|
||||
"token": "",
|
||||
"reasoning_token": reasoning_content
|
||||
"reasoning_token": reasoning
|
||||
})
|
||||
|
||||
# 3. 流式 Tool Calling 拼接逻辑(核心!用 tool_call_chunks!)
|
||||
if hasattr(chunk, 'tool_call_chunks') and chunk.tool_call_chunks:
|
||||
for tc_chunk in chunk.tool_call_chunks:
|
||||
idx = tc_chunk.get("index", 0)
|
||||
if idx not in pending_tool_calls:
|
||||
pending_tool_calls[idx] = {
|
||||
"id": "",
|
||||
"name": "",
|
||||
"args": "" # 初始化为字符串
|
||||
}
|
||||
pending_tool_calls[idx] = {"id": "", "name": "", "args": ""}
|
||||
|
||||
if tc_chunk.get("id"):
|
||||
pending_tool_calls[idx]["id"] += tc_chunk["id"]
|
||||
@@ -159,57 +184,48 @@ def create_agent_node(llm_with_tools, llm):
|
||||
import json
|
||||
pending_tool_calls[idx]["args"] += json.dumps(args_val)
|
||||
else:
|
||||
# 非流式,直接 ainvoke
|
||||
result = await current_llm.ainvoke(messages)
|
||||
full_content = result.content if result.content else ""
|
||||
if hasattr(result, 'tool_calls') and result.tool_calls:
|
||||
final_tool_calls = result.tool_calls
|
||||
if hasattr(result, 'additional_kwargs') and result.additional_kwargs:
|
||||
if hasattr(result, 'additional_kwargs'):
|
||||
full_reasoning_content = result.additional_kwargs.get("reasoning_content", "")
|
||||
|
||||
# 流式调用结束后,整理最终的 tool_calls(只在流式时处理 pending!)
|
||||
# 整理工具调用
|
||||
if is_streaming:
|
||||
for idx in sorted(pending_tool_calls.keys()):
|
||||
tc_data = pending_tool_calls[idx]
|
||||
if tc_data["name"]: # 只有有名字的才是有效工具调用
|
||||
# 解析参数字符串
|
||||
if tc_data["name"]:
|
||||
args = {}
|
||||
if tc_data["args"]:
|
||||
try:
|
||||
import json
|
||||
args = json.loads(tc_data["args"])
|
||||
except Exception as e:
|
||||
info(f"[Agent] Failed to parse args JSON: {e}, raw: {tc_data['args']}")
|
||||
info(f"[Agent] 解析参数失败: {e}")
|
||||
final_tool_calls.append({
|
||||
"id": tc_data["id"],
|
||||
"name": tc_data["name"],
|
||||
"args": args
|
||||
})
|
||||
|
||||
# 判断是否有工具调用
|
||||
# 执行工具
|
||||
if final_tool_calls:
|
||||
info(f"[Agent] 第 {turn} 轮:调用 {len(final_tool_calls)} 个工具")
|
||||
|
||||
# 执行工具调用
|
||||
new_messages = []
|
||||
|
||||
for tc in final_tool_calls:
|
||||
tool_name = tc["name"]
|
||||
tool_args = tc["args"]
|
||||
tool_id = tc["id"]
|
||||
|
||||
# 发送工具开始事件(如果流式)
|
||||
if is_streaming:
|
||||
await queue.put({
|
||||
"type": "custom",
|
||||
"data": {
|
||||
"type": "tool_start",
|
||||
"tool": tool_name,
|
||||
"args": tool_args,
|
||||
"id": tool_id
|
||||
}
|
||||
"data": {"type": "tool_start", "tool": tool_name, "args": tool_args, "id": tool_id}
|
||||
})
|
||||
|
||||
# 找到并执行对应工具
|
||||
# 查找并执行工具
|
||||
tool_result = ""
|
||||
tool_found = False
|
||||
for tool in ALL_TOOLS:
|
||||
@@ -225,36 +241,32 @@ def create_agent_node(llm_with_tools, llm):
|
||||
if not tool_found:
|
||||
tool_result = f"未找到工具: {tool_name}"
|
||||
|
||||
# 发送工具结束事件(如果流式)
|
||||
if is_streaming:
|
||||
await queue.put({
|
||||
"type": "custom",
|
||||
"data": {
|
||||
"type": "tool_end",
|
||||
"tool": tool_name,
|
||||
"id": tool_id,
|
||||
"result": str(tool_result)
|
||||
}
|
||||
"data": {"type": "tool_end", "tool": tool_name, "id": tool_id, "result": str(tool_result)}
|
||||
})
|
||||
|
||||
# 构造 ToolMessage
|
||||
tool_msg = ToolMessage(
|
||||
content=str(tool_result),
|
||||
tool_call_id=tool_id,
|
||||
name=tool_name
|
||||
)
|
||||
new_messages.append(tool_msg)
|
||||
# 记录历史(用于循环检测)
|
||||
tool_call_history.append({"name": tool_name, "args": tool_args})
|
||||
tool_result_history.append(str(tool_result))
|
||||
|
||||
new_messages.append(ToolMessage(content=str(tool_result), tool_call_id=tool_id, name=tool_name))
|
||||
|
||||
# 循环检测:相同工具 + 相似参数 + 相似结果 → 终止
|
||||
if _should_stop_for_loop(tool_call_history, tool_result_history):
|
||||
info(f"[Agent] ⚠️ 检测到循环,强制终止")
|
||||
# 添加一条终止消息
|
||||
messages.append(AIMessage(content="[系统] 检测到工具调用循环,已终止。"))
|
||||
break
|
||||
|
||||
# 添加到 messages,继续下一轮
|
||||
messages.extend(new_messages)
|
||||
continue
|
||||
|
||||
else:
|
||||
# 没有工具调用,最终输出(不需要发 final_answer,因为 llm_token 已经发了)
|
||||
info(f"[Agent] 第 {turn} 轮:完成,无工具调用")
|
||||
break
|
||||
|
||||
# 构建完整的 AIMessage 用于状态更新
|
||||
# 构建响应
|
||||
response_kwargs = {"content": full_content}
|
||||
if final_tool_calls:
|
||||
response_kwargs["tool_calls"] = final_tool_calls
|
||||
@@ -262,7 +274,6 @@ def create_agent_node(llm_with_tools, llm):
|
||||
if full_reasoning_content:
|
||||
response.additional_kwargs["reasoning_content"] = full_reasoning_content
|
||||
|
||||
# 返回状态更新
|
||||
return {
|
||||
"messages": [response],
|
||||
"current_step": turn,
|
||||
@@ -273,12 +284,8 @@ def create_agent_node(llm_with_tools, llm):
|
||||
error(f"[Agent] ❌ 第 {turn} 轮出错: {e}")
|
||||
import traceback
|
||||
error(f"[Agent] 堆栈: {traceback.format_exc()}")
|
||||
# 发送错误事件(如果流式)
|
||||
if is_streaming:
|
||||
await queue.put({
|
||||
"type": "error",
|
||||
"message": str(e)
|
||||
})
|
||||
await queue.put({"type": "error", "message": str(e)})
|
||||
raise
|
||||
|
||||
return agent_node
|
||||
|
||||
Reference in New Issue
Block a user