""" 主图节点通用工具模块 包含事件发送、状态更新等通用功能 """ from typing import Dict, Any, Optional from langchain_core.runnables.config import RunnableConfig async def dispatch_custom_event( event_name: str, data: Dict[str, Any], config: Optional[RunnableConfig] = None, ) -> None: """ 安全地发送自定义事件,忽略发送失败 Args: event_name: 事件名称 data: 事件数据 config: LangChain 配置 """ if not config: return try: from langchain_core.callbacks.manager import adispatch_custom_event await adispatch_custom_event(event_name, data, config=config) except Exception: # 事件发送失败不应中断主流程 pass def make_react_event( step: int, action: str, confidence: float = 1.0, reasoning: str = "" ) -> Dict[str, Any]: """ 构造标准推理事件数据 Args: step: 当前步数 action: 动作名称 confidence: 置信度 reasoning: 推理过程 Returns: 事件数据字典 """ return { "step": step, "action": action, "confidence": confidence, "reasoning": reasoning }