feat: 修复数据库持久化,完善服务降级机制
All checks were successful
构建并部署 AI Agent 服务 / deploy (push) Successful in 5m37s
All checks were successful
构建并部署 AI Agent 服务 / deploy (push) Successful in 5m37s
- 恢复使用 AsyncPostgresSaver 持久化短期记忆 - 添加 LLM 作为 Rerank 服务的最后降级方案 - 完善降级链:Local llama.cpp → Zhipu Rerank → LLM Fallback
This commit is contained in:
@@ -4,8 +4,7 @@ FastAPI 后端 - 支持动态模型切换,使用 PostgreSQL 持久化记忆
|
||||
"""
|
||||
|
||||
import os
|
||||
# from app.config import DB_URI, BACKEND_PORT
|
||||
from app.config import BACKEND_PORT
|
||||
from app.config import DB_URI, BACKEND_PORT
|
||||
import uuid
|
||||
import json
|
||||
from contextlib import asynccontextmanager
|
||||
@@ -15,7 +14,7 @@ from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect, Depe
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.responses import StreamingResponse
|
||||
from pydantic import BaseModel
|
||||
from langgraph.checkpoint.memory import MemorySaver
|
||||
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
|
||||
from .agent.service import AIAgentService
|
||||
from .agent.history import ThreadHistoryService
|
||||
from app.core.human_review import (
|
||||
@@ -27,42 +26,56 @@ from app.core.human_review import (
|
||||
from app.subgraphs.contact.api_client import ContactAPIClient
|
||||
from app.subgraphs.dictionary.api_client import DictionaryAPIClient
|
||||
from app.subgraphs.news_analysis.api_client import NewsAPIClient
|
||||
# from .db.init_db import init_subgraph_tables
|
||||
# from .db.models import ContactRepository, DictionaryRepository, NewsRepository
|
||||
from .db.init_db import init_subgraph_tables
|
||||
from .db.models import ContactRepository, DictionaryRepository, NewsRepository
|
||||
from app.logger import info, error
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
"""应用生命周期管理:创建并注入全局服务(临时用内存 checkpoint)"""
|
||||
# 1. 创建内存 checkpointer(临时测试)
|
||||
checkpointer = MemorySaver()
|
||||
|
||||
# 2. 构建 AI Agent 服务
|
||||
agent_service = AIAgentService(checkpointer)
|
||||
await agent_service.initialize()
|
||||
|
||||
# 3. 创建历史查询服务(保持原有的 checkpointer 参数)
|
||||
history_service = ThreadHistoryService(checkpointer)
|
||||
|
||||
# 4. 创建审核管理器
|
||||
review_manager = ReviewManager(InMemoryReviewStore())
|
||||
|
||||
# 5. 将服务实例存入 app.state
|
||||
app.state.agent_service = agent_service
|
||||
app.state.history_service = history_service
|
||||
app.state.review_manager = review_manager
|
||||
app.state.contact_api = ContactAPIClient()
|
||||
app.state.dictionary_api = DictionaryAPIClient()
|
||||
app.state.news_api = NewsAPIClient()
|
||||
app.state.contact_repo = None
|
||||
app.state.dictionary_repo = None
|
||||
app.state.news_repo = None
|
||||
|
||||
# 应用运行中...
|
||||
yield
|
||||
|
||||
# 6. 关闭时清理
|
||||
info("🛑 应用关闭")
|
||||
"""应用生命周期管理:创建并注入全局服务"""
|
||||
# 1. 创建数据库连接池并初始化表(仅 checkpointer)
|
||||
async with AsyncPostgresSaver.from_conn_string(DB_URI) as checkpointer:
|
||||
await checkpointer.setup()
|
||||
|
||||
# 1.5 初始化子图表
|
||||
await init_subgraph_tables(checkpointer.conn)
|
||||
|
||||
# 2. 构建 AI Agent 服务
|
||||
agent_service = AIAgentService(checkpointer)
|
||||
await agent_service.initialize()
|
||||
|
||||
# 3. 创建历史查询服务(保持原有的 checkpointer 参数)
|
||||
history_service = ThreadHistoryService(checkpointer)
|
||||
|
||||
# 3.5 创建子图 Repositories
|
||||
contact_repo = ContactRepository(checkpointer.conn)
|
||||
dictionary_repo = DictionaryRepository(checkpointer.conn)
|
||||
news_repo = NewsRepository(checkpointer.conn)
|
||||
|
||||
# 3.6 创建子图 API 客户端(真实数据库模式)
|
||||
contact_api = ContactAPIClient(checkpointer.conn)
|
||||
dictionary_api = DictionaryAPIClient(word_repository=dictionary_repo)
|
||||
news_api = NewsAPIClient(news_repository=news_repo)
|
||||
|
||||
# 4. 创建审核管理器
|
||||
review_manager = ReviewManager(InMemoryReviewStore())
|
||||
|
||||
# 5. 将服务实例存入 app.state
|
||||
app.state.agent_service = agent_service
|
||||
app.state.history_service = history_service
|
||||
app.state.review_manager = review_manager
|
||||
app.state.contact_api = contact_api
|
||||
app.state.dictionary_api = dictionary_api
|
||||
app.state.news_api = news_api
|
||||
app.state.contact_repo = contact_repo
|
||||
app.state.dictionary_repo = dictionary_repo
|
||||
app.state.news_repo = news_repo
|
||||
|
||||
# 应用运行中...
|
||||
yield
|
||||
|
||||
# 6. 关闭时自动清理数据库连接(async with 负责)
|
||||
info("🛑 应用关闭,数据库连接池已释放")
|
||||
|
||||
app = FastAPI(lifespan=lifespan)
|
||||
|
||||
|
||||
@@ -136,6 +136,92 @@ class ZhipuRerankService(BaseRerankService):
|
||||
raise
|
||||
|
||||
|
||||
class LLMFallbackRerankService(BaseRerankService):
|
||||
"""
|
||||
使用 LLM 作为最后的降级方案进行重排
|
||||
通过让 LLM 评估文档相关性并给出分数
|
||||
"""
|
||||
|
||||
def __init__(self, llm=None):
|
||||
from .chat_services import get_chat_service
|
||||
self.llm = llm or get_chat_service()
|
||||
|
||||
def compute_scores(self, query: str, documents: List[str]) -> List[float]:
|
||||
"""
|
||||
使用 LLM 评估文档相关性并打分
|
||||
"""
|
||||
if not documents:
|
||||
return []
|
||||
|
||||
scores = []
|
||||
for doc in documents:
|
||||
score = self._score_single_document(query, doc)
|
||||
scores.append(score)
|
||||
|
||||
return scores
|
||||
|
||||
def _score_single_document(self, query: str, document: str) -> float:
|
||||
"""
|
||||
让 LLM 为单个文档的相关性打分 (0.0-1.0)
|
||||
"""
|
||||
prompt = f"""你是一个文档相关性评分专家。请评估以下文档与查询的相关性,返回一个0到1之间的分数:
|
||||
- 1.0表示完全相关
|
||||
- 0.0表示完全不相关
|
||||
|
||||
查询: {query}
|
||||
|
||||
文档: {document}
|
||||
|
||||
请只返回一个数字,不要解释。"""
|
||||
|
||||
try:
|
||||
result = self.llm.invoke(prompt)
|
||||
content = result.content if hasattr(result, 'content') else str(result)
|
||||
# 尝试提取数字
|
||||
import re
|
||||
match = re.search(r'(\d+\.?\d*)', content)
|
||||
if match:
|
||||
score = float(match.group(1))
|
||||
# 确保在 0-1 之间
|
||||
return max(0.0, min(1.0, score))
|
||||
# 如果没有找到数字,返回0.5作为默认值
|
||||
return 0.5
|
||||
except Exception as e:
|
||||
logger.warning(f"LLM 打分失败,返回默认分数 0.5: {e}")
|
||||
return 0.5
|
||||
|
||||
|
||||
class LLMFallbackRerankProvider(BaseServiceProvider[BaseRerankService]):
|
||||
"""
|
||||
LLM 降级重排服务提供者
|
||||
"""
|
||||
|
||||
def __init__(self, llm=None):
|
||||
super().__init__("llm_fallback_rerank")
|
||||
self._llm = llm
|
||||
|
||||
def is_available(self) -> bool:
|
||||
"""
|
||||
LLM 降级方案总是可用(只要 LLM 服务可用)
|
||||
"""
|
||||
try:
|
||||
from .chat_services import get_chat_service
|
||||
get_chat_service()
|
||||
logger.info("LLM 降级重排服务可用")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.warning(f"LLM 降级重排服务不可用: {e}")
|
||||
return False
|
||||
|
||||
def get_service(self) -> BaseRerankService:
|
||||
"""
|
||||
获取 LLM 降级重排服务
|
||||
"""
|
||||
if self._service_instance is None:
|
||||
self._service_instance = LLMFallbackRerankService(self._llm)
|
||||
return self._service_instance
|
||||
|
||||
|
||||
class LocalLlamaCppRerankProvider(BaseServiceProvider[BaseRerankService]):
|
||||
"""
|
||||
本地 llama.cpp 重排服务提供者
|
||||
@@ -221,13 +307,15 @@ def get_rerank_service() -> BaseRerankService:
|
||||
"""
|
||||
获取重排服务(带自动降级)- 纯服务层
|
||||
|
||||
降级链: Local llama.cpp -> Zhipu Rerank -> LLM Fallback
|
||||
|
||||
Returns:
|
||||
BaseRerankService: 重排服务实例
|
||||
"""
|
||||
def _create_chain():
|
||||
primary = LocalLlamaCppRerankProvider()
|
||||
fallback = ZhipuRerankProvider()
|
||||
return FallbackServiceChain(primary, [fallback])
|
||||
fallbacks = [ZhipuRerankProvider(), LLMFallbackRerankProvider()]
|
||||
return FallbackServiceChain(primary, fallbacks)
|
||||
|
||||
chain = SingletonServiceManager.get_or_create("rerank_service_chain", _create_chain)
|
||||
return chain.get_available_service()
|
||||
|
||||
Reference in New Issue
Block a user