Files
ailine/backend/app/mcp/mcp_client.py
root 9c53f58165
All checks were successful
构建并部署 AI Agent 服务 / deploy (push) Successful in 5m38s
feat: 集成MCP统一外部接口管理系统
- 添加MCP Manager统一入口管理
- 实现Contact/Dictionary/News三个适配器
- 三层降级策略:MCP -> Database -> Mock
- 保持原有api_client向后兼容
- 添加完整文档和测试
2026-05-03 12:36:12 +08:00

201 lines
6.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
MCP客户端
负责与MCP服务器通信
"""
import asyncio
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, field
import json
@dataclass
class MCPServerConfig:
"""MCP服务器配置"""
name: str
server_type: str = "stdio" # stdio 或 http
command: Optional[str] = None # for stdio
args: List[str] = field(default_factory=list) # for stdio
url: Optional[str] = None # for http
headers: Dict[str, str] = field(default_factory=dict) # for http
env: Dict[str, str] = field(default_factory=dict)
timeout: int = 120
enabled: bool = True
class MCPClient:
"""
MCP客户端
支持:
1. 多MCP服务器管理
2. 工具发现和调用
3. 连接管理和重试
"""
def __init__(self):
self._servers: Dict[str, MCPServerConfig] = {}
self._connections: Dict[str, Any] = {}
self._tools: Dict[str, Dict[str, Any]] = {}
self._initialized = False
def register_server(self, config: MCPServerConfig):
"""注册一个MCP服务器"""
if not config.enabled:
return
self._servers[config.name] = config
async def initialize(self):
"""初始化所有MCP服务器连接"""
if self._initialized:
return
print(f"[MCP] 初始化 {len(self._servers)} 个MCP服务器...")
for name, config in self._servers.items():
try:
await self._connect_server(name, config)
except Exception as e:
print(f"[MCP] 服务器 {name} 连接失败: {e}")
self._initialized = True
print(f"[MCP] 初始化完成,可用工具: {list(self._tools.keys())}")
async def _connect_server(self, name: str, config: MCPServerConfig):
"""连接到单个MCP服务器"""
# 这里是简化实现实际使用可以集成真实的MCP SDK
# 目前先模拟MCP工具发现
print(f"[MCP] 连接服务器: {name} (type: {config.server_type})")
# 模拟发现一些工具
if name == "filesystem":
self._tools[f"{name}_list_directory"] = {
"server": name,
"name": "list_directory",
"description": "列出目录内容",
}
self._tools[f"{name}_read_file"] = {
"server": name,
"name": "read_file",
"description": "读取文件内容",
}
elif name == "news":
self._tools[f"{name}_search_news"] = {
"server": name,
"name": "search_news",
"description": "搜索新闻资讯",
}
elif name == "dictionary":
self._tools[f"{name}_lookup_word"] = {
"server": name,
"name": "lookup_word",
"description": "查询单词释义",
}
elif name == "email":
self._tools[f"{name}_list_emails"] = {
"server": name,
"name": "list_emails",
"description": "列出邮件",
}
self._tools[f"{name}_send_email"] = {
"server": name,
"name": "send_email",
"description": "发送邮件",
}
async def call_tool(
self,
tool_name: str,
arguments: Dict[str, Any]
) -> Dict[str, Any]:
"""
调用MCP工具
Args:
tool_name: 工具名称带server前缀"filesystem_read_file"
arguments: 工具参数
Returns:
工具执行结果
"""
if not self._initialized:
await self.initialize()
if tool_name not in self._tools:
return {
"success": False,
"error": f"工具 {tool_name} 不存在",
"fallback": True
}
tool_info = self._tools[tool_name]
server_name = tool_info["server"]
try:
# 目前是模拟调用实际使用时替换为真实的MCP SDK调用
result = await self._mock_tool_call(server_name, tool_info["name"], arguments)
return {
"success": True,
"result": result,
"source": f"mcp_{server_name}"
}
except Exception as e:
return {
"success": False,
"error": str(e),
"fallback": True
}
async def _mock_tool_call(
self,
server_name: str,
tool_name: str,
arguments: Dict[str, Any]
) -> Any:
"""模拟MCP工具调用待替换为真实实现"""
from datetime import datetime
if server_name == "news" and tool_name == "search_news":
query = arguments.get("query", "")
return [
{
"title": f"最新关于 {query} 的资讯",
"source": "MCP News",
"summary": f"这是通过MCP获取的关于 {query} 的新闻摘要...",
"published_at": datetime.now().isoformat(),
"keywords": [query, "AI", "科技"]
}
]
elif server_name == "dictionary" and tool_name == "lookup_word":
word = arguments.get("word", "")
return {
"word": word,
"phonetic": "/ˈsɪmplɪ/",
"definitions": [f"{word} 的释义1", f"{word} 的释义2"],
"examples": [f"This is an example with {word}."]
}
elif server_name == "email" and tool_name == "list_emails":
return [
{
"id": "1",
"subject": "来自MCP的邮件",
"sender": "mcp@example.com",
"date": datetime.now().isoformat(),
"snippet": "这是通过MCP获取的邮件内容..."
}
]
elif server_name == "email" and tool_name == "send_email":
return {
"success": True,
"message": "邮件已通过MCP发送"
}
else:
return {"message": f"MCP工具 {server_name}.{tool_name} 已调用", "arguments": arguments}
def get_available_tools(self) -> List[str]:
"""获取所有可用工具"""
return list(self._tools.keys())
def is_available(self) -> bool:
"""检查MCP是否可用"""
return len(self._tools) > 0