Files
ailine/backend/app/agent_subgraphs/contact/nodes.py
root a14744f18b
Some checks failed
构建并部署 AI Agent 服务 / deploy (push) Failing after 6m5s
feat: 完善词典子图,添加API调用和前端格式化工具
- 完善词典子图:添加生词本功能
- 创建API调用工具:dictionary_api
- 添加前端格式化展示工具:result_formatter.py
- 创建通讯录和资讯子图的基本结构
- 更新主图状态结构,添加MainGraphState
- 添加subgraph_builder.py用于子图集成
2026-04-25 18:29:23 +08:00

218 lines
5.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
通讯录子图节点
Contact Subgraph Nodes
"""
from typing import Dict, Any
from datetime import datetime
from .state import ContactState, ContactAction, Contact, Email
def parse_intent(state: ContactState) -> ContactState:
"""
解析用户意图节点
确定用户想做什么操作
"""
state.current_phase = "intent_parsing"
query_lower = state.user_query.lower()
# 简单的关键词匹配真实场景应该用LLM
if any(keyword in query_lower for keyword in ["联系人", "contact", "list"]):
state.action = ContactAction.CONTACT_LIST
state.action_params = {"query": state.user_query}
elif any(keyword in query_lower for keyword in ["添加", "add", "新建", "save"]):
state.action = ContactAction.CONTACT_ADD
# TODO: 提取联系人信息
elif any(keyword in query_lower for keyword in ["邮件", "email", "inbox"]):
state.action = ContactAction.EMAIL_LIST
elif any(keyword in query_lower for keyword in ["发送邮件", "send email", "发邮件"]):
state.action = ContactAction.EMAIL_SEND
else:
state.action = ContactAction.SNIFF_CONTACTS
return state
def list_contacts(state: ContactState) -> ContactState:
"""
列出联系人节点
"""
state.current_phase = "listing_contacts"
# TODO: 从数据库查询
# 暂时返回空列表
state.contacts = []
state.success = True
state.final_result = "暂无联系人"
return state
def add_contact(state: ContactState) -> ContactState:
"""
添加联系人节点
"""
state.current_phase = "adding_contact"
# TODO: 实现添加联系人逻辑
state.success = True
state.final_result = "联系人添加成功(待实现)"
return state
def list_emails(state: ContactState) -> ContactState:
"""
列出邮件节点
"""
state.current_phase = "listing_emails"
# TODO: 从IMAP查询
state.emails = []
state.success = True
state.final_result = "暂无邮件"
return state
def generate_email_draft(state: ContactState) -> ContactState:
"""
生成邮件草稿节点
"""
state.current_phase = "generating_draft"
# TODO: 使用LLM生成邮件草稿
state.draft_subject = "邮件主题"
state.draft_recipient = "recipient@example.com"
state.draft_body = "这是邮件内容..."
# 进入人工审核状态
state.pending_review = True
state.review_type = "email_send"
state.review_prompt = "请确认是否发送此邮件"
return state
def human_review(state: ContactState) -> ContactState:
"""
人工审核节点
这里会让用户确认/修改
"""
state.current_phase = "reviewing"
# 注意真实的LangGraph会在这里使用interrupt()暂停
# 这里我们只设置状态,让外层处理
if state.review_approved is True:
state.pending_review = False
elif state.review_approved is False:
state.pending_review = False
state.error_message = "发送已取消"
state.success = False
return state
def send_email(state: ContactState) -> ContactState:
"""
发送邮件节点
"""
state.current_phase = "sending_email"
# TODO: 使用SMTP发送邮件
state.success = True
state.final_result = "邮件发送成功(待实现)"
return state
def sniff_contacts(state: ContactState) -> ContactState:
"""
智能嗅探节点
从对话中提取可能的联系人信息
"""
state.current_phase = "sniffing"
# TODO: 实现智能嗅探
state.success = True
state.final_result = "智能嗅探完成(待实现)"
return state
def format_result(state: ContactState) -> ContactState:
"""
格式化结果节点
"""
state.current_phase = "formatting"
# 根据不同action生成不同的格式化输出
if state.action == ContactAction.CONTACT_LIST:
if state.contacts:
result = "联系人列表:\n"
for i, contact in enumerate(state.contacts, 1):
result += f"{i}. {contact.name}"
if contact.phone:
result += f" - {contact.phone}"
if contact.email:
result += f" ({contact.email})"
result += "\n"
else:
result = "暂无联系人"
state.final_result = result
elif state.action == ContactAction.EMAIL_LIST:
if state.emails:
result = "邮件列表:\n"
for i, email in enumerate(state.emails[:10], 1):
result += f"{i}. {email.subject} - {email.sender}\n"
else:
result = "暂无邮件"
state.final_result = result
else:
if not state.final_result:
state.final_result = "操作完成"
state.current_phase = "done"
return state
def should_continue(state: ContactState) -> str:
"""
条件路由:决定下一步该做什么
"""
if state.error_message:
return "finalize"
# 如果在审核中,等待
if state.pending_review:
return "human_review"
# 根据action路由
if state.action == ContactAction.NONE:
return "parse_intent"
elif state.action == ContactAction.CONTACT_LIST:
return "list_contacts"
elif state.action == ContactAction.CONTACT_ADD:
return "add_contact"
elif state.action == ContactAction.EMAIL_LIST:
return "list_emails"
elif state.action == ContactAction.EMAIL_SEND:
if state.pending_review:
return "human_review"
elif state.draft_subject:
return "send_email"
else:
return "generate_email_draft"
elif state.action == ContactAction.SNIFF_CONTACTS:
return "sniff_contacts"
else:
return "format_result"