#!/usr/bin/env python3 """ 完整后端测试 - 验证 Agent 所有功能 包括:短期记忆、长期记忆、工具调用、流式对话、历史查询 """ import asyncio import os import sys import uuid from dotenv import load_dotenv # 添加项目根目录和 backend 目录到 Python 路径 project_root = os.path.join(os.path.dirname(__file__), "..") backend_dir = os.path.join(project_root, "backend") sys.path.insert(0, project_root) sys.path.insert(0, backend_dir) load_dotenv() from backend.app.config import DB_URI from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver from backend.app.agent.service import AIAgentService from backend.app.agent.history import ThreadHistoryService from backend.app.logger import info, warning, error # PostgreSQL 连接字符串 async def print_section(title): """打印测试区块标题""" print("\n" + "=" * 70) print(f" {title}") print("=" * 70) async def test_short_term_memory(agent_service): """测试短期记忆(同一 thread_id 继续对话)""" await print_section("测试 1: 短期记忆(Short-term Memory)") thread_id = str(uuid.uuid4()) user_id = "test_user_memory" print(f"\n使用 thread_id: {thread_id[:8]}...") print(f"使用 user_id: {user_id}") # 第一轮对话 print("\n[第一轮] 发送消息: '我叫张三,今年28岁'") result1 = await agent_service.process_message( "我叫张三,今年28岁", thread_id, "local", user_id ) print(f"回复: {result1['reply'][:100]}...") # 第二轮对话 - 测试记忆 print("\n[第二轮] 发送消息: '我叫什么名字?今年多大?'") result2 = await agent_service.process_message( "我叫什么名字?今年多大?", thread_id, "local", user_id ) print(f"回复: {result2['reply']}") # 验证记忆是否存在 if "张三" in result2['reply'] or "28" in result2['reply']: print("\n✅ 短期记忆测试通过!") return True else: print("\n❌ 短期记忆测试失败!") return False async def test_tool_calling(agent_service): """测试工具调用(RAG 搜索)""" await print_section("测试 2: 工具调用(Tool Calling)") thread_id = str(uuid.uuid4()) user_id = "test_user_tools" print(f"\n使用 thread_id: {thread_id[:8]}...") print(f"使用 user_id: {user_id}") # 发送需要 RAG 搜索的问题 print("\n发送消息: '请告诉我,黄双银在魔王大陆的故事?'") result = await agent_service.process_message( "请告诉我,黄双银在魔王大陆的故事?", thread_id, "local", user_id ) print(f"回复: {result['reply'][:200]}...") # 检查是否调用了 RAG 工具(回复中会有黄双银相关内容) if "黄双银" in result['reply']: print("\n✅ 工具调用测试通过!") return True else: print("\n⚠️ 工具调用测试结果不确定,需要手动验证") return None async def test_streaming(agent_service): """测试流式对话""" await print_section("测试 3: 流式对话(Streaming)") thread_id = str(uuid.uuid4()) user_id = "test_user_stream" print(f"\n使用 thread_id: {thread_id[:8]}...") print(f"使用 user_id: {user_id}") print("\n发送消息: '用100字介绍一下AI人工智能' (流式)...") print("流式输出: ", end="", flush=True) full_reply = "" chunk_count = 0 try: async for chunk in agent_service.process_message_stream( "用100字介绍一下AI人工智能", thread_id, "local", user_id ): chunk_count += 1 if chunk.get("type") == "llm_token": token = chunk.get("token", "") print(token, end="", flush=True) full_reply += token elif chunk.get("type") == "state_update": pass # 状态更新不显示 print(f"\n\n共收到 {chunk_count} 个 chunk") print(f"完整回复长度: {len(full_reply)} 字") if chunk_count > 0 and len(full_reply) > 10: print("\n✅ 流式对话测试通过!") return True else: print("\n❌ 流式对话测试失败!") return False except Exception as e: print(f"\n❌ 流式对话异常: {e}") return False async def test_history_service(agent_service, history_service): """测试历史查询服务""" await print_section("测试 4: 历史查询服务(History Service)") user_id = "test_user_history" # 先创建几个对话 print(f"\n为 user_id={user_id} 创建测试对话...") thread_ids = [] for i in range(3): thread_id = str(uuid.uuid4()) thread_ids.append(thread_id) await agent_service.process_message( f"这是第 {i+1} 个测试对话", thread_id, "local", user_id ) print(f" 创建线程 {i+1}: {thread_id[:8]}...") # 1. 测试获取用户线程列表 print("\n[4.1] 测试获取用户线程列表...") threads = await history_service.get_user_threads(user_id, limit=10) print(f" 找到 {len(threads)} 个线程") if len(threads) >= 3: print(" ✅ 线程列表查询通过") else: print(" ⚠️ 线程数量少于预期") # 2. 测试获取单个线程的消息历史 if thread_ids: test_thread_id = thread_ids[0] print(f"\n[4.2] 测试获取线程消息历史 (thread_id={test_thread_id[:8]}...)") messages = await history_service.get_thread_messages(test_thread_id) print(f" 找到 {len(messages)} 条消息") if len(messages) >= 2: # 至少有一问一答 print(" ✅ 消息历史查询通过") else: print(" ⚠️ 消息数量少于预期") # 3. 测试获取线程摘要 print(f"\n[4.3] 测试获取线程摘要...") summary = await history_service.get_thread_summary(test_thread_id) print(f" 摘要: {summary.get('summary', '')[:50]}...") print(f" 消息数: {summary.get('message_count', 0)}") if summary.get('message_count', 0) > 0: print(" ✅ 线程摘要查询通过") else: print(" ⚠️ 摘要查询结果不确定") return len(threads) >= 3 async def test_long_term_memory(agent_service): """测试长期记忆(mem0)""" await print_section("测试 5: 长期记忆(Long-term Memory - mem0)") thread_id1 = str(uuid.uuid4()) thread_id2 = str(uuid.uuid4()) # 不同的线程 user_id = "test_user_longterm" print(f"\n使用 user_id: {user_id}") print(f"线程 1: {thread_id1[:8]}...") print(f"线程 2: {thread_id2[:8]}...") # 在第一个线程中保存信息 print("\n[线程 1] 发送消息: '记住,我的宠物名字叫小白,是一只猫'") result1 = await agent_service.process_message( "记住,我的宠物名字叫小白,是一只猫", thread_id1, "local", user_id ) print(f"回复: {result1['reply'][:100]}...") # 等待一下,让 mem0 保存 await asyncio.sleep(1) # 在第二个线程中询问(不同的 thread_id) print("\n[线程 2] 发送消息: '我的宠物叫什么名字?是什么动物?'") result2 = await agent_service.process_message( "我的宠物叫什么名字?是什么动物?", thread_id2, "local", user_id ) print(f"回复: {result2['reply']}") # 验证长期记忆 if "小白" in result2['reply'] or "猫" in result2['reply']: print("\n✅ 长期记忆测试通过!") return True else: print("\n⚠️ 长期记忆可能未启用,或需要手动验证") return None async def main(): """主测试函数""" print("\n" + "=" * 70) print(" 后端完整功能测试") print("=" * 70) results = {} try: # 创建数据库连接和服务 print("\n正在初始化数据库连接...") async with AsyncPostgresSaver.from_conn_string(DB_URI) as checkpointer: await checkpointer.setup() print("✅ 数据库连接成功") # 创建服务实例 print("\n正在初始化 Agent 服务...") agent_service = AIAgentService(checkpointer) await agent_service.initialize() print("✅ Agent 服务初始化成功") history_service = ThreadHistoryService(checkpointer) print("✅ 历史服务初始化成功") print(f"\n可用模型: {list(agent_service.graphs.keys())}") # 运行测试 results["短期记忆"] = await test_short_term_memory(agent_service) await asyncio.sleep(1) results["工具调用"] = await test_tool_calling(agent_service) await asyncio.sleep(1) results["流式对话"] = await test_streaming(agent_service) await asyncio.sleep(1) results["历史查询"] = await test_history_service(agent_service, history_service) await asyncio.sleep(1) results["长期记忆"] = await test_long_term_memory(agent_service) await asyncio.sleep(1) # 打印总结 await print_section("测试总结") print("\n测试结果:") print("-" * 40) pass_count = 0 fail_count = 0 skip_count = 0 for test_name, result in results.items(): if result is True: status = "✅ 通过" pass_count += 1 elif result is False: status = "❌ 失败" fail_count += 1 else: status = "⚠️ 待验证" skip_count += 1 print(f" {test_name:12s}: {status}") print("-" * 40) print(f"总计: {len(results)} 个测试") print(f"通过: {pass_count}, 失败: {fail_count}, 待验证: {skip_count}") if fail_count == 0: print("\n🎉 所有核心测试通过!") else: print(f"\n⚠️ 有 {fail_count} 个测试失败") except Exception as e: error(f"\n❌ 测试运行异常: {e}") import traceback traceback.print_exc() return 1 return 0 if fail_count == 0 else 1 if __name__ == "__main__": exit_code = asyncio.run(main()) sys.exit(exit_code)