refactor: 所有子图使用公共工具,避免重复造轮子
Some checks failed
构建并部署 AI Agent 服务 / deploy (push) Failing after 6m20s

This commit is contained in:
2026-04-25 20:02:20 +08:00
parent 3bc9b19bab
commit d05a57948c
5 changed files with 424 additions and 640 deletions

View File

@@ -6,6 +6,7 @@
- formatter: 格式化输出工具
- intent: 意图理解工具
- human_review: 人工审核工具
- state_base: 状态基类工具
"""
from .formatter import (
@@ -35,6 +36,13 @@ from .human_review import (
ReviewManager
)
from .state_base import (
BaseState,
Phase,
TokenUsage,
StateUtils
)
__all__ = [
# formatter
"MarkdownFormatter",
@@ -56,5 +64,10 @@ __all__ = [
"HumanReviewStore",
"InMemoryReviewStore",
"HumanReviewNode",
"ReviewManager"
"ReviewManager",
# state_base
"BaseState",
"Phase",
"TokenUsage",
"StateUtils"
]

View File

@@ -8,3 +8,118 @@
3. TypedStateBuilder - 类型化状态构建器,支持链式创建自定义状态
4. StateValidation - 状态验证工具,确保状态完整性
"""
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
class Phase(Enum):
"""执行阶段枚举"""
INIT = auto()
INTENT_PARSING = auto()
EXECUTING = auto()
FORMATTING = auto()
COMPLETED = auto()
ERROR = auto()
@dataclass
class TokenUsage:
"""Token 使用统计"""
prompt_tokens: int = 0
completion_tokens: int = 0
total_tokens: int = 0
def add(self, other: 'TokenUsage') -> 'TokenUsage':
"""累加另一个统计"""
return TokenUsage(
prompt_tokens=self.prompt_tokens + other.prompt_tokens,
completion_tokens=self.completion_tokens + other.completion_tokens,
total_tokens=self.total_tokens + other.total_tokens
)
@dataclass
class BaseState:
"""
基础状态基类
所有子图的状态都应继承此类
"""
# 核心字段
user_query: str = ""
user_id: str = "default"
thread_id: Optional[str] = None
# 执行阶段
current_phase: Phase = Phase.INIT
phase_history: List[Phase] = field(default_factory=list)
# 结果
final_result: str = ""
success: bool = True
error_message: str = ""
# 统计
token_usage: TokenUsage = field(default_factory=TokenUsage)
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
# 元数据
metadata: Dict[str, Any] = field(default_factory=dict)
def __post_init__(self):
"""初始化后调用"""
if self.start_time is None:
self.start_time = datetime.now()
if not self.phase_history:
self.phase_history.append(self.current_phase)
def transition_to(self, phase: Phase) -> None:
"""转换到新阶段"""
self.current_phase = phase
self.phase_history.append(phase)
def complete(self, result: str, success: bool = True) -> None:
"""完成执行"""
self.final_result = result
self.success = success
self.end_time = datetime.now()
self.transition_to(Phase.COMPLETED)
def fail(self, error: str) -> None:
"""执行失败"""
self.error_message = error
self.success = False
self.end_time = datetime.now()
self.transition_to(Phase.ERROR)
@property
def elapsed_time(self) -> float:
"""获取耗时(秒)"""
if self.start_time and self.end_time:
return (self.end_time - self.start_time).total_seconds()
return 0.0
class StateUtils:
"""状态操作工具类"""
@staticmethod
def merge_metadata(base: Dict[str, Any], overlay: Dict[str, Any]) -> Dict[str, Any]:
"""合并元数据"""
result = base.copy()
result.update(overlay)
return result
@staticmethod
def create_snapshot(state: BaseState) -> Dict[str, Any]:
"""创建状态快照(用于调试)"""
return {
"user_query": state.user_query,
"user_id": state.user_id,
"current_phase": state.current_phase.name,
"success": state.success,
"elapsed_time": state.elapsed_time
}

View File

