🚀 完全实现 Qdrant 混合检索功能
All checks were successful
构建并部署 AI Agent 服务 / deploy (push) Successful in 11m7s

- 不需要兼容,完全重写为混合检索
- 检索器:强制使用 FastEmbedSparse + RetrievalMode.HYBRID
- 索引器:强制启用稀疏向量,混合检索模式
- 添加 fastembed 依赖到 requirements.txt
- 语法检查通过
This commit is contained in:
2026-05-03 18:12:20 +08:00
parent ce6e459e19
commit 5c45806ad3
4 changed files with 53 additions and 88 deletions

View File

@@ -2,6 +2,7 @@
离线 RAG 索引构建核心流水线。
使用 LangChain 的 ParentDocumentRetriever 实现父子块策略。
支持 Qdrant 混合检索Dense + Sparse
"""
import asyncio
@@ -20,6 +21,8 @@ from langchain_core.embeddings import Embeddings
from langchain_core.stores import BaseStore
from langchain_text_splitters import RecursiveCharacterTextSplitter, TextSplitter
from qdrant_client.http.exceptions import ResponseHandlingException
from qdrant_client import QdrantClient
from qdrant_client.http.models import SparseVectorParams
from .loaders import DocumentLoader
from .splitters import SplitterType, get_splitter
@@ -71,13 +74,10 @@ class IndexBuilderConfig:
# 其他切分器参数(当 splitter_type 非父子块时使用)
extra_splitter_kwargs: Dict[str, Any] = field(default_factory=dict)
# 混合检索支持(默认 False完全兼容
enable_sparse: bool = False
# ---------- 索引构建器 ----------
class IndexBuilder:
"""RAG 索引构建主流水线,支持单块切分与父子块切分。"""
"""RAG 索引构建主流水线,支持单块切分与父子块切分,支持混合检索"""
def __init__(self, config: Optional[IndexBuilderConfig] = None, embeddings: Optional[Embeddings] = None, **kwargs):
"""
@@ -118,28 +118,19 @@ class IndexBuilder:
self.embedder = LlamaCppEmbedder()
self.embeddings = self.embedder.as_langchain_embeddings()
# 初始化向量存储
# 默认 enable_sparse=False完全兼容现有代码
# 若需要启用混合检索,请先安装 fastembed然后设置 enable_sparse=True
qdrant_kwargs = {
"collection_name": config.collection_name,
}
if self.config.enable_sparse:
try:
from langchain_qdrant import FastEmbedSparse, RetrievalMode
qdrant_kwargs["sparse_embedding"] = FastEmbedSparse(model_name="Qdrant/bm25")
qdrant_kwargs["retrieval_mode"] = RetrievalMode.HYBRID
logger.info("稀疏向量支持已启用")
except ImportError:
logger.warning("⚠️ fastembed 未安装,无法启用稀疏向量,继续使用纯稠密")
except Exception as e:
logger.warning(f"⚠️ 稀疏向量初始化失败: {e},继续使用纯稠密")
if self.embedder is None:
qdrant_kwargs["embedding"] = self.embeddings
self.vector_store = QdrantVectorStore(**qdrant_kwargs)
# 初始化稀疏嵌入
from langchain_qdrant import FastEmbedSparse, RetrievalMode
self.sparse_embeddings = FastEmbedSparse(model_name="Qdrant/bm25")
logger.info("✅ FastEmbedSparse 初始化成功")
# 初始化向量存储(混合检索模式)
self.vector_store = QdrantVectorStore(
collection_name=config.collection_name,
embedding=self.embeddings if self.embedder is None else None,
sparse_embedding=self.sparse_embeddings,
retrieval_mode=RetrievalMode.HYBRID,
)
logger.info("混合检索向量存储初始化成功")
# 根据切分类型初始化相关组件
self._init_splitters_and_retriever()
@@ -222,9 +213,7 @@ class IndexBuilder:
logger.info("已加载 %d 个文档", len(documents))
return await self._process_documents(documents)
async def build_from_directory(
self, directory_path: Union[str, Path], recursive: bool = True
) -> int:
async def build_from_directory(self, directory_path: Union[str, Path], recursive: bool = True) -> int:
"""从目录递归构建索引。"""
logger.info("加载目录: %s (递归=%s)", directory_path, recursive)
documents = self.loader.load_directory(directory_path, recursive=recursive)
@@ -243,8 +232,8 @@ class IndexBuilder:
return await self._index_with_single_splitter(documents)
async def _index_with_single_splitter(self, documents: List[Document]) -> int:
"""单一模式:切分后直接写入向量库。"""
chunks = self.splitter.split_documents(documents) # type: ignore[union-attr]
"""单一切分模式:切分后直接写入向量库。"""
chunks = self.splitter.split_documents(documents)
logger.info("已切分为 %d 个块", len(chunks))
self.vector_store.create_collection()
@@ -252,7 +241,7 @@ class IndexBuilder:
return len(chunks)
async def _index_with_parent_child(self, documents: List[Document]) -> int:
"""父子模式:使用 ParentDocumentRetriever 批量添加。"""
"""父子模式:使用 ParentDocumentRetriever 批量添加。"""
self.vector_store.create_collection()
assert self.retriever is not None
@@ -261,7 +250,7 @@ class IndexBuilder:
processed = 0
for i in range(0, total, batch_size):
batch = documents[i:i + batch_size]
batch = documents[i:i+batch_size]
await self._add_batch_with_retry(batch, i // batch_size + 1)
processed += len(batch)
logger.info("批次 %d: 已处理 %d/%d", i // batch_size + 1, processed, total)
@@ -275,7 +264,7 @@ class IndexBuilder:
base_delay = 2
for attempt in range(max_retries):
try:
await self.retriever.aadd_documents(batch) # type: ignore[union-attr]
await self.retriever.aadd_documents(batch)
logger.info("批次 %d 成功添加 %d 个文档", batch_no, len(batch))
return
except (RemoteProtocolError, ConnectionError, OSError, ResponseHandlingException) as e:
@@ -300,17 +289,17 @@ class IndexBuilder:
def get_child_splitter(self) -> TextSplitter:
"""获取当前使用的子块切分器。"""
if self.config.splitter_type == SplitterType.PARENT_CHILD:
return self.child_splitter # type: ignore[return-value]
return self.splitter # type: ignore[return-value]
return self.child_splitter
return self.splitter
def get_parent_splitter(self) -> RecursiveCharacterTextSplitter:
"""获取父块切分器(仅父子模式可用)。"""
"""获取父块切分器(仅父子模式可用)。"""
if self.config.splitter_type != SplitterType.PARENT_CHILD:
raise RuntimeError("父块切分器仅在父子块模式下可用")
return self.parent_splitter # type: ignore[return-value]
return self.parent_splitter
def get_docstore(self) -> BaseStore:
"""获取文档存储实例(仅父子模式可用)。"""
"""获取文档存储实例(仅父子模式可用)。"""
if self.config.splitter_type != SplitterType.PARENT_CHILD:
raise RuntimeError("文档存储仅在父子块模式下可用")
assert self.docstore is not None
@@ -325,17 +314,17 @@ class IndexBuilder:
except RuntimeError:
# 无运行中的事件循环,创建临时循环
loop = asyncio.new_event_loop()
loop.run_until_complete(self.docstore.aclose()) # type: ignore[attr-defined]
loop.run_until_complete(self.docstore.aclose())
loop.close()
else:
# 已有运行中的循环,创建任务(用户自行等待)
loop.create_task(self.docstore.aclose()) # type: ignore[attr-defined]
loop.create_task(self.docstore.aclose())
logger.info("IndexBuilder 资源已关闭")
async def aclose(self) -> None:
"""异步关闭资源。"""
if self.docstore is not None and hasattr(self.docstore, "aclose"):
await self.docstore.aclose() # type: ignore[attr-defined]
await self.docstore.aclose()
logger.info("IndexBuilder 资源已异步关闭")
def __enter__(self):
@@ -350,4 +339,4 @@ class IndexBuilder:
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.aclose()
return False
return False

View File

@@ -14,8 +14,7 @@ tiktoken>=0.12.0
# Vector DB
qdrant-client==1.17.1
# 可选:用于稀疏向量支持
# fastembed>=0.3.0
fastembed>=0.3.0 # 用于 Qdrant BM25 稀疏向量
# HTTP
httpx==0.28.1