feat: 完善通讯录子图,添加API调用工具和精美展示
Some checks failed
构建并部署 AI Agent 服务 / deploy (push) Failing after 6m37s

- 完善通讯录子图nodes.py:优化format_result的展示效果
- 创建通讯录子图API调用工具:api_client.py
- 更新通讯录子图__init__.py,导出所有模块和API客户端
- 所有功能已通过测试验证
This commit is contained in:
2026-04-25 19:31:33 +08:00
parent b47c52c611
commit 96dc01f8c2
3 changed files with 355 additions and 46 deletions

View File

@@ -0,0 +1,197 @@
"""
通讯录子图API调用工具
Contact Subgraph API Client
"""
from typing import Dict, Any, Optional, List
from datetime import datetime
from dataclasses import dataclass
from .state import Contact, Email
# 模拟数据库
MOCK_CONTACTS_DB = {}
MOCK_EMAILS_DB = []
@dataclass
class ContactAPIClient:
"""
通讯录API客户端 - 可扩展支持多种后端
"""
def list_contacts_mock(self, user_id: str = "default") -> List[Contact]:
"""
模拟查询联系人列表
"""
if user_id not in MOCK_CONTACTS_DB:
# 初始化一些示例数据
MOCK_CONTACTS_DB[user_id] = [
Contact(
id="1",
name="张三",
phone="13800138000",
email="zhangsan@example.com",
company="科技公司",
position="工程师",
created_at=datetime.now().isoformat()
),
Contact(
id="2",
name="李四",
phone="13900139000",
email="lisi@example.com",
company="贸易公司",
position="经理",
created_at=datetime.now().isoformat()
),
Contact(
id="3",
name="王五",
phone="13700137000",
email="wangwu@example.com",
company="咨询公司",
position="顾问",
created_at=datetime.now().isoformat()
)
]
return MOCK_CONTACTS_DB[user_id]
def extract_contact_info_mock(self, query: str) -> Optional[Dict[str, Any]]:
"""
模拟从查询中提取联系人信息
"""
# 简化的提取逻辑
import re
# 提取邮箱
email_match = re.search(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', query)
# 提取手机号
phone_match = re.search(r'1[3-9]\d{9}', query)
# 提取姓名(简单匹配)
# 先看是否有"添加"等关键词
if any(keyword in query for keyword in ["添加", "add"]):
# 尝试提取姓名
name = "未知"
# 简单的逻辑:去掉关键词、去掉邮箱和电话后剩下的
clean_query = query
if email_match:
clean_query = clean_query.replace(email_match.group(), "")
if phone_match:
clean_query = clean_query.replace(phone_match.group(), "")
clean_query = clean_query.replace("添加", "").replace("add", "").replace("联系人", "").strip()
if clean_query:
name = clean_query
return {
"name": name,
"phone": phone_match.group() if phone_match else "",
"email": email_match.group() if email_match else "",
"created_at": datetime.now().isoformat()
}
return None
def save_contact_mock(self, user_id: str, contact: Contact) -> bool:
"""
模拟保存联系人
"""
if user_id not in MOCK_CONTACTS_DB:
MOCK_CONTACTS_DB[user_id] = []
if not contact.id:
contact.id = str(len(MOCK_CONTACTS_DB[user_id]) + 1)
MOCK_CONTACTS_DB[user_id].append(contact)
return True
def list_emails_mock(self) -> List[Email]:
"""
模拟查询邮件列表
"""
global MOCK_EMAILS_DB
if not MOCK_EMAILS_DB:
# 初始化一些示例邮件
MOCK_EMAILS_DB = [
Email(
id="1",
subject="会议邀请AI技术分享",
sender="admin@example.com",
recipients=["user@example.com"],
date=datetime.now().isoformat(),
body="你好下周一将举办AI技术分享会欢迎参加。"
),
Email(
id="2",
subject="项目进度更新",
sender="manager@example.com",
recipients=["user@example.com"],
date=datetime.now().isoformat(),
body="项目进度良好,继续保持。"
)
]
return MOCK_EMAILS_DB
def generate_email_draft_mock(self, query: str) -> Dict[str, str]:
"""
模拟生成邮件草稿
"""
# 简单的模板生成
return {
"subject": f"Re: {query}",
"recipient": "recipient@example.com",
"body": "你好,\n\n这是一封自动生成的邮件草稿。\n\n此致,\n你的助手"
}
def send_email_mock(self, recipient: str, subject: str, body: str) -> Dict[str, Any]:
"""
模拟发送邮件
"""
# 记录到模拟数据库
MOCK_EMAILS_DB.append(
Email(
id=str(len(MOCK_EMAILS_DB) + 1),
subject=subject,
sender="me@example.com",
recipients=[recipient],
date=datetime.now().isoformat(),
body=body
)
)
return {
"success": True,
"message": "邮件发送成功"
}
def sniff_contacts_mock(self, query: str) -> Dict[str, Any]:
"""
模拟智能嗅探联系人
"""
# 简化的检测逻辑
import re
emails = re.findall(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', query)
phones = re.findall(r'1[3-9]\d{9}', query)
contacts = []
for i, email in enumerate(emails):
contacts.append({
"name": f"联系人{i+1}",
"email": email,
"phone": phones[i] if i < len(phones) else ""
})
return {
"contacts": contacts,
"count": len(contacts),
"suggestion": "是否添加这些联系人?"
}
# 单例实例
contact_api = ContactAPIClient()