#!/usr/bin/env python3
"""Simplified MCP test - exercises the mcp package directly.

Run as a script. The process exit status is 0 when every step succeeds
and 1 when an import or the MCP Manager step fails.
"""

import asyncio
import sys
import traceback
from pathlib import Path

# Put backend/app (relative to this file) on the import path so that the
# ``mcp`` package can be imported directly.
sys.path.insert(0, str(Path(__file__).parent / "backend" / "app"))


async def test_mcp_direct() -> bool:
    """Run the four-step MCP smoke test and report whether it passed.

    Steps:
        1. Import the core modules (manager and base adapter).
        2. Import the contact, dictionary and news adapters.
        3. Call each adapter's ``_fallback`` directly and print the result.
        4. Register the adapters on ``mcp_manager``, initialize it and run
           one ``query_word`` call through the manager.

    Returns:
        True when every step completed; False when an import failed or the
        MCP Manager step raised.

    Raises:
        Any exception raised by the direct adapter calls in step 3
        propagates unchanged; only steps 1, 2 and 4 are guarded.
    """
    print("=" * 70)
    print("🧪 直接测试 MCP 模块")
    print("=" * 70)

    # Step 1: core modules. Several names are imported purely to verify
    # that they are importable; they are not used afterwards.
    print("\n[1/4] 导入MCP核心模块...")
    try:
        from mcp.mcp_manager import MCPManager, mcp_manager  # noqa: F401
        from mcp.adapters.base_adapter import BaseAdapter, AdapterResult  # noqa: F401
        print("✅ 核心模块导入成功")
    except Exception as e:
        print(f"❌ 核心模块导入失败: {e}")
        traceback.print_exc()
        return False

    # Step 2: adapters.
    print("\n[2/4] 导入适配器...")
    try:
        from mcp.adapters.contact_adapter import ContactAdapter
        from mcp.adapters.dictionary_adapter import DictionaryAdapter
        from mcp.adapters.news_adapter import NewsAdapter
        print("✅ 适配器导入成功")
    except Exception as e:
        print(f"❌ 适配器导入失败: {e}")
        traceback.print_exc()
        return False

    # Step 3: call each adapter's fallback path directly.
    print("\n[3/4] 测试各个适配器...")

    print("\n📖 测试 DictionaryAdapter...")
    dict_adapter = DictionaryAdapter()
    result = dict_adapter._fallback("query_word", word="ephemeral")
    # Tolerate a missing payload the same way the news/contact checks do.
    # NOTE(review): assumes result.data is a dict when present - confirm.
    word_data = result.data or {}
    print(f" 来源: {result.source}")
    print(f" 成功: {result.success}")
    print(f" 单词: {word_data.get('word', '')}")
    print(f" 释义: {word_data.get('definitions', [])}")

    print("\n📰 测试 NewsAdapter...")
    news_adapter = NewsAdapter()
    result = news_adapter._fallback("query_news", query="AI")
    print(f" 来源: {result.source}")
    print(f" 成功: {result.success}")
    print(f" 数量: {len(result.data) if result.data else 0}")

    print("\n👥 测试 ContactAdapter...")
    contact_adapter = ContactAdapter()
    result = contact_adapter._fallback("list_contacts", user_id="test")
    print(f" 来源: {result.source}")
    print(f" 成功: {result.success}")
    print(f" 数量: {len(result.data) if result.data else 0}")

    # Step 4: the manager. A failure here is reported and remembered so the
    # function no longer claims success after printing a traceback.
    print("\n[4/4] 测试 MCP Manager...")
    manager_ok = True
    try:
        mcp_manager.register_adapter(ContactAdapter())
        mcp_manager.register_adapter(DictionaryAdapter())
        mcp_manager.register_adapter(NewsAdapter())

        await mcp_manager.initialize()

        print(f"✅ 可用适配器: {mcp_manager.get_available_adapters()}")
        print(f"✅ 可用工具: {mcp_manager.get_available_tools()}")

        # Exercise one call routed through the manager.
        print("\n测试通过Manager调用...")
        result = await mcp_manager.execute(
            "dictionary",
            "query_word",
            word="serendipity",
            user_id="test",
        )
        print(f" 成功: {result.success}")
        print(f" 来源: {result.source}")
    except Exception as e:
        manager_ok = False
        print(f"❌ MCP Manager 测试失败: {e}")
        traceback.print_exc()

    print("\n" + "=" * 70)
    print("🎉 直接测试完成!")
    print("=" * 70)
    print("\n✅ MCP集成系统架构已就绪:")
    print(" ┌─────────────────────────────────────────────┐")
    print(" │ MCP Manager (统一入口) │")
    print(" ├─────────────────────────────────────────────┤")
    print(" │ ContactAdapter │ DictionaryAdapter │ News │")
    print(" ├─────────────────────────────────────────────┤")
    print(" │ MCP Client -> Database -> Mock (降级) │")
    print(" └─────────────────────────────────────────────┘")
    print("\n📖 文档: docs/MCP_INTEGRATION.md")
    print("⚙️ 配置: backend/app/mcp/mcp_config.example.yaml")

    return manager_ok


if __name__ == "__main__":
    # Propagate the outcome as the process exit status so callers such as
    # shells or CI can detect a failed run.
    sys.exit(0 if asyncio.run(test_mcp_direct()) else 1)