"""
Core pipeline builder for offline RAG index construction.

Supports LangChain's ParentDocumentRetriever for parent-child chunking:
small (child) chunks are embedded and indexed in Qdrant for precise
retrieval, while their enclosing (parent) chunks are kept in a docstore
and returned to the caller for fuller context.
"""

import logging
from pathlib import Path
from typing import List, Union, Optional, Tuple
from dataclasses import dataclass

from langchain_core.documents import Document
from langchain.retrievers import ParentDocumentRetriever
from langchain.storage import LocalFileStore, BaseStore
# Moved up from the bottom of the file: _init_parent_child_retriever needs
# this name at call time, and a top-level import makes that dependency clear.
from langchain_text_splitters import RecursiveCharacterTextSplitter

from .loaders import DocumentLoader
from .splitters import SplitterType, get_splitter, ParentChildSplitter
from .embedders import LlamaCppEmbedder
from .vector_store import QdrantVectorStore
from .docstore_manager import get_docstore, PostgresDocStore, create_docstore

logger = logging.getLogger(__name__)


@dataclass
class ParentChildConfig:
    """Configuration for parent-child splitting."""

    parent_chunk_size: int = 1000
    child_chunk_size: int = 200
    parent_chunk_overlap: int = 100
    child_chunk_overlap: int = 20
    # Number of child chunks fetched per query (retriever search_kwargs["k"]).
    search_k: int = 5
    # Filesystem path for the local docstore (docstore_type == "local").
    docstore_path: Optional[str] = None
    docstore_type: str = "local"
    # PostgreSQL connection string (docstore_type == "postgres").
    docstore_conn_string: Optional[str] = None


