新增功能: - 创建 app/model_services 模块,提供统一的模型服务获取接口 - 实现 BaseServiceProvider 基类和 FallbackServiceChain 降级链 - 实现 get_embedding_service():优先本地 llama.cpp,降级到智谱 API - 实现 get_rerank_service():优先本地 llama.cpp,降级到智谱 API - 支持单例管理,确保全局只有一个服务实例 修改内容: - 更新 app/config.py,添加智谱 API 相关配置 - 修改 rag_core/vector_store.py:支持接受外部传入的 embeddings - 修改 rag_core/retriever_factory.py:支持接受外部传入的 embeddings - 修改 app/agent/rag_initializer.py:使用 get_embedding_service() - 修改 app/rag/pipeline.py:使用 get_rerank_service() - 修改 app/memory/mem0_client.py:智能判断可用服务配置 mem0 - 修改 rag_indexer/index_builder.py:支持使用新服务,保持向后兼容 - 修改 rag_indexer/config.py:添加智谱配置 环境变量: - ZHIPUAI_API_KEY:智谱 API 密钥(必选) - ZHIPU_EMBEDDING_MODEL:可选,默认 embedding-3 - ZHIPU_RERANK_MODEL:可选,默认 rerank-2 - ZHIPU_API_BASE:可选,默认 https://open.bigmodel.cn/api/paas/v4
AI Agent - 智能助手系统
一个基于 LangGraph + FastAPI 的智能对话助手,支持多模型切换、RAG 知识库检索、文件处理和网页抓取等功能。
📑 目录导航
- 核心功能 - 面向用户的功能和技术特性
- 技术架构 - 技术栈、系统架构图、工作流流程图
- 核心算法与实现原理 - RAG 检索、LangGraph 工作流、多模型路由、SSE 流式响应
- RAG 系统完整架构 - 离线索引构建、在线检索生成、演进路线
- 快速开始 - Docker 和本地部署指南
- 使用指南 - 基础对话、工具调用、多模型切换
- 开发指南 - 添加工具、添加模型、Docker 部署
- 实现指南与最佳实践 - RAG 构建、性能优化、扩展开发、部署实践
- 环境配置 - 配置文件、环境变量
- 故障排查 - 常见问题
🎯 核心功能
面向用户的功能
- 💬 智能对话:支持多轮对话,自动记忆上下文
- 🌤️ 天气查询:实时获取各地天气信息
- 📄 文档处理:读取 TXT、PDF、Excel 等格式文件
- 🌐 网页抓取:提取网页正文内容
- 🔍 知识库检索(RAG):基于向量数据库的智能问答
- 🔄 多模型切换:前端可选择不同大语言模型
技术特性
- ✅ 持久化记忆:PostgreSQL 存储对话历史,重启不丢失
- ✅ 高可用架构:模型自动降级,确保服务稳定
- ✅ 前后端分离:FastAPI 后端 + Streamlit 前端
- ✅ Docker 部署:一键启动所有服务
- ✅ 远程服务架构:PostgreSQL 和 Qdrant 部署在远程服务器
- ✅ 流式响应:SSE 流式输出,提升用户体验
- ✅ 模块化设计:清晰的代码分层,易于扩展和维护
🏗️ 技术架构
技术栈总览
| 层级 | 组件 | 技术选型 | 版本 | 说明 |
|---|---|---|---|---|
| LLM 服务 | 云端模型 | 智谱 AI (glm-4.7-flash) | v4.7 | 快速响应,适合日常对话 |
| DeepSeek (deepseek-reasoner) | v3 | 深度推理,适合复杂问题 | ||
| 本地模型 | Gemma-4-E4B-it | v4 | 本地部署,保护隐私 | |
| Embedding | 向量嵌入 | llama.cpp server | latest | 本地 embedding 服务,支持多种模型 |
| Agent 框架 | 工作流编排 | LangGraph + LangChain | latest | 状态机驱动的智能体工作流 |
| 向量数据库 | 向量检索 | Qdrant | v1.12+ | 高性能向量相似度检索(远程服务器) |
| 后端框架 | API 服务 | FastAPI + Uvicorn | v0.115+ | RESTful API + WebSocket + SSE 流式输出 |
| 前端框架 | Web 界面 | Streamlit | v1.40+ | 交互式对话界面,组件化设计 |
| 关系数据库 | 持久化存储 | PostgreSQL | v16 | 对话记忆持久化(远程服务器) |
| 容器化 | 服务编排 | Docker + Docker Compose | v24+ | 一键部署所有服务 |
| CI/CD | 自动化部署 | Gitea Workflows | latest | 代码推送自动构建部署 |
系统架构流程图
graph TB
User[用户浏览器] -->|HTTP/SSE| Frontend[Streamlit 前端 :8501]
Frontend -->|REST API| Backend[FastAPI 后端 :8083]
Backend --> AgentService[AIAgentService]
AgentService -->|模型路由| LLMFactory[LLM Factory]
LLMFactory -->|创建| Zhipu[智谱 GLM-4.7]
LLMFactory -->|创建| DeepSeek[DeepSeek Reasoner]
LLMFactory -->|创建| LocalGemma[本地 Gemma-4]
AgentService -->|初始化| LangGraph[LangGraph 工作流引擎]
LangGraph -->|节点1| RetrieveMemory[记忆检索]
LangGraph -->|节点2| MemoryTrigger[记忆触发]
LangGraph -->|节点3| LLMCall[LLM 调用]
LangGraph -->|节点4| ToolNode[工具执行]
LangGraph -->|节点5| Summarize[记忆摘要]
LangGraph -->|节点6| Finalize[最终处理]
ToolNode -->|调用| Tools[工具集合]
Tools -->|天气| WeatherTool[天气查询]
Tools -->|文件| FileTool[文件读取]
Tools -->|网页| WebTool[网页抓取]
Tools -->|RAG| RAGTool[知识库检索]
RAGTool -->|检索| Qdrant[(Qdrant 向量库)]
RAGTool -->|重排序| CrossEncoder[Cross-Encoder]
RetrieveMemory -->|存储| PostgreSQL[(PostgreSQL)]
Summarize -->|存储| PostgreSQL
style User fill:#e1f5ff
style Frontend fill:#fff4e1
style Backend fill:#e8f5e9
style LangGraph fill:#f3e5f5
style PostgreSQL fill:#ffebee
style Qdrant fill:#ffebee
LangGraph 工作流详细流程
stateDiagram-v2
[*] --> RetrieveMemory: 用户输入消息
RetrieveMemory --> MemoryTrigger: 检索历史记忆
MemoryTrigger --> CheckMemory: 检查是否需要触发记忆
CheckMemory --> LLMCall: 记忆充足
CheckMemory --> Summarize: 需要生成摘要
Summarize --> PostgreSQL: 保存摘要
PostgreSQL --> LLMCall: 继续对话
LLMCall --> CheckTools: LLM 输出
CheckTools --> ToolNode: 需要调用工具
CheckTools --> Finalize: 直接回复
ToolNode --> ExecuteTool: 执行工具
ExecuteTool --> LLMCall: 工具结果返回
Finalize --> FormatResponse: 格式化响应
FormatResponse --> [*]: SSE 流式输出
数据流向图
用户请求
│
├─→ 前端 Streamlit
│ │
│ ├─→ 发送 HTTP POST /api/chat
│ │
│ └─→ 接收 SSE 流式响应
│
└─→ 后端 FastAPI
│
├─→ AIAgentService.achat()
│ │
│ ├─→ 初始化 LangGraph 状态
│ │ ├─→ messages: 对话历史
│ │ ├─→ user_id: 用户标识
│ │ └─→ metadata: 元数据
│ │
│ ├─→ 执行工作流
│ │ ├─→ retrieve_memory: 从 PostgreSQL 检索历史
│ │ ├─→ memory_trigger: 判断是否触发记忆
│ │ ├─→ llm_call: 调用 LLM 生成响应
│ │ ├─→ tool_node: 执行工具(如有需要)
│ │ ├─→ summarize: 生成对话摘要
│ │ └─→ finalize: 格式化最终响应
│ │
│ └─→ 返回 SSE 流
│
└─→ 持久化存储
├─→ PostgreSQL: 对话历史和摘要
└─→ Qdrant: RAG 向量索引
项目结构
Agent1/
├── backend/
│ ├── app/
│ │ ├── __init__.py
│ │ ├── config.py # 配置管理
│ │ ├── logger.py # 日志工具
│ │ ├── backend.py # FastAPI 后端应用
│ │ ├── agent/
│ │ │ ├── __init__.py
│ │ │ ├── service.py # Agent 服务核心
│ │ │ ├── llm_factory.py # LLM 工厂
│ │ │ ├── history.py # 历史查询服务
│ │ │ ├── prompts.py # 提示模板
│ │ │ └── rag_initializer.py # RAG 工具初始化
│ │ ├── graph/
│ │ │ ├── __init__.py
│ │ │ ├── graph_builder.py # LangGraph 图构建器
│ │ │ ├── graph_tools.py # 工具定义
│ │ │ ├── state.py # 状态定义
│ │ │ └── retrieve_memory.py # 记忆检索
│ │ ├── nodes/
│ │ │ ├── __init__.py
│ │ │ ├── router.py # 路由决策
│ │ │ ├── llm_call.py # LLM 调用节点
│ │ │ ├── tool_call.py # 工具执行节点
│ │ │ ├── retrieve_memory.py # 记忆检索节点
│ │ │ ├── summarize.py # 记忆存储节点
│ │ │ ├── finalize.py # 最终处理节点
│ │ │ └── memory_trigger.py # 记忆触发节点
│ │ ├── memory/
│ │ │ ├── __init__.py
│ │ │ └── mem0_client.py # Mem0 客户端封装
│ │ ├── rag/
│ │ │ ├── __init__.py
│ │ │ ├── retriever.py # 检索器
│ │ │ ├── reranker.py # 重排序
│ │ │ ├── query_transform.py # 查询转换
│ │ │ ├── pipeline.py # RAG 流水线
│ │ │ ├── fusion.py # RAG-Fusion
│ │ │ └── tools.py # RAG 工具
│ │ └── utils/
│ │ └── __init__.py
│ └── rag_core/
│ ├── __init__.py
│ ├── client.py # RAG 核心客户端
│ ├── embedders.py # 嵌入模型
│ ├── vector_store.py # 向量存储
│ ├── retriever_factory.py # 检索器工厂
│ └── store/
│ ├── __init__.py
│ ├── factory.py # 存储工厂
│ └── postgres.py # PostgreSQL 存储
├── frontend/
│ ├── run.py # 前端启动脚本
│ ├── requirements.txt
│ └── src/
│ ├── __init__.py
│ ├── frontend_main.py # Streamlit 主入口
│ ├── api_client.py # API 客户端
│ ├── config.py # 前端配置
│ ├── state.py # 状态管理
│ ├── logger.py # 日志
│ ├── utils.py # 工具函数
│ └── components/
│ ├── __init__.py
│ ├── sidebar.py # 侧边栏组件
│ ├── chat_area.py # 聊天区域组件
│ └── info_panel.py # 信息面板组件
├── rag_indexer/
│ ├── __init__.py
│ ├── cli.py # 命令行入口
│ ├── index_builder.py # 索引构建器
│ ├── loaders.py # 文档加载器
│ ├── splitters.py # 文本切分器
│ └── test/ # 测试脚本
├── docker/
│ ├── docker-compose.yml # Docker 服务编排
│ ├── backend/
│ │ └── Dockerfile # 后端镜像构建
│ ├── frontend/
│ │ └── Dockerfile # 前端镜像构建
│ └── models/ # spaCy 模型文件
├── scripts/
│ └── start.sh # 启动脚本
├── test/ # 测试目录
│ ├── test_backend.py
│ ├── test_rag.py
│ ├── test_dqrant.py
│ └── test_rag_indexer_result.py
├── .gitea/
│ └── workflows/
│ └── deploy.yml # CI/CD 自动化部署
├── requirement.txt # Python 依赖
├── .env.docker # Docker 环境变量模板
├── .gitignore
├── LICENSE
├── QUICKSTART.md # 快速开始指南
└── user_docs/ # 用户文档目录
🧠 核心算法与实现原理
1. RAG 检索算法
1.1 文本切分策略
项目支持三种文本切分策略,适应不同场景需求:
递归字符切分(Recursive Character Splitting)
算法思路:
按优先级分隔符逐级切分:["\n\n", "\n", "。", "!", "?", " ", ""]
↓
确保每个块不超过 chunk_size(默认 500 字符)
↓
保留 chunk_overlap(默认 50 字符)避免上下文丢失
优势:简单高效,适合结构化文档
实现:langchain_text_splitters.RecursiveCharacterTextSplitter
语义切分(Semantic Chunking)
算法思路:
1. 将文档按句子边界切分
2. 使用 Embedding 模型计算相邻句子的向量相似度
3. 计算相似度变化率(breakpoint threshold)
4. 在相似度骤降处切分(语义主题变化点)
阈值策略:
- percentile(百分位数):默认,取相似度分布的 95 百分位
- standard_deviation(标准差):低于均值 1.5 标准差处切分
- interquartile(四分位距):使用 IQR 方法检测异常点
优势:保持语义完整性,切分更自然
实现:langchain_experimental.text_splitter.SemanticChunker
父子块切分(Parent-Child Chunking)
算法思路:
父块(大块):1000 字符,保留完整上下文
↓
子块(小块):语义切分,用于向量检索
↓
建立映射关系:child_id → parent_id
↓
检索时:用子块检索,返回时扩展为父块上下文
检索流程:
用户查询 → 向量检索匹配子块 → 通过映射获取父块 → 返回完整上下文
优势:检索精度高,上下文完整
实现:自定义 ParentChildSplitter 类
1.2 混合检索(Dense + Sparse)
检索架构:
┌─────────────────────────────────────────────┐
│ 用户查询 │
└──────────────────┬──────────────────────────┘
│
┌──────────┴──────────┐
↓ ↓
┌───────────────┐ ┌───────────────┐
│ 稠密向量检索 │ │ 稀疏 BM25 检索 │
│ (语义相似) │ │ (关键词匹配) │
│ │ │ │
│ Embedding │ │ Token 化 │
│ → 向量相似度 │ │ → 词频统计 │
└───────┬───────┘ └───────┬───────┘
│ │
└──────────┬─────────┘
↓
┌────────────────┐
│ 结果融合 │
│ (RRF 算法) │
└────────┬───────┘
↓
┌────────────────┐
│ Cross-Encoder │
│ 重排序 │
└────────┬───────┘
↓
┌────────────────┐
│ Top-K 结果返回 │
└────────────────┘
稠密向量检索(Dense Retrieval)
- 使用 Embedding 模型将查询和文档映射到同一向量空间
- 计算余弦相似度,返回语义最相似的文档
- 适合:语义理解、同义词匹配、概念检索
稀疏向量检索(Sparse Retrieval - BM25)
- 基于词频(TF)和逆文档频率(IDF)计算相关性
- 适合:专有名词、精确匹配、术语检索
1.3 RRF 融合算法(Reciprocal Rank Fusion)
# 核心算法实现
def reciprocal_rank_fusion(doc_lists: List[List[Document]], k: int = 60) -> List[Document]:
"""
RRF 公式:RRF(d) = Σ (1 / (k + rank(d)))
参数说明:
- k: 平滑常数,通常设为 60
- k 值越大,排名影响越小,结果越平滑
- k 值越小,高排名文档优势越大
算法步骤:
1. 遍历每个检索结果列表
2. 对每个文档,根据其排名计算 RRF 得分
3. 累加同一文档在不同列表中的得分
4. 按总得分降序排序
示例:
文档 A 在列表 1 中排名第 1,在列表 2 中排名第 3
RRF(A) = 1/(60+1) + 1/(60+3) = 0.0164 + 0.0159 = 0.0323
文档 B 在列表 1 中排名第 2,在列表 2 中排名第 2
RRF(B) = 1/(60+2) + 1/(60+2) = 0.0161 + 0.0161 = 0.0322
结果:A 排名高于 B
"""
doc_to_score: Dict[str, float] = {}
doc_map: Dict[str, Document] = {}
for docs in doc_lists:
for rank, doc in enumerate(docs, start=1):
doc_id = f"{doc.page_content[:200]}_{doc.metadata.get('source', '')}"
if doc_id not in doc_map:
doc_map[doc_id] = doc
score = doc_to_score.get(doc_id, 0.0) + 1.0 / (k + rank)
doc_to_score[doc_id] = score
sorted_ids = sorted(doc_to_score.keys(), key=lambda x: doc_to_score[x], reverse=True)
return [doc_map[doc_id] for doc_id in sorted_ids]
RRF 算法优势:
- 无需归一化:不同检索器的分数范围可能不同,RRF 直接使用排名
- 参数简单:仅需调整 k 值,默认 60 在大多数场景表现良好
- 鲁棒性强:对异常值和噪声不敏感
1.4 Cross-Encoder 重排序
重排序流程:
┌──────────────────────────────────────────────┐
│ 输入:RRF 融合后的 Top-20 文档 │
└──────────────────┬───────────────────────────┘
│
↓
┌──────────────────────────────────────────────┐
│ Cross-Encoder 模型(bge-reranker-v2-m3) │
│ │
│ 工作原理: │
│ - 将 Query 和 Document 拼接输入模型 │
│ - "[CLS] Query [SEP] Document [SEP]" │
│ - 通过 Transformer 计算交互注意力 │
│ - 输出相关性得分(0-1) │
│ │
│ 与 Bi-Encoder 的区别: │
│ - Bi-Encoder:分别编码,计算余弦相似度(快) │
│ - Cross-Encoder:联合编码,计算交互得分(准) │
└──────────────────┬───────────────────────────┘
│
↓
┌──────────────────────────────────────────────┐
│ 按相关性得分排序,返回 Top-5 │
└──────────────────────────────────────────────┘
实现细节:
- 使用远程 llama.cpp 服务部署重排序模型
- 兼容 OpenAI Rerank API 格式
- 超时保护:60 秒超时,失败时降级为原始排序
2. LangGraph 工作流算法
2.1 状态机设计
# 核心状态定义
class AgentState(TypedDict):
messages: Annotated[list, add_messages] # 对话历史(自动合并)
user_id: str # 用户标识
memory_context: str # 检索到的记忆上下文
should_summarize: bool # 是否需要生成摘要
tool_calls: list # 工具调用列表
final_response: str # 最终响应
状态流转规则:
初始状态 → retrieve_memory → memory_trigger → [条件分支]
↓
┌───────────────────┼───────────────────┐
↓ ↓ ↓
should_summarize 直接回复 需要工具
↓ ↓ ↓
summarize → save finalize tool_node
↓ ↓ ↓
llm_call ←───────────────┘ llm_call
↓
finalize
2.2 记忆管理算法
记忆检索策略:
1. 向量检索:将用户查询 Embedding,在 PostgreSQL 中检索相似对话
2. 时间衰减:近期对话权重更高(可选)
3. 相关性过滤:仅返回相似度 > 阈值的记忆
4. 上下文窗口:限制记忆长度,避免超出 LLM 上下文限制
摘要生成策略:
触发条件:
- 对话轮数超过阈值(默认 10 轮)
- 记忆上下文长度超过限制
摘要生成:
1. 提取最近 N 轮对话
2. 调用 LLM 生成摘要
3. 保存摘要到 PostgreSQL
4. 清理旧对话记录
摘要格式:
"用户在 {时间} 讨论了 {主题},关键信息:{要点}"
3. 多模型路由算法
模型选择逻辑:
┌─────────────────────────────────────────────┐
│ 用户选择模型 │
└──────────────────┬──────────────────────────┘
│
┌──────────┼──────────┐
↓ ↓ ↓
┌──────┐ ┌──────┐ ┌──────┐
│ zhipu│ │deep │ │local │
└──┬───┘ └──┬───┘ └──┬───┘
│ │ │
↓ ↓ ↓
ChatZhipu ChatOpenAI ChatOpenAI
(官方SDK) (DeepSeek) (vLLM/Gemma)
│ │ │
└─────────┼─────────┘
↓
┌────────────────┐
│ 统一接口输出 │
│ (BaseChatModel)│
└────────────────┘
高可用降级策略:
1. 优先使用用户选择的模型
2. 如果模型不可用(API 错误、超时等):
- 尝试切换到备用模型
- 记录错误日志
- 返回降级提示给用户
3. 健康检查:定期检查各模型服务状态
4. SSE 流式响应算法
流式输出流程:
┌─────────────────────────────────────────────┐
│ LLM 生成 Token │
└──────────────────┬──────────────────────────┘
│
↓
┌─────────────────────────────────────────────┐
│ 事件格式化 │
│ data: {"content": "你", "type": "content"} │
│ data: {"content": "好", "type": "content"} │
│ data: {"content": "!", "type": "content"} │
│ data: {"done": true, "type": "end"} │
└──────────────────┬──────────────────────────┘
│
↓
┌─────────────────────────────────────────────┐
│ HTTP Response Stream │
│ Content-Type: text/event-stream │
│ Cache-Control: no-cache │
│ Connection: keep-alive │
└─────────────────────────────────────────────┘
实现要点:
- 使用
asyncio.Queue缓冲 Token - 生产者:LLM 异步生成 Token 放入队列
- 消费者:FastAPI
EventSourceResponse从队列读取并推送 - 超时保护:30 秒无输出自动断开
📚 RAG 系统完整架构
离线索引构建 vs 在线检索生成
RAG 系统分为两个独立但协同的阶段:
┌──────────────────────────────────────────────────────────────────┐
│ RAG 系统双阶段架构 │
├──────────────────────────────────────────────────────────────────┤
│ │
│ 阶段一:离线索引构建 (rag_indexer) │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ 文档加载 → 文本切分 → Embedding → 向量存储 │ │
│ │ PDF/DOCX/TXT 递归/语义/父子块 llama.cpp Qdrant │ │
│ └────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ 阶段二:在线检索生成 (backend/app/rag) │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ 用户查询 → 查询改写 → 多路检索 → RRF 融合 → 重排序 │ │
│ │ MultiQuery Dense+Sparse Cross-Encoder │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ LLM 生成回答 │ │
│ └────────────────────────────────────────────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────┘
RAG 演进路线 (Roadmap)
Level 1: 基础向量搜索 (Basic Similarity Search)
核心算法: 近似最近邻搜索 (ANN, 常用 HNSW 算法)
算法原理:
用户问题 → Embedding 模型 → 向量表示
↓
计算与库中向量的余弦相似度
↓
取距离最近的 K 个块返回
优缺点:
✅ 速度极快(毫秒级)
❌ 只能捕捉"语义相似",专有名词匹配差
实现代码:
from app.rag.retriever import create_base_retriever
retriever = create_base_retriever(
collection_name="rag_documents",
embeddings=embeddings,
search_kwargs={"k": 20}
)
docs = retriever.invoke("什么是 RAG?")
Level 2: 混合检索与重排序 (Hybrid Search + Reranker)
1. 基础召回(混合检索)
算法原理:
稠密向量检索(语义相似)+ BM25 稀疏检索(关键词匹配)
↓
两路结果并行获取,等待融合
实现代码:
from app.rag.retriever import create_hybrid_retriever
retriever = create_hybrid_retriever(
collection_name="rag_documents",
embeddings=embeddings,
dense_k=10,
sparse_k=10,
score_threshold=0.3
)
2. 二次精排(Cross-Encoder)
算法原理:
不同于双塔模型(分别算向量再求距离)
交叉编码器将"问题 + 文档"拼接后整体输入 Transformer
由模型直接输出 0~1 的相关性得分,精度极高
实现代码:
from app.rag.reranker import LLaMaCPPReranker
reranker = LLaMaCPPReranker(
base_url="http://127.0.0.1:8083",
api_key="your-api-key",
top_n=5
)
sorted_docs = reranker.compress_documents(documents, query)
Level 3: RAG-Fusion (多路改写与倒数排名融合)
1. 多路查询改写
算法原理:
克服用户初始提问词不达意或视角受限的问题
通过 LLM 将单一问题改写为多个不同角度的查询
实现代码:
from app.rag.query_transform import MultiQueryGenerator
generator = MultiQueryGenerator(llm=llm, num_queries=3)
queries = await generator.agenerate("如何申请项目资金?")
# 返回:["如何申请项目资金?", "项目资金申请流程是什么?", "申请项目经费需要哪些步骤?"]
2. 倒数排名融合(RRF)
算法原理:
RRF (Reciprocal Rank Fusion) 是一种无需评分归一化的融合算法
公式:RRF_score(d) = Σ 1/(k + rank_q(d))
有效避免某一极端检索结果主导全局
实现代码:
from app.rag.fusion import reciprocal_rank_fusion
# 多个查询的检索结果
doc_lists = [result1, result2, result3]
fused_docs = reciprocal_rank_fusion(doc_lists, k=60)
Level 4: Agentic RAG / Self-RAG (智能体与自我反思)
核心原理:
基于 LangGraph 的 ReAct (Reasoning and Acting) 状态机路由
大模型并非每次都去死板地执行检索,而是先判断问题:
"这是闲聊?还是需要查知识库?"
工作流程:
┌──────────┐ ┌──────────────┐ ┌──────────┐ ┌────────┐
│ User │────>│ LangGraph │────>│ RAG_Tool │────>│ Qdrant │
│ │ │ Agent │ │ │ │ │
│ "公司报 │ │ 思考: 这是 │ │ ToolCall │ │ RAG- │
│ 销流程?"│ │ 内部规章问题 │ │ search_ │ │ Fusion │
│ │ │ 需要查资料 │ │ knowledge│ │ & 混合 │
│ │<────│ 资料充分, │<────│ 返回最相 │<────│ 检索 │
│ "根据知 │ │ 开始撰写回答 │ │ 关5条规定 │ │ Cross- │
│ 识库规定 │ │ │ │ │ │ Encoder│
│ ..." │ │ │ │ │ │ 重排 │
└────────── └────────────── └──────────┘ └────────┘
实现代码:
from app.rag.tools import search_knowledge_base
from app.graph.graph_builder import GraphBuilder
# 将 RAG 工具添加到工具列表
tools = AVAILABLE_TOOLS + [search_knowledge_base]
# 构建图
builder = GraphBuilder(llm, tools, tools_by_name)
graph = builder.build().compile(checkpointer=checkpointer)
Level 5: GraphRAG 集成 (基于图和关系的 RAG)
核心原理:
结合知识图谱的结构化关系和向量检索的语义相似度
解决跨文档复杂关系推理问题
实现代码:
from langchain_community.graphs import Neo4jGraph
from langchain_experimental.graph_transformers import LLMGraphTransformer
# 实体关系抽取
transformer = LLMGraphTransformer(llm=local_llm)
graph_documents = transformer.convert_to_graph_documents(documents)
# 存储到图数据库
graph = Neo4jGraph(url="bolt://localhost:7687")
graph.add_graph_documents(graph_documents)
检索策略对比
| 策略 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 基础向量检索 | 速度快,语义理解好 | 专有名词匹配差 | 通用问答 |
| 混合检索 | 语义 + 关键词匹配 | 需要配置稀疏向量 | 专业术语查询 |
| 多路改写 + RRF | 搜索面广,结果稳定 | 延迟略高 | 复杂问题 |
| 重排序 | 精度高 | 依赖额外模型 | 最终精排 |
| Agentic RAG | 智能决策,灵活 | 实现复杂 | 生产环境 |
| GraphRAG | 关系推理能力强 | 需要图数据库 | 知识密集型场景 |
切分策略对比
| 策略 | 原理 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
| 递归字符 | 按分隔符递归切分 | 速度快,实现简单 | 可能截断语义 | 简单文档 |
| 语义切分 | 基于句子相似度阈值 | 语义连贯性好 | 需要 Embedding 模型 | 专业文档 |
| 父子块 | 大块存储+小块检索 | 检索精准+上下文完整 | 存储复杂度高 | 生产环境 |
<EFBFBD><EFBFBD> 快速开始
详细启动指南请查看 QUICKSTART.md
方式一:Docker Compose(推荐)
# 1. 配置环境变量
cp .env.docker .env
# 编辑 .env 文件,填入真实的 API Key
# 2. 启动所有服务
docker compose -f docker/docker-compose.yml up -d --build
# 3. 访问应用
# 前端: http://127.0.0.1:8501
# 后端 API: http://127.0.0.1:8083
方式二:本地开发模式
# 1. 安装依赖
pip install -r requirement.txt
# 2. 配置环境变量
cp .env.docker .env
# 编辑 .env,根据本地/远程环境调整配置
# 3. 启动后端
python backend/app/backend.py
# 4. 启动前端(新终端)
streamlit run frontend/src/frontend_main.py
# 或者使用启动脚本(推荐)
./scripts/start.sh both
📖 使用指南
基础对话
直接在聊天框输入问题即可:
你好,请介绍一下自己
今天北京天气怎么样?
帮我总结一下 a.txt 的内容
工具调用示例
| 功能 | 示例提问 |
|---|---|
| 🌤️ 天气查询 | "上海今天天气如何?" |
| 📄 读取文本 | "读取 a.txt 的内容" |
| 📑 解析 PDF | "总结 b.pdf 的主要内容" |
| 📊 Excel 数据 | "显示 c.xlsx 的数据" |
| 🌐 网页抓取 | "抓取 https://example.com 的内容" |
| 🔍 长期记忆 | "记住我喜欢吃川菜" → "我有什么饮食偏好?" |
多模型切换
-
在左侧边栏选择模型:
- 智谱 GLM-4.7:在线服务,速度快
- DeepSeek Reasoner:深度推理模型
- 本地 Gemma-4:本地部署,隐私性好
-
可随时切换,甚至在同一会话中
-
点击 "🔄 新会话" 清空当前对话
🔧 开发指南
添加新工具
在 backend/app/graph/graph_tools.py 中添加新的 @tool 装饰函数:
@tool
def my_new_tool(param: str) -> str:
"""
工具描述(会显示给 LLM)
Args:
param: 参数说明
Returns:
返回值说明
"""
# 实现逻辑
return result
工具会自动注册到 AVAILABLE_TOOLS 列表中。
添加新模型
在 backend/app/agent/llm_factory.py 中添加模型创建方法:
@staticmethod
def create_new_model():
api_key = os.getenv("NEW_MODEL_API_KEY")
return ChatOpenAI(
base_url="https://api.new-model.com/v1",
api_key=SecretStr(api_key),
model="model-name",
temperature=0.1,
streaming=True,
)
然后在 CREATORS 字典中注册:
CREATORS = {
"local": create_local,
"deepseek": create_deepseek,
"zhipu": create_zhipu,
"new_model": create_new_model, # 新增
}
Docker 部署
项目包含完整的 Docker 配置:
- docker-compose.yml:服务编排(Backend + Frontend,连接远程数据库)
- Dockerfile.backend:后端镜像构建
- Dockerfile.frontend:前端镜像构建
- .gitea/workflows/deploy.yml:CI/CD 自动化部署
详见 QUICKSTART.md 的 Docker 部署章节。
⚙️ 环境配置
配置文件说明
项目采用两层环境配置文件体系:
| 文件 | 用途 | 是否提交 Git |
|---|---|---|
.env.docker |
Docker 部署模板 | ✅ 是 |
.env |
实际使用的配置 | ❌ 否(已忽略) |
使用方法:
- 本地开发:
cp .env.docker .env,修改为本地服务地址 - Docker 部署:
cp .env.docker .env,使用远程服务器地址
必需的环境变量
| 变量名 | 说明 | 本地开发示例 | Docker 部署示例 |
|---|---|---|---|
ZHIPUAI_API_KEY |
智谱AI API密钥 | your-api-key |
your-api-key |
DEEPSEEK_API_KEY |
DeepSeek API密钥 | your-api-key |
your-api-key |
LLAMACPP_API_KEY |
llama.cpp API 密钥 | token-abc123 |
token-abc123 |
VLLM_BASE_URL |
LLM 服务地址 | http://127.0.0.1:8081/v1 |
http://your-server:8081/v1 |
LLAMACPP_EMBEDDING_URL |
Embedding 服务地址 | http://127.0.0.1:8082/v1 |
http://your-server:8082/v1 |
DB_URI |
PostgreSQL 连接字符串 | postgresql://...@115.190.121.151:5432/langgraph_db |
同左 |
QDRANT_URL |
Qdrant 向量数据库地址 | http://115.190.121.151:6333 |
同左 |
LOG_LEVEL |
日志级别 | INFO |
WARNING |
ENABLE_GRAPH_TRACE |
是否启用图流转追踪 | true |
false |
MEMORY_SUMMARIZE_INTERVAL |
对话摘要生成间隔 | 10 |
10 |
注意事项
- ⚠️ 不要硬编码敏感信息:所有 API Key 必须通过环境变量配置
- ⚠️ 远程服务依赖:确保可以访问远程 PostgreSQL (115.190.121.151:5432) 和 Qdrant (115.190.121.151:6333)
- ⚠️ 修改后重启:修改
.env后,Docker 部署需要执行docker compose down && docker compose up -d --build
<EFBFBD>️ 实现指南与最佳实践
1. RAG 知识库构建指南
1.1 离线索引构建流程
# 步骤 1:准备文档
# 将文档放入指定目录(如 ./documents/)
# 支持格式:TXT, PDF, Markdown, DOCX
# 步骤 2:选择切分策略
# 根据文档类型选择:
# - 结构化文档(如手册、API 文档)→ Recursive Splitting
# - 非结构化文档(如文章、报告)→ Semantic Chunking
# - 长文档(如书籍、论文)→ Parent-Child Chunking
# 步骤 3:构建索引
cd rag_indexer
python cli.py build \
--input-dir ../documents \
--collection-name my_knowledge \
--splitter-type semantic \
--chunk-size 500
# 步骤 4:验证索引
python cli.py query \
--collection-name my_knowledge \
--query "如何配置系统?" \
--top-k 5
1.2 切分策略选择建议
| 文档类型 | 推荐策略 | chunk_size | 说明 |
|---|---|---|---|
| API 文档 | Recursive | 300-500 | 结构清晰,按章节切分 |
| 技术文章 | Semantic | 自适应 | 保持语义完整性 |
| 法律合同 | Parent-Child | 父:1000, 子:200 | 需要完整上下文 |
| 问答对 | Recursive | 200-300 | 短文本,精确匹配 |
| 产品手册 | Parent-Child | 父:1500, 子:300 | 长文档,跨章节检索 |
1.3 Embedding 模型选择
| 模型 | 维度 | 语言 | 速度 | 精度 | 适用场景 |
|---|---|---|---|---|---|
| bge-large-zh-v1.5 | 1024 | 中文 | 中 | 高 | 中文文档检索 |
| bge-m3 | 1024 | 多语言 | 中 | 高 | 多语言混合文档 |
| text-embedding-3-small | 1536 | 英文 | 快 | 中 | 英文文档检索 |
| gte-large | 1024 | 中文 | 慢 | 很高 | 高精度要求场景 |
2. 性能优化指南
2.1 检索性能优化
# 优化 1:调整检索参数
search_kwargs = {
"k": 20, # 召回数量(增加召回,提高精度)
"score_threshold": 0.3, # 相似度阈值(过滤低质量结果)
"fetch_k": 50, # 初始召回数量(用于 MMR 去重)
"lambda_mult": 0.7, # MMR 多样性参数(0=去重,1=不去重)
}
# 优化 2:使用缓存
from functools import lru_cache
@lru_cache(maxsize=1000)
def cached_retrieve(query: str) -> List[Document]:
"""缓存常见查询结果"""
return retriever.invoke(query)
# 优化 3:批量 Embedding
# 构建索引时,使用批量处理提高效率
batch_size = 32
for i in range(0, len(documents), batch_size):
batch = documents[i:i+batch_size]
embeddings = embedder.embed_documents([doc.page_content for doc in batch])
2.2 LLM 调用优化
# 优化 1:使用流式响应
# 减少首字延迟,提升用户体验
response = await llm.astream(messages)
async for chunk in response:
yield chunk.content
# 优化 2:控制上下文长度
MAX_CONTEXT_LENGTH = 4000
def truncate_context(context: str, max_length: int = MAX_CONTEXT_LENGTH) -> str:
"""截断上下文,保留开头和结尾"""
if len(context) <= max_length:
return context
half = max_length // 2
return context[:half] + "\n...\n" + context[-half:]
# 优化 3:Prompt 优化
# 使用结构化 Prompt,提高 LLM 理解能力
prompt_template = """
你是一个智能助手。请根据以下上下文回答用户问题。
上下文:
{context}
问题:{question}
回答要求:
1. 仅基于上下文回答,不要编造信息
2. 如果上下文中没有答案,明确告知用户
3. 引用上下文中的具体信息时,注明来源
"""
2.3 数据库性能优化
-- PostgreSQL 优化
-- 1. 为对话历史表添加索引
CREATE INDEX idx_conversations_user_id ON conversations(user_id);
CREATE INDEX idx_conversations_created_at ON conversations(created_at);
-- 2. 为记忆摘要表添加索引
CREATE INDEX idx_summaries_user_id ON summaries(user_id);
-- 3. 定期清理旧数据(保留最近 90 天)
DELETE FROM conversations
WHERE created_at < NOW() - INTERVAL '90 days';
-- Qdrant 优化
-- 1. 使用 HNSW 索引(默认已启用)
-- 2. 调整 ef_construct 和 m 参数平衡速度和精度
-- 3. 定期优化集合
curl -X POST http://localhost:6333/collections/my_docs/actions \
-H "Content-Type: application/json" \
-d '{"actions": [{"optimize": {}}]}'
3. 扩展开发指南
3.1 添加自定义工具
# 在 backend/app/graph/graph_tools.py 中添加
from langchain_core.tools import tool
from typing import Optional
@tool
def calculate_expression(
expression: str,
precision: int = 2
) -> str:
"""
计算数学表达式的值。
支持基本运算:加减乘除、幂运算、括号。
Args:
expression: 数学表达式,如 "2 + 3 * 4"
precision: 结果精度(小数位数),默认 2
Returns:
计算结果
"""
try:
# 安全计算(避免 eval 的安全风险)
import ast
import operator
# 定义允许的操作符
ops = {
ast.Add: operator.add,
ast.Sub: operator.sub,
ast.Mult: operator.mul,
ast.Div: operator.truediv,
ast.Pow: operator.pow,
}
def eval_node(node):
if isinstance(node, ast.Num):
return node.n
elif isinstance(node, ast.BinOp):
left = eval_node(node.left)
right = eval_node(node.right)
return ops[type(node.op)](left, right)
else:
raise ValueError(f"不支持的操作: {type(node)}")
tree = ast.parse(expression, mode='eval')
result = eval_node(tree.body)
return f"{result:.{precision}f}"
except Exception as e:
return f"计算错误: {str(e)}"
# 工具会自动注册,无需手动添加到 AVAILABLE_TOOLS
3.2 添加自定义 LLM 提供商
# 在 backend/app/agent/llm_factory.py 中添加
from langchain_openai import ChatOpenAI
from langchain_core.language_models import BaseChatModel
class LLMFactory:
@staticmethod
def create_anthropic_model() -> BaseChatModel:
"""创建 Anthropic Claude 模型"""
api_key = os.getenv("ANTHROPIC_API_KEY")
if not api_key:
raise ValueError("ANTHROPIC_API_KEY 环境变量未设置")
return ChatAnthropic(
model="claude-3-sonnet-20240229",
temperature=0.1,
max_tokens=4096,
api_key=api_key,
)
# 注册到 CREATORS
CREATORS = {
"local": create_local,
"deepseek": create_deepseek,
"zhipu": create_zhipu,
"anthropic": create_anthropic_model, # 新增
}
3.3 添加自定义 RAG 检索器
# 在 backend/app/rag/retriever.py 中添加
from langchain_core.retrievers import BaseRetriever
from langchain_core.documents import Document
class HybridRetriever(BaseRetriever):
"""混合检索器:结合多种检索策略"""
dense_retriever: BaseRetriever
sparse_retriever: BaseRetriever
reranker: Optional[BaseReranker] = None
top_k: int = 5
def _get_relevant_documents(self, query: str) -> List[Document]:
# 并行检索
dense_docs = self.dense_retriever.invoke(query)
sparse_docs = self.sparse_retriever.invoke(query)
# RRF 融合
fused_docs = reciprocal_rank_fusion([dense_docs, sparse_docs])
# 重排序
if self.reranker:
fused_docs = self.reranker.compress_documents(fused_docs[:20], query)
return fused_docs[:self.top_k]
4. 部署最佳实践
4.1 生产环境配置
# docker-compose.prod.yml
version: '3.8'
services:
backend:
build:
context: .
dockerfile: docker/Dockerfile.backend
environment:
- LOG_LEVEL=WARNING # 生产环境降低日志级别
- ENABLE_GRAPH_TRACE=false # 关闭图追踪(提升性能)
- MEMORY_SUMMARIZE_INTERVAL=5 # 更频繁的摘要生成
deploy:
resources:
limits:
cpus: '2'
memory: 4G
reservations:
cpus: '1'
memory: 2G
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8083/health"]
interval: 30s
timeout: 10s
retries: 3
frontend:
build:
context: .
dockerfile: docker/Dockerfile.frontend
environment:
- STREAMLIT_SERVER_PORT=8501
- STREAMLIT_SERVER_HEADLESS=true
deploy:
resources:
limits:
cpus: '1'
memory: 2G
restart: unless-stopped
4.2 监控与告警
# 添加健康检查端点
from fastapi import FastAPI
from datetime import datetime
app = FastAPI()
@app.get("/health")
async def health_check():
"""健康检查端点"""
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"services": {
"postgresql": await check_postgresql(),
"qdrant": await check_qdrant(),
"llm": await check_llm_service(),
}
}
async def check_postgresql() -> bool:
try:
# 尝试连接数据库
async with asyncpg.connect(DB_URI) as conn:
await conn.fetchval("SELECT 1")
return True
except Exception:
return False
async def check_qdrant() -> bool:
try:
client = QdrantClient(url=QDRANT_URL)
client.get_collections()
return True
except Exception:
return False
4.3 安全最佳实践
# 1. 使用环境变量管理密钥
# 永远不要硬编码 API Key
import os
from typing import Optional
def get_api_key(env_var: str, default: Optional[str] = None) -> str:
api_key = os.getenv(env_var, default)
if not api_key:
raise ValueError(f"{env_var} 环境变量未设置")
return api_key
# 2. 输入验证
from pydantic import BaseModel, validator
class ChatRequest(BaseModel):
message: str
user_id: str
model: str = "zhipu"
@validator('message')
def validate_message(cls, v):
if len(v) > 4000:
raise ValueError("消息长度不能超过 4000 字符")
return v.strip()
@validator('model')
def validate_model(cls, v):
allowed_models = {"zhipu", "deepseek", "local"}
if v not in allowed_models:
raise ValueError(f"不支持的模型: {v}")
return v
# 3. 速率限制
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
@app.post("/api/chat")
@limiter.limit("10/minute") # 每分钟最多 10 次请求
async def chat(request: Request, chat_request: ChatRequest):
...
5. 调试与测试
5.1 启用调试模式
# 启用详细日志
export LOG_LEVEL=DEBUG
# 启用 LangGraph 图追踪
export ENABLE_GRAPH_TRACE=true
# 查看图执行流程
# 启动后访问:http://localhost:8083/graph/trace
5.2 单元测试示例
# tests/test_rag.py
import pytest
from app.rag.fusion import reciprocal_rank_fusion
from langchain_core.documents import Document
def test_rrf_fusion():
# 准备测试数据
docs1 = [
Document(page_content="文档 A"),
Document(page_content="文档 B"),
]
docs2 = [
Document(page_content="文档 B"),
Document(page_content="文档 C"),
]
# 执行 RRF 融合
result = reciprocal_rank_fusion([docs1, docs2])
# 验证结果
assert len(result) == 3
assert result[0].page_content == "文档 B" # B 在两个列表中都出现,应该排第一
@pytest.mark.asyncio
async def test_retriever():
from app.rag.retriever import create_base_retriever
retriever = create_base_retriever(
collection_name="test_collection",
embeddings=mock_embeddings,
)
docs = await retriever.ainvoke("测试查询")
assert len(docs) > 0
<EFBFBD><EFBFBD> 故障排查
常见问题
Q: 无法连接远程数据库?
# 测试 PostgreSQL
psql -h 115.190.121.151 -U postgres -d langgraph_db -c "SELECT version();"
# 测试 Qdrant
curl http://115.190.121.151:6333/collections
Q: 后端启动失败?
- 确认端口 8083 未被占用
- 检查
.env中的 API Key 是否正确 - 查看启动日志确认模型初始化成功
Q: 模型切换后无响应?
- 检查所选模型的配置是否正确
- 确认 vLLM 容器是否运行(如使用本地模型)
- 尝试切换到另一个模型
更多问题排查请查看 QUICKSTART.md
📝 许可证
本项目采用 MIT 许可证。详见 LICENSE 文件。
🤝 贡献
欢迎提交 Issue 和 Pull Request!