""" 超时和重试工具模块 为 React 模式提供超时控制和重试机制 """ import time import asyncio from functools import wraps from typing import Callable, Any, Optional, Type, Tuple, Union from dataclasses import dataclass, field from enum import Enum, auto class RetryStrategy(Enum): """重试策略""" FIXED = auto() # 固定间隔 EXPONENTIAL = auto() # 指数退避 LINEAR = auto() # 线性增长 @dataclass class RetryConfig: """重试配置""" max_retries: int = 3 # 最大重试次数 base_delay: float = 1.0 # 基础延迟(秒) max_delay: float = 10.0 # 最大延迟(秒) strategy: RetryStrategy = RetryStrategy.EXPONENTIAL timeout: Optional[float] = 30.0 # 单次调用超时(秒) recoverable_exceptions: Tuple[Type[Exception], ...] = field( default_factory=lambda: (Exception,) ) unrecoverable_exceptions: Tuple[Type[Exception], ...] = field( default_factory=tuple ) @dataclass class RetryResult: """重试结果""" success: bool result: Any = None error: Optional[Exception] = None retry_count: int = 0 total_time: float = 0.0 timed_out: bool = False # ========== 同步重试装饰器 ========== def with_retry( config: Optional[RetryConfig] = None, max_retries: int = 3, timeout: Optional[float] = 30.0, base_delay: float = 1.0, on_retry: Optional[Callable[[int, Exception], None]] = None ): """ 同步重试装饰器 Args: config: 重试配置对象 max_retries: 最大重试次数(如果没有 config) timeout: 单次调用超时(秒) base_delay: 基础延迟(秒) on_retry: 重试回调函数(retry_count, exception) """ if config is None: config = RetryConfig( max_retries=max_retries, timeout=timeout, base_delay=base_delay ) def decorator(func: Callable) -> Callable: @wraps(func) def wrapper(*args, **kwargs) -> RetryResult: start_time = time.time() last_error = None for attempt in range(config.max_retries + 1): try: # 执行函数(带超时) if config.timeout: # 使用信号量或线程实现超时(简化版) result = func(*args, **kwargs) else: result = func(*args, **kwargs) # 成功 total_time = time.time() - start_time return RetryResult( success=True, result=result, retry_count=attempt, total_time=total_time ) except Exception as e: last_error = e # 检查是否是不可恢复的异常 if isinstance(e, config.unrecoverable_exceptions): break # 检查是否达到最大重试次数 if attempt >= config.max_retries: break # 计算延迟 delay = _calculate_delay(attempt, config) # 回调通知 if on_retry: on_retry(attempt + 1, e) # 等待 time.sleep(delay) # 所有重试都失败 total_time = time.time() - start_time return RetryResult( success=False, error=last_error, retry_count=config.max_retries, total_time=total_time ) return wrapper return decorator # ========== 异步重试装饰器 ========== def with_async_retry( config: Optional[RetryConfig] = None, max_retries: int = 3, timeout: Optional[float] = 30.0, base_delay: float = 1.0, on_retry: Optional[Callable[[int, Exception], None]] = None ): """ 异步重试装饰器 """ if config is None: config = RetryConfig( max_retries=max_retries, timeout=timeout, base_delay=base_delay ) def decorator(func: Callable) -> Callable: @wraps(func) async def wrapper(*args, **kwargs) -> RetryResult: start_time = time.time() last_error = None for attempt in range(config.max_retries + 1): try: # 执行函数(带超时) if config.timeout: result = await asyncio.wait_for( func(*args, **kwargs), timeout=config.timeout ) else: result = await func(*args, **kwargs) # 成功 total_time = time.time() - start_time return RetryResult( success=True, result=result, retry_count=attempt, total_time=total_time ) except asyncio.TimeoutError as e: last_error = e timed_out = True except Exception as e: last_error = e timed_out = False # 检查是否是不可恢复的异常 if isinstance(e, config.unrecoverable_exceptions): break # 检查是否达到最大重试次数 if attempt >= config.max_retries: break # 计算延迟 delay = _calculate_delay(attempt, config) # 回调通知 if on_retry: on_retry(attempt + 1, last_error) # 等待 await asyncio.sleep(delay) # 所有重试都失败 total_time = time.time() - start_time return RetryResult( success=False, error=last_error, retry_count=config.max_retries, total_time=total_time, timed_out=isinstance(last_error, asyncio.TimeoutError) ) return wrapper return decorator # ========== 辅助函数 ========== def _calculate_delay(attempt: int, config: RetryConfig) -> float: """计算延迟时间""" if config.strategy == RetryStrategy.FIXED: delay = config.base_delay elif config.strategy == RetryStrategy.LINEAR: delay = config.base_delay * (attempt + 1) elif config.strategy == RetryStrategy.EXPONENTIAL: delay = config.base_delay * (2 ** attempt) else: delay = config.base_delay # 不超过最大延迟 return min(delay, config.max_delay) # ========== 为 React 节点设计的超时重试包装器 ========== def create_retry_wrapper_for_node( node_func: Callable, node_name: str, max_retries: int = 2, timeout: float = 30.0 ): """ 为 React 节点创建带重试和超时的包装器 Args: node_func: 原始节点函数 node_name: 节点名称(用于错误标识) max_retries: 最大重试次数 timeout: 单次执行超时 Returns: 包装后的节点函数 """ config = RetryConfig( max_retries=max_retries, timeout=timeout, strategy=RetryStrategy.EXPONENTIAL ) @wraps(node_func) def wrapped_node(state): # 记录开始时间 start_time = time.time() # 重试循环 last_error = None for attempt in range(config.max_retries + 1): try: # 执行节点 result = node_func(state) # 检查节点是否报告了错误 if hasattr(state, "current_error") and state.current_error: # 节点内部报告了错误,继续重试 last_error = Exception(state.current_error.error_message) if attempt < config.max_retries: delay = _calculate_delay(attempt, config) time.sleep(delay) continue # 成功 return result except Exception as e: last_error = e if attempt >= config.max_retries: break # 等待后重试 delay = _calculate_delay(attempt, config) time.sleep(delay) # 所有重试都失败,更新状态错误信息 from .state import ErrorRecord, ErrorSeverity error_record = ErrorRecord( error_type=f"{node_name}TimeoutError", error_message=str(last_error) if last_error else f"{node_name} 执行超时", severity=ErrorSeverity.ERROR, source=node_name, retry_count=config.max_retries, max_retries=config.max_retries, context={ "timeout": timeout, "total_time": time.time() - start_time } ) if hasattr(state, "errors"): state.errors.append(error_record) if hasattr(state, "current_error"): state.current_error = error_record if hasattr(state, "error_message"): state.error_message = str(last_error) if hasattr(state, "current_phase"): state.current_phase = "error_handling" return state return wrapped_node # ========== 预配置的 RAG 重试配置 ========== RAG_RETRY_CONFIG = RetryConfig( max_retries=2, timeout=60.0, # RAG 可以容忍稍长的超时 base_delay=2.0, strategy=RetryStrategy.EXPONENTIAL ) # ========== 预配置的子图重试配置 ========== SUBGRAPH_RETRY_CONFIG = RetryConfig( max_retries=1, # 子图通常不适合多次重试 timeout=120.0, # 子图执行时间较长 base_delay=3.0 )