diff --git a/backend/app/agent_subgraphs/contact/graph.py b/backend/app/agent_subgraphs/contact/graph.py index 80266b3..0c00360 100644 --- a/backend/app/agent_subgraphs/contact/graph.py +++ b/backend/app/agent_subgraphs/contact/graph.py @@ -1,32 +1,55 @@ """ 通讯录子图构建器 Contact Subgraph Builder +支持 API 注入的工厂模式 """ from langgraph.graph import StateGraph, START, END from .state import ContactState -from .nodes import ( - parse_intent, - list_contacts, - add_contact, - list_emails, - generate_email_draft, - human_review, - send_email, - sniff_contacts, - format_result, - should_continue -) +from .nodes import create_contact_nodes -def build_contact_subgraph() -> StateGraph: +def build_contact_subgraph(contact_api=None): """ - 构建通讯录子图 + 构建通讯录子图(工厂模式) + + Args: + contact_api: 可选的 ContactAPIClient 实例(支持真实数据库或模拟模式) + 不传入则使用默认模拟 API(向后兼容) Returns: 配置好的 StateGraph """ + # 创建节点(传入 API) + nodes = create_contact_nodes(contact_api) if contact_api else None + + # 如果没有传入 API,使用向后兼容的导入 + if nodes is None: + from .nodes import ( + parse_intent, + list_contacts, + add_contact, + list_emails, + generate_email_draft, + human_review, + send_email, + sniff_contacts, + format_result, + should_continue + ) + else: + parse_intent = nodes["parse_intent"] + list_contacts = nodes["list_contacts"] + add_contact = nodes["add_contact"] + list_emails = nodes["list_emails"] + generate_email_draft = nodes["generate_email_draft"] + human_review = nodes["human_review"] + send_email = nodes["send_email"] + sniff_contacts = nodes["sniff_contacts"] + format_result = nodes["format_result"] + should_continue = nodes["should_continue"] + # 创建图 graph = StateGraph(ContactState) diff --git a/backend/app/agent_subgraphs/contact/nodes.py b/backend/app/agent_subgraphs/contact/nodes.py index 571fb44..c8a62ba 100644 --- a/backend/app/agent_subgraphs/contact/nodes.py +++ b/backend/app/agent_subgraphs/contact/nodes.py @@ -1,6 +1,7 @@ """ 通讯录子图节点 - 使用公共工具版本 Contact Subgraph Nodes - Using Common Tools +支持 async 和 API 注入 """ from typing import Dict, Any @@ -10,221 +11,268 @@ from datetime import datetime from ..common import MarkdownFormatter from .state import ContactState, ContactAction, Contact, Email -from .api_client import contact_api +from .api_client import ContactAPIClient -# 模拟联系人数据库(临时存储) +# 模拟联系人数据库(临时存储,保留作为备选) CONTACT_DB = {} -def parse_intent(state: ContactState) -> ContactState: +def create_contact_nodes(contact_api: ContactAPIClient): """ - 解析用户意图节点 + 创建通讯录子图节点工厂函数 + + Args: + contact_api: 已初始化的 ContactAPIClient(支持真实数据库或模拟模式) + + Returns: + 节点函数字典 """ - query_lower = state.user_query.lower() - if any(keyword in query_lower for keyword in ["添加", "add", "新建", "save"]): - state.action = ContactAction.CONTACT_ADD - elif any(keyword in query_lower for keyword in ["联系人", "contact", "list"]): - state.action = ContactAction.CONTACT_LIST - state.action_params = {"query": state.user_query} - elif any(keyword in query_lower for keyword in ["邮件", "email", "inbox"]): - state.action = ContactAction.EMAIL_LIST - elif any(keyword in query_lower for keyword in ["发送邮件", "send email", "发邮件"]): - state.action = ContactAction.EMAIL_SEND - else: - state.action = ContactAction.SNIFF_CONTACTS - - return state - - -def list_contacts(state: ContactState) -> ContactState: - """ - 列出联系人节点 - """ - state.current_phase = "executing" - - # 使用 API 客户端 - contacts = contact_api.list_contacts(state.user_id) - state.contacts = contacts - - return state - - -def add_contact(state: ContactState) -> ContactState: - """ - 添加联系人节点 - """ - state.current_phase = "executing" - - # 使用 API 客户端(简化添加,实际项目应解析用户输入) - new_contact = Contact( - id=len(CONTACT_DB) + 1, - name="新联系人", - email="new@example.com", - phone="13800000000", - created_at=datetime.now() - ) - state.current_contact = new_contact - - return state - - -def list_emails(state: ContactState) -> ContactState: - """ - 列出邮件节点 - """ - state.current_phase = "executing" - - # 使用 API 客户端 - emails = contact_api.list_emails(state.user_id) - state.emails = emails - - return state - - -def generate_email_draft(state: ContactState) -> ContactState: - """ - 生成邮件草稿节点 - """ - state.current_phase = "executing" - - # 使用 API 客户端 - draft = contact_api.generate_email_draft(state.user_query) - state.draft_recipient = draft.get("recipient", "recipient@example.com") - state.draft_subject = draft.get("subject", "邮件主题") - state.draft_body = draft.get("body", "邮件正文") - - return state - - -def sniff_contacts(state: ContactState) -> ContactState: - """ - 嗅探联系人节点 - """ - state.current_phase = "executing" - - # 使用 API 客户端 - contacts = contact_api.sniff_contacts(state.user_query) - state.sniffed_contacts = contacts - - return state - - -def format_result(state: ContactState) -> ContactState: - """ - 格式化结果节点(使用公共工具) - """ - state.current_phase = "formatting" - - md = MarkdownFormatter() - output_lines = [] - - output_lines.append("┌───────────────────────────────────┐") - output_lines.append("│ 📇 通讯录助手 │") - output_lines.append("└───────────────────────────────────┘") - output_lines.append("") - - if state.action == ContactAction.CONTACT_LIST and state.contacts: - output_lines.append(md.heading("📇 联系人列表", 2)) - output_lines.append("") - contact_data = [ - {"姓名": c.name, "邮箱": c.email, "电话": c.phone or "-"} - for c in state.contacts - ] - output_lines.append(md.table(contact_data)) - - elif state.action == ContactAction.EMAIL_LIST and state.emails: - output_lines.append(md.heading("📬 邮件列表", 2)) - output_lines.append("") - email_data = [ - {"发件人": e.sender, "主题": e.subject, "时间": e.received_at.strftime('%Y-%m-%d %H:%M')} - for e in state.emails - ] - output_lines.append(md.table(email_data)) - - elif state.action == ContactAction.EMAIL_SEND and state.draft_subject: - output_lines.append(md.heading("📝 邮件草稿", 2)) - output_lines.append("") - output_lines.append(f"**收件人**: {state.draft_recipient}") - output_lines.append(f"**主题**: {state.draft_subject}") - output_lines.append("") - output_lines.append(md.quote(state.draft_body)) - - elif state.action == ContactAction.SNIFF_CONTACTS and state.sniffed_contacts: - output_lines.append(md.heading("🔍 发现的联系人", 2)) - output_lines.append("") - contact_data = [ - {"姓名": c.name, "邮箱": c.email} - for c in state.sniffed_contacts - ] - output_lines.append(md.table(contact_data)) - - else: - output_lines.append(md.heading("✨ 操作完成", 2)) - output_lines.append("您的请求已处理。") - - # 页脚提示 - output_lines.append("") - output_lines.append("---") - output_lines.append("💡 提示:您可以继续查询联系人、查看邮件,或者生成邮件草稿!") - - state.final_result = "\n".join(output_lines) - state.success = True - state.current_phase = "completed" - - return state - - -def human_review(state: ContactState) -> ContactState: - """ - 人工审核节点(用于邮件草稿) - """ - state.current_phase = "reviewing" - # 标记需要审核,等待用户决定 - state.needs_approval = True - return state - - -def send_email(state: ContactState) -> ContactState: - """ - 发送邮件节点 - """ - state.current_phase = "executing" - # 使用 API 客户端发送邮件 - success = contact_api.send_email( - state.user_id, - state.draft_recipient, - state.draft_subject, - state.draft_body - ) - state.success = success - return state - - -def should_continue(state: ContactState) -> str: - """ - 条件路由函数:根据 action 和状态决定下一个节点 - """ - # 如果是从 human_review 来的,根据审核状态决定 - if state.current_phase == "reviewing": - if state.needs_approval: - # 这里会等待用户操作,实际运行时通过 checkpointer 或后端 API 处理 - return "format_result" + async def parse_intent(state: ContactState) -> ContactState: + """ + 解析用户意图节点 + """ + query_lower = state.user_query.lower() + + if any(keyword in query_lower for keyword in ["添加", "add", "新建", "save"]): + state.action = ContactAction.CONTACT_ADD + elif any(keyword in query_lower for keyword in ["联系人", "contact", "list"]): + state.action = ContactAction.CONTACT_LIST + state.action_params = {"query": state.user_query} + elif any(keyword in query_lower for keyword in ["邮件", "email", "inbox"]): + state.action = ContactAction.EMAIL_LIST + elif any(keyword in query_lower for keyword in ["发送邮件", "send email", "发邮件"]): + state.action = ContactAction.EMAIL_SEND else: - return "send_email" + state.action = ContactAction.SNIFF_CONTACTS + + return state + + async def list_contacts(state: ContactState) -> ContactState: + """ + 列出联系人节点 + """ + state.current_phase = "executing" + + # 使用 API 客户端(async) + contacts = await contact_api.list_contacts(state.user_id) + state.contacts = contacts + + return state + + async def add_contact(state: ContactState) -> ContactState: + """ + 添加联系人节点 + """ + state.current_phase = "executing" + + # 使用 API 客户端(简化添加,实际项目应解析用户输入) + new_contact = Contact( + id=str(len(CONTACT_DB) + 1), + name="新联系人", + email="new@example.com", + phone="13800000000", + created_at=datetime.now().isoformat() + ) + # 保存到数据库 + await contact_api.add_contact(state.user_id, new_contact) + state.current_contact = new_contact + + return state + + async def list_emails(state: ContactState) -> ContactState: + """ + 列出邮件节点 + """ + state.current_phase = "executing" + + # 使用 API 客户端(async) + emails = await contact_api.list_emails(state.user_id) + state.emails = emails + + return state + + async def generate_email_draft(state: ContactState) -> ContactState: + """ + 生成邮件草稿节点 + """ + state.current_phase = "executing" + + # 使用 API 客户端(async) + draft = await contact_api.generate_email_draft(state.user_query) + state.draft_recipient = draft.get("recipient", "recipient@example.com") + state.draft_subject = draft.get("subject", "邮件主题") + state.draft_body = draft.get("body", "邮件正文") + + return state + + async def sniff_contacts(state: ContactState) -> ContactState: + """ + 嗅探联系人节点 + """ + state.current_phase = "executing" + + # 使用 API 客户端(async) + contacts = await contact_api.sniff_contacts(state.user_query) + state.sniffed_contacts = contacts + + return state + + async def format_result(state: ContactState) -> ContactState: + """ + 格式化结果节点(使用公共工具) + """ + state.current_phase = "formatting" + + md = MarkdownFormatter() + output_lines = [] + + output_lines.append("┌───────────────────────────────────┐") + output_lines.append("│ 📇 通讯录助手 │") + output_lines.append("└───────────────────────────────────┘") + output_lines.append("") + + if state.action == ContactAction.CONTACT_LIST and state.contacts: + output_lines.append(md.heading("📇 联系人列表", 2)) + output_lines.append("") + contact_data = [ + {"姓名": c.name, "邮箱": c.email, "电话": c.phone or "-"} + for c in state.contacts + ] + output_lines.append(md.table(contact_data)) + + elif state.action == ContactAction.EMAIL_LIST and state.emails: + output_lines.append(md.heading("📬 邮件列表", 2)) + output_lines.append("") + # 兼容两种 date 格式 + email_data = [] + for e in state.emails: + date_str = e.date + if hasattr(e, 'received_at') and e.received_at: + try: + date_str = e.received_at.strftime('%Y-%m-%d %H:%M') + except: + pass + email_data.append({ + "发件人": e.sender, + "主题": e.subject, + "时间": date_str + }) + output_lines.append(md.table(email_data)) + + elif state.action == ContactAction.EMAIL_SEND and state.draft_subject: + output_lines.append(md.heading("📝 邮件草稿", 2)) + output_lines.append("") + output_lines.append(f"**收件人**: {state.draft_recipient}") + output_lines.append(f"**主题**: {state.draft_subject}") + output_lines.append("") + output_lines.append(md.quote(state.draft_body)) + + elif state.action == ContactAction.SNIFF_CONTACTS and state.sniffed_contacts: + output_lines.append(md.heading("🔍 发现的联系人", 2)) + output_lines.append("") + contact_data = [ + {"姓名": c.name, "邮箱": c.email} + for c in state.sniffed_contacts + ] + output_lines.append(md.table(contact_data)) + + else: + output_lines.append(md.heading("✨ 操作完成", 2)) + output_lines.append("您的请求已处理。") + + # 页脚提示 + output_lines.append("") + output_lines.append("---") + output_lines.append("💡 提示:您可以继续查询联系人、查看邮件,或者生成邮件草稿!") + + state.final_result = "\n".join(output_lines) + state.success = True + state.current_phase = "completed" + + return state + + async def human_review(state: ContactState) -> ContactState: + """ + 人工审核节点(用于邮件草稿) + """ + state.current_phase = "reviewing" + # 标记需要审核,等待用户决定 + state.needs_approval = True + return state + + async def send_email(state: ContactState) -> ContactState: + """ + 发送邮件节点 + """ + state.current_phase = "executing" + # 使用 API 客户端发送邮件(async) + success = await contact_api.send_email( + state.user_id, + state.draft_recipient, + state.draft_subject, + state.draft_body + ) + state.success = success + return state + + def should_continue(state: ContactState) -> str: + """ + 条件路由函数:根据 action 和状态决定下一个节点 + """ + # 如果是从 human_review 来的,根据审核状态决定 + if state.current_phase == "reviewing": + if state.needs_approval: + # 这里会等待用户操作,实际运行时通过 checkpointer 或后端 API 处理 + return "format_result" + else: + return "send_email" + + # 普通路由 + action = state.action + if action == ContactAction.CONTACT_LIST: + return "list_contacts" + elif action == ContactAction.CONTACT_ADD: + return "add_contact" + elif action == ContactAction.EMAIL_LIST: + return "list_emails" + elif action == ContactAction.EMAIL_SEND: + return "generate_email_draft" + elif action == ContactAction.SNIFF_CONTACTS: + return "sniff_contacts" + else: + return "format_result" + + # 返回节点字典 + return { + "parse_intent": parse_intent, + "list_contacts": list_contacts, + "add_contact": add_contact, + "list_emails": list_emails, + "generate_email_draft": generate_email_draft, + "sniff_contacts": sniff_contacts, + "format_result": format_result, + "human_review": human_review, + "send_email": send_email, + "should_continue": should_continue + } - # 普通路由 - action = state.action - if action == ContactAction.CONTACT_LIST: - return "list_contacts" - elif action == ContactAction.CONTACT_ADD: - return "add_contact" - elif action == ContactAction.EMAIL_LIST: - return "list_emails" - elif action == ContactAction.EMAIL_SEND: - return "generate_email_draft" - elif action == ContactAction.SNIFF_CONTACTS: - return "sniff_contacts" - else: - return "format_result" + +# ========== 向后兼容的全局版本(使用模拟 API) ========== +from .api_client import contact_api as _default_contact_api + +# 创建默认节点(用模拟 API,保持向后兼容) +_default_nodes = create_contact_nodes(_default_contact_api) + +# 导出默认节点 +parse_intent = _default_nodes["parse_intent"] +list_contacts = _default_nodes["list_contacts"] +add_contact = _default_nodes["add_contact"] +list_emails = _default_nodes["list_emails"] +generate_email_draft = _default_nodes["generate_email_draft"] +sniff_contacts = _default_nodes["sniff_contacts"] +format_result = _default_nodes["format_result"] +human_review = _default_nodes["human_review"] +send_email = _default_nodes["send_email"] +should_continue = _default_nodes["should_continue"]