""" 资讯子图状态定义 News Analysis Subgraph State Definition """ from enum import Enum, auto from typing import Optional, Dict, List, Any from dataclasses import dataclass, field class NewsAction(Enum): """资讯操作类型""" NONE = auto() QUERY_NEWS = auto() # 查询资讯 ANALYZE_URL = auto() # 分析资讯 GENERATE_REPORT = auto() # 生成报告 FETCH_FROM_SOURCES = auto() # 从指定源获取 EXTRACT_KEYWORDS = auto() # 提取关键词 @dataclass class NewsItem: """资讯条目""" title: str = "" url: str = "" source: str = "" content: str = "" author: str = "" published_at: Optional[str] = None summary: str = "" keywords: List[str] = field(default_factory=list) sentiment: float = 0.0 # 情感分析得分 metadata: Dict[str, Any] = field(default_factory=dict) @dataclass class NewsSource: """资讯源""" name: str = "" url: str = "" type: str = "" # rss, website, api enabled: bool = True last_fetched_at: Optional[str] = None metadata: Dict[str, Any] = field(default_factory=dict) @dataclass class NewsAnalysisState: """资讯子图状态""" # ========== 输入 ========== user_query: str = "" # 用户查询 user_id: str = "" # 用户ID # 操作控制 action: NewsAction = NewsAction.NONE action_params: Dict[str, Any] = field(default_factory=dict) # 源配置 use_follow_list: bool = False custom_urls: List[str] = field(default_factory=list) # ========== 执行过程 ========== current_phase: str = "init" # init, fetching, analyzing, done current_source_index: int = 0 primary_fetched: bool = False # 源列表 sources: List[NewsSource] = field(default_factory=list) # 资讯条目 news_items: List[NewsItem] = field(default_factory=list) # 关键词 extracted_keywords: List[str] = field(default_factory=list) # 报告 report_content: str = "" # ========== 结果 ========== success: bool = False error_message: str = "" final_result: str = "" result_data: Dict[str, Any] = field(default_factory=dict) # ========== 元数据 ========== start_time: Optional[str] = None end_time: Optional[str] = None duration: float = 0.0 debug_info: Dict[str, Any] = field(default_factory=dict)