"""Contact (address book) adapter.

Combines three data sources: MCP, a database repository, and mock data.
"""
import re
from typing import Dict, Any, Optional, List
from datetime import datetime
from dataclasses import dataclass

from .base_adapter import BaseAdapter, AdapterResult

# Patterns used by the "sniff_contacts" mock action; compiled once at import.
_EMAIL_PATTERN = re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')
_PHONE_PATTERN = re.compile(r'1[3-9]\d{9}')


@dataclass
class Contact:
    """Simple contact record (standalone version)."""

    id: str = ""
    name: str = ""
    phone: str = ""
    email: str = ""
    company: str = ""
    position: str = ""
    created_at: str = ""


@dataclass
class Email:
    """Simple email record (standalone version)."""

    id: str = ""
    subject: str = ""
    sender: str = ""
    # None is accepted for convenience and replaced by a fresh list in
    # __post_init__, so instances never share one mutable default.
    recipients: Optional[List[str]] = None
    date: str = ""
    body: str = ""

    def __post_init__(self):
        """Normalise a missing recipient list to an empty list."""
        if self.recipients is None:
            self.recipients = []


class ContactAdapter(BaseAdapter):
    """Address book adapter.

    ``execute`` tries MCP first, then the database repository, and finally
    falls back to in-memory mock data.
    """

    name = "contact"
    description = "通讯录管理,支持MCP邮件服务和数据库存储"

    def __init__(self, mcp_client=None, contact_repo=None, email_repo=None):
        """Store the collaborators and initialise the in-memory mock stores.

        Args:
            mcp_client: MCP client forwarded to ``BaseAdapter``; may be None.
            contact_repo: Contact repository forwarded to ``BaseAdapter``
                (presumably exposed there as ``self.repository`` - confirm
                against ``BaseAdapter``).
            email_repo: Email repository; only stored on the instance here.
        """
        super().__init__(mcp_client, contact_repo)
        self.email_repo = email_repo
        self._mock_db = {}       # user_id -> list of Contact
        self._mock_emails = []   # lazily seeded list of Email

    async def execute(self, action: str, **kwargs) -> AdapterResult:
        """Unified entry point: MCP, then database, then fallback.

        A source is used only if it returns a successful result. Exceptions
        raised by a source are printed and the next source is tried.

        Args:
            action: Name of the operation to perform.
            **kwargs: Action-specific arguments, passed through unchanged.

        Returns:
            The first successful ``AdapterResult``, otherwise whatever
            ``self._fallback`` returns (not defined in this file; presumably
            inherited from ``BaseAdapter``).
        """
        # 1. Try MCP.
        if self.mcp_client and self.mcp_client.is_available():
            try:
                mcp_result = await self._execute_mcp(action, **kwargs)
                if mcp_result.success:
                    return mcp_result
            except Exception as e:
                print(f"[Contact] MCP调用失败: {e}")

        # 2. Try the database.
        if self.repository:
            try:
                db_result = await self._execute_db(action, **kwargs)
                if db_result.success:
                    return db_result
            except Exception as e:
                print(f"[Contact] 数据库调用失败: {e}")

        # 3. Degrade to the fallback path.
        return self._fallback(action, **kwargs)

    async def _execute_mcp(self, action: str, **kwargs) -> AdapterResult:
        """Run ``action`` through the MCP email tools.

        Supported actions are ``"list_emails"`` and ``"send_email"`` (the
        latter reads ``recipient``, ``subject`` and ``body`` from kwargs).

        Returns:
            A successful result carrying ``result["result"]`` when the tool
            reports success; otherwise a failed result whose error says
            whether the action is unsupported or the tool call failed.
        """
        if action == "list_emails":
            tool_name = "email_list_emails"
            tool_args = {}
        elif action == "send_email":
            tool_name = "email_send_email"
            tool_args = {
                "to": kwargs.get("recipient", ""),
                "subject": kwargs.get("subject", ""),
                "body": kwargs.get("body", ""),
            }
        else:
            return AdapterResult(success=False, error="不支持的MCP操作")

        result = await self.mcp_client.call_tool(tool_name, tool_args)
        if result.get("success"):
            return AdapterResult(
                success=True,
                data=result["result"],
                source="mcp_email",
            )
        # A supported action whose call failed is reported as a call failure,
        # not as an unsupported operation.
        return AdapterResult(success=False, error=f"MCP工具调用失败: {tool_name}")

    async def _execute_db(self, action: str, **kwargs) -> AdapterResult:
        """Run ``action`` against the database repository.

        Database support is optional and not implemented yet, so this always
        returns a failed result.
        """
        if not self.repository:
            return AdapterResult(success=False, error="No database repository")
        # TODO: implement database operations (optional feature).
        return AdapterResult(success=False, error="Database not implemented yet")

    def _get_mock_data(self, action: str, **kwargs) -> Any:
        """Return mock data for ``action``.

        Reads ``user_id`` (default ``"default"``) from kwargs, plus
        ``contact`` or ``query`` depending on the action.

        Returns:
            * ``"list_contacts"``: the user's contact list, seeded with two
              sample contacts on first access.
            * ``"list_emails"``: the shared email list, seeded with one
              sample email on first access.
            * ``"add_contact"``: True if a contact was stored, False if no
              contact was supplied.
            * ``"generate_email_draft"``: a dict with ``subject``,
              ``recipient`` and ``body``.
            * ``"sniff_contacts"``: one ``Contact`` per email address found
              in ``query``, paired by position with any phone numbers found.
            * anything else: None.
        """
        user_id = kwargs.get("user_id", "default")

        if action == "list_contacts":
            if user_id not in self._mock_db:
                self._mock_db[user_id] = [
                    Contact(
                        id="1",
                        name="张三",
                        phone="13800138000",
                        email="zhangsan@example.com",
                        company="科技公司",
                        position="工程师",
                        created_at=datetime.now().isoformat(),
                    ),
                    Contact(
                        id="2",
                        name="李四",
                        phone="13900139000",
                        email="lisi@example.com",
                        company="贸易公司",
                        position="经理",
                        created_at=datetime.now().isoformat(),
                    ),
                ]
            return self._mock_db[user_id]

        if action == "list_emails":
            if not self._mock_emails:
                self._mock_emails = [
                    Email(
                        id="1",
                        subject="会议邀请:AI 技术分享",
                        sender="admin@example.com",
                        recipients=["user@example.com"],
                        date=datetime.now().isoformat(),
                        body="你好,下周一将举办 AI 技术分享会,欢迎参加。",
                    )
                ]
            return self._mock_emails

        if action == "add_contact":
            contact = kwargs.get("contact")
            # The user's bucket is created even when no contact is supplied.
            bucket = self._mock_db.setdefault(user_id, [])
            if not contact:
                # Nothing was stored, so do not report success.
                return False
            if not contact.id:
                contact.id = str(len(bucket) + 1)
            bucket.append(contact)
            return True

        if action == "generate_email_draft":
            query = kwargs.get("query", "")
            return {
                "subject": f"Re: {query}",
                "recipient": "recipient@example.com",
                "body": "你好,\n\n这是一封自动生成的邮件草稿。\n\n此致,\n你的助手",
            }

        if action == "sniff_contacts":
            query = kwargs.get("query", "")
            emails = _EMAIL_PATTERN.findall(query)
            phones = _PHONE_PATTERN.findall(query)
            # Contacts are driven by email addresses; the i-th phone number
            # (if any) is attached to the i-th email address.
            return [
                Contact(
                    id=str(i + 1),
                    name=f"联系人{i + 1}",
                    phone=phones[i] if i < len(phones) else "",
                    email=email,
                    company="",
                    position="",
                    created_at=datetime.now().isoformat(),
                )
                for i, email in enumerate(emails)
            ]

        return None