""" 通讯录子图 API 调用工具(使用MCP统一接口) """ from typing import Dict, Any, Optional, List from datetime import datetime from dataclasses import dataclass from .state import Contact, Email from ...mcp.mcp_manager import mcp_manager from ...mcp.adapters import ContactAdapter @dataclass class ContactAPIClient: """ 通讯录 API 客户端 - 使用MCP统一接口 保持向后兼容,内部使用MCP适配器 """ def __init__(self, conn=None): """ 初始化(使用延迟初始化,MCP在首次使用时初始化) Args: conn: 数据库连接(保留用于向后兼容) """ self.conn = conn self._mcp_initialized = False # 延迟初始化标志 async def _init_mcp(self): """初始化MCP系统(延迟初始化)""" if self._mcp_initialized: return # 已初始化,跳过 if not mcp_manager.get_adapter("contact"): # 获取repository(如果有) repo = None if self.conn: try: from ...db.models import ContactRepository repo = ContactRepository(self.conn) except Exception: pass mcp_manager.register_adapter(ContactAdapter(contact_repo=repo)) await mcp_manager.initialize() self._mcp_initialized = True async def list_contacts(self, user_id: str = "default") -> List[Contact]: """获取联系人列表""" await self._init_mcp() result = await mcp_manager.execute("contact", "list_contacts", user_id=user_id) if result.success: return result.data return [] async def add_contact(self, user_id: str, contact: Contact) -> bool: """添加联系人""" await self._init_mcp() result = await mcp_manager.execute( "contact", "add_contact", user_id=user_id, contact=contact ) return result.success and result.data async def list_emails(self, user_id: str = "default") -> List[Email]: """查询邮件列表""" await self._init_mcp() result = await mcp_manager.execute("contact", "list_emails", user_id=user_id) if result.success: return result.data return [] async def generate_email_draft(self, query: str) -> Dict[str, str]: """生成邮件草稿""" await self._init_mcp() result = await mcp_manager.execute( "contact", "generate_email_draft", query=query ) if result.success: return result.data return { "subject": f"Re: {query}", "recipient": "recipient@example.com", "body": "你好,\n\n这是一封自动生成的邮件草稿。" } async def send_email(self, user_id: str, recipient: str, subject: str, body: str) -> bool: """发送邮件""" await self._init_mcp() result = await mcp_manager.execute( "contact", "send_email", user_id=user_id, recipient=recipient, subject=subject, body=body ) return result.success async def sniff_contacts(self, query: str) -> List[Contact]: """智能嗅探联系人""" await self._init_mcp() result = await mcp_manager.execute( "contact", "sniff_contacts", query=query ) if result.success: return result.data return [] # 保持向后兼容的旧方法 def list_contacts_mock(self, user_id: str = "default") -> List[Contact]: """模拟查询(保留用于向后兼容)""" import asyncio try: return asyncio.run(self.list_contacts(user_id)) except Exception: return [] # 全局单例(保持向后兼容) contact_api = ContactAPIClient()