Files
ailine/tools/test/test_stream.py
root 5b41598d50
All checks were successful
构建并部署 AI Agent 服务 / deploy (push) Successful in 5m41s
重构:简化流式架构,将 ReAct 循环移入 agent 节点
主要变更:
- 简化 agent_service:移除复杂双协程,只用 stream_mode=["updates"]
- stream_context:提供更清晰的 API (set_stream_queue/get_stream_queue)
- main_graph_builder:简化图结构,移除 tools 节点和条件边
- agent 节点:包含完整 ReAct 循环 + 流式 Tool Calling 拼接
- 前端:适配新的事件格式
- 添加测试文件:test_full_react_streaming.py, test_stream.py
2026-05-07 02:56:35 +08:00

80 lines
3.1 KiB
Python

#!/usr/bin/env python3
"""测试后端流式接口,看看是否真的有流式输出"""
import asyncio
import aiohttp
import json
BACKEND_URL = "http://localhost:8079/chat/stream"
async def test_stream():
print("=" * 60)
print("🧪 测试后端流式接口")
print("=" * 60)
async with aiohttp.ClientSession() as session:
payload = {
"message": "你好,请简单介绍一下自己",
"thread_id": "test-thread-001",
"model": "zhipu",
"user_id": "test-user"
}
print(f"\n📤 发送请求: {json.dumps(payload, ensure_ascii=False)}")
try:
async with session.post(BACKEND_URL, json=payload) as response:
print(f"\n✅ 响应状态: {response.status}")
print(f"\n📥 开始接收流式响应...\n")
event_count = 0
token_count = 0
async for line in response.content:
line = line.decode('utf-8').strip()
if line:
if line.startswith("data: "):
data_str = line[6:]
if data_str == "[DONE]":
print("\n🏁 收到 [DONE] 事件")
break
try:
event = json.loads(data_str)
event_count += 1
print(f" [{event_count}] {event.get('type')}")
if event.get('type') == 'llm_token' and 'token' in event:
token = event['token']
token_count += 1
print(f" → token: {repr(token)}")
if event.get('type') == 'node_start':
print(f" → node: {event.get('node')}")
if event.get('type') == 'tool_call_start':
print(f" → tool: {event.get('tool')}")
if event.get('type') == 'tool_call_end':
print(f" → tool: {event.get('tool')}")
if event.get('type') == 'error':
print(f" ❌ 错误: {event.get('message')}")
except Exception as e:
print(f" ❌ 解析失败: {e}, 原始数据: {repr(data_str)}")
else:
print(f" 📝 原始行: {repr(line)}")
print(f"\n📊 统计: {event_count} 个事件, {token_count} 个 token")
except Exception as e:
print(f"\n❌ 请求异常: {e}")
import traceback
print(f"📋 堆栈: {traceback.format_exc()}")
if __name__ == "__main__":
asyncio.run(test_stream())