333 lines
10 KiB
Python
333 lines
10 KiB
Python
|
|
"""
|
|||
|
|
超时和重试工具模块
|
|||
|
|
为 React 模式提供超时控制和重试机制
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import time
|
|||
|
|
import asyncio
|
|||
|
|
from functools import wraps
|
|||
|
|
from typing import Callable, Any, Optional, Type, Tuple, Union
|
|||
|
|
from dataclasses import dataclass, field
|
|||
|
|
from enum import Enum, auto
|
|||
|
|
|
|||
|
|
|
|||
|
|
class RetryStrategy(Enum):
    """Strategies for spacing out successive retry attempts."""

    # Fixed interval between attempts.
    FIXED = 1
    # Exponential back-off.
    EXPONENTIAL = 2
    # Linearly growing interval.
    LINEAR = 3
|
|||
|
|
|
|||
|
|
|
|||
|
|
@dataclass
class RetryConfig:
    """Settings describing how a call should be retried."""

    # Maximum number of retries.
    max_retries: int = 3
    # Base delay between attempts, in seconds.
    base_delay: float = 1.0
    # Upper bound for a single delay, in seconds.
    max_delay: float = 10.0
    # How the delay evolves from one attempt to the next.
    strategy: RetryStrategy = RetryStrategy.EXPONENTIAL
    # Timeout of a single call, in seconds.
    timeout: Optional[float] = 30.0
    # Exception types regarded as recoverable (every Exception by default).
    recoverable_exceptions: Tuple[Type[Exception], ...] = field(
        default_factory=lambda: (Exception,)
    )
    # Exception types regarded as unrecoverable (none by default).
    unrecoverable_exceptions: Tuple[Type[Exception], ...] = field(
        default_factory=lambda: ()
    )
|
|||
|
|
|
|||
|
|
|
|||
|
|
@dataclass
class RetryResult:
    """Outcome of a call executed through one of the retry wrappers."""

    # True when the wrapped call eventually returned without raising.
    success: bool
    # Value returned by the wrapped call (None on failure).
    result: Any = None
    # Last exception observed (None on success).
    error: Optional[Exception] = None
    # Retry counter reported by the wrapper.
    retry_count: int = 0
    # Elapsed time in seconds, including waits between attempts.
    total_time: float = 0.0
    # True when the failure was a timeout.
    timed_out: bool = False
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ========== 同步重试装饰器 ==========
|
|||
|
|
def with_retry(
    config: Optional[RetryConfig] = None,
    max_retries: int = 3,
    timeout: Optional[float] = 30.0,
    base_delay: float = 1.0,
    on_retry: Optional[Callable[[int, Exception], None]] = None
):
    """Build a decorator that retries a synchronous callable.

    The decorated function does not propagate ``Exception`` subclasses raised
    by the wrapped callable; each call returns a ``RetryResult`` instead.

    Args:
        config: Retry configuration. When provided, ``max_retries``,
            ``timeout`` and ``base_delay`` are ignored.
        max_retries: Maximum number of retries (used only without ``config``).
        timeout: Per-call timeout in seconds (used only without ``config``).
            NOTE: the synchronous wrapper stores this value but does not
            enforce it; the wrapped call always runs to completion.
        base_delay: Base delay in seconds (used only without ``config``).
        on_retry: Optional callback invoked as
            ``on_retry(retry_number, exception)`` right before the wrapper
            sleeps ahead of a retry; ``retry_number`` starts at 1.

    Returns:
        A decorator whose wrapper returns a ``RetryResult``.
    """
    if config is None:
        config = RetryConfig(
            max_retries=max_retries,
            timeout=timeout,
            base_delay=base_delay
        )

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> RetryResult:
            start_time = time.time()
            last_error = None
            # Number of retries actually performed; reported on failure so an
            # early stop is not misreported as a fully exhausted budget.
            retries_done = 0

            for attempt in range(config.max_retries + 1):
                retries_done = attempt
                try:
                    # No timeout is applied here (simplified implementation).
                    result = func(*args, **kwargs)
                except Exception as e:
                    last_error = e

                    # Explicitly unrecoverable exceptions stop immediately.
                    if isinstance(e, config.unrecoverable_exceptions):
                        break

                    # Exceptions outside the recoverable set are not retried.
                    if not isinstance(e, config.recoverable_exceptions):
                        break

                    # Retry budget exhausted.
                    if attempt >= config.max_retries:
                        break

                    delay = _calculate_delay(attempt, config)

                    # Notify the caller before waiting.
                    if on_retry:
                        on_retry(attempt + 1, e)

                    time.sleep(delay)
                else:
                    return RetryResult(
                        success=True,
                        result=result,
                        retry_count=attempt,
                        total_time=time.time() - start_time
                    )

            # Every attempt failed, or a non-retryable exception was raised.
            return RetryResult(
                success=False,
                error=last_error,
                retry_count=retries_done,
                total_time=time.time() - start_time
            )

        return wrapper
    return decorator
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ========== 异步重试装饰器 ==========
|
|||
|
|
def with_async_retry(
    config: Optional[RetryConfig] = None,
    max_retries: int = 3,
    timeout: Optional[float] = 30.0,
    base_delay: float = 1.0,
    on_retry: Optional[Callable[[int, Exception], None]] = None
):
    """Build a decorator that retries a coroutine function.

    The decorated coroutine does not propagate ``Exception`` subclasses raised
    by the wrapped coroutine; each call resolves to a ``RetryResult`` instead.

    Args:
        config: Retry configuration. When provided, ``max_retries``,
            ``timeout`` and ``base_delay`` are ignored.
        max_retries: Maximum number of retries (used only without ``config``).
        timeout: Per-call timeout in seconds, enforced with
            ``asyncio.wait_for``; a falsy value disables the timeout
            (used only without ``config``).
        base_delay: Base delay in seconds (used only without ``config``).
        on_retry: Optional callback invoked synchronously (it is not awaited)
            as ``on_retry(retry_number, exception)`` right before the wrapper
            sleeps ahead of a retry; ``retry_number`` starts at 1.

    Returns:
        A decorator whose async wrapper returns a ``RetryResult``.
        ``timed_out`` is set when the last error is ``asyncio.TimeoutError``.
    """
    if config is None:
        config = RetryConfig(
            max_retries=max_retries,
            timeout=timeout,
            base_delay=base_delay
        )

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> RetryResult:
            start_time = time.time()
            last_error = None
            # Number of retries actually performed; reported on failure so an
            # early stop is not misreported as a fully exhausted budget.
            retries_done = 0

            for attempt in range(config.max_retries + 1):
                retries_done = attempt
                try:
                    if config.timeout:
                        result = await asyncio.wait_for(
                            func(*args, **kwargs),
                            timeout=config.timeout
                        )
                    else:
                        result = await func(*args, **kwargs)
                except Exception as e:
                    # asyncio.TimeoutError is an Exception subclass, so this
                    # clause also covers timeouts. Keep the error in
                    # ``last_error``: ``e`` is unbound once the clause ends.
                    last_error = e
                else:
                    return RetryResult(
                        success=True,
                        result=result,
                        retry_count=attempt,
                        total_time=time.time() - start_time
                    )

                # Explicitly unrecoverable exceptions stop immediately.
                if isinstance(last_error, config.unrecoverable_exceptions):
                    break

                # Exceptions outside the recoverable set are not retried.
                if not isinstance(last_error, config.recoverable_exceptions):
                    break

                # Retry budget exhausted.
                if attempt >= config.max_retries:
                    break

                delay = _calculate_delay(attempt, config)

                # Notify the caller before waiting.
                if on_retry:
                    on_retry(attempt + 1, last_error)

                await asyncio.sleep(delay)

            # Every attempt failed, or a non-retryable exception was raised.
            return RetryResult(
                success=False,
                error=last_error,
                retry_count=retries_done,
                total_time=time.time() - start_time,
                timed_out=isinstance(last_error, asyncio.TimeoutError)
            )

        return wrapper
    return decorator
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ========== 辅助函数 ==========
|
|||
|
|
def _calculate_delay(attempt: int, config: RetryConfig) -> float:
    """Return the wait before the next attempt, capped at ``config.max_delay``.

    Args:
        attempt: Zero-based index of the attempt that just failed.
        config: Retry configuration supplying ``strategy``, ``base_delay``
            and ``max_delay``.

    Returns:
        ``base_delay`` for FIXED (and any unrecognised strategy),
        ``base_delay * (attempt + 1)`` for LINEAR and
        ``base_delay * 2 ** attempt`` for EXPONENTIAL, never more than
        ``max_delay``.
    """
    base = config.base_delay
    chosen = config.strategy

    if chosen == RetryStrategy.LINEAR:
        factor = attempt + 1
    elif chosen == RetryStrategy.EXPONENTIAL:
        factor = 2 ** attempt
    else:
        # FIXED and unknown strategies both use the plain base delay.
        return min(base, config.max_delay)

    return min(base * factor, config.max_delay)
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ========== 为 React 节点设计的超时重试包装器 ==========
|
|||
|
|
def create_retry_wrapper_for_node(
    node_func: Callable,
    node_name: str,
    max_retries: int = 2,
    timeout: float = 30.0
):
    """Wrap a React node function with retry handling.

    The wrapper calls ``node_func(state)`` up to ``max_retries + 1`` times,
    waiting with exponential back-off between attempts. An attempt is retried
    when the node raises, or when ``state.current_error`` is truthy after the
    call. When the final attempt raises, an ``ErrorRecord`` is written to the
    state attributes that exist (``errors``, ``current_error``,
    ``error_message``, ``current_phase``) and the state itself is returned.

    Args:
        node_func: Original node function, called as ``node_func(state)``.
        node_name: Node name, used as the error source and in the error type.
        max_retries: Maximum number of retries.
        timeout: Per-execution timeout in seconds. NOTE: this wrapper only
            records the value (in the config and the error context); it does
            not interrupt a running node.

    Returns:
        The wrapped node function.
    """
    retry_config = RetryConfig(
        max_retries=max_retries,
        timeout=timeout,
        strategy=RetryStrategy.EXPONENTIAL
    )

    @wraps(node_func)
    def wrapped_node(state):
        started_at = time.time()
        failure = None
        budget = retry_config.max_retries

        for attempt in range(budget + 1):
            retry_allowed = attempt < budget
            try:
                outcome = node_func(state)

                # The node may signal a failure through the state instead of
                # raising.
                reported = hasattr(state, "current_error") and state.current_error
                if reported:
                    failure = Exception(state.current_error.error_message)
                    if retry_allowed:
                        time.sleep(_calculate_delay(attempt, retry_config))
                        continue
                    # NOTE(review): once the budget is spent, a node-reported
                    # error falls through and the node's result is returned
                    # unchanged - confirm this is the intended behaviour.

                return outcome

            except Exception as exc:
                failure = exc

                if not retry_allowed:
                    break

                # Back off before the next attempt.
                time.sleep(_calculate_delay(attempt, retry_config))

        # All attempts failed: record the failure on the state.
        # Imported here rather than at module level, as in the original.
        from .state import ErrorRecord, ErrorSeverity

        # The error type carries the "TimeoutError" suffix whatever the cause.
        record = ErrorRecord(
            error_type=f"{node_name}TimeoutError",
            error_message=str(failure) if failure else f"{node_name} 执行超时",
            severity=ErrorSeverity.ERROR,
            source=node_name,
            retry_count=retry_config.max_retries,
            max_retries=retry_config.max_retries,
            context={
                "timeout": timeout,
                "total_time": time.time() - started_at
            }
        )

        # Only touch the attributes this state object actually exposes.
        if hasattr(state, "errors"):
            state.errors.append(record)
        if hasattr(state, "current_error"):
            state.current_error = record
        if hasattr(state, "error_message"):
            state.error_message = str(failure)
        if hasattr(state, "current_phase"):
            state.current_phase = "error_handling"

        return state

    return wrapped_node
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ========== 预配置的 RAG 重试配置 ==========
|
|||
|
|
RAG_RETRY_CONFIG = RetryConfig(
    strategy=RetryStrategy.EXPONENTIAL,
    base_delay=2.0,
    timeout=60.0,  # RAG can tolerate a somewhat longer timeout
    max_retries=2,
)
|
|||
|
|
|
|||
|
|
# ========== 预配置的子图重试配置 ==========
|
|||
|
|
SUBGRAPH_RETRY_CONFIG = RetryConfig(
    base_delay=3.0,
    timeout=120.0,  # subgraph executions take longer
    max_retries=1,  # subgraphs are usually not suited to repeated retries
)
|