Files
ailine/backend/app/mcp/mcp_example.py

88 lines
2.3 KiB
Python
Raw Permalink Normal View History

"""
MCP集成示例
展示如何使用统一的MCP接口
"""
import asyncio
from ..mcp.mcp_manager import mcp_manager
from ..mcp.adapters import ContactAdapter, DictionaryAdapter, NewsAdapter
async def setup_mcp():
"""设置MCP系统"""
# 1. 配置MCP服务器可选
servers_config = {
"news": {
"type": "stdio",
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-news"],
"enabled": False # 先禁用,等配置好后启用
},
"dictionary": {
"type": "stdio",
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-dictionary"],
"enabled": False
},
"email": {
"type": "stdio",
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-gmail"],
"enabled": False
}
}
mcp_manager.configure_servers(servers_config)
# 2. 注册适配器
mcp_manager.register_adapter(ContactAdapter())
mcp_manager.register_adapter(DictionaryAdapter())
mcp_manager.register_adapter(NewsAdapter())
# 3. 初始化
await mcp_manager.initialize()
async def example_usage():
"""使用示例"""
await setup_mcp()
print("=" * 60)
print("可用适配器:", mcp_manager.get_available_adapters())
print("可用MCP工具:", mcp_manager.get_available_tools())
print("=" * 60)
# 1. 查询词典
print("\n📖 查询单词 'ephemeral':")
result = await mcp_manager.execute(
"dictionary",
"query_word",
word="ephemeral",
user_id="default"
)
print(f"来源: {result.source}")
print(f"结果: {result.data}")
# 2. 查询新闻
print("\n📰 查询新闻 'AI':")
result = await mcp_manager.execute(
"news",
"query_news",
query="AI",
user_id="default"
)
print(f"来源: {result.source}")
print(f"结果数量: {len(result.data) if result.data else 0}")
# 3. 获取联系人
print("\n👥 获取联系人列表:")
result = await mcp_manager.execute(
"contact",
"list_contacts",
user_id="default"
)
print(f"来源: {result.source}")
print(f"联系人数量: {len(result.data) if result.data else 0}")
if __name__ == "__main__":
asyncio.run(example_usage())