class IndexBuilder:
    """Main pipeline for RAG index construction.

    Wires together loading, splitting, embedding, and vector storage.
    When ``splitter_type`` is :attr:`SplitterType.PARENT_CHILD`, a LangChain
    :class:`ParentDocumentRetriever` handles splitting and retrieval instead
    of a plain splitter.
    """

    def __init__(
        self,
        collection_name: str = "rag_documents",
        qdrant_url: Optional[str] = None,
        splitter_type: SplitterType = SplitterType.RECURSIVE,
        **splitter_kwargs,
    ):
        """Build the pipeline components.

        Args:
            collection_name: Qdrant collection that receives the (child) chunks.
            qdrant_url: Optional Qdrant endpoint; store default is used if None.
            splitter_type: Strategy used to chunk documents.
            **splitter_kwargs: Options forwarded to the chosen splitter; for
                PARENT_CHILD these match the fields of :class:`ParentChildConfig`.
        """
        self.collection_name = collection_name
        self.qdrant_url = qdrant_url
        self.splitter_type = splitter_type
        self.splitter_kwargs = splitter_kwargs

        # Components
        self.loader = DocumentLoader()
        self.embedder = LlamaCppEmbedder()
        self.embeddings = self.embedder.as_langchain_embeddings()
        self.vector_store = QdrantVectorStore(
            collection_name=collection_name,
            embeddings=self.embeddings,
            qdrant_url=qdrant_url,
        )

        # Always defined so close() can test it safely.
        self._docstore_conn: Optional[str] = None

        if splitter_type == SplitterType.PARENT_CHILD:
            # BUG FIX: splitter_kwargs were previously dropped here, so options
            # such as parent_chunk_size / docstore_path were silently ignored.
            # Also only build the retriever (and its docstore side effects)
            # when it will actually be used.
            self.splitter = None
            self._init_parent_child_retriever(**splitter_kwargs)
        else:
            if splitter_type == SplitterType.SEMANTIC:
                # Semantic splitting needs the embedding model.
                splitter_kwargs["embeddings"] = self.embeddings
            self.splitter = get_splitter(splitter_type, **splitter_kwargs)

    def _init_parent_child_retriever(self, **kwargs) -> None:
        """Initialize the ParentDocumentRetriever for parent-child chunking.

        Replaces the custom ParentChildSplitter logic: child chunks go to the
        vector store, parent chunks to a docstore keyed by child metadata.
        """
        # Sizes are kept on the instance so _process_documents can estimate
        # chunk counts without reaching into the splitters' private state.
        self._parent_chunk_size = kwargs.get("parent_chunk_size", 1000)
        self._child_chunk_size = kwargs.get("child_chunk_size", 200)
        parent_overlap = kwargs.get("parent_chunk_overlap", kwargs.get("chunk_overlap", 100))
        child_overlap = kwargs.get("child_chunk_overlap", kwargs.get("chunk_overlap", 20))

        self.parent_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self._parent_chunk_size,
            chunk_overlap=parent_overlap,
        )
        self.child_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self._child_chunk_size,
            chunk_overlap=child_overlap,
        )

        # Vector store (for child chunks)
        self.vector_store_obj = self.vector_store.get_langchain_vectorstore()

        # Document store (for parent chunks)
        docstore_type = kwargs.get("docstore_type", "local")
        docstore_conn = kwargs.get("docstore_conn_string")
        if docstore_type == "postgres" and docstore_conn:
            self.docstore = PostgresDocStore(docstore_conn)
            self._docstore_conn = docstore_conn
        else:
            self.docstore = get_docstore(kwargs.get("docstore_path"))
            self._docstore_conn = None

        self.retriever = ParentDocumentRetriever(
            vectorstore=self.vector_store_obj,
            docstore=self.docstore,
            child_splitter=self.child_splitter,
            parent_splitter=self.parent_splitter,
            search_kwargs={"k": kwargs.get("search_k", 5)},
        )

    def build_from_file(self, file_path: Union[str, Path]) -> int:
        """Load a single file and index it; returns the number of indexed units."""
        logger.info("Loading file: %s", file_path)
        documents = self.loader.load_file(file_path)
        logger.info("Loaded %d documents", len(documents))
        return self._process_documents(documents)

    def build_from_directory(self, directory_path: Union[str, Path], recursive: bool = True) -> int:
        """Load every supported file under a directory and index it."""
        logger.info("Loading directory: %s (recursive=%s)", directory_path, recursive)
        documents = self.loader.load_directory(directory_path, recursive=recursive)
        logger.info("Loaded %d documents from directory", len(documents))
        return self._process_documents(documents)

    def _process_documents(self, documents: List[Document]) -> int:
        """Split and index loaded documents.

        Returns the number of parent documents (parent-child mode) or the
        number of chunks (all other splitters). Returns 0 for empty input.
        """
        if not documents:
            logger.warning("No documents to process")
            return 0

        if self.splitter_type == SplitterType.PARENT_CHILD:
            logger.info("Using LangChain ParentDocumentRetriever")
            # Ensure collection exists for child chunks
            self.vector_store.create_collection()
            # The retriever performs parent/child splitting, id mapping,
            # child embedding, and docstore writes in one call.
            self.retriever.add_documents(documents)

            # Rough estimate only: each parent yields roughly
            # parent_size / child_size children.
            chunks_per_parent = max(1, self._parent_chunk_size // self._child_chunk_size)
            estimated_child_chunks = len(documents) * chunks_per_parent
            logger.info(
                "Indexed with ParentDocumentRetriever: ~%d parent chunks, ~%d child chunks",
                len(documents),
                estimated_child_chunks,
            )
            return len(documents)

        logger.info("Splitting documents using %s", self.splitter_type)
        chunks = self.splitter.split_documents(documents)
        logger.info("Split into %d chunks", len(chunks))
        self.vector_store.create_collection()
        self.vector_store.add_documents(chunks)
        return len(chunks)

    def get_collection_info(self):
        """Return metadata about the underlying Qdrant collection."""
        return self.vector_store.get_collection_info()

    def search(self, query: str, k: int = 5) -> List[Document]:
        """Standard search - returns child chunks."""
        return self.vector_store.similarity_search(query, k=k)

    def search_with_parent_context(self, query: str, k: int = 5) -> List[Document]:
        """Search with parent context - returns full parent chunks.

        This is the main retrieval method when using parent-child splitting.

        Raises:
            RuntimeError: If the builder is not in PARENT_CHILD mode.
        """
        if self.splitter_type != SplitterType.PARENT_CHILD:
            raise RuntimeError(
                "search_with_parent_context only available with PARENT_CHILD splitter. "
                "Use search() for standard retrieval."
            )
        # BUG FIX: BaseRetriever.get_relevant_documents does not accept an
        # arbitrary ``k`` kwarg (it would be forwarded to
        # _get_relevant_documents and raise TypeError); k must be applied
        # through the retriever's search_kwargs instead.
        self.retriever.search_kwargs["k"] = k
        return self.retriever.get_relevant_documents(query)

    def retrieve(self, query: str, return_parent: bool = True) -> List[Document]:
        """
        Unified retrieval interface.

        Args:
            query: Search query
            return_parent: If True and using parent-child splitter, return parent chunks
                           If False, always return child chunks

        Returns:
            List of relevant documents
        """
        if self.splitter_type == SplitterType.PARENT_CHILD and return_parent:
            return self.search_with_parent_context(query)
        return self.search(query)

    def get_retriever(self) -> ParentDocumentRetriever:
        """
        Get the ParentDocumentRetriever instance directly.

        Useful for advanced use cases where you want to access the retriever
        outside of IndexBuilder.

        Raises:
            RuntimeError: If the builder is not in PARENT_CHILD mode.
        """
        if self.splitter_type != SplitterType.PARENT_CHILD:
            raise RuntimeError(
                "get_retriever() only available with PARENT_CHILD splitter. "
                "Use search() or search_with_parent_context() for standard retrieval."
            )
        return self.retriever

    def get_child_splitter(self) -> RecursiveCharacterTextSplitter:
        """Get the child splitter (or the plain splitter in non-parent-child mode)."""
        if self.splitter_type != SplitterType.PARENT_CHILD:
            return self.splitter
        return self.child_splitter

    def get_parent_splitter(self) -> RecursiveCharacterTextSplitter:
        """Get the parent splitter for reconfiguration."""
        if self.splitter_type != SplitterType.PARENT_CHILD:
            raise RuntimeError(
                "Parent splitter only available with PARENT_CHILD splitter."
            )
        return self.parent_splitter

    def get_docstore(self) -> BaseStore:
        """Get the document store for parent chunks."""
        if self.splitter_type != SplitterType.PARENT_CHILD:
            raise RuntimeError(
                "Docstore only available with PARENT_CHILD splitter."
            )
        return self.docstore

    def get_docstore_path(self) -> str:
        """Get the document store path."""
        if self.splitter_type != SplitterType.PARENT_CHILD:
            raise RuntimeError(
                "Docstore path only available with PARENT_CHILD splitter."
            )
        # NOTE(review): assumes the docstore exposes ``persist_path`` — true
        # for the local file-backed store, presumably not for PostgresDocStore;
        # confirm against docstore_manager.
        return self.docstore.persist_path

    def close(self):
        """Close resources held by the docstore, if any.

        BUG FIX: the previous implementation opened a *new* psycopg2
        connection and closed it immediately — a no-op that released nothing
        owned by this instance. Delegate to the docstore's own close hook
        instead, if it provides one.
        """
        if self._docstore_conn:
            docstore = getattr(self, "docstore", None)
            if docstore is not None and hasattr(docstore, "close"):
                docstore.close()
                logger.info("Closed PostgreSQL docstore connection")
            else:
                logger.warning("Postgres docstore has no close(); nothing to release")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        return False


if __name__ == "__main__":
    # Example usage
    builder = IndexBuilder(
        splitter_type=SplitterType.PARENT_CHILD,
        parent_chunk_size=1000,
        child_chunk_size=200,
        docstore_path="./my_parent_docs",
    )
    # BUG FIX: LangChain's TextSplitter keeps the size privately as
    # _chunk_size; there is no public ``chunk_size`` attribute.
    print("Parent splitter:", builder.get_parent_splitter()._chunk_size)
    print("Child splitter:", builder.get_child_splitter()._chunk_size)
    print("Docstore path:", builder.get_docstore_path())
    print("Retriever:", builder.get_retriever())