diff --git a/QUICKSTART.md b/QUICKSTART.md index 2f5f92b..7d1cda8 100644 --- a/QUICKSTART.md +++ b/QUICKSTART.md @@ -22,15 +22,17 @@ vim .env # 或使用你喜欢的编辑器 **必需配置项**: - `ZHIPUAI_API_KEY` - 智谱 AI API 密钥(从 [智谱开放平台](https://open.bigmodel.cn/) 获取) +- `DEEPSEEK_API_KEY` - DeepSeek API 密钥(从 [DeepSeek 开放平台](https://platform.deepseek.com/) 获取) - `LLAMACPP_API_KEY` - llama.cpp 服务认证 Token(与容器启动参数一致,默认 `token-abc123`) **可选配置项**: -- `VLLM_BASE_URL` - LLM 服务地址(本地默认:`http://127.0.0.1:8081/v1`,Docker容器访问宿主机:`http://host.docker.internal:18000/v1`) -- `LLAMACPP_EMBEDDING_URL` - Embedding 服务地址(本地默认:`http://127.0.0.1:8082/v1`,Docker容器访问宿主机:`http://host.docker.internal:18001/v1`) -- `DB_URI` - PostgreSQL 连接字符串(默认已配置,使用远程服务器地址) -- `QDRANT_URL` - Qdrant 向量数据库地址(默认已配置,使用远程服务器地址) - -**注意**:Docker Compose 部署时,`API_URL` 由 `docker-compose.yml` 自动注入,无需在 `.env` 中配置。 +- `VLLM_BASE_URL` - LLM 服务地址(本地默认:`http://127.0.0.1:8081/v1`) +- `LLAMACPP_EMBEDDING_URL` - Embedding 服务地址(本地默认:`http://127.0.0.1:8082/v1`) +- `DB_URI` - PostgreSQL 连接字符串(默认已配置远程服务器地址) +- `QDRANT_URL` - Qdrant 向量数据库地址(默认已配置远程服务器地址) +- `LOG_LEVEL` - 日志级别(`DEBUG`/`INFO`/`WARNING`/`ERROR`) +- `ENABLE_GRAPH_TRACE` - 是否启用图流转追踪(`true`/`false`) +- `MEMORY_SUMMARIZE_INTERVAL` - 对话摘要生成间隔(默认 `10`) #### 2. 启动服务 @@ -40,12 +42,9 @@ docker compose -f docker/docker-compose.yml up -d --build #### 3. 访问应用 -**如果配置了 Nginx 反向代理**: -- 访问地址:`http://your-domain.com` 或 `http://your-server-ip` - -**如果未配置 Nginx(直接访问容器)**: - **前端**: http://127.0.0.1:8501 -- **后端 API**: http://127.0.0.1:8001 +- **后端 API**: http://127.0.0.1:8083 +- **健康检查**: http://127.0.0.1:8083/health #### 常用命令 @@ -56,11 +55,17 @@ docker compose ps # 查看日志 docker compose logs -f +# 查看特定服务日志 +docker compose logs -f backend + # 重启特定服务 docker compose restart backend # 停止所有服务 docker compose down + +# 重新构建并启动 +docker compose up -d --build ``` --- @@ -83,31 +88,31 @@ pip install -r requirement.txt 复制并编辑 `.env` 文件: -``` -# 基于 Docker 模板创建 +```bash cp .env.docker .env vim .env ``` -**本地开发需要修改以下配置**: +**本地开发配置示例**: -``env +```env +# API Keys ZHIPUAI_API_KEY=your_api_key_here +DEEPSEEK_API_KEY=your_deepseek_api_key_here LLAMACPP_API_KEY=token-abc123 -# 本地开发时,llama.cpp 服务在 127.0.0.1 -VLLM_BASE_URL=http://127.0.0.1:8081/v1 # 本地开发 -LLAMACPP_EMBEDDING_URL=http://127.0.0.1:8082/v1 # 本地开发 -# 或 -VLLM_BASE_URL=http://host.docker.internal:18000/v1 # Docker容器访问宿主机 -LLAMACPP_EMBEDDING_URL=http://host.docker.internal:18001/v1 # Docker容器访问宿主机 +# LLM 服务地址(本地开发) +VLLM_BASE_URL=http://127.0.0.1:8081/v1 +LLAMACPP_EMBEDDING_URL=http://127.0.0.1:8082/v1 -# 数据库和向量存储使用远程服务器 +# 远程数据库和向量存储 DB_URI=postgresql://postgres:huang1998@115.190.121.151:5432/langgraph_db?sslmode=disable QDRANT_URL=http://115.190.121.151:6333 -# 本地开发时,后端也在 127.0.0.1 -API_URL=http://127.0.0.1:8083/chat +# 日志和调试 +LOG_LEVEL=INFO +ENABLE_GRAPH_TRACE=true +MEMORY_SUMMARIZE_INTERVAL=10 ``` #### 3. 启动服务 @@ -119,10 +124,10 @@ python app/backend.py **终端 2 - 前端:** ```bash -cd frontend && streamlit run app.py +streamlit run frontend/frontend_main.py ``` -浏览器自动打开前端页面(如果配置了 Nginx,访问 `http://your-domain.com`;否则访问 http://127.0.0.1:8501) +浏览器自动打开前端页面,访问 http://127.0.0.1:8501 --- @@ -132,7 +137,7 @@ cd frontend && streamlit run app.py | 文件 | 用途 | |------|------| -| `docker-compose.yml` | 服务编排配置(仅包含 backend 和 frontend) | +| `docker-compose.yml` | 服务编排配置(包含 backend 和 frontend) | | `Dockerfile.backend` | 后端镜像构建 | | `Dockerfile.frontend` | 前端镜像构建 | | `.gitea/workflows/deploy.yml` | CI/CD 自动化部署 | @@ -141,14 +146,16 @@ cd frontend && streamlit run app.py ```yaml services: - backend: # FastAPI 后端服务(连接远程 PostgreSQL 和 Qdrant) - frontend: # Streamlit 前端界面 + backend: # FastAPI 后端服务(端口 8083) + frontend: # Streamlit 前端界面(端口 8501) ``` **特性:** - ✅ 通过环境变量连接远程 PostgreSQL 和 Qdrant - ✅ 自动重启策略(`restart: unless-stopped`) -- ✅ 内部网络隔离 +- ✅ 内部网络隔离(ai-network) +- ✅ 文档目录挂载(`./data/user_docs`) +- ✅ 日志目录挂载(`./logs`) ### 只更新特定服务 @@ -166,7 +173,7 @@ docker compose up -d frontend ### 添加新工具 -在 `app/tools.py` 中添加: +在 [app/graph/graph_tools.py](file:///home/huang/Study/AIProject/Agent1/app/graph/graph_tools.py) 中添加: ```python @tool @@ -184,35 +191,34 @@ def my_new_tool(param: str) -> str: return result ``` -工具会自动注册,无需修改其他文件。 +工具会自动注册到 `AVAILABLE_TOOLS` 列表,无需修改其他文件。 ### 添加新模型 -在 `app/agent.py` 中: +在 [app/agent/llm_factory.py](file:///home/huang/Study/AIProject/Agent1/app/agent/llm_factory.py) 中: ```python -def _create_new_model_llm(self): +@staticmethod +def create_new_model(): """创建新模型的 LLM""" - return YourChatModel( + api_key = os.getenv("NEW_MODEL_API_KEY") + return ChatOpenAI( + base_url="https://api.new-model.com/v1", + api_key=SecretStr(api_key), model="model-name", - api_key=os.getenv("YOUR_API_KEY"), + temperature=0.1, + streaming=True, ) - -# 在 initialize() 方法中注册 -model_configs = { - "zhipu": self._create_zhipu_llm, - "local": self._create_local_llm, - "new_model": self._create_new_model_llm, # 新增 -} ``` -在前端 `frontend/app.py` 中添加选项: +然后在 `CREATORS` 字典中注册: ```python -MODEL_OPTIONS = { - "智谱 GLM-4": "zhipu", - "本地 Gemma-4": "local", - "新模型": "new_model", # 新增 +CREATORS = { + "local": create_local, + "deepseek": create_deepseek, + "zhipu": create_zhipu, + "new_model": create_new_model, # 新增 } ``` @@ -226,7 +232,12 @@ docker compose exec backend bash docker compose logs -f backend # 测试后端 API -curl http://127.0.0.1:8001/health +curl http://127.0.0.1:8083/health + +# 测试对话接口 +curl -X POST http://127.0.0.1:8083/chat \ + -H "Content-Type: application/json" \ + -d '{"message": "你好", "model": "zhipu"}' ``` --- @@ -252,6 +263,7 @@ curl http://127.0.0.1:8001/health 在 Gitea 仓库设置中添加: - `ZHIPUAI_API_KEY` +- `DEEPSEEK_API_KEY` - `LLAMACPP_API_KEY` --- @@ -282,19 +294,19 @@ curl http://115.190.121.151:6333/collections docker compose logs backend # 检查端口占用 -lsof -i :8001 +lsof -i :8083 ``` **常见原因:** - API Key 未配置或错误 -- 端口 8001 被占用 +- 端口 8083 被占用 - 依赖包缺失 #### 3. 前端无法连接后端 **错误信息:** ``` -HTTPConnectionPool(host='backend', port=8001): Max retries exceeded +HTTPConnectionPool(host='backend', port=8083): Max retries exceeded ``` **解决方案:** @@ -324,7 +336,7 @@ docker compose logs backend | grep -i "model\|error" ``` **可能原因:** -- 智谱 API Key 无效 +- 智谱/DeepSeek API Key 无效 - vLLM 容器未启动(如使用本地模型) - 网络连接问题 diff --git a/README.md b/README.md index e654505..6ab1958 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,21 @@ --- +## 📑 目录导航 + +- [核心功能](#-核心功能) - 面向用户的功能和技术特性 +- [技术架构](#️-技术架构) - 技术栈、系统架构图、工作流流程图 +- [核心算法与实现原理](#-核心算法与实现原理) - RAG 检索、LangGraph 工作流、多模型路由、SSE 流式响应 +- [RAG 系统完整架构](#-rag-系统完整架构) - 离线索引构建、在线检索生成、演进路线 +- [快速开始](#-快速开始) - Docker 和本地部署指南 +- [使用指南](#-使用指南) - 基础对话、工具调用、多模型切换 +- [开发指南](#-开发指南) - 添加工具、添加模型、Docker 部署 +- [实现指南与最佳实践](#️-实现指南与最佳实践) - RAG 构建、性能优化、扩展开发、部署实践 +- [环境配置](#️-环境配置) - 配置文件、环境变量 +- [故障排查](#-故障排查) - 常见问题 + +--- + ## 🎯 核心功能 ### 面向用户的功能 @@ -22,59 +37,132 @@ - ✅ **前后端分离**:FastAPI 后端 + Streamlit 前端 - ✅ **Docker 部署**:一键启动所有服务 - ✅ **远程服务架构**:PostgreSQL 和 Qdrant 部署在远程服务器 +- ✅ **流式响应**:SSE 流式输出,提升用户体验 +- ✅ **模块化设计**:清晰的代码分层,易于扩展和维护 --- ## 🏗️ 技术架构 -### 技术栈 +### 技术栈总览 -| 层级 | 技术选型 | 说明 | -|------|---------|------| -| **LLM 服务** | 智谱 AI API / llama.cpp (Gemma-4 GGUF) | 云端 API 或本地推理 | -| **Embedding** | llama.cpp (embeddinggemma-300M GGUF) | 本地向量嵌入服务 | -| **Agent 框架** | LangGraph + LangChain | 工作流编排 | -| **向量数据库** | Qdrant | RAG 知识检索(远程服务器) | -| **后端框架** | FastAPI + Uvicorn | RESTful API + WebSocket | -| **前端框架** | Streamlit | 交互式 Web 界面 | -| **数据库** | PostgreSQL 16 | 对话记忆持久化(远程服务器) | -| **容器化** | Docker + Docker Compose | 服务编排 | +| 层级 | 组件 | 技术选型 | 版本 | 说明 | +|------|------|---------|------|------| +| **LLM 服务** | 云端模型 | 智谱 AI (glm-4.7-flash) | v4.7 | 快速响应,适合日常对话 | +| | | DeepSeek (deepseek-reasoner) | v3 | 深度推理,适合复杂问题 | +| | 本地模型 | Gemma-4-E4B-it | v4 | 本地部署,保护隐私 | +| **Embedding** | 向量嵌入 | llama.cpp server | latest | 本地 embedding 服务,支持多种模型 | +| **Agent 框架** | 工作流编排 | LangGraph + LangChain | latest | 状态机驱动的智能体工作流 | +| **向量数据库** | 向量检索 | Qdrant | v1.12+ | 高性能向量相似度检索(远程服务器) | +| **后端框架** | API 服务 | FastAPI + Uvicorn | v0.115+ | RESTful API + WebSocket + SSE 流式输出 | +| **前端框架** | Web 界面 | Streamlit | v1.40+ | 交互式对话界面,组件化设计 | +| **关系数据库** | 持久化存储 | PostgreSQL | v16 | 对话记忆持久化(远程服务器) | +| **容器化** | 服务编排 | Docker + Docker Compose | v24+ | 一键部署所有服务 | +| **CI/CD** | 自动化部署 | Gitea Workflows | latest | 代码推送自动构建部署 | -### 架构图 +### 系统架构流程图 + +```mermaid +graph TB + User[用户浏览器] -->|HTTP/SSE| Frontend[Streamlit 前端 :8501] + Frontend -->|REST API| Backend[FastAPI 后端 :8083] + + Backend --> AgentService[AIAgentService] + + AgentService -->|模型路由| LLMFactory[LLM Factory] + LLMFactory -->|创建| Zhipu[智谱 GLM-4.7] + LLMFactory -->|创建| DeepSeek[DeepSeek Reasoner] + LLMFactory -->|创建| LocalGemma[本地 Gemma-4] + + AgentService -->|初始化| LangGraph[LangGraph 工作流引擎] + + LangGraph -->|节点1| RetrieveMemory[记忆检索] + LangGraph -->|节点2| MemoryTrigger[记忆触发] + LangGraph -->|节点3| LLMCall[LLM 调用] + LangGraph -->|节点4| ToolNode[工具执行] + LangGraph -->|节点5| Summarize[记忆摘要] + LangGraph -->|节点6| Finalize[最终处理] + + ToolNode -->|调用| Tools[工具集合] + Tools -->|天气| WeatherTool[天气查询] + Tools -->|文件| FileTool[文件读取] + Tools -->|网页| WebTool[网页抓取] + Tools -->|RAG| RAGTool[知识库检索] + + RAGTool -->|检索| Qdrant[(Qdrant 向量库)] + RAGTool -->|重排序| CrossEncoder[Cross-Encoder] + + RetrieveMemory -->|存储| PostgreSQL[(PostgreSQL)] + Summarize -->|存储| PostgreSQL + + style User fill:#e1f5ff + style Frontend fill:#fff4e1 + style Backend fill:#e8f5e9 + style LangGraph fill:#f3e5f5 + style PostgreSQL fill:#ffebee + style Qdrant fill:#ffebee +``` + +### LangGraph 工作流详细流程 + +```mermaid +stateDiagram-v2 + [*] --> RetrieveMemory: 用户输入消息 + + RetrieveMemory --> MemoryTrigger: 检索历史记忆 + MemoryTrigger --> CheckMemory: 检查是否需要触发记忆 + + CheckMemory --> LLMCall: 记忆充足 + CheckMemory --> Summarize: 需要生成摘要 + + Summarize --> PostgreSQL: 保存摘要 + PostgreSQL --> LLMCall: 继续对话 + + LLMCall --> CheckTools: LLM 输出 + + CheckTools --> ToolNode: 需要调用工具 + CheckTools --> Finalize: 直接回复 + + ToolNode --> ExecuteTool: 执行工具 + ExecuteTool --> LLMCall: 工具结果返回 + + Finalize --> FormatResponse: 格式化响应 + FormatResponse --> [*]: SSE 流式输出 +``` + +### 数据流向图 ``` -┌──────────────┐ -│ 用户浏览器 │ Streamlit 前端 (8501) -└──────┬───────┘ - │ HTTP/WebSocket - ↓ -┌──────────────────┐ -│ FastAPI 后端 │ 端口 8001 -│ ┌────────────┐ │ -│ │ AIAgent │ │ 多模型管理 -│ └─────┬──────┘ │ -│ │ │ -│ ┌─────▼──────┐ │ -│ │LangGraph │ │ 工作流引擎 -│ │ StateGraph │ │ -│ └─────┬──────┘ │ -│ │ │ -│ ┌─────▼──────┐ │ -│ │ Tools │ │ 工具集合 -│ │ - Weather │ │ -│ │ - File IO │ │ -│ │ - Web Scrap│ │ -│ │ - Memory │ │ -│ └────────────┘ │ -└────────┬─────────┘ - │ - ┌────┴────────────────────┐ - ↓ ↓ -┌──────────────┐ ┌──────────────┐ -│ PostgreSQL │ │ Qdrant │ -│ (远程服务器) │ │ (远程服务器) │ -│ 115.190... │ │ 115.190... │ -└──────────────┘ └──────────────┘ +用户请求 + │ + ├─→ 前端 Streamlit + │ │ + │ ├─→ 发送 HTTP POST /api/chat + │ │ + │ └─→ 接收 SSE 流式响应 + │ + └─→ 后端 FastAPI + │ + ├─→ AIAgentService.achat() + │ │ + │ ├─→ 初始化 LangGraph 状态 + │ │ ├─→ messages: 对话历史 + │ │ ├─→ user_id: 用户标识 + │ │ └─→ metadata: 元数据 + │ │ + │ ├─→ 执行工作流 + │ │ ├─→ retrieve_memory: 从 PostgreSQL 检索历史 + │ │ ├─→ memory_trigger: 判断是否触发记忆 + │ │ ├─→ llm_call: 调用 LLM 生成响应 + │ │ ├─→ tool_node: 执行工具(如有需要) + │ │ ├─→ summarize: 生成对话摘要 + │ │ └─→ finalize: 格式化最终响应 + │ │ + │ └─→ 返回 SSE 流 + │ + └─→ 持久化存储 + ├─→ PostgreSQL: 对话历史和摘要 + └─→ Qdrant: RAG 向量索引 ``` ### 项目结构 @@ -84,40 +172,608 @@ Agent1/ ├── app/ │ ├── __init__.py │ ├── config.py # 配置管理 -│ ├── state.py # 状态定义 -│ ├── prompts.py # 提示模板 │ ├── logger.py # 日志工具 -│ ├── tools.py # 工具函数定义 -│ ├── memory/ +│ ├── backend.py # FastAPI 后端应用 +│ ├── agent/ │ │ ├── __init__.py -│ │ └── mem0_client.py # Mem0 客户端封装 +│ │ ├── service.py # Agent 服务核心 +│ │ ├── llm_factory.py # LLM 工厂 +│ │ ├── history.py # 历史查询服务 +│ │ ├── prompts.py # 提示模板 +│ │ └── rag_initializer.py # RAG 工具初始化 +│ ├── graph/ +│ │ ├── __init__.py +│ │ ├── graph_builder.py # LangGraph 图构建器 +│ │ ├── graph_tools.py # 工具定义 +│ │ ├── state.py # 状态定义 +│ │ └── retrieve_memory.py # 记忆检索 │ ├── nodes/ │ │ ├── __init__.py │ │ ├── router.py # 路由决策 │ │ ├── llm_call.py # LLM 调用节点 │ │ ├── tool_call.py # 工具执行节点 │ │ ├── retrieve_memory.py # 记忆检索节点 -│ │ └── summarize.py # 记忆存储节点 -│ ├── graph_builder.py # LangGraph 图构建器 -│ ├── agent.py # Agent 服务核心 -│ └── backend.py # FastAPI 后端应用 +│ │ ├── summarize.py # 记忆存储节点 +│ │ ├── finalize.py # 最终处理节点 +│ │ └── memory_trigger.py # 记忆触发节点 +│ ├── memory/ +│ │ ├── __init__.py +│ │ └── mem0_client.py # Mem0 客户端封装 +│ ├── rag/ +│ │ ├── __init__.py +│ │ ├── retriever.py # 检索器 +│ │ ├── reranker.py # 重排序 +│ │ ├── query_transform.py # 查询转换 +│ │ ├── pipeline.py # RAG 流水线 +│ │ ├── fusion.py # RAG-Fusion +│ │ └── tools.py # RAG 工具 +│ └── utils/ +│ └── __init__.py ├── frontend/ -│ └── app.py # Streamlit 前端界面 +│ ├── __init__.py +│ ├── frontend_main.py # Streamlit 主入口 +│ ├── api_client.py # API 客户端 +│ ├── config.py # 前端配置 +│ ├── state.py # 状态管理 +│ ├── logger.py # 日志 +│ ├── utils.py # 工具函数 +│ └── components/ +│ ├── __init__.py +│ ├── sidebar.py # 侧边栏组件 +│ ├── chat_area.py # 聊天区域组件 +│ └── info_panel.py # 信息面板组件 +├── rag_indexer/ +│ ├── __init__.py +│ ├── cli.py # 命令行入口 +│ ├── index_builder.py # 索引构建器 +│ ├── loaders.py # 文档加载器 +│ ├── splitters.py # 文本切分器 +│ └── test/ # 测试脚本 +├── rag_core/ +│ ├── __init__.py +│ ├── client.py # RAG 核心客户端 +│ ├── embedders.py # 嵌入模型 +│ ├── vector_store.py # 向量存储 +│ ├── retriever_factory.py # 检索器工厂 +│ └── store/ +│ ├── __init__.py +│ ├── factory.py # 存储工厂 +│ └── postgres.py # PostgreSQL 存储 ├── docker/ │ ├── docker-compose.yml # Docker 服务编排 │ ├── Dockerfile.backend # 后端镜像构建 │ └── Dockerfile.frontend # 前端镜像构建 +├── scripts/ +│ └── start.sh # 启动脚本 +├── .gitea/ +│ └── workflows/ +│ └── deploy.yml # CI/CD 自动化部署 ├── requirement.txt # Python 依赖 -├── .env # 环境变量配置 +├── .env.docker # Docker 环境变量模板 +├── .gitignore +├── LICENSE +├── QUICKSTART.md # 快速开始指南 └── user_docs/ # 用户文档目录 - ├── a.txt - ├── b.pdf - └── c.xlsx ``` --- -## 🚀 快速开始 +## 🧠 核心算法与实现原理 + +### 1. RAG 检索算法 + +#### 1.1 文本切分策略 + +项目支持三种文本切分策略,适应不同场景需求: + +**递归字符切分(Recursive Character Splitting)** +``` +算法思路: + 按优先级分隔符逐级切分:["\n\n", "\n", "。", "!", "?", " ", ""] + ↓ + 确保每个块不超过 chunk_size(默认 500 字符) + ↓ + 保留 chunk_overlap(默认 50 字符)避免上下文丢失 + +优势:简单高效,适合结构化文档 +实现:langchain_text_splitters.RecursiveCharacterTextSplitter +``` + +**语义切分(Semantic Chunking)** +``` +算法思路: + 1. 将文档按句子边界切分 + 2. 使用 Embedding 模型计算相邻句子的向量相似度 + 3. 计算相似度变化率(breakpoint threshold) + 4. 在相似度骤降处切分(语义主题变化点) + +阈值策略: + - percentile(百分位数):默认,取相似度分布的 95 百分位 + - standard_deviation(标准差):低于均值 1.5 标准差处切分 + - interquartile(四分位距):使用 IQR 方法检测异常点 + +优势:保持语义完整性,切分更自然 +实现:langchain_experimental.text_splitter.SemanticChunker +``` + +**父子块切分(Parent-Child Chunking)** +``` +算法思路: + 父块(大块):1000 字符,保留完整上下文 + ↓ + 子块(小块):语义切分,用于向量检索 + ↓ + 建立映射关系:child_id → parent_id + ↓ + 检索时:用子块检索,返回时扩展为父块上下文 + +检索流程: + 用户查询 → 向量检索匹配子块 → 通过映射获取父块 → 返回完整上下文 + +优势:检索精度高,上下文完整 +实现:自定义 ParentChildSplitter 类 +``` + +#### 1.2 混合检索(Dense + Sparse) + +``` +检索架构: +┌─────────────────────────────────────────────┐ +│ 用户查询 │ +└──────────────────┬──────────────────────────┘ + │ + ┌──────────┴──────────┐ + ↓ ↓ +┌───────────────┐ ┌───────────────┐ +│ 稠密向量检索 │ │ 稀疏 BM25 检索 │ +│ (语义相似) │ │ (关键词匹配) │ +│ │ │ │ +│ Embedding │ │ Token 化 │ +│ → 向量相似度 │ │ → 词频统计 │ +└───────┬───────┘ └───────┬───────┘ + │ │ + └──────────┬─────────┘ + ↓ + ┌────────────────┐ + │ 结果融合 │ + │ (RRF 算法) │ + └────────┬───────┘ + ↓ + ┌────────────────┐ + │ Cross-Encoder │ + │ 重排序 │ + └────────┬───────┘ + ↓ + ┌────────────────┐ + │ Top-K 结果返回 │ + └────────────────┘ +``` + +**稠密向量检索(Dense Retrieval)** +- 使用 Embedding 模型将查询和文档映射到同一向量空间 +- 计算余弦相似度,返回语义最相似的文档 +- 适合:语义理解、同义词匹配、概念检索 + +**稀疏向量检索(Sparse Retrieval - BM25)** +- 基于词频(TF)和逆文档频率(IDF)计算相关性 +- 适合:专有名词、精确匹配、术语检索 + +#### 1.3 RRF 融合算法(Reciprocal Rank Fusion) + +```python +# 核心算法实现 +def reciprocal_rank_fusion(doc_lists: List[List[Document]], k: int = 60) -> List[Document]: + """ + RRF 公式:RRF(d) = Σ (1 / (k + rank(d))) + + 参数说明: + - k: 平滑常数,通常设为 60 + - k 值越大,排名影响越小,结果越平滑 + - k 值越小,高排名文档优势越大 + + 算法步骤: + 1. 遍历每个检索结果列表 + 2. 对每个文档,根据其排名计算 RRF 得分 + 3. 累加同一文档在不同列表中的得分 + 4. 按总得分降序排序 + + 示例: + 文档 A 在列表 1 中排名第 1,在列表 2 中排名第 3 + RRF(A) = 1/(60+1) + 1/(60+3) = 0.0164 + 0.0159 = 0.0323 + + 文档 B 在列表 1 中排名第 2,在列表 2 中排名第 2 + RRF(B) = 1/(60+2) + 1/(60+2) = 0.0161 + 0.0161 = 0.0322 + + 结果:A 排名高于 B + """ + doc_to_score: Dict[str, float] = {} + doc_map: Dict[str, Document] = {} + + for docs in doc_lists: + for rank, doc in enumerate(docs, start=1): + doc_id = f"{doc.page_content[:200]}_{doc.metadata.get('source', '')}" + if doc_id not in doc_map: + doc_map[doc_id] = doc + score = doc_to_score.get(doc_id, 0.0) + 1.0 / (k + rank) + doc_to_score[doc_id] = score + + sorted_ids = sorted(doc_to_score.keys(), key=lambda x: doc_to_score[x], reverse=True) + return [doc_map[doc_id] for doc_id in sorted_ids] +``` + +**RRF 算法优势:** +- 无需归一化:不同检索器的分数范围可能不同,RRF 直接使用排名 +- 参数简单:仅需调整 k 值,默认 60 在大多数场景表现良好 +- 鲁棒性强:对异常值和噪声不敏感 + +#### 1.4 Cross-Encoder 重排序 + +``` +重排序流程: +┌──────────────────────────────────────────────┐ +│ 输入:RRF 融合后的 Top-20 文档 │ +└──────────────────┬───────────────────────────┘ + │ + ↓ +┌──────────────────────────────────────────────┐ +│ Cross-Encoder 模型(bge-reranker-v2-m3) │ +│ │ +│ 工作原理: │ +│ - 将 Query 和 Document 拼接输入模型 │ +│ - "[CLS] Query [SEP] Document [SEP]" │ +│ - 通过 Transformer 计算交互注意力 │ +│ - 输出相关性得分(0-1) │ +│ │ +│ 与 Bi-Encoder 的区别: │ +│ - Bi-Encoder:分别编码,计算余弦相似度(快) │ +│ - Cross-Encoder:联合编码,计算交互得分(准) │ +└──────────────────┬───────────────────────────┘ + │ + ↓ +┌──────────────────────────────────────────────┐ +│ 按相关性得分排序,返回 Top-5 │ +└──────────────────────────────────────────────┘ +``` + +**实现细节:** +- 使用远程 llama.cpp 服务部署重排序模型 +- 兼容 OpenAI Rerank API 格式 +- 超时保护:60 秒超时,失败时降级为原始排序 + +### 2. LangGraph 工作流算法 + +#### 2.1 状态机设计 + +```python +# 核心状态定义 +class AgentState(TypedDict): + messages: Annotated[list, add_messages] # 对话历史(自动合并) + user_id: str # 用户标识 + memory_context: str # 检索到的记忆上下文 + should_summarize: bool # 是否需要生成摘要 + tool_calls: list # 工具调用列表 + final_response: str # 最终响应 +``` + +**状态流转规则:** +``` +初始状态 → retrieve_memory → memory_trigger → [条件分支] + ↓ + ┌───────────────────┼───────────────────┐ + ↓ ↓ ↓ + should_summarize 直接回复 需要工具 + ↓ ↓ ↓ + summarize → save finalize tool_node + ↓ ↓ ↓ + llm_call ←───────────────┘ llm_call + ↓ + finalize +``` + +#### 2.2 记忆管理算法 + +**记忆检索策略:** +``` +1. 向量检索:将用户查询 Embedding,在 PostgreSQL 中检索相似对话 +2. 时间衰减:近期对话权重更高(可选) +3. 相关性过滤:仅返回相似度 > 阈值的记忆 +4. 上下文窗口:限制记忆长度,避免超出 LLM 上下文限制 +``` + +**摘要生成策略:** +``` +触发条件: + - 对话轮数超过阈值(默认 10 轮) + - 记忆上下文长度超过限制 + +摘要生成: + 1. 提取最近 N 轮对话 + 2. 调用 LLM 生成摘要 + 3. 保存摘要到 PostgreSQL + 4. 清理旧对话记录 + +摘要格式: + "用户在 {时间} 讨论了 {主题},关键信息:{要点}" +``` + +### 3. 多模型路由算法 + +``` +模型选择逻辑: +┌─────────────────────────────────────────────┐ +│ 用户选择模型 │ +└──────────────────┬──────────────────────────┘ + │ + ┌──────────┼──────────┐ + ↓ ↓ ↓ + ┌──────┐ ┌──────┐ ┌──────┐ + │ zhipu│ │deep │ │local │ + └──┬───┘ └──┬───┘ └──┬───┘ + │ │ │ + ↓ ↓ ↓ + ChatZhipu ChatOpenAI ChatOpenAI + (官方SDK) (DeepSeek) (vLLM/Gemma) + │ │ │ + └─────────┼─────────┘ + ↓ + ┌────────────────┐ + │ 统一接口输出 │ + │ (BaseChatModel)│ + └────────────────┘ +``` + +**高可用降级策略:** +``` +1. 优先使用用户选择的模型 +2. 如果模型不可用(API 错误、超时等): + - 尝试切换到备用模型 + - 记录错误日志 + - 返回降级提示给用户 +3. 健康检查:定期检查各模型服务状态 +``` + +### 4. SSE 流式响应算法 + +``` +流式输出流程: +┌─────────────────────────────────────────────┐ +│ LLM 生成 Token │ +└──────────────────┬──────────────────────────┘ + │ + ↓ +┌─────────────────────────────────────────────┐ +│ 事件格式化 │ +│ data: {"content": "你", "type": "content"} │ +│ data: {"content": "好", "type": "content"} │ +│ data: {"content": "!", "type": "content"} │ +│ data: {"done": true, "type": "end"} │ +└──────────────────┬──────────────────────────┘ + │ + ↓ +┌─────────────────────────────────────────────┐ +│ HTTP Response Stream │ +│ Content-Type: text/event-stream │ +│ Cache-Control: no-cache │ +│ Connection: keep-alive │ +└─────────────────────────────────────────────┘ +``` + +**实现要点:** +- 使用 `asyncio.Queue` 缓冲 Token +- 生产者:LLM 异步生成 Token 放入队列 +- 消费者:FastAPI `EventSourceResponse` 从队列读取并推送 +- 超时保护:30 秒无输出自动断开 + +--- + +## 📚 RAG 系统完整架构 + +### 离线索引构建 vs 在线检索生成 + +RAG 系统分为两个独立但协同的阶段: + +``` +┌──────────────────────────────────────────────────────────────────┐ +│ RAG 系统双阶段架构 │ +├──────────────────────────────────────────────────────────────────┤ +│ │ +│ 阶段一:离线索引构建 (rag_indexer) │ +│ ┌────────────────────────────────────────────────────────┐ │ +│ │ 文档加载 → 文本切分 → Embedding → 向量存储 │ │ +│ │ PDF/DOCX/TXT 递归/语义/父子块 llama.cpp Qdrant │ │ +│ └────────────────────────────────────────────────────────┘ │ +│ │ │ +│ ▼ │ +│ 阶段二:在线检索生成 (app/rag) │ +│ ┌────────────────────────────────────────────────────────┐ │ +│ │ 用户查询 → 查询改写 → 多路检索 → RRF 融合 → 重排序 │ │ +│ │ MultiQuery Dense+Sparse Cross-Encoder │ │ +│ │ │ │ │ +│ │ ▼ │ │ +│ │ LLM 生成回答 │ │ +│ └────────────────────────────────────────────────────────┘ │ +│ │ +└──────────────────────────────────────────────────────────────────┘ +``` + +### RAG 演进路线 (Roadmap) + +#### Level 1: 基础向量搜索 (Basic Similarity Search) + +**核心算法**: 近似最近邻搜索 (ANN, 常用 HNSW 算法) + +``` +算法原理: + 用户问题 → Embedding 模型 → 向量表示 + ↓ + 计算与库中向量的余弦相似度 + ↓ + 取距离最近的 K 个块返回 + +优缺点: + ✅ 速度极快(毫秒级) + ❌ 只能捕捉"语义相似",专有名词匹配差 + +实现代码: + from app.rag.retriever import create_base_retriever + + retriever = create_base_retriever( + collection_name="rag_documents", + embeddings=embeddings, + search_kwargs={"k": 20} + ) + docs = retriever.invoke("什么是 RAG?") +``` + +#### Level 2: 混合检索与重排序 (Hybrid Search + Reranker) + +**1. 基础召回(混合检索)** + +``` +算法原理: + 稠密向量检索(语义相似)+ BM25 稀疏检索(关键词匹配) + ↓ + 两路结果并行获取,等待融合 + +实现代码: + from app.rag.retriever import create_hybrid_retriever + + retriever = create_hybrid_retriever( + collection_name="rag_documents", + embeddings=embeddings, + dense_k=10, + sparse_k=10, + score_threshold=0.3 + ) +``` + +**2. 二次精排(Cross-Encoder)** + +``` +算法原理: + 不同于双塔模型(分别算向量再求距离) + 交叉编码器将"问题 + 文档"拼接后整体输入 Transformer + 由模型直接输出 0~1 的相关性得分,精度极高 + +实现代码: + from app.rag.reranker import LLaMaCPPReranker + + reranker = LLaMaCPPReranker( + base_url="http://127.0.0.1:8083", + api_key="your-api-key", + top_n=5 + ) + sorted_docs = reranker.compress_documents(documents, query) +``` + +#### Level 3: RAG-Fusion (多路改写与倒数排名融合) + +**1. 多路查询改写** + +``` +算法原理: + 克服用户初始提问词不达意或视角受限的问题 + 通过 LLM 将单一问题改写为多个不同角度的查询 + +实现代码: + from app.rag.query_transform import MultiQueryGenerator + + generator = MultiQueryGenerator(llm=llm, num_queries=3) + queries = await generator.agenerate("如何申请项目资金?") + # 返回:["如何申请项目资金?", "项目资金申请流程是什么?", "申请项目经费需要哪些步骤?"] +``` + +**2. 倒数排名融合(RRF)** + +``` +算法原理: + RRF (Reciprocal Rank Fusion) 是一种无需评分归一化的融合算法 + 公式:RRF_score(d) = Σ 1/(k + rank_q(d)) + 有效避免某一极端检索结果主导全局 + +实现代码: + from app.rag.fusion import reciprocal_rank_fusion + + # 多个查询的检索结果 + doc_lists = [result1, result2, result3] + fused_docs = reciprocal_rank_fusion(doc_lists, k=60) +``` + +#### Level 4: Agentic RAG / Self-RAG (智能体与自我反思) + +``` +核心原理: + 基于 LangGraph 的 ReAct (Reasoning and Acting) 状态机路由 + 大模型并非每次都去死板地执行检索,而是先判断问题: + "这是闲聊?还是需要查知识库?" + +工作流程: + ┌──────────┐ ┌──────────────┐ ┌──────────┐ ┌────────┐ + │ User │────>│ LangGraph │────>│ RAG_Tool │────>│ Qdrant │ + │ │ │ Agent │ │ │ │ │ + │ "公司报 │ │ 思考: 这是 │ │ ToolCall │ │ RAG- │ + │ 销流程?"│ │ 内部规章问题 │ │ search_ │ │ Fusion │ + │ │ │ 需要查资料 │ │ knowledge│ │ & 混合 │ + │ │<────│ 资料充分, │<────│ 返回最相 │<────│ 检索 │ + │ "根据知 │ │ 开始撰写回答 │ │ 关5条规定 │ │ Cross- │ + │ 识库规定 │ │ │ │ │ │ Encoder│ + │ ..." │ │ │ │ │ │ 重排 │ + └────────── └────────────── └──────────┘ └────────┘ + +实现代码: + from app.rag.tools import search_knowledge_base + from app.graph.graph_builder import GraphBuilder + + # 将 RAG 工具添加到工具列表 + tools = AVAILABLE_TOOLS + [search_knowledge_base] + + # 构建图 + builder = GraphBuilder(llm, tools, tools_by_name) + graph = builder.build().compile(checkpointer=checkpointer) +``` + +#### Level 5: GraphRAG 集成 (基于图和关系的 RAG) + +``` +核心原理: + 结合知识图谱的结构化关系和向量检索的语义相似度 + 解决跨文档复杂关系推理问题 + +实现代码: + from langchain_community.graphs import Neo4jGraph + from langchain_experimental.graph_transformers import LLMGraphTransformer + + # 实体关系抽取 + transformer = LLMGraphTransformer(llm=local_llm) + graph_documents = transformer.convert_to_graph_documents(documents) + + # 存储到图数据库 + graph = Neo4jGraph(url="bolt://localhost:7687") + graph.add_graph_documents(graph_documents) +``` + +### 检索策略对比 + +| 策略 | 优点 | 缺点 | 适用场景 | +|:-----|:-----|:-----|:---------| +| **基础向量检索** | 速度快,语义理解好 | 专有名词匹配差 | 通用问答 | +| **混合检索** | 语义 + 关键词匹配 | 需要配置稀疏向量 | 专业术语查询 | +| **多路改写 + RRF** | 搜索面广,结果稳定 | 延迟略高 | 复杂问题 | +| **重排序** | 精度高 | 依赖额外模型 | 最终精排 | +| **Agentic RAG** | 智能决策,灵活 | 实现复杂 | 生产环境 | +| **GraphRAG** | 关系推理能力强 | 需要图数据库 | 知识密集型场景 | + +### 切分策略对比 + +| 策略 | 原理 | 优点 | 缺点 | 适用场景 | +|:-----|:-----|:-----|:-----|:---------| +| **递归字符** | 按分隔符递归切分 | 速度快,实现简单 | 可能截断语义 | 简单文档 | +| **语义切分** | 基于句子相似度阈值 | 语义连贯性好 | 需要 Embedding 模型 | 专业文档 | +| **父子块** | 大块存储+小块检索 | 检索精准+上下文完整 | 存储复杂度高 | 生产环境 | + +--- + +## �� 快速开始 详细启动指南请查看 [QUICKSTART.md](QUICKSTART.md) @@ -132,10 +788,8 @@ cp .env.docker .env docker compose -f docker/docker-compose.yml up -d --build # 3. 访问应用 -# 如果配置了 Nginx 反向代理:http://your-domain.com 或 http://your-server-ip -# 如果未配置 Nginx(直接访问): -# - 前端: http://127.0.0.1:8501 -# - 后端 API: http://127.0.0.1:8001 +# 前端: http://127.0.0.1:8501 +# 后端 API: http://127.0.0.1:8083 ``` ### 方式二:本地开发模式 @@ -152,7 +806,7 @@ cp .env.docker .env python app/backend.py # 4. 启动前端(新终端) -cd frontend && streamlit run app.py +streamlit run frontend/frontend_main.py ``` --- @@ -183,7 +837,8 @@ cd frontend && streamlit run app.py ### 多模型切换 1. 在左侧边栏选择模型: - - **智谱 GLM-4**:在线服务,速度快 + - **智谱 GLM-4.7**:在线服务,速度快 + - **DeepSeek Reasoner**:深度推理模型 - **本地 Gemma-4**:本地部署,隐私性好 2. 可随时切换,甚至在同一会话中 @@ -196,7 +851,7 @@ cd frontend && streamlit run app.py ### 添加新工具 -在 `app/tools.py` 中添加新的 `@tool` 装饰函数: +在 [app/graph/graph_tools.py](file:///home/huang/Study/AIProject/Agent1/app/graph/graph_tools.py) 中添加新的 `@tool` 装饰函数: ```python @tool @@ -218,13 +873,29 @@ def my_new_tool(param: str) -> str: ### 添加新模型 -在 `app/agent.py` 的 `initialize()` 方法中添加模型配置: +在 [app/agent/llm_factory.py](file:///home/huang/Study/AIProject/Agent1/app/agent/llm_factory.py) 中添加模型创建方法: ```python -model_configs = { - "zhipu": self._create_zhipu_llm, - "local": self._create_local_llm, - "new_model": self._create_new_model_llm, # 添加新模型 +@staticmethod +def create_new_model(): + api_key = os.getenv("NEW_MODEL_API_KEY") + return ChatOpenAI( + base_url="https://api.new-model.com/v1", + api_key=SecretStr(api_key), + model="model-name", + temperature=0.1, + streaming=True, + ) +``` + +然后在 `CREATORS` 字典中注册: + +```python +CREATORS = { + "local": create_local, + "deepseek": create_deepseek, + "zhipu": create_zhipu, + "new_model": create_new_model, # 新增 } ``` @@ -245,66 +916,32 @@ model_configs = { ### 配置文件说明 -项目采用三层环境配置文件体系: +项目采用两层环境配置文件体系: | 文件 | 用途 | 是否提交 Git | |------|------|------------| -| `.env.example` | 配置模板 | ✅ 是 | -| `.env` | 实际使用的配置 | ❌ 否(已忽略) | | `.env.docker` | Docker 部署模板 | ✅ 是 | +| `.env` | 实际使用的配置 | ❌ 否(已忽略) | **使用方法:** -- **本地开发**:`cp .env.example .env`,修改为 127.0.0.1 相关地址 +- **本地开发**:`cp .env.docker .env`,修改为本地服务地址 - **Docker 部署**:`cp .env.docker .env`,使用远程服务器地址 ### 必需的环境变量 | 变量名 | 说明 | 本地开发示例 | Docker 部署示例 | |--------|------|------------|----------------| -| `QDRANT_URL` | Qdrant 向量数据库地址 | `http://127.0.0.1:6333` | `http://your-server:6333` | -| `QDRANT_COLLECTION_NAME` | Qdrant 集合名称 | `mem0_user_memories` | `your_collection_name` | -| `VLLM_BASE_URL` | LLM 服务地址(本地) | `http://127.0.0.1:8081/v1` | `http://your-server:8081/v1` | -| `LLAMACPP_EMBEDDING_URL` | Embedding 服务地址(本地) | `http://127.0.0.1:8082/v1` | `http://your-server:8082/v1` | -| `VLLM_BASE_URL` | LLM 服务地址(Docker) | `http://host.docker.internal:18000/v1` | `http://your-server:18000/v1` | -| `LLAMACPP_EMBEDDING_URL` | Embedding 服务地址(Docker) | `http://host.docker.internal:18001/v1` | `http://your-server:18001/v1` | -| `LLAMACPP_API_KEY` | llama.cpp API 密钥 | `your-llamacpp-api-key` | `your-real-api-key` | -| `ZHIPUAI_API_KEY` | 智谱AI API密钥 | `your-zhipuai-api-key` | `your-real-api-key` | -| `DEEPSEEK_API_KEY` | DeepSeek API密钥 | `your-deepseek-api-key` | `your-real-api-key` | -| `VLLM_BASE_URL` | vLLM 服务地址 | `http://127.0.0.1:8081/v1` | `http://your-server:8081/v1` | -| `LOG_LEVEL` | 日志级别 | `INFO` | `DEBUG`/`INFO`/`WARNING`/`ERROR` | -| `ENABLE_GRAPH_TRACE` | 是否启用图流转追踪 | `true` | `true`/`false` | -| `MEMORY_SUMMARIZE_INTERVAL` | 对话摘要生成间隔 | `10` | `5`/`10`/`15` | - -### 配置示例 - -#### 本地开发 (.env) -``` -ZHIPUAI_API_KEY=your_api_key_here -DEEPSEEK_API_KEY=your_deepseek_api_key_here -LLAMACPP_API_KEY=token-abc123 -VLLM_BASE_URL=http://127.0.0.1:8081/v1 -LLAMACPP_EMBEDDING_URL=http://127.0.0.1:8082/v1 # 本地开发 -# 或 -LLAMACPP_EMBEDDING_URL=http://host.docker.internal:18001/v1 # Docker容器访问宿主机 -QDRANT_URL=http://115.190.121.151:6333 -DB_URI=postgresql://postgres:huang1998@115.190.121.151:5432/langgraph_db?sslmode=disable -API_URL=http://127.0.0.1:8083/chat -``` - -#### Docker 部署 (.env.docker) -``` -ZHIPUAI_API_KEY=your_api_key_here -DEEPSEEK_API_KEY=your_deepseek_api_key_here -LLAMACPP_API_KEY=token-abc123 -VLLM_BASE_URL=http://127.0.0.1:8081/v1 -LLAMACPP_EMBEDDING_URL=http://127.0.0.1:8082/v1 # 本地开发 -# 或 -LLAMACPP_EMBEDDING_URL=http://host.docker.internal:18001/v1 # Docker容器访问宿主机 -QDRANT_URL=http://115.190.121.151:6333 -DB_URI=postgresql://postgres:huang1998@115.190.121.151:5432/langgraph_db?sslmode=disable -# API_URL 在 docker-compose.yml 中配置为 http://backend:8083/chat -``` +| `ZHIPUAI_API_KEY` | 智谱AI API密钥 | `your-api-key` | `your-api-key` | +| `DEEPSEEK_API_KEY` | DeepSeek API密钥 | `your-api-key` | `your-api-key` | +| `LLAMACPP_API_KEY` | llama.cpp API 密钥 | `token-abc123` | `token-abc123` | +| `VLLM_BASE_URL` | LLM 服务地址 | `http://127.0.0.1:8081/v1` | `http://your-server:8081/v1` | +| `LLAMACPP_EMBEDDING_URL` | Embedding 服务地址 | `http://127.0.0.1:8082/v1` | `http://your-server:8082/v1` | +| `DB_URI` | PostgreSQL 连接字符串 | `postgresql://...@115.190.121.151:5432/langgraph_db` | 同左 | +| `QDRANT_URL` | Qdrant 向量数据库地址 | `http://115.190.121.151:6333` | 同左 | +| `LOG_LEVEL` | 日志级别 | `INFO` | `WARNING` | +| `ENABLE_GRAPH_TRACE` | 是否启用图流转追踪 | `true` | `false` | +| `MEMORY_SUMMARIZE_INTERVAL` | 对话摘要生成间隔 | `10` | `10` | ### 注意事项 @@ -314,7 +951,460 @@ DB_URI=postgresql://postgres:huang1998@115.190.121.151:5432/langgraph_db?sslmode --- -## 🐛 故障排查 +## �️ 实现指南与最佳实践 + +### 1. RAG 知识库构建指南 + +#### 1.1 离线索引构建流程 + +```bash +# 步骤 1:准备文档 +# 将文档放入指定目录(如 ./documents/) +# 支持格式:TXT, PDF, Markdown, DOCX + +# 步骤 2:选择切分策略 +# 根据文档类型选择: +# - 结构化文档(如手册、API 文档)→ Recursive Splitting +# - 非结构化文档(如文章、报告)→ Semantic Chunking +# - 长文档(如书籍、论文)→ Parent-Child Chunking + +# 步骤 3:构建索引 +cd rag_indexer +python cli.py build \ + --input-dir ../documents \ + --collection-name my_knowledge \ + --splitter-type semantic \ + --chunk-size 500 + +# 步骤 4:验证索引 +python cli.py query \ + --collection-name my_knowledge \ + --query "如何配置系统?" \ + --top-k 5 +``` + +#### 1.2 切分策略选择建议 + +| 文档类型 | 推荐策略 | chunk_size | 说明 | +|---------|---------|------------|------| +| API 文档 | Recursive | 300-500 | 结构清晰,按章节切分 | +| 技术文章 | Semantic | 自适应 | 保持语义完整性 | +| 法律合同 | Parent-Child | 父:1000, 子:200 | 需要完整上下文 | +| 问答对 | Recursive | 200-300 | 短文本,精确匹配 | +| 产品手册 | Parent-Child | 父:1500, 子:300 | 长文档,跨章节检索 | + +#### 1.3 Embedding 模型选择 + +| 模型 | 维度 | 语言 | 速度 | 精度 | 适用场景 | +|------|------|------|------|------|---------| +| bge-large-zh-v1.5 | 1024 | 中文 | 中 | 高 | 中文文档检索 | +| bge-m3 | 1024 | 多语言 | 中 | 高 | 多语言混合文档 | +| text-embedding-3-small | 1536 | 英文 | 快 | 中 | 英文文档检索 | +| gte-large | 1024 | 中文 | 慢 | 很高 | 高精度要求场景 | + +### 2. 性能优化指南 + +#### 2.1 检索性能优化 + +```python +# 优化 1:调整检索参数 +search_kwargs = { + "k": 20, # 召回数量(增加召回,提高精度) + "score_threshold": 0.3, # 相似度阈值(过滤低质量结果) + "fetch_k": 50, # 初始召回数量(用于 MMR 去重) + "lambda_mult": 0.7, # MMR 多样性参数(0=去重,1=不去重) +} + +# 优化 2:使用缓存 +from functools import lru_cache + +@lru_cache(maxsize=1000) +def cached_retrieve(query: str) -> List[Document]: + """缓存常见查询结果""" + return retriever.invoke(query) + +# 优化 3:批量 Embedding +# 构建索引时,使用批量处理提高效率 +batch_size = 32 +for i in range(0, len(documents), batch_size): + batch = documents[i:i+batch_size] + embeddings = embedder.embed_documents([doc.page_content for doc in batch]) +``` + +#### 2.2 LLM 调用优化 + +```python +# 优化 1:使用流式响应 +# 减少首字延迟,提升用户体验 +response = await llm.astream(messages) +async for chunk in response: + yield chunk.content + +# 优化 2:控制上下文长度 +MAX_CONTEXT_LENGTH = 4000 + +def truncate_context(context: str, max_length: int = MAX_CONTEXT_LENGTH) -> str: + """截断上下文,保留开头和结尾""" + if len(context) <= max_length: + return context + half = max_length // 2 + return context[:half] + "\n...\n" + context[-half:] + +# 优化 3:Prompt 优化 +# 使用结构化 Prompt,提高 LLM 理解能力 +prompt_template = """ +你是一个智能助手。请根据以下上下文回答用户问题。 + +上下文: +{context} + +问题:{question} + +回答要求: +1. 仅基于上下文回答,不要编造信息 +2. 如果上下文中没有答案,明确告知用户 +3. 引用上下文中的具体信息时,注明来源 +""" +``` + +#### 2.3 数据库性能优化 + +```sql +-- PostgreSQL 优化 +-- 1. 为对话历史表添加索引 +CREATE INDEX idx_conversations_user_id ON conversations(user_id); +CREATE INDEX idx_conversations_created_at ON conversations(created_at); + +-- 2. 为记忆摘要表添加索引 +CREATE INDEX idx_summaries_user_id ON summaries(user_id); + +-- 3. 定期清理旧数据(保留最近 90 天) +DELETE FROM conversations +WHERE created_at < NOW() - INTERVAL '90 days'; + +-- Qdrant 优化 +-- 1. 使用 HNSW 索引(默认已启用) +-- 2. 调整 ef_construct 和 m 参数平衡速度和精度 +-- 3. 定期优化集合 +curl -X POST http://localhost:6333/collections/my_docs/actions \ + -H "Content-Type: application/json" \ + -d '{"actions": [{"optimize": {}}]}' +``` + +### 3. 扩展开发指南 + +#### 3.1 添加自定义工具 + +```python +# 在 app/graph/graph_tools.py 中添加 + +from langchain_core.tools import tool +from typing import Optional + +@tool +def calculate_expression( + expression: str, + precision: int = 2 +) -> str: + """ + 计算数学表达式的值。 + + 支持基本运算:加减乘除、幂运算、括号。 + + Args: + expression: 数学表达式,如 "2 + 3 * 4" + precision: 结果精度(小数位数),默认 2 + + Returns: + 计算结果 + """ + try: + # 安全计算(避免 eval 的安全风险) + import ast + import operator + + # 定义允许的操作符 + ops = { + ast.Add: operator.add, + ast.Sub: operator.sub, + ast.Mult: operator.mul, + ast.Div: operator.truediv, + ast.Pow: operator.pow, + } + + def eval_node(node): + if isinstance(node, ast.Num): + return node.n + elif isinstance(node, ast.BinOp): + left = eval_node(node.left) + right = eval_node(node.right) + return ops[type(node.op)](left, right) + else: + raise ValueError(f"不支持的操作: {type(node)}") + + tree = ast.parse(expression, mode='eval') + result = eval_node(tree.body) + return f"{result:.{precision}f}" + except Exception as e: + return f"计算错误: {str(e)}" + +# 工具会自动注册,无需手动添加到 AVAILABLE_TOOLS +``` + +#### 3.2 添加自定义 LLM 提供商 + +```python +# 在 app/agent/llm_factory.py 中添加 + +from langchain_openai import ChatOpenAI +from langchain_core.language_models import BaseChatModel + +class LLMFactory: + @staticmethod + def create_anthropic_model() -> BaseChatModel: + """创建 Anthropic Claude 模型""" + api_key = os.getenv("ANTHROPIC_API_KEY") + if not api_key: + raise ValueError("ANTHROPIC_API_KEY 环境变量未设置") + + return ChatAnthropic( + model="claude-3-sonnet-20240229", + temperature=0.1, + max_tokens=4096, + api_key=api_key, + ) + + # 注册到 CREATORS + CREATORS = { + "local": create_local, + "deepseek": create_deepseek, + "zhipu": create_zhipu, + "anthropic": create_anthropic_model, # 新增 + } +``` + +#### 3.3 添加自定义 RAG 检索器 + +```python +# 在 app/rag/retriever.py 中添加 + +from langchain_core.retrievers import BaseRetriever +from langchain_core.documents import Document + +class HybridRetriever(BaseRetriever): + """混合检索器:结合多种检索策略""" + + dense_retriever: BaseRetriever + sparse_retriever: BaseRetriever + reranker: Optional[BaseReranker] = None + top_k: int = 5 + + def _get_relevant_documents(self, query: str) -> List[Document]: + # 并行检索 + dense_docs = self.dense_retriever.invoke(query) + sparse_docs = self.sparse_retriever.invoke(query) + + # RRF 融合 + fused_docs = reciprocal_rank_fusion([dense_docs, sparse_docs]) + + # 重排序 + if self.reranker: + fused_docs = self.reranker.compress_documents(fused_docs[:20], query) + + return fused_docs[:self.top_k] +``` + +### 4. 部署最佳实践 + +#### 4.1 生产环境配置 + +```yaml +# docker-compose.prod.yml +version: '3.8' + +services: + backend: + build: + context: . + dockerfile: docker/Dockerfile.backend + environment: + - LOG_LEVEL=WARNING # 生产环境降低日志级别 + - ENABLE_GRAPH_TRACE=false # 关闭图追踪(提升性能) + - MEMORY_SUMMARIZE_INTERVAL=5 # 更频繁的摘要生成 + deploy: + resources: + limits: + cpus: '2' + memory: 4G + reservations: + cpus: '1' + memory: 2G + restart: unless-stopped + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8083/health"] + interval: 30s + timeout: 10s + retries: 3 + + frontend: + build: + context: . + dockerfile: docker/Dockerfile.frontend + environment: + - STREAMLIT_SERVER_PORT=8501 + - STREAMLIT_SERVER_HEADLESS=true + deploy: + resources: + limits: + cpus: '1' + memory: 2G + restart: unless-stopped +``` + +#### 4.2 监控与告警 + +```python +# 添加健康检查端点 +from fastapi import FastAPI +from datetime import datetime + +app = FastAPI() + +@app.get("/health") +async def health_check(): + """健康检查端点""" + return { + "status": "healthy", + "timestamp": datetime.now().isoformat(), + "services": { + "postgresql": await check_postgresql(), + "qdrant": await check_qdrant(), + "llm": await check_llm_service(), + } + } + +async def check_postgresql() -> bool: + try: + # 尝试连接数据库 + async with asyncpg.connect(DB_URI) as conn: + await conn.fetchval("SELECT 1") + return True + except Exception: + return False + +async def check_qdrant() -> bool: + try: + client = QdrantClient(url=QDRANT_URL) + client.get_collections() + return True + except Exception: + return False +``` + +#### 4.3 安全最佳实践 + +```python +# 1. 使用环境变量管理密钥 +# 永远不要硬编码 API Key +import os +from typing import Optional + +def get_api_key(env_var: str, default: Optional[str] = None) -> str: + api_key = os.getenv(env_var, default) + if not api_key: + raise ValueError(f"{env_var} 环境变量未设置") + return api_key + +# 2. 输入验证 +from pydantic import BaseModel, validator + +class ChatRequest(BaseModel): + message: str + user_id: str + model: str = "zhipu" + + @validator('message') + def validate_message(cls, v): + if len(v) > 4000: + raise ValueError("消息长度不能超过 4000 字符") + return v.strip() + + @validator('model') + def validate_model(cls, v): + allowed_models = {"zhipu", "deepseek", "local"} + if v not in allowed_models: + raise ValueError(f"不支持的模型: {v}") + return v + +# 3. 速率限制 +from slowapi import Limiter +from slowapi.util import get_remote_address + +limiter = Limiter(key_func=get_remote_address) + +@app.post("/api/chat") +@limiter.limit("10/minute") # 每分钟最多 10 次请求 +async def chat(request: Request, chat_request: ChatRequest): + ... +``` + +### 5. 调试与测试 + +#### 5.1 启用调试模式 + +```bash +# 启用详细日志 +export LOG_LEVEL=DEBUG + +# 启用 LangGraph 图追踪 +export ENABLE_GRAPH_TRACE=true + +# 查看图执行流程 +# 启动后访问:http://localhost:8083/graph/trace +``` + +#### 5.2 单元测试示例 + +```python +# tests/test_rag.py +import pytest +from app.rag.fusion import reciprocal_rank_fusion +from langchain_core.documents import Document + +def test_rrf_fusion(): + # 准备测试数据 + docs1 = [ + Document(page_content="文档 A"), + Document(page_content="文档 B"), + ] + docs2 = [ + Document(page_content="文档 B"), + Document(page_content="文档 C"), + ] + + # 执行 RRF 融合 + result = reciprocal_rank_fusion([docs1, docs2]) + + # 验证结果 + assert len(result) == 3 + assert result[0].page_content == "文档 B" # B 在两个列表中都出现,应该排第一 + +@pytest.mark.asyncio +async def test_retriever(): + from app.rag.retriever import create_base_retriever + + retriever = create_base_retriever( + collection_name="test_collection", + embeddings=mock_embeddings, + ) + + docs = await retriever.ainvoke("测试查询") + assert len(docs) > 0 +``` + +--- + +## �� 故障排查 ### 常见问题 @@ -328,7 +1418,7 @@ curl http://115.190.121.151:6333/collections ``` **Q: 后端启动失败?** -- 确认端口 8001 未被占用 +- 确认端口 8083 未被占用 - 检查 `.env` 中的 API Key 是否正确 - 查看启动日志确认模型初始化成功 diff --git a/app/rag/README.md b/app/rag/README.md index b0e0423..a91d8f6 100644 --- a/app/rag/README.md +++ b/app/rag/README.md @@ -2,170 +2,390 @@ 该模块负责 RAG 系统的阶段二:**在线检索与生成**。它接收用户提问,从知识库中检索出上下文,利用各种高级策略去噪、融合,并作为增强上下文输入给大语言模型 (LLM)。 +## 🎯 核心架构 + +### 技术栈 + +| 组件 | 技术选型 | 版本 | 说明 | +|:-----|:---------|:-----|:-----| +| **基础检索** | `Qdrant` | 1.17+ | HNSW 稠密向量检索 | +| **混合检索** | `Qdrant` + `BM25` | 内置 | 稠密 + 稀疏向量融合 | +| **查询改写** | `LangChain` | 内置 | `MultiQueryGenerator` 多路改写 | +| **RRF 融合** | 自实现 | - | `reciprocal_rank_fusion` 倒数排名融合 | +| **重排序** | `llama.cpp` | 本地服务 | OpenAI 兼容 Rerank API | +| **编排框架** | `asyncio` | Python 3.10+ | 异步并行检索 | + +### 检索流水线 + +``` +┌─────────────────────────────────────────────────────────────┐ +│ 用户提问 │ +└──────────────────────┬──────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ MultiQueryGenerator │ +│ 多路查询改写 (num_queries=3) │ +│ "如何申请项目资金?" → ["项目资金申请流程", "经费申请步骤"] │ +└──────────────────────┬──────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ 并行检索 (asyncio.gather) │ +│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ +│ │ 查询1 检索 │ │ 查询2 检索 │ │ 查询3 检索 │ │ +│ │ (k=20) │ │ (k=20) │ │ (k=20) │ │ +│ └──────────────┘ └──────────────┘ └──────────────┘ │ +└──────────────────────┬──────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ reciprocal_rank_fusion (RRF) │ +│ RRF_score(d) = Σ 1/(k + rank_q(d)) (k=60) │ +│ 融合多路检索结果,去重排序 │ +└──────────────────────┬──────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ LLaMaCPPReranker │ +│ 远程重排序 (bge-reranker-v2-m3) │ +│ 返回 Top-N (top_n=5) 最相关文档 │ +└──────────────────────┬──────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ 返回增强上下文 │ +│ format_context() → 格式化输出 │ +└─────────────────────────────────────────────────────────────┘ +``` + +### 技术特性 + +- ✅ **多路查询改写**:通过 LLM 将单一问题改写为多个不同角度的查询 +- ✅ **RRF 融合算法**:Reciprocal Rank Fusion,无需评分归一化的融合算法 +- ✅ **远程重排序**:使用 llama.cpp 服务的 OpenAI 兼容 Rerank API +- ✅ **混合检索支持**:稠密向量 + BM25 稀疏向量混合检索 +- ✅ **异步并行检索**:多路查询并行执行,提升检索速度 +- ✅ **优雅降级**:重排序器不可用时自动降级到基础融合结果 + +## 📂 架构与文件结构 + +``` +app/rag/ +├── __init__.py +├── retriever.py # Qdrant 基础检索与混合检索 +├── reranker.py # llama.cpp 远程重排序器 +├── query_transform.py # 多路查询改写生成器 +├── fusion.py # RRF 倒数排名融合算法 +├── pipeline.py # RAG 流水线编排 +└── tools.py # LangChain Tool 封装 +``` + ## 🎯 演进路线与算法详解 (Roadmap) ### Level 1: 基础向量搜索 (Basic Similarity Search) + - **核心算法**: 近似最近邻搜索 (ANN, 常用 HNSW 算法)。将用户问题转化为向量后,计算它与库中向量的余弦相似度 (Cosine Similarity),取距离最近的 K 个块。 -- **优缺点**: 速度极快。但只能捕捉“语义相似”,如果用户搜索特定专有名词、编号、订单号,纯向量检索往往会失效(产生“幻觉”匹配)。 +- **优缺点**: 速度极快。但只能捕捉"语义相似",如果用户搜索特定专有名词、编号、订单号,纯向量检索往往会失效(产生"幻觉"匹配)。 - **实现指南**: - 使用 `rag_indexer.embedders.LlamaCppEmbedder` 作为嵌入模型 - 使用 `app/rag/retriever.py` 中的 `create_base_retriever` 创建基础检索器 - 配置 `search_kwargs={"k": 20}` 进行初步召回 +```python +from app.rag.retriever import create_base_retriever + +retriever = create_base_retriever( + collection_name="rag_documents", + embeddings=embeddings, + search_kwargs={"k": 20} +) +docs = retriever.invoke("什么是 RAG?") +``` + ### Level 2: 混合检索与重排序 (Hybrid Search + Reranker) -混合检索旨在结合向量的“语义泛化”与关键词的“精准匹配”,随后利用重排序模型过滤噪声。 + +混合检索旨在结合向量的"语义泛化"与关键词的"精准匹配",随后利用重排序模型过滤噪声。 **1. 基础召回 (混合检索)** + - **核心原理**: 结合基于 HNSW 的 Dense Vector 相似度搜索与基于 TF-IDF 的 BM25 稀疏检索 (Sparse Vector)。 - **实现指南**: 使用 `app/rag/retriever.py` 中的 `create_hybrid_retriever` 函数,配置 `dense_k=10` 和 `sparse_k=10`,总召回 20 条结果。 +```python +from app.rag.retriever import create_hybrid_retriever + +retriever = create_hybrid_retriever( + collection_name="rag_documents", + embeddings=embeddings, + dense_k=10, + sparse_k=10, + score_threshold=0.3 +) +``` + **2. 二次精排 (Cross-Encoder)** -- **核心原理**: 不同于双塔模型(分别算向量再求距离),交叉编码器将“用户问题 + 检索到的单例文档”拼接后整体输入 Transformer 模型,由模型直接输出 0~1 的相关性得分,精度极高。 + +- **核心原理**: 不同于双塔模型(分别算向量再求距离),交叉编码器将"用户问题 + 检索到的单例文档"拼接后整体输入 Transformer 模型,由模型直接输出 0~1 的相关性得分,精度极高。 - **实现指南**: - - 使用 `app/rag/reranker.py` 中的 `CrossEncoderReranker` 类,加载 `BAAI/bge-reranker-base` 模型 + - 使用 `app/rag/reranker.py` 中的 `LLaMaCPPReranker` 类,加载 `bge-reranker-v2-m3` 模型 - 设置 `top_n=5` 保留最相关的 5 条结果 - - 使用 `ContextualCompressionRetriever` 组合基础检索器和重排序器 + +```python +from app.rag.reranker import LLaMaCPPReranker + +reranker = LLaMaCPPReranker( + base_url="http://127.0.0.1:8083", + api_key="your-api-key", + top_n=5 +) +sorted_docs = reranker.compress_documents(documents, query) +``` ### Level 3: RAG-Fusion (多路改写与倒数排名融合) + RAG-Fusion 通过大模型发散思维,将单一问题改写为多个相似问题,扩大搜索面,再利用数学统计算法合并结果。 **1. 多路查询改写** + - **核心原理**: 克服用户初始提问词不达意或视角受限的问题。 -- **实现指南**: 使用 `app/rag/query_transform.py` 中的 `MultiQueryTransformer` 类,配置 `num_queries=3` 生成 3 个不同角度的查询。 +- **实现指南**: 使用 `app/rag/query_transform.py` 中的 `MultiQueryGenerator` 类,配置 `num_queries=3` 生成 3 个不同角度的查询。 + +```python +from app.rag.query_transform import MultiQueryGenerator + +generator = MultiQueryGenerator(llm=llm, num_queries=3) +queries = await generator.agenerate("如何申请项目资金?") +# 返回:["如何申请项目资金?", "项目资金申请流程是什么?", "申请项目经费需要哪些步骤?"] +``` **2. 倒数排名融合 (RRF)** -- **核心原理**: RRF (Reciprocal Rank Fusion) 是一种无需评分归一化的融合算法。公式为 $RRF\_score(d) = \sum_{q \in Q} \frac{1}{k + rank_q(d)}$,有效避免某一极端检索结果主导全局。 -- **实现指南**: 使用 `app/rag/retriever.py` 中的 `create_ensemble_retriever` 函数,配置 `search_type="rrf"` 实现倒数排名融合。 + +- **核心原理**: RRF (Reciprocal Rank Fusion) 是一种无需评分归一化的融合算法。公式为 `RRF_score(d) = Σ 1/(k + rank_q(d))`,有效避免某一极端检索结果主导全局。 +- **实现指南**: 使用 `app/rag/fusion.py` 中的 `reciprocal_rank_fusion` 函数,配置 `k=60` 实现倒数排名融合。 + +```python +from app.rag.fusion import reciprocal_rank_fusion + +# 多个查询的检索结果 +doc_lists = [result1, result2, result3] +fused_docs = reciprocal_rank_fusion(doc_lists, k=60) +``` ### Level 4: Agentic RAG / Self-RAG (智能体与自我反思) -- **核心原理**: 基于 LangGraph 的 ReAct (Reasoning and Acting) 状态机路由。大模型并非每次都去死板地执行检索,而是先判断问题:“这是闲聊?还是需要查知识库?”。如果是后者,模型输出一个 `ToolCall` 指令,触发检索。 + +- **核心原理**: 基于 LangGraph 的 ReAct (Reasoning and Acting) 状态机路由。大模型并非每次都去死板地执行检索,而是先判断问题:"这是闲聊?还是需要查知识库?"。如果是后者,模型输出一个 `ToolCall` 指令,触发检索。 - **实现指南**: 使用 `app/rag/tools.py` 中的 `search_knowledge_base` 工具,将其绑定到 LangGraph 状态机中。 - **示意图**: - ```mermaid - sequenceDiagram - participant User - participant LangGraph Agent - participant RAG_Tool - participant Qdrant - - User->>LangGraph Agent: "公司报销流程是什么?" - LangGraph Agent->>LangGraph Agent: 思考: 这是一个内部规章问题,需要查资料 - LangGraph Agent->>RAG_Tool: ToolCall(search_knowledge_base, "公司报销流程") - RAG_Tool->>Qdrant: RAG-Fusion & 混合检索 - Qdrant-->>RAG_Tool: 原始分块 - RAG_Tool->>RAG_Tool: Cross-Encoder 重排过滤 - RAG_Tool-->>LangGraph Agent: 返回最相关的5条报销规定 - LangGraph Agent->>LangGraph Agent: 思考: 资料充分,开始撰写回答 - LangGraph Agent-->>User: "根据知识库规定,报销流程分为以下3步..." - ``` + +``` +┌──────────┐ ┌──────────────┐ ┌──────────┐ ┌──────── +│ User │────>│ LangGraph │────>│ RAG_Tool │────>│ Qdrant │ +│ │ │ Agent │ │ │ │ │ +│ "公司报 │ │ 思考: 这是 │ │ ToolCall │ │ RAG- │ +│ 销流程?"│ │ 内部规章问题 │ │ search_ │ │ Fusion │ +│ │ │ 需要查资料 │ │ knowledge│ │ & 混合 │ +│ │<────│ 资料充分, │<────│ 返回最相 │<────│ 检索 │ +│ "根据知 │ │ 开始撰写回答 │ │ 关5条规定 │ │ Cross- │ +│ 识库规定 │ │ │ │ │ │ Encoder│ +│ ..." │ │ │ │ │ │ 重排 │ +└────────── └────────────── └──────────┘ └────────┘ +``` ### Level 5: GraphRAG 集成 (基于图和关系的 RAG) + - **核心原理**: 结合知识图谱的结构化关系和向量检索的语义相似度,解决跨文档复杂关系推理问题。 - **实现指南**: - 使用 `langchain_community.graphs` 模块构建知识图谱 - - 配置本地大模型(如 `Gemma-4-E2B`)用于实体关系抽取 + - 配置本地大模型(如 `Gemma-4-E4B`)用于实体关系抽取 - 实现混合检索逻辑,结合向量相似度和图路径分析 ---- +```python +from langchain_community.graphs import Neo4jGraph +from langchain_experimental.graph_transformers import LLMGraphTransformer -## 📦 所需依赖与安装 +# 实体关系抽取 +transformer = LLMGraphTransformer(llm=local_llm) +graph_documents = transformer.convert_to_graph_documents(documents) -除了基础的 LangChain 包外,在线检索模块为了支持重排和稀疏检索,还需要安装: - -```bash -# 用于 Cross-Encoder 重排序模型 (如 BAAI/bge-reranker-base) -pip install sentence-transformers - -# 用于 BM25 关键词混合检索 -pip install rank_bm25 - -# 基础框架 -pip install langchain langchain-core langchain-openai langchain-qdrant - -# 与 rag_indexer 共享的依赖 -pip install qdrant-client httpx +# 存储到图数据库 +graph = Neo4jGraph(url="bolt://localhost:7687") +graph.add_graph_documents(graph_documents) ``` ---- +## 🔧 核心组件详解 -## 📂 架构与文件结构设计 +### 1. 检索器 (retriever.py) -``` -app/rag/ -├── __init__.py -├── retriever.py # 负责 Qdrant 的基础召回与混合检索 -├── reranker.py # 负责加载 sentence-transformers 交叉编码器 -├── query_transform.py # 负责基于 MultiQueryRetriever 的改写逻辑 -├── pipeline.py # 组合上述组件,暴露出核心的 retrieve() 方法 -└── tools.py # 将 Pipeline 包装成 LangChain Tool 供 Agent 调用 +提供基于 Qdrant 的向量检索能力。 + +**基础检索器**: +```python +from app.rag.retriever import create_base_retriever + +retriever = create_base_retriever( + collection_name="rag_documents", + embeddings=embeddings, + search_kwargs={"k": 20} +) ``` ---- +**混合检索器**: +```python +from app.rag.retriever import create_hybrid_retriever -## 🔄 与 rag_indexer 集成 +retriever = create_hybrid_retriever( + collection_name="rag_documents", + embeddings=embeddings, + dense_k=10, + sparse_k=10, + score_threshold=0.3 +) +``` -### 数据结构兼容性 -- **向量存储**: rag_indexer 使用 Qdrant 存储子块向量,app/rag 直接从相同集合读取 -- **文档存储**: rag_indexer 使用 PostgreSQL 存储父块,app/rag 通过 `ParentDocumentRetriever` 关联 -- **嵌入模型**: 共享 `LlamaCppEmbedder` 确保向量空间一致性 +### 2. 多路查询改写 (query_transform.py) -### 配置共享 -- **环境变量**: QDRANT_URL、QDRANT_API_KEY、DB_URI 等配置在两个模块间共享 -- **集合名称**: 默认使用 "rag_documents" 集合,确保数据一致性 +通过 LLM 将用户问题改写为多个不同版本,扩大搜索面。 ---- +```python +from app.rag.query_transform import MultiQueryGenerator -## 🚀 与现有系统整合调用 (Agentic RAG 实现) +generator = MultiQueryGenerator(llm=llm, num_queries=3) +queries = await generator.agenerate("如何申请项目资金?") +``` -基于目前 LangGraph 系统的架构,我们将摒弃将代码堆砌在一起的旧方式,而是利用 **LangChain Tools** 的特性将 RAG 优雅地注入系统: +### 3. RRF 融合算法 (fusion.py) -1. **封装检索工具 (Tool)**: - 从 `langchain.tools` 导入 `@tool` 装饰器。定义一个名为 `search_knowledge_base(query: str)` 的函数。在函数内部,实例化并调用我们在 `pipeline.py` 中写好的多路召回与重排逻辑。 +Reciprocal Rank Fusion 算法,公式:`RRF_score(d) = Σ 1/(k + rank_q(d))` -2. **模型绑定 (Bind)**: - 在 `app/agent.py` 或 `app/nodes/tool_call.py` 中,将这个工具引入,并通过 `llm.bind_tools([search_knowledge_base])` 绑定到现有的本地大模型实例上。 +```python +from app.rag.fusion import reciprocal_rank_fusion -3. **状态机路由 (Graph Routing)**: - 你的 LangGraph 状态机会像处理普通对话一样自动接管:当模型判断需要调用查阅规章制度或专业资料时,它会输出 `ToolCall` 消息,流转到 `tool_node` 执行上述的 RAG 检索逻辑并返回上下文。 - -这让你无需修改任何前端 Streamlit 流式代码,就能平滑升级为具备超级知识库检索能力的智能体 (Agent)! +# 多个查询的检索结果 +doc_lists = [result1, result2, result3] +fused_docs = reciprocal_rank_fusion(doc_lists, k=60) +``` ---- +### 4. 重排序器 (reranker.py) -## 🎯 快速开始 +使用 llama.cpp 服务的 OpenAI 兼容 Rerank API 对检索结果重排序。 + +```python +from app.rag.reranker import LLaMaCPPReranker + +reranker = LLaMaCPPReranker( + base_url="http://127.0.0.1:8083", + api_key="your-api-key", + top_n=5 +) +sorted_docs = reranker.compress_documents(documents, query) +``` + +### 5. RAG 流水线 (pipeline.py) + +组合上述组件的完整检索流水线。 + +```python +from app.rag.pipeline import RAGPipeline + +pipeline = RAGPipeline( + retriever=retriever, + llm=llm, + num_queries=3, + rerank_top_n=5, +) + +# 异步检索 +docs = await pipeline.aretrieve("如何申请项目资金?") + +# 格式化上下文 +context = pipeline.format_context(docs) +``` + +## 🔄 与 Agent 系统集成 + +### 封装为 LangChain Tool + +```python +from langchain_core.tools import tool +from app.rag.pipeline import RAGPipeline + +@tool +def search_knowledge_base(query: str) -> str: + """搜索知识库获取相关信息""" + docs = pipeline.retrieve(query) + return pipeline.format_context(docs) +``` + +### 绑定到 LangGraph + +```python +from app.graph.graph_builder import GraphBuilder + +# 将 RAG 工具添加到工具列表 +tools = AVAILABLE_TOOLS + [search_knowledge_base] + +# 构建图 +builder = GraphBuilder(llm, tools, tools_by_name) +graph = builder.build().compile(checkpointer=checkpointer) +``` + +## ⚙️ 环境配置 + +| 变量名 | 说明 | 默认值 | +|:-------|:-----|:-------| +| `QDRANT_URL` | Qdrant 向量数据库地址 | `http://127.0.0.1:6333` | +| `QDRANT_API_KEY` | Qdrant API 密钥 | - | +| `LLAMACPP_RERANKER_URL` | llama.cpp 重排序服务地址 | `http://127.0.0.1:8083` | +| `LLAMACPP_API_KEY` | llama.cpp API 密钥 | - | + +## 🚀 快速开始 ```python # 1. 初始化嵌入模型 -from rag_indexer.embedders import LlamaCppEmbedder -embeddings = LlamaCppEmbedder() +from rag_core.embedders import LlamaCppEmbedder +embedder = LlamaCppEmbedder() +embeddings = embedder.as_langchain_embeddings() -# 2. 初始化语言模型(用于 RAG-Fusion) -from langchain_openai import OpenAI -llm = OpenAI( - openai_api_base="http://localhost:8000/v1", - openai_api_key="no-key-needed", - model_name="Qwen2.5-7B-Instruct", - temperature=0.3, +# 2. 创建检索器 +from app.rag.retriever import create_base_retriever +retriever = create_base_retriever( + collection_name="rag_documents", + embeddings=embeddings, + search_kwargs={"k": 20} ) # 3. 创建 RAG 流水线 -from app.rag.pipeline import RAGPipeline, RAGLevel +from app.rag.pipeline import RAGPipeline pipeline = RAGPipeline( - embeddings=embeddings, + retriever=retriever, llm=llm, - config={ - "collection_name": "rag_documents", - "rag_level": RAGLevel.FUSION.value, - "num_queries": 3, - "rerank_top_n": 5, - }, + num_queries=3, + rerank_top_n=5, ) # 4. 执行检索 -result = pipeline.retrieve("如何申请项目资金?") +docs = pipeline.retrieve("如何申请项目资金?") # 5. 格式化上下文 -context = pipeline.format_context(result.documents) +context = pipeline.format_context(docs) print(context) ``` + +## 📊 检索策略对比 + +| 策略 | 优点 | 缺点 | 适用场景 | +|:-----|:-----|:-----|:---------| +| **基础向量检索** | 速度快,语义理解好 | 专有名词匹配差 | 通用问答 | +| **混合检索** | 语义 + 关键词匹配 | 需要配置稀疏向量 | 专业术语查询 | +| **多路改写 + RRF** | 搜索面广,结果稳定 | 延迟略高 | 复杂问题 | +| **重排序** | 精度高 | 依赖额外模型 | 最终精排 | + +## 🤝 与 rag_indexer 集成 + +- **向量存储**:共享 Qdrant 集合,确保嵌入模型一致 +- **文档存储**:使用 PostgreSQL 存储父块,通过 UUID 映射 +- **集合名称**:默认使用 `rag_documents` 集合 + +详见 [rag_indexer/README.md](../../rag_indexer/README.md) diff --git a/docker/Dockerfile.backend b/docker/Dockerfile.backend index a42a6ac..8ad07ae 100644 --- a/docker/Dockerfile.backend +++ b/docker/Dockerfile.backend @@ -29,6 +29,13 @@ ENV DEBUG=false # ============================================================================= # 安装依赖 # ============================================================================= +# 复制本地模型文件到镜像 +COPY models/*.whl /tmp/models/ + +# 安装 +RUN pip install --no-cache-dir /tmp/models/*.whl && \ + rm -rf /tmp/models + # 设置 pip 国内镜像源 RUN pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple diff --git a/docker/models/en_core_web_sm-3.8.0-py3-none-any.whl b/docker/models/en_core_web_sm-3.8.0-py3-none-any.whl new file mode 100644 index 0000000..532ead7 Binary files /dev/null and b/docker/models/en_core_web_sm-3.8.0-py3-none-any.whl differ diff --git a/docker/models/zh_core_web_sm-3.8.0-py3-none-any.whl b/docker/models/zh_core_web_sm-3.8.0-py3-none-any.whl new file mode 100644 index 0000000..8b945b3 Binary files /dev/null and b/docker/models/zh_core_web_sm-3.8.0-py3-none-any.whl differ diff --git a/rag_indexer/README.md b/rag_indexer/README.md index c4259c8..bad2e8d 100644 --- a/rag_indexer/README.md +++ b/rag_indexer/README.md @@ -2,88 +2,202 @@ 该模块负责 RAG 系统的阶段一:**离线索引构建**。它将外部的非结构化数据(如文档、PDF、网页等)清洗、切分并转化为向量,最终存入向量数据库中。 +## 🎯 核心架构 + +### 技术栈 + +| 组件 | 技术选型 | 版本 | 说明 | +|:-----|:---------|:-----|:-----| +| **文档解析** | `unstructured` | 0.22+ | 多格式文档解析(PDF/DOCX/TXT等) | +| **文本切分** | `langchain-text-splitters` | 内置 | 递归字符切分 + 语义切分 | +| **语义切分** | `langchain-experimental` | 内置 | `SemanticChunker` 基于句子相似度 | +| **嵌入模型** | `llama.cpp` | 本地服务 | `embeddinggemma-300M` GGUF 模型 | +| **向量数据库** | `Qdrant` | 1.17+ | HNSW 索引,支持稠密/稀疏向量 | +| **文档存储** | `PostgreSQL` | 16+ | 异步连接池,持久化父块 | +| **编排框架** | `asyncio` | Python 3.10+ | 异步批量处理与重试 | + +### 数据流向总览 + +``` +┌─────────────────────────────────────────────────────────────┐ +│ builder.py │ +│ IndexBuilder 入口 │ +└──────────────────────┬──────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ loaders.py │ +│ DocumentLoader.load_file() │ +│ → 返回 List[Document] │ +└──────────────────────┬──────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ ParentDocumentRetriever │ +│ ┌─────────────────────────────────────────────────────┐ │ +│ │ parent_splitter (粗切) │ │ +│ │ 父块 ~1000 字符 │ │ +│ └──────────────────────┬──────────────────────────────┘ │ +│ │ │ +│ ┌──────────────────────▼──────────────────────────────┐ │ +│ │ child_splitter (细切) │ │ +│ │ 子块 ~200 字符 │ │ +│ └──────────────────────┬──────────────────────────────┘ │ +│ │ │ +│ ┌─────────────┴─────────────┐ │ +│ ▼ ▼ │ +│ ┌─────────────┐ ┌─────────────────┐ │ +│ │ 子块向量 │ │ 父块原始内容 │ │ +│ │ │ │ │ │ +│ ▼ │ ▼ │ │ +│ ┌────────────┐ │ ┌─────────────────┐ │ │ +│ │vector_store│ │ │ store/ │ │ │ +│ │ (Qdrant) │ │ │ (PostgreSQL) │ │ │ +│ └──────────── │ └─────────────────┘ │ │ +└─────────────────────────────────────────────────────────────┘ +``` + +### 技术特性 + +- ✅ **多格式支持**:PDF、DOCX、TXT、MD、HTML、PPTX、XLSX、JSON +- ✅ **三种切分策略**:递归字符切分、语义切分、父子块策略 +- ✅ **Parent-Child 架构**:子块精准检索,父块完整上下文 +- ✅ **PostgreSQL DocStore**:持久化存储父块,支持异步连接池 +- ✅ **批量写入与重试**:自动处理网络波动,确保索引完整性 +- ✅ **上下文管理器**:支持同步/异步资源管理 + +## 📂 架构与文件结构 + +``` +rag_indexer/ +├── __init__.py +├── cli.py # 命令行入口 +├── index_builder.py # 索引构建主流水线 +├── loaders.py # 文档加载器(多格式支持) +├── splitters.py # 文本切分器(递归/语义/父子块) +├── embedders.py # 嵌入模型封装 +├── vector_store.py # Qdrant 向量存储 +├── store/ +│ ├── __init__.py +│ ├── factory.py # DocStore 工厂函数 +│ └── postgres.py # PostgreSQL DocStore 实现 +└── test/ # 测试脚本 +``` + ## 🎯 演进路线与核心算法 (Roadmap) ### Level 1: 基础暴力切分 (Basic Recursive Splitting) + - **核心算法**: 递归字符切分。它按照预定义的分隔符列表(如 `["\n\n", "\n", "。", "!", "?", " ", ""]`)从大到小尝试切分文本,直到每块的大小满足最大长度限制。 - **优缺点**: 实现极简单,速度快。但非常容易将一句话拦腰截断,导致上下文语义丢失。 - **实现指南**: - - 从 `langchain_text_splitters` 导入 `RecursiveCharacterTextSplitter`。 - - 实例化时设置 `chunk_size`(如 500)和 `chunk_overlap`(如 50),直接调用 `.split_documents(raw_docs)` 方法。 + - 从 `langchain_text_splitters` 导入 `RecursiveCharacterTextSplitter` + - 实例化时设置 `chunk_size`(如 500)和 `chunk_overlap`(如 50) + - 直接调用 `.split_documents(raw_docs)` 方法 + +```python +from langchain_text_splitters import RecursiveCharacterTextSplitter + +splitter = RecursiveCharacterTextSplitter( + chunk_size=500, + chunk_overlap=50, + separators=["\n\n", "\n", "。", "!", "?", " ", ""] +) +chunks = splitter.split_documents(documents) +``` ### Level 2: 语义动态切分 (Semantic Chunking) + - **核心算法**: 句子级相似度阈值算法。 - 1. 将文章按标点符号按句子拆分。 - 2. 使用轻量级 Embedding 模型将每一句向量化。 - 3. 计算相邻两句之间的余弦相似度 (Cosine Similarity)。 - 4. 当相似度低于设定阈值时(说明两句话讲的不是同一件事,语义发生了转折),在此处"切断"形成一个新的块。 + 1. 将文章按标点符号按句子拆分 + 2. 使用轻量级 Embedding 模型将每一句向量化 + 3. 计算相邻两句之间的余弦相似度 (Cosine Similarity) + 4. 当相似度低于设定阈值时(说明两句话讲的不是同一件事,语义发生了转折),在此处"切断"形成一个新的块 - **优缺点**: 极大程度保留了段落内语义的连贯性,对 LLM 回答非常友好。但由于在切分阶段就需要调用向量模型,耗时略长。 - **实现指南**: - - 从 `langchain_text_splitters` 导入 `TextSplitter` 作为基类。 - - 从 `langchain_experimental.text_splitter` 导入 `SemanticChunker`。 - - 实现 `SemanticChunkerAdapter` 继承 `TextSplitter`,解决类型不兼容问题。 - - 实例化时需要传入你已经配置好的 Embedding 模型实例(如基于 `LlamaCppEmbedder` 封装的本地模型)。 + - 从 `langchain_experimental.text_splitter` 导入 `SemanticChunker` + - 实现 `SemanticChunkerAdapter` 继承 `TextSplitter`,解决类型不兼容问题 + - 实例化时需要传入已配置好的 Embedding 模型实例 + +```python +from langchain_experimental.text_splitter import SemanticChunker + +chunker = SemanticChunker( + embeddings=embeddings, + breakpoint_threshold_type="percentile", + breakpoint_threshold_amount=95, + min_chunk_size=100 +) +chunks = chunker.split_documents(documents) +``` ### Level 3: 高级父子块策略 (Parent-Child / Auto-merging) + - **核心算法**: 层次化双重存储与映射。 - - **切分机制**: 首先将文档粗切为较大的"父块 (Parent Chunk, 约 1000 字符)",随后将父块细切为较小的"子块 (Child Chunk, 约 200 字符)"。 - - **存储机制**: 仅仅将**子块**的向量存入 Qdrant 用于精准计算距离;将**父块**的原始内容存在 PostgreSQL DocStore 中,通过 UUID 相互映射。 + - **切分机制**: 首先将文档粗切为较大的"父块 (Parent Chunk, 约 1000 字符)",随后将父块细切为较小的"子块 (Child Chunk, 约 200 字符)" + - **存储机制**: 仅仅将**子块**的向量存入 Qdrant 用于精准计算距离;将**父块**的原始内容存在 PostgreSQL DocStore 中,通过 UUID 相互映射 - **核心思路**: 解决 RAG 领域经典的矛盾——检索时块越小越容易精确命中(去除噪声);但生成回答时,块越大越能给大模型提供充足的上下文背景。 - **实现指南**: - - 使用 `langchain_classic.retrievers` 中的 `ParentDocumentRetriever` 模块。 - - 在写入时,你需要同时准备一个底层的 `VectorStore` (即 Qdrant) 和一个 `BaseStore`。 - - **推荐方案**: 使用 `PostgresDocStore` 作为 docstore,支持持久化存储。 - - 将两种不同的 `TextSplitter` 分别赋值给检索器的 `child_splitter` 和 `parent_splitter`,然后调用 `.add_documents()` 即可让系统自动完成映射。 + - 使用 `langchain_classic.retrievers` 中的 `ParentDocumentRetriever` 模块 + - 在写入时,需要同时准备一个底层的 `VectorStore` (即 Qdrant) 和一个 `BaseStore` + - **推荐方案**: 使用 `PostgresDocStore` 作为 docstore,支持持久化存储 + - 将两种不同的 `TextSplitter` 分别赋值给检索器的 `child_splitter` 和 `parent_splitter`,然后调用 `.add_documents()` 即可让系统自动完成映射 + +```python +from langchain.retrievers import ParentDocumentRetriever + +retriever = ParentDocumentRetriever( + vectorstore=qdrant_store, + docstore=postgres_docstore, + parent_splitter=parent_splitter, + child_splitter=child_splitter, +) +await retriever.aadd_documents(documents) +``` ### Level 3.1: PostgreSQL DocStore 集成 + - **核心优势**: 利用 PostgreSQL 作为持久化存储,适合生产环境。使用异步连接池,支持高并发。 - **实现步骤**: 1. **配置连接**: 设置 `DB_URI` 环境变量或通过 `docstore_conn_string` 参数指定 2. **创建 docstore**: 使用 `rag_indexer.store.create_docstore()` 工厂函数 3. **注入到 IndexBuilder**: 通过构造函数参数注入 -- **使用示例**: - ```python - from rag_indexer.builder import IndexBuilder, SplitterType +```python +from rag_indexer.store import create_docstore - # 创建 IndexBuilder - builder = IndexBuilder( - collection_name="rag_documents", - splitter_type=SplitterType.PARENT_CHILD, - parent_chunk_size=1000, - child_chunk_size=200, - docstore_conn_string="postgresql://user:pass@host:5432/db", - ) - ``` +docstore, conn_info = create_docstore( + connection_string="postgresql://user:pass@host:5432/db", + pool_config={"min_size": 5, "max_size": 20}, +) +``` ### Level 3.2: 语义切分与父子块策略结合 + - **核心优势**: 结合语义切分的连贯性和父子块策略的层次化存储优势,实现更精准的检索和更丰富的上下文。 - **实现原理**: - **父块切分**: 使用 `RecursiveCharacterTextSplitter` 创建大块(约1000字符),提供完整的上下文背景 - **子块切分**: 使用 `SemanticChunkerAdapter` 创建小块,根据语义连贯性动态切分,提高检索精度 - **存储机制**: 子块向量存入 Qdrant 用于精准检索,父块内容存入 PostgreSQL 提供完整上下文 -- **使用示例**: - ```python - from rag_indexer.builder import IndexBuilder, SplitterType - # 创建 IndexBuilder,结合语义切分与父子块策略 - builder = IndexBuilder( - collection_name="rag_documents", - splitter_type=SplitterType.PARENT_CHILD, - # 父子块配置 - parent_chunk_size=1000, - child_chunk_size=200, - # 子块使用语义切分 - child_splitter_type=SplitterType.SEMANTIC, - # PostgreSQL 存储配置 - docstore_conn_string="postgresql://user:pass@host:5432/db", - ) - ``` -- **配置参数**: - - `child_splitter_type`: 子块切分器类型,可选 `SplitterType.RECURSIVE`(默认)或 `SplitterType.SEMANTIC` - - 当使用语义切分时,系统会自动使用已配置的 Embedding 模型进行句子级相似度计算 +```python +from rag_indexer.index_builder import IndexBuilder, IndexBuilderConfig +from rag_indexer.splitters import SplitterType + +config = IndexBuilderConfig( + collection_name="rag_documents", + splitter_type=SplitterType.PARENT_CHILD, + parent_chunk_size=1000, + child_chunk_size=200, + child_splitter_type=SplitterType.SEMANTIC, # 子块使用语义切分 + docstore=DocstoreConfig( + connection_string="postgresql://user:pass@host:5432/db", + ), +) +``` ### Level 4: GraphRAG(基于图和关系的 RAG) + - **核心算法**: LLM 实体关系抽取 (NER & Relation Extraction)。 - **核心思路**: 解决传统纯向量检索难以处理"跨文档复杂关系推理"的痛点(如:A公司的CEO是谁?他名下的B公司主要业务是什么?这种需要横跨多页 PDF 的跳跃性问题)。 - **实现原理**: @@ -97,11 +211,24 @@ - **集成方式**: 与向量存储并行,形成混合检索系统 - **实现指南**: - 使用 `langchain_community.graphs` 模块 - - 配置本地大模型(如 `Gemma-4-E2B`)用于实体关系抽取 + - 配置本地大模型(如 `Gemma-4-E4B`)用于实体关系抽取 - 构建包含实体和关系的图结构,存储到图数据库 - 实现混合检索逻辑,结合向量相似度和图路径分析 +```python +from langchain_community.graphs import Neo4jGraph +from langchain_experimental.graph_transformers import LLMGraphTransformer + +# 实体关系抽取 +transformer = LLMGraphTransformer(llm=local_llm) +graph_documents = transformer.convert_to_graph_documents(documents) + +# 存储到图数据库 +graph.add_graph_documents(graph_documents) +``` + ### Level 5: 多模态 RAG (Multi-modal RAG) + - **核心算法**: 跨模态嵌入和多模态融合。 - **核心思路**: 突破纯文本限制,支持图像、表格、音频等多种数据类型的理解和检索。 - **实现原理**: @@ -113,168 +240,220 @@ - **存储**: 向量数据库 + 对象存储 - **检索**: 混合向量检索 ---- +## 🔧 核心组件详解 -## 📂 架构与文件结构设计 +### 1. 文档加载器 (loaders.py) -``` -rag_indexer/ -├── __init__.py -├── loaders.py # 负责调用 unstructured 解析不同类型文件 -├── splitters.py # 负责实现 Recursive、Semantic 切分逻辑及适配器 -├── embedders.py # 封装本地 llama.cpp 交互的 Embedding 接口 -├── vector_store.py # 封装 Qdrant 写入、Upsert、Collection 初始化操作 -├── builder.py # 核心编排文件,将上述模块串联成 Pipeline -├── cli.py # 命令行入口 -└── store/ - ├── __init__.py - ├── factory.py # docstore 工厂函数 - └── postgres.py # PostgreSQL DocStore 实现 -``` +使用 `unstructured` 库解析多种文件格式。 ---- - -## 🔄 工作流程详解 - -### 数据流向总览 - -``` - ┌─────────────────────────────────────────┐ - │ builder.py │ - │ IndexBuilder 入口 │ - └─────────────────┬───────────────────────┘ - │ - ┌─────────────────▼───────────────────────┐ - │ loaders.py │ - │ DocumentLoader.load_file() │ - │ → 返回 List[Document] │ - └─────────────────┬───────────────────────┘ - │ - ┌─────────────────▼───────────────────────┐ - │ ParentDocumentRetriever │ - │ ┌─────────────────────────────────┐ │ - │ │ parent_splitter (粗切) │ │ - │ │ 父块 ~1000 字符 │ │ - │ └────────────┬────────────────────┘ │ - │ │ │ - │ ┌────────────▼────────────────────┐ │ - │ │ child_splitter (细切) │ │ - │ │ 子块 ~200 字符 │ │ - │ └────────────┬────────────────────┘ │ - │ │ │ - │ ┌──────────┴──────────┐ │ - │ ▼ ▼ │ - │ 子块向量 父块原始内容 │ - │ │ │ │ - │ ▼ ▼ │ - │ ┌────────────┐ ┌─────────────────┐ │ - │ │vector_store│ │ store/ │ │ - │ │ (Qdrant) │ │ (PostgreSQL) │ │ - │ └────────────┘ └─────────────────┘ │ - └─────────────────────────────────────────┘ -``` - -### 文件职责详解 - -| 文件 | 职责 | 关键类/函数 | -|------|------|------------| -| **builder.py** | 核心编排,负责串联整个流程 | `IndexBuilder` | -| **loaders.py** | 解析各种文档格式(PDF、Word、TXT等) | `DocumentLoader` | -| **splitters.py** | 文本切分策略(Recursive/Semantic)及适配器 | `SplitterType`, `get_splitter()`, `SemanticChunkerAdapter` | -| **embedders.py** | 向量化(封装 llama.cpp embedding 接口) | `LlamaCppEmbedder` | -| **vector_store.py** | Qdrant 向量数据库操作 | `QdrantVectorStore` | -| **store/postgres.py** | PostgreSQL DocStore 实现 | `PostgresDocStore` | -| **store/factory.py** | docstore 工厂函数 | `create_docstore()` | - -### 核心实现细节 - -#### 1. 文本切分 -- **递归切分**: 使用 `langchain_text_splitters.RecursiveCharacterTextSplitter`,支持中文分隔符 -- **语义切分**: 使用 `langchain_experimental.text_splitter.SemanticChunker`,通过 `SemanticChunkerAdapter` 适配 `TextSplitter` 接口 -- **父子块策略**: 父块使用递归切分(1000字符),子块可选择递归或语义切分(200字符) - -#### 2. 向量化 -- **Embedding API**: 使用 `LlamaCppEmbedder` 封装本地 llama.cpp 服务,支持 `embed_documents` 和 `embed_query` 方法 -- **向量维度**: 自动检测模型维度(默认 2560),创建对应大小的 Qdrant 集合 - -#### 3. 向量存储 -- **Qdrant 集成**: 使用 `langchain_qdrant.QdrantVectorStore` 作为底层存储 -- **集合管理**: 自动创建/复用集合,支持 `force_recreate` 参数 -- **批量写入**: 支持 `batch_size` 参数,避免单次请求过大 - -#### 4. 文档存储 -- **PostgreSQL**: 使用 `PostgresDocStore` 持久化存储父块,支持异步连接池 -- **数据映射**: 通过 UUID 将子块与父块关联,检索时返回完整父块 - -### 调用顺序 - -#### 1. 创建 IndexBuilder(入口) +**支持格式**:PDF、DOCX、DOC、TXT、MD、HTML、PPTX、XLSX、JSON ```python -from rag_indexer.builder import IndexBuilder, SplitterType +from rag_indexer.loaders import DocumentLoader -builder = IndexBuilder( - collection_name="my_docs", - splitter_type=SplitterType.PARENT_CHILD, - parent_chunk_size=1000, - child_chunk_size=200, - docstore_conn_string="postgresql://user:pass@host:5432/db", +loader = DocumentLoader( + strategy="auto", # 解析策略:auto/fast/hi_res/ocr_only + ocr_languages=["chi_sim", "eng"], # OCR 语言 + languages=["zh"], # 文档主语言 + extract_images=False, # 是否提取图片 + pdf_infer_table_structure=True, # 是否识别表格 +) + +# 加载单个文件 +docs = loader.load_file("document.pdf") + +# 加载整个目录 +docs = loader.load_directory("./docs/", recursive=True) +``` + +### 2. 文本切分器 (splitters.py) + +提供三种切分策略: + +**递归字符切分**: +```python +from rag_indexer.splitters import SplitterType, get_splitter + +splitter = get_splitter( + SplitterType.RECURSIVE, + chunk_size=500, + chunk_overlap=50, ) ``` -#### 2. 构建索引 +**语义切分**: +```python +splitter = get_splitter( + SplitterType.SEMANTIC, + embeddings=embeddings, + breakpoint_threshold_type="percentile", + min_chunk_size=100, +) +``` + +**父子块策略**:在 `IndexBuilder` 中自动配置。 + +### 3. 索引构建器 (index_builder.py) + +核心编排模块,串联整个索引构建流程。 ```python -import asyncio +from rag_indexer.index_builder import IndexBuilder, IndexBuilderConfig, DocstoreConfig +from rag_indexer.splitters import SplitterType -# 方式A:从单个文件构建 -async def main(): - count = await builder.build_from_file("/path/to/document.pdf") - print(f"已索引 {count} 个块") +# 配置 +config = IndexBuilderConfig( + collection_name="rag_documents", + splitter_type=SplitterType.PARENT_CHILD, + parent_chunk_size=1000, + child_chunk_size=200, + child_splitter_type=SplitterType.SEMANTIC, + docstore=DocstoreConfig( + connection_string="postgresql://user:pass@host:5432/db", + ), +) -# 方式B:从目录批量构建 -async def main(): - count = await builder.build_from_directory("/path/to/docs/") - print(f"已索引 {count} 个块") - -asyncio.run(main()) +# 构建索引 +async with IndexBuilder(config) as builder: + # 从单个文件构建 + count = await builder.build_from_file("document.pdf") + + # 或从目录批量构建 + count = await builder.build_from_directory("./docs/") + + print(f"已索引 {count} 个文档") ``` -#### 3. 检索(获取完整父块上下文) +### 4. 向量存储 (vector_store.py) + +封装 Qdrant 向量数据库操作。 ```python -import asyncio +from rag_core import QdrantVectorStore -async def main(): - # 检索时返回完整父块 - results = await builder.search_with_parent_context("查询内容", k=5) - for doc in results: - print(doc.page_content) +vector_store = QdrantVectorStore( + collection_name="rag_documents", + embeddings=embeddings, +) -asyncio.run(main()) +# 创建集合 +vector_store.create_collection() + +# 添加文档 +vector_store.add_documents(chunks) ``` -### 检索流程 +### 5. PostgreSQL DocStore (store/postgres.py) -``` -1. vector_store.similarity_search() → 从 Qdrant 找到相关子块 -2. retriever.get_relevant_documents() → 根据子块 ID 获取对应父块 -3. 返回完整父块给用户 +持久化存储父块内容,支持异步连接池。 + +```python +from rag_core.store import create_docstore + +docstore, conn_info = create_docstore( + connection_string="postgresql://user:pass@host:5432/db", + pool_config={"min_size": 5, "max_size": 20}, +) ``` ---- +## 📊 切分策略对比 -### 串联与触发方式 -使用 `cli.py` 入口脚本: +| 策略 | 原理 | 优点 | 缺点 | 适用场景 | +|:-----|:-----|:-----|:-----|:---------| +| **递归字符** | 按分隔符递归切分 | 速度快,实现简单 | 可能截断语义 | 简单文档 | +| **语义切分** | 基于句子相似度阈值 | 语义连贯性好 | 需要 Embedding 模型 | 专业文档 | +| **父子块** | 大块存储+小块检索 | 检索精准+上下文完整 | 存储复杂度高 | 生产环境 | + +## 🚀 快速开始 + +### 命令行方式 ```bash # 设置环境变量 export QDRANT_URL="http://115.190.121.151:6333" -export QDRANT_API_KEY="your-api-key" export DB_URI="postgresql://postgres:password@host:5432/langgraph_db?sslmode=disable" # 执行索引构建 -python -m rag_indexer.cli --path data/user_docs/tech_manual.pdf +python -m rag_indexer.cli --path data/user_docs/ --recursive ``` -这相当于系统后台的**"离线学习阶段"**,你可以随时挂载定时任务去扫描文件夹,增量更新知识库。 +### Python API 方式 + +```python +import asyncio +from rag_indexer.index_builder import IndexBuilder, IndexBuilderConfig, DocstoreConfig +from rag_indexer.splitters import SplitterType + +async def main(): + config = IndexBuilderConfig( + collection_name="rag_documents", + splitter_type=SplitterType.PARENT_CHILD, + parent_chunk_size=1000, + child_chunk_size=200, + child_splitter_type=SplitterType.SEMANTIC, + ) + + async with IndexBuilder(config) as builder: + count = await builder.build_from_directory("./user_docs/") + print(f"索引构建完成,共处理 {count} 个文档") + +asyncio.run(main()) +``` + +## ⚙️ 环境配置 + +| 变量名 | 说明 | 默认值 | +|:-------|:-----|:-------| +| `QDRANT_URL` | Qdrant 向量数据库地址 | `http://127.0.0.1:6333` | +| `QDRANT_API_KEY` | Qdrant API 密钥 | - | +| `DB_URI` | PostgreSQL 连接字符串 | - | +| `LLAMACPP_EMBEDDING_URL` | Embedding 服务地址 | `http://127.0.0.1:8082/v1` | +| `LLAMACPP_API_KEY` | llama.cpp API 密钥 | - | + +## 🔄 与 app/rag 集成 + +- **向量存储**:共享 Qdrant 集合,确保嵌入模型一致 +- **文档存储**:父块存入 PostgreSQL,通过 UUID 与子块关联 +- **集合名称**:默认使用 `rag_documents` 集合 +- **嵌入模型**:使用相同的 `LlamaCppEmbedder` 确保向量空间一致 + +详见 [app/rag/README.md](../app/rag/README.md) + +## 📝 高级配置 + +### 自定义切分参数 + +```python +config = IndexBuilderConfig( + collection_name="my_docs", + splitter_type=SplitterType.PARENT_CHILD, + parent_chunk_size=1500, # 更大的父块 + child_chunk_size=300, # 更大的子块 + parent_chunk_overlap=150, # 父块重叠 + child_chunk_overlap=30, # 子块重叠 + search_k=10, # 检索返回数量 +) +``` + +### 批量处理与重试 + +索引构建器内置自动重试机制,处理网络波动: + +- 最大重试次数:5 次 +- 退避策略:指数退避(2s, 4s, 8s, 16s, 32s) +- 批量大小:10 个文档/批次 + +### 资源管理 + +```python +# 方式一:上下文管理器(推荐) +async with IndexBuilder(config) as builder: + await builder.build_from_directory("./docs/") + +# 方式二:手动管理 +builder = IndexBuilder(config) +try: + await builder.build_from_directory("./docs/") +finally: + await builder.aclose() +```