""" MCP集成示例 展示如何使用统一的MCP接口 """ import asyncio from ..mcp.mcp_manager import mcp_manager from ..mcp.adapters import ContactAdapter, DictionaryAdapter, NewsAdapter async def setup_mcp(): """设置MCP系统""" # 1. 配置MCP服务器(可选) servers_config = { "news": { "type": "stdio", "command": "npx", "args": ["-y", "@modelcontextprotocol/server-news"], "enabled": False # 先禁用,等配置好后启用 }, "dictionary": { "type": "stdio", "command": "npx", "args": ["-y", "@modelcontextprotocol/server-dictionary"], "enabled": False }, "email": { "type": "stdio", "command": "npx", "args": ["-y", "@modelcontextprotocol/server-gmail"], "enabled": False } } mcp_manager.configure_servers(servers_config) # 2. 注册适配器 mcp_manager.register_adapter(ContactAdapter()) mcp_manager.register_adapter(DictionaryAdapter()) mcp_manager.register_adapter(NewsAdapter()) # 3. 初始化 await mcp_manager.initialize() async def example_usage(): """使用示例""" await setup_mcp() print("=" * 60) print("可用适配器:", mcp_manager.get_available_adapters()) print("可用MCP工具:", mcp_manager.get_available_tools()) print("=" * 60) # 1. 查询词典 print("\n📖 查询单词 'ephemeral':") result = await mcp_manager.execute( "dictionary", "query_word", word="ephemeral", user_id="default" ) print(f"来源: {result.source}") print(f"结果: {result.data}") # 2. 查询新闻 print("\n📰 查询新闻 'AI':") result = await mcp_manager.execute( "news", "query_news", query="AI", user_id="default" ) print(f"来源: {result.source}") print(f"结果数量: {len(result.data) if result.data else 0}") # 3. 获取联系人 print("\n👥 获取联系人列表:") result = await mcp_manager.execute( "contact", "list_contacts", user_id="default" ) print(f"来源: {result.source}") print(f"联系人数量: {len(result.data) if result.data else 0}") if __name__ == "__main__": asyncio.run(example_usage())