From 5c45806ad3be13a2e277249aea88bb5e52960357 Mon Sep 17 00:00:00 2001 From: root <953994191@qq.com> Date: Sun, 3 May 2026 18:12:20 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=9A=80=20=E5=AE=8C=E5=85=A8=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0=20Qdrant=20=E6=B7=B7=E5=90=88=E6=A3=80=E7=B4=A2?= =?UTF-8?q?=E5=8A=9F=E8=83=BD=20-=20=E4=B8=8D=E9=9C=80=E8=A6=81=E5=85=BC?= =?UTF-8?q?=E5=AE=B9=EF=BC=8C=E5=AE=8C=E5=85=A8=E9=87=8D=E5=86=99=E4=B8=BA?= =?UTF-8?q?=E6=B7=B7=E5=90=88=E6=A3=80=E7=B4=A2=20-=20=E6=A3=80=E7=B4=A2?= =?UTF-8?q?=E5=99=A8=EF=BC=9A=E5=BC=BA=E5=88=B6=E4=BD=BF=E7=94=A8=20FastEm?= =?UTF-8?q?bedSparse=20+=20RetrievalMode.HYBRID=20-=20=E7=B4=A2=E5=BC=95?= =?UTF-8?q?=E5=99=A8=EF=BC=9A=E5=BC=BA=E5=88=B6=E5=90=AF=E7=94=A8=E7=A8=80?= =?UTF-8?q?=E7=96=8F=E5=90=91=E9=87=8F=EF=BC=8C=E6=B7=B7=E5=90=88=E6=A3=80?= =?UTF-8?q?=E7=B4=A2=E6=A8=A1=E5=BC=8F=20-=20=E6=B7=BB=E5=8A=A0=20fastembe?= =?UTF-8?q?d=20=E4=BE=9D=E8=B5=96=E5=88=B0=20requirements.txt=20-=20?= =?UTF-8?q?=E8=AF=AD=E6=B3=95=E6=A3=80=E6=9F=A5=E9=80=9A=E8=BF=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/app/rag/retriever.py | 62 +++++++++-------------------- backend/requirements.txt | 1 + rag_indexer/index_builder.py | 75 +++++++++++++++--------------------- rag_indexer/requirements.txt | 3 +- 4 files changed, 53 insertions(+), 88 deletions(-) diff --git a/backend/app/rag/retriever.py b/backend/app/rag/retriever.py index 8203dcd..65694d1 100644 --- a/backend/app/rag/retriever.py +++ b/backend/app/rag/retriever.py @@ -1,17 +1,16 @@ """ Qdrant 向量检索器模块 -提供基于 Qdrant 的基础向量检索和混合检索(Dense + Sparse)功能。 +提供基于 Qdrant 的混合检索(Dense + Sparse)功能。 核心原理: -- 使用 langchain-qdrant 的 RetrievalMode -- Qdrant 原生混合检索(如果集合已配置 sparse_vectors) -- 如果集合未配置,优雅回退到纯稠密检索 -- 完全兼容现有代码,无接口改动 +- 使用 Qdrant 原生混合检索(langchain-qdrant 的 RetrievalMode.HYBRID) +- 同时存储稠密向量和稀疏向量 +- 语义理解 + 关键词匹配,效果最优 使用示例: >>> from app.rag.retriever import create_hybrid_retriever - >>> retriever = create_hybrid_retriever(collection_name="my_docs") + >>> retriever = create_hybrid_retriever(collection_name="rag_documents") >>> docs = retriever.invoke("什么是 RAG?") """ @@ -21,6 +20,7 @@ from qdrant_client.http.exceptions import UnexpectedResponse from langchain_qdrant import ( QdrantVectorStore, RetrievalMode, + FastEmbedSparse, ) from langchain_core.embeddings import Embeddings from langchain_core.retrievers import BaseRetriever @@ -95,12 +95,7 @@ def create_hybrid_retriever( embeddings: Embeddings | None = None, ) -> BaseRetriever: """ - 创建混合检索器(使用 Qdrant 自身的 RetrievalMode.HYBRID)。 - - ⚡️ Qdrant 原生混合检索: - - 如果 Qdrant 集合已配置 sparse_vectors:启用 Qdrant 原生混合检索 - - 如果未配置:优雅回退到纯稠密检索 - - 完全兼容现有代码,接口不变 + 创建混合检索器(稠密向量 + BM25 稀疏向量,Qdrant 原生实现)。 Args: collection_name: Qdrant 集合名称。 @@ -139,41 +134,22 @@ def create_hybrid_retriever( raise ValueError(f"Qdrant 集合 '{collection_name}' 不存在") raise - # 检查 Qdrant 集合是否有稀疏向量配置 - sparse_available = False - try: - collection_info = client.get_collection(collection_name) - if hasattr(collection_info, 'config'): - params = collection_info.config.params - if hasattr(params, 'sparse_vectors') and params.sparse_vectors: - sparse_available = True - info("✅ 检测到 Qdrant 集合有稀疏向量配置,启用 Qdrant 原生混合检索") - except Exception as e: - warning(f"⚠️ 检查 Qdrant 集合稀疏向量配置失败: {e}") + # 初始化稀疏嵌入 + sparse_embeddings = FastEmbedSparse(model_name="Qdrant/bm25") + info("✅ FastEmbedSparse 初始化成功") - # 如果有稀疏向量配置,用 Qdrant 原生混合检索 - if sparse_available: - try: - vector_store = QdrantVectorStore( - client=client, - collection_name=collection_name, - embedding=embeddings, - retrieval_mode=RetrievalMode.HYBRID, - ) - info(f"✅ Qdrant 原生混合检索器初始化成功 (k={total_k})") - return vector_store.as_retriever(search_kwargs=search_kwargs) - except Exception as e: - warning(f"⚠️ Qdrant 原生混合检索初始化失败: {e},回退到纯稠密检索") - - # 如果没有稀疏向量配置,回退到纯稠密检索 - info("ℹ️ Qdrant 集合未配置稀疏向量,使用纯稠密检索(完全兼容)") - return create_base_retriever( - collection_name=collection_name, - search_kwargs=search_kwargs, + # 创建混合模式的 QdrantVectorStore + vector_store = QdrantVectorStore( client=client, - embeddings=embeddings, + collection_name=collection_name, + embedding=embeddings, + sparse_embedding=sparse_embeddings, + retrieval_mode=RetrievalMode.HYBRID, ) + info(f"✅ Qdrant 原生混合检索器初始化成功 (k={total_k})") + return vector_store.as_retriever(search_kwargs=search_kwargs) + # 可选:提供异步友好的辅助函数 async def acreate_base_retriever( diff --git a/backend/requirements.txt b/backend/requirements.txt index d36b0ec..2edc6e8 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -18,6 +18,7 @@ zhipuai==2.0.1 # Vector DB qdrant-client==1.17.1 +fastembed>=0.3.0 # 用于 Qdrant BM25 稀疏向量 # Memory mem0ai==1.0.11 diff --git a/rag_indexer/index_builder.py b/rag_indexer/index_builder.py index b60319e..a6b8149 100644 --- a/rag_indexer/index_builder.py +++ b/rag_indexer/index_builder.py @@ -2,6 +2,7 @@ 离线 RAG 索引构建核心流水线。 使用 LangChain 的 ParentDocumentRetriever 实现父子块策略。 +支持 Qdrant 混合检索(Dense + Sparse)。 """ import asyncio @@ -20,6 +21,8 @@ from langchain_core.embeddings import Embeddings from langchain_core.stores import BaseStore from langchain_text_splitters import RecursiveCharacterTextSplitter, TextSplitter from qdrant_client.http.exceptions import ResponseHandlingException +from qdrant_client import QdrantClient +from qdrant_client.http.models import SparseVectorParams from .loaders import DocumentLoader from .splitters import SplitterType, get_splitter @@ -71,13 +74,10 @@ class IndexBuilderConfig: # 其他切分器参数(当 splitter_type 非父子块时使用) extra_splitter_kwargs: Dict[str, Any] = field(default_factory=dict) - - # 混合检索支持(默认 False,完全兼容) - enable_sparse: bool = False # ---------- 索引构建器 ---------- class IndexBuilder: - """RAG 索引构建主流水线,支持单块切分与父子块切分。""" + """RAG 索引构建主流水线,支持单块切分与父子块切分,支持混合检索。""" def __init__(self, config: Optional[IndexBuilderConfig] = None, embeddings: Optional[Embeddings] = None, **kwargs): """ @@ -118,28 +118,19 @@ class IndexBuilder: self.embedder = LlamaCppEmbedder() self.embeddings = self.embedder.as_langchain_embeddings() - # 初始化向量存储 - # 默认 enable_sparse=False,完全兼容现有代码 - # 若需要启用混合检索,请先安装 fastembed,然后设置 enable_sparse=True - qdrant_kwargs = { - "collection_name": config.collection_name, - } - - if self.config.enable_sparse: - try: - from langchain_qdrant import FastEmbedSparse, RetrievalMode - qdrant_kwargs["sparse_embedding"] = FastEmbedSparse(model_name="Qdrant/bm25") - qdrant_kwargs["retrieval_mode"] = RetrievalMode.HYBRID - logger.info("✅ 稀疏向量支持已启用") - except ImportError: - logger.warning("⚠️ fastembed 未安装,无法启用稀疏向量,继续使用纯稠密") - except Exception as e: - logger.warning(f"⚠️ 稀疏向量初始化失败: {e},继续使用纯稠密") - - if self.embedder is None: - qdrant_kwargs["embedding"] = self.embeddings - - self.vector_store = QdrantVectorStore(**qdrant_kwargs) + # 初始化稀疏嵌入 + from langchain_qdrant import FastEmbedSparse, RetrievalMode + self.sparse_embeddings = FastEmbedSparse(model_name="Qdrant/bm25") + logger.info("✅ FastEmbedSparse 初始化成功") + + # 初始化向量存储(混合检索模式) + self.vector_store = QdrantVectorStore( + collection_name=config.collection_name, + embedding=self.embeddings if self.embedder is None else None, + sparse_embedding=self.sparse_embeddings, + retrieval_mode=RetrievalMode.HYBRID, + ) + logger.info("✅ 混合检索向量存储初始化成功") # 根据切分类型初始化相关组件 self._init_splitters_and_retriever() @@ -222,9 +213,7 @@ class IndexBuilder: logger.info("已加载 %d 个文档", len(documents)) return await self._process_documents(documents) - async def build_from_directory( - self, directory_path: Union[str, Path], recursive: bool = True - ) -> int: + async def build_from_directory(self, directory_path: Union[str, Path], recursive: bool = True) -> int: """从目录递归构建索引。""" logger.info("加载目录: %s (递归=%s)", directory_path, recursive) documents = self.loader.load_directory(directory_path, recursive=recursive) @@ -243,8 +232,8 @@ class IndexBuilder: return await self._index_with_single_splitter(documents) async def _index_with_single_splitter(self, documents: List[Document]) -> int: - """单一模式:切分后直接写入向量库。""" - chunks = self.splitter.split_documents(documents) # type: ignore[union-attr] + """单一切分模式:切分后直接写入向量库。""" + chunks = self.splitter.split_documents(documents) logger.info("已切分为 %d 个块", len(chunks)) self.vector_store.create_collection() @@ -252,7 +241,7 @@ class IndexBuilder: return len(chunks) async def _index_with_parent_child(self, documents: List[Document]) -> int: - """父子模式:使用 ParentDocumentRetriever 批量添加。""" + """父子块模式:使用 ParentDocumentRetriever 批量添加。""" self.vector_store.create_collection() assert self.retriever is not None @@ -261,7 +250,7 @@ class IndexBuilder: processed = 0 for i in range(0, total, batch_size): - batch = documents[i:i + batch_size] + batch = documents[i:i+batch_size] await self._add_batch_with_retry(batch, i // batch_size + 1) processed += len(batch) logger.info("批次 %d: 已处理 %d/%d", i // batch_size + 1, processed, total) @@ -275,7 +264,7 @@ class IndexBuilder: base_delay = 2 for attempt in range(max_retries): try: - await self.retriever.aadd_documents(batch) # type: ignore[union-attr] + await self.retriever.aadd_documents(batch) logger.info("批次 %d 成功添加 %d 个文档", batch_no, len(batch)) return except (RemoteProtocolError, ConnectionError, OSError, ResponseHandlingException) as e: @@ -300,17 +289,17 @@ class IndexBuilder: def get_child_splitter(self) -> TextSplitter: """获取当前使用的子块切分器。""" if self.config.splitter_type == SplitterType.PARENT_CHILD: - return self.child_splitter # type: ignore[return-value] - return self.splitter # type: ignore[return-value] + return self.child_splitter + return self.splitter def get_parent_splitter(self) -> RecursiveCharacterTextSplitter: - """获取父块切分器(仅父子模式可用)。""" + """获取父块切分器(仅父子块模式可用)。""" if self.config.splitter_type != SplitterType.PARENT_CHILD: raise RuntimeError("父块切分器仅在父子块模式下可用") - return self.parent_splitter # type: ignore[return-value] + return self.parent_splitter def get_docstore(self) -> BaseStore: - """获取文档存储实例(仅父子模式可用)。""" + """获取文档存储实例(仅父子块模式可用)。""" if self.config.splitter_type != SplitterType.PARENT_CHILD: raise RuntimeError("文档存储仅在父子块模式下可用") assert self.docstore is not None @@ -325,17 +314,17 @@ class IndexBuilder: except RuntimeError: # 无运行中的事件循环,创建临时循环 loop = asyncio.new_event_loop() - loop.run_until_complete(self.docstore.aclose()) # type: ignore[attr-defined] + loop.run_until_complete(self.docstore.aclose()) loop.close() else: # 已有运行中的循环,创建任务(用户自行等待) - loop.create_task(self.docstore.aclose()) # type: ignore[attr-defined] + loop.create_task(self.docstore.aclose()) logger.info("IndexBuilder 资源已关闭") async def aclose(self) -> None: """异步关闭资源。""" if self.docstore is not None and hasattr(self.docstore, "aclose"): - await self.docstore.aclose() # type: ignore[attr-defined] + await self.docstore.aclose() logger.info("IndexBuilder 资源已异步关闭") def __enter__(self): @@ -350,4 +339,4 @@ class IndexBuilder: async def __aexit__(self, exc_type, exc_val, exc_tb): await self.aclose() - return False \ No newline at end of file + return False diff --git a/rag_indexer/requirements.txt b/rag_indexer/requirements.txt index 1b4460e..1dc4327 100644 --- a/rag_indexer/requirements.txt +++ b/rag_indexer/requirements.txt @@ -14,8 +14,7 @@ tiktoken>=0.12.0 # Vector DB qdrant-client==1.17.1 -# 可选:用于稀疏向量支持 -# fastembed>=0.3.0 +fastembed>=0.3.0 # 用于 Qdrant BM25 稀疏向量 # HTTP httpx==0.28.1