Files
ailine/backend/app/main_graph/nodes/agent.py

285 lines
14 KiB
Python
Raw Normal View History

"""
Agent 节点完整的 ReAct 循环 + 流式 Tool Calling 拼接
完全参考指南实现
"""
from typing import Dict, Any, Optional, List
from langchain_core.messages import SystemMessage, AIMessage, AIMessageChunk, ToolMessage
from langchain_core.runnables.config import RunnableConfig
from backend.app.main_graph.state import AgentState
from backend.app.logger import info, warning, error
from backend.app.agent.stream_context import get_stream_queue
from backend.app.tools import ALL_TOOLS
# 系统提示词(从 main_graph_builder.py 搬过来)
SYSTEM_PROMPT = """你是一个智能助手,可以使用多种工具完成复杂任务。你必须用中文回复。
## 核心工具与能力
你可以使用以下工具函数但只能在真正需要时调用禁止无意义的测试调用或重复调用
1. rag_search 从内部知识库中检索文档输入为优化后的查询字符串
2. web_search 联网搜索获取最新信息输入为搜索关键词
3. contact_lookup 查询企业通讯录输入姓名部门或邮箱等
4. dictionary_lookup 翻译单词查询词典或提取术语
5. news_analysis 获取或分析新闻资讯
## 工作流程ReAct 决策闭环)
你必须严格按照思考 行动 观察的闭环来处理每个请求具体规则如下
### 1. 初始决策
- 如果用户的问题很明确且你已有足够内部知识可以直接回答无需调用任何工具
- 如果需要外部信息请按以下优先级选择工具
- 优先使用 rag_search
- 若第一次 rag_search 返回的结果不相关或质量低你可以改写查询关键词再次调用 rag_search最多重复一次
- 如果两次 rag_search 均无法获得满意信息或者用户明确要求实时资讯则必须切换为 web_search
- 遇到通讯录词典新闻类明确需求直接调用对应的专用工具
### 2. 观察与反思
- 每次工具调用返回结果后你必须先评估结果质量内容是否相关是否充分
- 如果信息不足根据上述规则决定下一步行动如果信息足够则直接生成最终答案绝不再调用任何工具
- 在整个过程中禁止使用工具返回的信息直接重复或编造来源必须如实标注
### 3. 结束条件
当你认为已经拥有足够信息回答用户时输出最终回复并停止调用工具若连续调用工具超过 5 轮仍未解决也必须基于当前收集到的信息给出最佳回答并说明局限性
## 回答规范
1. 来源标注回答开头用方括号注明信息来源如多处来源按使用顺序列出
- 知识库知识库相关文档主题
- 联网搜索联网搜索来源网站或摘要
2. 思维链对于需要复杂推理的问题请将推理过程放在 <think>...</think> 标签内并置于回答最前面来源标注之前
3. 内容要求回答应重点突出条理清晰优先结合用户背景信息进行个性化若无任何可靠依据如实说明暂时无法回答
## 特别注意
- 不要向用户暴露任何工具调用的技术细节如参数函数名
- 如果用户只是闲聊问候或道别直接友好回复严禁调用任何工具
- 所有联网搜索必须以获取帮助用户为目的不得搜索无关内容
现在请遵循以上规则处理用户的每一次输入记住思考 行动 观察 直到完成"""
def create_agent_node(llm_with_tools, llm):
"""创建 Agent 节点函数,完整 ReAct 循环"""
async def agent_node(state: AgentState, config: Optional[RunnableConfig] = None) -> Dict[str, Any]:
"""
Agent 节点完整的 ReAct 循环带流式 token 和工具调用事件
兼容流式和非流式两种情况
Args:
state: 当前状态
config: 运行配置
Returns:
状态更新字典
"""
# 获取队列
queue = get_stream_queue()
is_streaming = queue is not None
# 获取当前步数
current_step = getattr(state, "current_step", 0)
max_steps = getattr(state, "max_steps", 10)
info(f"[Agent] 从第 {current_step} 步开始,最大步数: {max_steps},流式: {is_streaming}")
# 组装完整消息
messages = [SystemMessage(content=SYSTEM_PROMPT)] + list(state.messages)
turn = current_step # 轮次从当前步数开始
try:
while turn < max_steps:
turn += 1
info(f"[Agent] 第 {turn} 轮思考")
# 告诉前端:新的一轮开始(如果流式)
if is_streaming:
await queue.put({
"type": "node_start",
"node": "agent",
})
# 选择 LLM
if turn >= max_steps:
info(f"[Agent] 达到步数上限,用不带工具的 LLM")
current_llm = llm.bind_tools([])
else:
current_llm = llm_with_tools
# 初始化变量
full_content = ""
full_reasoning_content = ""
pending_tool_calls = {} # key: index, value: {id, name, args_str}
final_tool_calls = []
# 只有流式的时候用 astream非流式直接用 ainvoke 更快!
if is_streaming:
async for chunk in current_llm.astream(messages):
if isinstance(chunk, AIMessageChunk):
# 1. 处理文本 token
if chunk.content:
full_content += chunk.content
await queue.put({
"type": "llm_token",
"node": "agent",
"token": chunk.content,
"reasoning_token": ""
})
# 2. 处理 reasoning token
if hasattr(chunk, 'additional_kwargs') and chunk.additional_kwargs:
reasoning_content = chunk.additional_kwargs.get("reasoning_content", "")
if reasoning_content:
full_reasoning_content += reasoning_content
await queue.put({
"type": "llm_token",
"node": "agent",
"token": "",
"reasoning_token": reasoning_content
})
# 3. 流式 Tool Calling 拼接逻辑(核心!用 tool_call_chunks
if hasattr(chunk, 'tool_call_chunks') and chunk.tool_call_chunks:
for tc_chunk in chunk.tool_call_chunks:
idx = tc_chunk.get("index", 0)
if idx not in pending_tool_calls:
pending_tool_calls[idx] = {
"id": "",
"name": "",
"args": "" # 初始化为字符串
}
if tc_chunk.get("id"):
pending_tool_calls[idx]["id"] += tc_chunk["id"]
if tc_chunk.get("name"):
pending_tool_calls[idx]["name"] += tc_chunk["name"]
if tc_chunk.get("args"):
args_val = tc_chunk["args"]
if isinstance(args_val, str):
pending_tool_calls[idx]["args"] += args_val
else:
import json
pending_tool_calls[idx]["args"] += json.dumps(args_val)
else:
# 非流式,直接 ainvoke
result = await current_llm.ainvoke(messages)
full_content = result.content if result.content else ""
if hasattr(result, 'tool_calls') and result.tool_calls:
final_tool_calls = result.tool_calls
if hasattr(result, 'additional_kwargs') and result.additional_kwargs:
full_reasoning_content = result.additional_kwargs.get("reasoning_content", "")
# 流式调用结束后,整理最终的 tool_calls只在流式时处理 pending
if is_streaming:
for idx in sorted(pending_tool_calls.keys()):
tc_data = pending_tool_calls[idx]
if tc_data["name"]: # 只有有名字的才是有效工具调用
# 解析参数字符串
args = {}
if tc_data["args"]:
try:
import json
args = json.loads(tc_data["args"])
except Exception as e:
info(f"[Agent] Failed to parse args JSON: {e}, raw: {tc_data['args']}")
final_tool_calls.append({
"id": tc_data["id"],
"name": tc_data["name"],
"args": args
})
# 判断是否有工具调用
if final_tool_calls:
info(f"[Agent] 第 {turn} 轮:调用 {len(final_tool_calls)} 个工具")
# 执行工具调用
new_messages = []
for tc in final_tool_calls:
tool_name = tc["name"]
tool_args = tc["args"]
tool_id = tc["id"]
# 发送工具开始事件(如果流式)
if is_streaming:
await queue.put({
"type": "custom",
"data": {
"type": "tool_start",
"tool": tool_name,
"args": tool_args,
"id": tool_id
}
})
# 找到并执行对应工具
tool_result = ""
tool_found = False
for tool in ALL_TOOLS:
if tool.name == tool_name:
tool_found = True
try:
tool_result = await tool.ainvoke(tool_args)
except Exception as e:
tool_result = f"工具调用出错: {str(e)}"
error(f"[Agent] 工具 {tool_name} 调用出错: {e}")
break
if not tool_found:
tool_result = f"未找到工具: {tool_name}"
# 发送工具结束事件(如果流式)
if is_streaming:
await queue.put({
"type": "custom",
"data": {
"type": "tool_end",
"tool": tool_name,
"id": tool_id,
"result": str(tool_result)
}
})
# 构造 ToolMessage
tool_msg = ToolMessage(
content=str(tool_result),
tool_call_id=tool_id,
name=tool_name
)
new_messages.append(tool_msg)
# 添加到 messages继续下一轮
messages.extend(new_messages)
continue
else:
# 没有工具调用,最终输出(不需要发 final_answer因为 llm_token 已经发了)
info(f"[Agent] 第 {turn} 轮:完成,无工具调用")
break
# 构建完整的 AIMessage 用于状态更新
response_kwargs = {"content": full_content}
if final_tool_calls:
response_kwargs["tool_calls"] = final_tool_calls
response = AIMessage(**response_kwargs)
if full_reasoning_content:
response.additional_kwargs["reasoning_content"] = full_reasoning_content
# 返回状态更新
return {
"messages": [response],
"current_step": turn,
"llm_calls": getattr(state, "llm_calls", 0) + 1
}
except Exception as e:
error(f"[Agent] ❌ 第 {turn} 轮出错: {e}")
import traceback
error(f"[Agent] 堆栈: {traceback.format_exc()}")
# 发送错误事件(如果流式)
if is_streaming:
await queue.put({
"type": "error",
"message": str(e)
})
raise
return agent_node