refactor: 重构 rerank 架构,分离服务层和业务逻辑
Some checks failed
构建并部署 AI Agent 服务 / deploy (push) Has been cancelled
Some checks failed
构建并部署 AI Agent 服务 / deploy (push) Has been cancelled
- rerank_services.py:纯服务层,只负责调用 rerank server
- rag/rerank.py:业务逻辑层,负责文档处理、排序、top_n
- 更新 pipeline.py 使用新架构
- 架构与 embedding_services.py 保持一致
This commit is contained in:
@@ -9,12 +9,14 @@
|
|||||||
- LocalLlamaCppRerankProvider:本地 llama.cpp 重排服务提供者
|
- LocalLlamaCppRerankProvider:本地 llama.cpp 重排服务提供者
|
||||||
- ZhipuRerankProvider:智谱 API 重排服务提供者
|
- ZhipuRerankProvider:智谱 API 重排服务提供者
|
||||||
- get_rerank_service():获取重排服务的统一接口
|
- get_rerank_service():获取重排服务的统一接口
|
||||||
|
|
||||||
|
注意:本模块只负责调用 rerank server,不包含业务逻辑(文档处理、排序、top_n)
|
||||||
|
业务逻辑放在 backend/app/rag/ 目录下
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
from typing import List
|
from typing import List
|
||||||
import requests
|
import httpx
|
||||||
from langchain_core.documents import Document
|
|
||||||
|
|
||||||
from .base import (
|
from .base import (
|
||||||
BaseServiceProvider,
|
BaseServiceProvider,
|
||||||
@@ -32,83 +34,86 @@ from ..config import (
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class BaseReranker:
|
class BaseRerankService:
|
||||||
"""
|
"""
|
||||||
重排器基类,定义统一的接口
|
重排服务基类 - 纯服务层,只负责调用 server
|
||||||
|
不包含业务逻辑(文档处理、排序、top_n 等在 rag/ 目录下)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def compress_documents(self, documents: List[Document], query: str, top_n: int = 5) -> List[Document]:
|
def compute_scores(self, query: str, documents: List[str]) -> List[float]:
|
||||||
"""
|
"""
|
||||||
对文档进行重排序
|
计算每个文档与查询的相关性得分 - 纯 API 调用
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
documents: 待排序的文档列表
|
|
||||||
query: 查询字符串
|
query: 查询字符串
|
||||||
top_n: 返回前 N 个结果
|
documents: 文档字符串列表
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
排序后的文档列表
|
List[float]: 每个文档的相关性得分列表
|
||||||
"""
|
"""
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
class LocalLlamaCppReranker(BaseReranker):
|
class LocalLlamaCppRerankService(BaseRerankService):
|
||||||
"""
|
"""
|
||||||
使用远程 llama.cpp 服务对检索结果重排序
|
本地 llama.cpp 重排服务 - 纯服务层
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, base_url: str, api_key: str, model: str = "bge-reranker-v2-m3", timeout: int = 60):
|
def __init__(self, base_url: str, api_key: str, model: str = "bge-reranker-v2-m3"):
|
||||||
self.base_url = base_url
|
self.base_url = base_url
|
||||||
self.api_key = api_key
|
self.api_key = api_key
|
||||||
self.model = model
|
self.model = model
|
||||||
self.timeout = timeout
|
|
||||||
self.endpoint = f"{self.base_url}/rerank"
|
|
||||||
|
|
||||||
def compress_documents(self, documents: List[Document], query: str, top_n: int = 5) -> List[Document]:
|
def compute_scores(self, query: str, documents: List[str]) -> List[float]:
|
||||||
"""
|
"""
|
||||||
对文档进行重排序
|
调用 llama.cpp rerank API 计算得分 - 纯 API 调用
|
||||||
"""
|
"""
|
||||||
if not documents:
|
if not documents:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
# 准备请求体
|
headers = {"Content-Type": "application/json"}
|
||||||
|
if self.api_key:
|
||||||
|
headers["Authorization"] = f"Bearer {self.api_key}"
|
||||||
|
|
||||||
|
base = self.base_url.rstrip("/")
|
||||||
|
if not base.endswith("/v1"):
|
||||||
|
base = base + "/v1"
|
||||||
|
|
||||||
payload = {
|
payload = {
|
||||||
"model": self.model,
|
"model": self.model,
|
||||||
"query": query,
|
"query": query,
|
||||||
"documents": [doc.page_content for doc in documents],
|
"documents": documents,
|
||||||
"top_n": top_n
|
|
||||||
}
|
|
||||||
headers = {
|
|
||||||
"Content-Type": "application/json",
|
|
||||||
"Authorization": f"Bearer {self.api_key}"
|
|
||||||
}
|
}
|
||||||
|
|
||||||
try:
|
with httpx.Client(timeout=120) as client:
|
||||||
response = requests.post(self.endpoint, json=payload, headers=headers, timeout=self.timeout)
|
response = client.post(
|
||||||
|
f"{base}/rerank",
|
||||||
|
headers=headers,
|
||||||
|
json=payload,
|
||||||
|
)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
results = response.json()
|
data = response.json()
|
||||||
|
|
||||||
# 解析返回结果
|
if isinstance(data, dict) and "results" in data:
|
||||||
sorted_indices = [item["index"] for item in results["results"]]
|
results = data["results"]
|
||||||
sorted_docs = [documents[idx] for idx in sorted_indices]
|
results_sorted = sorted(results, key=lambda x: x["index"])
|
||||||
return sorted_docs
|
return [item["relevance_score"] for item in results_sorted]
|
||||||
except Exception as e:
|
else:
|
||||||
logger.warning(f"远程重排序过程出错,返回原始前 {top_n} 个结果: {e}")
|
raise ValueError(f"未知的 rerank API 响应格式: {data}")
|
||||||
return documents[:top_n]
|
|
||||||
|
|
||||||
|
|
||||||
class ZhipuReranker(BaseReranker):
|
class ZhipuRerankService(BaseRerankService):
|
||||||
"""
|
"""
|
||||||
使用智谱 API 对检索结果重排序
|
智谱 API 重排服务 - 纯服务层
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, model: str | None = None):
|
def __init__(self, model: str | None = None):
|
||||||
self.model = model or ZHIPU_RERANK_MODEL
|
self.model = model or ZHIPU_RERANK_MODEL
|
||||||
self.api_key = ZHIPUAI_API_KEY
|
self.api_key = ZHIPUAI_API_KEY
|
||||||
|
|
||||||
def compress_documents(self, documents: List[Document], query: str, top_n: int = 5) -> List[Document]:
|
def compute_scores(self, query: str, documents: List[str]) -> List[float]:
|
||||||
"""
|
"""
|
||||||
对文档进行重排序
|
调用智谱 rerank API 计算得分 - 纯 API 调用
|
||||||
"""
|
"""
|
||||||
if not documents:
|
if not documents:
|
||||||
return []
|
return []
|
||||||
@@ -120,19 +125,18 @@ class ZhipuReranker(BaseReranker):
|
|||||||
response = client.rerank.create(
|
response = client.rerank.create(
|
||||||
model=self.model,
|
model=self.model,
|
||||||
query=query,
|
query=query,
|
||||||
documents=[doc.page_content for doc in documents],
|
documents=documents,
|
||||||
top_n=top_n
|
|
||||||
)
|
)
|
||||||
|
|
||||||
sorted_indices = [item.index for item in response.results]
|
results_sorted = sorted(response.results, key=lambda x: x.index)
|
||||||
sorted_docs = [documents[idx] for idx in sorted_indices]
|
return [item.relevance_score for item in results_sorted]
|
||||||
return sorted_docs
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"智谱重排序过程出错,返回原始前 {top_n} 个结果: {e}")
|
logger.warning(f"智谱 rerank 调用失败: {e}")
|
||||||
return documents[:top_n]
|
raise
|
||||||
|
|
||||||
|
|
||||||
class LocalLlamaCppRerankProvider(BaseServiceProvider[BaseReranker]):
|
class LocalLlamaCppRerankProvider(BaseServiceProvider[BaseRerankService]):
|
||||||
"""
|
"""
|
||||||
本地 llama.cpp 重排服务提供者
|
本地 llama.cpp 重排服务提供者
|
||||||
"""
|
"""
|
||||||
@@ -150,26 +154,24 @@ class LocalLlamaCppRerankProvider(BaseServiceProvider[BaseReranker]):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# 测试重排服务
|
service = LocalLlamaCppRerankService(
|
||||||
test_docs = [Document(page_content="test document 1"), Document(page_content="test document 2")]
|
|
||||||
reranker = LocalLlamaCppReranker(
|
|
||||||
base_url=LLAMACPP_RERANKER_URL,
|
base_url=LLAMACPP_RERANKER_URL,
|
||||||
api_key=LLAMACPP_API_KEY,
|
api_key=LLAMACPP_API_KEY,
|
||||||
model=self._model
|
model=self._model
|
||||||
)
|
)
|
||||||
result = reranker.compress_documents(test_docs, "test query", top_n=1)
|
test_scores = service.compute_scores("test query", ["test document"])
|
||||||
logger.info(f"本地 llama.cpp 重排服务可用")
|
logger.info(f"本地 llama.cpp 重排服务可用")
|
||||||
return True
|
return True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"本地 llama.cpp 重排服务不可用: {e}")
|
logger.warning(f"本地 llama.cpp 重排服务不可用: {e}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def get_service(self) -> BaseReranker:
|
def get_service(self) -> BaseRerankService:
|
||||||
"""
|
"""
|
||||||
获取本地 llama.cpp 重排服务
|
获取本地 llama.cpp 重排服务
|
||||||
"""
|
"""
|
||||||
if self._service_instance is None:
|
if self._service_instance is None:
|
||||||
self._service_instance = LocalLlamaCppReranker(
|
self._service_instance = LocalLlamaCppRerankService(
|
||||||
base_url=LLAMACPP_RERANKER_URL,
|
base_url=LLAMACPP_RERANKER_URL,
|
||||||
api_key=LLAMACPP_API_KEY,
|
api_key=LLAMACPP_API_KEY,
|
||||||
model=self._model
|
model=self._model
|
||||||
@@ -177,7 +179,7 @@ class LocalLlamaCppRerankProvider(BaseServiceProvider[BaseReranker]):
|
|||||||
return self._service_instance
|
return self._service_instance
|
||||||
|
|
||||||
|
|
||||||
class ZhipuRerankProvider(BaseServiceProvider[BaseReranker]):
|
class ZhipuRerankProvider(BaseServiceProvider[BaseRerankService]):
|
||||||
"""
|
"""
|
||||||
智谱 API 重排服务提供者
|
智谱 API 重排服务提供者
|
||||||
"""
|
"""
|
||||||
@@ -195,10 +197,8 @@ class ZhipuRerankProvider(BaseServiceProvider[BaseReranker]):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# 测试重排服务
|
service = ZhipuRerankService(model=self._model)
|
||||||
test_docs = [Document(page_content="test document 1"), Document(page_content="test document 2")]
|
test_scores = service.compute_scores("test query", ["test document"])
|
||||||
reranker = ZhipuReranker(model=self._model)
|
|
||||||
result = reranker.compress_documents(test_docs, "test query", top_n=1)
|
|
||||||
logger.info(f"智谱重排服务可用")
|
logger.info(f"智谱重排服务可用")
|
||||||
return True
|
return True
|
||||||
except ImportError:
|
except ImportError:
|
||||||
@@ -208,21 +208,21 @@ class ZhipuRerankProvider(BaseServiceProvider[BaseReranker]):
|
|||||||
logger.warning(f"智谱重排服务不可用: {e}")
|
logger.warning(f"智谱重排服务不可用: {e}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def get_service(self) -> BaseReranker:
|
def get_service(self) -> BaseRerankService:
|
||||||
"""
|
"""
|
||||||
获取智谱 API 重排服务
|
获取智谱 API 重排服务
|
||||||
"""
|
"""
|
||||||
if self._service_instance is None:
|
if self._service_instance is None:
|
||||||
self._service_instance = ZhipuReranker(model=self._model)
|
self._service_instance = ZhipuRerankService(model=self._model)
|
||||||
return self._service_instance
|
return self._service_instance
|
||||||
|
|
||||||
|
|
||||||
def get_rerank_service() -> BaseReranker:
|
def get_rerank_service() -> BaseRerankService:
|
||||||
"""
|
"""
|
||||||
获取重排服务(带自动降级)
|
获取重排服务(带自动降级)- 纯服务层
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
BaseReranker: 重排服务实例
|
BaseRerankService: 重排服务实例
|
||||||
"""
|
"""
|
||||||
def _create_chain():
|
def _create_chain():
|
||||||
primary = LocalLlamaCppRerankProvider()
|
primary = LocalLlamaCppRerankProvider()
|
||||||
@@ -231,3 +231,4 @@ def get_rerank_service() -> BaseReranker:
|
|||||||
|
|
||||||
chain = SingletonServiceManager.get_or_create("rerank_service_chain", _create_chain)
|
chain = SingletonServiceManager.get_or_create("rerank_service_chain", _create_chain)
|
||||||
return chain.get_available_service()
|
return chain.get_available_service()
|
||||||
|
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ from langchain_core.documents import Document
|
|||||||
from langchain_core.language_models import BaseLanguageModel
|
from langchain_core.language_models import BaseLanguageModel
|
||||||
|
|
||||||
from ..model_services import get_rerank_service
|
from ..model_services import get_rerank_service
|
||||||
|
from .rerank import create_document_reranker
|
||||||
from .query_transform import MultiQueryGenerator
|
from .query_transform import MultiQueryGenerator
|
||||||
from .fusion import reciprocal_rank_fusion
|
from .fusion import reciprocal_rank_fusion
|
||||||
|
|
||||||
@@ -38,7 +39,7 @@ class RAGPipeline:
|
|||||||
|
|
||||||
# 初始化组件 - 使用统一的重排服务获取接口
|
# 初始化组件 - 使用统一的重排服务获取接口
|
||||||
self.query_generator = MultiQueryGenerator(llm=llm, num_queries=num_queries)
|
self.query_generator = MultiQueryGenerator(llm=llm, num_queries=num_queries)
|
||||||
self.reranker = get_rerank_service()
|
self.reranker = create_document_reranker()
|
||||||
|
|
||||||
async def aretrieve(self, query: str) -> List[Document]:
|
async def aretrieve(self, query: str) -> List[Document]:
|
||||||
"""
|
"""
|
||||||
|
|||||||
89
backend/app/rag/rerank.py
Normal file
89
backend/app/rag/rerank.py
Normal file
@@ -0,0 +1,89 @@
|
|||||||
|
"""
|
||||||
|
重排业务逻辑模块
|
||||||
|
|
||||||
|
本模块包含 RAG 相关的重排业务逻辑(文档处理、排序、top_n)
|
||||||
|
使用 model_services/rerank_services.py 提供的纯服务层
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
from typing import List
|
||||||
|
from langchain_core.documents import Document
|
||||||
|
|
||||||
|
from ..model_services import get_rerank_service
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class DocumentReranker:
    """Business-logic layer that reranks documents with a scoring service.

    Responsibilities:
    - extract the text content from each ``Document``
    - ask the rerank service for one relevance score per document
    - order the documents by score, highest first
    - return the ``top_n`` best documents
    """

    def __init__(self, rerank_service=None):
        """Initialise the reranker.

        Args:
            rerank_service: Object exposing ``compute_scores(query, documents)``.
                When ``None``, the service returned by ``get_rerank_service()``
                is used.
        """
        # Compare against None explicitly so an injected service is never
        # discarded just because it happens to evaluate as falsy.
        self._rerank_service = (
            rerank_service if rerank_service is not None else get_rerank_service()
        )

    def compress_documents(
        self,
        documents: List[Document],
        query: str,
        top_n: int = 5
    ) -> List[Document]:
        """Rerank ``documents`` against ``query`` and keep the best ``top_n``.

        Args:
            documents: Candidate documents to rerank.
            query: Query string the documents are scored against.
            top_n: Maximum number of documents to return. Values ``<= 0``
                yield an empty list.

        Returns:
            The documents ordered by descending score, truncated to ``top_n``.
            If scoring fails for any reason (including the service returning a
            number of scores that differs from the number of documents), a
            warning is logged and the first ``top_n`` documents are returned
            in their original order.
        """
        # A non-positive top_n can never select anything; returning early also
        # avoids the "all but the last" slice a negative value would produce.
        if not documents or top_n <= 0:
            return []

        try:
            # 1. Extract the raw text the service scores.
            doc_contents = [doc.page_content for doc in documents]

            # 2. Delegate scoring to the pure service layer.
            scores = self._rerank_service.compute_scores(query, doc_contents)

            # 3. Scores are paired with documents by position, so the counts
            #    must match; zip() would otherwise silently drop documents.
            if len(scores) != len(documents):
                raise ValueError(
                    f"rerank 服务返回 {len(scores)} 个得分,但输入了 {len(documents)} 个文档"
                )

            # 4. Highest score first; sorted() is stable, so ties keep their
            #    original relative order.
            ranked = sorted(
                zip(documents, scores),
                key=lambda pair: pair[1],
                reverse=True,
            )

            # 5. Keep only the top_n documents.
            return [doc for doc, _ in ranked[:top_n]]

        except Exception as e:
            # Deliberate best-effort: retrieval must keep working even when
            # the rerank service is unavailable or misbehaves.
            logger.warning(f"重排过程出错,返回原始前 {top_n} 个结果: {e}")
            return documents[:top_n]
|
||||||
|
|
||||||
|
|
||||||
|
def create_document_reranker(rerank_service=None) -> DocumentReranker:
    """Factory that builds a ``DocumentReranker``.

    Args:
        rerank_service: Optional rerank service handed straight to
            ``DocumentReranker``; ``None`` lets the reranker pick its default.

    Returns:
        A new ``DocumentReranker`` instance.
    """
    reranker = DocumentReranker(rerank_service=rerank_service)
    return reranker
|
||||||
|
|
||||||
Reference in New Issue
Block a user