Files
ailine/backend/app/mcp/adapters/base_adapter.py
root 9c53f58165
All checks were successful
构建并部署 AI Agent 服务 / deploy (push) Successful in 5m38s
feat: 集成MCP统一外部接口管理系统
- 添加MCP Manager统一入口管理
- 实现Contact/Dictionary/News三个适配器
- 三层降级策略:MCP -> Database -> Mock
- 保持原有api_client向后兼容
- 添加完整文档和测试
2026-05-03 12:36:12 +08:00

74 lines
1.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
MCP适配器基类
所有外部接口适配器都继承自这个基类
"""
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
@dataclass
class AdapterResult:
"""适配器执行结果"""
success: bool
data: Any = None
error: Optional[str] = None
source: str = "mcp"
class BaseAdapter(ABC):
"""
MCP适配器基类
职责:
1. 定义统一的接口规范
2. 处理缓存逻辑
3. 错误处理和降级
"""
name: str = "" # 适配器名称
description: str = "" # 适配器描述
def __init__(self, mcp_client=None, repository=None):
self.mcp_client = mcp_client
self.repository = repository
self._use_cache = repository is not None
@abstractmethod
async def execute(self, action: str, **kwargs) -> AdapterResult:
"""
执行操作(统一入口)
Args:
action: 操作类型
**kwargs: 操作参数
Returns:
AdapterResult: 执行结果
"""
pass
async def _get_from_cache(self, key: str, **kwargs) -> Optional[Any]:
"""从缓存获取数据(子类实现)"""
return None
async def _save_to_cache(self, key: str, data: Any, **kwargs):
"""保存数据到缓存(子类实现)"""
pass
def _fallback(self, action: str, **kwargs) -> AdapterResult:
"""
降级方案(模拟数据)
当MCP不可用时返回模拟数据保持系统可用
"""
return AdapterResult(
success=True,
data=self._get_mock_data(action, **kwargs),
source="mock"
)
def _get_mock_data(self, action: str, **kwargs) -> Any:
"""获取模拟数据(子类实现)"""
return None