离线 RAG 索引构建系统 (Offline RAG Indexer)
该模块负责 RAG 系统的阶段一:离线索引构建。它将外部的非结构化数据(如文档、PDF、网页等)清洗、切分并转化为向量,最终存入向量数据库中。
🎯 核心架构
技术栈
| 组件 | 技术选型 | 版本 | 说明 |
|---|---|---|---|
| 文档解析 | unstructured |
0.22+ | 多格式文档解析(PDF/DOCX/TXT等) |
| 文本切分 | langchain-text-splitters |
内置 | 递归字符切分 + 语义切分 |
| 语义切分 | langchain-experimental |
内置 | SemanticChunker 基于句子相似度 |
| 嵌入模型 | llama.cpp |
本地服务 | embeddinggemma-300M GGUF 模型 |
| 向量数据库 | Qdrant |
1.17+ | HNSW 索引,支持稠密/稀疏向量 |
| 文档存储 | PostgreSQL |
16+ | 异步连接池,持久化父块 |
| 编排框架 | asyncio |
Python 3.10+ | 异步批量处理与重试 |
数据流向总览
┌─────────────────────────────────────────────────────────────┐
│ builder.py │
│ IndexBuilder 入口 │
└──────────────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ loaders.py │
│ DocumentLoader.load_file() │
│ → 返回 List[Document] │
└──────────────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ ParentDocumentRetriever │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ parent_splitter (粗切) │ │
│ │ 父块 ~1000 字符 │ │
│ └──────────────────────┬──────────────────────────────┘ │
│ │ │
│ ┌──────────────────────▼──────────────────────────────┐ │
│ │ child_splitter (细切) │ │
│ │ 子块 ~200 字符 │ │
│ └──────────────────────┬──────────────────────────────┘ │
│ │ │
│ ┌─────────────┴─────────────┐ │
│ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────────┐ │
│ │ 子块向量 │ │ 父块原始内容 │ │
│ │ │ │ │ │
│ ▼ │ ▼ │ │
│ ┌────────────┐ │ ┌─────────────────┐ │ │
│ │vector_store│ │ │ store/ │ │ │
│ │ (Qdrant) │ │ │ (PostgreSQL) │ │ │
│ └──────────── │ └─────────────────┘ │ │
└─────────────────────────────────────────────────────────────┘
技术特性
- ✅ 多格式支持:PDF、DOCX、TXT、MD、HTML、PPTX、XLSX、JSON
- ✅ 三种切分策略:递归字符切分、语义切分、父子块策略
- ✅ Parent-Child 架构:子块精准检索,父块完整上下文
- ✅ PostgreSQL DocStore:持久化存储父块,支持异步连接池
- ✅ 批量写入与重试:自动处理网络波动,确保索引完整性
- ✅ 上下文管理器:支持同步/异步资源管理
📂 架构与文件结构
rag_indexer/
├── __init__.py
├── cli.py # 命令行入口
├── index_builder.py # 索引构建主流水线
├── loaders.py # 文档加载器(多格式支持)
├── splitters.py # 文本切分器(递归/语义/父子块)
├── embedders.py # 嵌入模型封装
├── vector_store.py # Qdrant 向量存储
├── store/
│ ├── __init__.py
│ ├── factory.py # DocStore 工厂函数
│ └── postgres.py # PostgreSQL DocStore 实现
└── test/ # 测试脚本
🎯 演进路线与核心算法 (Roadmap)
Level 1: 基础暴力切分 (Basic Recursive Splitting)
- 核心算法: 递归字符切分。它按照预定义的分隔符列表(如
["\n\n", "\n", "。", "!", "?", " ", ""])从大到小尝试切分文本,直到每块的大小满足最大长度限制。 - 优缺点: 实现极简单,速度快。但非常容易将一句话拦腰截断,导致上下文语义丢失。
- 实现指南:
- 从
langchain_text_splitters导入RecursiveCharacterTextSplitter - 实例化时设置
chunk_size(如 500)和chunk_overlap(如 50) - 直接调用
.split_documents(raw_docs)方法
- 从
from langchain_text_splitters import RecursiveCharacterTextSplitter
splitter = RecursiveCharacterTextSplitter(
chunk_size=500,
chunk_overlap=50,
separators=["\n\n", "\n", "。", "!", "?", " ", ""]
)
chunks = splitter.split_documents(documents)
Level 2: 语义动态切分 (Semantic Chunking)
- 核心算法: 句子级相似度阈值算法。
- 将文章按标点符号按句子拆分
- 使用轻量级 Embedding 模型将每一句向量化
- 计算相邻两句之间的余弦相似度 (Cosine Similarity)
- 当相似度低于设定阈值时(说明两句话讲的不是同一件事,语义发生了转折),在此处"切断"形成一个新的块
- 优缺点: 极大程度保留了段落内语义的连贯性,对 LLM 回答非常友好。但由于在切分阶段就需要调用向量模型,耗时略长。
- 实现指南:
- 从
langchain_experimental.text_splitter导入SemanticChunker - 实现
SemanticChunkerAdapter继承TextSplitter,解决类型不兼容问题 - 实例化时需要传入已配置好的 Embedding 模型实例
- 从
from langchain_experimental.text_splitter import SemanticChunker
chunker = SemanticChunker(
embeddings=embeddings,
breakpoint_threshold_type="percentile",
breakpoint_threshold_amount=95,
min_chunk_size=100
)
chunks = chunker.split_documents(documents)
Level 3: 高级父子块策略 (Parent-Child / Auto-merging)
- 核心算法: 层次化双重存储与映射。
- 切分机制: 首先将文档粗切为较大的"父块 (Parent Chunk, 约 1000 字符)",随后将父块细切为较小的"子块 (Child Chunk, 约 200 字符)"
- 存储机制: 仅仅将子块的向量存入 Qdrant 用于精准计算距离;将父块的原始内容存在 PostgreSQL DocStore 中,通过 UUID 相互映射
- 核心思路: 解决 RAG 领域经典的矛盾——检索时块越小越容易精确命中(去除噪声);但生成回答时,块越大越能给大模型提供充足的上下文背景。
- 实现指南:
- 使用
langchain_classic.retrievers中的ParentDocumentRetriever模块 - 在写入时,需要同时准备一个底层的
VectorStore(即 Qdrant) 和一个BaseStore - 推荐方案: 使用
PostgresDocStore作为 docstore,支持持久化存储 - 将两种不同的
TextSplitter分别赋值给检索器的child_splitter和parent_splitter,然后调用.add_documents()即可让系统自动完成映射
- 使用
from langchain.retrievers import ParentDocumentRetriever
retriever = ParentDocumentRetriever(
vectorstore=qdrant_store,
docstore=postgres_docstore,
parent_splitter=parent_splitter,
child_splitter=child_splitter,
)
await retriever.aadd_documents(documents)
Level 3.1: PostgreSQL DocStore 集成
- 核心优势: 利用 PostgreSQL 作为持久化存储,适合生产环境。使用异步连接池,支持高并发。
- 实现步骤:
- 配置连接: 设置
DB_URI环境变量或通过docstore_conn_string参数指定 - 创建 docstore: 使用
rag_indexer.store.create_docstore()工厂函数 - 注入到 IndexBuilder: 通过构造函数参数注入
- 配置连接: 设置
from rag_indexer.store import create_docstore
docstore, conn_info = create_docstore(
connection_string="postgresql://user:pass@host:5432/db",
pool_config={"min_size": 5, "max_size": 20},
)
Level 3.2: 语义切分与父子块策略结合
- 核心优势: 结合语义切分的连贯性和父子块策略的层次化存储优势,实现更精准的检索和更丰富的上下文。
- 实现原理:
- 父块切分: 使用
RecursiveCharacterTextSplitter创建大块(约1000字符),提供完整的上下文背景 - 子块切分: 使用
SemanticChunkerAdapter创建小块,根据语义连贯性动态切分,提高检索精度 - 存储机制: 子块向量存入 Qdrant 用于精准检索,父块内容存入 PostgreSQL 提供完整上下文
- 父块切分: 使用
from rag_indexer.index_builder import IndexBuilder, IndexBuilderConfig
from rag_indexer.splitters import SplitterType
config = IndexBuilderConfig(
collection_name="rag_documents",
splitter_type=SplitterType.PARENT_CHILD,
parent_chunk_size=1000,
child_chunk_size=200,
child_splitter_type=SplitterType.SEMANTIC, # 子块使用语义切分
docstore=DocstoreConfig(
connection_string="postgresql://user:pass@host:5432/db",
),
)
Level 4: GraphRAG(基于图和关系的 RAG)
- 核心算法: LLM 实体关系抽取 (NER & Relation Extraction)。
- 核心思路: 解决传统纯向量检索难以处理"跨文档复杂关系推理"的痛点(如:A公司的CEO是谁?他名下的B公司主要业务是什么?这种需要横跨多页 PDF 的跳跃性问题)。
- 实现原理:
- 实体提取: 利用 LLM 从文档中提取实体(如人物、组织、地点、事件等)
- 关系抽取: 识别实体之间的关系(如"CEO of"、"founded by"、"located in"等)
- 图构建: 将实体作为节点,关系作为边,构建知识图谱
- 混合检索: 结合向量检索和图查询,同时利用语义相似性和结构关系
- 技术栈:
- 图数据库: Neo4j 或 RedisGraph
- LLM 工具:
LLMGraphTransformer或自定义 Prompt - 集成方式: 与向量存储并行,形成混合检索系统
- 实现指南:
- 使用
langchain_community.graphs模块 - 配置本地大模型(如
Gemma-4-E4B)用于实体关系抽取 - 构建包含实体和关系的图结构,存储到图数据库
- 实现混合检索逻辑,结合向量相似度和图路径分析
- 使用
from langchain_community.graphs import Neo4jGraph
from langchain_experimental.graph_transformers import LLMGraphTransformer
# 实体关系抽取
transformer = LLMGraphTransformer(llm=local_llm)
graph_documents = transformer.convert_to_graph_documents(documents)
# 存储到图数据库
graph.add_graph_documents(graph_documents)
Level 5: 多模态 RAG (Multi-modal RAG)
- 核心算法: 跨模态嵌入和多模态融合。
- 核心思路: 突破纯文本限制,支持图像、表格、音频等多种数据类型的理解和检索。
- 实现原理:
- 多模态嵌入: 使用 CLIP 等模型将不同模态数据映射到统一向量空间
- 多模态索引: 为不同类型的内容创建专用索引
- 跨模态检索: 支持以文搜图、以图搜文等跨模态查询
- 技术栈:
- 多模态模型: CLIP、BLIP 等
- 存储: 向量数据库 + 对象存储
- 检索: 混合向量检索
🔧 核心组件详解
1. 文档加载器 (loaders.py)
使用 unstructured 库解析多种文件格式。
支持格式:PDF、DOCX、DOC、TXT、MD、HTML、PPTX、XLSX、JSON
from rag_indexer.loaders import DocumentLoader
loader = DocumentLoader(
strategy="auto", # 解析策略:auto/fast/hi_res/ocr_only
ocr_languages=["chi_sim", "eng"], # OCR 语言
languages=["zh"], # 文档主语言
extract_images=False, # 是否提取图片
pdf_infer_table_structure=True, # 是否识别表格
)
# 加载单个文件
docs = loader.load_file("document.pdf")
# 加载整个目录
docs = loader.load_directory("./docs/", recursive=True)
2. 文本切分器 (splitters.py)
提供三种切分策略:
递归字符切分:
from rag_indexer.splitters import SplitterType, get_splitter
splitter = get_splitter(
SplitterType.RECURSIVE,
chunk_size=500,
chunk_overlap=50,
)
语义切分:
splitter = get_splitter(
SplitterType.SEMANTIC,
embeddings=embeddings,
breakpoint_threshold_type="percentile",
min_chunk_size=100,
)
父子块策略:在 IndexBuilder 中自动配置。
3. 索引构建器 (index_builder.py)
核心编排模块,串联整个索引构建流程。
from rag_indexer.index_builder import IndexBuilder, IndexBuilderConfig, DocstoreConfig
from rag_indexer.splitters import SplitterType
# 配置
config = IndexBuilderConfig(
collection_name="rag_documents",
splitter_type=SplitterType.PARENT_CHILD,
parent_chunk_size=1000,
child_chunk_size=200,
child_splitter_type=SplitterType.SEMANTIC,
docstore=DocstoreConfig(
connection_string="postgresql://user:pass@host:5432/db",
),
)
# 构建索引
async with IndexBuilder(config) as builder:
# 从单个文件构建
count = await builder.build_from_file("document.pdf")
# 或从目录批量构建
count = await builder.build_from_directory("./docs/")
print(f"已索引 {count} 个文档")
4. 向量存储 (vector_store.py)
封装 Qdrant 向量数据库操作。
from rag_core import QdrantVectorStore
vector_store = QdrantVectorStore(
collection_name="rag_documents",
embeddings=embeddings,
)
# 创建集合
vector_store.create_collection()
# 添加文档
vector_store.add_documents(chunks)
5. PostgreSQL DocStore (store/postgres.py)
持久化存储父块内容,支持异步连接池。
from rag_core.store import create_docstore
docstore, conn_info = create_docstore(
connection_string="postgresql://user:pass@host:5432/db",
pool_config={"min_size": 5, "max_size": 20},
)
📊 切分策略对比
| 策略 | 原理 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
| 递归字符 | 按分隔符递归切分 | 速度快,实现简单 | 可能截断语义 | 简单文档 |
| 语义切分 | 基于句子相似度阈值 | 语义连贯性好 | 需要 Embedding 模型 | 专业文档 |
| 父子块 | 大块存储+小块检索 | 检索精准+上下文完整 | 存储复杂度高 | 生产环境 |
🚀 快速开始
命令行方式
# 设置环境变量
export QDRANT_URL="http://115.190.121.151:6333"
export DB_URI="postgresql://postgres:password@host:5432/langgraph_db?sslmode=disable"
# 执行索引构建
python -m rag_indexer.cli --path data/user_docs/ --recursive
Python API 方式
import asyncio
from rag_indexer.index_builder import IndexBuilder, IndexBuilderConfig, DocstoreConfig
from rag_indexer.splitters import SplitterType
async def main():
config = IndexBuilderConfig(
collection_name="rag_documents",
splitter_type=SplitterType.PARENT_CHILD,
parent_chunk_size=1000,
child_chunk_size=200,
child_splitter_type=SplitterType.SEMANTIC,
)
async with IndexBuilder(config) as builder:
count = await builder.build_from_directory("./user_docs/")
print(f"索引构建完成,共处理 {count} 个文档")
asyncio.run(main())
⚙️ 环境配置
| 变量名 | 说明 | 默认值 |
|---|---|---|
QDRANT_URL |
Qdrant 向量数据库地址 | http://127.0.0.1:6333 |
QDRANT_API_KEY |
Qdrant API 密钥 | - |
DB_URI |
PostgreSQL 连接字符串 | - |
LLAMACPP_EMBEDDING_URL |
Embedding 服务地址 | http://127.0.0.1:8082/v1 |
LLAMACPP_API_KEY |
llama.cpp API 密钥 | - |
🔄 与 app/rag 集成
- 向量存储:共享 Qdrant 集合,确保嵌入模型一致
- 文档存储:父块存入 PostgreSQL,通过 UUID 与子块关联
- 集合名称:默认使用
rag_documents集合 - 嵌入模型:使用相同的
LlamaCppEmbedder确保向量空间一致
📝 高级配置
自定义切分参数
config = IndexBuilderConfig(
collection_name="my_docs",
splitter_type=SplitterType.PARENT_CHILD,
parent_chunk_size=1500, # 更大的父块
child_chunk_size=300, # 更大的子块
parent_chunk_overlap=150, # 父块重叠
child_chunk_overlap=30, # 子块重叠
search_k=10, # 检索返回数量
)
批量处理与重试
索引构建器内置自动重试机制,处理网络波动:
- 最大重试次数:5 次
- 退避策略:指数退避(2s, 4s, 8s, 16s, 32s)
- 批量大小:10 个文档/批次
资源管理
# 方式一:上下文管理器(推荐)
async with IndexBuilder(config) as builder:
await builder.build_from_directory("./docs/")
# 方式二:手动管理
builder = IndexBuilder(config)
try:
await builder.build_from_directory("./docs/")
finally:
await builder.aclose()