# AI Agent - 智能助手系统 一个基于 LangGraph + FastAPI 的智能对话助手,支持多模型切换、RAG 知识库检索、文件处理和网页抓取等功能。 --- ## 📑 目录导航 - [核心功能](#-核心功能) - 面向用户的功能和技术特性 - [技术架构](#️-技术架构) - 技术栈、系统架构图、工作流流程图 - [核心算法与实现原理](#-核心算法与实现原理) - RAG 检索、LangGraph 工作流、多模型路由、SSE 流式响应 - [RAG 系统完整架构](#-rag-系统完整架构) - 离线索引构建、在线检索生成、演进路线 - [快速开始](#-快速开始) - Docker 和本地部署指南 - [使用指南](#-使用指南) - 基础对话、工具调用、多模型切换 - [开发指南](#-开发指南) - 添加工具、添加模型、Docker 部署 - [实现指南与最佳实践](#️-实现指南与最佳实践) - RAG 构建、性能优化、扩展开发、部署实践 - [环境配置](#️-环境配置) - 配置文件、环境变量 - [故障排查](#-故障排查) - 常见问题 --- ## 🎯 核心功能 ### 面向用户的功能 - 💬 **智能对话**:支持多轮对话,自动记忆上下文 - 🌤️ **天气查询**:实时获取各地天气信息 - 📄 **文档处理**:读取 TXT、PDF、Excel 等格式文件 - 🌐 **网页抓取**:提取网页正文内容 - 🔍 **知识库检索(RAG)**:基于向量数据库的智能问答 - 🔄 **多模型切换**:前端可选择不同大语言模型 - 📇 **通讯录管理**:子图模块,联系人 CRUD、邮件处理 - 📖 **智能词典**:子图模块,翻译、生词本、专业术语提取 - 📰 **资讯分析**:子图模块,资讯获取、内容分析、格式化展示 ### 技术特性 - ✅ **持久化记忆**:PostgreSQL 存储对话历史,重启不丢失 - ✅ **高可用架构**:模型服务自动降级,确保服务稳定 - ✅ **前后端分离**:FastAPI 后端 + Streamlit 前端 - ✅ **Docker 部署**:一键启动所有服务 - ✅ **远程服务架构**:PostgreSQL 和 Qdrant 部署在远程服务器 - ✅ **流式响应**:SSE 流式输出,提升用户体验 - ✅ **模块化设计**:清晰的代码分层,易于扩展和维护 - ✅ **模型服务层**:统一的 Embedding、Rerank、Chat 服务接口,支持自动降级 - ✅ **子图系统**:模块化的子图架构,共享公共工具(意图理解、人工审核、格式化输出) - ✅ **React 模式**:循环推理 + 超时重试 + 结构化错误处理 --- ## 🏗️ 技术架构 ### 技术栈总览 | 层级 | 组件 | 技术选型 | 版本 | 说明 | |------|------|---------|------|------| | **LLM 服务** | 云端模型 | 智谱 AI (glm-4.7-flash) | v4.7 | 快速响应,适合日常对话 | | | | DeepSeek (deepseek-reasoner) | v3 | 深度推理,适合复杂问题 | | | 本地模型 | Gemma-4-E4B-it | v4 | 本地部署,保护隐私 | | **模型服务层** | Chat 服务 | chat_services.py | - | 统一的生成式大模型接口 | | | Embedding 服务 | embedding_services.py | - | 统一的嵌入模型接口 | | | Rerank 服务 | rerank_services.py | - | 统一的重排序接口 | | | 服务基类 | base.py | - | BaseServiceProvider + FallbackServiceChain + SingletonServiceManager | | **Embedding** | 向量嵌入 | llama.cpp server | latest | 本地 embedding 服务,支持多种模型 | | **Agent 框架** | 工作流编排 | LangGraph + LangChain | latest | 状态机驱动的智能体工作流 | | **子图系统** | 模块化子图 | agent_subgraphs/ | - | 通讯录、词典、资讯分析等子图 | | | 公共工具 | common/ | - | 状态基类、意图理解、格式化输出、人工审核 | | **向量数据库** | 向量检索 | Qdrant | v1.12+ | 高性能向量相似度检索(远程服务器) | | **后端框架** | API 服务 | FastAPI + Uvicorn | v0.115+ | RESTful API + WebSocket + SSE 流式输出 | | **前端框架** | Web 界面 | Streamlit | v1.40+ | 交互式对话界面,组件化设计 | | **关系数据库** | 持久化存储 | PostgreSQL | v16 | 对话记忆持久化(远程服务器) | | **容器化** | 服务编排 | Docker + Docker Compose | v24+ | 一键部署所有服务 | | **CI/CD** | 自动化部署 | Gitea Workflows | latest | 代码推送自动构建部署 | ### 系统架构流程图 ```mermaid graph TB User[用户浏览器] -->|HTTP/SSE| Frontend[Streamlit 前端 :8501] Frontend -->|REST API| Backend[FastAPI 后端 :8083] Backend --> AgentService[AIAgentService] Backend --> SubgraphAPI[子图 API 端点] AgentService -->|模型路由| ChatServices[模型服务层 chat_services] ChatServices -->|自动降级| FallbackChain[FallbackServiceChain] FallbackChain -->|创建| Zhipu[智谱 GLM-4.7] FallbackChain -->|创建| DeepSeek[DeepSeek Reasoner] FallbackChain -->|创建| LocalGemma[本地 Gemma-4] AgentService -->|初始化| LangGraph[LangGraph 工作流引擎] LangGraph -->|节点1| RetrieveMemory[记忆检索] LangGraph -->|节点2| MemoryTrigger[记忆触发] LangGraph -->|节点3| LLMCall[LLM 调用 (React 模式)] LangGraph -->|节点4| ToolNode[工具执行] LangGraph -->|节点5| Summarize[记忆摘要] LangGraph -->|节点6| Finalize[最终处理] ToolNode -->|调用| Tools[工具集合] Tools -->|天气| WeatherTool[天气查询] Tools -->|文件| FileTool[文件读取] Tools -->|网页| WebTool[网页抓取] Tools -->|RAG| RAGTool[知识库检索] RAGTool -->|检索| Qdrant[(Qdrant 向量库)] RAGTool -->|重排序| RerankService[rerank_services] RAGTool -->|嵌入| EmbeddingService[embedding_services] RetrieveMemory -->|存储| PostgreSQL[(PostgreSQL)] Summarize -->|存储| PostgreSQL SubgraphAPI -->|路由| Subgraphs[子图系统 agent_subgraphs] Subgraphs --> Contact[通讯录子图] Subgraphs --> Dictionary[词典子图] Subgraphs --> NewsAnalysis[资讯分析子图] Subgraphs --> Common[公共工具 common/] Common --> Intent[意图理解 (React)] Common --> HumanReview[人工审核] Common --> Formatter[格式化输出] Common --> StateBase[状态基类] style User fill:#e1f5ff style Frontend fill:#fff4e1 style Backend fill:#e8f5e9 style ChatServices fill:#c8e6c9 style LangGraph fill:#f3e5f5 style Subgraphs fill:#fff3e0 style PostgreSQL fill:#ffebee style Qdrant fill:#ffebee ``` ### LangGraph 工作流详细流程 ```mermaid stateDiagram-v2 [*] --> RetrieveMemory: 用户输入消息 RetrieveMemory --> MemoryTrigger: 检索历史记忆 MemoryTrigger --> CheckMemory: 检查是否需要触发记忆 CheckMemory --> LLMCall: 记忆充足 CheckMemory --> Summarize: 需要生成摘要 Summarize --> PostgreSQL: 保存摘要 PostgreSQL --> LLMCall: 继续对话 LLMCall --> CheckTools: LLM 输出 CheckTools --> ToolNode: 需要调用工具 CheckTools --> Finalize: 直接回复 ToolNode --> ExecuteTool: 执行工具 ExecuteTool --> LLMCall: 工具结果返回 Finalize --> FormatResponse: 格式化响应 FormatResponse --> [*]: SSE 流式输出 ``` ### 数据流向图 ``` 用户请求 │ ├─→ 前端 Streamlit │ │ │ ├─→ 发送 HTTP POST /api/chat │ │ │ └─→ 接收 SSE 流式响应 │ └─→ 后端 FastAPI │ ├─→ AIAgentService.achat() │ │ │ ├─→ 初始化 LangGraph 状态 │ │ ├─→ messages: 对话历史 │ │ ├─→ user_id: 用户标识 │ │ └─→ metadata: 元数据 │ │ │ ├─→ 执行工作流 │ │ ├─→ retrieve_memory: 从 PostgreSQL 检索历史 │ │ ├─→ memory_trigger: 判断是否触发记忆 │ │ ├─→ llm_call: 调用 LLM 生成响应 │ │ ├─→ tool_node: 执行工具(如有需要) │ │ ├─→ summarize: 生成对话摘要 │ │ └─→ finalize: 格式化最终响应 │ │ │ └─→ 返回 SSE 流 │ └─→ 持久化存储 ├─→ PostgreSQL: 对话历史和摘要 └─→ Qdrant: RAG 向量索引 ``` ### 项目结构 ``` Agent1/ ├── backend/ │ ├── app/ │ │ ├── __init__.py │ │ ├── config.py # 配置管理 │ │ ├── logger.py # 日志工具 │ │ ├── backend.py # FastAPI 后端应用(含子图 API) │ │ ├── agent/ │ │ │ ├── __init__.py │ │ │ ├── service.py # Agent 服务核心(使用 chat_services) │ │ │ ├── history.py # 历史查询服务 │ │ │ ├── prompts.py # 提示模板 │ │ │ └── rag_initializer.py # RAG 工具初始化 │ │ ├── model_services/ # 模型服务层(新增) │ │ │ ├── __init__.py │ │ │ ├── base.py # 基类:BaseServiceProvider, FallbackServiceChain, SingletonServiceManager │ │ │ ├── chat_services.py # 生成式大模型服务(替代 llm_factory) │ │ │ ├── embedding_services.py # 嵌入模型服务 │ │ │ └── rerank_services.py # 重排序服务 │ │ ├── graph/ │ │ │ ├── __init__.py │ │ │ ├── graph_builder.py # LangGraph 图构建器 │ │ │ ├── graph_tools.py # 工具定义 │ │ │ ├── state.py # 状态定义 │ │ │ ├── rag_nodes.py # RAG 集成节点 │ │ │ └── retrieve_memory.py # 记忆检索 │ │ ├── nodes/ │ │ │ ├── __init__.py │ │ │ ├── router.py # 路由决策 │ │ │ ├── llm_call.py # LLM 调用节点(React 模式) │ │ │ ├── tool_call.py # 工具执行节点 │ │ │ ├── retrieve_memory.py # 记忆检索节点 │ │ │ ├── summarize.py # 记忆存储节点 │ │ │ ├── finalize.py # 最终处理节点 │ │ │ └── memory_trigger.py # 记忆触发节点 │ │ ├── memory/ │ │ │ ├── __init__.py │ │ │ └── mem0_client.py # Mem0 客户端封装 │ │ ├── rag/ │ │ │ ├── __init__.py │ │ │ ├── retriever.py # 检索器 │ │ │ ├── rerank.py # 重排序业务逻辑(使用 rerank_services) │ │ │ ├── query_transform.py # 查询转换 │ │ │ ├── pipeline.py # RAG 流水线 │ │ │ ├── fusion.py # RAG-Fusion │ │ │ └── tools.py # RAG 工具 │ │ ├── agent_subgraphs/ # 子图模块(新增) │ │ │ ├── __init__.py │ │ │ ├── README.md # 子图文档 │ │ │ ├── common/ # 公共工具 │ │ │ │ ├── state_base.py # 状态基类 │ │ │ │ ├── intent.py # 意图理解(React 模式) │ │ │ │ ├── formatter.py # 格式化输出工具 │ │ │ │ └── human_review.py # 人工审核节点 │ │ │ ├── contact/ # 通讯录子图 │ │ │ │ ├── state.py # 状态定义 │ │ │ │ ├── nodes.py # 节点实现 │ │ │ │ ├── graph.py # 图构建 │ │ │ │ ├── api_client.py # API 客户端 │ │ │ │ └── __init__.py │ │ │ ├── dictionary/ # 词典子图 │ │ │ │ ├── state.py # 状态定义 │ │ │ │ ├── nodes.py # 节点实现 │ │ │ │ ├── graph.py # 图构建 │ │ │ │ ├── api_client.py # API 客户端 │ │ │ │ └── __init__.py │ │ │ ├── news_analysis/ # 资讯分析子图 │ │ │ │ ├── state.py # 状态定义 │ │ │ │ ├── nodes.py # 节点实现 │ │ │ │ ├── graph.py # 图构建 │ │ │ │ ├── api_client.py # API 客户端 │ │ │ │ └── __init__.py │ │ │ └── research/ # 研究分析子图(规划中) │ │ └── utils/ │ │ └── __init__.py │ └── rag_core/ │ ├── __init__.py │ ├── client.py # RAG 核心客户端 │ ├── embedders.py # 嵌入模型 │ ├── vector_store.py # 向量存储 │ ├── retriever_factory.py # 检索器工厂 │ └── store/ │ ├── __init__.py │ ├── factory.py # 存储工厂 │ └── postgres.py # PostgreSQL 存储 ├── frontend/ │ ├── run.py # 前端启动脚本 │ ├── requirements.txt │ └── src/ │ ├── __init__.py │ ├── frontend_main.py # Streamlit 主入口 │ ├── api_client.py # API 客户端(含子图调用) │ ├── config.py # 前端配置 │ ├── state.py # 状态管理 │ ├── logger.py # 日志 │ ├── utils.py # 工具函数 │ └── components/ │ ├── __init__.py │ ├── sidebar.py # 侧边栏组件 │ ├── chat_area.py # 聊天区域组件 │ └── info_panel.py # 信息面板组件 ├── rag_indexer/ │ ├── __init__.py │ ├── cli.py # 命令行入口 │ ├── index_builder.py # 索引构建器 │ ├── loaders.py # 文档加载器 │ ├── splitters.py # 文本切分器 │ └── test/ # 测试脚本 ├── docker/ │ ├── docker-compose.yml # Docker 服务编排 │ ├── backend/ │ │ └── Dockerfile # 后端镜像构建 │ ├── frontend/ │ │ └── Dockerfile # 前端镜像构建 │ └── models/ # spaCy 模型文件 ├── scripts/ │ └── start.sh # 启动脚本 ├── test/ # 测试目录 │ ├── test_backend.py │ ├── test_rag.py │ ├── test_qdrant.py │ └── test_rag_indexer_result.py ├── .gitea/ │ └── workflows/ │ └── deploy.yml # CI/CD 自动化部署 ├── requirement.txt # Python 依赖 ├── .env.docker # Docker 环境变量模板 ├── .gitignore ├── LICENSE ├── QUICKSTART.md # 快速开始指南 └── user_docs/ # 用户文档目录 ``` --- ## 🧠 核心算法与实现原理 ### 1. RAG 检索算法 #### 1.1 文本切分策略 项目支持三种文本切分策略,适应不同场景需求: **递归字符切分(Recursive Character Splitting)** ``` 算法思路: 按优先级分隔符逐级切分:["\n\n", "\n", "。", "!", "?", " ", ""] ↓ 确保每个块不超过 chunk_size(默认 500 字符) ↓ 保留 chunk_overlap(默认 50 字符)避免上下文丢失 优势:简单高效,适合结构化文档 实现:langchain_text_splitters.RecursiveCharacterTextSplitter ``` **语义切分(Semantic Chunking)** ``` 算法思路: 1. 将文档按句子边界切分 2. 使用 Embedding 模型计算相邻句子的向量相似度 3. 计算相似度变化率(breakpoint threshold) 4. 在相似度骤降处切分(语义主题变化点) 阈值策略: - percentile(百分位数):默认,取相似度分布的 95 百分位 - standard_deviation(标准差):低于均值 1.5 标准差处切分 - interquartile(四分位距):使用 IQR 方法检测异常点 优势:保持语义完整性,切分更自然 实现:langchain_experimental.text_splitter.SemanticChunker ``` **父子块切分(Parent-Child Chunking)** ``` 算法思路: 父块(大块):1000 字符,保留完整上下文 ↓ 子块(小块):语义切分,用于向量检索 ↓ 建立映射关系:child_id → parent_id ↓ 检索时:用子块检索,返回时扩展为父块上下文 检索流程: 用户查询 → 向量检索匹配子块 → 通过映射获取父块 → 返回完整上下文 优势:检索精度高,上下文完整 实现:自定义 ParentChildSplitter 类 ``` #### 1.2 混合检索(Dense + Sparse) ``` 检索架构: ┌─────────────────────────────────────────────┐ │ 用户查询 │ └──────────────────┬──────────────────────────┘ │ ┌──────────┴──────────┐ ↓ ↓ ┌───────────────┐ ┌───────────────┐ │ 稠密向量检索 │ │ 稀疏 BM25 检索 │ │ (语义相似) │ │ (关键词匹配) │ │ │ │ │ │ Embedding │ │ Token 化 │ │ → 向量相似度 │ │ → 词频统计 │ └───────┬───────┘ └───────┬───────┘ │ │ └──────────┬─────────┘ ↓ ┌────────────────┐ │ 结果融合 │ │ (RRF 算法) │ └────────┬───────┘ ↓ ┌────────────────┐ │ Cross-Encoder │ │ 重排序 │ └────────┬───────┘ ↓ ┌────────────────┐ │ Top-K 结果返回 │ └────────────────┘ ``` **稠密向量检索(Dense Retrieval)** - 使用 Embedding 模型将查询和文档映射到同一向量空间 - 计算余弦相似度,返回语义最相似的文档 - 适合:语义理解、同义词匹配、概念检索 **稀疏向量检索(Sparse Retrieval - BM25)** - 基于词频(TF)和逆文档频率(IDF)计算相关性 - 适合:专有名词、精确匹配、术语检索 #### 1.3 RRF 融合算法(Reciprocal Rank Fusion) ```python # 核心算法实现 def reciprocal_rank_fusion(doc_lists: List[List[Document]], k: int = 60) -> List[Document]: """ RRF 公式:RRF(d) = Σ (1 / (k + rank(d))) 参数说明: - k: 平滑常数,通常设为 60 - k 值越大,排名影响越小,结果越平滑 - k 值越小,高排名文档优势越大 算法步骤: 1. 遍历每个检索结果列表 2. 对每个文档,根据其排名计算 RRF 得分 3. 累加同一文档在不同列表中的得分 4. 按总得分降序排序 示例: 文档 A 在列表 1 中排名第 1,在列表 2 中排名第 3 RRF(A) = 1/(60+1) + 1/(60+3) = 0.0164 + 0.0159 = 0.0323 文档 B 在列表 1 中排名第 2,在列表 2 中排名第 2 RRF(B) = 1/(60+2) + 1/(60+2) = 0.0161 + 0.0161 = 0.0322 结果:A 排名高于 B """ doc_to_score: Dict[str, float] = {} doc_map: Dict[str, Document] = {} for docs in doc_lists: for rank, doc in enumerate(docs, start=1): doc_id = f"{doc.page_content[:200]}_{doc.metadata.get('source', '')}" if doc_id not in doc_map: doc_map[doc_id] = doc score = doc_to_score.get(doc_id, 0.0) + 1.0 / (k + rank) doc_to_score[doc_id] = score sorted_ids = sorted(doc_to_score.keys(), key=lambda x: doc_to_score[x], reverse=True) return [doc_map[doc_id] for doc_id in sorted_ids] ``` **RRF 算法优势:** - 无需归一化:不同检索器的分数范围可能不同,RRF 直接使用排名 - 参数简单:仅需调整 k 值,默认 60 在大多数场景表现良好 - 鲁棒性强:对异常值和噪声不敏感 #### 1.4 Cross-Encoder 重排序 ``` 重排序流程: ┌──────────────────────────────────────────────┐ │ 输入:RRF 融合后的 Top-20 文档 │ └──────────────────┬───────────────────────────┘ │ ↓ ┌──────────────────────────────────────────────┐ │ Cross-Encoder 模型(bge-reranker-v2-m3) │ │ │ │ 工作原理: │ │ - 将 Query 和 Document 拼接输入模型 │ │ - "[CLS] Query [SEP] Document [SEP]" │ │ - 通过 Transformer 计算交互注意力 │ │ - 输出相关性得分(0-1) │ │ │ │ 与 Bi-Encoder 的区别: │ │ - Bi-Encoder:分别编码,计算余弦相似度(快) │ │ - Cross-Encoder:联合编码,计算交互得分(准) │ └──────────────────┬───────────────────────────┘ │ ↓ ┌──────────────────────────────────────────────┐ │ 按相关性得分排序,返回 Top-5 │ └──────────────────────────────────────────────┘ ``` **实现细节:** - 使用远程 llama.cpp 服务部署重排序模型 - 兼容 OpenAI Rerank API 格式 - 超时保护:60 秒超时,失败时降级为原始排序 ### 2. LangGraph 工作流算法 #### 2.1 状态机设计 ```python # 核心状态定义 class AgentState(TypedDict): messages: Annotated[list, add_messages] # 对话历史(自动合并) user_id: str # 用户标识 memory_context: str # 检索到的记忆上下文 should_summarize: bool # 是否需要生成摘要 tool_calls: list # 工具调用列表 final_response: str # 最终响应 ``` **状态流转规则:** ``` 初始状态 → retrieve_memory → memory_trigger → [条件分支] ↓ ┌───────────────────┼───────────────────┐ ↓ ↓ ↓ should_summarize 直接回复 需要工具 ↓ ↓ ↓ summarize → save finalize tool_node ↓ ↓ ↓ llm_call ←───────────────┘ llm_call ↓ finalize ``` #### 2.2 记忆管理算法 **记忆检索策略:** ``` 1. 向量检索:将用户查询 Embedding,在 PostgreSQL 中检索相似对话 2. 时间衰减:近期对话权重更高(可选) 3. 相关性过滤:仅返回相似度 > 阈值的记忆 4. 上下文窗口:限制记忆长度,避免超出 LLM 上下文限制 ``` **摘要生成策略:** ``` 触发条件: - 对话轮数超过阈值(默认 10 轮) - 记忆上下文长度超过限制 摘要生成: 1. 提取最近 N 轮对话 2. 调用 LLM 生成摘要 3. 保存摘要到 PostgreSQL 4. 清理旧对话记录 摘要格式: "用户在 {时间} 讨论了 {主题},关键信息:{要点}" ``` ### 3. 多模型路由算法 ``` 模型选择逻辑: ┌─────────────────────────────────────────────┐ │ 用户选择模型 │ └──────────────────┬──────────────────────────┘ │ ┌──────────┼──────────┐ ↓ ↓ ↓ ┌──────┐ ┌──────┐ ┌──────┐ │ zhipu│ │deep │ │local │ └──┬───┘ └──┬───┘ └──┬───┘ │ │ │ ↓ ↓ ↓ ChatZhipu ChatOpenAI ChatOpenAI (官方SDK) (DeepSeek) (vLLM/Gemma) │ │ │ └─────────┼─────────┘ ↓ ┌────────────────┐ │ 统一接口输出 │ │ (BaseChatModel)│ └────────────────┘ ``` **高可用降级策略:** ``` 1. 优先使用用户选择的模型 2. 如果模型不可用(API 错误、超时等): - 尝试切换到备用模型 - 记录错误日志 - 返回降级提示给用户 3. 健康检查:定期检查各模型服务状态 ``` ### 4. SSE 流式响应算法 ``` 流式输出流程: ┌─────────────────────────────────────────────┐ │ LLM 生成 Token │ └──────────────────┬──────────────────────────┘ │ ↓ ┌─────────────────────────────────────────────┐ │ 事件格式化 │ │ data: {"content": "你", "type": "content"} │ │ data: {"content": "好", "type": "content"} │ │ data: {"content": "!", "type": "content"} │ │ data: {"done": true, "type": "end"} │ └──────────────────┬──────────────────────────┘ │ ↓ ┌─────────────────────────────────────────────┐ │ HTTP Response Stream │ │ Content-Type: text/event-stream │ │ Cache-Control: no-cache │ │ Connection: keep-alive │ └─────────────────────────────────────────────┘ ``` **实现要点:** - 使用 `asyncio.Queue` 缓冲 Token - 生产者:LLM 异步生成 Token 放入队列 - 消费者:FastAPI `EventSourceResponse` 从队列读取并推送 - 超时保护:30 秒无输出自动断开 --- ## 📚 RAG 系统完整架构 ### 离线索引构建 vs 在线检索生成 RAG 系统分为两个独立但协同的阶段: ``` ┌──────────────────────────────────────────────────────────────────┐ │ RAG 系统双阶段架构 │ ├──────────────────────────────────────────────────────────────────┤ │ │ │ 阶段一:离线索引构建 (rag_indexer) │ │ ┌────────────────────────────────────────────────────────┐ │ │ │ 文档加载 → 文本切分 → Embedding → 向量存储 │ │ │ │ PDF/DOCX/TXT 递归/语义/父子块 llama.cpp Qdrant │ │ │ └────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ 阶段二:在线检索生成 (backend/app/rag) │ │ ┌────────────────────────────────────────────────────────┐ │ │ │ 用户查询 → 查询改写 → 多路检索 → RRF 融合 → 重排序 │ │ │ │ MultiQuery Dense+Sparse Cross-Encoder │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ LLM 生成回答 │ │ │ └────────────────────────────────────────────────────────┘ │ │ │ └──────────────────────────────────────────────────────────────────┘ ``` ### RAG 演进路线 (Roadmap) #### Level 1: 基础向量搜索 (Basic Similarity Search) **核心算法**: 近似最近邻搜索 (ANN, 常用 HNSW 算法) ``` 算法原理: 用户问题 → Embedding 模型 → 向量表示 ↓ 计算与库中向量的余弦相似度 ↓ 取距离最近的 K 个块返回 优缺点: ✅ 速度极快(毫秒级) ❌ 只能捕捉"语义相似",专有名词匹配差 实现代码: from app.rag.retriever import create_base_retriever retriever = create_base_retriever( collection_name="rag_documents", embeddings=embeddings, search_kwargs={"k": 20} ) docs = retriever.invoke("什么是 RAG?") ``` #### Level 2: 混合检索与重排序 (Hybrid Search + Reranker) **1. 基础召回(混合检索)** ``` 算法原理: 稠密向量检索(语义相似)+ BM25 稀疏检索(关键词匹配) ↓ 两路结果并行获取,等待融合 实现代码: from app.rag.retriever import create_hybrid_retriever retriever = create_hybrid_retriever( collection_name="rag_documents", embeddings=embeddings, dense_k=10, sparse_k=10, score_threshold=0.3 ) ``` **2. 二次精排(Cross-Encoder)** ``` 算法原理: 不同于双塔模型(分别算向量再求距离) 交叉编码器将"问题 + 文档"拼接后整体输入 Transformer 由模型直接输出 0~1 的相关性得分,精度极高 实现代码: from app.rag.reranker import LLaMaCPPReranker reranker = LLaMaCPPReranker( base_url="http://127.0.0.1:8083", api_key="your-api-key", top_n=5 ) sorted_docs = reranker.compress_documents(documents, query) ``` #### Level 3: RAG-Fusion (多路改写与倒数排名融合) **1. 多路查询改写** ``` 算法原理: 克服用户初始提问词不达意或视角受限的问题 通过 LLM 将单一问题改写为多个不同角度的查询 实现代码: from app.rag.query_transform import MultiQueryGenerator generator = MultiQueryGenerator(llm=llm, num_queries=3) queries = await generator.agenerate("如何申请项目资金?") # 返回:["如何申请项目资金?", "项目资金申请流程是什么?", "申请项目经费需要哪些步骤?"] ``` **2. 倒数排名融合(RRF)** ``` 算法原理: RRF (Reciprocal Rank Fusion) 是一种无需评分归一化的融合算法 公式:RRF_score(d) = Σ 1/(k + rank_q(d)) 有效避免某一极端检索结果主导全局 实现代码: from app.rag.fusion import reciprocal_rank_fusion # 多个查询的检索结果 doc_lists = [result1, result2, result3] fused_docs = reciprocal_rank_fusion(doc_lists, k=60) ``` #### Level 4: Agentic RAG / Self-RAG (智能体与自我反思) ``` 核心原理: 基于 LangGraph 的 ReAct (Reasoning and Acting) 状态机路由 大模型并非每次都去死板地执行检索,而是先判断问题: "这是闲聊?还是需要查知识库?" 工作流程: ┌──────────┐ ┌──────────────┐ ┌──────────┐ ┌────────┐ │ User │────>│ LangGraph │────>│ RAG_Tool │────>│ Qdrant │ │ │ │ Agent │ │ │ │ │ │ "公司报 │ │ 思考: 这是 │ │ ToolCall │ │ RAG- │ │ 销流程?"│ │ 内部规章问题 │ │ search_ │ │ Fusion │ │ │ │ 需要查资料 │ │ knowledge│ │ & 混合 │ │ │<────│ 资料充分, │<────│ 返回最相 │<────│ 检索 │ │ "根据知 │ │ 开始撰写回答 │ │ 关5条规定 │ │ Cross- │ │ 识库规定 │ │ │ │ │ │ Encoder│ │ ..." │ │ │ │ │ │ 重排 │ └────────── └────────────── └──────────┘ └────────┘ 实现代码: from app.rag.tools import search_knowledge_base from app.graph.graph_builder import GraphBuilder # 将 RAG 工具添加到工具列表 tools = AVAILABLE_TOOLS + [search_knowledge_base] # 构建图 builder = GraphBuilder(llm, tools, tools_by_name) graph = builder.build().compile(checkpointer=checkpointer) ``` #### Level 5: GraphRAG 集成 (基于图和关系的 RAG) ``` 核心原理: 结合知识图谱的结构化关系和向量检索的语义相似度 解决跨文档复杂关系推理问题 实现代码: from langchain_community.graphs import Neo4jGraph from langchain_experimental.graph_transformers import LLMGraphTransformer # 实体关系抽取 transformer = LLMGraphTransformer(llm=local_llm) graph_documents = transformer.convert_to_graph_documents(documents) # 存储到图数据库 graph = Neo4jGraph(url="bolt://localhost:7687") graph.add_graph_documents(graph_documents) ``` ### 检索策略对比 | 策略 | 优点 | 缺点 | 适用场景 | |:-----|:-----|:-----|:---------| | **基础向量检索** | 速度快,语义理解好 | 专有名词匹配差 | 通用问答 | | **混合检索** | 语义 + 关键词匹配 | 需要配置稀疏向量 | 专业术语查询 | | **多路改写 + RRF** | 搜索面广,结果稳定 | 延迟略高 | 复杂问题 | | **重排序** | 精度高 | 依赖额外模型 | 最终精排 | | **Agentic RAG** | 智能决策,灵活 | 实现复杂 | 生产环境 | | **GraphRAG** | 关系推理能力强 | 需要图数据库 | 知识密集型场景 | ### 切分策略对比 | 策略 | 原理 | 优点 | 缺点 | 适用场景 | |:-----|:-----|:-----|:-----|:---------| | **递归字符** | 按分隔符递归切分 | 速度快,实现简单 | 可能截断语义 | 简单文档 | | **语义切分** | 基于句子相似度阈值 | 语义连贯性好 | 需要 Embedding 模型 | 专业文档 | | **父子块** | 大块存储+小块检索 | 检索精准+上下文完整 | 存储复杂度高 | 生产环境 | --- ## 🧩 子图系统架构 ### 设计理念 子图系统采用模块化设计,每个子图是一个独立的 LangGraph 工作流,共享公共工具库。 ``` ┌───────────────────────────────────────────────────────────────────┐ │ 子图系统架构 │ ├───────────────────────────────────────────────────────────────────┤ │ │ │ ┌────────────────────────────────────────────────────────────┐ │ │ │ 公共工具库 (common/) │ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌──────┐ │ │ │ │ │ state_base │ │ intent │ │ formatter │ │human │ │ │ │ │ │ (状态) │ │ (React) │ │ (格式化) │ │review│ │ │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ └──────┘ │ │ │ └────────────────────────────────────────────────────────────┘ │ │ │ │ │ ┌───────────────┼───────────────┐ │ │ ↓ ↓ ↓ │ │ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ │ │ 通讯录子图 │ │ 词典子图 │ │ 资讯分析子图 │ │ │ │ (contact/) │ │ (dictionary/) │ │ (news_analysis) │ │ │ ├─────────────────┤ ├─────────────────┤ ├─────────────────┤ │ │ │ state.py │ │ state.py │ │ state.py │ │ │ │ nodes.py │ │ nodes.py │ │ nodes.py │ │ │ │ graph.py │ │ graph.py │ │ graph.py │ │ │ │ api_client.py │ │ api_client.py │ │ api_client.py │ │ │ └─────────────────┘ └─────────────────┘ └─────────────────┘ │ │ │ │ │ ▼ │ │ FastAPI 子图端点 │ │ (backend/app/backend.py) │ └───────────────────────────────────────────────────────────────────┘ ``` ### 公共工具说明 #### 1. state_base.py - 状态基类 提供类型安全的状态基类,所有子图状态继承此类。 #### 2. intent.py - 意图理解(React 模式) 智能决策节点,判断是否需要调用 RAG、是否需要重新检索、是否需要调用工具。 ```python # 核心功能 - 意图分类:闲聊 / 查询 / 任务 - RAG 决策:是否需要检索知识库 - 检索策略:是否需要多路检索 / 是否需要重新检索 ``` #### 3. formatter.py - 格式化输出工具 使用 Jinja2 模板提供美观的 Markdown 格式化输出。 #### 4. human_review.py - 人工审核节点 使用 LangGraph 的 interrupt 机制实现人工审核流程,支持: - 确定 / 取消 / 继续 三种操作 - 状态持久化 - 异步等待 ### 子图开发指南 #### 创建新子图的步骤 1. **创建子图目录** ```bash mkdir backend/app/agent_subgraphs/my_subgraph ``` 2. **创建状态定义 (state.py)** ```python from typing_extensions import TypedDict from ..common.state_base import BaseSubgraphState class MySubgraphState(BaseSubgraphState): \"\"\" 我的子图状态 \"\"\" my_field: str my_list: list[str] ``` 3. **实现节点 (nodes.py)** ```python from langgraph.graph import StateGraph from .state import MySubgraphState def my_node(state: MySubgraphState) -> MySubgraphState: \"\"\" 我的节点 \"\"\" # 实现逻辑 return state ``` 4. **构建图 (graph.py)** ```python from langgraph.graph import StateGraph, END from .state import MySubgraphState from .nodes import my_node def build_my_subgraph() -> StateGraph: \"\"\" 构建我的子图 \"\"\" graph = StateGraph(MySubgraphState) graph.add_node("my_node", my_node) graph.set_entry_point("my_node") graph.add_edge("my_node", END) return graph.compile() ``` 5. **添加 API 客户端 (api_client.py)**(可选) 用于与外部 API 交互。 6. **在 backend.py 中添加 API 端点** ### 已实现的子图 #### 1. 通讯录子图 (contact/) - 联系人 CRUD - 邮件读取与审核 - 外发邮件 - 智能嗅探 - 精美格式化展示 #### 2. 词典子图 (dictionary/) - 翻译、查词 - 每日一词 - 专业名词提炼 - 生词本管理 - 精美格式化展示 #### 3. 资讯分析子图 (news_analysis/) - 资讯获取 - 内容分析 - 精美格式化展示 #### 4. 研究分析子图 (research/) - 规划中 - 联网搜索 - 报告生成 - 引用溯源 - 可视化图表 --- ## 🚀 快速开始 详细启动指南请查看 [QUICKSTART.md](QUICKSTART.md) ### 方式一:Docker Compose(推荐) ```bash # 1. 配置环境变量 cp .env.docker .env # 编辑 .env 文件,填入真实的 API Key # 2. 启动所有服务 docker compose -f docker/docker-compose.yml up -d --build # 3. 访问应用 # 前端: http://127.0.0.1:8501 # 后端 API: http://127.0.0.1:8083 ``` ### 方式二:本地开发模式 ```bash # 1. 安装依赖 pip install -r requirement.txt # 2. 配置环境变量 cp .env.docker .env # 编辑 .env,根据本地/远程环境调整配置 # 3. 启动后端 python backend/app/backend.py # 4. 启动前端(新终端) streamlit run frontend/src/frontend_main.py # 或者使用启动脚本(推荐) ./scripts/start.sh both ``` --- ## 📖 使用指南 ### 基础对话 直接在聊天框输入问题即可: ``` 你好,请介绍一下自己 今天北京天气怎么样? 帮我总结一下 a.txt 的内容 ``` ### 工具调用示例 | 功能 | 示例提问 | |------|---------| | 🌤️ 天气查询 | "上海今天天气如何?" | | 📄 读取文本 | "读取 a.txt 的内容" | | 📑 解析 PDF | "总结 b.pdf 的主要内容" | | 📊 Excel 数据 | "显示 c.xlsx 的数据" | | 🌐 网页抓取 | "抓取 https://example.com 的内容" | | 🔍 长期记忆 | "记住我喜欢吃川菜" → "我有什么饮食偏好?" | ### 多模型切换 1. 在左侧边栏选择模型: - **智谱 GLM-4.7**:在线服务,速度快 - **DeepSeek Reasoner**:深度推理模型 - **本地 Gemma-4**:本地部署,隐私性好 2. 可随时切换,甚至在同一会话中 3. 点击 "🔄 新会话" 清空当前对话 --- ## 🔧 开发指南 ### 添加新工具 在 [backend/app/graph/graph_tools.py](file:///home/huang/Study/AIProject/Agent1/backend/app/graph/graph_tools.py) 中添加新的 `@tool` 装饰函数: ```python @tool def my_new_tool(param: str) -> str: """ 工具描述(会显示给 LLM) Args: param: 参数说明 Returns: 返回值说明 """ # 实现逻辑 return result ``` 工具会自动注册到 `AVAILABLE_TOOLS` 列表中。 ### 添加新模型 在 [backend/app/model_services/chat_services.py](file:///root/projects/ailine/backend/app/model_services/chat_services.py) 中添加新的服务提供者: ```python class NewModelChatProvider(BaseServiceProvider[BaseChatModel]): """ 新模型服务提供者 """ def __init__(self, model: str = "new-model-name"): super().__init__("new_model_chat") self._model = model def is_available(self) -> bool: """ 检查新模型服务是否可用 """ if not os.getenv("NEW_MODEL_API_KEY"): logger.warning("NEW_MODEL_API_KEY 未配置") return False logger.info(f"新模型服务配置正确,准备使用: {self._model}") return True def get_service(self) -> BaseChatModel: """ 获取新模型服务 """ if self._service_instance is None: from langchain_openai import ChatOpenAI from pydantic import SecretStr self._service_instance = ChatOpenAI( base_url="https://api.new-model.com/v1", api_key=SecretStr(os.getenv("NEW_MODEL_API_KEY")), model=self._model, temperature=0.1, max_tokens=4096, timeout=60.0, max_retries=2, streaming=True, ) return self._service_instance ``` 然后在 `CHAT_PROVIDERS` 字典中注册: ```python CHAT_PROVIDERS: Dict[str, Callable[[], BaseServiceProvider[BaseChatModel]]] = { "local": lambda: LocalVLLMChatProvider(), "zhipu": lambda: ZhipuChatProvider(), "deepseek": lambda: DeepSeekChatProvider(), "new_model": lambda: NewModelChatProvider(), # 新增 } ``` ### 使用模型服务 #### 生成式大模型 ```python from app.model_services import get_chat_service, get_all_chat_services # 自动选择可用服务(优先本地,降级智谱,再降级 DeepSeek) llm = get_chat_service() # 获取所有可用模型(用于多模型切换) all_llms = get_all_chat_services() # Dict[str, BaseChatModel] ``` #### 嵌入服务 ```python from app.model_services import get_embedding_service # 自动选择可用服务(优先本地,降级智谱) embeddings = get_embedding_service() # 使用 vector = embeddings.embed_query("hello") ``` #### 重排服务 ```python from app.model_services import get_rerank_service from app.rag.rerank import create_document_reranker # 获取原始重排服务(仅计算分数) rerank_service = get_rerank_service() scores = rerank_service.compute_scores("query", ["doc1", "doc2"]) # 使用业务逻辑层(完整的文档重排) reranker = create_document_reranker() sorted_docs = reranker.compress_documents(docs, "query", top_n=5) ``` ### Docker 部署 项目包含完整的 Docker 配置: - **docker-compose.yml**:服务编排(Backend + Frontend,连接远程数据库) - **Dockerfile.backend**:后端镜像构建 - **Dockerfile.frontend**:前端镜像构建 - **.gitea/workflows/deploy.yml**:CI/CD 自动化部署 详见 [QUICKSTART.md](QUICKSTART.md) 的 Docker 部署章节。 --- ## ⚙️ 环境配置 ### 配置文件说明 项目采用两层环境配置文件体系: | 文件 | 用途 | 是否提交 Git | |------|------|------------| | `.env.docker` | Docker 部署模板 | ✅ 是 | | `.env` | 实际使用的配置 | ❌ 否(已忽略) | **使用方法:** - **本地开发**:`cp .env.docker .env`,修改为本地服务地址 - **Docker 部署**:`cp .env.docker .env`,使用远程服务器地址 ### 必需的环境变量 | 变量名 | 说明 | 本地开发示例 | Docker 部署示例 | |--------|------|------------|----------------| | `ZHIPUAI_API_KEY` | 智谱AI API密钥 | `your-api-key` | `your-api-key` | | `DEEPSEEK_API_KEY` | DeepSeek API密钥 | `your-api-key` | `your-api-key` | | `LLAMACPP_API_KEY` | llama.cpp API 密钥 | `token-abc123` | `token-abc123` | | `LLM_API_KEY` | 主 LLM 服务 API 密钥 | `token-abc123` | `token-abc123` | | `VLLM_BASE_URL` | LLM 服务地址 | `http://127.0.0.1:8081/v1` | `http://your-server:8081/v1` | | `LLAMACPP_EMBEDDING_URL` | Embedding 服务地址 | `http://127.0.0.1:8082/v1` | `http://your-server:8082/v1` | | `LLAMACPP_RERANKER_URL` | Rerank 服务地址 | `http://127.0.0.1:8083/v1` | `http://your-server:8083/v1` | | `ZHIPU_EMBEDDING_MODEL` | 智谱嵌入模型(可选) | `embedding-3` | `embedding-3` | | `ZHIPU_RERANK_MODEL` | 智谱重排模型(可选) | `rerank-2` | `rerank-2` | | `ZHIPU_API_BASE` | 智谱 API 基础地址(可选) | `https://open.bigmodel.cn/api/paas/v4` | 同左 | | `DB_URI` | PostgreSQL 连接字符串 | `postgresql://...@115.190.121.151:5432/langgraph_db` | 同左 | | `QDRANT_URL` | Qdrant 向量数据库地址 | `http://115.190.121.151:6333` | 同左 | | `QDRANT_API_KEY` | Qdrant API 密钥 | `your-api-key` | `your-api-key` | | `QDRANT_COLLECTION_NAME` | Qdrant 集合名称 | `rag_documents` | `rag_documents` | | `LOG_LEVEL` | 日志级别 | `INFO` | `WARNING` | | `ENABLE_GRAPH_TRACE` | 是否启用图流转追踪 | `true` | `false` | | `MEMORY_SUMMARIZE_INTERVAL` | 对话摘要生成间隔 | `10` | `10` | ### 注意事项 - ⚠️ **不要硬编码敏感信息**:所有 API Key 必须通过环境变量配置 - ⚠️ **远程服务依赖**:确保可以访问远程 PostgreSQL (115.190.121.151:5432) 和 Qdrant (115.190.121.151:6333) - ⚠️ **修改后重启**:修改 `.env` 后,Docker 部署需要执行 `docker compose down && docker compose up -d --build` --- ## �️ 实现指南与最佳实践 ### 1. RAG 知识库构建指南 #### 1.1 离线索引构建流程 ```bash # 步骤 1:准备文档 # 将文档放入指定目录(如 ./documents/) # 支持格式:TXT, PDF, Markdown, DOCX # 步骤 2:选择切分策略 # 根据文档类型选择: # - 结构化文档(如手册、API 文档)→ Recursive Splitting # - 非结构化文档(如文章、报告)→ Semantic Chunking # - 长文档(如书籍、论文)→ Parent-Child Chunking # 步骤 3:构建索引 cd rag_indexer python cli.py build \ --input-dir ../documents \ --collection-name my_knowledge \ --splitter-type semantic \ --chunk-size 500 # 步骤 4:验证索引 python cli.py query \ --collection-name my_knowledge \ --query "如何配置系统?" \ --top-k 5 ``` #### 1.2 切分策略选择建议 | 文档类型 | 推荐策略 | chunk_size | 说明 | |---------|---------|------------|------| | API 文档 | Recursive | 300-500 | 结构清晰,按章节切分 | | 技术文章 | Semantic | 自适应 | 保持语义完整性 | | 法律合同 | Parent-Child | 父:1000, 子:200 | 需要完整上下文 | | 问答对 | Recursive | 200-300 | 短文本,精确匹配 | | 产品手册 | Parent-Child | 父:1500, 子:300 | 长文档,跨章节检索 | #### 1.3 Embedding 模型选择 | 模型 | 维度 | 语言 | 速度 | 精度 | 适用场景 | |------|------|------|------|------|---------| | bge-large-zh-v1.5 | 1024 | 中文 | 中 | 高 | 中文文档检索 | | bge-m3 | 1024 | 多语言 | 中 | 高 | 多语言混合文档 | | text-embedding-3-small | 1536 | 英文 | 快 | 中 | 英文文档检索 | | gte-large | 1024 | 中文 | 慢 | 很高 | 高精度要求场景 | ### 2. 性能优化指南 #### 2.1 检索性能优化 ```python # 优化 1:调整检索参数 search_kwargs = { "k": 20, # 召回数量(增加召回,提高精度) "score_threshold": 0.3, # 相似度阈值(过滤低质量结果) "fetch_k": 50, # 初始召回数量(用于 MMR 去重) "lambda_mult": 0.7, # MMR 多样性参数(0=去重,1=不去重) } # 优化 2:使用缓存 from functools import lru_cache @lru_cache(maxsize=1000) def cached_retrieve(query: str) -> List[Document]: """缓存常见查询结果""" return retriever.invoke(query) # 优化 3:批量 Embedding # 构建索引时,使用批量处理提高效率 batch_size = 32 for i in range(0, len(documents), batch_size): batch = documents[i:i+batch_size] embeddings = embedder.embed_documents([doc.page_content for doc in batch]) ``` #### 2.2 LLM 调用优化 ```python # 优化 1:使用流式响应 # 减少首字延迟,提升用户体验 response = await llm.astream(messages) async for chunk in response: yield chunk.content # 优化 2:控制上下文长度 MAX_CONTEXT_LENGTH = 4000 def truncate_context(context: str, max_length: int = MAX_CONTEXT_LENGTH) -> str: """截断上下文,保留开头和结尾""" if len(context) <= max_length: return context half = max_length // 2 return context[:half] + "\n...\n" + context[-half:] # 优化 3:Prompt 优化 # 使用结构化 Prompt,提高 LLM 理解能力 prompt_template = """ 你是一个智能助手。请根据以下上下文回答用户问题。 上下文: {context} 问题:{question} 回答要求: 1. 仅基于上下文回答,不要编造信息 2. 如果上下文中没有答案,明确告知用户 3. 引用上下文中的具体信息时,注明来源 """ ``` #### 2.3 数据库性能优化 ```sql -- PostgreSQL 优化 -- 1. 为对话历史表添加索引 CREATE INDEX idx_conversations_user_id ON conversations(user_id); CREATE INDEX idx_conversations_created_at ON conversations(created_at); -- 2. 为记忆摘要表添加索引 CREATE INDEX idx_summaries_user_id ON summaries(user_id); -- 3. 定期清理旧数据(保留最近 90 天) DELETE FROM conversations WHERE created_at < NOW() - INTERVAL '90 days'; -- Qdrant 优化 -- 1. 使用 HNSW 索引(默认已启用) -- 2. 调整 ef_construct 和 m 参数平衡速度和精度 -- 3. 定期优化集合 curl -X POST http://localhost:6333/collections/my_docs/actions \ -H "Content-Type: application/json" \ -d '{"actions": [{"optimize": {}}]}' ``` ### 3. 扩展开发指南 #### 3.1 添加自定义工具 ```python # 在 backend/app/graph/graph_tools.py 中添加 from langchain_core.tools import tool from typing import Optional @tool def calculate_expression( expression: str, precision: int = 2 ) -> str: """ 计算数学表达式的值。 支持基本运算:加减乘除、幂运算、括号。 Args: expression: 数学表达式,如 "2 + 3 * 4" precision: 结果精度(小数位数),默认 2 Returns: 计算结果 """ try: # 安全计算(避免 eval 的安全风险) import ast import operator # 定义允许的操作符 ops = { ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul, ast.Div: operator.truediv, ast.Pow: operator.pow, } def eval_node(node): if isinstance(node, ast.Num): return node.n elif isinstance(node, ast.BinOp): left = eval_node(node.left) right = eval_node(node.right) return ops[type(node.op)](left, right) else: raise ValueError(f"不支持的操作: {type(node)}") tree = ast.parse(expression, mode='eval') result = eval_node(tree.body) return f"{result:.{precision}f}" except Exception as e: return f"计算错误: {str(e)}" # 工具会自动注册,无需手动添加到 AVAILABLE_TOOLS ``` #### 3.2 添加自定义 LLM 提供商 ```python # 在 backend/app/agent/llm_factory.py 中添加 from langchain_openai import ChatOpenAI from langchain_core.language_models import BaseChatModel class LLMFactory: @staticmethod def create_anthropic_model() -> BaseChatModel: """创建 Anthropic Claude 模型""" api_key = os.getenv("ANTHROPIC_API_KEY") if not api_key: raise ValueError("ANTHROPIC_API_KEY 环境变量未设置") return ChatAnthropic( model="claude-3-sonnet-20240229", temperature=0.1, max_tokens=4096, api_key=api_key, ) # 注册到 CREATORS CREATORS = { "local": create_local, "deepseek": create_deepseek, "zhipu": create_zhipu, "anthropic": create_anthropic_model, # 新增 } ``` #### 3.3 添加自定义 RAG 检索器 ```python # 在 backend/app/rag/retriever.py 中添加 from langchain_core.retrievers import BaseRetriever from langchain_core.documents import Document class HybridRetriever(BaseRetriever): """混合检索器:结合多种检索策略""" dense_retriever: BaseRetriever sparse_retriever: BaseRetriever reranker: Optional[BaseReranker] = None top_k: int = 5 def _get_relevant_documents(self, query: str) -> List[Document]: # 并行检索 dense_docs = self.dense_retriever.invoke(query) sparse_docs = self.sparse_retriever.invoke(query) # RRF 融合 fused_docs = reciprocal_rank_fusion([dense_docs, sparse_docs]) # 重排序 if self.reranker: fused_docs = self.reranker.compress_documents(fused_docs[:20], query) return fused_docs[:self.top_k] ``` ### 4. 部署最佳实践 #### 4.1 生产环境配置 ```yaml # docker-compose.prod.yml version: '3.8' services: backend: build: context: . dockerfile: docker/Dockerfile.backend environment: - LOG_LEVEL=WARNING # 生产环境降低日志级别 - ENABLE_GRAPH_TRACE=false # 关闭图追踪(提升性能) - MEMORY_SUMMARIZE_INTERVAL=5 # 更频繁的摘要生成 deploy: resources: limits: cpus: '2' memory: 4G reservations: cpus: '1' memory: 2G restart: unless-stopped healthcheck: test: ["CMD", "curl", "-f", "http://localhost:8083/health"] interval: 30s timeout: 10s retries: 3 frontend: build: context: . dockerfile: docker/Dockerfile.frontend environment: - STREAMLIT_SERVER_PORT=8501 - STREAMLIT_SERVER_HEADLESS=true deploy: resources: limits: cpus: '1' memory: 2G restart: unless-stopped ``` #### 4.2 监控与告警 ```python # 添加健康检查端点 from fastapi import FastAPI from datetime import datetime app = FastAPI() @app.get("/health") async def health_check(): """健康检查端点""" return { "status": "healthy", "timestamp": datetime.now().isoformat(), "services": { "postgresql": await check_postgresql(), "qdrant": await check_qdrant(), "llm": await check_llm_service(), } } async def check_postgresql() -> bool: try: # 尝试连接数据库 async with asyncpg.connect(DB_URI) as conn: await conn.fetchval("SELECT 1") return True except Exception: return False async def check_qdrant() -> bool: try: client = QdrantClient(url=QDRANT_URL) client.get_collections() return True except Exception: return False ``` #### 4.3 安全最佳实践 ```python # 1. 使用环境变量管理密钥 # 永远不要硬编码 API Key import os from typing import Optional def get_api_key(env_var: str, default: Optional[str] = None) -> str: api_key = os.getenv(env_var, default) if not api_key: raise ValueError(f"{env_var} 环境变量未设置") return api_key # 2. 输入验证 from pydantic import BaseModel, validator class ChatRequest(BaseModel): message: str user_id: str model: str = "zhipu" @validator('message') def validate_message(cls, v): if len(v) > 4000: raise ValueError("消息长度不能超过 4000 字符") return v.strip() @validator('model') def validate_model(cls, v): allowed_models = {"zhipu", "deepseek", "local"} if v not in allowed_models: raise ValueError(f"不支持的模型: {v}") return v # 3. 速率限制 from slowapi import Limiter from slowapi.util import get_remote_address limiter = Limiter(key_func=get_remote_address) @app.post("/api/chat") @limiter.limit("10/minute") # 每分钟最多 10 次请求 async def chat(request: Request, chat_request: ChatRequest): ... ``` ### 5. 调试与测试 #### 5.1 启用调试模式 ```bash # 启用详细日志 export LOG_LEVEL=DEBUG # 启用 LangGraph 图追踪 export ENABLE_GRAPH_TRACE=true # 查看图执行流程 # 启动后访问:http://localhost:8083/graph/trace ``` #### 5.2 单元测试示例 ```python # tests/test_rag.py import pytest from app.rag.fusion import reciprocal_rank_fusion from langchain_core.documents import Document def test_rrf_fusion(): # 准备测试数据 docs1 = [ Document(page_content="文档 A"), Document(page_content="文档 B"), ] docs2 = [ Document(page_content="文档 B"), Document(page_content="文档 C"), ] # 执行 RRF 融合 result = reciprocal_rank_fusion([docs1, docs2]) # 验证结果 assert len(result) == 3 assert result[0].page_content == "文档 B" # B 在两个列表中都出现,应该排第一 @pytest.mark.asyncio async def test_retriever(): from app.rag.retriever import create_base_retriever retriever = create_base_retriever( collection_name="test_collection", embeddings=mock_embeddings, ) docs = await retriever.ainvoke("测试查询") assert len(docs) > 0 ``` --- ## �� 故障排查 ### 常见问题 **Q: 无法连接远程数据库?** ```bash # 测试 PostgreSQL psql -h 115.190.121.151 -U postgres -d langgraph_db -c "SELECT version();" # 测试 Qdrant curl http://115.190.121.151:6333/collections ``` **Q: 后端启动失败?** - 确认端口 8083 未被占用 - 检查 `.env` 中的 API Key 是否正确 - 查看启动日志确认模型初始化成功 **Q: 模型切换后无响应?** - 检查所选模型的配置是否正确 - 确认 vLLM 容器是否运行(如使用本地模型) - 尝试切换到另一个模型 更多问题排查请查看 [QUICKSTART.md](QUICKSTART.md) --- ## 📝 许可证 本项目采用 MIT 许可证。详见 [LICENSE](LICENSE) 文件。 ## 🤝 贡献 欢迎提交 Issue 和 Pull Request!