""" 新闻资讯适配器 整合MCP、数据库缓存和模拟数据 """ from typing import Dict, Any, Optional, List from datetime import datetime from .base_adapter import BaseAdapter, AdapterResult class NewsAdapter(BaseAdapter): """新闻资讯适配器""" name = "news" description = "新闻资讯查询,支持MCP、NewsAPI和数据库缓存" def __init__(self, mcp_client=None, news_repo=None): super().__init__(mcp_client, news_repo) self._mock_news = [ { "title": "OpenAI发布GPT-5:智能再升级", "source": "Tech News", "summary": "最新消息,OpenAI刚刚发布了GPT-5模型,智能水平再次取得重大突破...", "keywords": ["AI", "GPT-5", "OpenAI"], "author": "AI Team", "published_at": datetime.now().isoformat() }, { "title": "大模型在医疗领域的应用", "source": "Health Tech", "summary": "大模型AI技术正在医疗领域展现巨大潜力,从辅助诊断到药物研发...", "keywords": ["医疗", "大模型", "应用"], "author": "Medical Team", "published_at": datetime.now().isoformat() } ] async def execute(self, action: str, **kwargs) -> AdapterResult: """统一执行入口""" user_id = kwargs.get("user_id", "default") query = kwargs.get("query", "") use_cache = kwargs.get("use_cache", True) # 1. 先查缓存 if use_cache and self.repository and query: cached = await self._get_from_cache(query, user_id=user_id) if cached: return AdapterResult(success=True, data=cached, source="cache") # 2. 尝试MCP if self.mcp_client and self.mcp_client.is_available(): try: mcp_result = await self._execute_mcp(action, **kwargs) if mcp_result.success: if use_cache: for news in mcp_result.data: await self._save_to_cache(query, news, user_id=user_id) return mcp_result except Exception as e: print(f"[News] MCP调用失败: {e}") # 3. 尝试第三方API(预留) # result = await self._execute_api(action, **kwargs) # 4. 降级到模拟数据 result = self._fallback(action, **kwargs) if use_cache and result.success: for news in result.data: await self._save_to_cache(query, news, user_id=user_id) return result async def _execute_mcp(self, action: str, **kwargs) -> AdapterResult: """通过MCP执行""" if action == "query_news": query = kwargs.get("query", "") result = await self.mcp_client.call_tool( "news_search_news", {"query": query} ) if result.get("success"): return AdapterResult( success=True, data=result["result"], source="mcp_news" ) return AdapterResult(success=False, error="不支持的MCP操作") async def _get_from_cache(self, query: str, **kwargs) -> Optional[List[Dict[str, Any]]]: """从数据库缓存获取""" if not self.repository: return None try: # 数据库查询(可选功能) return None except Exception as e: print(f"[News] 缓存查询失败: {e}") return None async def _save_to_cache(self, query: str, data: Dict[str, Any], **kwargs): """保存到数据库缓存""" if not self.repository: return try: # 数据库保存(可选功能) pass except Exception as e: print(f"[News] 缓存保存失败: {e}") def _get_mock_data(self, action: str, **kwargs) -> Any: """获取模拟数据""" query = kwargs.get("query", "").lower() if action == "query_news": results = [] for news in self._mock_news: if (query in news["title"].lower() or query in news["summary"].lower() or any(keyword.lower() in query for keyword in news["keywords"])): results.append(news) if not results: results = self._mock_news[:2] return results elif action == "analyze_url": url = kwargs.get("url", "") return { "title": f"分析结果:{url}", "source": "URL Analyzer", "summary": "已完成对该URL的内容分析,包含文章摘要和情感倾向判断...", "keywords": ["News", "Analysis"] } elif action == "extract_keywords": text = kwargs.get("text", "") keywords = ["AI", "大模型", "应用场景", "行业趋势"] result = [k for k in keywords if k.lower() in text.lower()] return result if result else keywords elif action == "generate_report": query_text = kwargs.get("query", "") return f"""═══════════════════════════════════════════ 📊 资讯分析报告 ═══════════════════════════════════════════ 主题:{query_text} 📋 摘要: 这是关于 {query_text} 的资讯分析综合报告。 🔍 主要发现: 1. AI技术持续快速发展 2. 大模型应用场景不断拓展 3. 行业数字化转型加速 🏷️ 关键词: - AI - 大模型 - 数字化转型 ═══════════════════════════════════════════ """ return None