From c18e8a9860c816f8b8d92a0bd41eaf5b63f1e801 Mon Sep 17 00:00:00 2001
From: root <953994191@qq.com>
Date: Sat, 18 Apr 2026 16:56:23 +0800
Subject: [PATCH] =?UTF-8?q?=E5=90=91=E9=87=8F=E6=95=B0=E6=8D=AE=E5=BA=93?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .gitignore                          |   2 +
 rag_indexer/README.md               | 109 +++++++++++
 rag_indexer/__init__.py             |  25 +++
 rag_indexer/builder.py              | 277 ++++++++++++++++++++++++++++
 rag_indexer/cli.py                  | 102 ++++++++++
 rag_indexer/docstore_manager.py     | 142 ++++++++++++++
 rag_indexer/embedders.py            |  68 +++++++
 rag_indexer/example_parent_child.py | 124 +++++++++++++
 rag_indexer/loaders.py              |  91 +++++++++
 rag_indexer/splitters.py            |  71 +++++++
 rag_indexer/vector_store.py         | 110 +++++++++++
 11 files changed, 1121 insertions(+)
 create mode 100644 rag_indexer/README.md
 create mode 100644 rag_indexer/__init__.py
 create mode 100644 rag_indexer/builder.py
 create mode 100755 rag_indexer/cli.py
 create mode 100644 rag_indexer/docstore_manager.py
 create mode 100644 rag_indexer/embedders.py
 create mode 100644 rag_indexer/example_parent_child.py
 create mode 100644 rag_indexer/loaders.py
 create mode 100644 rag_indexer/splitters.py
 create mode 100644 rag_indexer/vector_store.py

diff --git a/.gitignore b/.gitignore
index 393cff4..ff0b849 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,6 +13,8 @@
 !frontend/**
 !scripts/
 !scripts/**
+!rag_indexer/
+!rag_indexer/**
 !docker/
 !docker/**
 !.gitea/
diff --git a/rag_indexer/README.md b/rag_indexer/README.md
new file mode 100644
index 0000000..c656a05
--- /dev/null
+++ b/rag_indexer/README.md
@@ -0,0 +1,109 @@
+# Offline RAG Indexer
+
+This module implements Phase 1 of the RAG system: **offline index construction**. It cleans, splits, and vectorizes external unstructured data (documents, PDFs, web pages, etc.) and writes the result into the vector database.
+
+## 📊 Workflow Overview
+
+```mermaid
+graph TD
+    A[Source documents<br/>PDF / Word / Markdown] --> B(DocumentLoader)
+    B --> C{Splitter strategy}
+
+    C -->|basic| D1[Fixed-length chunks<br/>Recursive Split]
+    C -->|intermediate| D2[Semantic boundaries<br/>Semantic Chunking]
+    C -->|advanced| D3[Parent-child chunks<br/>Parent-Child / Auto-merging]
+
+    D1 & D2 & D3 --> E[Embedder<br/>llama.cpp: embeddinggemma]
+
+    E --> F[(Qdrant vector database)]
+
+    subgraph "Metadata management"
+        G[Extract author, date, page numbers, etc.] -.attach.-> E
+    end
+```
+
+---
+
+## 🎯 Roadmap and Core Algorithms
+
+### Level 1: Basic Recursive Splitting
+- **Core algorithm**: recursive character splitting. It works through a predefined list of separators (e.g. `["\n\n", "\n", " ", ""]`) from coarse to fine until every chunk fits within the maximum length.
+- **Trade-offs**: extremely simple to implement and fast, but it readily cuts sentences in half and loses contextual meaning.
+- **Implementation guide**:
+  - Import `RecursiveCharacterTextSplitter` from `langchain.text_splitter`.
+  - Instantiate it with `chunk_size` (e.g. 500) and `chunk_overlap` (e.g. 50), then call `.split_documents(raw_docs)`, as sketched below.
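+
+A minimal Level 1 sketch, assuming `raw_docs` is the list of LangChain `Document` objects produced by the loader (imported here from the newer `langchain_text_splitters` package, matching the code in this repo; sizes are illustrative):
+
+```python
+from langchain_text_splitters import RecursiveCharacterTextSplitter
+
+# Tries separators coarse-to-fine until every chunk fits chunk_size.
+splitter = RecursiveCharacterTextSplitter(
+    chunk_size=500,
+    chunk_overlap=50,
+    separators=["\n\n", "\n", " ", ""],
+)
+chunks = splitter.split_documents(raw_docs)
+```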
+
+### Level 2: Semantic Chunking
+- **Core algorithm**: sentence-level similarity thresholding.
+  1. Split the text into sentences at punctuation boundaries.
+  2. Vectorize each sentence with a lightweight embedding model.
+  3. Compute the cosine similarity between adjacent sentences.
+  4. When the similarity falls below a set threshold (the two sentences are no longer about the same thing; the topic has shifted), cut there and start a new chunk.
+- **Trade-offs**: preserves the semantic continuity of each chunk to a large degree, which makes life much easier for the LLM at answer time; but because the embedding model is already called at split time, it is noticeably slower.
+- **Implementation guide**:
+  - Import `SemanticChunker` from `langchain_experimental.text_splitter`.
+  - Pass in your configured embedding model instance (e.g. the local llama.cpp model wrapped via `OpenAIEmbeddings`) and set threshold parameters such as `breakpoint_threshold_type="percentile"`, as sketched below.
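+
+A Level 2 sketch, reusing this module's `LlamaCppEmbedder` wrapper (`rag_indexer/embedders.py`); the threshold setting is illustrative:
+
+```python
+from langchain_experimental.text_splitter import SemanticChunker
+
+from rag_indexer.embedders import LlamaCppEmbedder
+
+embeddings = LlamaCppEmbedder().as_langchain_embeddings()
+splitter = SemanticChunker(
+    embeddings=embeddings,
+    breakpoint_threshold_type="percentile",  # cut where similarity drops into the outlier tail
+)
+chunks = splitter.split_documents(raw_docs)
+```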
+
+### Level 3: Parent-Child / Auto-merging
+- **Core algorithm**: hierarchical dual storage with ID mapping.
+  - **Splitting**: first coarsely cut documents into larger "parent chunks" (~1000 characters), then finely cut each parent into smaller "child chunks" (~200 characters).
+  - **Storage**: only the **child** vectors go into Qdrant for precise distance computation; the **parent** originals are kept in memory or a document store (e.g. a KV database), linked to their children by UUID.
+- **Key idea**: resolves the classic RAG tension: smaller chunks are easier to hit precisely at retrieval time (less noise), while larger chunks give the model richer context at generation time.
+- **Implementation guide**:
+  - Use `ParentDocumentRetriever` from `langchain.retrievers`.
+  - At write time you need both an underlying `VectorStore` (Qdrant here) and a `BaseStore` (e.g. a plain `InMemoryStore` or `Redis`).
+  - Assign two different `TextSplitter`s to the retriever's `child_splitter` and `parent_splitter`, then call `.add_documents()`; the mapping is maintained automatically, as sketched below.
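+
+A Level 3 sketch, assuming `vectorstore` is an already-initialized LangChain `VectorStore` backed by Qdrant:
+
+```python
+from langchain.retrievers import ParentDocumentRetriever
+from langchain.storage import InMemoryStore
+from langchain_text_splitters import RecursiveCharacterTextSplitter
+
+retriever = ParentDocumentRetriever(
+    vectorstore=vectorstore,                   # child vectors go to Qdrant
+    docstore=InMemoryStore(),                  # parent originals go to a KV store
+    child_splitter=RecursiveCharacterTextSplitter(chunk_size=200),
+    parent_splitter=RecursiveCharacterTextSplitter(chunk_size=1000),
+)
+retriever.add_documents(raw_docs)    # splits, maps child -> parent, and indexes
+docs = retriever.invoke("your question")   # hits children, returns parents
+```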
+
+### Level 4: GraphRAG & Multi-modal
+- **Core algorithm**: LLM-based entity and relation extraction (NER & relation extraction).
+- **Key idea**: addresses the classic weakness of pure vector retrieval on complex cross-document reasoning (e.g. "Who is the CEO of company A, and what is the main business of company B that he owns?", a hopping question that spans many PDF pages).
+- **Implementation guide**:
+  - Use a local large model (e.g. `Gemma-4-E2B`) together with the `langchain_community.graphs` module.
+  - With the `LLMGraphTransformer` component, a preset prompt forces the model to extract entities (nodes) and relations (edges) while reading documents, and the result is written into a graph database such as Neo4j rather than the usual Qdrant vector store, as sketched below.
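+
+A hedged Level 4 sketch. Note: in current LangChain releases `LLMGraphTransformer` lives in `langchain_experimental.graph_transformers` (not `langchain_community.graphs`, which provides `Neo4jGraph`); `llm` is assumed to be whatever chat model you host locally, and the connection details are placeholders:
+
+```python
+from langchain_experimental.graph_transformers import LLMGraphTransformer
+from langchain_community.graphs import Neo4jGraph
+
+transformer = LLMGraphTransformer(llm=llm)
+graph_documents = transformer.convert_to_graph_documents(raw_docs)  # nodes + edges
+
+graph = Neo4jGraph(url="bolt://localhost:7687", username="neo4j", password="...")
+graph.add_graph_documents(graph_documents)
+```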
+
+---
+
+## 📦 Dependencies
+
+Full document parsing and Qdrant writes require the following Python packages:
+
+```bash
+# Core libraries
+pip install langchain langchain-core langchain-openai langchain-qdrant
+
+# Rich document parsing (PDF, Word, Excel, ...)
+pip install unstructured pdf2image pdfminer.six
+
+# Semantic chunking (optional)
+pip install langchain-experimental
+```
+
+---
+
+## 📂 Architecture and File Layout
+
+Create the following core files under `rag_indexer/`:
+
+```text
+rag_indexer/
+├── __init__.py
+├── loaders.py        # parses different file types via unstructured
+├── splitters.py      # Recursive, Semantic, and Parent-Child splitting logic
+├── embedders.py      # Embedding interface wrapping the local llama.cpp service
+├── vector_store.py   # Qdrant writes, upserts, and collection initialization
+└── builder.py        # orchestration: wires the modules into a pipeline
+```
+
+---
+
+### Wiring and Triggering
+
+Outside your LangGraph system, create an execution script `scripts/run_indexer.py` (a sketch follows):
+
+```bash
+# Run from a terminal to push a local PDF manual into the vector database
+export QDRANT_URL="http://115.190.121.151:6333"
+python scripts/run_indexer.py --file data/user_docs/tech_manual.pdf
+```
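+
+A minimal sketch of that script, built on this module's `IndexBuilder` (the flags are illustrative):
+
+```python
+# scripts/run_indexer.py
+import argparse
+
+from rag_indexer import IndexBuilder
+from rag_indexer.splitters import SplitterType
+
+parser = argparse.ArgumentParser(description="Offline RAG indexing")
+parser.add_argument("--file", required=True, help="Document to index")
+args = parser.parse_args()
+
+# Reads QDRANT_URL from the environment by default.
+builder = IndexBuilder(splitter_type=SplitterType.RECURSIVE)
+print(f"Indexed {builder.build_from_file(args.file)} chunks")
+```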
+
+This is the system's background **"offline learning" phase**: you can hook it up to a scheduled job that scans a folder and incrementally updates the knowledge base.
diff --git a/rag_indexer/__init__.py b/rag_indexer/__init__.py
new file mode 100644
index 0000000..56905c2
--- /dev/null
+++ b/rag_indexer/__init__.py
@@ -0,0 +1,25 @@
+"""
+Offline RAG Indexer module.
+"""
+
+from .loaders import DocumentLoader
+from .splitters import (
+    SplitterType,
+    get_splitter,
+    ParentChildSplitter,
+)
+from .embedders import LlamaCppEmbedder
+from .vector_store import QdrantVectorStore
+from .builder import IndexBuilder
+
+__all__ = [
+    "DocumentLoader",
+    "SplitterType",
+    "get_splitter",
+    "ParentChildSplitter",
+    "LlamaCppEmbedder",
+    "QdrantVectorStore",
+    "IndexBuilder",
+]
\ No newline at end of file
diff --git a/rag_indexer/builder.py b/rag_indexer/builder.py
new file mode 100644
index 0000000..d680c5a
--- /dev/null
+++ b/rag_indexer/builder.py
@@ -0,0 +1,277 @@
+"""
+Core pipeline builder for offline RAG index construction.
+
+Now supports LangChain's ParentDocumentRetriever for parent-child chunking.
+"""
+
+import logging
+from pathlib import Path
+from typing import List, Optional, Union
+from dataclasses import dataclass
+
+from langchain_core.documents import Document
+from langchain_core.stores import BaseStore
+from langchain.retrievers import ParentDocumentRetriever
+from langchain_text_splitters import RecursiveCharacterTextSplitter
+
+from .loaders import DocumentLoader
+from .splitters import SplitterType, get_splitter
+from .embedders import LlamaCppEmbedder
+from .vector_store import QdrantVectorStore
+from .docstore_manager import get_docstore, resolve_persist_path, PostgresDocStore
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class ParentChildConfig:
+    """Configuration for parent-child splitting."""
+    parent_chunk_size: int = 1000
+    child_chunk_size: int = 200
+    parent_chunk_overlap: int = 100
+    child_chunk_overlap: int = 20
+    search_k: int = 5
+    docstore_path: Optional[str] = None
+    docstore_type: str = "local"
+    docstore_conn_string: Optional[str] = None
+
+
+class IndexBuilder:
+    """Main pipeline for RAG index construction."""
+
+    def __init__(
+        self,
+        collection_name: str = "rag_documents",
+        qdrant_url: Optional[str] = None,
+        splitter_type: SplitterType = SplitterType.RECURSIVE,
+        **splitter_kwargs,
+    ):
+        self.collection_name = collection_name
+        self.qdrant_url = qdrant_url
+        self.splitter_type = splitter_type
+        self.splitter_kwargs = splitter_kwargs
+
+        # Components
+        self.loader = DocumentLoader()
+        self.embedder = LlamaCppEmbedder()
+        self.embeddings = self.embedder.as_langchain_embeddings()
+
+        self.vector_store = QdrantVectorStore(
+            collection_name=collection_name,
+            embeddings=self.embeddings,
+            qdrant_url=qdrant_url,
+        )
+
+        # Splitter (except parent-child, which is handled separately)
+        if splitter_type != SplitterType.PARENT_CHILD:
+            if splitter_type == SplitterType.SEMANTIC:
+                splitter_kwargs["embeddings"] = self.embeddings
+            self.splitter = get_splitter(splitter_type, **splitter_kwargs)
+        else:
+            self.splitter = None
+            # Initialize ParentDocumentRetriever for parent-child splitting.
+            # The kwargs must be forwarded explicitly; otherwise the retriever
+            # silently falls back to its defaults.
+            self._init_parent_child_retriever(**self.splitter_kwargs)
+
+    def _init_parent_child_retriever(self, **kwargs):
+        """
+        Initialize ParentDocumentRetriever for parent-child chunking.
+
+        This replaces the custom ParentChildSplitter logic.
+        """
+        # Parse kwargs for parent-child config
+        parent_size = kwargs.get("parent_chunk_size", 1000)
+        child_size = kwargs.get("child_chunk_size", 200)
+        parent_overlap = kwargs.get("parent_chunk_overlap", kwargs.get("chunk_overlap", 100))
+        child_overlap = kwargs.get("child_chunk_overlap", kwargs.get("chunk_overlap", 20))
+
+        # Define splitters
+        self.parent_splitter = RecursiveCharacterTextSplitter(
+            chunk_size=parent_size,
+            chunk_overlap=parent_overlap,
+        )
+        self.child_splitter = RecursiveCharacterTextSplitter(
+            chunk_size=child_size,
+            chunk_overlap=child_overlap,
+        )
+
+        # Vector store (for child chunks)
+        self.vector_store_obj = self.vector_store.get_langchain_vectorstore()
+
+        # Document store (for parent chunks)
+        docstore_path = kwargs.get("docstore_path")
+        docstore_type = kwargs.get("docstore_type", "local")
+        docstore_conn = kwargs.get("docstore_conn_string")
+
+        if docstore_type == "postgres" and docstore_conn:
+            self.docstore = PostgresDocStore(docstore_conn)
+            self._docstore_conn = docstore_conn
+            self._docstore_path = None
+        else:
+            self.docstore = get_docstore(docstore_path)
+            self._docstore_conn = None
+            self._docstore_path = resolve_persist_path(docstore_path)
+
+        # Create retriever
+        self.retriever = ParentDocumentRetriever(
+            vectorstore=self.vector_store_obj,
+            docstore=self.docstore,
+            child_splitter=self.child_splitter,
+            parent_splitter=self.parent_splitter,
+            search_kwargs={"k": kwargs.get("search_k", 5)},
+        )
+
+    def build_from_file(self, file_path: Union[str, Path]) -> int:
+        logger.info("Loading file: %s", file_path)
+        documents = self.loader.load_file(file_path)
+        logger.info("Loaded %d documents", len(documents))
+        return self._process_documents(documents)
+
+    def build_from_directory(self, directory_path: Union[str, Path], recursive: bool = True) -> int:
+        logger.info("Loading directory: %s (recursive=%s)", directory_path, recursive)
+        documents = self.loader.load_directory(directory_path, recursive=recursive)
+        logger.info("Loaded %d documents from directory", len(documents))
+        return self._process_documents(documents)
+
+    def _process_documents(self, documents: List[Document]) -> int:
+        if not documents:
+            logger.warning("No documents to process")
+            return 0
+
+        if self.splitter_type == SplitterType.PARENT_CHILD:
+            logger.info("Using LangChain ParentDocumentRetriever")
+
+            # Ensure collection exists for child chunks
+            self.vector_store.create_collection()
+
+            # Use ParentDocumentRetriever to add documents.
+            # This automatically handles parent-child splitting, mapping, and retrieval.
+            self.retriever.add_documents(documents)
+
+            # Rough estimate: each parent yields about parent_size / child_size children.
+            estimated_child_chunks = len(documents) * (
+                self.parent_splitter._chunk_size // self.child_splitter._chunk_size
+            )
+            logger.info(
+                "Indexed with ParentDocumentRetriever: ~%d parent chunks, ~%d child chunks",
+                len(documents),
+                estimated_child_chunks,
+            )
+            return len(documents)
+
+        logger.info("Splitting documents using %s", self.splitter_type)
+        chunks = self.splitter.split_documents(documents)
+        logger.info("Split into %d chunks", len(chunks))
+
+        self.vector_store.create_collection()
+        self.vector_store.add_documents(chunks)
+        return len(chunks)
+
+    def get_collection_info(self):
+        return self.vector_store.get_collection_info()
+
+    def search(self, query: str, k: int = 5) -> List[Document]:
+        """Standard search - returns child chunks."""
+        return self.vector_store.similarity_search(query, k=k)
+
+    def search_with_parent_context(self, query: str) -> List[Document]:
+        """
+        Search with parent context - returns full parent chunks.
+
+        This is the main retrieval method when using parent-child splitting.
+        The number of child hits is fixed by ``search_k`` at build time;
+        ParentDocumentRetriever does not accept a per-call ``k``.
+        """
+        if self.splitter_type != SplitterType.PARENT_CHILD:
+            raise RuntimeError(
+                "search_with_parent_context only available with PARENT_CHILD splitter. "
+                "Use search() for standard retrieval."
+            )
+        return self.retriever.invoke(query)
+
+    def retrieve(self, query: str, return_parent: bool = True) -> List[Document]:
+        """
+        Unified retrieval interface.
+
+        Args:
+            query: Search query
+            return_parent: If True and using parent-child splitter, return parent chunks.
+                If False, always return child chunks.
+
+        Returns:
+            List of relevant documents
+        """
+        if self.splitter_type == SplitterType.PARENT_CHILD and return_parent:
+            return self.search_with_parent_context(query)
+        return self.search(query)
+
+    def get_retriever(self) -> ParentDocumentRetriever:
+        """
+        Get the ParentDocumentRetriever instance directly.
+
+        Useful for advanced use cases where you want to access the retriever
+        outside of IndexBuilder.
+        """
+        if self.splitter_type != SplitterType.PARENT_CHILD:
+            raise RuntimeError(
+                "get_retriever() only available with PARENT_CHILD splitter. "
+                "Use search() or search_with_parent_context() for standard retrieval."
+            )
+        return self.retriever
+
+    def get_child_splitter(self) -> RecursiveCharacterTextSplitter:
+        """Get the child splitter for reconfiguration."""
+        if self.splitter_type != SplitterType.PARENT_CHILD:
+            return self.splitter
+        return self.child_splitter
+
+    def get_parent_splitter(self) -> RecursiveCharacterTextSplitter:
+        """Get the parent splitter for reconfiguration."""
+        if self.splitter_type != SplitterType.PARENT_CHILD:
+            raise RuntimeError(
+                "Parent splitter only available with PARENT_CHILD splitter."
+            )
+        return self.parent_splitter
+
+    def get_docstore(self) -> BaseStore:
+        """Get the document store for parent chunks."""
+        if self.splitter_type != SplitterType.PARENT_CHILD:
+            raise RuntimeError(
+                "Docstore only available with PARENT_CHILD splitter."
+            )
+        return self.docstore
+
+    def get_docstore_path(self) -> Optional[str]:
+        """Get the local document store path (None for the postgres backend)."""
+        if self.splitter_type != SplitterType.PARENT_CHILD:
+            raise RuntimeError(
+                "Docstore path only available with PARENT_CHILD splitter."
+            )
+        return self._docstore_path
+
+    def close(self):
+        """Close resources."""
+        if isinstance(getattr(self, "docstore", None), PostgresDocStore):
+            self.docstore.close()
+            logger.info("Closed PostgreSQL connection")
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+        return False
+
+
+if __name__ == "__main__":
+    # Example usage. Note: TextSplitter exposes the size as the private
+    # attribute _chunk_size; there is no public chunk_size property.
+    builder = IndexBuilder(
+        splitter_type=SplitterType.PARENT_CHILD,
+        parent_chunk_size=1000,
+        child_chunk_size=200,
+        docstore_path="./my_parent_docs",
+    )
+
+    print("Parent splitter:", builder.get_parent_splitter()._chunk_size)
+    print("Child splitter:", builder.get_child_splitter()._chunk_size)
+    print("Docstore path:", builder.get_docstore_path())
+    print("Retriever:", builder.get_retriever())
diff --git a/rag_indexer/cli.py b/rag_indexer/cli.py
new file mode 100755
index 0000000..b506ae3
--- /dev/null
+++ b/rag_indexer/cli.py
@@ -0,0 +1,102 @@
+"""
+Command-line interface for the RAG index builder.
+
+Run as a module so the package-relative imports resolve:
+    python -m rag_indexer.cli --file path/to/doc.pdf
+"""
+
+import argparse
+import logging
+import sys
+
+from .builder import IndexBuilder
+from .splitters import SplitterType
+
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+)
+
+
+def main():
+    parser = argparse.ArgumentParser(description="Offline RAG Index Builder")
+    parser.add_argument("--file", type=str, help="Path to file to index")
+    parser.add_argument("--dir", type=str, help="Path to directory to index")
+    parser.add_argument("--recursive", action="store_true", default=True,
+                        help="Recursively process directories (default: True)")
+    parser.add_argument("--collection", type=str, default="rag_documents",
+                        help="Qdrant collection name (default: rag_documents)")
+    parser.add_argument("--qdrant-url", type=str,
+                        help="Qdrant server URL (default: http://127.0.0.1:6333)")
+    parser.add_argument("--splitter", type=str,
+                        choices=["recursive", "semantic", "parent_child"],
+                        default="recursive",
+                        help="Text splitting strategy (default: recursive)")
+    parser.add_argument("--chunk-size", type=int, default=500,
+                        help="Chunk size for recursive/parent splitter (default: 500)")
+    parser.add_argument("--chunk-overlap", type=int, default=50,
+                        help="Chunk overlap (default: 50)")
+    parser.add_argument("--parent-size", type=int, default=1000,
+                        help="Parent chunk size for parent-child splitter (default: 1000)")
+    parser.add_argument("--child-size", type=int, default=200,
+                        help="Child chunk size for parent-child splitter (default: 200)")
+    parser.add_argument("--docstore-path", type=str, default=None,
+                        help="Path to store parent documents for parent-child splitter "
+                             "(default: ./parent_docs or HERMES_HOME/parent_docs)")
+    parser.add_argument("--docstore-type", type=str,
+                        choices=["local", "postgres"], default="local",
+                        help="Type of docstore: 'local' (default) or 'postgres' for PostgreSQL-backed storage")
+    parser.add_argument("--docstore-conn", type=str, default=None,
+                        help="PostgreSQL connection string for postgres docstore")
+
+    args = parser.parse_args()
+
+    if not args.file and not args.dir:
+        print("Error: Either --file or --dir must be specified", file=sys.stderr)
+        parser.print_help()
+        sys.exit(1)
+
+    splitter_map = {
+        "recursive": SplitterType.RECURSIVE,
+        "semantic": SplitterType.SEMANTIC,
+        "parent_child": SplitterType.PARENT_CHILD,
+    }
+    splitter_type = splitter_map[args.splitter]
+
+    splitter_kwargs = {}
+    if splitter_type == SplitterType.RECURSIVE:
+        splitter_kwargs["chunk_size"] = args.chunk_size
+        splitter_kwargs["chunk_overlap"] = args.chunk_overlap
+    elif splitter_type == SplitterType.PARENT_CHILD:
+        splitter_kwargs["parent_chunk_size"] = args.parent_size
+        splitter_kwargs["child_chunk_size"] = args.child_size
+        splitter_kwargs["parent_chunk_overlap"] = args.chunk_overlap
+        splitter_kwargs["child_chunk_overlap"] = args.chunk_overlap // 2
+        splitter_kwargs["docstore_path"] = args.docstore_path
+        splitter_kwargs["docstore_type"] = args.docstore_type
+        splitter_kwargs["docstore_conn_string"] = args.docstore_conn
+
+    builder = IndexBuilder(
+        collection_name=args.collection,
+        qdrant_url=args.qdrant_url,
+        splitter_type=splitter_type,
+        **splitter_kwargs
+    )
+
+    try:
+        if args.file:
+            chunk_count = builder.build_from_file(args.file)
+        else:
+            chunk_count = builder.build_from_directory(args.dir, args.recursive)
+
+        print(f"Indexing completed. Total chunks indexed: {chunk_count}")
+        info = builder.get_collection_info()
+        print(f"Collection '{info['name']}' has {info['vectors_count']} vectors (dim={info['vector_size']})")
+
+    except Exception:
+        logging.exception("Indexing failed")
+        sys.exit(1)
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/rag_indexer/docstore_manager.py b/rag_indexer/docstore_manager.py
new file mode 100644
index 0000000..d2cce2c
--- /dev/null
+++ b/rag_indexer/docstore_manager.py
@@ -0,0 +1,142 @@
+"""
+Document store manager for ParentDocumentRetriever.
+
+Supports a LocalFileStore-backed store (default) and a custom
+PostgreSQL-backed store.
+"""
+
+import json
+import os
+from typing import Iterator, List, Optional, Sequence, Tuple
+
+from langchain.storage import LocalFileStore, create_kv_docstore
+from langchain_core.documents import Document
+from langchain_core.load import dumps, loads
+from langchain_core.stores import BaseStore
+
+
+def resolve_persist_path(persist_path: Optional[str] = None) -> str:
+    """Resolve the parent-document directory, honoring HERMES_HOME."""
+    if persist_path is None:
+        # Use HERMES_HOME if available, otherwise default to the current directory
+        base = os.getenv("HERMES_HOME")
+        persist_path = os.path.join(base, "parent_docs") if base else "./parent_docs"
+    return persist_path
+
+
+def get_docstore(persist_path: Optional[str] = None) -> BaseStore:
+    """
+    Create and return a document store for parent chunks.
+
+    Args:
+        persist_path: Path to store parent documents. Defaults to ./parent_docs
+            or HERMES_HOME/parent_docs if set.
+    """
+    persist_path = resolve_persist_path(persist_path)
+    os.makedirs(persist_path, exist_ok=True)
+    # LocalFileStore persists raw bytes; wrap it so ParentDocumentRetriever
+    # can read and write Document objects directly.
+    return create_kv_docstore(LocalFileStore(persist_path))
+
+
+class PostgresDocStore(BaseStore[str, Document]):
+    """
+    PostgreSQL-backed document store for parent chunks.
+
+    This is an optional advanced feature. For most use cases,
+    the local file store is sufficient and simpler.
+    """
+
+    def __init__(self, connection_string: str):
+        """
+        Initialize the PostgreSQL document store.
+
+        Args:
+            connection_string: PostgreSQL connection URL
+        """
+        import psycopg2  # imported lazily: only the postgres backend needs it
+
+        self._psycopg2 = psycopg2
+        self.conn_string = connection_string
+        self._conn = None
+
+        # Create table if not exists
+        self._create_table()
+
+    def _create_table(self):
+        """Create the parent documents table if it does not exist."""
+        try:
+            self._ensure_connection()
+            cursor = self._conn.cursor()
+            cursor.execute("""
+                CREATE TABLE IF NOT EXISTS parent_documents (
+                    key TEXT PRIMARY KEY,
+                    value JSONB NOT NULL,
+                    created_at TIMESTAMPTZ DEFAULT NOW()
+                )
+            """)
+            self._conn.commit()
+            cursor.close()
+        except Exception as e:
+            raise RuntimeError(f"Failed to create PostgreSQL table: {e}")
+
+    def _ensure_connection(self):
+        """Ensure we have an open connection."""
+        if self._conn is None or self._conn.closed:
+            self._conn = self._psycopg2.connect(self.conn_string)
+
+    # BaseStore's abstract interface is mget/mset/mdelete/yield_keys;
+    # these are the methods ParentDocumentRetriever actually calls.
+    def mget(self, keys: Sequence[str]) -> List[Optional[Document]]:
+        """Retrieve documents by key (None for missing keys)."""
+        self._ensure_connection()
+        cursor = self._conn.cursor()
+        results: List[Optional[Document]] = []
+        for key in keys:
+            cursor.execute("SELECT value FROM parent_documents WHERE key = %s", (key,))
+            row = cursor.fetchone()
+            if row is None:
+                results.append(None)
+                continue
+            value = row[0]
+            # psycopg2 decodes JSONB to a dict; loads() expects a JSON string
+            if not isinstance(value, str):
+                value = json.dumps(value)
+            results.append(loads(value))
+        cursor.close()
+        return results
+
+    def mset(self, key_value_pairs: Sequence[Tuple[str, Document]]) -> None:
+        """Store documents, upserting on key conflict."""
+        self._ensure_connection()
+        cursor = self._conn.cursor()
+        for key, doc in key_value_pairs:
+            cursor.execute(
+                """
+                INSERT INTO parent_documents (key, value) VALUES (%s, %s)
+                ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value
+                """,
+                (key, dumps(doc)),
+            )
+        self._conn.commit()
+        cursor.close()
+
+    def mdelete(self, keys: Sequence[str]) -> None:
+        """Delete documents by key."""
+        self._ensure_connection()
+        cursor = self._conn.cursor()
+        cursor.execute("DELETE FROM parent_documents WHERE key = ANY(%s)", (list(keys),))
+        self._conn.commit()
+        cursor.close()
+
+    def yield_keys(self, prefix: Optional[str] = None) -> Iterator[str]:
+        """Iterate over stored keys, optionally filtered by prefix."""
+        self._ensure_connection()
+        cursor = self._conn.cursor()
+        if prefix:
+            cursor.execute("SELECT key FROM parent_documents WHERE key LIKE %s", (prefix + "%",))
+        else:
+            cursor.execute("SELECT key FROM parent_documents")
+        for (key,) in cursor.fetchall():
+            yield key
+        cursor.close()
+
+    def close(self):
+        """Close the connection."""
+        if self._conn and not self._conn.closed:
+            self._conn.close()
+
+
+def create_docstore(
+    store_type: str = "local",
+    persist_path: Optional[str] = None,
+    connection_string: Optional[str] = None,
+) -> Tuple[BaseStore, Optional[str]]:
+    """
+    Factory function to create different types of document stores.
+
+    Args:
+        store_type: "local" (default) or "postgres"
+        persist_path: Path for the local file store
+        connection_string: PostgreSQL connection string
+
+    Returns:
+        Tuple of (BaseStore instance, connection_string or None)
+    """
+    if store_type == "postgres" and connection_string:
+        return (PostgresDocStore(connection_string), connection_string)
+    return (get_docstore(persist_path), None)
diff --git a/rag_indexer/embedders.py b/rag_indexer/embedders.py
new file mode 100644
index 0000000..80e6adf
--- /dev/null
+++ b/rag_indexer/embedders.py
@@ -0,0 +1,68 @@
+"""
+Embedding model wrapper for the llama.cpp service.
+"""
+
+import os
+from typing import List, Optional
+
+from langchain_openai import OpenAIEmbeddings
+
+
+class LlamaCppEmbedder:
+    """Wrapper for the llama.cpp embedding service via its OpenAI-compatible API."""
+
+    def __init__(
+        self,
+        base_url: Optional[str] = None,
+        api_key: Optional[str] = None,
+        model: str = "embeddinggemma-300M-Q8_0",
+    ):
+        self.base_url = base_url or os.getenv("LLAMACPP_EMBEDDING_URL", "http://127.0.0.1:8082")
+        self.api_key = api_key or os.getenv("LLAMACPP_API_KEY", "")
+        self.model = model
+
+        # Ensure the URL ends with /v1 (the OpenAI-compatible prefix)
+        self.base_url = self.base_url.rstrip("/") + "/v1"
+
+    def as_langchain_embeddings(self) -> OpenAIEmbeddings:
+        """Create a LangChain OpenAIEmbeddings instance."""
+        return OpenAIEmbeddings(
+            openai_api_base=self.base_url,
+            openai_api_key=self.api_key,
+            model=self.model,
+            # llama.cpp expects raw strings; disable tiktoken pre-tokenization
+            check_embedding_ctx_length=False,
+        )
+
+    def embed_documents(self, texts: List[str]) -> List[List[float]]:
+        """Embed a list of documents."""
+        emb = self.as_langchain_embeddings()
+        return emb.embed_documents(texts)
+
+    def embed_query(self, text: str) -> List[float]:
+        """Embed a single query."""
+        emb = self.as_langchain_embeddings()
+        return emb.embed_query(text)
+
+    def get_embedding_dimension(self) -> int:
+        """Get the embedding dimension by embedding a test string."""
+        test_embedding = self.embed_query("test")
+        return len(test_embedding)
+
+
+class MockEmbedder:
+    """Mock embedder for testing without a real service."""
+
+    def __init__(self, dimension: int = 768):
+        self.dimension = dimension
+
+    def as_langchain_embeddings(self) -> OpenAIEmbeddings:
+        raise NotImplementedError("MockEmbedder cannot be used as LangChain embeddings")
+
+    def embed_documents(self, texts: List[str]) -> List[List[float]]:
+        return [[0.0] * self.dimension for _ in texts]
+
+    def embed_query(self, text: str) -> List[float]:
+        return [0.0] * self.dimension
+
+    def get_embedding_dimension(self) -> int:
+        return self.dimension
\ No newline at end of file
diff --git a/rag_indexer/example_parent_child.py b/rag_indexer/example_parent_child.py
new file mode 100644
index 0000000..19db145
--- /dev/null
+++ b/rag_indexer/example_parent_child.py
@@ -0,0 +1,124 @@
+"""
+Example demonstrating ParentDocumentRetriever usage.
+
+This script shows how to:
+1. Build an index with parent-child chunking
+2. Search with child chunks (fast, precise)
+3. Search with parent context (large context)
+4. Access the retriever directly for advanced use cases
+
+Run it as a module so the package-relative imports resolve:
+    python -m rag_indexer.example_parent_child
+"""
+
+import logging
+from pathlib import Path
+
+# Configure logging
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+)
+
+from .builder import IndexBuilder
+from .splitters import SplitterType
+
+
+def main():
+    print("=" * 70)
+    print("ParentDocumentRetriever Example")
+    print("=" * 70)
+
+    # Step 1: Create IndexBuilder with parent-child splitting
+    print("\n1. Creating IndexBuilder with parent-child splitting...")
+    builder = IndexBuilder(
+        collection_name="parent_child_demo",
+        splitter_type=SplitterType.PARENT_CHILD,
+        parent_chunk_size=1000,   # Parent chunks: larger context
+        child_chunk_size=200,     # Child chunks: smaller for precision
+        docstore_path="./my_parent_docs",  # Where to store parent chunks
+        search_k=5,               # Number of child chunks to retrieve
+    )
+
+    print(f"   Parent splitter: chunk_size={builder.get_parent_splitter()._chunk_size}")
+    print(f"   Child splitter: chunk_size={builder.get_child_splitter()._chunk_size}")
+    print(f"   Docstore path: {builder.get_docstore_path()}")
+    print(f"   Search k: {builder.retriever.search_kwargs['k']}")
+
+    # Step 2: Build index from a sample file
+    print("\n2. Building index from sample file...")
+
+    # Create a test document
+    test_content = """
+    This is a test document for demonstrating ParentDocumentRetriever.
+
+    Parent chunks contain larger portions of text (1000 characters),
+    while child chunks are smaller (200 characters) for precise retrieval.
+
+    When you search with ParentDocumentRetriever:
+    - It first retrieves relevant child chunks
+    - Then replaces them with their corresponding parent chunks
+    - This gives you large context while maintaining precision
+
+    Example search queries:
+    - "ParentDocumentRetriever"
+    - "child chunks"
+    - "large context"
+    - "precise retrieval"
+    """
+
+    test_file = Path("./test_document.txt")
+    test_file.write_text(test_content)
+
+    chunk_count = builder.build_from_file(str(test_file))
+    print(f"   Indexed {chunk_count} documents")
+
+    # Step 3: Search with child chunks (fast, precise)
+    print("\n3. Searching with child chunks (fast, precise)...")
+    child_results = builder.search("ParentDocumentRetriever", k=3)
+    print(f"   Found {len(child_results)} child chunks:")
+    for i, doc in enumerate(child_results, 1):
+        print(f"   [{i}] {doc.page_content[:100]}...")
+
+    # Step 4: Search with parent context (large context).
+    # The number of hits is fixed by search_k at build time.
+    print("\n4. Searching with parent context (large context)...")
+    parent_results = builder.search_with_parent_context("ParentDocumentRetriever")
+    print(f"   Found {len(parent_results)} parent chunks:")
+    for i, doc in enumerate(parent_results, 1):
+        print(f"   [{i}] {doc.page_content[:150]}...")
+
+    # Step 5: Compare results
+    print("\n5. Comparing child vs parent results...")
+    child_len = sum(len(d.page_content) for d in child_results)
+    parent_len = sum(len(d.page_content) for d in parent_results)
+    print(f"   Child chunks total length: {child_len}")
+    print(f"   Parent chunks total length: {parent_len}")
+    print(f"   Ratio: parent/child = {parent_len / max(child_len, 1):.2f}x larger")
+
+    # Step 6: Access retriever directly
+    print("\n6. Accessing retriever directly...")
+    retriever = builder.get_retriever()
+    print(f"   Retriever type: {type(retriever).__name__}")
+    print(f"   Vectorstore: {retriever.vectorstore}")
+    print(f"   Docstore: {retriever.docstore}")
+
+    # Step 7: Unified retrieval interface
+    print("\n7. Using unified retrieval interface...")
+    unified_results = builder.retrieve("ParentDocumentRetriever", return_parent=True)
+    print(f"   Retrieved {len(unified_results)} documents (with parent context)")
+
+    # Step 8: Collection info
+    print("\n8. Collection info...")
+    info = builder.get_collection_info()
+    print(f"   Collection: {info['name']}")
+    print(f"   Vectors: {info['vectors_count']}")
+    print(f"   Vector size: {info['vector_size']}")
+
+    # Cleanup
+    print("\n9. Cleaning up...")
+    builder.close()
+
+    print("\n" + "=" * 70)
+    print("Example completed successfully!")
+    print("=" * 70)
+
+    return builder
+
+
+if __name__ == "__main__":
+    builder = main()
diff --git a/rag_indexer/loaders.py b/rag_indexer/loaders.py
new file mode 100644
index 0000000..b896015
--- /dev/null
+++ b/rag_indexer/loaders.py
@@ -0,0 +1,91 @@
+"""
+Document loaders using the unstructured library.
+"""
+
+import logging
+from pathlib import Path
+from typing import List, Union
+
+from langchain_core.documents import Document
+from unstructured.partition.auto import partition
+
+logger = logging.getLogger(__name__)
+
+
+class DocumentLoader:
+    """Load documents from various file formats."""
+
+    SUPPORTED_EXTENSIONS = {".pdf", ".docx", ".doc", ".txt", ".md", ".html", ".pptx", ".xlsx"}
+
+    def __init__(self, extract_images: bool = False):
+        """
+        Args:
+            extract_images: Whether to extract images from documents (requires additional dependencies)
+        """
+        self.extract_images = extract_images
+
+    def load_file(self, file_path: Union[str, Path]) -> List[Document]:
+        """Load a single file into LangChain Document objects."""
+        file_path = Path(file_path).resolve()
+        if not file_path.exists():
+            raise FileNotFoundError(f"File not found: {file_path}")
+
+        suffix = file_path.suffix.lower()
+        if suffix not in self.SUPPORTED_EXTENSIONS:
+            raise ValueError(
+                f"Unsupported file extension: {suffix}. Supported: {self.SUPPORTED_EXTENSIONS}"
+            )
+
+        # Parse with unstructured
+        elements = partition(
+            filename=str(file_path),
+            extract_images_in_pdf=self.extract_images,
+        )
+
+        documents = []
+        for elem in elements:
+            text = getattr(elem, "text", "")
+            if not text or not text.strip():
+                continue
+
+            # Base metadata
+            metadata = {
+                "source": str(file_path),
+                "file_name": file_path.name,
+                "file_type": suffix,
+            }
+
+            # Merge element-specific metadata without overwriting base fields.
+            # unstructured exposes an ElementMetadata object, not a dict,
+            # so convert it before iterating.
+            elem_meta = getattr(elem, "metadata", None)
+            elem_meta_dict = elem_meta.to_dict() if elem_meta is not None else {}
+            for key, value in elem_meta_dict.items():
+                if value and key not in metadata:
+                    metadata[key] = value
+
+            documents.append(Document(page_content=text, metadata=metadata))
+
+        if not documents:
+            logger.warning("No text content extracted from %s", file_path)
+            return []
+
+        return documents
+
+    def load_directory(
+        self, directory_path: Union[str, Path], recursive: bool = True
+    ) -> List[Document]:
+        """Load all supported files from a directory."""
+        directory_path = Path(directory_path).resolve()
+        if not directory_path.is_dir():
+            raise NotADirectoryError(f"Not a directory: {directory_path}")
+
+        all_documents = []
+        pattern = "**/*" if recursive else "*"
+
+        for file_path in directory_path.glob(pattern):
+            if file_path.is_file() and file_path.suffix.lower() in self.SUPPORTED_EXTENSIONS:
+                try:
+                    docs = self.load_file(file_path)
+                    all_documents.extend(docs)
+                except Exception as e:
+                    logger.error("Failed to load %s: %s", file_path, e)
+
+        return all_documents
\ No newline at end of file
diff --git a/rag_indexer/splitters.py b/rag_indexer/splitters.py
new file mode 100644
index 0000000..718f969
--- /dev/null
+++ b/rag_indexer/splitters.py
@@ -0,0 +1,71 @@
+"""
+Text splitters for chunking documents.
+"""
+
+import uuid
+from enum import Enum
+from typing import List
+
+from langchain_core.documents import Document
+from langchain_text_splitters import RecursiveCharacterTextSplitter
+from langchain_experimental.text_splitter import SemanticChunker
+
+
+class SplitterType(str, Enum):
+    RECURSIVE = "recursive"
+    SEMANTIC = "semantic"
+    PARENT_CHILD = "parent_child"
+
+
+def get_splitter(splitter_type: SplitterType, **kwargs):
+    """Factory function to create a text splitter."""
+    if splitter_type == SplitterType.RECURSIVE:
+        chunk_size = kwargs.get("chunk_size", 500)
+        chunk_overlap = kwargs.get("chunk_overlap", 50)
+        return RecursiveCharacterTextSplitter(
+            chunk_size=chunk_size,
+            chunk_overlap=chunk_overlap,
+            separators=["\n\n", "\n", "。", "!", "?", " ", ""],
+        )
+    elif splitter_type == SplitterType.SEMANTIC:
+        # Requires embeddings for semantic splitting
+        embeddings = kwargs.get("embeddings")
+        if embeddings is None:
+            raise ValueError("Semantic splitter requires 'embeddings' parameter")
+        return SemanticChunker(embeddings=embeddings)
+    else:
+        raise ValueError(f"Unsupported splitter type: {splitter_type}")
+
+
+class ParentChildSplitter:
+    """
+    Splits documents into parent (large) and child (small) chunks.
+    Child chunks are indexed for retrieval, parent chunks are stored for context.
+    """
+
+    def __init__(
+        self,
+        parent_chunk_size: int = 1000,
+        child_chunk_size: int = 200,
+        parent_chunk_overlap: int = 100,
+        child_chunk_overlap: int = 20,
+    ):
+        self.parent_splitter = RecursiveCharacterTextSplitter(
+            chunk_size=parent_chunk_size,
+            chunk_overlap=parent_chunk_overlap,
+        )
+        self.child_splitter = RecursiveCharacterTextSplitter(
+            chunk_size=child_chunk_size,
+            chunk_overlap=child_chunk_overlap,
+        )
+
+    def split_documents(self, documents: List[Document]) -> tuple[List[Document], List[Document]]:
+        """
+        Returns:
+            (parent_chunks, child_chunks)
+
+        Each parent gets a generated ``doc_id``, and every child carries its
+        parent's id in ``metadata["parent_id"]`` so the two levels can be joined.
+        """
+        parent_chunks = self.parent_splitter.split_documents(documents)
+
+        child_chunks: List[Document] = []
+        for parent in parent_chunks:
+            parent_id = str(uuid.uuid4())
+            parent.metadata["doc_id"] = parent_id
+            for child in self.child_splitter.split_documents([parent]):
+                child.metadata["parent_id"] = parent_id
+                child_chunks.append(child)
+
+        return parent_chunks, child_chunks
\ No newline at end of file
diff --git a/rag_indexer/vector_store.py b/rag_indexer/vector_store.py
new file mode 100644
index 0000000..87bd6bf
--- /dev/null
+++ b/rag_indexer/vector_store.py
@@ -0,0 +1,110 @@
+"""
+Qdrant vector store wrapper.
+"""
+
+import logging
+import os
+from typing import List, Optional, Dict, Any
+
+from langchain_core.documents import Document
+from langchain_qdrant import QdrantVectorStore as LangchainQdrantVS
+from qdrant_client import QdrantClient
+from qdrant_client.http.models import Distance, VectorParams
+
+from .embedders import LlamaCppEmbedder
+
+logger = logging.getLogger(__name__)
+
+
+class QdrantVectorStore:
+    """Wrapper for Qdrant vector database operations."""
+
+    def __init__(
+        self,
+        collection_name: str,
+        embeddings: Optional[Any] = None,
+        qdrant_url: Optional[str] = None,
+        api_key: Optional[str] = None,
+    ):
+        self.collection_name = collection_name
+        self.qdrant_url = qdrant_url or os.getenv("QDRANT_URL", "http://127.0.0.1:6333")
+        self.api_key = api_key
+
+        # Embeddings
+        if embeddings is None:
+            embedder = LlamaCppEmbedder()
+            self.embeddings = embedder.as_langchain_embeddings()
+        else:
+            self.embeddings = embeddings
+
+        # Qdrant client
+        self.client = QdrantClient(url=self.qdrant_url, api_key=self.api_key)
+
+        # Ensure the collection exists before handing it to LangChain:
+        # langchain_qdrant.QdrantVectorStore validates the collection config
+        # at construction time.
+        self.create_collection()
+
+        # LangChain vector store (note: the parameter is `embedding`,
+        # not `embeddings`, in langchain_qdrant)
+        self.vector_store = LangchainQdrantVS(
+            client=self.client,
+            collection_name=self.collection_name,
+            embedding=self.embeddings,
+        )
+
+    def create_collection(self, vector_size: Optional[int] = None, force_recreate: bool = False):
+        """Create the collection with an appropriate vector size."""
+        if vector_size is None:
+            # Probe the configured embedding service for its dimension
+            vector_size = len(self.embeddings.embed_query("test"))
+
+        collections = self.client.get_collections().collections
+        exists = any(c.name == self.collection_name for c in collections)
+
+        if exists and force_recreate:
+            self.client.delete_collection(self.collection_name)
+            exists = False
+
+        if not exists:
+            self.client.create_collection(
+                collection_name=self.collection_name,
+                vectors_config=VectorParams(size=vector_size, distance=Distance.COSINE),
+            )
+            logger.info("Collection '%s' created (dim=%d)", self.collection_name, vector_size)
+        else:
+            logger.info("Collection '%s' already exists", self.collection_name)
+
+    def add_documents(self, documents: List[Document], batch_size: int = 100):
+        """Add documents to the vector store."""
+        if not documents:
+            return []
+        self.create_collection()
+        ids = self.vector_store.add_documents(documents, batch_size=batch_size)
+        logger.info("Added %d documents to '%s'", len(ids), self.collection_name)
+        return ids
+
+    def similarity_search(self, query: str, k: int = 5) -> List[Document]:
+        return self.vector_store.similarity_search(query, k=k)
+
+    def similarity_search_with_score(self, query: str, k: int = 5) -> List[tuple[Document, float]]:
+        return self.vector_store.similarity_search_with_score(query, k=k)
+
+    def delete_collection(self):
+        self.client.delete_collection(self.collection_name)
+        logger.info("Collection '%s' deleted", self.collection_name)
+
+    def get_collection_info(self) -> Dict[str, Any]:
+        info = self.client.get_collection(self.collection_name)
+        # CollectionInfo carries no name field, so report the configured name.
+        return {
+            "name": self.collection_name,
+            "vectors_count": info.vectors_count,
+            "status": info.status,
+            "vector_size": info.config.params.vectors.size,
+        }
+
+    def as_langchain_vectorstore(self):
+        return self.vector_store
+
+    def get_langchain_vectorstore(self):
+        """Return the LangChain Qdrant vector store object (alias)."""
+        return self.vector_store
+
+    def get_qdrant_client(self):
+        """Return the native Qdrant client (for manual collection management)."""
+        return self.client
\ No newline at end of file