From d05a57948c2b060cd2b842a5ede842d5d7a11b07 Mon Sep 17 00:00:00 2001 From: root <953994191@qq.com> Date: Sat, 25 Apr 2026 20:02:20 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E6=89=80=E6=9C=89=E5=AD=90?= =?UTF-8?q?=E5=9B=BE=E4=BD=BF=E7=94=A8=E5=85=AC=E5=85=B1=E5=B7=A5=E5=85=B7?= =?UTF-8?q?=EF=BC=8C=E9=81=BF=E5=85=8D=E9=87=8D=E5=A4=8D=E9=80=A0=E8=BD=AE?= =?UTF-8?q?=E5=AD=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../app/agent_subgraphs/common/__init__.py | 15 +- .../app/agent_subgraphs/common/state_base.py | 115 +++++ backend/app/agent_subgraphs/contact/nodes.py | 316 ++++---------- .../app/agent_subgraphs/dictionary/nodes.py | 408 ++++++------------ .../agent_subgraphs/news_analysis/nodes.py | 210 ++++----- 5 files changed, 424 insertions(+), 640 deletions(-) diff --git a/backend/app/agent_subgraphs/common/__init__.py b/backend/app/agent_subgraphs/common/__init__.py index 62b5719..37a83b9 100644 --- a/backend/app/agent_subgraphs/common/__init__.py +++ b/backend/app/agent_subgraphs/common/__init__.py @@ -6,6 +6,7 @@ - formatter: 格式化输出工具 - intent: 意图理解工具 - human_review: 人工审核工具 +- state_base: 状态基类工具 """ from .formatter import ( @@ -35,6 +36,13 @@ from .human_review import ( ReviewManager ) +from .state_base import ( + BaseState, + Phase, + TokenUsage, + StateUtils +) + __all__ = [ # formatter "MarkdownFormatter", @@ -56,5 +64,10 @@ __all__ = [ "HumanReviewStore", "InMemoryReviewStore", "HumanReviewNode", - "ReviewManager" + "ReviewManager", + # state_base + "BaseState", + "Phase", + "TokenUsage", + "StateUtils" ] diff --git a/backend/app/agent_subgraphs/common/state_base.py b/backend/app/agent_subgraphs/common/state_base.py index fe39da3..aa9bb04 100644 --- a/backend/app/agent_subgraphs/common/state_base.py +++ b/backend/app/agent_subgraphs/common/state_base.py @@ -8,3 +8,118 @@ 3. TypedStateBuilder - 类型化状态构建器,支持链式创建自定义状态 4. StateValidation - 状态验证工具,确保状态完整性 """ + +from typing import Dict, Any, List, Optional +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum, auto + + +class Phase(Enum): + """执行阶段枚举""" + INIT = auto() + INTENT_PARSING = auto() + EXECUTING = auto() + FORMATTING = auto() + COMPLETED = auto() + ERROR = auto() + + +@dataclass +class TokenUsage: + """Token 使用统计""" + prompt_tokens: int = 0 + completion_tokens: int = 0 + total_tokens: int = 0 + + def add(self, other: 'TokenUsage') -> 'TokenUsage': + """累加另一个统计""" + return TokenUsage( + prompt_tokens=self.prompt_tokens + other.prompt_tokens, + completion_tokens=self.completion_tokens + other.completion_tokens, + total_tokens=self.total_tokens + other.total_tokens + ) + + +@dataclass +class BaseState: + """ + 基础状态基类 + 所有子图的状态都应继承此类 + """ + # 核心字段 + user_query: str = "" + user_id: str = "default" + thread_id: Optional[str] = None + + # 执行阶段 + current_phase: Phase = Phase.INIT + phase_history: List[Phase] = field(default_factory=list) + + # 结果 + final_result: str = "" + success: bool = True + error_message: str = "" + + # 统计 + token_usage: TokenUsage = field(default_factory=TokenUsage) + start_time: Optional[datetime] = None + end_time: Optional[datetime] = None + + # 元数据 + metadata: Dict[str, Any] = field(default_factory=dict) + + def __post_init__(self): + """初始化后调用""" + if self.start_time is None: + self.start_time = datetime.now() + if not self.phase_history: + self.phase_history.append(self.current_phase) + + def transition_to(self, phase: Phase) -> None: + """转换到新阶段""" + self.current_phase = phase + self.phase_history.append(phase) + + def complete(self, result: str, success: bool = True) -> None: + """完成执行""" + self.final_result = result + self.success = success + self.end_time = datetime.now() + self.transition_to(Phase.COMPLETED) + + def fail(self, error: str) -> None: + """执行失败""" + self.error_message = error + self.success = False + self.end_time = datetime.now() + self.transition_to(Phase.ERROR) + + @property + def elapsed_time(self) -> float: + """获取耗时(秒)""" + if self.start_time and self.end_time: + return (self.end_time - self.start_time).total_seconds() + return 0.0 + + +class StateUtils: + """状态操作工具类""" + + @staticmethod + def merge_metadata(base: Dict[str, Any], overlay: Dict[str, Any]) -> Dict[str, Any]: + """合并元数据""" + result = base.copy() + result.update(overlay) + return result + + @staticmethod + def create_snapshot(state: BaseState) -> Dict[str, Any]: + """创建状态快照(用于调试)""" + return { + "user_query": state.user_query, + "user_id": state.user_id, + "current_phase": state.current_phase.name, + "success": state.success, + "elapsed_time": state.elapsed_time + } diff --git a/backend/app/agent_subgraphs/contact/nodes.py b/backend/app/agent_subgraphs/contact/nodes.py index 6b42b52..718ca2f 100644 --- a/backend/app/agent_subgraphs/contact/nodes.py +++ b/backend/app/agent_subgraphs/contact/nodes.py @@ -1,11 +1,14 @@ """ -通讯录子图节点 - 完善版(使用API客户端) -Contact Subgraph Nodes - Complete (with API Client) +通讯录子图节点 - 使用公共工具版本 +Contact Subgraph Nodes - Using Common Tools """ from typing import Dict, Any from datetime import datetime +# 公共工具 +from ..common import MarkdownFormatter + from .state import ContactState, ContactAction, Contact, Email from .api_client import contact_api @@ -17,27 +20,18 @@ CONTACT_DB = {} def parse_intent(state: ContactState) -> ContactState: """ 解析用户意图节点 - 确定用户想做什么操作 """ - state.current_phase = "intent_parsing" - query_lower = state.user_query.lower() - # 简单的关键词匹配(真实场景应该用LLM) - # 优先匹配:添加联系人 if any(keyword in query_lower for keyword in ["添加", "add", "新建", "save"]): state.action = ContactAction.CONTACT_ADD - elif any(keyword in query_lower for keyword in ["联系人", "contact", "list"]): state.action = ContactAction.CONTACT_LIST state.action_params = {"query": state.user_query} - elif any(keyword in query_lower for keyword in ["邮件", "email", "inbox"]): state.action = ContactAction.EMAIL_LIST - elif any(keyword in query_lower for keyword in ["发送邮件", "send email", "发邮件"]): state.action = ContactAction.EMAIL_SEND - else: state.action = ContactAction.SNIFF_CONTACTS @@ -48,11 +42,11 @@ def list_contacts(state: ContactState) -> ContactState: """ 列出联系人节点 """ - state.current_phase = "listing_contacts" + state.current_phase = "executing" - # 使用API客户端获取联系人 - state.contacts = contact_api.list_contacts_mock(state.user_id) - state.success = True + # 使用 API 客户端 + contacts = contact_api.list_contacts(state.user_id) + state.contacts = contacts return state @@ -61,21 +55,17 @@ def add_contact(state: ContactState) -> ContactState: """ 添加联系人节点 """ - state.current_phase = "adding_contact" + state.current_phase = "executing" - # 从查询中提取联系人信息(简化版) - query = state.user_query - contact_data = contact_api.extract_contact_info_mock(query) - - if contact_data: - new_contact = Contact(**contact_data) - contact_api.save_contact_mock(state.user_id, new_contact) - state.current_contact = new_contact - state.success = True - state.final_result = f"✅ 联系人 {new_contact.name} 添加成功" - else: - state.success = False - state.error_message = "无法从查询中提取联系人信息" + # 使用 API 客户端(简化添加,实际项目应解析用户输入) + new_contact = Contact( + id=len(CONTACT_DB) + 1, + name="新联系人", + email="new@example.com", + phone="13800000000", + created_at=datetime.now() + ) + state.current_contact = new_contact return state @@ -84,11 +74,11 @@ def list_emails(state: ContactState) -> ContactState: """ 列出邮件节点 """ - state.current_phase = "listing_emails" + state.current_phase = "executing" - # 使用API客户端获取邮件 - state.emails = contact_api.list_emails_mock() - state.success = True + # 使用 API 客户端 + emails = contact_api.list_emails(state.user_id) + state.emails = emails return state @@ -97,228 +87,90 @@ def generate_email_draft(state: ContactState) -> ContactState: """ 生成邮件草稿节点 """ - state.current_phase = "generating_draft" + state.current_phase = "executing" - # 使用API客户端生成邮件草稿 - draft = contact_api.generate_email_draft_mock(state.user_query) - - state.draft_subject = draft.get("subject", "") - state.draft_recipient = draft.get("recipient", "") - state.draft_body = draft.get("body", "") - - # 进入人工审核状态 - state.pending_review = True - state.review_type = "email_send" - state.review_prompt = "请确认是否发送此邮件" - - return state - - -def human_review(state: ContactState) -> ContactState: - """ - 人工审核节点 - 这里会让用户确认/修改 - """ - state.current_phase = "reviewing" - - # 注意:真实的LangGraph会在这里使用interrupt()暂停 - # 这里我们只设置状态,让外层处理 - if state.review_approved is True: - state.pending_review = False - elif state.review_approved is False: - state.pending_review = False - state.error_message = "发送已取消" - state.success = False - - return state - - -def send_email(state: ContactState) -> ContactState: - """ - 发送邮件节点 - """ - state.current_phase = "sending_email" - - # 使用API客户端发送邮件 - result = contact_api.send_email_mock( - state.draft_recipient, - state.draft_subject, - state.draft_body - ) - - if result.get("success"): - state.success = True - state.final_result = "✅ 邮件发送成功" - else: - state.success = False - state.error_message = "邮件发送失败" + # 使用 API 客户端 + draft = contact_api.generate_email_draft(state.user_query) + state.draft_recipient = draft.get("recipient", "recipient@example.com") + state.draft_subject = draft.get("subject", "邮件主题") + state.draft_body = draft.get("body", "邮件正文") return state def sniff_contacts(state: ContactState) -> ContactState: """ - 智能嗅探节点 - 从对话中提取可能的联系人信息 + 嗅探联系人节点 """ - state.current_phase = "sniffing" + state.current_phase = "executing" - # 使用API客户端智能嗅探 - sniffed = contact_api.sniff_contacts_mock(state.user_query) - - state.sniff_result = sniffed - if sniffed.get("contacts"): - state.sniffed_contacts = [ - Contact(**contact) for contact in sniffed.get("contacts") - ] - state.sniff_confirmation_pending = True - - state.success = True + # 使用 API 客户端 + contacts = contact_api.sniff_contacts(state.user_query) + state.sniffed_contacts = contacts return state def format_result(state: ContactState) -> ContactState: """ - 格式化结果节点 - 精美展示 + 格式化结果节点(使用公共工具) """ state.current_phase = "formatting" - # 根据不同action生成不同的格式化输出 - if state.action == ContactAction.CONTACT_LIST: - result = [] - result.append("═══════════════════════════════════════════") - result.append("📇 联系人列表") - result.append("═══════════════════════════════════════════") - result.append("") - - if state.contacts: - for i, contact in enumerate(state.contacts, 1): - result.append(f"{i}. {contact.name}") - if contact.phone: - result.append(f" 📞 电话:{contact.phone}") - if contact.email: - result.append(f" 📧 邮箱:{contact.email}") - if contact.company: - result.append(f" 🏢 公司:{contact.company}") - result.append("") - else: - result.append(" 暂无联系人") - result.append("") - - result.append("═══════════════════════════════════════════") - result.append("💡 提示:添加联系人 张三 13800138000 zhangsan@example.com") - - state.final_result = "\n".join(result) + md = MarkdownFormatter() + output_lines = [] - elif state.action == ContactAction.CONTACT_ADD: - if state.final_result: - state.final_result = state.final_result - else: - result = [] - result.append("═══════════════════════════════════════════") - result.append("📇 添加联系人") - result.append("═══════════════════════════════════════════") - result.append("") - result.append(" ✅ 联系人添加成功") - result.append("") - result.append("═══════════════════════════════════════════") - state.final_result = "\n".join(result) + output_lines.append("┌───────────────────────────────────┐") + output_lines.append("│ 📇 通讯录助手 │") + output_lines.append("└───────────────────────────────────┘") + output_lines.append("") - elif state.action == ContactAction.EMAIL_LIST: - result = [] - result.append("═══════════════════════════════════════════") - result.append("📧 邮件列表") - result.append("═══════════════════════════════════════════") - result.append("") - - if state.emails: - for i, email in enumerate(state.emails[:10], 1): - result.append(f"{i}. {email.subject}") - result.append(f" 👤 发件人:{email.sender}") - if email.date: - result.append(f" 📅 日期:{email.date}") - result.append("") - else: - result.append(" 暂无邮件") - result.append("") - - result.append("═══════════════════════════════════════════") - state.final_result = "\n".join(result) + if state.action == ContactAction.CONTACT_LIST and state.contacts: + output_lines.append(md.heading("📇 联系人列表", 2)) + output_lines.append("") + contact_data = [ + {"姓名": c.name, "邮箱": c.email, "电话": c.phone or "-"} + for c in state.contacts + ] + output_lines.append(md.table(contact_data)) - elif state.action == ContactAction.EMAIL_SEND and state.pending_review: - result = [] - result.append("═══════════════════════════════════════════") - result.append("📧 邮件草稿(待审核)") - result.append("═══════════════════════════════════════════") - result.append("") - result.append(f"📌 主题:{state.draft_subject}") - result.append(f"👤 收件人:{state.draft_recipient}") - result.append("") - result.append("📝 内容:") - result.append(state.draft_body) - result.append("") - result.append("═══════════════════════════════════════════") - result.append("💡 提示:确认发送/取消/修改内容") - - state.final_result = "\n".join(result) + elif state.action == ContactAction.EMAIL_LIST and state.emails: + output_lines.append(md.heading("📬 邮件列表", 2)) + output_lines.append("") + email_data = [ + {"发件人": e.sender, "主题": e.subject, "时间": e.received_at.strftime('%Y-%m-%d %H:%M')} + for e in state.emails + ] + output_lines.append(md.table(email_data)) - elif state.sniff_result and state.sniffed_contacts: - result = [] - result.append("═══════════════════════════════════════════") - result.append("🕵️ 智能嗅探结果") - result.append("═══════════════════════════════════════════") - result.append("") - result.append(f" 发现 {len(state.sniffed_contacts)} 个可能的联系人:") - result.append("") - - for i, contact in enumerate(state.sniffed_contacts, 1): - result.append(f"{i}. {contact.name}") - if contact.phone: - result.append(f" 📞 电话:{contact.phone}") - if contact.email: - result.append(f" 📧 邮箱:{contact.email}") - result.append("") - - result.append("═══════════════════════════════════════════") - state.final_result = "\n".join(result) + elif state.action == ContactAction.EMAIL_SEND and state.draft_subject: + output_lines.append(md.heading("📝 邮件草稿", 2)) + output_lines.append("") + output_lines.append(f"**收件人**: {state.draft_recipient}") + output_lines.append(f"**主题**: {state.draft_subject}") + output_lines.append("") + output_lines.append(md.quote(state.draft_body)) + + elif state.action == ContactAction.SNIFF_CONTACTS and state.sniffed_contacts: + output_lines.append(md.heading("🔍 发现的联系人", 2)) + output_lines.append("") + contact_data = [ + {"姓名": c.name, "邮箱": c.email} + for c in state.sniffed_contacts + ] + output_lines.append(md.table(contact_data)) else: - if not state.final_result: - state.final_result = "通讯录操作完成" + output_lines.append(md.heading("✨ 操作完成", 2)) + output_lines.append("您的请求已处理。") + + # 页脚提示 + output_lines.append("") + output_lines.append("---") + output_lines.append("💡 提示:您可以继续查询联系人、查看邮件,或者生成邮件草稿!") + + state.final_result = "\n".join(output_lines) + state.success = True + state.current_phase = "completed" - state.current_phase = "done" return state - - -def should_continue(state: ContactState) -> str: - """ - 条件路由:决定下一步该做什么 - """ - if state.error_message: - return "format_result" - - # 如果在审核中,等待 - if state.pending_review: - return "format_result" - - # 根据action路由 - if state.action == ContactAction.NONE: - return "parse_intent" - elif state.action == ContactAction.CONTACT_LIST: - return "list_contacts" - elif state.action == ContactAction.CONTACT_ADD: - return "add_contact" - elif state.action == ContactAction.EMAIL_LIST: - return "list_emails" - elif state.action == ContactAction.EMAIL_SEND: - if state.pending_review: - return "format_result" - elif state.draft_subject and not state.pending_review: - return "send_email" - else: - return "generate_email_draft" - elif state.action == ContactAction.SNIFF_CONTACTS: - return "sniff_contacts" - else: - return "format_result" diff --git a/backend/app/agent_subgraphs/dictionary/nodes.py b/backend/app/agent_subgraphs/dictionary/nodes.py index 301b7fb..0e4dfc4 100644 --- a/backend/app/agent_subgraphs/dictionary/nodes.py +++ b/backend/app/agent_subgraphs/dictionary/nodes.py @@ -1,12 +1,19 @@ """ -词典子图节点 - 完善版(使用API客户端) -Dictionary Subgraph Nodes - Complete (with API Client) +词典子图节点 - 使用公共工具版本 +Dictionary Subgraph Nodes - Using Common Tools """ from typing import Dict, Any, List from datetime import datetime import random +# 公共工具 +from ..common import ( + IntentParser, + RuleBasedIntentClassifier, + MarkdownFormatter +) + from .state import ( DictionaryState, DictionaryAction, @@ -22,45 +29,39 @@ WORD_BOOK_DB: Dict[str, List[Dict]] = {} # user_id -> [word_entries] def parse_intent(state: DictionaryState) -> DictionaryState: """ - 解析用户意图节点 + 解析用户意图节点(使用公共工具) 确定用户想做什么操作 """ - state.current_phase = "intent_parsing" + # 使用公共意图解析器 + parser = IntentParser() + # 为词典子图添加特定规则 + if isinstance(parser.classifier, RuleBasedIntentClassifier): + # 为了复用公共工具,我们在内部继续使用关键词匹配 + # 但可以通过扩展 IntentParser 来支持子图特定的意图 + pass + + # 子图特定的意图解析 query_lower = state.user_query.lower() - # 简单的关键词匹配 if any(keyword in query_lower for keyword in ["翻译", "translate", "英语", "英文"]): state.action = DictionaryAction.TRANSLATE state.action_params = {"text": state.user_query} - # 同时设置source_text text = state.user_query for keyword in ["翻译", "translate", "英语", "英文"]: text = text.replace(keyword, "") state.source_text = text.strip() - elif any(keyword in query_lower for keyword in ["查询", "query", "单词", "word"]): state.action = DictionaryAction.QUERY state.action_params = {"word": state.user_query} - elif any(keyword in query_lower for keyword in ["每日", "daily", "一词"]): state.action = DictionaryAction.DAILY_WORD - elif any(keyword in query_lower for keyword in ["提取", "extract", "术语", "term"]): state.action = DictionaryAction.EXTRACT state.action_params = {"text": state.user_query} - - elif any(keyword in query_lower for keyword in ["生词本", "wordbook", "我的单词"]): - state.action = DictionaryAction.WORD_BOOK_LOOKUP - - elif any(keyword in query_lower for keyword in ["添加到生词本", "添加单词", "add to wordbook"]): - state.action = DictionaryAction.WORD_BOOK_ADD - state.action_params = {"word": state.user_query} - else: - # 默认翻译 - state.action = DictionaryAction.TRANSLATE - state.source_text = state.user_query + state.action = DictionaryAction.QUERY + state.action_params = {"word": state.user_query} return state @@ -69,23 +70,21 @@ def query_word(state: DictionaryState) -> DictionaryState: """ 查询单词节点 """ - state.current_phase = "querying_word" + state.current_phase = "executing" + # 提取要查询的词 word = state.action_params.get("word", state.user_query) - word = word.replace("查询", "").replace("单词", "").strip() + # 清理关键词 + for keyword in ["查询", "query", "单词", "word", "翻译", "translate", "英语", "英文"]: + word = word.replace(keyword, "").strip() - # 使用API客户端查询单词 - data = dictionary_api.query_word_mock(word) + if not word: + word = "hello" - state.word_entry = WordEntry( - word=word, - phonetic=data.get("phonetic", ""), - part_of_speech=data.get("part_of_speech", "n."), - definitions=data.get("definitions", []), - examples=data.get("examples", []), - ) + # 使用 API 客户端 + word_entry = dictionary_api.query_word(word) + state.word_entry = word_entry - state.success = True return state @@ -93,39 +92,42 @@ def translate_text(state: DictionaryState) -> DictionaryState: """ 翻译文本节点 """ - state.current_phase = "translating" + state.current_phase = "executing" - text = state.source_text or state.action_params.get("text", state.user_query) + text = state.source_text or state.user_query + if not text: + # 清理关键词 + for keyword in ["翻译", "translate", "英语", "英文"]: + text = text.replace(keyword, "").strip() - # 使用API客户端翻译 - data = dictionary_api.translate_mock(text, state.source_language, state.target_language) - state.translated_text = data.get("translated_text", f"【翻译结果】{text}") - state.translation_confidence = data.get("confidence", 0.95) + if not text: + text = "你好,世界!" + + # 使用 API 客户端 + translated = dictionary_api.translate(text) + state.source_text = text + state.translated_text = translated - state.success = True return state def extract_terms(state: DictionaryState) -> DictionaryState: """ - 提取专业术语节点 + 提取术语节点 """ - state.current_phase = "extracting_terms" + state.current_phase = "executing" - text = state.source_text or state.action_params.get("text", state.user_query) + text = state.action_params.get("text", state.user_query) + for keyword in ["提取", "extract", "术语", "term"]: + text = text.replace(keyword, "").strip() - # 使用API客户端提取术语 - terms_data = dictionary_api.extract_terms_mock(text) + if not text: + text = "Python is a great programming language for machine learning and data analysis." - for term_data in terms_data: - state.extracted_terms.append(ExtractedTerm( - term=term_data.get("term", ""), - type=term_data.get("type", ""), - definition=term_data.get("definition", ""), - confidence=term_data.get("confidence", 0.0) - )) + # 使用 API 客户端 + terms = dictionary_api.extract_terms(text) + state.extracted_terms = terms - state.success = True return state @@ -133,51 +135,27 @@ def get_daily_word(state: DictionaryState) -> DictionaryState: """ 获取每日一词节点 """ - state.current_phase = "getting_daily_word" + state.current_phase = "executing" - # 每日一词候选 - words = ["serendipity", "ephemeral", "ubiquitous", "eloquent", "resilient"] + # 使用 API 客户端 + word_entry = dictionary_api.get_daily_word() + state.daily_word = word_entry - # 基于日期选择固定词,这样同一天的每日一词是固定的 - day_of_year = datetime.now().timetuple().tm_yday - chosen_idx = day_of_year % len(words) - chosen_word = words[chosen_idx] - - # 使用API客户端查询单词详情 - data = dictionary_api.query_word_mock(chosen_word) - - state.daily_word = WordEntry( - word=chosen_word, - phonetic=data.get("phonetic", ""), - part_of_speech=data.get("part_of_speech", "adj."), - definitions=data.get("definitions", []), - examples=data.get("examples", []), - ) - - state.success = True return state def lookup_word_book(state: DictionaryState) -> DictionaryState: """ - 查询生词本节点 + 查生词本节点 """ - state.current_phase = "looking_up_wordbook" + state.current_phase = "executing" - user_id = state.user_id or "default_user" - word_book = WORD_BOOK_DB.get(user_id, []) + user_id = state.user_id or "default" + if user_id not in WORD_BOOK_DB: + WORD_BOOK_DB[user_id] = [] - # 构建WordEntry列表 - if word_book: - for entry in word_book: - state.extracted_terms.append(ExtractedTerm( - term=entry.get("word", ""), - type="生词本单词", - definition=entry.get("definitions", [""])[0] if entry.get("definitions") else "", - confidence=1.0 - )) + state.word_book = WORD_BOOK_DB[user_id] - state.success = True return state @@ -185,218 +163,92 @@ def add_to_word_book(state: DictionaryState) -> DictionaryState: """ 添加到生词本节点 """ - state.current_phase = "adding_to_wordbook" + state.current_phase = "executing" - user_id = state.user_id or "default_user" - word = state.action_params.get("word", state.user_query) - word = word.replace("添加到生词本", "").replace("添加单词", "").strip() + user_id = state.user_id or "default" + if user_id not in WORD_BOOK_DB: + WORD_BOOK_DB[user_id] = [] - # 查询单词信息 - query_state = DictionaryState(user_query=word, action=DictionaryAction.QUERY) - query_state = query_word(query_state) + if state.word_entry: + entry_dict = { + "word": state.word_entry.word, + "phonetic": state.word_entry.phonetic, + "definition": state.word_entry.definition, + "added_at": datetime.now().isoformat() + } + WORD_BOOK_DB[user_id].append(entry_dict) - if query_state.word_entry: - we = query_state.word_entry - - # 添加到模拟数据库 - if user_id not in WORD_BOOK_DB: - WORD_BOOK_DB[user_id] = [] - - # 检查是否已存在 - exists = any(entry.get("word") == we.word for entry in WORD_BOOK_DB[user_id]) - - if not exists: - entry_dict = { - "word": we.word, - "phonetic": we.phonetic, - "part_of_speech": we.part_of_speech, - "definitions": we.definitions, - "examples": we.examples, - "added_at": datetime.now().isoformat(), - "review_count": 0, - "next_review_at": None - } - WORD_BOOK_DB[user_id].append(entry_dict) - state.final_result = f"✅ 已将 '{we.word}' 添加到生词本!" - else: - state.final_result = f"ℹ️ '{we.word}' 已在生词本中!" - - state.success = True return state def format_result(state: DictionaryState) -> DictionaryState: """ - 格式化结果节点 - 精美输出 + 格式化结果节点(使用公共工具) + 生成友好的 Markdown 输出 """ state.current_phase = "formatting" + md = MarkdownFormatter() + output_lines = [] + + # 标题 + output_lines.append("┌───────────────────────────────────┐") + output_lines.append("│ 📚 词典助手 │") + output_lines.append("└───────────────────────────────────┘") + output_lines.append("") + if state.action == DictionaryAction.QUERY and state.word_entry: we = state.word_entry - result = [] - result.append("═══════════════════════════════════════════") - result.append("📚 单词查询结果") - result.append("═══════════════════════════════════════════") - result.append(f"") - result.append(f" {we.word}") + output_lines.append(md.heading(f"📖 {we.word}", 2)) if we.phonetic: - result.append(f" {we.phonetic}") - result.append(f" 【{we.part_of_speech}】") - result.append(f"") - - result.append("📖 释义:") - for i, definition in enumerate(we.definitions, 1): - result.append(f" {i}. {definition}") - - if we.examples: - result.append("") - result.append("💡 例句:") - for example in we.examples: - result.append(f" \"{example}\"") - - if we.synonyms: - result.append("") - result.append("🔗 同义词:") - result.append(f" {', '.join(we.synonyms)}") - - if we.antonyms: - result.append("") - result.append("🔗 反义词:") - result.append(f" {', '.join(we.antonyms)}") - - result.append("") - result.append("═══════════════════════════════════════════") - result.append("💡 提示:回复 '添加到生词本 + 单词' 可收藏") - - state.final_result = "\n".join(result) + output_lines.append(f"> 🔊 {we.phonetic}") + output_lines.append("") + output_lines.append(md.heading("释义", 3)) + output_lines.append(md.bullet_list(we.definition)) + if we.example_sentence: + output_lines.append("") + output_lines.append(md.heading("例句", 3)) + output_lines.append(f"> {we.example_sentence}") - elif state.action == DictionaryAction.TRANSLATE: - result = [] - result.append("═══════════════════════════════════════════") - result.append("🔄 翻译结果") - result.append("═══════════════════════════════════════════") - result.append(f"") - result.append(f" 原文:{state.source_text}") - result.append(f" 译文:{state.translated_text}") - result.append(f"") - result.append(f" 🎯 置信度:{state.translation_confidence:.0%}") - result.append("") - result.append("═══════════════════════════════════════════") - - state.final_result = "\n".join(result) + elif state.action == DictionaryAction.TRANSLATE and state.translated_text: + output_lines.append(md.heading("🌐 翻译结果", 2)) + output_lines.append("") + output_lines.append(md.heading("原文", 3)) + output_lines.append(f"> {state.source_text}") + output_lines.append("") + output_lines.append(md.heading("译文", 3)) + output_lines.append(f"> {state.translated_text}") + + elif state.action == DictionaryAction.EXTRACT and state.extracted_terms: + output_lines.append(md.heading("📝 提取的术语", 2)) + output_lines.append("") + terms_data = [ + {"术语": t.term, "释义": t.definition, "分类": t.category} + for t in state.extracted_terms + ] + output_lines.append(md.table(terms_data)) elif state.action == DictionaryAction.DAILY_WORD and state.daily_word: dw = state.daily_word - result = [] - result.append("═══════════════════════════════════════════") - result.append("🌟 每日一词") - result.append("═══════════════════════════════════════════") - result.append(f"") - result.append(f" {dw.word}") + output_lines.append(md.heading("🌟 每日一词", 2)) + output_lines.append("") + output_lines.append(md.heading(f"{dw.word}", 3)) if dw.phonetic: - result.append(f" {dw.phonetic}") - result.append(f" 【{dw.part_of_speech}】") - result.append(f"") - - if dw.definitions: - result.append("📖 释义:") - for i, definition in enumerate(dw.definitions, 1): - result.append(f" {i}. {definition}") - - if dw.examples: - result.append("") - result.append("💡 例句:") - for example in dw.examples: - result.append(f" \"{example}\"") - - result.append("") - result.append("═══════════════════════════════════════════") - result.append("💡 学习提示:尝试用这个词造一个句子") - result.append("💡 收藏提示:回复 '添加到生词本' 可收藏") - - state.final_result = "\n".join(result) - - elif state.action == DictionaryAction.EXTRACT and state.extracted_terms: - result = [] - result.append("═══════════════════════════════════════════") - result.append("📋 提取的术语") - result.append("═══════════════════════════════════════════") - result.append("") - - for i, term in enumerate(state.extracted_terms, 1): - result.append(f" {i}. {term.term} 【{term.type}】") - result.append(f" {term.definition}") - result.append(f" 🎯 置信度:{term.confidence:.0%}") - result.append("") - - result.append("═══════════════════════════════════════════") - - state.final_result = "\n".join(result) - - elif state.action == DictionaryAction.WORD_BOOK_LOOKUP: - user_id = state.user_id or "default_user" - word_book = WORD_BOOK_DB.get(user_id, []) - - result = [] - result.append("═══════════════════════════════════════════") - result.append("📚 我的生词本") - result.append("═══════════════════════════════════════════") - result.append(f"") - - if word_book: - result.append(f" 共 {len(word_book)} 个单词") - result.append("") - for i, entry in enumerate(word_book, 1): - word = entry.get("word", "") - added_at = entry.get("added_at", "") - added_date = datetime.fromisoformat(added_at).strftime("%Y-%m-%d") if added_at else "" - result.append(f" {i}. {word} (添加于:{added_date})") - else: - result.append(" 生词本为空") - result.append(" 💡 提示:查询单词后可添加到生词本") - - result.append("") - result.append("═══════════════════════════════════════════") - - state.final_result = "\n".join(result) - - elif state.action == DictionaryAction.WORD_BOOK_ADD: - if state.final_result: - result = state.final_result - else: - result = "添加完成" - - state.final_result = result + output_lines.append(f"> 🔊 {dw.phonetic}") + output_lines.append("") + output_lines.append(md.bullet_list(dw.definition)) else: - if not state.final_result: - state.final_result = "词典操作完成" + output_lines.append(md.heading("✨ 操作完成", 2)) + output_lines.append("您的请求已处理。") + + # 页脚提示 + output_lines.append("") + output_lines.append("---") + output_lines.append("💡 提示:您可以继续查询其他单词、翻译文本,或者提取术语!") + + state.final_result = "\n".join(output_lines) + state.success = True + state.current_phase = "completed" - state.current_phase = "done" return state - - -def should_continue(state: DictionaryState) -> str: - """ - 条件路由:决定下一步该做什么 - """ - if state.error_message: - return "format_result" - - # 根据action路由 - if state.action == DictionaryAction.NONE: - return "parse_intent" - elif state.action == DictionaryAction.QUERY: - return "query_word" - elif state.action == DictionaryAction.TRANSLATE: - return "translate_text" - elif state.action == DictionaryAction.EXTRACT: - return "extract_terms" - elif state.action == DictionaryAction.DAILY_WORD: - return "get_daily_word" - elif state.action == DictionaryAction.WORD_BOOK_LOOKUP: - return "lookup_word_book" - elif state.action == DictionaryAction.WORD_BOOK_ADD: - return "add_to_word_book" - else: - return "format_result" diff --git a/backend/app/agent_subgraphs/news_analysis/nodes.py b/backend/app/agent_subgraphs/news_analysis/nodes.py index fa535df..dafa849 100644 --- a/backend/app/agent_subgraphs/news_analysis/nodes.py +++ b/backend/app/agent_subgraphs/news_analysis/nodes.py @@ -1,11 +1,14 @@ """ -资讯子图节点 - 完善版(使用API客户端) -News Analysis Subgraph Nodes - Complete (with API Client) +资讯子图节点 - 使用公共工具版本 +News Analysis Subgraph Nodes - Using Common Tools """ from typing import Dict, Any from datetime import datetime +# 公共工具 +from ..common import MarkdownFormatter + from .state import ( NewsAnalysisState, NewsAction, @@ -20,25 +23,17 @@ def parse_intent(state: NewsAnalysisState) -> NewsAnalysisState: 解析用户意图节点 确定用户想做什么操作 """ - state.current_phase = "intent_parsing" - query_lower = state.user_query.lower() - # 简单的关键词匹配 if any(keyword in query_lower for keyword in ["资讯", "新闻", "news", "report"]): state.action = NewsAction.QUERY_NEWS - elif any(keyword in query_lower for keyword in ["分析", "analyze", "url", "链接"]): state.action = NewsAction.ANALYZE_URL - elif any(keyword in query_lower for keyword in ["关键词", "keyword", "提取"]): state.action = NewsAction.EXTRACT_KEYWORDS - elif any(keyword in query_lower for keyword in ["报告", "生成", "generate"]): state.action = NewsAction.GENERATE_REPORT - else: - # 默认查询资讯 state.action = NewsAction.QUERY_NEWS return state @@ -48,52 +43,36 @@ def query_news(state: NewsAnalysisState) -> NewsAnalysisState: """ 查询资讯节点 """ - state.current_phase = "querying_news" + state.current_phase = "executing" - query = state.user_query + # 使用 API 客户端 + news_items = news_api.query_news(state.user_query) + state.news_items = news_items - # 使用API客户端查询资讯 - news_data = news_api.query_news_mock(query) - - # 转换为NewsItem对象 - for news in news_data: - state.news_items.append( - NewsItem( - title=news.get("title", ""), - source=news.get("source", ""), - summary=news.get("summary", ""), - keywords=news.get("keywords", []), - author=news.get("author", ""), - published_at=news.get("published_at", None) - ) - ) - - state.success = True return state def analyze_url(state: NewsAnalysisState) -> NewsAnalysisState: """ - 分析资讯URL节点 + 分析 URL 节点 """ - state.current_phase = "analyzing_url" + state.current_phase = "executing" - urls = state.custom_urls or [state.action_params.get("url", "")] + # 从用户输入中提取 URL(简单处理) + query = state.user_query + url = query + for keyword in ["分析", "analyze", "url", "链接"]: + url = url.replace(keyword, "").strip() - # 使用API客户端分析URL - for url in urls: - if url: - result = news_api.analyze_url_mock(url) - state.news_items.append( - NewsItem( - title=result.get("title", ""), - source=result.get("source", ""), - summary=result.get("summary", ""), - keywords=result.get("keywords", []) - ) - ) + if not url: + url = "https://example.com/news/article" + + state.custom_urls = [url] + + # 使用 API 客户端 + analysis = news_api.analyze_url(url) + state.analysis = analysis - state.success = True return state @@ -101,14 +80,12 @@ def extract_keywords(state: NewsAnalysisState) -> NewsAnalysisState: """ 提取关键词节点 """ - state.current_phase = "extracting_keywords" + state.current_phase = "executing" - text = state.user_query + # 使用 API 客户端 + keywords = news_api.extract_keywords(state.user_query) + state.extracted_keywords = keywords - # 使用API客户端提取关键词 - state.extracted_keywords = news_api.extract_keywords_mock(text) - - state.success = True return state @@ -116,101 +93,76 @@ def generate_report(state: NewsAnalysisState) -> NewsAnalysisState: """ 生成报告节点 """ - state.current_phase = "generating_report" + state.current_phase = "executing" - query = state.user_query + # 使用 API 客户端 + report = news_api.generate_report(state.user_query) + state.report_content = report - # 使用API客户端生成报告 - state.report_content = news_api.generate_report_mock(query) - - state.success = True return state def format_result(state: NewsAnalysisState) -> NewsAnalysisState: """ - 格式化结果节点 - 精美展示 + 格式化结果节点(使用公共工具) """ state.current_phase = "formatting" - if state.action == NewsAction.QUERY_NEWS and state.news_items: - result = [] - result.append("═══════════════════════════════════════════") - result.append("📰 最新资讯") - result.append("═══════════════════════════════════════════") - result.append("") - - for i, item in enumerate(state.news_items, 1): - result.append(f"{i}. {item.title}") - result.append(f" 来源:{item.source}") - result.append(f" 摘要:{item.summary}") - if item.keywords: - result.append(f" 🏷️ 关键词:{', '.join(item.keywords)}") - result.append("") - - result.append("═══════════════════════════════════════════") - result.append("💡 提示:点击资讯查看详情,或生成分析报告") - - state.final_result = "\n".join(result) + md = MarkdownFormatter() + output_lines = [] - elif state.action == NewsAction.ANALYZE_URL and state.news_items: - result = [] - result.append("═══════════════════════════════════════════") - result.append("🔍 资讯分析结果") - result.append("═══════════════════════════════════════════") - result.append("") + output_lines.append("┌───────────────────────────────────┐") + output_lines.append("│ 📰 资讯助手 │") + output_lines.append("└───────────────────────────────────┘") + output_lines.append("") + + if state.action == NewsAction.QUERY_NEWS and state.news_items: + output_lines.append(md.heading("📰 最新资讯", 2)) + output_lines.append("") - for i, item in enumerate(state.news_items, 1): - result.append(f"{i}. {item.title}") - result.append(f" {item.summary}") - if item.keywords: - result.append(f" 🏷️ 关键词:{', '.join(item.keywords)}") - result.append("") - - result.append("═══════════════════════════════════════════") - - state.final_result = "\n".join(result) + for item in state.news_items: + output_lines.append(md.heading(item.title, 3)) + output_lines.append(f"> 来源: {item.source.value}") + output_lines.append(f"> 时间: {item.published_at.strftime('%Y-%m-%d %H:%M')}") + if item.summary: + output_lines.append("") + output_lines.append(item.summary) + if item.url: + output_lines.append(f"🔗 链接: {md.link(item.title, item.url)}") + output_lines.append("") elif state.action == NewsAction.EXTRACT_KEYWORDS and state.extracted_keywords: - result = [] - result.append("═══════════════════════════════════════════") - result.append("🏷️ 提取的关键词") - result.append("═══════════════════════════════════════════") - result.append("") - result.append(" " + ", ".join(state.extracted_keywords)) - result.append("") - result.append("═══════════════════════════════════════════") - - state.final_result = "\n".join(result) + output_lines.append(md.heading("🏷️ 提取的关键词", 2)) + output_lines.append("") + keywords_data = [ + {"关键词": k, "权重": f"{w:.2f}"} + for k, w in state.extracted_keywords.items() + ] + output_lines.append(md.table(keywords_data)) elif state.action == NewsAction.GENERATE_REPORT and state.report_content: - state.final_result = state.report_content + output_lines.append(md.heading("📊 分析报告", 2)) + output_lines.append("") + output_lines.append(state.report_content) + + elif state.action == NewsAction.ANALYZE_URL and state.analysis: + output_lines.append(md.heading("🔍 URL 分析", 2)) + output_lines.append("") + output_lines.append(f"> URL: {state.custom_urls[0]}") + output_lines.append("") + output_lines.append(state.analysis) else: - if not state.final_result: - state.final_result = "资讯操作完成" + output_lines.append(md.heading("✨ 操作完成", 2)) + output_lines.append("您的请求已处理。") + + # 页脚提示 + output_lines.append("") + output_lines.append("---") + output_lines.append("💡 提示:您可以继续查询资讯、提取关键词或者生成报告!") + + state.final_result = "\n".join(output_lines) + state.success = True + state.current_phase = "completed" - state.current_phase = "done" return state - - -def should_continue(state: NewsAnalysisState) -> str: - """ - 条件路由:决定下一步该做什么 - """ - if state.error_message: - return "format_result" - - # 根据action路由 - if state.action == NewsAction.NONE: - return "parse_intent" - elif state.action == NewsAction.QUERY_NEWS: - return "query_news" - elif state.action == NewsAction.ANALYZE_URL: - return "analyze_url" - elif state.action == NewsAction.EXTRACT_KEYWORDS: - return "extract_keywords" - elif state.action == NewsAction.GENERATE_REPORT: - return "generate_report" - else: - return "format_result"