diff --git a/backend/app/agent/agent_service.py b/backend/app/agent/agent_service.py index 142acdf..4e15210 100644 --- a/backend/app/agent/agent_service.py +++ b/backend/app/agent/agent_service.py @@ -151,123 +151,140 @@ class AIAgentService: # ======================================== # ========== React 循环路径 ========== + info(f"🚀 开始执行 React 图,模型: {model_name}") current_node = None tool_calls_in_progress = {} - async for chunk in graph.astream( - input_state, - config=config, - stream_mode=["messages", "updates", "custom"], - version="v2", - subgraphs=True - ): - chunk_type = chunk["type"] - processed_event = {} + try: + info(f"📡 开始调用 graph.astream()...") + chunk_count = 0 + async for chunk in graph.astream( + input_state, + config=config, + stream_mode=["messages", "updates", "custom"], + version="v2", + subgraphs=True + ): + chunk_count += 1 + chunk_type = chunk["type"] + info(f"📦 收到第 {chunk_count} 个chunk, type: {chunk_type}") + processed_event = {} - if chunk_type == "messages": - message_chunk, metadata = chunk["data"] - node_name = metadata.get("langgraph_node", "unknown") + if chunk_type == "messages": + message_chunk, metadata = chunk["data"] + node_name = metadata.get("langgraph_node", "unknown") + info(f"📨 处理消息chunk, node: {node_name}") - # 检测节点变化,发送节点开始事件 - if node_name != current_node: - if current_node: - yield { - "type": "node_end", - "node": current_node - } - yield { - "type": "node_start", - "node": node_name - } - current_node = node_name - - # 处理消息内容 - token_content = getattr(message_chunk, 'content', str(message_chunk)) - reasoning_token = "" - if hasattr(message_chunk, 'additional_kwargs'): - reasoning_token = message_chunk.additional_kwargs.get("reasoning_content", "") - - # 处理思考过程 - if reasoning_token: - processed_event = { - "type": "llm_token", - "node": node_name, - "reasoning_token": reasoning_token - } - # 处理工具调用 - elif hasattr(message_chunk, 'tool_calls') and message_chunk.tool_calls: - for tool_call in message_chunk.tool_calls: - tool_call_id = tool_call.get("id", "") - tool_name = tool_call.get("name", "") - tool_args = tool_call.get("args", {}) - - # 记录工具调用开始 - if tool_call_id not in tool_calls_in_progress: - tool_calls_in_progress[tool_call_id] = { - "name": tool_name, - "args": tool_args - } + # 检测节点变化,发送节点开始事件 + if node_name != current_node: + if current_node: yield { - "type": "tool_call_start", - "tool": tool_name, - "args": tool_args, - "id": tool_call_id + "type": "node_end", + "node": current_node } - # 处理普通 token - elif token_content: - processed_event = { - "type": "llm_token", - "node": node_name, - "token": token_content, - "reasoning_token": reasoning_token - } + yield { + "type": "node_start", + "node": node_name + } + current_node = node_name - elif chunk_type == "updates": - updates_data = chunk["data"] - serialized_data = self._serialize_value(updates_data) + # 处理消息内容 + token_content = getattr(message_chunk, 'content', str(message_chunk)) + reasoning_token = "" + if hasattr(message_chunk, 'additional_kwargs'): + reasoning_token = message_chunk.additional_kwargs.get("reasoning_content", "") - # 检查是否有人工审核请求 - if "review_pending" in serialized_data and serialized_data["review_pending"]: - review_id = serialized_data.get("review_id", "") - content_to_review = serialized_data.get("content_to_review", "") - yield { - "type": "human_review_request", - "review_id": review_id, - "content": content_to_review - } + # 处理思考过程 + if reasoning_token: + processed_event = { + "type": "llm_token", + "node": node_name, + "reasoning_token": reasoning_token + } + # 处理工具调用 + elif hasattr(message_chunk, 'tool_calls') and message_chunk.tool_calls: + for tool_call in message_chunk.tool_calls: + tool_call_id = tool_call.get("id", "") + tool_name = tool_call.get("name", "") + tool_args = tool_call.get("args", {}) - # 检查是否有工具结果 - if "messages" in serialized_data: - for msg in serialized_data["messages"]: - # 检测工具结果消息 - if msg.get("role") == "tool": - tool_call_id = msg.get("tool_call_id", "") - tool_name = msg.get("name", "") - tool_output = msg.get("content", "") - - if tool_call_id in tool_calls_in_progress: - yield { - "type": "tool_call_end", - "tool": tool_name, - "id": tool_call_id, - "result": tool_output + # 记录工具调用开始 + if tool_call_id not in tool_calls_in_progress: + tool_calls_in_progress[tool_call_id] = { + "name": tool_name, + "args": tool_args } - del tool_calls_in_progress[tool_call_id] + yield { + "type": "tool_call_start", + "tool": tool_name, + "args": tool_args, + "id": tool_call_id + } + # 处理普通 token + elif token_content: + processed_event = { + "type": "llm_token", + "node": node_name, + "token": token_content, + "reasoning_token": reasoning_token + } - processed_event = { - "type": "state_update", - "data": serialized_data - } + elif chunk_type == "updates": + info(f"🔄 处理updates chunk") + updates_data = chunk["data"] + serialized_data = self._serialize_value(updates_data) - elif chunk_type == "custom": - serialized_data = self._serialize_value(chunk["data"]) - processed_event = { - "type": "custom", - "data": serialized_data - } + # 检查是否有人工审核请求 + if "review_pending" in serialized_data and serialized_data["review_pending"]: + review_id = serialized_data.get("review_id", "") + content_to_review = serialized_data.get("content_to_review", "") + yield { + "type": "human_review_request", + "review_id": review_id, + "content": content_to_review + } - if processed_event: - yield processed_event + # 检查是否有工具结果 + if "messages" in serialized_data: + for msg in serialized_data["messages"]: + # 检测工具结果消息 + if msg.get("role") == "tool": + tool_call_id = msg.get("tool_call_id", "") + tool_name = msg.get("name", "") + tool_output = msg.get("content", "") + + if tool_call_id in tool_calls_in_progress: + yield { + "type": "tool_call_end", + "tool": tool_name, + "id": tool_call_id, + "result": tool_output + } + del tool_calls_in_progress[tool_call_id] + + processed_event = { + "type": "state_update", + "data": serialized_data + } + + elif chunk_type == "custom": + info(f"🎯 处理custom chunk") + serialized_data = self._serialize_value(chunk["data"]) + processed_event = { + "type": "custom", + "data": serialized_data + } + + if processed_event: + yield processed_event + + info(f"✅ graph.astream() 完成,共 {chunk_count} 个chunks") + + except Exception as e: + error(f"❌ 执行 React 图时出错: {e}") + import traceback + error(f"📋 堆栈: {traceback.format_exc()}") + raise # 发送结束事件 if current_node: