Files
ailine/backend/app/mcp/mcp_client.py

201 lines
6.6 KiB
Python
Raw Permalink Normal View History

"""
MCP客户端
负责与MCP服务器通信
"""
import asyncio
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, field
import json
@dataclass
class MCPServerConfig:
"""MCP服务器配置"""
name: str
server_type: str = "stdio" # stdio 或 http
command: Optional[str] = None # for stdio
args: List[str] = field(default_factory=list) # for stdio
url: Optional[str] = None # for http
headers: Dict[str, str] = field(default_factory=dict) # for http
env: Dict[str, str] = field(default_factory=dict)
timeout: int = 120
enabled: bool = True
class MCPClient:
"""
MCP客户端
支持
1. 多MCP服务器管理
2. 工具发现和调用
3. 连接管理和重试
"""
def __init__(self):
self._servers: Dict[str, MCPServerConfig] = {}
self._connections: Dict[str, Any] = {}
self._tools: Dict[str, Dict[str, Any]] = {}
self._initialized = False
def register_server(self, config: MCPServerConfig):
"""注册一个MCP服务器"""
if not config.enabled:
return
self._servers[config.name] = config
async def initialize(self):
"""初始化所有MCP服务器连接"""
if self._initialized:
return
print(f"[MCP] 初始化 {len(self._servers)} 个MCP服务器...")
for name, config in self._servers.items():
try:
await self._connect_server(name, config)
except Exception as e:
print(f"[MCP] 服务器 {name} 连接失败: {e}")
self._initialized = True
print(f"[MCP] 初始化完成,可用工具: {list(self._tools.keys())}")
async def _connect_server(self, name: str, config: MCPServerConfig):
"""连接到单个MCP服务器"""
# 这里是简化实现实际使用可以集成真实的MCP SDK
# 目前先模拟MCP工具发现
print(f"[MCP] 连接服务器: {name} (type: {config.server_type})")
# 模拟发现一些工具
if name == "filesystem":
self._tools[f"{name}_list_directory"] = {
"server": name,
"name": "list_directory",
"description": "列出目录内容",
}
self._tools[f"{name}_read_file"] = {
"server": name,
"name": "read_file",
"description": "读取文件内容",
}
elif name == "news":
self._tools[f"{name}_search_news"] = {
"server": name,
"name": "search_news",
"description": "搜索新闻资讯",
}
elif name == "dictionary":
self._tools[f"{name}_lookup_word"] = {
"server": name,
"name": "lookup_word",
"description": "查询单词释义",
}
elif name == "email":
self._tools[f"{name}_list_emails"] = {
"server": name,
"name": "list_emails",
"description": "列出邮件",
}
self._tools[f"{name}_send_email"] = {
"server": name,
"name": "send_email",
"description": "发送邮件",
}
async def call_tool(
self,
tool_name: str,
arguments: Dict[str, Any]
) -> Dict[str, Any]:
"""
调用MCP工具
Args:
tool_name: 工具名称带server前缀 "filesystem_read_file"
arguments: 工具参数
Returns:
工具执行结果
"""
if not self._initialized:
await self.initialize()
if tool_name not in self._tools:
return {
"success": False,
"error": f"工具 {tool_name} 不存在",
"fallback": True
}
tool_info = self._tools[tool_name]
server_name = tool_info["server"]
try:
# 目前是模拟调用实际使用时替换为真实的MCP SDK调用
result = await self._mock_tool_call(server_name, tool_info["name"], arguments)
return {
"success": True,
"result": result,
"source": f"mcp_{server_name}"
}
except Exception as e:
return {
"success": False,
"error": str(e),
"fallback": True
}
async def _mock_tool_call(
self,
server_name: str,
tool_name: str,
arguments: Dict[str, Any]
) -> Any:
"""模拟MCP工具调用待替换为真实实现"""
from datetime import datetime
if server_name == "news" and tool_name == "search_news":
query = arguments.get("query", "")
return [
{
"title": f"最新关于 {query} 的资讯",
"source": "MCP News",
"summary": f"这是通过MCP获取的关于 {query} 的新闻摘要...",
"published_at": datetime.now().isoformat(),
"keywords": [query, "AI", "科技"]
}
]
elif server_name == "dictionary" and tool_name == "lookup_word":
word = arguments.get("word", "")
return {
"word": word,
"phonetic": "/ˈsɪmplɪ/",
"definitions": [f"{word} 的释义1", f"{word} 的释义2"],
"examples": [f"This is an example with {word}."]
}
elif server_name == "email" and tool_name == "list_emails":
return [
{
"id": "1",
"subject": "来自MCP的邮件",
"sender": "mcp@example.com",
"date": datetime.now().isoformat(),
"snippet": "这是通过MCP获取的邮件内容..."
}
]
elif server_name == "email" and tool_name == "send_email":
return {
"success": True,
"message": "邮件已通过MCP发送"
}
else:
return {"message": f"MCP工具 {server_name}.{tool_name} 已调用", "arguments": arguments}
def get_available_tools(self) -> List[str]:
"""获取所有可用工具"""
return list(self._tools.keys())
def is_available(self) -> bool:
"""检查MCP是否可用"""
return len(self._tools) > 0