Files
ailine/rag_indexer/README.md
root 8d4fc76a95
Some checks failed
构建并部署 AI Agent 服务 / deploy (push) Failing after 6m16s
修改容器生成
2026-04-21 00:00:56 +08:00

460 lines
19 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 离线 RAG 索引构建系统 (Offline RAG Indexer)
该模块负责 RAG 系统的阶段一:**离线索引构建**。它将外部的非结构化数据如文档、PDF、网页等清洗、切分并转化为向量最终存入向量数据库中。
## 🎯 核心架构
### 技术栈
| 组件 | 技术选型 | 版本 | 说明 |
|:-----|:---------|:-----|:-----|
| **文档解析** | `unstructured` | 0.22+ | 多格式文档解析PDF/DOCX/TXT等 |
| **文本切分** | `langchain-text-splitters` | 内置 | 递归字符切分 + 语义切分 |
| **语义切分** | `langchain-experimental` | 内置 | `SemanticChunker` 基于句子相似度 |
| **嵌入模型** | `llama.cpp` | 本地服务 | `embeddinggemma-300M` GGUF 模型 |
| **向量数据库** | `Qdrant` | 1.17+ | HNSW 索引,支持稠密/稀疏向量 |
| **文档存储** | `PostgreSQL` | 16+ | 异步连接池,持久化父块 |
| **编排框架** | `asyncio` | Python 3.10+ | 异步批量处理与重试 |
### 数据流向总览
```
┌─────────────────────────────────────────────────────────────┐
│ builder.py │
│ IndexBuilder 入口 │
└──────────────────────┬──────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ loaders.py │
│ DocumentLoader.load_file() │
│ → 返回 List[Document] │
└──────────────────────┬──────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ ParentDocumentRetriever │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ parent_splitter (粗切) │ │
│ │ 父块 ~1000 字符 │ │
│ └──────────────────────┬──────────────────────────────┘ │
│ │ │
│ ┌──────────────────────▼──────────────────────────────┐ │
│ │ child_splitter (细切) │ │
│ │ 子块 ~200 字符 │ │
│ └──────────────────────┬──────────────────────────────┘ │
│ │ │
│ ┌─────────────┴─────────────┐ │
│ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────────┐ │
│ │ 子块向量 │ │ 父块原始内容 │ │
│ │ │ │ │ │
│ ▼ │ ▼ │ │
│ ┌────────────┐ │ ┌─────────────────┐ │ │
│ │vector_store│ │ │ store/ │ │ │
│ │ (Qdrant) │ │ │ (PostgreSQL) │ │ │
│ └──────────── │ └─────────────────┘ │ │
└─────────────────────────────────────────────────────────────┘
```
### 技术特性
-**多格式支持**PDF、DOCX、TXT、MD、HTML、PPTX、XLSX、JSON
-**三种切分策略**:递归字符切分、语义切分、父子块策略
-**Parent-Child 架构**:子块精准检索,父块完整上下文
-**PostgreSQL DocStore**:持久化存储父块,支持异步连接池
-**批量写入与重试**:自动处理网络波动,确保索引完整性
-**上下文管理器**:支持同步/异步资源管理
## 📂 架构与文件结构
```
rag_indexer/
├── __init__.py
├── cli.py # 命令行入口
├── index_builder.py # 索引构建主流水线
├── loaders.py # 文档加载器(多格式支持)
├── splitters.py # 文本切分器(递归/语义/父子块)
├── embedders.py # 嵌入模型封装
├── vector_store.py # Qdrant 向量存储
├── store/
│ ├── __init__.py
│ ├── factory.py # DocStore 工厂函数
│ └── postgres.py # PostgreSQL DocStore 实现
└── test/ # 测试脚本
```
## 🎯 演进路线与核心算法 (Roadmap)
### Level 1: 基础暴力切分 (Basic Recursive Splitting)
- **核心算法**: 递归字符切分。它按照预定义的分隔符列表(如 `["\n\n", "\n", "。", "", "", " ", ""]`)从大到小尝试切分文本,直到每块的大小满足最大长度限制。
- **优缺点**: 实现极简单,速度快。但非常容易将一句话拦腰截断,导致上下文语义丢失。
- **实现指南**:
-`langchain_text_splitters` 导入 `RecursiveCharacterTextSplitter`
- 实例化时设置 `chunk_size`(如 500`chunk_overlap`(如 50
- 直接调用 `.split_documents(raw_docs)` 方法
```python
from langchain_text_splitters import RecursiveCharacterTextSplitter
splitter = RecursiveCharacterTextSplitter(
chunk_size=500,
chunk_overlap=50,
separators=["\n\n", "\n", "。", "", "", " ", ""]
)
chunks = splitter.split_documents(documents)
```
### Level 2: 语义动态切分 (Semantic Chunking)
- **核心算法**: 句子级相似度阈值算法。
1. 将文章按标点符号按句子拆分
2. 使用轻量级 Embedding 模型将每一句向量化
3. 计算相邻两句之间的余弦相似度 (Cosine Similarity)
4. 当相似度低于设定阈值时(说明两句话讲的不是同一件事,语义发生了转折),在此处"切断"形成一个新的块
- **优缺点**: 极大程度保留了段落内语义的连贯性,对 LLM 回答非常友好。但由于在切分阶段就需要调用向量模型,耗时略长。
- **实现指南**:
-`langchain_experimental.text_splitter` 导入 `SemanticChunker`
- 实现 `SemanticChunkerAdapter` 继承 `TextSplitter`,解决类型不兼容问题
- 实例化时需要传入已配置好的 Embedding 模型实例
```python
from langchain_experimental.text_splitter import SemanticChunker
chunker = SemanticChunker(
embeddings=embeddings,
breakpoint_threshold_type="percentile",
breakpoint_threshold_amount=95,
min_chunk_size=100
)
chunks = chunker.split_documents(documents)
```
### Level 3: 高级父子块策略 (Parent-Child / Auto-merging)
- **核心算法**: 层次化双重存储与映射。
- **切分机制**: 首先将文档粗切为较大的"父块 (Parent Chunk, 约 1000 字符)",随后将父块细切为较小的"子块 (Child Chunk, 约 200 字符)"
- **存储机制**: 仅仅将**子块**的向量存入 Qdrant 用于精准计算距离;将**父块**的原始内容存在 PostgreSQL DocStore 中,通过 UUID 相互映射
- **核心思路**: 解决 RAG 领域经典的矛盾——检索时块越小越容易精确命中(去除噪声);但生成回答时,块越大越能给大模型提供充足的上下文背景。
- **实现指南**:
- 使用 `langchain_classic.retrievers` 中的 `ParentDocumentRetriever` 模块
- 在写入时,需要同时准备一个底层的 `VectorStore` (即 Qdrant) 和一个 `BaseStore`
- **推荐方案**: 使用 `PostgresDocStore` 作为 docstore支持持久化存储
- 将两种不同的 `TextSplitter` 分别赋值给检索器的 `child_splitter``parent_splitter`,然后调用 `.add_documents()` 即可让系统自动完成映射
```python
from langchain.retrievers import ParentDocumentRetriever
retriever = ParentDocumentRetriever(
vectorstore=qdrant_store,
docstore=postgres_docstore,
parent_splitter=parent_splitter,
child_splitter=child_splitter,
)
await retriever.aadd_documents(documents)
```
### Level 3.1: PostgreSQL DocStore 集成
- **核心优势**: 利用 PostgreSQL 作为持久化存储,适合生产环境。使用异步连接池,支持高并发。
- **实现步骤**:
1. **配置连接**: 设置 `DB_URI` 环境变量或通过 `docstore_conn_string` 参数指定
2. **创建 docstore**: 使用 `rag_indexer.store.create_docstore()` 工厂函数
3. **注入到 IndexBuilder**: 通过构造函数参数注入
```python
from rag_indexer.store import create_docstore
docstore, conn_info = create_docstore(
connection_string="postgresql://user:pass@host:5432/db",
pool_config={"min_size": 5, "max_size": 20},
)
```
### Level 3.2: 语义切分与父子块策略结合
- **核心优势**: 结合语义切分的连贯性和父子块策略的层次化存储优势,实现更精准的检索和更丰富的上下文。
- **实现原理**:
- **父块切分**: 使用 `RecursiveCharacterTextSplitter` 创建大块约1000字符提供完整的上下文背景
- **子块切分**: 使用 `SemanticChunkerAdapter` 创建小块,根据语义连贯性动态切分,提高检索精度
- **存储机制**: 子块向量存入 Qdrant 用于精准检索,父块内容存入 PostgreSQL 提供完整上下文
```python
from rag_indexer.index_builder import IndexBuilder, IndexBuilderConfig
from rag_indexer.splitters import SplitterType
config = IndexBuilderConfig(
collection_name="rag_documents",
splitter_type=SplitterType.PARENT_CHILD,
parent_chunk_size=1000,
child_chunk_size=200,
child_splitter_type=SplitterType.SEMANTIC, # 子块使用语义切分
docstore=DocstoreConfig(
connection_string="postgresql://user:pass@host:5432/db",
),
)
```
### Level 4: GraphRAG基于图和关系的 RAG
- **核心算法**: LLM 实体关系抽取 (NER & Relation Extraction)。
- **核心思路**: 解决传统纯向量检索难以处理"跨文档复杂关系推理"的痛点A公司的CEO是谁他名下的B公司主要业务是什么这种需要横跨多页 PDF 的跳跃性问题)。
- **实现原理**:
1. **实体提取**: 利用 LLM 从文档中提取实体(如人物、组织、地点、事件等)
2. **关系抽取**: 识别实体之间的关系(如"CEO of"、"founded by"、"located in"等)
3. **图构建**: 将实体作为节点,关系作为边,构建知识图谱
4. **混合检索**: 结合向量检索和图查询,同时利用语义相似性和结构关系
- **技术栈**:
- **图数据库**: Neo4j 或 RedisGraph
- **LLM 工具**: `LLMGraphTransformer` 或自定义 Prompt
- **集成方式**: 与向量存储并行,形成混合检索系统
- **实现指南**:
- 使用 `langchain_community.graphs` 模块
- 配置本地大模型(如 `Gemma-4-E4B`)用于实体关系抽取
- 构建包含实体和关系的图结构,存储到图数据库
- 实现混合检索逻辑,结合向量相似度和图路径分析
```python
from langchain_community.graphs import Neo4jGraph
from langchain_experimental.graph_transformers import LLMGraphTransformer
# 实体关系抽取
transformer = LLMGraphTransformer(llm=local_llm)
graph_documents = transformer.convert_to_graph_documents(documents)
# 存储到图数据库
graph.add_graph_documents(graph_documents)
```
### Level 5: 多模态 RAG (Multi-modal RAG)
- **核心算法**: 跨模态嵌入和多模态融合。
- **核心思路**: 突破纯文本限制,支持图像、表格、音频等多种数据类型的理解和检索。
- **实现原理**:
1. **多模态嵌入**: 使用 CLIP 等模型将不同模态数据映射到统一向量空间
2. **多模态索引**: 为不同类型的内容创建专用索引
3. **跨模态检索**: 支持以文搜图、以图搜文等跨模态查询
- **技术栈**:
- **多模态模型**: CLIP、BLIP 等
- **存储**: 向量数据库 + 对象存储
- **检索**: 混合向量检索
## 🔧 核心组件详解
### 1. 文档加载器 (loaders.py)
使用 `unstructured` 库解析多种文件格式。
**支持格式**PDF、DOCX、DOC、TXT、MD、HTML、PPTX、XLSX、JSON
```python
from rag_indexer.loaders import DocumentLoader
loader = DocumentLoader(
strategy="auto", # 解析策略auto/fast/hi_res/ocr_only
ocr_languages=["chi_sim", "eng"], # OCR 语言
languages=["zh"], # 文档主语言
extract_images=False, # 是否提取图片
pdf_infer_table_structure=True, # 是否识别表格
)
# 加载单个文件
docs = loader.load_file("document.pdf")
# 加载整个目录
docs = loader.load_directory("./docs/", recursive=True)
```
### 2. 文本切分器 (splitters.py)
提供三种切分策略:
**递归字符切分**
```python
from rag_indexer.splitters import SplitterType, get_splitter
splitter = get_splitter(
SplitterType.RECURSIVE,
chunk_size=500,
chunk_overlap=50,
)
```
**语义切分**
```python
splitter = get_splitter(
SplitterType.SEMANTIC,
embeddings=embeddings,
breakpoint_threshold_type="percentile",
min_chunk_size=100,
)
```
**父子块策略**:在 `IndexBuilder` 中自动配置。
### 3. 索引构建器 (index_builder.py)
核心编排模块,串联整个索引构建流程。
```python
from rag_indexer.index_builder import IndexBuilder, IndexBuilderConfig, DocstoreConfig
from rag_indexer.splitters import SplitterType
# 配置
config = IndexBuilderConfig(
collection_name="rag_documents",
splitter_type=SplitterType.PARENT_CHILD,
parent_chunk_size=1000,
child_chunk_size=200,
child_splitter_type=SplitterType.SEMANTIC,
docstore=DocstoreConfig(
connection_string="postgresql://user:pass@host:5432/db",
),
)
# 构建索引
async with IndexBuilder(config) as builder:
# 从单个文件构建
count = await builder.build_from_file("document.pdf")
# 或从目录批量构建
count = await builder.build_from_directory("./docs/")
print(f"已索引 {count} 个文档")
```
### 4. 向量存储 (vector_store.py)
封装 Qdrant 向量数据库操作。
```python
from rag_core import QdrantVectorStore
vector_store = QdrantVectorStore(
collection_name="rag_documents",
embeddings=embeddings,
)
# 创建集合
vector_store.create_collection()
# 添加文档
vector_store.add_documents(chunks)
```
### 5. PostgreSQL DocStore (store/postgres.py)
持久化存储父块内容,支持异步连接池。
```python
from rag_core.store import create_docstore
docstore, conn_info = create_docstore(
connection_string="postgresql://user:pass@host:5432/db",
pool_config={"min_size": 5, "max_size": 20},
)
```
## 📊 切分策略对比
| 策略 | 原理 | 优点 | 缺点 | 适用场景 |
|:-----|:-----|:-----|:-----|:---------|
| **递归字符** | 按分隔符递归切分 | 速度快,实现简单 | 可能截断语义 | 简单文档 |
| **语义切分** | 基于句子相似度阈值 | 语义连贯性好 | 需要 Embedding 模型 | 专业文档 |
| **父子块** | 大块存储+小块检索 | 检索精准+上下文完整 | 存储复杂度高 | 生产环境 |
## 🚀 快速开始
### 命令行方式
```bash
# 设置环境变量
export QDRANT_URL="http://115.190.121.151:6333"
export DB_URI="postgresql://postgres:password@host:5432/langgraph_db?sslmode=disable"
# 执行索引构建
python -m rag_indexer.cli --path data/user_docs/ --recursive
```
### Python API 方式
```python
import asyncio
from rag_indexer.index_builder import IndexBuilder, IndexBuilderConfig, DocstoreConfig
from rag_indexer.splitters import SplitterType
async def main():
config = IndexBuilderConfig(
collection_name="rag_documents",
splitter_type=SplitterType.PARENT_CHILD,
parent_chunk_size=1000,
child_chunk_size=200,
child_splitter_type=SplitterType.SEMANTIC,
)
async with IndexBuilder(config) as builder:
count = await builder.build_from_directory("./user_docs/")
print(f"索引构建完成,共处理 {count} 个文档")
asyncio.run(main())
```
## ⚙️ 环境配置
| 变量名 | 说明 | 默认值 |
|:-------|:-----|:-------|
| `QDRANT_URL` | Qdrant 向量数据库地址 | `http://127.0.0.1:6333` |
| `QDRANT_API_KEY` | Qdrant API 密钥 | - |
| `DB_URI` | PostgreSQL 连接字符串 | - |
| `LLAMACPP_EMBEDDING_URL` | Embedding 服务地址 | `http://127.0.0.1:8082/v1` |
| `LLAMACPP_API_KEY` | llama.cpp API 密钥 | - |
## 🔄 与 app/rag 集成
- **向量存储**:共享 Qdrant 集合,确保嵌入模型一致
- **文档存储**:父块存入 PostgreSQL通过 UUID 与子块关联
- **集合名称**:默认使用 `rag_documents` 集合
- **嵌入模型**:使用相同的 `LlamaCppEmbedder` 确保向量空间一致
详见 [app/rag/README.md](../app/rag/README.md)
## 📝 高级配置
### 自定义切分参数
```python
config = IndexBuilderConfig(
collection_name="my_docs",
splitter_type=SplitterType.PARENT_CHILD,
parent_chunk_size=1500, # 更大的父块
child_chunk_size=300, # 更大的子块
parent_chunk_overlap=150, # 父块重叠
child_chunk_overlap=30, # 子块重叠
search_k=10, # 检索返回数量
)
```
### 批量处理与重试
索引构建器内置自动重试机制,处理网络波动:
- 最大重试次数5 次
- 退避策略指数退避2s, 4s, 8s, 16s, 32s
- 批量大小10 个文档/批次
### 资源管理
```python
# 方式一:上下文管理器(推荐)
async with IndexBuilder(config) as builder:
await builder.build_from_directory("./docs/")
# 方式二:手动管理
builder = IndexBuilder(config)
try:
await builder.build_from_directory("./docs/")
finally:
await builder.aclose()
```