feat: 集成MCP统一外部接口管理系统
All checks were successful
构建并部署 AI Agent 服务 / deploy (push) Successful in 5m38s
All checks were successful
构建并部署 AI Agent 服务 / deploy (push) Successful in 5m38s
- 添加MCP Manager统一入口管理 - 实现Contact/Dictionary/News三个适配器 - 三层降级策略:MCP -> Database -> Mock - 保持原有api_client向后兼容 - 添加完整文档和测试
This commit is contained in:
73
backend/app/mcp/adapters/base_adapter.py
Normal file
73
backend/app/mcp/adapters/base_adapter.py
Normal file
@@ -0,0 +1,73 @@
|
||||
"""
|
||||
MCP适配器基类
|
||||
所有外部接口适配器都继承自这个基类
|
||||
"""
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Dict, Any, Optional, List
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
class AdapterResult:
|
||||
"""适配器执行结果"""
|
||||
success: bool
|
||||
data: Any = None
|
||||
error: Optional[str] = None
|
||||
source: str = "mcp"
|
||||
|
||||
|
||||
class BaseAdapter(ABC):
|
||||
"""
|
||||
MCP适配器基类
|
||||
|
||||
职责:
|
||||
1. 定义统一的接口规范
|
||||
2. 处理缓存逻辑
|
||||
3. 错误处理和降级
|
||||
"""
|
||||
|
||||
name: str = "" # 适配器名称
|
||||
description: str = "" # 适配器描述
|
||||
|
||||
def __init__(self, mcp_client=None, repository=None):
|
||||
self.mcp_client = mcp_client
|
||||
self.repository = repository
|
||||
self._use_cache = repository is not None
|
||||
|
||||
@abstractmethod
|
||||
async def execute(self, action: str, **kwargs) -> AdapterResult:
|
||||
"""
|
||||
执行操作(统一入口)
|
||||
|
||||
Args:
|
||||
action: 操作类型
|
||||
**kwargs: 操作参数
|
||||
|
||||
Returns:
|
||||
AdapterResult: 执行结果
|
||||
"""
|
||||
pass
|
||||
|
||||
async def _get_from_cache(self, key: str, **kwargs) -> Optional[Any]:
|
||||
"""从缓存获取数据(子类实现)"""
|
||||
return None
|
||||
|
||||
async def _save_to_cache(self, key: str, data: Any, **kwargs):
|
||||
"""保存数据到缓存(子类实现)"""
|
||||
pass
|
||||
|
||||
def _fallback(self, action: str, **kwargs) -> AdapterResult:
|
||||
"""
|
||||
降级方案(模拟数据)
|
||||
|
||||
当MCP不可用时,返回模拟数据保持系统可用
|
||||
"""
|
||||
return AdapterResult(
|
||||
success=True,
|
||||
data=self._get_mock_data(action, **kwargs),
|
||||
source="mock"
|
||||
)
|
||||
|
||||
def _get_mock_data(self, action: str, **kwargs) -> Any:
|
||||
"""获取模拟数据(子类实现)"""
|
||||
return None
|
||||
Reference in New Issue
Block a user