""" 通讯录子图 API 调用工具 支持模拟数据和真实数据库两种模式 """ from typing import Dict, Any, Optional, List from datetime import datetime from dataclasses import dataclass from .state import Contact, Email # ========== 模拟数据(保留作为备选)========== # 模拟数据库 MOCK_CONTACTS_DB = {} MOCK_EMAILS_DB = [] @dataclass class ContactAPIClient: """ 通讯录 API 客户端 - 支持真实数据库和模拟模式 使用方式: 1. 真实数据库模式:传入 conn 参数 2. 模拟模式:不传入 conn,或 conn 为 None """ def __init__(self, conn=None): """ 初始化 Args: conn: 数据库连接(来自 checkpointer.conn),为 None 时使用模拟模式 """ self.conn = conn self._use_db = conn is not None if self._use_db: try: from ...db.models import ContactRepository, ContactEntity self._repo = ContactRepository(conn) except Exception as e: print(f"Repository 初始化失败,回退到模拟模式: {e}") self._use_db = False self._repo = None # ========== 真实数据库方法 ========== async def list_contacts_db(self, user_id: str = "default") -> List[Contact]: """真实数据库:获取联系人列表""" if not self._repo: return await self.list_contacts_mock(user_id) entities = await self._repo.list_by_user(user_id) return [ Contact( id=e.id, name=e.name, phone=e.phone, email=e.email, company=e.company, position=e.position, created_at=e.created_at ) for e in entities ] async def add_contact_db(self, user_id: str, contact: Contact) -> bool: """真实数据库:添加联系人""" if not self._repo: return await self.save_contact_mock(user_id, contact) from ...db.models import ContactEntity entity = ContactEntity( user_id=user_id, name=contact.name, phone=contact.phone, email=contact.email, company=contact.company, position=contact.position, created_at=contact.created_at or datetime.now().isoformat() ) await self._repo.insert(entity) return True # ========== 模拟数据方法(保留)========== def list_contacts_mock(self, user_id: str = "default") -> List[Contact]: """模拟查询联系人列表""" if user_id not in MOCK_CONTACTS_DB: # 初始化一些示例数据 MOCK_CONTACTS_DB[user_id] = [ Contact( id="1", name="张三", phone="13800138000", email="zhangsan@example.com", company="科技公司", position="工程师", created_at=datetime.now().isoformat() ), Contact( id="2", name="李四", phone="13900139000", email="lisi@example.com", company="贸易公司", position="经理", created_at=datetime.now().isoformat() ), Contact( id="3", name="王五", phone="13700137000", email="wangwu@example.com", company="咨询公司", position="顾问", created_at=datetime.now().isoformat() ), ] return MOCK_CONTACTS_DB[user_id] def extract_contact_info_mock(self, query: str) -> Optional[Dict[str, Any]]: """模拟从查询中提取联系人信息""" import re # 提取邮箱 email_match = re.search(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', query) # 提取手机号 phone_match = re.search(r'1[3-9]\d{9}', query) # 提取姓名(简单匹配) if any(keyword in query for keyword in ["添加", "add"]): name = "未知" clean_query = query if email_match: clean_query = clean_query.replace(email_match.group(), "") if phone_match: clean_query = clean_query.replace(phone_match.group(), "") clean_query = clean_query.replace("添加", "").replace("add", "").replace("联系人", "").strip() if clean_query: name = clean_query return { "name": name, "phone": phone_match.group() if phone_match else "", "email": email_match.group() if email_match else "", "created_at": datetime.now().isoformat() } return None def save_contact_mock(self, user_id: str, contact: Contact) -> bool: """模拟保存联系人""" if user_id not in MOCK_CONTACTS_DB: MOCK_CONTACTS_DB[user_id] = [] if not contact.id: contact.id = str(len(MOCK_CONTACTS_DB[user_id]) + 1) MOCK_CONTACTS_DB[user_id].append(contact) return True def list_emails_mock(self) -> List[Email]: """模拟查询邮件列表""" global MOCK_EMAILS_DB if not MOCK_EMAILS_DB: MOCK_EMAILS_DB = [ Email( id="1", subject="会议邀请:AI 技术分享", sender="admin@example.com", recipients=["user@example.com"], date=datetime.now().isoformat(), body="你好,下周一将举办 AI 技术分享会,欢迎参加。" ), Email( id="2", subject="项目进度更新", sender="manager@example.com", recipients=["user@example.com"], date=datetime.now().isoformat(), body="项目进度良好,继续保持。" ), ] return MOCK_EMAILS_DB def generate_email_draft_mock(self, query: str) -> Dict[str, str]: """模拟生成邮件草稿""" return { "subject": f"Re: {query}", "recipient": "recipient@example.com", "body": "你好,\n\n这是一封自动生成的邮件草稿。\n\n此致,\n你的助手" } def send_email_mock(self, recipient: str, subject: str, body: str) -> Dict[str, Any]: """模拟发送邮件""" global MOCK_EMAILS_DB MOCK_EMAILS_DB.append( Email( id=str(len(MOCK_EMAILS_DB) + 1), subject=subject, sender="me@example.com", recipients=[recipient], date=datetime.now().isoformat(), body=body ) ) return { "success": True, "message": "邮件发送成功" } def sniff_contacts_mock(self, query: str) -> Dict[str, Any]: """模拟智能嗅探联系人""" import re emails = re.findall(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', query) phones = re.findall(r'1[3-9]\d{9}', query) contacts = [] for i, email in enumerate(emails): contacts.append({ "name": f"联系人{i+1}", "email": email, "phone": phones[i] if i < len(phones) else "" }) return { "contacts": contacts, "count": len(contacts), "suggestion": "是否添加这些联系人?" } # ========== 公共方法(自动选择模式)========== async def list_contacts(self, user_id: str = "default") -> List[Contact]: """获取联系人列表(自动选择数据库或模拟模式)""" if self._use_db: return await self.list_contacts_db(user_id) return self.list_contacts_mock(user_id) async def add_contact(self, user_id: str, contact: Contact) -> bool: """添加联系人(自动选择数据库或模拟模式)""" if self._use_db: return await self.add_contact_db(user_id, contact) return self.save_contact_mock(user_id, contact) async def list_emails(self, user_id: str = "default") -> List[Email]: """查询邮件列表(目前用模拟)""" return self.list_emails_mock() async def generate_email_draft(self, query: str) -> Dict[str, str]: """生成邮件草稿(目前用模拟)""" return self.generate_email_draft_mock(query) async def send_email(self, user_id: str, recipient: str, subject: str, body: str) -> bool: """发送邮件(目前用模拟)""" result = self.send_email_mock(recipient, subject, body) return result.get("success", False) async def sniff_contacts(self, query: str) -> List[Contact]: """智能嗅探联系人(目前用模拟)""" result = self.sniff_contacts_mock(query) contact_dicts = result.get("contacts", []) return [ Contact( id=str(i+1), name=c.get("name", ""), phone=c.get("phone", ""), email=c.get("email", ""), company="", position="", created_at=datetime.now().isoformat() ) for i, c in enumerate(contact_dicts) ] # 全局实例(模拟模式,保留向后兼容) contact_api = ContactAPIClient()