79 lines
2.0 KiB
Python
79 lines
2.0 KiB
Python
"""
|
||
流式处理模块 - 处理 Agent 执行的流式输出
|
||
"""
|
||
|
||
import asyncio
|
||
from typing import AsyncGenerator, Dict, Any
|
||
|
||
from backend.app.logger import info, error
|
||
from .stream_context import set_stream_queue
|
||
|
||
|
||
async def run_graph_stream(
|
||
graph,
|
||
input_state: Dict[str, Any],
|
||
config: Dict[str, Any],
|
||
) -> AsyncGenerator[Dict[str, Any], None]:
|
||
"""
|
||
运行图并通过队列流式输出事件
|
||
|
||
Args:
|
||
graph: 编译后的 LangGraph
|
||
input_state: 输入状态
|
||
config: 配置
|
||
|
||
Yields:
|
||
流式事件
|
||
"""
|
||
queue: asyncio.Queue = asyncio.Queue()
|
||
set_stream_queue(queue)
|
||
|
||
async def run_graph():
|
||
"""后台任务:运行 graph"""
|
||
try:
|
||
info(f"📡 开始调用 graph.astream()...")
|
||
async for _ in graph.astream(
|
||
input_state,
|
||
config=config,
|
||
stream_mode=["updates"],
|
||
version="v2",
|
||
subgraphs=True
|
||
):
|
||
# 流式事件都从 agent.py 节点内部通过队列发送了
|
||
pass
|
||
except Exception as e:
|
||
error(f"❌ 执行图时出错: {e}")
|
||
import traceback
|
||
error(f"📋 堆栈: {traceback.format_exc()}")
|
||
await queue.put({"type": "error", "message": str(e)})
|
||
finally:
|
||
await queue.put(None) # 结束哨兵
|
||
|
||
# 启动后台任务
|
||
bg_task = asyncio.create_task(run_graph())
|
||
|
||
try:
|
||
while True:
|
||
event = await queue.get()
|
||
if event is None:
|
||
break
|
||
yield event
|
||
|
||
except GeneratorExit:
|
||
info("⚠️ GeneratorExit,取消后台任务")
|
||
bg_task.cancel()
|
||
raise
|
||
finally:
|
||
await _cleanup_task(bg_task)
|
||
|
||
|
||
async def _cleanup_task(bg_task: asyncio.Task) -> None:
|
||
"""清理后台任务"""
|
||
if not bg_task.done():
|
||
info("⏹️ 清理后台任务")
|
||
bg_task.cancel()
|
||
try:
|
||
await bg_task
|
||
except asyncio.CancelledError:
|
||
info("✅ 后台任务已取消")
|