79 lines
2.0 KiB
Python
79 lines
2.0 KiB
Python
|
|
"""
|
|||
|
|
流式处理模块 - 处理 Agent 执行的流式输出
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import asyncio
|
|||
|
|
from typing import AsyncGenerator, Dict, Any
|
|||
|
|
|
|||
|
|
from backend.app.logger import info, error
|
|||
|
|
from .stream_context import set_stream_queue
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def run_graph_stream(
|
|||
|
|
graph,
|
|||
|
|
input_state: Dict[str, Any],
|
|||
|
|
config: Dict[str, Any],
|
|||
|
|
) -> AsyncGenerator[Dict[str, Any], None]:
|
|||
|
|
"""
|
|||
|
|
运行图并通过队列流式输出事件
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
graph: 编译后的 LangGraph
|
|||
|
|
input_state: 输入状态
|
|||
|
|
config: 配置
|
|||
|
|
|
|||
|
|
Yields:
|
|||
|
|
流式事件
|
|||
|
|
"""
|
|||
|
|
queue: asyncio.Queue = asyncio.Queue()
|
|||
|
|
set_stream_queue(queue)
|
|||
|
|
|
|||
|
|
async def run_graph():
|
|||
|
|
"""后台任务:运行 graph"""
|
|||
|
|
try:
|
|||
|
|
info(f"📡 开始调用 graph.astream()...")
|
|||
|
|
async for _ in graph.astream(
|
|||
|
|
input_state,
|
|||
|
|
config=config,
|
|||
|
|
stream_mode=["updates"],
|
|||
|
|
version="v2",
|
|||
|
|
subgraphs=True
|
|||
|
|
):
|
|||
|
|
# 流式事件都从 agent.py 节点内部通过队列发送了
|
|||
|
|
pass
|
|||
|
|
except Exception as e:
|
|||
|
|
error(f"❌ 执行图时出错: {e}")
|
|||
|
|
import traceback
|
|||
|
|
error(f"📋 堆栈: {traceback.format_exc()}")
|
|||
|
|
await queue.put({"type": "error", "message": str(e)})
|
|||
|
|
finally:
|
|||
|
|
await queue.put(None) # 结束哨兵
|
|||
|
|
|
|||
|
|
# 启动后台任务
|
|||
|
|
bg_task = asyncio.create_task(run_graph())
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
while True:
|
|||
|
|
event = await queue.get()
|
|||
|
|
if event is None:
|
|||
|
|
break
|
|||
|
|
yield event
|
|||
|
|
|
|||
|
|
except GeneratorExit:
|
|||
|
|
info("⚠️ GeneratorExit,取消后台任务")
|
|||
|
|
bg_task.cancel()
|
|||
|
|
raise
|
|||
|
|
finally:
|
|||
|
|
await _cleanup_task(bg_task)
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def _cleanup_task(bg_task: asyncio.Task) -> None:
|
|||
|
|
"""清理后台任务"""
|
|||
|
|
if not bg_task.done():
|
|||
|
|
info("⏹️ 清理后台任务")
|
|||
|
|
bg_task.cancel()
|
|||
|
|
try:
|
|||
|
|
await bg_task
|
|||
|
|
except asyncio.CancelledError:
|
|||
|
|
info("✅ 后台任务已取消")
|