Files
ailine/backend/app/subgraphs/dictionary/nodes.py
root 4c119073bc
All checks were successful
构建并部署 AI Agent 服务 / deploy (push) Successful in 6m6s
优化输出
2026-05-09 01:51:18 +08:00

264 lines
8.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
词典子图节点 - 使用公共工具版本
Dictionary Subgraph Nodes - Using Common Tools
"""
import random
import re
from datetime import datetime
from typing import Dict, Any, List

from backend.app.core import get_formatter

from .state import (
    DictionaryState,
    DictionaryAction,
    WordEntry,
    ExtractedTerm
)
from .api_client import dictionary_api
# ========== In-memory word book store (placeholder; can be replaced by a database later) ==========
# Module-level dict, so its contents only live as long as this process does.
WORD_BOOK_DB: Dict[str, List[Dict]] = {} # user_id -> [word_entries]
def parse_intent(state: DictionaryState) -> DictionaryState:
    """
    Intent parsing node (rule-based keyword matching).

    The lower-cased query is checked against keyword groups in priority
    order: translate, query, daily word, extract. The first group with a hit
    decides ``state.action``; a query that matches nothing defaults to QUERY.

    Args:
        state: Subgraph state; ``state.user_query`` is read.

    Returns:
        The same ``state`` object with ``action`` set. ``action_params`` is
        set for every action except DAILY_WORD, and ``source_text`` is set
        for TRANSLATE only.
    """
    # NOTE(review): no rule here produces LOOKUP_WORD_BOOK or ADD_TO_WORD_BOOK,
    # so those actions cannot be selected by this parser - confirm whether
    # that is intentional.
    translate_keywords = ["翻译", "translate", "英语", "英文"]
    query_lower = state.user_query.lower()

    if any(keyword in query_lower for keyword in translate_keywords):
        state.action = DictionaryAction.TRANSLATE
        state.action_params = {"text": state.user_query}
        # Detection above is case-insensitive, so the removal must be too;
        # otherwise "Translate hello" keeps "Translate" in the text that is
        # sent for translation.
        text = state.user_query
        for keyword in translate_keywords:
            text = re.sub(re.escape(keyword), "", text, flags=re.IGNORECASE)
        state.source_text = text.strip()
    elif any(keyword in query_lower for keyword in ["查询", "query", "单词", "word"]):
        state.action = DictionaryAction.QUERY
        state.action_params = {"word": state.user_query}
    elif any(keyword in query_lower for keyword in ["每日", "daily", "一词"]):
        state.action = DictionaryAction.DAILY_WORD
    elif any(keyword in query_lower for keyword in ["提取", "extract", "术语", "term"]):
        state.action = DictionaryAction.EXTRACT
        state.action_params = {"text": state.user_query}
    else:
        # Nothing matched: treat the whole query as a word to look up.
        state.action = DictionaryAction.QUERY
        state.action_params = {"word": state.user_query}
    return state
def query_word(state: DictionaryState) -> DictionaryState:
    """
    Word lookup node.

    Takes the word from ``state.action_params["word"]`` (falling back to
    ``state.user_query``), removes the command keywords from it, and stores
    the result of ``dictionary_api.query_word`` in ``state.word_entry``.

    Args:
        state: Subgraph state; ``action_params`` and ``user_query`` are read.

    Returns:
        The same ``state`` object with ``current_phase`` set to
        ``"executing"`` and ``word_entry`` populated.
    """
    state.current_phase = "executing"

    # Word to look up, still containing whatever command keywords the user typed.
    word = state.action_params.get("word", state.user_query)

    # Intent detection matches keywords on the lower-cased query, so they are
    # removed case-insensitively here as well; otherwise "Query apple" would
    # be looked up as "Query apple" instead of "apple".
    # NOTE(review): removal is substring-based, so a word that contains a
    # keyword (e.g. "password") is shortened too.
    for keyword in ["查询", "query", "单词", "word", "翻译", "translate", "英语", "英文"]:
        word = re.sub(re.escape(keyword), "", word, flags=re.IGNORECASE).strip()

    # Nothing left after removing the keywords: fall back to a sample word.
    if not word:
        word = "hello"

    # Delegate the lookup to the API client.
    word_entry = dictionary_api.query_word(word)
    state.word_entry = word_entry
    return state
def translate_text(state: DictionaryState) -> DictionaryState:
    """
    Text translation node.

    Uses ``state.source_text`` when it is non-empty. Otherwise it falls back
    to ``state.user_query`` with the translate keywords removed, and finally
    to a built-in sample sentence when nothing is left.

    Args:
        state: Subgraph state; ``source_text`` and ``user_query`` are read.

    Returns:
        The same ``state`` object with ``current_phase`` set to
        ``"executing"``, ``source_text`` set to the text that was actually
        translated, and ``translated_text`` set to the API result.
    """
    state.current_phase = "executing"

    text = state.source_text
    if not text:
        # No pre-cleaned text available: derive it from the raw query.
        # Previously this clean-up only ran when the text was already empty,
        # so it never removed anything. Removal is case-insensitive to match
        # keyword detection on the lower-cased query.
        text = state.user_query or ""
        for keyword in ["翻译", "translate", "英语", "英文"]:
            text = re.sub(re.escape(keyword), "", text, flags=re.IGNORECASE).strip()

    # Still empty: translate a sample sentence instead of an empty string.
    if not text:
        text = "你好,世界!"

    # Delegate the translation to the API client.
    translated = dictionary_api.translate(text)
    state.source_text = text
    state.translated_text = translated
    return state
def extract_terms(state: DictionaryState) -> DictionaryState:
    """
    Term extraction node.

    Reads the text from ``state.action_params["text"]`` (falling back to
    ``state.user_query``), removes the extract keywords, and stores the
    result of ``dictionary_api.extract_terms`` in ``state.extracted_terms``.
    A built-in English sample sentence is used when no text is left.
    """
    state.current_phase = "executing"

    cleaned = state.action_params.get("text", state.user_query)
    # NOTE(review): removal is case-sensitive and substring-based, so
    # "Extract" is kept while "terms" becomes "s" - confirm desired behavior.
    for marker in ("提取", "extract", "术语", "term"):
        cleaned = cleaned.replace(marker, "")
        cleaned = cleaned.strip()

    sample = "Python is a great programming language for machine learning and data analysis."
    payload = cleaned if cleaned else sample

    # Delegate the extraction to the API client.
    state.extracted_terms = dictionary_api.extract_terms(payload)
    return state
def get_daily_word(state: DictionaryState) -> DictionaryState:
    """
    Daily word node.

    Marks the state as executing and stores whatever
    ``dictionary_api.get_daily_word()`` returns in ``state.daily_word``.
    """
    state.current_phase = "executing"
    # The API client decides which word is served.
    state.daily_word = dictionary_api.get_daily_word()
    return state
def lookup_word_book(state: DictionaryState) -> DictionaryState:
    """
    Word book lookup node.

    Exposes the current user's word book as ``state.word_book``. A user with
    no word book yet gets an empty one created in ``WORD_BOOK_DB``; a missing
    ``user_id`` maps to the ``"default"`` bucket.
    """
    state.current_phase = "executing"
    owner = state.user_id or "default"
    # setdefault creates the empty list on first access and returns the stored
    # list object itself, so state.word_book aliases the store entry.
    state.word_book = WORD_BOOK_DB.setdefault(owner, [])
    return state
def add_to_word_book(state: DictionaryState) -> DictionaryState:
    """
    Word book insertion node.

    Appends ``state.word_entry`` (word, phonetic, definition, plus an ISO
    timestamp) to the current user's word book in ``WORD_BOOK_DB``. The
    user's bucket is created even when there is no entry to add; a missing
    ``user_id`` maps to the ``"default"`` bucket.
    """
    state.current_phase = "executing"
    owner = state.user_id or "default"
    bucket = WORD_BOOK_DB.setdefault(owner, [])

    entry = state.word_entry
    if not entry:
        # Nothing was looked up, so there is nothing to save.
        return state

    record = {
        "word": entry.word,
        "phonetic": entry.phonetic,
        "definition": entry.definition,
        "added_at": datetime.now().isoformat()
    }
    bucket.append(record)
    return state
def format_result(state: DictionaryState) -> DictionaryState:
    """
    Result formatting node (uses the global Formatter).

    Builds the final Markdown reply: a banner, one section that depends on
    ``state.action`` and on the matching result field being non-empty, and a
    footer hint. When no action/result pair matches, a generic "done"
    section is emitted instead.

    Returns:
        The same ``state`` object with ``final_result`` set to the joined
        text, ``success`` set to True and ``current_phase`` set to
        ``"completed"``.
    """
    state.current_phase = "formatting"
    formatter = get_formatter()
    md = formatter.md

    # Banner shown at the top of every reply.
    lines = [
        "┌───────────────────────────────────┐",
        "│ 📚 词典助手 │",
        "└───────────────────────────────────┘",
        "",
    ]

    if state.action == DictionaryAction.QUERY and state.word_entry:
        entry = state.word_entry
        lines.append(md.heading(f"📖 {entry.word}", 2))
        if entry.phonetic:
            lines.append(f"> 🔊 {entry.phonetic}")
        lines += ["", md.heading("释义", 3), md.bullet_list(entry.definition)]
        if entry.example_sentence:
            lines += ["", md.heading("例句", 3), f"> {entry.example_sentence}"]
    elif state.action == DictionaryAction.TRANSLATE and state.translated_text:
        lines += [
            md.heading("🌐 翻译结果", 2),
            "",
            md.heading("原文", 3),
            f"> {state.source_text}",
            "",
            md.heading("译文", 3),
            f"> {state.translated_text}",
        ]
    elif state.action == DictionaryAction.EXTRACT and state.extracted_terms:
        lines += [md.heading("📝 提取的术语", 2), ""]
        # One table row per extracted term.
        rows = []
        for item in state.extracted_terms:
            rows.append({"术语": item.term, "释义": item.definition, "分类": item.category})
        lines.append(md.table(rows))
    elif state.action == DictionaryAction.DAILY_WORD and state.daily_word:
        daily = state.daily_word
        lines += [md.heading("🌟 每日一词", 2), "", md.heading(f"{daily.word}", 3)]
        if daily.phonetic:
            lines.append(f"> 🔊 {daily.phonetic}")
        lines += ["", md.bullet_list(daily.definition)]
    else:
        # No displayable result for this action: emit a generic confirmation.
        lines += [md.heading("✨ 操作完成", 2), "您的请求已处理。"]

    # Footer hint.
    lines += ["", "---", "💡 提示:您可以继续查询其他单词、翻译文本,或者提取术语!"]

    state.final_result = "\n".join(lines)
    state.success = True
    state.current_phase = "completed"
    return state
def should_continue(state: DictionaryState) -> str:
    """
    Conditional routing function: pick the next node from ``state.action``.

    Args:
        state: Subgraph state; only ``state.action`` is read.

    Returns:
        The name of the node that handles the action, or ``"format_result"``
        when the action matches none of the known values.
    """
    routes = (
        (DictionaryAction.QUERY, "query_word"),
        (DictionaryAction.TRANSLATE, "translate_text"),
        (DictionaryAction.EXTRACT, "extract_terms"),
        (DictionaryAction.DAILY_WORD, "get_daily_word"),
        (DictionaryAction.LOOKUP_WORD_BOOK, "lookup_word_book"),
        (DictionaryAction.ADD_TO_WORD_BOOK, "add_to_word_book"),
    )
    action = state.action
    # Compare with == (not a dict lookup) so matching behaves exactly like the
    # previous if/elif chain regardless of how the action value hashes.
    for candidate, node_name in routes:
        if action == candidate:
            return node_name
    # Unknown or missing action: skip execution and go straight to formatting.
    # (The previous trailing ``return state`` was unreachable and has been removed.)
    return "format_result"