# File: ailine/backend/app/subgraphs/contact/api_client.py
# (119 lines, 3.9 KiB, Python)

"""
通讯录子图 API 调用工具使用MCP统一接口
"""
from typing import Dict, Any, Optional, List
from datetime import datetime
from dataclasses import dataclass
from .state import Contact, Email
from ...mcp.mcp_manager import mcp_manager
from ...mcp.adapters import ContactAdapter
class ContactAPIClient:
    """Contact API client backed by the unified MCP interface.

    The legacy public methods are kept for backward compatibility; each one
    delegates to the ``"contact"`` adapter through ``mcp_manager``.

    This is deliberately a plain class rather than a ``@dataclass``: it
    declares no fields and defines its own ``__init__``, so the decorator
    contributed nothing except a generated ``__eq__`` (every instance compared
    equal) and a disabled ``__hash__``.
    """

    def __init__(self, conn=None):
        """Store the connection and defer MCP setup until first use.

        Args:
            conn: Database connection, kept for backward compatibility. When
                truthy, it is used to build a ``ContactRepository`` for the
                adapter the first time an operation is executed.
        """
        self.conn = conn
        self._mcp_initialized = False  # flipped once _init_mcp() succeeds

    async def _init_mcp(self) -> None:
        """Lazily register the contact adapter and initialize MCP.

        After one successful run, later calls return immediately. If
        initialization raises, the flag stays unset so the next call retries.
        """
        if self._mcp_initialized:
            return

        if not mcp_manager.get_adapter("contact"):
            repo = None
            if self.conn:
                try:
                    from ...db.models import ContactRepository

                    repo = ContactRepository(self.conn)
                except Exception:
                    # Best effort: still register an adapter without a
                    # repository, but leave a trace instead of hiding the error.
                    import logging

                    logging.getLogger(__name__).warning(
                        "ContactRepository unavailable; registering the "
                        "contact adapter without a repository",
                        exc_info=True,
                    )

            mcp_manager.register_adapter(ContactAdapter(contact_repo=repo))
            # NOTE(review): the source this was restored from had lost its
            # indentation; confirm initialize() belongs inside this branch
            # rather than running unconditionally on first use.
            await mcp_manager.initialize()

        self._mcp_initialized = True

    async def _call(self, operation: str, **params):
        """Ensure MCP is ready, then run ``operation`` on the contact adapter.

        Args:
            operation: Name of the adapter operation to execute.
            **params: Keyword arguments forwarded to ``mcp_manager.execute``.

        Returns:
            The result object returned by ``mcp_manager.execute``.
        """
        await self._init_mcp()
        return await mcp_manager.execute("contact", operation, **params)

    async def list_contacts(self, user_id: str = "default") -> List[Contact]:
        """Return the contact list for ``user_id``.

        Returns ``[]`` when the call is unsuccessful or carries no data.
        """
        result = await self._call("list_contacts", user_id=user_id)
        if result.success and result.data is not None:
            return result.data
        return []

    async def add_contact(self, user_id: str, contact: Contact) -> bool:
        """Add ``contact`` for ``user_id``.

        Returns:
            ``True`` only when the call succeeded and returned truthy data.
        """
        result = await self._call("add_contact", user_id=user_id, contact=contact)
        # Coerce so the declared ``bool`` return type holds even when
        # ``result.data`` is a non-bool payload or ``None``.
        return bool(result.success and result.data)

    async def list_emails(self, user_id: str = "default") -> List[Email]:
        """Return the email list for ``user_id``.

        Returns ``[]`` when the call is unsuccessful or carries no data.
        """
        result = await self._call("list_emails", user_id=user_id)
        if result.success and result.data is not None:
            return result.data
        return []

    async def generate_email_draft(self, query: str) -> Dict[str, str]:
        """Generate an email draft for ``query``.

        Returns:
            The adapter's data on success; otherwise a placeholder draft with
            ``subject``, ``recipient`` and ``body`` keys.
        """
        result = await self._call("generate_email_draft", query=query)
        if result.success:
            return result.data
        return {
            "subject": f"Re: {query}",
            "recipient": "recipient@example.com",
            "body": "你好,\n\n这是一封自动生成的邮件草稿。"
        }

    async def send_email(self, user_id: str, recipient: str, subject: str, body: str) -> bool:
        """Send an email and report whether the call succeeded."""
        result = await self._call(
            "send_email",
            user_id=user_id, recipient=recipient, subject=subject, body=body,
        )
        return bool(result.success)

    async def sniff_contacts(self, query: str) -> List[Contact]:
        """Run the adapter's contact "sniffing" lookup for ``query``.

        Returns ``[]`` when the call is unsuccessful or carries no data.
        """
        result = await self._call("sniff_contacts", query=query)
        if result.success and result.data is not None:
            return result.data
        return []

    # Legacy method kept for backward compatibility.
    def list_contacts_mock(self, user_id: str = "default") -> List[Contact]:
        """Synchronous wrapper around :meth:`list_contacts` (legacy API).

        Returns ``[]`` on any failure, including when it is called from inside
        a running event loop, where ``asyncio.run`` cannot be used.
        """
        import asyncio
        import logging

        log = logging.getLogger(__name__)

        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass  # no loop running in this thread: asyncio.run() is safe
        else:
            # asyncio.run() would raise here. Bail out before creating the
            # coroutine so no "never awaited" warning is left behind.
            log.warning(
                "list_contacts_mock() called from a running event loop; "
                "returning []. Await list_contacts() instead."
            )
            return []

        try:
            return asyncio.run(self.list_contacts(user_id))
        except Exception:
            log.exception("list_contacts_mock() failed; returning []")
            return []
# Module-level shared instance, kept for backward compatibility.
contact_api = ContactAPIClient(conn=None)