Files
ailine/backend/app/mcp/adapters/news_adapter.py

166 lines
6.1 KiB
Python
Raw Normal View History

"""
新闻资讯适配器
整合MCP数据库缓存和模拟数据
"""
from typing import Dict, Any, Optional, List
from datetime import datetime
from .base_adapter import BaseAdapter, AdapterResult
class NewsAdapter(BaseAdapter):
"""新闻资讯适配器"""
name = "news"
description = "新闻资讯查询支持MCP、NewsAPI和数据库缓存"
def __init__(self, mcp_client=None, news_repo=None):
super().__init__(mcp_client, news_repo)
self._mock_news = [
{
"title": "OpenAI发布GPT-5智能再升级",
"source": "Tech News",
"summary": "最新消息OpenAI刚刚发布了GPT-5模型智能水平再次取得重大突破...",
"keywords": ["AI", "GPT-5", "OpenAI"],
"author": "AI Team",
"published_at": datetime.now().isoformat()
},
{
"title": "大模型在医疗领域的应用",
"source": "Health Tech",
"summary": "大模型AI技术正在医疗领域展现巨大潜力从辅助诊断到药物研发...",
"keywords": ["医疗", "大模型", "应用"],
"author": "Medical Team",
"published_at": datetime.now().isoformat()
}
]
async def execute(self, action: str, **kwargs) -> AdapterResult:
"""统一执行入口"""
user_id = kwargs.get("user_id", "default")
query = kwargs.get("query", "")
use_cache = kwargs.get("use_cache", True)
# 1. 先查缓存
if use_cache and self.repository and query:
cached = await self._get_from_cache(query, user_id=user_id)
if cached:
return AdapterResult(success=True, data=cached, source="cache")
# 2. 尝试MCP
if self.mcp_client and self.mcp_client.is_available():
try:
mcp_result = await self._execute_mcp(action, **kwargs)
if mcp_result.success:
if use_cache:
for news in mcp_result.data:
await self._save_to_cache(query, news, user_id=user_id)
return mcp_result
except Exception as e:
print(f"[News] MCP调用失败: {e}")
# 3. 尝试第三方API预留
# result = await self._execute_api(action, **kwargs)
# 4. 降级到模拟数据
result = self._fallback(action, **kwargs)
if use_cache and result.success:
for news in result.data:
await self._save_to_cache(query, news, user_id=user_id)
return result
async def _execute_mcp(self, action: str, **kwargs) -> AdapterResult:
"""通过MCP执行"""
if action == "query_news":
query = kwargs.get("query", "")
result = await self.mcp_client.call_tool(
"news_search_news",
{"query": query}
)
if result.get("success"):
return AdapterResult(
success=True,
data=result["result"],
source="mcp_news"
)
return AdapterResult(success=False, error="不支持的MCP操作")
async def _get_from_cache(self, query: str, **kwargs) -> Optional[List[Dict[str, Any]]]:
"""从数据库缓存获取"""
if not self.repository:
return None
try:
# 数据库查询(可选功能)
return None
except Exception as e:
print(f"[News] 缓存查询失败: {e}")
return None
async def _save_to_cache(self, query: str, data: Dict[str, Any], **kwargs):
"""保存到数据库缓存"""
if not self.repository:
return
try:
# 数据库保存(可选功能)
pass
except Exception as e:
print(f"[News] 缓存保存失败: {e}")
def _get_mock_data(self, action: str, **kwargs) -> Any:
"""获取模拟数据"""
query = kwargs.get("query", "").lower()
if action == "query_news":
results = []
for news in self._mock_news:
if (query in news["title"].lower() or
query in news["summary"].lower() or
any(keyword.lower() in query for keyword in news["keywords"])):
results.append(news)
if not results:
results = self._mock_news[:2]
return results
elif action == "analyze_url":
url = kwargs.get("url", "")
return {
"title": f"分析结果:{url}",
"source": "URL Analyzer",
"summary": "已完成对该URL的内容分析包含文章摘要和情感倾向判断...",
"keywords": ["News", "Analysis"]
}
elif action == "extract_keywords":
text = kwargs.get("text", "")
keywords = ["AI", "大模型", "应用场景", "行业趋势"]
result = [k for k in keywords if k.lower() in text.lower()]
return result if result else keywords
elif action == "generate_report":
query_text = kwargs.get("query", "")
return f"""═══════════════════════════════════════════
📊 资讯分析报告
主题{query_text}
📋 摘要
这是关于 {query_text} 的资讯分析综合报告
🔍 主要发现
1. AI技术持续快速发展
2. 大模型应用场景不断拓展
3. 行业数字化转型加速
🏷 关键词
- AI
- 大模型
- 数字化转型
"""
return None