diff --git a/backend/app/model_services/rerank_services.py b/backend/app/model_services/rerank_services.py index 5de0a1c..56a5736 100644 --- a/backend/app/model_services/rerank_services.py +++ b/backend/app/model_services/rerank_services.py @@ -9,12 +9,14 @@ - LocalLlamaCppRerankProvider:本地 llama.cpp 重排服务提供者 - ZhipuRerankProvider:智谱 API 重排服务提供者 - get_rerank_service():获取重排服务的统一接口 + +注意:本模块只负责调用 rerank server,不包含业务逻辑(文档处理、排序、top_n) +业务逻辑放在 backend/app/rag/ 目录下 """ import logging from typing import List -import requests -from langchain_core.documents import Document +import httpx from .base import ( BaseServiceProvider, @@ -32,115 +34,117 @@ from ..config import ( logger = logging.getLogger(__name__) -class BaseReranker: +class BaseRerankService: """ - 重排器基类,定义统一的接口 + 重排服务基类 - 纯服务层,只负责调用 server + 不包含业务逻辑(文档处理、排序、top_n 等在 rag/ 目录下) """ - - def compress_documents(self, documents: List[Document], query: str, top_n: int = 5) -> List[Document]: + + def compute_scores(self, query: str, documents: List[str]) -> List[float]: """ - 对文档进行重排序 - + 计算每个文档与查询的相关性得分 - 纯 API 调用 + Args: - documents: 待排序的文档列表 query: 查询字符串 - top_n: 返回前 N 个结果 - + documents: 文档字符串列表 + Returns: - 排序后的文档列表 + List[float]: 每个文档的相关性得分列表 """ raise NotImplementedError -class LocalLlamaCppReranker(BaseReranker): +class LocalLlamaCppRerankService(BaseRerankService): """ - 使用远程 llama.cpp 服务对检索结果重排序 + 本地 llama.cpp 重排服务 - 纯服务层 """ - - def __init__(self, base_url: str, api_key: str, model: str = "bge-reranker-v2-m3", timeout: int = 60): + + def __init__(self, base_url: str, api_key: str, model: str = "bge-reranker-v2-m3"): self.base_url = base_url self.api_key = api_key self.model = model - self.timeout = timeout - self.endpoint = f"{self.base_url}/rerank" - - def compress_documents(self, documents: List[Document], query: str, top_n: int = 5) -> List[Document]: + + def compute_scores(self, query: str, documents: List[str]) -> List[float]: """ - 对文档进行重排序 + 调用 llama.cpp rerank API 计算得分 - 纯 API 调用 """ if not documents: return [] - - # 准备请求体 + + headers = {"Content-Type": "application/json"} + if self.api_key: + headers["Authorization"] = f"Bearer {self.api_key}" + + base = self.base_url.rstrip("/") + if not base.endswith("/v1"): + base = base + "/v1" + payload = { "model": self.model, "query": query, - "documents": [doc.page_content for doc in documents], - "top_n": top_n + "documents": documents, } - headers = { - "Content-Type": "application/json", - "Authorization": f"Bearer {self.api_key}" - } - - try: - response = requests.post(self.endpoint, json=payload, headers=headers, timeout=self.timeout) + + with httpx.Client(timeout=120) as client: + response = client.post( + f"{base}/rerank", + headers=headers, + json=payload, + ) response.raise_for_status() - results = response.json() - - # 解析返回结果 - sorted_indices = [item["index"] for item in results["results"]] - sorted_docs = [documents[idx] for idx in sorted_indices] - return sorted_docs - except Exception as e: - logger.warning(f"远程重排序过程出错,返回原始前 {top_n} 个结果: {e}") - return documents[:top_n] + data = response.json() + + if isinstance(data, dict) and "results" in data: + results = data["results"] + results_sorted = sorted(results, key=lambda x: x["index"]) + return [item["relevance_score"] for item in results_sorted] + else: + raise ValueError(f"未知的 rerank API 响应格式: {data}") -class ZhipuReranker(BaseReranker): +class ZhipuRerankService(BaseRerankService): """ - 使用智谱 API 对检索结果重排序 + 智谱 API 重排服务 - 纯服务层 """ - + def __init__(self, model: str | None = None): self.model = model or ZHIPU_RERANK_MODEL self.api_key = ZHIPUAI_API_KEY - - def compress_documents(self, documents: List[Document], query: str, top_n: int = 5) -> List[Document]: + + def compute_scores(self, query: str, documents: List[str]) -> List[float]: """ - 对文档进行重排序 + 调用智谱 rerank API 计算得分 - 纯 API 调用 """ if not documents: return [] - + try: from zhipuai import ZhipuAI client = ZhipuAI(api_key=self.api_key) - + response = client.rerank.create( model=self.model, query=query, - documents=[doc.page_content for doc in documents], - top_n=top_n + documents=documents, ) - - sorted_indices = [item.index for item in response.results] - sorted_docs = [documents[idx] for idx in sorted_indices] - return sorted_docs + + results_sorted = sorted(response.results, key=lambda x: x.index) + return [item.relevance_score for item in results_sorted] + except Exception as e: - logger.warning(f"智谱重排序过程出错,返回原始前 {top_n} 个结果: {e}") - return documents[:top_n] + logger.warning(f"智谱 rerank 调用失败: {e}") + raise -class LocalLlamaCppRerankProvider(BaseServiceProvider[BaseReranker]): +class LocalLlamaCppRerankProvider(BaseServiceProvider[BaseRerankService]): """ 本地 llama.cpp 重排服务提供者 """ - + def __init__(self, model: str = "bge-reranker-v2-m3"): super().__init__("local_llamacpp_rerank") self._model = model - + def is_available(self) -> bool: """ 检查本地 llama.cpp 重排服务是否可用 @@ -148,28 +152,26 @@ class LocalLlamaCppRerankProvider(BaseServiceProvider[BaseReranker]): if not LLAMACPP_RERANKER_URL: logger.warning("LLAMACPP_RERANKER_URL 未配置") return False - + try: - # 测试重排服务 - test_docs = [Document(page_content="test document 1"), Document(page_content="test document 2")] - reranker = LocalLlamaCppReranker( + service = LocalLlamaCppRerankService( base_url=LLAMACPP_RERANKER_URL, api_key=LLAMACPP_API_KEY, model=self._model ) - result = reranker.compress_documents(test_docs, "test query", top_n=1) + test_scores = service.compute_scores("test query", ["test document"]) logger.info(f"本地 llama.cpp 重排服务可用") return True except Exception as e: logger.warning(f"本地 llama.cpp 重排服务不可用: {e}") return False - - def get_service(self) -> BaseReranker: + + def get_service(self) -> BaseRerankService: """ 获取本地 llama.cpp 重排服务 """ if self._service_instance is None: - self._service_instance = LocalLlamaCppReranker( + self._service_instance = LocalLlamaCppRerankService( base_url=LLAMACPP_RERANKER_URL, api_key=LLAMACPP_API_KEY, model=self._model @@ -177,15 +179,15 @@ class LocalLlamaCppRerankProvider(BaseServiceProvider[BaseReranker]): return self._service_instance -class ZhipuRerankProvider(BaseServiceProvider[BaseReranker]): +class ZhipuRerankProvider(BaseServiceProvider[BaseRerankService]): """ 智谱 API 重排服务提供者 """ - + def __init__(self, model: str | None = None): super().__init__("zhipu_rerank") self._model = model or ZHIPU_RERANK_MODEL - + def is_available(self) -> bool: """ 检查智谱 API 重排服务是否可用 @@ -193,12 +195,10 @@ class ZhipuRerankProvider(BaseServiceProvider[BaseReranker]): if not ZHIPUAI_API_KEY: logger.warning("ZHIPUAI_API_KEY 未配置") return False - + try: - # 测试重排服务 - test_docs = [Document(page_content="test document 1"), Document(page_content="test document 2")] - reranker = ZhipuReranker(model=self._model) - result = reranker.compress_documents(test_docs, "test query", top_n=1) + service = ZhipuRerankService(model=self._model) + test_scores = service.compute_scores("test query", ["test document"]) logger.info(f"智谱重排服务可用") return True except ImportError: @@ -207,27 +207,28 @@ class ZhipuRerankProvider(BaseServiceProvider[BaseReranker]): except Exception as e: logger.warning(f"智谱重排服务不可用: {e}") return False - - def get_service(self) -> BaseReranker: + + def get_service(self) -> BaseRerankService: """ 获取智谱 API 重排服务 """ if self._service_instance is None: - self._service_instance = ZhipuReranker(model=self._model) + self._service_instance = ZhipuRerankService(model=self._model) return self._service_instance -def get_rerank_service() -> BaseReranker: +def get_rerank_service() -> BaseRerankService: """ - 获取重排服务(带自动降级) - + 获取重排服务(带自动降级)- 纯服务层 + Returns: - BaseReranker: 重排服务实例 + BaseRerankService: 重排服务实例 """ def _create_chain(): primary = LocalLlamaCppRerankProvider() fallback = ZhipuRerankProvider() return FallbackServiceChain(primary, [fallback]) - + chain = SingletonServiceManager.get_or_create("rerank_service_chain", _create_chain) return chain.get_available_service() + diff --git a/backend/app/rag/pipeline.py b/backend/app/rag/pipeline.py index 4f41bcc..e714fb0 100644 --- a/backend/app/rag/pipeline.py +++ b/backend/app/rag/pipeline.py @@ -7,6 +7,7 @@ from langchain_core.documents import Document from langchain_core.language_models import BaseLanguageModel from ..model_services import get_rerank_service +from .rerank import create_document_reranker from .query_transform import MultiQueryGenerator from .fusion import reciprocal_rank_fusion @@ -38,7 +39,7 @@ class RAGPipeline: # 初始化组件 - 使用统一的重排服务获取接口 self.query_generator = MultiQueryGenerator(llm=llm, num_queries=num_queries) - self.reranker = get_rerank_service() + self.reranker = create_document_reranker() async def aretrieve(self, query: str) -> List[Document]: """ diff --git a/backend/app/rag/rerank.py b/backend/app/rag/rerank.py new file mode 100644 index 0000000..34a7133 --- /dev/null +++ b/backend/app/rag/rerank.py @@ -0,0 +1,89 @@ +""" +重排业务逻辑模块 + +本模块包含 RAG 相关的重排业务逻辑(文档处理、排序、top_n) +使用 model_services/rerank_services.py 提供的纯服务层 +""" + +import logging +from typing import List +from langchain_core.documents import Document + +from ..model_services import get_rerank_service + +logger = logging.getLogger(__name__) + + +class DocumentReranker: + """ + 文档重排器 - 业务逻辑层 + + 负责: + - 从 Document 提取内容 + - 调用 rerank service 获取得分 + - 根据得分排序 + - 返回 top_n 文档 + """ + + def __init__(self, rerank_service=None): + """ + 初始化文档重排器 + + Args: + rerank_service: 重排服务(可选,默认通过 get_rerank_service() 获取) + """ + self._rerank_service = rerank_service or get_rerank_service() + + def compress_documents( + self, + documents: List[Document], + query: str, + top_n: int = 5 + ) -> List[Document]: + """ + 对文档进行重排 - 业务逻辑 + + Args: + documents: 待排序的文档列表 + query: 查询字符串 + top_n: 返回前 N 个结果 + + Returns: + List[Document]: 排序后的文档列表 + """ + if not documents: + return [] + + try: + # 1. 从 Document 提取内容(业务逻辑) + doc_contents = [doc.page_content for doc in documents] + + # 2. 调用纯服务层计算得分 + scores = self._rerank_service.compute_scores(query, doc_contents) + + # 3. 根据得分排序(业务逻辑) + doc_score_pairs = list(zip(documents, scores)) + doc_score_pairs_sorted = sorted(doc_score_pairs, key=lambda x: x[1], reverse=True) + + # 4. 取 top_n + top_docs = [pair[0] for pair in doc_score_pairs_sorted[:top_n]] + + return top_docs + + except Exception as e: + logger.warning(f"重排过程出错,返回原始前 {top_n} 个结果: {e}") + return documents[:top_n] + + +def create_document_reranker(rerank_service=None) -> DocumentReranker: + """ + 创建文档重排器的工厂函数 + + Args: + rerank_service: 重排服务(可选) + + Returns: + DocumentReranker: 文档重排器实例 + """ + return DocumentReranker(rerank_service) +