""" 通讯录子图节点 - 使用公共工具版本 Contact Subgraph Nodes - Using Common Tools """ from typing import Dict, Any from datetime import datetime # 公共工具 from ..common import MarkdownFormatter from .state import ContactState, ContactAction, Contact, Email from .api_client import contact_api # 模拟联系人数据库(临时存储) CONTACT_DB = {} def parse_intent(state: ContactState) -> ContactState: """ 解析用户意图节点 """ query_lower = state.user_query.lower() if any(keyword in query_lower for keyword in ["添加", "add", "新建", "save"]): state.action = ContactAction.CONTACT_ADD elif any(keyword in query_lower for keyword in ["联系人", "contact", "list"]): state.action = ContactAction.CONTACT_LIST state.action_params = {"query": state.user_query} elif any(keyword in query_lower for keyword in ["邮件", "email", "inbox"]): state.action = ContactAction.EMAIL_LIST elif any(keyword in query_lower for keyword in ["发送邮件", "send email", "发邮件"]): state.action = ContactAction.EMAIL_SEND else: state.action = ContactAction.SNIFF_CONTACTS return state def list_contacts(state: ContactState) -> ContactState: """ 列出联系人节点 """ state.current_phase = "executing" # 使用 API 客户端 contacts = contact_api.list_contacts(state.user_id) state.contacts = contacts return state def add_contact(state: ContactState) -> ContactState: """ 添加联系人节点 """ state.current_phase = "executing" # 使用 API 客户端(简化添加,实际项目应解析用户输入) new_contact = Contact( id=len(CONTACT_DB) + 1, name="新联系人", email="new@example.com", phone="13800000000", created_at=datetime.now() ) state.current_contact = new_contact return state def list_emails(state: ContactState) -> ContactState: """ 列出邮件节点 """ state.current_phase = "executing" # 使用 API 客户端 emails = contact_api.list_emails(state.user_id) state.emails = emails return state def generate_email_draft(state: ContactState) -> ContactState: """ 生成邮件草稿节点 """ state.current_phase = "executing" # 使用 API 客户端 draft = contact_api.generate_email_draft(state.user_query) state.draft_recipient = draft.get("recipient", "recipient@example.com") state.draft_subject = draft.get("subject", "邮件主题") state.draft_body = draft.get("body", "邮件正文") return state def sniff_contacts(state: ContactState) -> ContactState: """ 嗅探联系人节点 """ state.current_phase = "executing" # 使用 API 客户端 contacts = contact_api.sniff_contacts(state.user_query) state.sniffed_contacts = contacts return state def format_result(state: ContactState) -> ContactState: """ 格式化结果节点(使用公共工具) """ state.current_phase = "formatting" md = MarkdownFormatter() output_lines = [] output_lines.append("┌───────────────────────────────────┐") output_lines.append("│ 📇 通讯录助手 │") output_lines.append("└───────────────────────────────────┘") output_lines.append("") if state.action == ContactAction.CONTACT_LIST and state.contacts: output_lines.append(md.heading("📇 联系人列表", 2)) output_lines.append("") contact_data = [ {"姓名": c.name, "邮箱": c.email, "电话": c.phone or "-"} for c in state.contacts ] output_lines.append(md.table(contact_data)) elif state.action == ContactAction.EMAIL_LIST and state.emails: output_lines.append(md.heading("📬 邮件列表", 2)) output_lines.append("") email_data = [ {"发件人": e.sender, "主题": e.subject, "时间": e.received_at.strftime('%Y-%m-%d %H:%M')} for e in state.emails ] output_lines.append(md.table(email_data)) elif state.action == ContactAction.EMAIL_SEND and state.draft_subject: output_lines.append(md.heading("📝 邮件草稿", 2)) output_lines.append("") output_lines.append(f"**收件人**: {state.draft_recipient}") output_lines.append(f"**主题**: {state.draft_subject}") output_lines.append("") output_lines.append(md.quote(state.draft_body)) elif state.action == ContactAction.SNIFF_CONTACTS and state.sniffed_contacts: output_lines.append(md.heading("🔍 发现的联系人", 2)) output_lines.append("") contact_data = [ {"姓名": c.name, "邮箱": c.email} for c in state.sniffed_contacts ] output_lines.append(md.table(contact_data)) else: output_lines.append(md.heading("✨ 操作完成", 2)) output_lines.append("您的请求已处理。") # 页脚提示 output_lines.append("") output_lines.append("---") output_lines.append("💡 提示:您可以继续查询联系人、查看邮件,或者生成邮件草稿!") state.final_result = "\n".join(output_lines) state.success = True state.current_phase = "completed" return state def human_review(state: ContactState) -> ContactState: """ 人工审核节点(用于邮件草稿) """ state.current_phase = "reviewing" # 标记需要审核,等待用户决定 state.needs_approval = True return state def send_email(state: ContactState) -> ContactState: """ 发送邮件节点 """ state.current_phase = "executing" # 使用 API 客户端发送邮件 success = contact_api.send_email( state.user_id, state.draft_recipient, state.draft_subject, state.draft_body ) state.success = success return state def should_continue(state: ContactState) -> str: """ 条件路由函数:根据 action 和状态决定下一个节点 """ # 如果是从 human_review 来的,根据审核状态决定 if state.current_phase == "reviewing": if state.needs_approval: # 这里会等待用户操作,实际运行时通过 checkpointer 或后端 API 处理 return "format_result" else: return "send_email" # 普通路由 action = state.action if action == ContactAction.CONTACT_LIST: return "list_contacts" elif action == ContactAction.CONTACT_ADD: return "add_contact" elif action == ContactAction.EMAIL_LIST: return "list_emails" elif action == ContactAction.EMAIL_SEND: return "generate_email_draft" elif action == ContactAction.SNIFF_CONTACTS: return "sniff_contacts" else: return "format_result"