From 2893accbc457e7601600cf18767dc09db850619b Mon Sep 17 00:00:00 2001 From: root <953994191@qq.com> Date: Sat, 2 May 2026 09:00:34 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=B8=89=E4=B8=AA=E9=97=AE?= =?UTF-8?q?=E9=A2=98=EF=BC=9A1.=20=E5=AD=90=E5=9B=BE=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E5=90=8E=E7=9A=84=E6=97=A0=E9=99=90=E5=BE=AA=E7=8E=AF=202.=20l?= =?UTF-8?q?lm=5Fcall=E6=B2=A1=E6=9C=89=E8=BE=93=E5=87=BA=203.=20=E6=80=9D?= =?UTF-8?q?=E8=80=83=E6=89=93=E5=8D=B0=E4=B8=A4=E6=AC=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 子图执行后直接进入finalize,避免回到react_reason循环 - llm_call节点检查是否已有final_result,避免重复调用LLM - 直接在react_reason_node中通过adispatch_custom_event发送推理事件,避免通过state传递导致重复 --- backend/app/agent/agent_service.py | 65 ++++++--------- backend/app/core/intent.py | 23 ++++- backend/app/main_graph/nodes/llm_call.py | 17 +++- backend/app/main_graph/nodes/react_nodes.py | 83 +++++++++++-------- .../main_graph/utils/main_graph_builder.py | 37 +++++++-- 5 files changed, 139 insertions(+), 86 deletions(-) diff --git a/backend/app/agent/agent_service.py b/backend/app/agent/agent_service.py index 3ecf125..f0ffae4 100644 --- a/backend/app/agent/agent_service.py +++ b/backend/app/agent/agent_service.py @@ -146,7 +146,7 @@ class AIAgentService: "current_action": CurrentAction.NONE } - # ========== 意图识别(保留用于日志) ========== + # ========== 意图识别(保留用于日志)========== intent_result = await self.intent_classifier.classify(message) info(f"🧠 意图识别: {intent_result.intent_type} (置信度: {intent_result.confidence:.2f})") info(f"📝 推理: {intent_result.reasoning}") @@ -265,34 +265,9 @@ class AIAgentService: updates_data = chunk["data"] serialized_data = self._serialize_value(updates_data) - # 检查是否有最新的推理内容(来自 react_reason_node) - if isinstance(serialized_data, dict): - # 遍历所有节点的数据 - for node_name, node_data in serialized_data.items(): - if isinstance(node_data, dict) and "debug_info" in node_data: - debug_info = node_data["debug_info"] - latest_reasoning = debug_info.get("latest_reasoning") - if latest_reasoning and not latest_reasoning.get("sent"): - # 发送推理过程到前端 - step = latest_reasoning.get("step", 1) - action = latest_reasoning.get("action", "unknown") - confidence = latest_reasoning.get("confidence", 0) - reasoning = latest_reasoning.get("reasoning", "") - - info(f"[Agent Service] 发送推理过程 #{step}: {action}") - - # 发送推理事件 - yield { - "type": "react_reasoning", - "step": step, - "action": action, - "confidence": confidence, - "reasoning": reasoning - } - - # 标记为已发送(避免重复发送) - latest_reasoning["sent"] = True - + # 关键修复:不再从 updates 中读取 latest_reasoning,避免重复 + # 因为我们现在直接通过 custom 事件发送推理结果了 + # 检查是否有人工审核请求 if "review_pending" in serialized_data and serialized_data["review_pending"]: review_id = serialized_data.get("review_id", "") @@ -302,7 +277,7 @@ class AIAgentService: "review_id": review_id, "content": content_to_review } - + # 检查是否有工具结果 if "messages" in serialized_data: for msg in serialized_data["messages"]: @@ -311,7 +286,7 @@ class AIAgentService: tool_call_id = msg.get("tool_call_id", "") tool_name = msg.get("name", "") tool_output = msg.get("content", "") - + if tool_call_id in tool_calls_in_progress: yield { "type": "tool_call_end", @@ -320,19 +295,33 @@ class AIAgentService: "result": tool_output } del tool_calls_in_progress[tool_call_id] - + processed_event = { "type": "state_update", "data": serialized_data } elif chunk_type == "custom": - info(f"🎯 处理custom chunk") - serialized_data = self._serialize_value(chunk["data"]) - processed_event = { - "type": "custom", - "data": serialized_data - } + info(f"🎯 处理custom chunk: {chunk['data']}") + custom_data = chunk["data"] + + # 关键修复:处理我们从 react_reason_node 发送的自定义推理事件 + if isinstance(custom_data, dict) and custom_data.get("type") == "react_reasoning": + info(f"[Agent Service] 收到自定义推理事件") + yield { + "type": "react_reasoning", + "step": custom_data.get("step", 1), + "action": custom_data.get("action", "unknown"), + "confidence": custom_data.get("confidence", 0), + "reasoning": custom_data.get("reasoning", "") + } + else: + # 处理其他自定义事件 + serialized_data = self._serialize_value(custom_data) + processed_event = { + "type": "custom", + "data": serialized_data + } if processed_event: yield processed_event diff --git a/backend/app/core/intent.py b/backend/app/core/intent.py index 78b108a..f70eb0e 100644 --- a/backend/app/core/intent.py +++ b/backend/app/core/intent.py @@ -107,8 +107,27 @@ class ReactIntentReasoner: """ context = context or {} result = ReasoningResult(original_query=query) + + # 关键修复 1:检查是否已经有检索结果或子图结果,如果是,直接回答 + previous_actions = context.get("previous_actions", []) + if "subgraph_completed" in previous_actions: + result.action = ReasoningAction.DIRECT_RESPONSE + result.confidence = 1.0 + result.reasoning = "子图已执行完成,直接回答" + return result + + retrieved_docs = context.get("retrieved_docs", []) + messages = context.get("messages", []) + + # 关键修复 2:如果已经有 rag_context 或 web_search_results(通过 messages 推断),直接回答 + # 检查是否已经执行过 rag_retrieve 或 web_search + if "rag_retrieve" in previous_actions or "web_search" in previous_actions: + result.action = ReasoningAction.DIRECT_RESPONSE + result.confidence = 0.95 + result.reasoning = "已获取信息,直接回答" + return result - # 策略1: 尝试使用 LLM 推理 + # 策略1:尝试使用 LLM 推理 try: llm_result = await self._reason_with_llm(query, context) if llm_result.confidence >= 0.6: # 置信度足够高,直接返回 @@ -116,7 +135,7 @@ class ReactIntentReasoner: except Exception as e: print(f"[ReactReasoner] LLM 推理失败: {e}, 回退到规则") - # 策略2: LLM 失败或置信度低,使用规则匹配 + # 策略2:LLM 失败或置信度低,使用规则匹配 return self._reason_with_rules(query, context) async def _reason_with_llm( diff --git a/backend/app/main_graph/nodes/llm_call.py b/backend/app/main_graph/nodes/llm_call.py index f84aa95..8651f71 100644 --- a/backend/app/main_graph/nodes/llm_call.py +++ b/backend/app/main_graph/nodes/llm_call.py @@ -37,11 +37,11 @@ def create_llm_call_node(llm, tools: list): async def call_llm(state: MainGraphState, config: RunnableConfig) -> Dict[str, Any]: """ LLM 调用节点(异步方法) - + Args: state: 当前对话状态 config: LangChain/LangGraph 自动注入的配置,包含 callbacks 等信息 - + Returns: 更新后的状态字典 """ @@ -49,6 +49,19 @@ def create_llm_call_node(llm, tools: list): memory_context = getattr(state, "memory_context", "暂无用户信息") start_time = time.time() + + # 关键修复:如果 state.final_result 已经存在(比如子图执行完),直接返回 + if state.final_result: + info(f"[llm_call] 检测到已有最终结果,直接返回: {state.final_result[:100]}...") + elapsed_time = time.time() - start_time + return { + "final_result": state.final_result, + "success": True, + "current_phase": "done", + "llm_calls": getattr(state, 'llm_calls', 0) + 1, + "last_elapsed_time": elapsed_time, + "turns_since_last_summary": getattr(state, 'turns_since_last_summary', 0) + 1, + } try: # 添加 RAG 上下文到消息 diff --git a/backend/app/main_graph/nodes/react_nodes.py b/backend/app/main_graph/nodes/react_nodes.py index ef8eaca..695c393 100644 --- a/backend/app/main_graph/nodes/react_nodes.py +++ b/backend/app/main_graph/nodes/react_nodes.py @@ -16,6 +16,7 @@ from datetime import datetime # 导入我们的 intent.py from app.core.intent import ( react_reason, + react_reason_async, get_route_by_reasoning, ReasoningAction, ReasoningResult @@ -31,31 +32,17 @@ from app.logger import info # ========== 1. React 推理节点 ========== -def react_reason_node(state: MainGraphState, config: Optional[Dict[str, Any]] = None) -> MainGraphState: +async def react_reason_node(state: MainGraphState, config: Optional[Dict[str, Any]] = None) -> MainGraphState: """ - React 模式推理节点:判断下一步做什么 + React 模式推理节点:判断下一步做什么(异步版本) Returns: 更新后的状态 """ state.current_phase = "react_reasoning" state.reasoning_step += 1 - # 发送推理开始事件 - if config: - try: - from langchain_core.runnables.config import RunnableConfig - # 尝试获取回调管理器并发送自定义事件 - callbacks = config.get("callbacks") - if callbacks: - from langchain_core.callbacks.manager import adispatch_custom_event - # 注意:这是异步操作,我们需要特殊处理 - # 这里我们使用自定义流式写入器(如果存在) - pass - except Exception as e: - info(f"[react_reason] 无法发送回调事件: {e}") - info(f"[react_reason] 第 {state.reasoning_step} 次推理开始") - + # 检查是否超过最大步数 if state.reasoning_step > state.max_steps: state.current_phase = "max_steps_exceeded" @@ -66,7 +53,7 @@ def react_reason_node(state: MainGraphState, config: Optional[Dict[str, Any]] = ) state.success = False return state - + # 准备上下文 context = { "retrieved_docs": state.rag_docs, @@ -74,15 +61,35 @@ def react_reason_node(state: MainGraphState, config: Optional[Dict[str, Any]] = "messages": state.messages, "errors": state.errors } - - # 使用 intent.py 进行推理 - # 注意:这里使用同步版本,内部会根据情况处理 - result: ReasoningResult = react_reason(state.user_query, context) + + # 使用 intent.py 进行推理(现在直接用异步版本) + result: ReasoningResult = await react_reason_async(state.user_query, context) info(f"[react_reason] 推理结果: action={result.action.name}, confidence={result.confidence}") if result.reasoning: info(f"[react_reason] 推理过程: {result.reasoning}") - + + # 关键修复:直接发送自定义事件给 agent_service,而不是通过 state + if config: + try: + from langchain_core.callbacks.manager import adispatch_custom_event + + callbacks = config.get("callbacks") + if callbacks: + info(f"[react_reason] 直接发送推理事件 #{state.reasoning_step}") + await adispatch_custom_event( + "react_reasoning", + { + "step": state.reasoning_step, + "action": result.action.name, + "confidence": result.confidence, + "reasoning": result.reasoning + }, + callbacks=callbacks + ) + except Exception as e: + info(f"[react_reason] 无法发送自定义事件: {e}") + # 记录推理历史 state.reasoning_history.append({ "step": state.reasoning_step, @@ -91,30 +98,24 @@ def react_reason_node(state: MainGraphState, config: Optional[Dict[str, Any]] = "reasoning": result.reasoning, "timestamp": datetime.now().isoformat() }) - + # 更新状态 state.debug_info["last_reasoning"] = { "action": result.action.name, "confidence": result.confidence, "reasoning": result.reasoning } - + # 保存推理结果到状态 state.debug_info["reasoning_result"] = result - + # 确定下一步动作 state.last_action = result.action.name - # 发送推理完成事件(通过状态更新,agent_service 会处理) - # 我们在状态中保存推理内容,以便 agent_service 可以通过 state_update 事件发送 - state.debug_info["latest_reasoning"] = { - "step": state.reasoning_step, - "action": result.action.name, - "confidence": result.confidence, - "reasoning": result.reasoning, - "sent": False # 标记是否已发送到前端 - } - + # 关键修复:不再设置 latest_reasoning,避免 agent_service 重复读取 + if "latest_reasoning" in state.debug_info: + del state.debug_info["latest_reasoning"] + return state @@ -300,6 +301,16 @@ def route_by_reasoning(state: MainGraphState) -> str: if state.retry_action and "rag" in state.retry_action.lower(): return "rag_retrieve" return "react_reason" + + # 关键修复:检查是否已经执行过子图,如果是,直接去 llm_call + previous_actions = [h.get("action") for h in state.reasoning_history] + if "subgraph_completed" in previous_actions or state.final_result: + return "llm_call" + + # 检查是否刚刚执行完 rag 或 web search,应该继续推理一次然后去 llm_call + # 但为了避免死循环,我们设置一个简单的规则 + if len(previous_actions) > 3: + return "llm_call" # 获取推理结果 reasoning_result: Optional[ReasoningResult] = state.debug_info.get("reasoning_result") diff --git a/backend/app/main_graph/utils/main_graph_builder.py b/backend/app/main_graph/utils/main_graph_builder.py index 45c1222..7c64f94 100644 --- a/backend/app/main_graph/utils/main_graph_builder.py +++ b/backend/app/main_graph/utils/main_graph_builder.py @@ -61,15 +61,34 @@ def wrap_subgraph_for_error_handling(subgraph, name: str): result = subgraph.invoke(state) # 更新主图状态 + subgraph_result = None if name == "contact": state.contact_result = result + subgraph_result = result.get("final_result", "") elif name == "dictionary": state.dictionary_result = result + subgraph_result = result.get("final_result", "") elif name == "news_analysis": state.news_result = result + subgraph_result = result.get("final_result", "") + + # 关键:设置最终结果,这样就不需要再回到 react_reason 了 + if subgraph_result: + state.final_result = subgraph_result + else: + state.final_result = "子图执行完成" # 标记成功 state.success = True + state.current_phase = "done" + # 标记不再需要推理,避免循环 + state.reasoning_history.append({ + "step": state.reasoning_step, + "action": "subgraph_completed", + "confidence": 1.0, + "reasoning": f"{name}子图执行完成", + "timestamp": datetime.now().isoformat() + }) return state except Exception as e: @@ -189,18 +208,18 @@ def build_react_main_graph(llm=None, tools=None, mem0_client=None) -> StateGraph graph.add_node("finalize", finalize_node) # ========== 添加边 ========== - + # 第一阶段:记忆检索 if retrieve_memory_node: graph.add_edge(START, "retrieve_memory") graph.add_edge("retrieve_memory", "memory_trigger") else: graph.add_edge(START, "memory_trigger") - + # 进入第二阶段 graph.add_edge("memory_trigger", "init_state") graph.add_edge("init_state", "react_reason") - + # 第二阶段:React 循环推理 graph.add_conditional_edges( "react_reason", @@ -215,15 +234,17 @@ def build_react_main_graph(llm=None, tools=None, mem0_client=None) -> StateGraph "llm_call": "llm_call" } ) - - # 循环边 + + # 循环边(rag、web_search、error 回到 reason) graph.add_edge("rag_retrieve", "react_reason") graph.add_edge("web_search", "react_reason") - graph.add_edge("contact_subgraph", "react_reason") - graph.add_edge("dictionary_subgraph", "react_reason") - graph.add_edge("news_analysis_subgraph", "react_reason") graph.add_edge("handle_error", "react_reason") + # 关键修改:子图执行完后直接去 finalize(避免循环) + graph.add_edge("contact_subgraph", "finalize") + graph.add_edge("dictionary_subgraph", "finalize") + graph.add_edge("news_analysis_subgraph", "finalize") + # 第三阶段:llm_call 后进入完成处理 if llm_node is not None: if summarize_node: