Files
ailine/backend/app/subgraphs/contact/api_client.py
root 3ae9daa01a
Some checks failed
构建并部署 AI Agent 服务 / deploy (push) Failing after 6m44s
导入方式修改
2026-05-05 23:17:00 +08:00

119 lines
3.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
通讯录子图 API 调用工具使用MCP统一接口
"""
import logging
from typing import Dict, Any, Optional, List
from datetime import datetime
from dataclasses import dataclass

from .state import Contact, Email
from ...mcp.mcp_manager import mcp_manager
from ...mcp.adapters import ContactAdapter
class ContactAPIClient:
    """Contact (address book) API client routed through the unified MCP interface.

    The public methods are kept for backward compatibility; internally every
    call is dispatched via ``mcp_manager`` to the adapter registered under the
    name ``"contact"``.

    This is deliberately a plain class: it has a hand-written ``__init__`` and
    declares no dataclass fields, so ``@dataclass`` would only add a field-less
    ``__eq__`` (every instance compares equal) and set ``__hash__`` to ``None``.
    """

    # Shared logger so that best-effort fallbacks are visible instead of silent.
    _logger = logging.getLogger(__name__)

    def __init__(self, conn=None):
        """Create the client without touching MCP (initialization is lazy).

        Args:
            conn: Database connection (kept for backward compatibility). When
                truthy it is used to build a ``ContactRepository`` the first
                time the contact adapter has to be registered.
        """
        self.conn = conn
        self._mcp_initialized = False  # flipped by _init_mcp() on first use

    async def _init_mcp(self):
        """Register the contact adapter and initialize MCP on first use.

        Later calls return immediately once ``_mcp_initialized`` is set.
        """
        if self._mcp_initialized:
            return  # already initialized, nothing to do

        if not mcp_manager.get_adapter("contact"):
            # Build a repository only when a connection was supplied.
            repo = None
            if self.conn:
                try:
                    from ...db.models import ContactRepository
                    repo = ContactRepository(self.conn)
                except Exception:
                    # Best-effort: keep going without a repository, but leave
                    # a trace instead of swallowing the error silently.
                    self._logger.warning(
                        "Could not create ContactRepository; registering the "
                        "contact adapter without a repository",
                        exc_info=True,
                    )
            mcp_manager.register_adapter(ContactAdapter(contact_repo=repo))
            # NOTE(review): the original indentation of this call was lost in
            # the reviewed copy; it is placed next to the registration it
            # follows. Confirm it should not run when the adapter already
            # exists.
            await mcp_manager.initialize()

        self._mcp_initialized = True

    async def _execute(self, operation: str, **params):
        """Ensure MCP is ready, then run *operation* on the contact adapter.

        Returns whatever ``mcp_manager.execute`` returns; callers read its
        ``success`` and ``data`` attributes.
        """
        await self._init_mcp()
        return await mcp_manager.execute("contact", operation, **params)

    async def list_contacts(self, user_id: str = "default") -> List[Contact]:
        """Return the contact list for *user_id*, or ``[]`` if the call fails."""
        result = await self._execute("list_contacts", user_id=user_id)
        return result.data if result.success else []

    async def add_contact(self, user_id: str, contact: Contact) -> bool:
        """Add *contact* for *user_id*.

        Returns:
            ``True`` only when the call succeeded and the adapter returned a
            truthy payload. The result is coerced to ``bool`` so the raw
            payload never leaks through the ``-> bool`` contract.
        """
        result = await self._execute(
            "add_contact", user_id=user_id, contact=contact
        )
        return bool(result.success and result.data)

    async def list_emails(self, user_id: str = "default") -> List[Email]:
        """Return the email list for *user_id*, or ``[]`` if the call fails."""
        result = await self._execute("list_emails", user_id=user_id)
        return result.data if result.success else []

    async def generate_email_draft(self, query: str) -> Dict[str, str]:
        """Generate an email draft for *query*.

        Returns:
            The adapter's draft on success; otherwise a placeholder draft with
            ``subject``, ``recipient`` and ``body`` keys.
        """
        result = await self._execute("generate_email_draft", query=query)
        if result.success:
            return result.data
        # Fallback placeholder used when the adapter could not produce a draft.
        return {
            "subject": f"Re: {query}",
            "recipient": "recipient@example.com",
            "body": "你好,\n\n这是一封自动生成的邮件草稿。"
        }

    async def send_email(self, user_id: str, recipient: str, subject: str, body: str) -> bool:
        """Send an email on behalf of *user_id*; return whether the call succeeded."""
        result = await self._execute(
            "send_email",
            user_id=user_id, recipient=recipient, subject=subject, body=body,
        )
        return result.success

    async def sniff_contacts(self, query: str) -> List[Contact]:
        """Return contacts detected from *query*, or ``[]`` if the call fails."""
        result = await self._execute("sniff_contacts", query=query)
        return result.data if result.success else []

    # Legacy method kept for backward compatibility.
    def list_contacts_mock(self, user_id: str = "default") -> List[Contact]:
        """Synchronous wrapper around :meth:`list_contacts` (legacy API).

        Runs the coroutine with ``asyncio.run`` and returns ``[]`` on any
        failure, for example when called from inside a running event loop,
        where ``asyncio.run`` raises ``RuntimeError``.
        """
        import asyncio

        pending = self.list_contacts(user_id)
        try:
            return asyncio.run(pending)
        except Exception:
            # Close the coroutine so a refused asyncio.run() does not leave a
            # "coroutine was never awaited" warning; no-op if it already ran.
            pending.close()
            self._logger.warning(
                "list_contacts_mock failed; returning an empty list",
                exc_info=True,
            )
            return []
# Global singleton instance (kept for backward compatibility).
contact_api = ContactAPIClient()