Files
ailine/backend/app/mcp/mcp_example.py
root 9c53f58165
All checks were successful
构建并部署 AI Agent 服务 / deploy (push) Successful in 5m38s
feat: 集成MCP统一外部接口管理系统
- 添加MCP Manager统一入口管理
- 实现Contact/Dictionary/News三个适配器
- 三层降级策略:MCP -> Database -> Mock
- 保持原有api_client向后兼容
- 添加完整文档和测试
2026-05-03 12:36:12 +08:00

88 lines
2.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
MCP集成示例
展示如何使用统一的MCP接口
"""
import asyncio
from ..mcp.mcp_manager import mcp_manager
from ..mcp.adapters import ContactAdapter, DictionaryAdapter, NewsAdapter
async def setup_mcp():
    """Prepare the shared ``mcp_manager`` for use.

    Passes the manager a configuration for three ``stdio`` servers
    (``news``, ``dictionary`` and ``email``), each using the ``npx``
    command and each currently marked as not enabled, then registers one
    instance of the contact, dictionary and news adapters and awaits the
    manager's ``initialize()``.
    """
    # Step 1: MCP server configuration (optional). Every entry has the same
    # shape and differs only in the package name handed to npx.
    server_packages = {
        "news": "@modelcontextprotocol/server-news",
        "dictionary": "@modelcontextprotocol/server-dictionary",
        "email": "@modelcontextprotocol/server-gmail",
    }
    servers_config = {
        server_name: {
            "type": "stdio",
            "command": "npx",
            "args": ["-y", package],
            # Disabled for now; enable once the server has been configured.
            "enabled": False,
        }
        for server_name, package in server_packages.items()
    }
    mcp_manager.configure_servers(servers_config)

    # Step 2: register the adapters (contact, dictionary, news — in that order).
    for adapter_cls in (ContactAdapter, DictionaryAdapter, NewsAdapter):
        mcp_manager.register_adapter(adapter_cls())

    # Step 3: initialise the manager.
    await mcp_manager.initialize()
async def example_usage():
    """Run the demo: set up MCP, then issue one call per adapter.

    After ``setup_mcp()`` completes, prints the adapters and tools reported
    by ``mcp_manager``, then performs a dictionary lookup, a news query and
    a contact listing through ``mcp_manager.execute``. For each call the
    result's ``source`` is printed, followed by its data (dictionary) or
    the number of items in its data (news, contacts; 0 when the data is
    falsy).
    """
    await setup_mcp()

    banner = "=" * 60
    print(banner)
    print("可用适配器:", mcp_manager.get_available_adapters())
    print("可用MCP工具:", mcp_manager.get_available_tools())
    print(banner)

    # 1. Dictionary lookup.
    print("\n📖 查询单词 'ephemeral':")
    word_result = await mcp_manager.execute(
        "dictionary", "query_word", word="ephemeral", user_id="default"
    )
    print(f"来源: {word_result.source}")
    print(f"结果: {word_result.data}")

    # 2. News query.
    print("\n📰 查询新闻 'AI':")
    news_result = await mcp_manager.execute(
        "news", "query_news", query="AI", user_id="default"
    )
    print(f"来源: {news_result.source}")
    news_count = len(news_result.data) if news_result.data else 0
    print(f"结果数量: {news_count}")

    # 3. Contact listing.
    print("\n👥 获取联系人列表:")
    contact_result = await mcp_manager.execute(
        "contact", "list_contacts", user_id="default"
    )
    print(f"来源: {contact_result.source}")
    contact_count = len(contact_result.data) if contact_result.data else 0
    print(f"联系人数量: {contact_count}")
if __name__ == "__main__":
    # Script entry point: build the demo coroutine and run it to completion.
    demo = example_usage()
    asyncio.run(demo)