diff --git a/backend/app/agent_subgraphs/dictionary/api_client.py b/backend/app/agent_subgraphs/dictionary/api_client.py index b2da115..5df1389 100644 --- a/backend/app/agent_subgraphs/dictionary/api_client.py +++ b/backend/app/agent_subgraphs/dictionary/api_client.py @@ -1,28 +1,68 @@ """ 词典API调用工具 Dictionary API Client +支持 async 和真实数据库缓存 """ from typing import Dict, Any, Optional -import requests -import json from dataclasses import dataclass @dataclass class DictionaryAPIClient: """ - 词典API客户端 - 可扩展支持多种API + 词典API客户端 - 可扩展支持多种API和数据库缓存 """ # 可以配置多个API youdao_api_key: Optional[str] = None youdao_api_secret: Optional[str] = None - def query_word_youdao(self, word: str) -> Optional[Dict[str, Any]]: + # 数据库 Repository(可选,用于缓存单词查询) + word_repository: Optional[Any] = None + + def __post_init__(self): + """初始化后,如果有 repository 则支持 async""" + pass + + async def query_word_db(self, user_id: str, word: str) -> Optional[Dict[str, Any]]: + """从数据库缓存查询单词""" + if not self.word_repository: + return None + try: + entity = await self.word_repository.search_by_word(user_id, word) + if entity: + return { + "phonetic": entity.phonetic, + "part_of_speech": entity.part_of_speech, + "definitions": [entity.definition] if entity.definition else [], + "examples": [entity.examples] if entity.examples else [] + } + except Exception as e: + print(f"从数据库查询单词失败:{e}") + return None + + async def cache_word_db(self, user_id: str, word: str, data: Dict[str, Any]): + """把单词查询结果缓存到数据库""" + if not self.word_repository: + return + try: + from ...db.models import WordEntity + entity = WordEntity( + user_id=user_id, + word=word, + phonetic=data.get("phonetic", ""), + part_of_speech=data.get("part_of_speech", ""), + definition=data.get("definitions", [""])[0] if data.get("definitions") else "", + examples=data.get("examples", [""])[0] if data.get("examples") else "" + ) + await self.word_repository.insert(entity) + except Exception as e: + print(f"缓存单词到数据库失败:{e}") + + async def query_word_youdao(self, word: str) -> Optional[Dict[str, Any]]: """ - 调用有道词典API查询单词 - + 调用有道词典API查询单词(async 版本) 注意:需要配置有道API密钥才能使用 文档:https://ai.youdao.com/doc.s#guide """ @@ -30,7 +70,7 @@ class DictionaryAPIClient: return None try: - # TODO: 实现真实的有道API调用 + # TODO: 实现真实的有道API调用(用 httpx 或 aiohttp) # 这里是示例结构 return None @@ -38,14 +78,13 @@ class DictionaryAPIClient: print(f"有道API调用失败:{e}") return None - def translate_baidu(self, text: str, from_lang: str = "auto", to_lang: str = "zh") -> Optional[Dict[str, Any]]: + async def translate_baidu(self, text: str, from_lang: str = "auto", to_lang: str = "zh") -> Optional[Dict[str, Any]]: """ - 调用百度翻译API - + 调用百度翻译API(async 版本) 注意:需要配置百度API密钥才能使用 文档:https://fanyi-api.baidu.com/doc/21 """ - # TODO: 实现真实的百度翻译API调用 + # TODO: 实现真实的百度翻译API调用(用 httpx 或 aiohttp) return None def query_word_mock(self, word: str) -> Dict[str, Any]: @@ -123,7 +162,31 @@ class DictionaryAPIClient: {"term": "LLM", "type": "技术术语", "definition": "大语言模型", "confidence": 0.92}, {"term": "NLP", "type": "技术术语", "definition": "自然语言处理", "confidence": 0.88} ] + + # ========== 统一入口(优先查缓存) ========== + async def query_word(self, user_id: str = "default", word: str = "", use_cache: bool = True) -> Dict[str, Any]: + """ + 查询单词(统一入口,优先查数据库缓存) + """ + # 1. 先查数据库缓存 + if use_cache: + cached = await self.query_word_db(user_id, word) + if cached: + return cached + + # 2. 查第三方 API(暂未实现) + api_result = await self.query_word_youdao(word) + if api_result: + if use_cache: + await self.cache_word_db(user_id, word, api_result) + return api_result + + # 3. 用模拟数据(兜底) + mock_result = self.query_word_mock(word) + if use_cache: + await self.cache_word_db(user_id, word, mock_result) + return mock_result -# 单例实例 +# 单例实例(模拟模式,保持向后兼容) dictionary_api = DictionaryAPIClient() diff --git a/backend/app/agent_subgraphs/news_analysis/api_client.py b/backend/app/agent_subgraphs/news_analysis/api_client.py index 7b92d42..dab5161 100644 --- a/backend/app/agent_subgraphs/news_analysis/api_client.py +++ b/backend/app/agent_subgraphs/news_analysis/api_client.py @@ -1,6 +1,7 @@ """ 资讯子图API调用工具 News Analysis API Client +支持 async 和真实数据库缓存 """ from typing import Dict, Any, Optional, List @@ -12,12 +13,55 @@ from dataclasses import dataclass @dataclass class NewsAPIClient: """ - 资讯API客户端 - 可扩展支持多种API + 资讯API客户端 - 可扩展支持多种API和数据库缓存 """ # 可以配置多个API(如 NewsAPI, 今日头条, 百度新闻等) newsapi_key: Optional[str] = None + # 数据库 Repository(可选,用于缓存新闻) + news_repository: Optional[Any] = None + + async def query_news_db(self, user_id: str, keyword: str) -> Optional[List[Dict[str, Any]]]: + """从数据库缓存查询新闻""" + if not self.news_repository: + return None + try: + entities = await self.news_repository.search_by_keywords(user_id, keyword) + if entities: + return [ + { + "title": e.title, + "source": e.source, + "summary": e.content, + "keywords": e.keywords.split(",") if e.keywords else [], + "author": "", + "published_at": e.created_at + } + for e in entities + ] + except Exception as e: + print(f"从数据库查询新闻失败:{e}") + return None + + async def cache_news_db(self, user_id: str, news: Dict[str, Any]): + """把新闻缓存到数据库""" + if not self.news_repository: + return + try: + from ...db.models import NewsEntity + entity = NewsEntity( + user_id=user_id, + title=news.get("title", ""), + content=news.get("summary", ""), + url=news.get("url", ""), + source=news.get("source", ""), + keywords=",".join(news.get("keywords", [])) + ) + await self.news_repository.insert(entity) + except Exception as e: + print(f"缓存新闻到数据库失败:{e}") + def query_news_mock(self, query: str) -> List[Dict[str, Any]]: """ 模拟查询资讯 - 目前用于演示 @@ -123,7 +167,30 @@ class NewsAPIClient: 💡 建议:继续关注行业动态,把握发展机遇! """ return report + + # ========== 统一入口(优先查缓存) ========== + async def query_news(self, user_id: str = "default", query: str = "", use_cache: bool = True) -> List[Dict[str, Any]]: + """查询新闻(统一入口,优先查数据库缓存)""" + # 1. 先查数据库缓存 + if use_cache: + cached = await self.query_news_db(user_id, query) + if cached: + return cached + + # 2. 查第三方 API(暂未实现) + # api_result = await self.query_news_api(query) + # if api_result: + # for news in api_result: + # await self.cache_news_db(user_id, news) + # return api_result + + # 3. 用模拟数据(兜底) + mock_result = self.query_news_mock(query) + if use_cache: + for news in mock_result: + await self.cache_news_db(user_id, news) + return mock_result -# 单例实例 +# 单例实例(模拟模式,保持向后兼容) news_api = NewsAPIClient() diff --git a/backend/app/backend.py b/backend/app/backend.py index f9e9e20..6c919a7 100644 --- a/backend/app/backend.py +++ b/backend/app/backend.py @@ -24,7 +24,10 @@ from .agent_subgraphs.common.human_review import ( HumanReview ) from .agent_subgraphs.contact.api_client import ContactAPIClient +from .agent_subgraphs.dictionary.api_client import DictionaryAPIClient +from .agent_subgraphs.news_analysis.api_client import NewsAPIClient from .db.init_db import init_subgraph_tables +from .db.models import ContactRepository, DictionaryRepository, NewsRepository from .logger import info, error @asynccontextmanager @@ -42,10 +45,17 @@ async def lifespan(app: FastAPI): await agent_service.initialize() # 3. 创建历史查询服务 - history_service = ThreadHistoryService(checkpointer) + history_service = ThreadHistoryService(checkpointer.conn) - # 3.5 创建子图 API 客户端(真实数据库模式) + # 3.5 创建子图 Repositories + contact_repo = ContactRepository(checkpointer.conn) + dictionary_repo = DictionaryRepository(checkpointer.conn) + news_repo = NewsRepository(checkpointer.conn) + + # 3.6 创建子图 API 客户端(真实数据库模式) contact_api = ContactAPIClient(checkpointer.conn) + dictionary_api = DictionaryAPIClient(word_repository=dictionary_repo) + news_api = NewsAPIClient(news_repository=news_repo) # 4. 创建审核管理器 review_manager = ReviewManager(InMemoryReviewStore()) @@ -55,6 +65,11 @@ async def lifespan(app: FastAPI): app.state.history_service = history_service app.state.review_manager = review_manager app.state.contact_api = contact_api + app.state.dictionary_api = dictionary_api + app.state.news_api = news_api + app.state.contact_repo = contact_repo + app.state.dictionary_repo = dictionary_repo + app.state.news_repo = news_repo # 应用运行中... yield