完成:词典和资讯 API 支持 async 和数据库缓存
All checks were successful
构建并部署 AI Agent 服务 / deploy (push) Successful in 5m44s

This commit is contained in:
2026-04-27 17:37:07 +08:00
parent 37ceb6c260
commit f274df6e3d
3 changed files with 161 additions and 16 deletions

View File

@@ -1,6 +1,7 @@
"""
资讯子图API调用工具
News Analysis API Client
支持 async 和真实数据库缓存
"""
from typing import Dict, Any, Optional, List
@@ -12,12 +13,55 @@ from dataclasses import dataclass
@dataclass
class NewsAPIClient:
"""
资讯API客户端 - 可扩展支持多种API
资讯API客户端 - 可扩展支持多种API和数据库缓存
"""
# 可以配置多个API如 NewsAPI, 今日头条, 百度新闻等)
newsapi_key: Optional[str] = None
# 数据库 Repository可选用于缓存新闻
news_repository: Optional[Any] = None
async def query_news_db(self, user_id: str, keyword: str) -> Optional[List[Dict[str, Any]]]:
"""从数据库缓存查询新闻"""
if not self.news_repository:
return None
try:
entities = await self.news_repository.search_by_keywords(user_id, keyword)
if entities:
return [
{
"title": e.title,
"source": e.source,
"summary": e.content,
"keywords": e.keywords.split(",") if e.keywords else [],
"author": "",
"published_at": e.created_at
}
for e in entities
]
except Exception as e:
print(f"从数据库查询新闻失败:{e}")
return None
async def cache_news_db(self, user_id: str, news: Dict[str, Any]):
"""把新闻缓存到数据库"""
if not self.news_repository:
return
try:
from ...db.models import NewsEntity
entity = NewsEntity(
user_id=user_id,
title=news.get("title", ""),
content=news.get("summary", ""),
url=news.get("url", ""),
source=news.get("source", ""),
keywords=",".join(news.get("keywords", []))
)
await self.news_repository.insert(entity)
except Exception as e:
print(f"缓存新闻到数据库失败:{e}")
def query_news_mock(self, query: str) -> List[Dict[str, Any]]:
"""
模拟查询资讯 - 目前用于演示
@@ -123,7 +167,30 @@ class NewsAPIClient:
💡 建议:继续关注行业动态,把握发展机遇!
"""
return report
# ========== 统一入口(优先查缓存) ==========
async def query_news(self, user_id: str = "default", query: str = "", use_cache: bool = True) -> List[Dict[str, Any]]:
"""查询新闻(统一入口,优先查数据库缓存)"""
# 1. 先查数据库缓存
if use_cache:
cached = await self.query_news_db(user_id, query)
if cached:
return cached
# 2. 查第三方 API暂未实现
# api_result = await self.query_news_api(query)
# if api_result:
# for news in api_result:
# await self.cache_news_db(user_id, news)
# return api_result
# 3. 用模拟数据(兜底)
mock_result = self.query_news_mock(query)
if use_cache:
for news in mock_result:
await self.cache_news_db(user_id, news)
return mock_result
# 单例实例
# 单例实例(模拟模式,保持向后兼容)
news_api = NewsAPIClient()