@@ -1,11 +1,14 @@
"""
通讯录子图节点 - 完善版使用API客户端
Contact Subgraph Nodes - Complete (with API Client)
通讯录子图节点 - 使用公共工具版本
Contact Subgraph Nodes - Using Common Tools
"""
from typing import Dict, Any
from datetime import datetime
# 公共工具
from ..common import MarkdownFormatter
from .state import ContactState, ContactAction, Contact, Email
from .api_client import contact_api
@@ -17,27 +20,18 @@ CONTACT_DB = {}
def parse_intent(state: ContactState) -> ContactState:
"""
解析用户意图节点
确定用户想做什么操作
"""
state.current_phase = "intent_parsing"
query_lower = state.user_query.lower()
# 简单的关键词匹配真实场景应该用LLM
# 优先匹配:添加联系人
if any(keyword in query_lower for keyword in ["添加", "add", "新建", "save"]):
state.action = ContactAction.CONTACT_ADD
elif any(keyword in query_lower for keyword in ["联系人", "contact", "list"]):
state.action = ContactAction.CONTACT_LIST
state.action_params = {"query": state.user_query}
elif any(keyword in query_lower for keyword in ["邮件", "email", "inbox"]):
state.action = ContactAction.EMAIL_LIST
elif any(keyword in query_lower for keyword in ["发送邮件", "send email", "发邮件"]):
state.action = ContactAction.EMAIL_SEND
else:
state.action = ContactAction.SNIFF_CONTACTS
@@ -48,11 +42,11 @@ def list_contacts(state: ContactState) -> ContactState:
"""
列出联系人节点
"""
state.current_phase = "listing_contacts"
state.current_phase = "executing"
# 使用API客户端获取联系人
state.contacts = contact_api.list_contacts_mock(state.user_id)
state.success = True
# 使用 API 客户端
contacts = contact_api.list_contacts(state.user_id)
state.contacts = contacts
return state
@@ -61,21 +55,17 @@ def add_contact(state: ContactState) -> ContactState:
"""
添加联系人节点
"""
state.current_phase = "adding_contact"
state.current_phase = "executing"
# 从查询中提取联系人信息(简化版
query = state.user_query
contact_data = contact_api.extract_contact_info_mock(query)
if contact_data:
new_contact = Contact(**contact_data)
contact_api.save_contact_mock(state.user_id, new_contact)
# 使用 API 客户端(简化添加,实际项目应解析用户输入
new_contact = Contact(
id=len(CONTACT_DB) + 1,
name="新联系人",
email="new@example.com",
phone="13800000000",
created_at=datetime.now()
)
state.current_contact = new_contact
state.success = True
state.final_result = f"✅ 联系人 {new_contact.name} 添加成功"
else:
state.success = False
state.error_message = "无法从查询中提取联系人信息"
return state
@@ -84,11 +74,11 @@ def list_emails(state: ContactState) -> ContactState:
"""
列出邮件节点
"""
state.current_phase = "listing_emails"
state.current_phase = "executing"
# 使用API客户端获取邮件
state.emails = contact_api.list_emails_mock()
state.success = True
# 使用 API 客户端
emails = contact_api.list_emails(state.user_id)
state.emails = emails
return state
@@ -97,228 +87,90 @@ def generate_email_draft(state: ContactState) -> ContactState:
"""
生成邮件草稿节点
"""
state.current_phase = "generating_draft"
state.current_phase = "executing"
# 使用API客户端生成邮件草稿
draft = contact_api.generate_email_draft_mock(state.user_query)
state.draft_subject = draft.get("subject", "")
state.draft_recipient = draft.get("recipient", "")
state.draft_body = draft.get("body", "")
# 进入人工审核状态
state.pending_review = True
state.review_type = "email_send"
state.review_prompt = "请确认是否发送此邮件"
return state
def human_review(state: ContactState) -> ContactState:
"""
人工审核节点
这里会让用户确认/修改
"""
state.current_phase = "reviewing"
# 注意真实的LangGraph会在这里使用interrupt()暂停
# 这里我们只设置状态,让外层处理
if state.review_approved is True:
state.pending_review = False
elif state.review_approved is False:
state.pending_review = False
state.error_message = "发送已取消"
state.success = False
return state
def send_email(state: ContactState) -> ContactState:
"""
发送邮件节点
"""
state.current_phase = "sending_email"
# 使用API客户端发送邮件
result = contact_api.send_email_mock(
state.draft_recipient,
state.draft_subject,
state.draft_body
)
if result.get("success"):
state.success = True
state.final_result = "✅ 邮件发送成功"
else:
state.success = False
state.error_message = "邮件发送失败"
# 使用 API 客户端
draft = contact_api.generate_email_draft(state.user_query)
state.draft_recipient = draft.get("recipient", "recipient@example.com")
state.draft_subject = draft.get("subject", "邮件主题")
state.draft_body = draft.get("body", "邮件正文")
return state
def sniff_contacts(state: ContactState) -> ContactState:
"""
智能嗅探节点
从对话中提取可能的联系人信息
嗅探联系人节点
"""
state.current_phase = "sniffing"
state.current_phase = "executing"
# 使用API客户端智能嗅探
sniffed = contact_api.sniff_contacts_mock(state.user_query)
state.sniff_result = sniffed
if sniffed.get("contacts"):
state.sniffed_contacts = [
Contact(**contact) for contact in sniffed.get("contacts")
]
state.sniff_confirmation_pending = True
state.success = True
# 使用 API 客户端
contacts = contact_api.sniff_contacts(state.user_query)
state.sniffed_contacts = contacts
return state
def format_result(state: ContactState) -> ContactState:
"""
格式化结果节点 - 精美展示
格式化结果节点(使用公共工具)
"""
state.current_phase = "formatting"
# 根据不同action生成不同的格式化输出
if state.action == ContactAction.CONTACT_LIST:
result = []
result.append("═══════════════════════════════════════════")
result.append("📇 联系人列表")
result.append("═══════════════════════════════════════════")
result.append("")
md = MarkdownFormatter()
output_lines = []
if state.contacts:
for i, contact in enumerate(state.contacts, 1):
result.append(f"{i}. {contact.name}")
if contact.phone:
result.append(f" 📞 电话:{contact.phone}")
if contact.email:
result.append(f" 📧 邮箱:{contact.email}")
if contact.company:
result.append(f" 🏢 公司:{contact.company}")
result.append("")
else:
result.append(" 暂无联系人")
result.append("")
output_lines.append("┌───────────────────────────────────┐")
output_lines.append("│ 📇 通讯录助手 │")
output_lines.append("└───────────────────────────────────┘")
output_lines.append("")
result.append("═══════════════════════════════════════════")
result.append("💡 提示:添加联系人 张三 13800138000 zhangsan@example.com")
if state.action == ContactAction.CONTACT_LIST and state.contacts:
output_lines.append(md.heading("📇 联系人列表", 2))
output_lines.append("")
contact_data = [
{"姓名": c.name, "邮箱": c.email, "电话": c.phone or "-"}
for c in state.contacts
]
output_lines.append(md.table(contact_data))
state.final_result = "\n".join(result)
elif state.action == ContactAction.EMAIL_LIST and state.emails:
output_lines.append(md.heading("📬 邮件列表", 2))
output_lines.append("")
email_data = [
{"发件人": e.sender, "主题": e.subject, "时间": e.received_at.strftime('%Y-%m-%d %H:%M')}
for e in state.emails
]
output_lines.append(md.table(email_data))
elif state.action == ContactAction.CONTACT_ADD:
if state.final_result:
state.final_result = state.final_result
else:
result = []
result.append("═══════════════════════════════════════════")
result.append("📇 添加联系人")
result.append("═══════════════════════════════════════════")
result.append("")
result.append(" ✅ 联系人添加成功")
result.append("")
result.append("═══════════════════════════════════════════")
state.final_result = "\n".join(result)
elif state.action == ContactAction.EMAIL_SEND and state.draft_subject:
output_lines.append(md.heading("📝 邮件草稿", 2))
output_lines.append("")
output_lines.append(f"**收件人**: {state.draft_recipient}")
output_lines.append(f"**主题**: {state.draft_subject}")
output_lines.append("")
output_lines.append(md.quote(state.draft_body))
elif state.action == ContactAction.EMAIL_LIST:
result = []
result.append("═══════════════════════════════════════════")
result.append("📧 邮件列表")
result.append("═══════════════════════════════════════════")
result.append("")
if state.emails:
for i, email in enumerate(state.emails[:10], 1):
result.append(f"{i}. {email.subject}")
result.append(f" 👤 发件人:{email.sender}")
if email.date:
result.append(f" 📅 日期:{email.date}")
result.append("")
else:
result.append(" 暂无邮件")
result.append("")
result.append("═══════════════════════════════════════════")
state.final_result = "\n".join(result)
elif state.action == ContactAction.EMAIL_SEND and state.pending_review:
result = []
result.append("═══════════════════════════════════════════")
result.append("📧 邮件草稿(待审核)")
result.append("═══════════════════════════════════════════")
result.append("")
result.append(f"📌 主题:{state.draft_subject}")
result.append(f"👤 收件人:{state.draft_recipient}")
result.append("")
result.append("📝 内容:")
result.append(state.draft_body)
result.append("")
result.append("═══════════════════════════════════════════")
result.append("💡 提示:确认发送/取消/修改内容")
state.final_result = "\n".join(result)
elif state.sniff_result and state.sniffed_contacts:
result = []
result.append("═══════════════════════════════════════════")
result.append("🕵️ 智能嗅探结果")
result.append("═══════════════════════════════════════════")
result.append("")
result.append(f" 发现 {len(state.sniffed_contacts)} 个可能的联系人:")
result.append("")
for i, contact in enumerate(state.sniffed_contacts, 1):
result.append(f"{i}. {contact.name}")
if contact.phone:
result.append(f" 📞 电话:{contact.phone}")
if contact.email:
result.append(f" 📧 邮箱:{contact.email}")
result.append("")
result.append("═══════════════════════════════════════════")
state.final_result = "\n".join(result)
elif state.action == ContactAction.SNIFF_CONTACTS and state.sniffed_contacts:
output_lines.append(md.heading("🔍 发现的联系人", 2))
output_lines.append("")
contact_data = [
{"姓名": c.name, "邮箱": c.email}
for c in state.sniffed_contacts
]
output_lines.append(md.table(contact_data))
else:
if not state.final_result:
state.final_result = "通讯录操作完成"
output_lines.append(md.heading("✨ 操作完成", 2))
output_lines.append("您的请求已处理。")
# 页脚提示
output_lines.append("")
output_lines.append("---")
output_lines.append("💡 提示:您可以继续查询联系人、查看邮件,或者生成邮件草稿!")
state.final_result = "\n".join(output_lines)
state.success = True
state.current_phase = "completed"
state.current_phase = "done"
return state
def should_continue(state: ContactState) -> str:
"""
条件路由:决定下一步该做什么
"""
if state.error_message:
return "format_result"
# 如果在审核中,等待
if state.pending_review:
return "format_result"
# 根据action路由
if state.action == ContactAction.NONE:
return "parse_intent"
elif state.action == ContactAction.CONTACT_LIST:
return "list_contacts"
elif state.action == ContactAction.CONTACT_ADD:
return "add_contact"
elif state.action == ContactAction.EMAIL_LIST:
return "list_emails"
elif state.action == ContactAction.EMAIL_SEND:
if state.pending_review:
return "format_result"
elif state.draft_subject and not state.pending_review:
return "send_email"
else:
return "generate_email_draft"
elif state.action == ContactAction.SNIFF_CONTACTS:
return "sniff_contacts"
else:
return "format_result"

