Files
ailine/backend/app/agent_subgraphs/contact/nodes.py
root e6337eb0fc
Some checks failed
构建并部署 AI Agent 服务 / deploy (push) Failing after 6m10s
feat: 完善子图,添加路由函数和审核节点
2026-04-25 20:46:30 +08:00

231 lines
7.0 KiB
Python

"""
通讯录子图节点 - 使用公共工具版本
Contact Subgraph Nodes - Using Common Tools
"""
from typing import Dict, Any
from datetime import datetime
# 公共工具
from ..common import MarkdownFormatter
from .state import ContactState, ContactAction, Contact, Email
from .api_client import contact_api
# 模拟联系人数据库(临时存储)
CONTACT_DB = {}
def parse_intent(state: ContactState) -> ContactState:
"""
解析用户意图节点
"""
query_lower = state.user_query.lower()
if any(keyword in query_lower for keyword in ["添加", "add", "新建", "save"]):
state.action = ContactAction.CONTACT_ADD
elif any(keyword in query_lower for keyword in ["联系人", "contact", "list"]):
state.action = ContactAction.CONTACT_LIST
state.action_params = {"query": state.user_query}
elif any(keyword in query_lower for keyword in ["邮件", "email", "inbox"]):
state.action = ContactAction.EMAIL_LIST
elif any(keyword in query_lower for keyword in ["发送邮件", "send email", "发邮件"]):
state.action = ContactAction.EMAIL_SEND
else:
state.action = ContactAction.SNIFF_CONTACTS
return state
def list_contacts(state: ContactState) -> ContactState:
"""
列出联系人节点
"""
state.current_phase = "executing"
# 使用 API 客户端
contacts = contact_api.list_contacts(state.user_id)
state.contacts = contacts
return state
def add_contact(state: ContactState) -> ContactState:
"""
添加联系人节点
"""
state.current_phase = "executing"
# 使用 API 客户端(简化添加,实际项目应解析用户输入)
new_contact = Contact(
id=len(CONTACT_DB) + 1,
name="新联系人",
email="new@example.com",
phone="13800000000",
created_at=datetime.now()
)
state.current_contact = new_contact
return state
def list_emails(state: ContactState) -> ContactState:
"""
列出邮件节点
"""
state.current_phase = "executing"
# 使用 API 客户端
emails = contact_api.list_emails(state.user_id)
state.emails = emails
return state
def generate_email_draft(state: ContactState) -> ContactState:
"""
生成邮件草稿节点
"""
state.current_phase = "executing"
# 使用 API 客户端
draft = contact_api.generate_email_draft(state.user_query)
state.draft_recipient = draft.get("recipient", "recipient@example.com")
state.draft_subject = draft.get("subject", "邮件主题")
state.draft_body = draft.get("body", "邮件正文")
return state
def sniff_contacts(state: ContactState) -> ContactState:
"""
嗅探联系人节点
"""
state.current_phase = "executing"
# 使用 API 客户端
contacts = contact_api.sniff_contacts(state.user_query)
state.sniffed_contacts = contacts
return state
def format_result(state: ContactState) -> ContactState:
"""
格式化结果节点(使用公共工具)
"""
state.current_phase = "formatting"
md = MarkdownFormatter()
output_lines = []
output_lines.append("┌───────────────────────────────────┐")
output_lines.append("│ 📇 通讯录助手 │")
output_lines.append("└───────────────────────────────────┘")
output_lines.append("")
if state.action == ContactAction.CONTACT_LIST and state.contacts:
output_lines.append(md.heading("📇 联系人列表", 2))
output_lines.append("")
contact_data = [
{"姓名": c.name, "邮箱": c.email, "电话": c.phone or "-"}
for c in state.contacts
]
output_lines.append(md.table(contact_data))
elif state.action == ContactAction.EMAIL_LIST and state.emails:
output_lines.append(md.heading("📬 邮件列表", 2))
output_lines.append("")
email_data = [
{"发件人": e.sender, "主题": e.subject, "时间": e.received_at.strftime('%Y-%m-%d %H:%M')}
for e in state.emails
]
output_lines.append(md.table(email_data))
elif state.action == ContactAction.EMAIL_SEND and state.draft_subject:
output_lines.append(md.heading("📝 邮件草稿", 2))
output_lines.append("")
output_lines.append(f"**收件人**: {state.draft_recipient}")
output_lines.append(f"**主题**: {state.draft_subject}")
output_lines.append("")
output_lines.append(md.quote(state.draft_body))
elif state.action == ContactAction.SNIFF_CONTACTS and state.sniffed_contacts:
output_lines.append(md.heading("🔍 发现的联系人", 2))
output_lines.append("")
contact_data = [
{"姓名": c.name, "邮箱": c.email}
for c in state.sniffed_contacts
]
output_lines.append(md.table(contact_data))
else:
output_lines.append(md.heading("✨ 操作完成", 2))
output_lines.append("您的请求已处理。")
# 页脚提示
output_lines.append("")
output_lines.append("---")
output_lines.append("💡 提示:您可以继续查询联系人、查看邮件,或者生成邮件草稿!")
state.final_result = "\n".join(output_lines)
state.success = True
state.current_phase = "completed"
return state
def human_review(state: ContactState) -> ContactState:
"""
人工审核节点(用于邮件草稿)
"""
state.current_phase = "reviewing"
# 标记需要审核,等待用户决定
state.needs_approval = True
return state
def send_email(state: ContactState) -> ContactState:
"""
发送邮件节点
"""
state.current_phase = "executing"
# 使用 API 客户端发送邮件
success = contact_api.send_email(
state.user_id,
state.draft_recipient,
state.draft_subject,
state.draft_body
)
state.success = success
return state
def should_continue(state: ContactState) -> str:
"""
条件路由函数:根据 action 和状态决定下一个节点
"""
# 如果是从 human_review 来的,根据审核状态决定
if state.current_phase == "reviewing":
if state.needs_approval:
# 这里会等待用户操作,实际运行时通过 checkpointer 或后端 API 处理
return "format_result"
else:
return "send_email"
# 普通路由
action = state.action
if action == ContactAction.CONTACT_LIST:
return "list_contacts"
elif action == ContactAction.CONTACT_ADD:
return "add_contact"
elif action == ContactAction.EMAIL_LIST:
return "list_emails"
elif action == ContactAction.EMAIL_SEND:
return "generate_email_draft"
elif action == ContactAction.SNIFF_CONTACTS:
return "sniff_contacts"
else:
return "format_result"