""" MCP客户端 负责与MCP服务器通信 """ import asyncio from typing import Dict, Any, Optional, List from dataclasses import dataclass, field import json @dataclass class MCPServerConfig: """MCP服务器配置""" name: str server_type: str = "stdio" # stdio 或 http command: Optional[str] = None # for stdio args: List[str] = field(default_factory=list) # for stdio url: Optional[str] = None # for http headers: Dict[str, str] = field(default_factory=dict) # for http env: Dict[str, str] = field(default_factory=dict) timeout: int = 120 enabled: bool = True class MCPClient: """ MCP客户端 支持: 1. 多MCP服务器管理 2. 工具发现和调用 3. 连接管理和重试 """ def __init__(self): self._servers: Dict[str, MCPServerConfig] = {} self._connections: Dict[str, Any] = {} self._tools: Dict[str, Dict[str, Any]] = {} self._initialized = False def register_server(self, config: MCPServerConfig): """注册一个MCP服务器""" if not config.enabled: return self._servers[config.name] = config async def initialize(self): """初始化所有MCP服务器连接""" if self._initialized: return print(f"[MCP] 初始化 {len(self._servers)} 个MCP服务器...") for name, config in self._servers.items(): try: await self._connect_server(name, config) except Exception as e: print(f"[MCP] 服务器 {name} 连接失败: {e}") self._initialized = True print(f"[MCP] 初始化完成,可用工具: {list(self._tools.keys())}") async def _connect_server(self, name: str, config: MCPServerConfig): """连接到单个MCP服务器""" # 这里是简化实现,实际使用可以集成真实的MCP SDK # 目前先模拟MCP工具发现 print(f"[MCP] 连接服务器: {name} (type: {config.server_type})") # 模拟发现一些工具 if name == "filesystem": self._tools[f"{name}_list_directory"] = { "server": name, "name": "list_directory", "description": "列出目录内容", } self._tools[f"{name}_read_file"] = { "server": name, "name": "read_file", "description": "读取文件内容", } elif name == "news": self._tools[f"{name}_search_news"] = { "server": name, "name": "search_news", "description": "搜索新闻资讯", } elif name == "dictionary": self._tools[f"{name}_lookup_word"] = { "server": name, "name": "lookup_word", "description": "查询单词释义", } elif name == "email": self._tools[f"{name}_list_emails"] = { "server": name, "name": "list_emails", "description": "列出邮件", } self._tools[f"{name}_send_email"] = { "server": name, "name": "send_email", "description": "发送邮件", } async def call_tool( self, tool_name: str, arguments: Dict[str, Any] ) -> Dict[str, Any]: """ 调用MCP工具 Args: tool_name: 工具名称(带server前缀,如 "filesystem_read_file") arguments: 工具参数 Returns: 工具执行结果 """ if not self._initialized: await self.initialize() if tool_name not in self._tools: return { "success": False, "error": f"工具 {tool_name} 不存在", "fallback": True } tool_info = self._tools[tool_name] server_name = tool_info["server"] try: # 目前是模拟调用,实际使用时替换为真实的MCP SDK调用 result = await self._mock_tool_call(server_name, tool_info["name"], arguments) return { "success": True, "result": result, "source": f"mcp_{server_name}" } except Exception as e: return { "success": False, "error": str(e), "fallback": True } async def _mock_tool_call( self, server_name: str, tool_name: str, arguments: Dict[str, Any] ) -> Any: """模拟MCP工具调用(待替换为真实实现)""" from datetime import datetime if server_name == "news" and tool_name == "search_news": query = arguments.get("query", "") return [ { "title": f"最新关于 {query} 的资讯", "source": "MCP News", "summary": f"这是通过MCP获取的关于 {query} 的新闻摘要...", "published_at": datetime.now().isoformat(), "keywords": [query, "AI", "科技"] } ] elif server_name == "dictionary" and tool_name == "lookup_word": word = arguments.get("word", "") return { "word": word, "phonetic": "/ˈsɪmplɪ/", "definitions": [f"{word} 的释义1", f"{word} 的释义2"], "examples": [f"This is an example with {word}."] } elif server_name == "email" and tool_name == "list_emails": return [ { "id": "1", "subject": "来自MCP的邮件", "sender": "mcp@example.com", "date": datetime.now().isoformat(), "snippet": "这是通过MCP获取的邮件内容..." } ] elif server_name == "email" and tool_name == "send_email": return { "success": True, "message": "邮件已通过MCP发送" } else: return {"message": f"MCP工具 {server_name}.{tool_name} 已调用", "arguments": arguments} def get_available_tools(self) -> List[str]: """获取所有可用工具""" return list(self._tools.keys()) def is_available(self) -> bool: """检查MCP是否可用""" return len(self._tools) > 0