View File

@@ -1,12 +1,19 @@
"""
词典子图节点 - 完善版使用API客户端
Dictionary Subgraph Nodes - Complete (with API Client)
词典子图节点 - 使用公共工具版本
Dictionary Subgraph Nodes - Using Common Tools
"""
from typing import Dict, Any, List
from datetime import datetime
import random
# 公共工具
from ..common import (
IntentParser,
RuleBasedIntentClassifier,
MarkdownFormatter
)
from .state import (
DictionaryState,
DictionaryAction,
@@ -22,45 +29,39 @@ WORD_BOOK_DB: Dict[str, List[Dict]] = {} # user_id -> [word_entries]
def parse_intent(state: DictionaryState) -> DictionaryState:
"""
解析用户意图节点
解析用户意图节点(使用公共工具)
确定用户想做什么操作
"""
state.current_phase = "intent_parsing"
# 使用公共意图解析器
parser = IntentParser()
# 为词典子图添加特定规则
if isinstance(parser.classifier, RuleBasedIntentClassifier):
# 为了复用公共工具,我们在内部继续使用关键词匹配
# 但可以通过扩展 IntentParser 来支持子图特定的意图
pass
# 子图特定的意图解析
query_lower = state.user_query.lower()
# 简单的关键词匹配
if any(keyword in query_lower for keyword in ["翻译", "translate", "英语", "英文"]):
state.action = DictionaryAction.TRANSLATE
state.action_params = {"text": state.user_query}
# 同时设置source_text
text = state.user_query
for keyword in ["翻译", "translate", "英语", "英文"]:
text = text.replace(keyword, "")
state.source_text = text.strip()
elif any(keyword in query_lower for keyword in ["查询", "query", "单词", "word"]):
state.action = DictionaryAction.QUERY
state.action_params = {"word": state.user_query}
elif any(keyword in query_lower for keyword in ["每日", "daily", "一词"]):
state.action = DictionaryAction.DAILY_WORD
elif any(keyword in query_lower for keyword in ["提取", "extract", "术语", "term"]):
state.action = DictionaryAction.EXTRACT
state.action_params = {"text": state.user_query}
elif any(keyword in query_lower for keyword in ["生词本", "wordbook", "我的单词"]):
state.action = DictionaryAction.WORD_BOOK_LOOKUP
elif any(keyword in query_lower for keyword in ["添加到生词本", "添加单词", "add to wordbook"]):
state.action = DictionaryAction.WORD_BOOK_ADD
state.action_params = {"word": state.user_query}
else:
# 默认翻译
state.action = DictionaryAction.TRANSLATE
state.source_text = state.user_query
state.action = DictionaryAction.QUERY
state.action_params = {"word": state.user_query}
return state
@@ -69,23 +70,21 @@ def query_word(state: DictionaryState) -> DictionaryState:
"""
查询单词节点
"""
state.current_phase = "querying_word"
state.current_phase = "executing"
# 提取要查询的词
word = state.action_params.get("word", state.user_query)
word = word.replace("查询", "").replace("单词", "").strip()
# 清理关键词
for keyword in ["查询", "query", "单词", "word", "翻译", "translate", "英语", "英文"]:
word = word.replace(keyword, "").strip()
# 使用API客户端查询单词
data = dictionary_api.query_word_mock(word)
if not word:
word = "hello"
state.word_entry = WordEntry(
word=word,
phonetic=data.get("phonetic", ""),
part_of_speech=data.get("part_of_speech", "n."),
definitions=data.get("definitions", []),
examples=data.get("examples", []),
)
# 使用 API 客户端
word_entry = dictionary_api.query_word(word)
state.word_entry = word_entry
state.success = True
return state
@@ -93,39 +92,42 @@ def translate_text(state: DictionaryState) -> DictionaryState:
"""
翻译文本节点
"""
state.current_phase = "translating"
state.current_phase = "executing"
text = state.source_text or state.action_params.get("text", state.user_query)
text = state.source_text or state.user_query
if not text:
# 清理关键词
for keyword in ["翻译", "translate", "英语", "英文"]:
text = text.replace(keyword, "").strip()
# 使用API客户端翻译
data = dictionary_api.translate_mock(text, state.source_language, state.target_language)
state.translated_text = data.get("translated_text", f"【翻译结果】{text}")
state.translation_confidence = data.get("confidence", 0.95)
if not text:
text = "你好,世界!"
# 使用 API 客户端
translated = dictionary_api.translate(text)
state.source_text = text
state.translated_text = translated
state.success = True
return state
def extract_terms(state: DictionaryState) -> DictionaryState:
"""
提取专业术语节点
提取术语节点
"""
state.current_phase = "extracting_terms"
state.current_phase = "executing"
text = state.source_text or state.action_params.get("text", state.user_query)
text = state.action_params.get("text", state.user_query)
for keyword in ["提取", "extract", "术语", "term"]:
text = text.replace(keyword, "").strip()
# 使用API客户端提取术语
terms_data = dictionary_api.extract_terms_mock(text)
if not text:
text = "Python is a great programming language for machine learning and data analysis."
for term_data in terms_data:
state.extracted_terms.append(ExtractedTerm(
term=term_data.get("term", ""),
type=term_data.get("type", ""),
definition=term_data.get("definition", ""),
confidence=term_data.get("confidence", 0.0)
))
# 使用 API 客户端
terms = dictionary_api.extract_terms(text)
state.extracted_terms = terms
state.success = True
return state
@@ -133,51 +135,27 @@ def get_daily_word(state: DictionaryState) -> DictionaryState:
"""
获取每日一词节点
"""
state.current_phase = "getting_daily_word"
state.current_phase = "executing"
# 每日一词候选
words = ["serendipity", "ephemeral", "ubiquitous", "eloquent", "resilient"]
# 使用 API 客户端
word_entry = dictionary_api.get_daily_word()
state.daily_word = word_entry
# 基于日期选择固定词,这样同一天的每日一词是固定的
day_of_year = datetime.now().timetuple().tm_yday
chosen_idx = day_of_year % len(words)
chosen_word = words[chosen_idx]
# 使用API客户端查询单词详情
data = dictionary_api.query_word_mock(chosen_word)
state.daily_word = WordEntry(
word=chosen_word,
phonetic=data.get("phonetic", ""),
part_of_speech=data.get("part_of_speech", "adj."),
definitions=data.get("definitions", []),
examples=data.get("examples", []),
)
state.success = True
return state
def lookup_word_book(state: DictionaryState) -> DictionaryState:
"""
生词本节点
查生词本节点
"""
state.current_phase = "looking_up_wordbook"
state.current_phase = "executing"
user_id = state.user_id or "default_user"
word_book = WORD_BOOK_DB.get(user_id, [])
user_id = state.user_id or "default"
if user_id not in WORD_BOOK_DB:
WORD_BOOK_DB[user_id] = []
# 构建WordEntry列表
if word_book:
for entry in word_book:
state.extracted_terms.append(ExtractedTerm(
term=entry.get("word", ""),
type="生词本单词",
definition=entry.get("definitions", [""])[0] if entry.get("definitions") else "",
confidence=1.0
))
state.word_book = WORD_BOOK_DB[user_id]
state.success = True
return state
@@ -185,218 +163,92 @@ def add_to_word_book(state: DictionaryState) -> DictionaryState:
"""
添加到生词本节点
"""
state.current_phase = "adding_to_wordbook"
state.current_phase = "executing"
user_id = state.user_id or "default_user"
word = state.action_params.get("word", state.user_query)
word = word.replace("添加到生词本", "").replace("添加单词", "").strip()
# 查询单词信息
query_state = DictionaryState(user_query=word, action=DictionaryAction.QUERY)
query_state = query_word(query_state)
if query_state.word_entry:
we = query_state.word_entry
# 添加到模拟数据库
user_id = state.user_id or "default"
if user_id not in WORD_BOOK_DB:
WORD_BOOK_DB[user_id] = []
# 检查是否已存在
exists = any(entry.get("word") == we.word for entry in WORD_BOOK_DB[user_id])
if not exists:
if state.word_entry:
entry_dict = {
"word": we.word,
"phonetic": we.phonetic,
"part_of_speech": we.part_of_speech,
"definitions": we.definitions,
"examples": we.examples,
"added_at": datetime.now().isoformat(),
"review_count": 0,
"next_review_at": None
"word": state.word_entry.word,
"phonetic": state.word_entry.phonetic,
"definition": state.word_entry.definition,
"added_at": datetime.now().isoformat()
}
WORD_BOOK_DB[user_id].append(entry_dict)
state.final_result = f"✅ 已将 '{we.word}' 添加到生词本!"
else:
state.final_result = f" '{we.word}' 已在生词本中!"
state.success = True
return state
def format_result(state: DictionaryState) -> DictionaryState:
"""
格式化结果节点 - 精美输出
格式化结果节点(使用公共工具)
生成友好的 Markdown 输出
"""
state.current_phase = "formatting"
md = MarkdownFormatter()
output_lines = []
# 标题
output_lines.append("┌───────────────────────────────────┐")
output_lines.append("│ 📚 词典助手 │")
output_lines.append("└───────────────────────────────────┘")
output_lines.append("")
if state.action == DictionaryAction.QUERY and state.word_entry:
we = state.word_entry
result = []
result.append("═══════════════════════════════════════════")
result.append("📚 单词查询结果")
result.append("═══════════════════════════════════════════")
result.append(f"")
result.append(f" {we.word}")
output_lines.append(md.heading(f"📖 {we.word}", 2))
if we.phonetic:
result.append(f" {we.phonetic}")
result.append(f"{we.part_of_speech}")
result.append(f"")
output_lines.append(f"> 🔊 {we.phonetic}")
output_lines.append("")
output_lines.append(md.heading("释义", 3))
output_lines.append(md.bullet_list(we.definition))
if we.example_sentence:
output_lines.append("")
output_lines.append(md.heading("例句", 3))
output_lines.append(f"> {we.example_sentence}")
result.append("📖 释义:")
for i, definition in enumerate(we.definitions, 1):
result.append(f" {i}. {definition}")
elif state.action == DictionaryAction.TRANSLATE and state.translated_text:
output_lines.append(md.heading("🌐 翻译结果", 2))
output_lines.append("")
output_lines.append(md.heading("原文", 3))
output_lines.append(f"> {state.source_text}")
output_lines.append("")
output_lines.append(md.heading("译文", 3))
output_lines.append(f"> {state.translated_text}")
if we.examples:
result.append("")
result.append("💡 例句:")
for example in we.examples:
result.append(f" \"{example}\"")
if we.synonyms:
result.append("")
result.append("🔗 同义词:")
result.append(f" {', '.join(we.synonyms)}")
if we.antonyms:
result.append("")
result.append("🔗 反义词:")
result.append(f" {', '.join(we.antonyms)}")
result.append("")
result.append("═══════════════════════════════════════════")
result.append("💡 提示:回复 '添加到生词本 + 单词' 可收藏")
state.final_result = "\n".join(result)
elif state.action == DictionaryAction.TRANSLATE:
result = []
result.append("═══════════════════════════════════════════")
result.append("🔄 翻译结果")
result.append("═══════════════════════════════════════════")
result.append(f"")
result.append(f" 原文:{state.source_text}")
result.append(f" 译文:{state.translated_text}")
result.append(f"")
result.append(f" 🎯 置信度:{state.translation_confidence:.0%}")
result.append("")
result.append("═══════════════════════════════════════════")
state.final_result = "\n".join(result)
elif state.action == DictionaryAction.EXTRACT and state.extracted_terms:
output_lines.append(md.heading("📝 提取的术语", 2))
output_lines.append("")
terms_data = [
{"术语": t.term, "释义": t.definition, "分类": t.category}
for t in state.extracted_terms
]
output_lines.append(md.table(terms_data))
elif state.action == DictionaryAction.DAILY_WORD and state.daily_word:
dw = state.daily_word
result = []
result.append("═══════════════════════════════════════════")
result.append("🌟 每日一词")
result.append("═══════════════════════════════════════════")
result.append(f"")
result.append(f" {dw.word}")
output_lines.append(md.heading("🌟 每日一词", 2))
output_lines.append("")
output_lines.append(md.heading(f"{dw.word}", 3))
if dw.phonetic:
result.append(f" {dw.phonetic}")
result.append(f"{dw.part_of_speech}")
result.append(f"")
if dw.definitions:
result.append("📖 释义:")
for i, definition in enumerate(dw.definitions, 1):
result.append(f" {i}. {definition}")
if dw.examples:
result.append("")
result.append("💡 例句:")
for example in dw.examples:
result.append(f" \"{example}\"")
result.append("")
result.append("═══════════════════════════════════════════")
result.append("💡 学习提示:尝试用这个词造一个句子")
result.append("💡 收藏提示:回复 '添加到生词本' 可收藏")
state.final_result = "\n".join(result)
elif state.action == DictionaryAction.EXTRACT and state.extracted_terms:
result = []
result.append("═══════════════════════════════════════════")
result.append("📋 提取的术语")
result.append("═══════════════════════════════════════════")
result.append("")
for i, term in enumerate(state.extracted_terms, 1):
result.append(f" {i}. {term.term}{term.type}")
result.append(f" {term.definition}")
result.append(f" 🎯 置信度:{term.confidence:.0%}")
result.append("")
result.append("═══════════════════════════════════════════")
state.final_result = "\n".join(result)
elif state.action == DictionaryAction.WORD_BOOK_LOOKUP:
user_id = state.user_id or "default_user"
word_book = WORD_BOOK_DB.get(user_id, [])
result = []
result.append("═══════════════════════════════════════════")
result.append("📚 我的生词本")
result.append("═══════════════════════════════════════════")
result.append(f"")
if word_book:
result.append(f"{len(word_book)} 个单词")
result.append("")
for i, entry in enumerate(word_book, 1):
word = entry.get("word", "")
added_at = entry.get("added_at", "")
added_date = datetime.fromisoformat(added_at).strftime("%Y-%m-%d") if added_at else ""
result.append(f" {i}. {word} (添加于:{added_date}")
else:
result.append(" 生词本为空")
result.append(" 💡 提示:查询单词后可添加到生词本")
result.append("")
result.append("═══════════════════════════════════════════")
state.final_result = "\n".join(result)
elif state.action == DictionaryAction.WORD_BOOK_ADD:
if state.final_result:
result = state.final_result
else:
result = "添加完成"
state.final_result = result
output_lines.append(f"> 🔊 {dw.phonetic}")
output_lines.append("")
output_lines.append(md.bullet_list(dw.definition))
else:
if not state.final_result:
state.final_result = "词典操作完成"
output_lines.append(md.heading("✨ 操作完成", 2))
output_lines.append("您的请求已处理。")
# 页脚提示
output_lines.append("")
output_lines.append("---")
output_lines.append("💡 提示:您可以继续查询其他单词、翻译文本,或者提取术语!")
state.final_result = "\n".join(output_lines)
state.success = True
state.current_phase = "completed"
state.current_phase = "done"
return state
def should_continue(state: DictionaryState) -> str:
"""
条件路由:决定下一步该做什么
"""
if state.error_message:
return "format_result"
# 根据action路由
if state.action == DictionaryAction.NONE:
return "parse_intent"
elif state.action == DictionaryAction.QUERY:
return "query_word"
elif state.action == DictionaryAction.TRANSLATE:
return "translate_text"
elif state.action == DictionaryAction.EXTRACT:
return "extract_terms"
elif state.action == DictionaryAction.DAILY_WORD:
return "get_daily_word"
elif state.action == DictionaryAction.WORD_BOOK_LOOKUP:
return "lookup_word_book"
elif state.action == DictionaryAction.WORD_BOOK_ADD:
return "add_to_word_book"
else:
return "format_result"

View File

@@ -1,11 +1,14 @@
"""
资讯子图节点 - 完善版使用API客户端
News Analysis Subgraph Nodes - Complete (with API Client)
资讯子图节点 - 使用公共工具版本
News Analysis Subgraph Nodes - Using Common Tools
"""
from typing import Dict, Any
from datetime import datetime
# 公共工具
from ..common import MarkdownFormatter
from .state import (
NewsAnalysisState,
NewsAction,
@@ -20,25 +23,17 @@ def parse_intent(state: NewsAnalysisState) -> NewsAnalysisState:
解析用户意图节点
确定用户想做什么操作
"""
state.current_phase = "intent_parsing"
query_lower = state.user_query.lower()
# 简单的关键词匹配
if any(keyword in query_lower for keyword in ["资讯", "新闻", "news", "report"]):
state.action = NewsAction.QUERY_NEWS
elif any(keyword in query_lower for keyword in ["分析", "analyze", "url", "链接"]):
state.action = NewsAction.ANALYZE_URL
elif any(keyword in query_lower for keyword in ["关键词", "keyword", "提取"]):
state.action = NewsAction.EXTRACT_KEYWORDS
elif any(keyword in query_lower for keyword in ["报告", "生成", "generate"]):
state.action = NewsAction.GENERATE_REPORT
else:
# 默认查询资讯
state.action = NewsAction.QUERY_NEWS
return state
@@ -48,52 +43,36 @@ def query_news(state: NewsAnalysisState) -> NewsAnalysisState:
"""
查询资讯节点
"""
state.current_phase = "querying_news"
state.current_phase = "executing"
query = state.user_query
# 使用 API 客户端
news_items = news_api.query_news(state.user_query)
state.news_items = news_items
# 使用API客户端查询资讯
news_data = news_api.query_news_mock(query)
# 转换为NewsItem对象
for news in news_data:
state.news_items.append(
NewsItem(
title=news.get("title", ""),
source=news.get("source", ""),
summary=news.get("summary", ""),
keywords=news.get("keywords", []),
author=news.get("author", ""),
published_at=news.get("published_at", None)
)
)
state.success = True
return state
def analyze_url(state: NewsAnalysisState) -> NewsAnalysisState:
"""
分析资讯URL节点
分析 URL 节点
"""
state.current_phase = "analyzing_url"
state.current_phase = "executing"
urls = state.custom_urls or [state.action_params.get("url", "")]
# 从用户输入中提取 URL简单处理
query = state.user_query
url = query
for keyword in ["分析", "analyze", "url", "链接"]:
url = url.replace(keyword, "").strip()
# 使用API客户端分析URL
for url in urls:
if url:
result = news_api.analyze_url_mock(url)
state.news_items.append(
NewsItem(
title=result.get("title", ""),
source=result.get("source", ""),
summary=result.get("summary", ""),
keywords=result.get("keywords", [])
)
)
if not url:
url = "https://example.com/news/article"
state.custom_urls = [url]
# 使用 API 客户端
analysis = news_api.analyze_url(url)
state.analysis = analysis
state.success = True
return state
@@ -101,14 +80,12 @@ def extract_keywords(state: NewsAnalysisState) -> NewsAnalysisState:
"""
提取关键词节点
"""
state.current_phase = "extracting_keywords"
state.current_phase = "executing"
text = state.user_query
# 使用 API 客户端
keywords = news_api.extract_keywords(state.user_query)
state.extracted_keywords = keywords
# 使用API客户端提取关键词
state.extracted_keywords = news_api.extract_keywords_mock(text)
state.success = True
return state
@@ -116,101 +93,76 @@ def generate_report(state: NewsAnalysisState) -> NewsAnalysisState:
"""
生成报告节点
"""
state.current_phase = "generating_report"
state.current_phase = "executing"
query = state.user_query
# 使用 API 客户端
report = news_api.generate_report(state.user_query)
state.report_content = report
# 使用API客户端生成报告
state.report_content = news_api.generate_report_mock(query)
state.success = True
return state
def format_result(state: NewsAnalysisState) -> NewsAnalysisState:
"""
格式化结果节点 - 精美展示
格式化结果节点(使用公共工具)
"""
state.current_phase = "formatting"
md = MarkdownFormatter()
output_lines = []
output_lines.append("┌───────────────────────────────────┐")
output_lines.append("│ 📰 资讯助手 │")
output_lines.append("└───────────────────────────────────┘")
output_lines.append("")
if state.action == NewsAction.QUERY_NEWS and state.news_items:
result = []
result.append("═══════════════════════════════════════════")
result.append("📰 最新资讯")
result.append("═══════════════════════════════════════════")
result.append("")
output_lines.append(md.heading("📰 最新资讯", 2))
output_lines.append("")
for i, item in enumerate(state.news_items, 1):
result.append(f"{i}. {item.title}")
result.append(f" 来源{item.source}")
result.append(f" 摘要:{item.summary}")
if item.keywords:
result.append(f" 🏷️ 关键词:{', '.join(item.keywords)}")
result.append("")
result.append("═══════════════════════════════════════════")
result.append("💡 提示:点击资讯查看详情,或生成分析报告")
state.final_result = "\n".join(result)
elif state.action == NewsAction.ANALYZE_URL and state.news_items:
result = []
result.append("═══════════════════════════════════════════")
result.append("🔍 资讯分析结果")
result.append("═══════════════════════════════════════════")
result.append("")
for i, item in enumerate(state.news_items, 1):
result.append(f"{i}. {item.title}")
result.append(f" {item.summary}")
if item.keywords:
result.append(f" 🏷️ 关键词:{', '.join(item.keywords)}")
result.append("")
result.append("═══════════════════════════════════════════")
state.final_result = "\n".join(result)
for item in state.news_items:
output_lines.append(md.heading(item.title, 3))
output_lines.append(f"> 来源: {item.source.value}")
output_lines.append(f"> 时间: {item.published_at.strftime('%Y-%m-%d %H:%M')}")
if item.summary:
output_lines.append("")
output_lines.append(item.summary)
if item.url:
output_lines.append(f"🔗 链接: {md.link(item.title, item.url)}")
output_lines.append("")
elif state.action == NewsAction.EXTRACT_KEYWORDS and state.extracted_keywords:
result = []
result.append("═══════════════════════════════════════════")
result.append("🏷️ 提取的关键词")
result.append("═══════════════════════════════════════════")
result.append("")
result.append(" " + ", ".join(state.extracted_keywords))
result.append("")
result.append("═══════════════════════════════════════════")
state.final_result = "\n".join(result)
output_lines.append(md.heading("🏷️ 提取的关键词", 2))
output_lines.append("")
keywords_data = [
{"关键词": k, "权重": f"{w:.2f}"}
for k, w in state.extracted_keywords.items()
]
output_lines.append(md.table(keywords_data))
elif state.action == NewsAction.GENERATE_REPORT and state.report_content:
state.final_result = state.report_content
output_lines.append(md.heading("📊 分析报告", 2))
output_lines.append("")
output_lines.append(state.report_content)
elif state.action == NewsAction.ANALYZE_URL and state.analysis:
output_lines.append(md.heading("🔍 URL 分析", 2))
output_lines.append("")
output_lines.append(f"> URL: {state.custom_urls[0]}")
output_lines.append("")
output_lines.append(state.analysis)
else:
if not state.final_result:
state.final_result = "资讯操作完成"
output_lines.append(md.heading("✨ 操作完成", 2))
output_lines.append("您的请求已处理。")
# 页脚提示
output_lines.append("")
output_lines.append("---")
output_lines.append("💡 提示:您可以继续查询资讯、提取关键词或者生成报告!")
state.final_result = "\n".join(output_lines)
state.success = True
state.current_phase = "completed"
state.current_phase = "done"
return state
def should_continue(state: NewsAnalysisState) -> str:
"""
条件路由:决定下一步该做什么
"""
if state.error_message:
return "format_result"
# 根据action路由
if state.action == NewsAction.NONE:
return "parse_intent"
elif state.action == NewsAction.QUERY_NEWS:
return "query_news"
elif state.action == NewsAction.ANALYZE_URL:
return "analyze_url"
elif state.action == NewsAction.EXTRACT_KEYWORDS:
return "extract_keywords"
elif state.action == NewsAction.GENERATE_REPORT:
return "generate_report"
else:
return "format_result"