""" 测试新的完整 ReAct 循环架构 + 流式 Tool Calling """ import asyncio import sys import os sys.path.insert(0, "/root/projects/ailine/backend") from app.main_graph.main_graph_builder import build_agent_graph from app.model_services import get_cached_chat_services from app.agent.stream_context import set_stream_queue from app.logger import info, error async def test_full_react_streaming(): """测试完整的 ReAct 循环流式架构""" info("=" * 60) info("🧪 测试完整 ReAct 循环 + 流式 Tool Calling") info("=" * 60) # 1. 获取服务 chat_services = get_cached_chat_services() info(f"✅ 加载了 {len(chat_services)} 个模型: {list(chat_services.keys())}") # 2. 构建图 graph_builder = build_agent_graph(chat_services, mem0_client=None) graph = graph_builder.compile() info(f"✅ 图构建完成") # 3. 创建队列 queue = asyncio.Queue() set_stream_queue(queue) # 4. 定义后台任务 async def run_graph(): try: input_state = { "messages": [ {"role": "user", "content": "你好,请介绍一下你自己"} ], "user_id": "test_user", } async for chunk in graph.astream( input_state, stream_mode=["updates"], version="v2" ): await queue.put({ "type": "graph_update", "data": chunk, }) except Exception as e: error(f"❌ 图执行出错: {e}") import traceback error(f"📋 堆栈: {traceback.format_exc()}") await queue.put({"type": "error", "message": str(e)}) finally: await queue.put(None) # 5. 启动后台任务并处理事件 bg_task = asyncio.create_task(run_graph()) info("\n📡 开始接收流式事件:\n") try: while True: event = await queue.get() if event is None: break if event["type"] == "llm_token": if event["token"]: print(event["token"], end="") if event["reasoning_token"]: print(f"{event['reasoning_token']}", end="") elif event["type"] == "turn_start": print(f"\n===== Turn {event['turn']} 开始 =====") elif event["type"] == "tool_start": print(f"\n🔧 工具调用: {event['tool']}") elif event["type"] == "tool_end": print(f"\n✅ 工具调用完成") elif event["type"] == "final_answer": print(f"\n📝 最终答案") elif event["type"] == "graph_update": # 忽略 update 事件,只关心 agent 节点发的事件 pass else: print(f"\n📋 其他事件: {event}") print("\n✅ 流式测试完成") return True except Exception as e: error(f"❌ 测试出错: {e}") import traceback error(f"📋 堆栈: {traceback.format_exc()}") return False finally: if not bg_task.done(): bg_task.cancel() if __name__ == "__main__": asyncio.run(test_full_react_streaming())