""" 流式处理模块 - 处理 Agent 执行的流式输出 """ import asyncio from typing import AsyncGenerator, Dict, Any from backend.app.logger import info, error from .stream_context import set_stream_queue async def run_graph_stream( graph, input_state: Dict[str, Any], config: Dict[str, Any], ) -> AsyncGenerator[Dict[str, Any], None]: """ 运行图并通过队列流式输出事件 Args: graph: 编译后的 LangGraph input_state: 输入状态 config: 配置 Yields: 流式事件 """ queue: asyncio.Queue = asyncio.Queue() set_stream_queue(queue) async def run_graph(): """后台任务:运行 graph""" try: info(f"📡 开始调用 graph.astream()...") async for _ in graph.astream( input_state, config=config, stream_mode=["updates"], version="v2", subgraphs=True ): # 流式事件都从 agent.py 节点内部通过队列发送了 pass except Exception as e: error(f"❌ 执行图时出错: {e}") import traceback error(f"📋 堆栈: {traceback.format_exc()}") await queue.put({"type": "error", "message": str(e)}) finally: await queue.put(None) # 结束哨兵 # 启动后台任务 bg_task = asyncio.create_task(run_graph()) try: while True: event = await queue.get() if event is None: break yield event except GeneratorExit: info("⚠️ GeneratorExit,取消后台任务") bg_task.cancel() raise finally: await _cleanup_task(bg_task) async def _cleanup_task(bg_task: asyncio.Task) -> None: """清理后台任务""" if not bg_task.done(): info("⏹️ 清理后台任务") bg_task.cancel() try: await bg_task except asyncio.CancelledError: info("✅ 后台任务已取消")