# AI Agent - 智能助手系统
一个基于 LangGraph + FastAPI 的智能对话助手,支持多模型切换、RAG 知识库检索、文件处理和网页抓取等功能。
---
## 📑 目录导航
- [核心功能](#-核心功能) - 面向用户的功能和技术特性
- [技术架构](#️-技术架构) - 技术栈、系统架构图、工作流流程图
- [核心算法与实现原理](#-核心算法与实现原理) - RAG 检索、LangGraph 工作流、多模型路由、SSE 流式响应
- [RAG 系统完整架构](#-rag-系统完整架构) - 离线索引构建、在线检索生成、演进路线
- [快速开始](#-快速开始) - Docker 和本地部署指南
- [使用指南](#-使用指南) - 基础对话、工具调用、多模型切换
- [开发指南](#-开发指南) - 添加工具、添加模型、Docker 部署
- [实现指南与最佳实践](#️-实现指南与最佳实践) - RAG 构建、性能优化、扩展开发、部署实践
- [环境配置](#️-环境配置) - 配置文件、环境变量
- [故障排查](#-故障排查) - 常见问题
---
## 🎯 核心功能
### 面向用户的功能
- 💬 **智能对话**:支持多轮对话,自动记忆上下文
- 🌤️ **天气查询**:实时获取各地天气信息
- 📄 **文档处理**:读取 TXT、PDF、Excel 等格式文件
- 🌐 **网页抓取**:提取网页正文内容
- 🔍 **知识库检索(RAG)**:基于向量数据库的智能问答
- 🔄 **多模型切换**:前端可选择不同大语言模型
- 📇 **通讯录管理**:子图模块,联系人 CRUD、邮件处理
- 📖 **智能词典**:子图模块,翻译、生词本、专业术语提取
- 📰 **资讯分析**:子图模块,资讯获取、内容分析、格式化展示
### 技术特性
- ✅ **持久化记忆**:PostgreSQL 存储对话历史,重启不丢失
- ✅ **高可用架构**:模型服务自动降级,确保服务稳定
- ✅ **前后端分离**:FastAPI 后端 + Streamlit 前端
- ✅ **Docker 部署**:一键启动所有服务
- ✅ **远程服务架构**:PostgreSQL 和 Qdrant 部署在远程服务器
- ✅ **流式响应**:SSE 流式输出,提升用户体验
- ✅ **模块化设计**:清晰的代码分层,易于扩展和维护
- ✅ **模型服务层**:统一的 Embedding、Rerank、Chat 服务接口,支持自动降级
- ✅ **子图系统**:模块化的子图架构,共享公共工具(意图理解、人工审核、格式化输出)
- ✅ **React 模式** ⭐:Reasoning → Acting → Observing 循环,LLM 先思考再行动,支持多次工具调用
---
## 🏗️ 技术架构
### 技术栈总览
| 层级 | 组件 | 技术选型 | 版本 | 说明 |
|------|------|---------|------|------|
| **LLM 服务** | 云端模型 | 智谱 AI (glm-4.7-flash) | v4.7 | 快速响应,适合日常对话 |
| | | DeepSeek (deepseek-reasoner) | v3 | 深度推理,适合复杂问题 |
| | 本地模型 | Gemma-4-E4B-it | v4 | 本地部署,保护隐私 |
| **模型服务层** | Chat 服务 | chat_services.py | - | 统一的生成式大模型接口 |
| | Embedding 服务 | embedding_services.py | - | 统一的嵌入模型接口 |
| | Rerank 服务 | rerank_services.py | - | 统一的重排序接口 |
| | 服务基类 | base.py | - | BaseServiceProvider + FallbackServiceChain + SingletonServiceManager |
| **Embedding** | 向量嵌入 | llama.cpp server | latest | 本地 embedding 服务,支持多种模型 |
| **Agent 框架** | 工作流编排 | LangGraph + LangChain | latest | 状态机驱动的智能体工作流 |
| **子图系统** | 模块化子图 | agent_subgraphs/ | - | 通讯录、词典、资讯分析等子图 |
| | 公共工具 | common/ | - | 状态基类、意图理解、格式化输出、人工审核 |
| **向量数据库** | 向量检索 | Qdrant | v1.12+ | 高性能向量相似度检索(远程服务器) |
| **后端框架** | API 服务 | FastAPI + Uvicorn | v0.115+ | RESTful API + WebSocket + SSE 流式输出 |
| **前端框架** | Web 界面 | Streamlit | v1.40+ | 交互式对话界面,组件化设计 |
| **关系数据库** | 持久化存储 | PostgreSQL | v16 | 对话记忆持久化(远程服务器) |
| **容器化** | 服务编排 | Docker + Docker Compose | v24+ | 一键部署所有服务 |
| **CI/CD** | 自动化部署 | Gitea Workflows | latest | 代码推送自动构建部署 |
### 系统架构流程图
```mermaid
graph TB
User[用户浏览器] -->|HTTP/SSE| Frontend[Streamlit 前端 :8501]
Frontend -->|REST API| Backend[FastAPI 后端 :8079]
Backend --> AgentService[AIAgentService]
Backend --> SubgraphAPI[子图 API 端点
/subgraph/contact
/subgraph/dictionary
/subgraph/news]
AgentService -->|模型路由| ChatServices[模型服务层 chat_services]
ChatServices -->|自动降级| FallbackChain[FallbackServiceChain]
FallbackChain -->|创建| Zhipu[智谱 GLM-4.7]
FallbackChain -->|创建| DeepSeek[DeepSeek Reasoner]
FallbackChain -->|创建| LocalGemma[本地 Gemma-4]
AgentService -->|初始化| LangGraph[LangGraph 工作流引擎]
LangGraph -->|节点1| RetrieveMemory[记忆检索]
LangGraph -->|节点2| MemoryTrigger[记忆触发]
LangGraph -->|节点3| LLMCall[LLM 调用 (React 模式)]
LangGraph -->|节点4| ToolNode[工具执行]
LangGraph -->|节点5| Summarize[记忆摘要]
LangGraph -->|节点6| Finalize[最终处理]
ToolNode -->|调用| Tools[工具集合]
Tools -->|天气| WeatherTool[天气查询]
Tools -->|文件| FileTool[文件读取]
Tools -->|网页| WebTool[网页抓取]
Tools -->|RAG| RAGTool[知识库检索]
RAGTool -->|检索| Qdrant[(Qdrant 向量库)]
RAGTool -->|重排序| RerankService[rerank_services]
RAGTool -->|嵌入| EmbeddingService[embedding_services]
RetrieveMemory -->|存储| PostgreSQL[(PostgreSQL)]
Summarize -->|存储| PostgreSQL
SubgraphAPI -->|路由| Subgraphs[子图系统 agent_subgraphs]
Subgraphs --> Contact[通讯录子图]
Subgraphs --> Dictionary[词典子图]
Subgraphs --> NewsAnalysis[资讯分析子图]
Subgraphs --> Common[公共工具 common/]
Common --> Intent[意图理解 (React)]
Common --> HumanReview[人工审核]
Common --> Formatter[格式化输出]
Common --> StateBase[状态基类]
Contact -->|数据库| ContactDB[(PostgreSQL 联系人)]
Dictionary -->|数据库| DictionaryDB[(PostgreSQL 生词本)]
NewsAnalysis -->|检索| NewsQdrant[(Qdrant 向量检索)]
style User fill:#e1f5ff
style Frontend fill:#fff4e1
style Backend fill:#e8f5e9
style ChatServices fill:#c8e6c9
style LangGraph fill:#f3e5f5
style Subgraphs fill:#fff3e0
style PostgreSQL fill:#ffebee
style Qdrant fill:#ffebee
```
---
### 主图与子图的关联架构
```mermaid
graph TB
subgraph "主图 MainGraph"
StartMain[START]
IntentMain[意图分类节点
判断用户意图]
ChatNode[普通对话节点
调用主 LLM]
SubgraphCaller[子图调用器
调用对应子图]
FinalMain[最终响应]
EndMain[END]
StartMain -->|用户输入| IntentMain
IntentMain -->|chat| ChatNode
IntentMain -->|contact| SubgraphCaller
IntentMain -->|dictionary| SubgraphCaller
IntentMain -->|research| SubgraphCaller
IntentMain -->|news| SubgraphCaller
ChatNode --> FinalMain
SubgraphCaller --> FinalMain
FinalMain --> EndMain
end
subgraph "通讯录子图 ContactSubgraph"
StartContact[START]
IntentContact[parse_intent
解析意图]
ListContacts[list_contacts
列出联系人]
AddContact[add_contact
添加联系人]
ListEmails[list_emails
列出邮件]
GenEmail[generate_email_draft
生成邮件草稿]
HumanReview[human_review
人工审核]
SendEmail[send_email
发送邮件]
SniffContact[sniff_contacts
智能嗅探]
FormatContact[format_result
格式化输出]
EndContact[END]
StartContact --> IntentContact
IntentContact -->|list| ListContacts
IntentContact -->|add| AddContact
IntentContact -->|list_emails| ListEmails
IntentContact -->|generate_email| GenEmail
IntentContact -->|sniff| SniffContact
ListContacts --> FormatContact
AddContact --> FormatContact
ListEmails --> FormatContact
SniffContact --> FormatContact
GenEmail --> HumanReview
HumanReview -->|approve| SendEmail
HumanReview -->|reject| FormatContact
SendEmail --> FormatContact
FormatContact --> EndContact
end
subgraph "词典子图 DictionarySubgraph"
StartDict[START]
IntentDict[parse_intent
解析意图]
QueryWord[query_word
查询单词]
Translate[translate_text
翻译文本]
ExtractTerms[extract_terms
提取专业术语]
DailyWord[get_daily_word
每日一词]
LookupWord[lookup_word_book
查询生词本]
AddToWord[add_to_word_book
添加到生词本]
FormatDict[format_result
格式化输出]
EndDict[END]
StartDict --> IntentDict
IntentDict -->|query| QueryWord
IntentDict -->|translate| Translate
IntentDict -->|extract| ExtractTerms
IntentDict -->|daily| DailyWord
IntentDict -->|lookup| LookupWord
IntentDict -->|add| AddToWord
QueryWord --> FormatDict
Translate --> FormatDict
ExtractTerms --> FormatDict
DailyWord --> FormatDict
LookupWord --> FormatDict
AddToWord --> FormatDict
FormatDict --> EndDict
end
subgraph "资讯分析子图 NewsSubgraph"
StartNews[START]
IntentNews[parse_intent
解析意图]
QueryNews[query_news
查询资讯]
AnalyzeUrl[analyze_url
分析链接]
ExtractKeywords[extract_keywords
提取关键词]
GenReport[generate_report
生成报告]
FormatNews[format_result
格式化输出]
EndNews[END]
StartNews --> IntentNews
IntentNews -->|query| QueryNews
IntentNews -->|analyze| AnalyzeUrl
IntentNews -->|keywords| ExtractKeywords
IntentNews -->|report| GenReport
QueryNews --> FormatNews
AnalyzeUrl --> FormatNews
ExtractKeywords --> FormatNews
GenReport --> FormatNews
FormatNews --> EndNews
end
SubgraphCaller -.->|调用
状态传递| StartContact
SubgraphCaller -.->|调用
状态传递| StartDict
SubgraphCaller -.->|调用
状态传递| StartNews
style IntentMain fill:#ffe0b2
style ChatNode fill:#e0e0e0
style SubgraphCaller fill:#bbdefb
style FinalMain fill:#fff59d
style IntentContact fill:#c8e6c9
style IntentDict fill:#e1bee7
style IntentNews fill:#ffcdd2
```
### LangGraph 工作流详细流程
```mermaid
stateDiagram-v2
[*] --> RetrieveMemory: 用户输入消息
RetrieveMemory --> MemoryTrigger: 检索历史记忆
MemoryTrigger --> LLMCall: 检查记忆触发条件
%% ⭐ React (Reasoning → Acting → Observing) 循环开始
LLMCall --> CheckTools: LLM 输出
CheckTools --> ToolNode: 需要调用工具
CheckTools --> CheckSummary: 直接回复
ToolNode --> ExecuteTool: 执行工具
ExecuteTool --> LLMCall: ⭐ 工具结果返回(Observing → Reasoning 循环)
%% ⭐ React 循环结束
CheckSummary --> Summarize: 达到摘要阈值
CheckSummary --> Finalize: 未达阈值,直接结束
Summarize --> PostgreSQL: 保存摘要
PostgreSQL --> Finalize: 继续对话
Finalize --> FormatResponse: 格式化响应
FormatResponse --> [*]: SSE 流式输出
note right of LLMCall
⭐ Reasoning
LLM 思考:
- 需要调用工具吗?
- 调用什么工具?
end note
note right of ToolNode
⭐ Acting
执行工具:
- 天气查询
- 文件读取
- RAG 检索
- 等等
end note
note right of ExecuteTool
⭐ Observing
观察工具结果,
返回给 LLM 再次思考
end note
note right of LLMCall
⭐ React 循环
Reasoning → Acting → Observing → Reasoning...
可以多次循环
end note
```
### 数据流向图
```
用户请求
│
├─→ 前端 Streamlit
│ │
│ ├─→ 发送 HTTP POST /api/chat
│ │
│ └─→ 接收 SSE 流式响应
│
└─→ 后端 FastAPI
│
├─→ AIAgentService.achat()
│ │
│ ├─→ 初始化 LangGraph 状态
│ │ ├─→ messages: 对话历史
│ │ ├─→ user_id: 用户标识
│ │ └─→ metadata: 元数据
│ │
│ ├─→ 执行工作流
│ │ ├─→ retrieve_memory: 从 PostgreSQL 检索历史
│ │ ├─→ memory_trigger: 判断是否触发记忆
│ │ ├─→ llm_call: 调用 LLM 生成响应
│ │ ├─→ tool_node: 执行工具(如有需要)
│ │ ├─→ summarize: 生成对话摘要
│ │ └─→ finalize: 格式化最终响应
│ │
│ └─→ 返回 SSE 流
│
└─→ 持久化存储
├─→ PostgreSQL: 对话历史和摘要
└─→ Qdrant: RAG 向量索引
```
### 项目结构
```
Agent1/
├── backend/
│ ├── app/
│ │ ├── __init__.py
│ │ ├── config.py # 配置管理
│ │ ├── logger.py # 日志工具
│ │ ├── backend.py # FastAPI 后端应用(含子图 API)
│ │ ├── agent/
│ │ │ ├── __init__.py
│ │ │ ├── service.py # Agent 服务核心(使用 chat_services)
│ │ │ ├── history.py # 历史查询服务
│ │ │ ├── prompts.py # 提示模板
│ │ │ └── rag_initializer.py # RAG 工具初始化
│ │ ├── model_services/ # 模型服务层(新增)
│ │ │ ├── __init__.py
│ │ │ ├── base.py # 基类:BaseServiceProvider, FallbackServiceChain, SingletonServiceManager
│ │ │ ├── chat_services.py # 生成式大模型服务(替代 llm_factory)
│ │ │ ├── embedding_services.py # 嵌入模型服务
│ │ │ └── rerank_services.py # 重排序服务
│ │ ├── graph/
│ │ │ ├── __init__.py
│ │ │ ├── graph_builder.py # LangGraph 图构建器
│ │ │ ├── graph_tools.py # 工具定义
│ │ │ ├── state.py # 状态定义
│ │ │ ├── rag_nodes.py # RAG 集成节点
│ │ │ └── retrieve_memory.py # 记忆检索
│ │ ├── nodes/
│ │ │ ├── __init__.py
│ │ │ ├── router.py # 路由决策
│ │ │ ├── llm_call.py # LLM 调用节点(React 模式)
│ │ │ ├── tool_call.py # 工具执行节点
│ │ │ ├── retrieve_memory.py # 记忆检索节点
│ │ │ ├── summarize.py # 记忆存储节点
│ │ │ ├── finalize.py # 最终处理节点
│ │ │ └── memory_trigger.py # 记忆触发节点
│ │ ├── memory/
│ │ │ ├── __init__.py
│ │ │ └── mem0_client.py # Mem0 客户端封装
│ │ ├── rag/
│ │ │ ├── __init__.py
│ │ │ ├── retriever.py # 检索器
│ │ │ ├── rerank.py # 重排序业务逻辑(使用 rerank_services)
│ │ │ ├── query_transform.py # 查询转换
│ │ │ ├── pipeline.py # RAG 流水线
│ │ │ ├── fusion.py # RAG-Fusion
│ │ │ └── tools.py # RAG 工具
│ │ ├── agent_subgraphs/ # 子图模块(新增)
│ │ │ ├── __init__.py
│ │ │ ├── README.md # 子图文档
│ │ │ ├── common/ # 公共工具
│ │ │ │ ├── state_base.py # 状态基类
│ │ │ │ ├── intent.py # 意图理解(React 模式)
│ │ │ │ ├── formatter.py # 格式化输出工具
│ │ │ │ └── human_review.py # 人工审核节点
│ │ │ ├── contact/ # 通讯录子图
│ │ │ │ ├── state.py # 状态定义
│ │ │ │ ├── nodes.py # 节点实现
│ │ │ │ ├── graph.py # 图构建
│ │ │ │ ├── api_client.py # API 客户端
│ │ │ │ └── __init__.py
│ │ │ ├── dictionary/ # 词典子图
│ │ │ │ ├── state.py # 状态定义
│ │ │ │ ├── nodes.py # 节点实现
│ │ │ │ ├── graph.py # 图构建
│ │ │ │ ├── api_client.py # API 客户端
│ │ │ │ └── __init__.py
│ │ │ ├── news_analysis/ # 资讯分析子图
│ │ │ │ ├── state.py # 状态定义
│ │ │ │ ├── nodes.py # 节点实现
│ │ │ │ ├── graph.py # 图构建
│ │ │ │ ├── api_client.py # API 客户端
│ │ │ │ └── __init__.py
│ │ │ └── research/ # 研究分析子图(规划中)
│ │ └── utils/
│ │ └── __init__.py
│ └── rag_core/
│ ├── __init__.py
│ ├── client.py # RAG 核心客户端
│ ├── embedders.py # 嵌入模型
│ ├── vector_store.py # 向量存储
│ ├── retriever_factory.py # 检索器工厂
│ └── store/
│ ├── __init__.py
│ ├── factory.py # 存储工厂
│ └── postgres.py # PostgreSQL 存储
├── frontend/
│ ├── run.py # 前端启动脚本
│ ├── requirements.txt
│ └── src/
│ ├── __init__.py
│ ├── frontend_main.py # Streamlit 主入口
│ ├── api_client.py # API 客户端(含子图调用)
│ ├── config.py # 前端配置
│ ├── state.py # 状态管理
│ ├── logger.py # 日志
│ ├── utils.py # 工具函数
│ └── components/
│ ├── __init__.py
│ ├── sidebar.py # 侧边栏组件
│ ├── chat_area.py # 聊天区域组件
│ └── info_panel.py # 信息面板组件
├── rag_indexer/
│ ├── __init__.py
│ ├── cli.py # 命令行入口
│ ├── index_builder.py # 索引构建器
│ ├── loaders.py # 文档加载器
│ ├── splitters.py # 文本切分器
│ └── test/ # 测试脚本
├── docker/
│ ├── docker-compose.yml # Docker 服务编排
│ ├── backend/
│ │ └── Dockerfile # 后端镜像构建
│ ├── frontend/
│ │ └── Dockerfile # 前端镜像构建
│ └── models/ # spaCy 模型文件
├── scripts/
│ └── start.sh # 启动脚本
├── test/ # 测试目录
│ ├── test_backend.py
│ ├── test_rag.py
│ ├── test_qdrant.py
│ └── test_rag_indexer_result.py
├── .gitea/
│ └── workflows/
│ └── deploy.yml # CI/CD 自动化部署
├── requirement.txt # Python 依赖
├── .env.docker # Docker 环境变量模板
├── .gitignore
├── LICENSE
├── QUICKSTART.md # 快速开始指南
└── user_docs/ # 用户文档目录
```
---
## 🧠 核心算法与实现原理
### 1. RAG 检索算法
#### 1.1 文本切分策略
项目支持三种文本切分策略,适应不同场景需求:
**递归字符切分(Recursive Character Splitting)**
```
算法思路:
按优先级分隔符逐级切分:["\n\n", "\n", "。", "!", "?", " ", ""]
↓
确保每个块不超过 chunk_size(默认 500 字符)
↓
保留 chunk_overlap(默认 50 字符)避免上下文丢失
优势:简单高效,适合结构化文档
实现:langchain_text_splitters.RecursiveCharacterTextSplitter
```
**语义切分(Semantic Chunking)**
```
算法思路:
1. 将文档按句子边界切分
2. 使用 Embedding 模型计算相邻句子的向量相似度
3. 计算相似度变化率(breakpoint threshold)
4. 在相似度骤降处切分(语义主题变化点)
阈值策略:
- percentile(百分位数):默认,取相似度分布的 95 百分位
- standard_deviation(标准差):低于均值 1.5 标准差处切分
- interquartile(四分位距):使用 IQR 方法检测异常点
优势:保持语义完整性,切分更自然
实现:langchain_experimental.text_splitter.SemanticChunker
```
**父子块切分(Parent-Child Chunking)**
```
算法思路:
父块(大块):1000 字符,保留完整上下文
↓
子块(小块):语义切分,用于向量检索
↓
建立映射关系:child_id → parent_id
↓
检索时:用子块检索,返回时扩展为父块上下文
检索流程:
用户查询 → 向量检索匹配子块 → 通过映射获取父块 → 返回完整上下文
优势:检索精度高,上下文完整
实现:自定义 ParentChildSplitter 类
```
#### 1.2 混合检索(Dense + Sparse)
```
检索架构:
┌─────────────────────────────────────────────┐
│ 用户查询 │
└──────────────────┬──────────────────────────┘
│
┌──────────┴──────────┐
↓ ↓
┌───────────────┐ ┌───────────────┐
│ 稠密向量检索 │ │ 稀疏 BM25 检索 │
│ (语义相似) │ │ (关键词匹配) │
│ │ │ │
│ Embedding │ │ Token 化 │
│ → 向量相似度 │ │ → 词频统计 │
└───────┬───────┘ └───────┬───────┘
│ │
└──────────┬─────────┘
↓
┌────────────────┐
│ 结果融合 │
│ (RRF 算法) │
└────────┬───────┘
↓
┌────────────────┐
│ Cross-Encoder │
│ 重排序 │
└────────┬───────┘
↓
┌────────────────┐
│ Top-K 结果返回 │
└────────────────┘
```
**稠密向量检索(Dense Retrieval)**
- 使用 Embedding 模型将查询和文档映射到同一向量空间
- 计算余弦相似度,返回语义最相似的文档
- 适合:语义理解、同义词匹配、概念检索
**稀疏向量检索(Sparse Retrieval - BM25)**
- 基于词频(TF)和逆文档频率(IDF)计算相关性
- 适合:专有名词、精确匹配、术语检索
#### 1.3 RRF 融合算法(Reciprocal Rank Fusion)
```python
# 核心算法实现
def reciprocal_rank_fusion(doc_lists: List[List[Document]], k: int = 60) -> List[Document]:
"""
RRF 公式:RRF(d) = Σ (1 / (k + rank(d)))
参数说明:
- k: 平滑常数,通常设为 60
- k 值越大,排名影响越小,结果越平滑
- k 值越小,高排名文档优势越大
算法步骤:
1. 遍历每个检索结果列表
2. 对每个文档,根据其排名计算 RRF 得分
3. 累加同一文档在不同列表中的得分
4. 按总得分降序排序
示例:
文档 A 在列表 1 中排名第 1,在列表 2 中排名第 3
RRF(A) = 1/(60+1) + 1/(60+3) = 0.0164 + 0.0159 = 0.0323
文档 B 在列表 1 中排名第 2,在列表 2 中排名第 2
RRF(B) = 1/(60+2) + 1/(60+2) = 0.0161 + 0.0161 = 0.0322
结果:A 排名高于 B
"""
doc_to_score: Dict[str, float] = {}
doc_map: Dict[str, Document] = {}
for docs in doc_lists:
for rank, doc in enumerate(docs, start=1):
doc_id = f"{doc.page_content[:200]}_{doc.metadata.get('source', '')}"
if doc_id not in doc_map:
doc_map[doc_id] = doc
score = doc_to_score.get(doc_id, 0.0) + 1.0 / (k + rank)
doc_to_score[doc_id] = score
sorted_ids = sorted(doc_to_score.keys(), key=lambda x: doc_to_score[x], reverse=True)
return [doc_map[doc_id] for doc_id in sorted_ids]
```
**RRF 算法优势:**
- 无需归一化:不同检索器的分数范围可能不同,RRF 直接使用排名
- 参数简单:仅需调整 k 值,默认 60 在大多数场景表现良好
- 鲁棒性强:对异常值和噪声不敏感
#### 1.4 Cross-Encoder 重排序
```
重排序流程:
┌──────────────────────────────────────────────┐
│ 输入:RRF 融合后的 Top-20 文档 │
└──────────────────┬───────────────────────────┘
│
↓
┌──────────────────────────────────────────────┐
│ Cross-Encoder 模型(bge-reranker-v2-m3) │
│ │
│ 工作原理: │
│ - 将 Query 和 Document 拼接输入模型 │
│ - "[CLS] Query [SEP] Document [SEP]" │
│ - 通过 Transformer 计算交互注意力 │
│ - 输出相关性得分(0-1) │
│ │
│ 与 Bi-Encoder 的区别: │
│ - Bi-Encoder:分别编码,计算余弦相似度(快) │
│ - Cross-Encoder:联合编码,计算交互得分(准) │
└──────────────────┬───────────────────────────┘
│
↓
┌──────────────────────────────────────────────┐
│ 按相关性得分排序,返回 Top-5 │
└──────────────────────────────────────────────┘
```
**实现细节:**
- 使用远程 llama.cpp 服务部署重排序模型
- 兼容 OpenAI Rerank API 格式
- 超时保护:60 秒超时,失败时降级为原始排序
---
### 1.5 RAG 评估方法 ⭐
如何评估 RAG 系统的召回率和相关性?
**核心指标:**
- **Recall@k**:前 k 个结果中包含多少比例的相关文档
- **Precision@k**:前 k 个结果中有多少比例是相关文档
- **F1@k**:召回率和精确率的调和平均数
- **MRR**:平均倒数排名
- **相关性评分**:0-5 分的相关性评估
**详细指南:**
参见 [backend/docs/RAG_EVALUATION_GUIDE.md](backend/docs/RAG_EVALUATION_GUIDE.md)
**快速使用:**
```bash
# 运行评估脚本
cd backend
python scripts/evaluate_rag.py
```
---
### 2. LangGraph 工作流算法
#### 2.1 React (Reasoning → Acting → Observing) 模式 ⭐
**设计理念**:让 LLM 先思考(Reasoning),再行动(Acting),然后观察结果(Observing),可以多次循环。
```
┌─────────────────────────────────────────────────────────────────┐
│ React 模式循环 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ 1. Reasoning (思考) │ │
│ │ LLMCall 节点 │ │
│ │ - 分析用户问题 │ │
│ │ - 决定是否需要调用工具 │ │
│ │ - 决定调用哪个工具 │ │
│ └───────────────────────────────────────────────────────────┘ │
│ ↓ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ 2. Acting (行动) │ │
│ │ ToolNode 节点 │ │
│ │ - 执行工具调用 │ │
│ │ - 天气查询 / 文件读取 / RAG 检索等 │ │
│ └───────────────────────────────────────────────────────────┘ │
│ ↓ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ 3. Observing (观察) │ │
│ │ ExecuteTool → LLMCall │ │
│ │ - 观察工具结果 │ │
│ │ - 返回给 LLM 再次思考 │ │
│ └───────────────────────────────────────────────────────────┘ │
│ ↓ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ 4. 循环或结束 │ │
│ │ should_continue 路由 │ │
│ │ - 还需要调用工具吗? → 继续循环 │ │
│ │ - 不需要了 → 结束流程 │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
```
**关键实现点**:
1. **`llm.bind_tools(tools)`** - 在 `create_llm_call_node` 中,让 LLM 知道可以调用哪些工具
2. **`should_continue` 路由函数** - 检查 LLM 输出是否包含 `tool_calls`
3. **`tool_node → llm_call` 循环边** - 工具结果返回给 LLM 再次思考
4. **可以多次循环** - LLM 可以调用多个工具,或者同一个工具多次
**实际代码位置**:
- `backend/app/graph/graph_builder.py` 第 79 行:`builder.add_edge("tool_node", "llm_call")`
- `backend/app/nodes/router.py`:`should_continue` 函数检查 `last_message.tool_calls`
#### 2.2 状态机设计
```python
# 核心状态定义
class AgentState(TypedDict):
messages: Annotated[list, add_messages] # 对话历史(自动合并)
user_id: str # 用户标识
memory_context: str # 检索到的记忆上下文
should_summarize: bool # 是否需要生成摘要
tool_calls: list # 工具调用列表
final_response: str # 最终响应
```
**状态流转规则**:
```
初始状态 → retrieve_memory → memory_trigger → [条件分支]
↓
┌───────────────────┼───────────────┐
↓ ↓ ↓
should_summarize 直接回复 需要工具
↓ ↓ ↓
summarize → save finalize tool_node
↓ ↓ ↓
llm_call ←───────────────┘ llm_call ←┘
↓
finalize
```
#### 2.3 记忆管理算法
**记忆检索策略:**
```
1. 向量检索:将用户查询 Embedding,在 PostgreSQL 中检索相似对话
2. 时间衰减:近期对话权重更高(可选)
3. 相关性过滤:仅返回相似度 > 阈值的记忆
4. 上下文窗口:限制记忆长度,避免超出 LLM 上下文限制
```
**摘要生成策略:**
```
触发条件:
- 对话轮数超过阈值(默认 10 轮)
- 记忆上下文长度超过限制
摘要生成:
1. 提取最近 N 轮对话
2. 调用 LLM 生成摘要
3. 保存摘要到 PostgreSQL
4. 清理旧对话记录
摘要格式:
"用户在 {时间} 讨论了 {主题},关键信息:{要点}"
```
### 3. 多模型路由算法
```
模型选择逻辑:
┌─────────────────────────────────────────────┐
│ 用户选择模型 │
└──────────────────┬──────────────────────────┘
│
┌──────────┼──────────┐
↓ ↓ ↓
┌──────┐ ┌──────┐ ┌──────┐
│ zhipu│ │deep │ │local │
└──┬───┘ └──┬───┘ └──┬───┘
│ │ │
↓ ↓ ↓
ChatZhipu ChatOpenAI ChatOpenAI
(官方SDK) (DeepSeek) (vLLM/Gemma)
│ │ │
└─────────┼─────────┘
↓
┌────────────────┐
│ 统一接口输出 │
│ (BaseChatModel)│
└────────────────┘
```
**高可用降级策略:**
```
1. 优先使用用户选择的模型
2. 如果模型不可用(API 错误、超时等):
- 尝试切换到备用模型
- 记录错误日志
- 返回降级提示给用户
3. 健康检查:定期检查各模型服务状态
```
### 4. SSE 流式响应算法
```
流式输出流程:
┌─────────────────────────────────────────────┐
│ LLM 生成 Token │
└──────────────────┬──────────────────────────┘
│
↓
┌─────────────────────────────────────────────┐
│ 事件格式化 │
│ data: {"content": "你", "type": "content"} │
│ data: {"content": "好", "type": "content"} │
│ data: {"content": "!", "type": "content"} │
│ data: {"done": true, "type": "end"} │
└──────────────────┬──────────────────────────┘
│
↓
┌─────────────────────────────────────────────┐
│ HTTP Response Stream │
│ Content-Type: text/event-stream │
│ Cache-Control: no-cache │
│ Connection: keep-alive │
└─────────────────────────────────────────────┘
```
**实现要点:**
- 使用 `asyncio.Queue` 缓冲 Token
- 生产者:LLM 异步生成 Token 放入队列
- 消费者:FastAPI `EventSourceResponse` 从队列读取并推送
- 超时保护:30 秒无输出自动断开
---
## 📚 RAG 系统完整架构
### 离线索引构建 vs 在线检索生成
RAG 系统分为两个独立但协同的阶段:
```
┌──────────────────────────────────────────────────────────────────┐
│ RAG 系统双阶段架构 │
├──────────────────────────────────────────────────────────────────┤
│ │
│ 阶段一:离线索引构建 (rag_indexer) │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ 文档加载 → 文本切分 → Embedding → 向量存储 │ │
│ │ PDF/DOCX/TXT 递归/语义/父子块 llama.cpp Qdrant │ │
│ └────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ 阶段二:在线检索生成 (backend/app/rag) │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ 用户查询 → 查询改写 → 多路检索 → RRF 融合 → 重排序 │ │
│ │ MultiQuery Dense+Sparse Cross-Encoder │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ LLM 生成回答 │ │
│ └────────────────────────────────────────────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────┘
```
### RAG 演进路线 (Roadmap)
#### Level 1: 基础向量搜索 (Basic Similarity Search)
**核心算法**: 近似最近邻搜索 (ANN, 常用 HNSW 算法)
```
算法原理:
用户问题 → Embedding 模型 → 向量表示
↓
计算与库中向量的余弦相似度
↓
取距离最近的 K 个块返回
优缺点:
✅ 速度极快(毫秒级)
❌ 只能捕捉"语义相似",专有名词匹配差
实现代码:
from app.rag.retriever import create_base_retriever
retriever = create_base_retriever(
collection_name="rag_documents",
embeddings=embeddings,
search_kwargs={"k": 20}
)
docs = retriever.invoke("什么是 RAG?")
```
#### Level 2: 混合检索与重排序 (Hybrid Search + Reranker)
**1. 基础召回(混合检索)**
```
算法原理:
稠密向量检索(语义相似)+ BM25 稀疏检索(关键词匹配)
↓
两路结果并行获取,等待融合
实现代码:
from app.rag.retriever import create_hybrid_retriever
retriever = create_hybrid_retriever(
collection_name="rag_documents",
embeddings=embeddings,
dense_k=10,
sparse_k=10,
score_threshold=0.3
)
```
**2. 二次精排(Cross-Encoder)**
```
算法原理:
不同于双塔模型(分别算向量再求距离)
交叉编码器将"问题 + 文档"拼接后整体输入 Transformer
由模型直接输出 0~1 的相关性得分,精度极高
实现代码:
from app.rag.reranker import LLaMaCPPReranker
reranker = LLaMaCPPReranker(
base_url="http://127.0.0.1:8083",
api_key="your-api-key",
top_n=5
)
sorted_docs = reranker.compress_documents(documents, query)
```
#### Level 3: RAG-Fusion (多路改写与倒数排名融合)
**1. 多路查询改写**
```
算法原理:
克服用户初始提问词不达意或视角受限的问题
通过 LLM 将单一问题改写为多个不同角度的查询
实现代码:
from app.rag.query_transform import MultiQueryGenerator
generator = MultiQueryGenerator(llm=llm, num_queries=3)
queries = await generator.agenerate("如何申请项目资金?")
# 返回:["如何申请项目资金?", "项目资金申请流程是什么?", "申请项目经费需要哪些步骤?"]
```
**2. 倒数排名融合(RRF)**
```
算法原理:
RRF (Reciprocal Rank Fusion) 是一种无需评分归一化的融合算法
公式:RRF_score(d) = Σ 1/(k + rank_q(d))
有效避免某一极端检索结果主导全局
实现代码:
from app.rag.fusion import reciprocal_rank_fusion
# 多个查询的检索结果
doc_lists = [result1, result2, result3]
fused_docs = reciprocal_rank_fusion(doc_lists, k=60)
```
#### Level 4: Agentic RAG / Self-RAG (智能体与自我反思)
```
核心原理:
基于 LangGraph 的 ReAct (Reasoning and Acting) 状态机路由
大模型并非每次都去死板地执行检索,而是先判断问题:
"这是闲聊?还是需要查知识库?"
工作流程:
┌──────────┐ ┌──────────────┐ ┌──────────┐ ┌────────┐
│ User │────>│ LangGraph │────>│ RAG_Tool │────>│ Qdrant │
│ │ │ Agent │ │ │ │ │
│ "公司报 │ │ 思考: 这是 │ │ ToolCall │ │ RAG- │
│ 销流程?"│ │ 内部规章问题 │ │ search_ │ │ Fusion │
│ │ │ 需要查资料 │ │ knowledge│ │ & 混合 │
│ │<────│ 资料充分, │<────│ 返回最相 │<────│ 检索 │
│ "根据知 │ │ 开始撰写回答 │ │ 关5条规定 │ │ Cross- │
│ 识库规定 │ │ │ │ │ │ Encoder│
│ ..." │ │ │ │ │ │ 重排 │
└────────── └────────────── └──────────┘ └────────┘
实现代码:
from app.rag.tools import search_knowledge_base
from app.graph.graph_builder import GraphBuilder
# 将 RAG 工具添加到工具列表
tools = AVAILABLE_TOOLS + [search_knowledge_base]
# 构建图
builder = GraphBuilder(llm, tools, tools_by_name)
graph = builder.build().compile(checkpointer=checkpointer)
```
#### Level 5: GraphRAG 集成 (基于图和关系的 RAG)
```
核心原理:
结合知识图谱的结构化关系和向量检索的语义相似度
解决跨文档复杂关系推理问题
实现代码:
from langchain_community.graphs import Neo4jGraph
from langchain_experimental.graph_transformers import LLMGraphTransformer
# 实体关系抽取
transformer = LLMGraphTransformer(llm=local_llm)
graph_documents = transformer.convert_to_graph_documents(documents)
# 存储到图数据库
graph = Neo4jGraph(url="bolt://localhost:7687")
graph.add_graph_documents(graph_documents)
```
### 检索策略对比
| 策略 | 优点 | 缺点 | 适用场景 |
|:-----|:-----|:-----|:---------|
| **基础向量检索** | 速度快,语义理解好 | 专有名词匹配差 | 通用问答 |
| **混合检索** | 语义 + 关键词匹配 | 需要配置稀疏向量 | 专业术语查询 |
| **多路改写 + RRF** | 搜索面广,结果稳定 | 延迟略高 | 复杂问题 |
| **重排序** | 精度高 | 依赖额外模型 | 最终精排 |
| **Agentic RAG** | 智能决策,灵活 | 实现复杂 | 生产环境 |
| **GraphRAG** | 关系推理能力强 | 需要图数据库 | 知识密集型场景 |
### 切分策略对比
| 策略 | 原理 | 优点 | 缺点 | 适用场景 |
|:-----|:-----|:-----|:-----|:---------|
| **递归字符** | 按分隔符递归切分 | 速度快,实现简单 | 可能截断语义 | 简单文档 |
| **语义切分** | 基于句子相似度阈值 | 语义连贯性好 | 需要 Embedding 模型 | 专业文档 |
| **父子块** | 大块存储+小块检索 | 检索精准+上下文完整 | 存储复杂度高 | 生产环境 |
---
## 🧩 子图系统架构
### 设计理念
子图系统采用模块化设计,每个子图是一个独立的 LangGraph 工作流,共享公共工具库。
```
┌───────────────────────────────────────────────────────────────────┐
│ 子图系统架构 │
├───────────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ 公共工具库 (common/) │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌──────┐ │ │
│ │ │ state_base │ │ intent │ │ formatter │ │human │ │ │
│ │ │ (状态基类) │ │ (React模式) │ │ (格式化) │ │review│ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ └──────┘ │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────┼───────────────┐ │
│ ↓ ↓ ↓ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ 通讯录子图 │ │ 词典子图 │ │ 资讯分析子图 │ │
│ │ (contact/) │ │ (dictionary/) │ │ (news_analysis) │ │
│ ├─────────────────┤ ├─────────────────┤ ├─────────────────┤ │
│ │ state.py │ │ state.py │ │ state.py │ │
│ │ nodes.py │ │ nodes.py │ │ nodes.py │ │
│ │ graph.py │ │ graph.py │ │ graph.py │ │
│ │ api_client.py │ │ api_client.py │ │ api_client.py │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
│ │ │
│ ▼ │
│ FastAPI 子图端点 │
│ (backend/app/backend.py) │
└───────────────────────────────────────────────────────────────────┘
```
---
### 子图架构总览
```mermaid
graph TB
subgraph "主图 MainGraph"
MainState[主图 MainState
用户输入 + 上下文]
IntentRouter[意图分类器
intent.py]
MainLLM[主 LLM 节点
普通对话]
end
subgraph "通讯录子图 ContactSubgraph"
ContactState[ContactState
联系人状态]
ContactNodes[内部节点
parse_intent
add_contact
list_contacts
generate_draft
human_review
should_continue]
ContactDB[(PostgreSQL 联系人)]
end
subgraph "词典子图 DictionarySubgraph"
DictState[DictionaryState
词典状态]
DictNodes[内部节点
translate
lookup_word
extract_terms
daily_word
lookup_word_book
add_to_word_book]
DictDB[(PostgreSQL 生词本)]
end
subgraph "资讯分析子图 NewsSubgraph"
NewsState[NewsAnalysisState
资讯状态]
NewsNodes[内部节点
query_news
analyze_url
extract_keywords
generate_report]
NewsQdrant[(Qdrant 向量检索)]
end
MainState -->|用户查询| IntentRouter
IntentRouter -->|contact| ContactState
IntentRouter -->|dictionary| DictState
IntentRouter -->|news| NewsState
IntentRouter -->|chat| MainLLM
ContactState -->|调用| ContactNodes
ContactNodes -->|读写| ContactDB
ContactNodes -->|返回结果| MainState
DictState -->|调用| DictNodes
DictNodes -->|读写| DictDB
DictNodes -->|返回结果| MainState
NewsState -->|调用| NewsNodes
NewsNodes -->|检索| NewsQdrant
NewsNodes -->|返回结果| MainState
style MainState fill:#e3f2fd
style IntentRouter fill:#ffe0b2
style ContactState fill:#c8e6c9
style DictState fill:#e1bee7
style NewsState fill:#ffcdd2
```
---
### 状态传送机制
```mermaid
sequenceDiagram
participant User as 用户
participant Frontend as 前端
participant Backend as 后端 API
participant Main as 主图
participant Intent as 意图分类器
participant Subgraph as 子图
participant DB as 数据库
User->>Frontend: 输入查询
Frontend->>Backend: POST /subgraph/{type}/{action}
Backend->>Main: 初始化 MainState
Main->>Intent: 调用意图分类
Intent-->>Main: 返回子图类型 (contact/dictionary/news/chat)
alt 是子图请求
Main->>Subgraph: 传递状态
activate Subgraph
Subgraph->>Subgraph: 解析意图 (parse_intent)
Subgraph->>DB: 读取/写入数据
DB-->>Subgraph: 返回数据
Subgraph->>Subgraph: 执行业务逻辑
Subgraph->>Subgraph: 格式化结果 (format_result)
Subgraph-->>Main: 返回结果状态
deactivate Subgraph
else 是普通对话
Main->>Main: 调用主 LLM
end
Main->>Main: 合并状态
Main-->>Backend: 返回最终结果
Backend-->>Frontend: JSON 响应
Frontend-->>User: 显示结果
```
---
### 公共工具说明
#### 1. state_base.py - 状态基类
提供类型安全的状态基类,所有子图状态继承此类。
#### 2. intent.py - 意图理解(React 模式)
智能决策节点,判断是否需要调用 RAG、是否需要重新检索、是否需要调用工具。
```python
# 核心功能
- 意图分类:闲聊 / 查询 / 任务
- RAG 决策:是否需要检索知识库
- 检索策略:是否需要多路检索 / 是否需要重新检索
```
#### 3. formatter.py - 格式化输出工具
使用 Jinja2 模板提供美观的 Markdown 格式化输出。
#### 4. human_review.py - 人工审核节点
使用 LangGraph 的 interrupt 机制实现人工审核流程,支持:
- 确定 / 取消 / 继续 三种操作
- 状态持久化
- 异步等待
---
### 子图开发指南
#### 创建新子图的步骤
1. **创建子图目录**
```bash
mkdir backend/app/agent_subgraphs/my_subgraph
```
2. **创建状态定义 (state.py)**
```python
from typing_extensions import TypedDict
from ..common.state_base import BaseSubgraphState
class MySubgraphState(BaseSubgraphState):
\"\"\"
我的子图状态
\"\"\"
my_field: str
my_list: list[str]
```
3. **实现节点 (nodes.py)**
```python
from langgraph.graph import StateGraph
from .state import MySubgraphState
def my_node(state: MySubgraphState) -> MySubgraphState:
\"\"\"
我的节点
\"\"\"
# 实现逻辑
return state
```
4. **构建图 (graph.py)**
```python
from langgraph.graph import StateGraph, END
from .state import MySubgraphState
from .nodes import my_node
def build_my_subgraph() -> StateGraph:
\"\"\"
构建我的子图
\"\"\"
graph = StateGraph(MySubgraphState)
graph.add_node("my_node", my_node)
graph.set_entry_point("my_node")
graph.add_edge("my_node", END)
return graph.compile()
```
5. **添加 API 客户端 (api_client.py)**(可选)
用于与外部 API 交互。
---
### 已实现的子图
#### 1. 通讯录子图 (contact/)
**详细流程图:**
```mermaid
stateDiagram-v2
[*] --> parse_intent: 开始
parse_intent --> list_contacts: action=list
parse_intent --> add_contact: action=add
parse_intent --> list_emails: action=list_emails
parse_intent --> generate_email_draft: action=generate_email
parse_intent --> sniff_contacts: action=sniff
list_contacts --> format_result: 返回联系人列表
add_contact --> format_result: 添加成功
list_emails --> format_result: 返回邮件列表
sniff_contacts --> format_result: 嗅探完成
generate_email_draft --> human_review: 生成草稿
human_review --> send_email: action=approve
human_review --> format_result: action=reject
send_email --> format_result: 邮件已发送
format_result --> [*]: 格式化输出
note right of parse_intent
解析用户意图
支持的操作:
- list: 列出联系人
- add: 添加联系人
- list_emails: 列出邮件
- generate_email: 生成邮件草稿
- sniff: 智能嗅探
end note
note right of human_review
人工审核节点
使用 LangGraph interrupt
支持三种操作:
- approve: 审核通过,发送邮件
- reject: 审核拒绝,返回结果
- modify: 审核修改,编辑内容
end note
```
**功能列表:**
- 联系人 CRUD(增删改查)
- 邮件读取与审核
- 外发邮件
- 智能嗅探
- 精美格式化展示
**API 端点:**
```
GET /subgraph/contact/{action}
参数:
- action: list | add | list_emails | generate_email | sniff
- query: 查询内容
- user_id: 用户 ID
```
---
#### 2. 词典子图 (dictionary/)
**详细流程图:**
```mermaid
stateDiagram-v2
[*] --> parse_intent: 开始
parse_intent --> query_word: action=query
parse_intent --> translate_text: action=translate
parse_intent --> extract_terms: action=extract
parse_intent --> get_daily_word: action=daily
parse_intent --> lookup_word_book: action=lookup
parse_intent --> add_to_word_book: action=add
query_word --> format_result: 查询单词
translate_text --> format_result: 翻译文本
extract_terms --> format_result: 提取专业术语
get_daily_word --> format_result: 每日一词
lookup_word_book --> format_result: 查询生词本
add_to_word_book --> format_result: 添加到生词本
format_result --> [*]: 格式化输出
note right of parse_intent
解析用户意图
支持的操作:
- query: 查询单词
- translate: 翻译文本
- extract: 提取专业术语
- daily: 每日一词
- lookup: 查询生词本
- add: 添加到生词本
end note
```
**功能列表:**
- 翻译、查词
- 每日一词
- 专业名词提炼
- 生词本管理
- 精美格式化展示
**API 端点:**
```
GET /subgraph/dictionary/{action}
参数:
- action: query | translate | extract | daily | lookup | add
- query: 查询内容
- user_id: 用户 ID
```
---
#### 3. 资讯分析子图 (news_analysis/)
**详细流程图:**
```mermaid
stateDiagram-v2
[*] --> parse_intent: 开始
parse_intent --> query_news: action=query
parse_intent --> analyze_url: action=analyze
parse_intent --> extract_keywords: action=keywords
parse_intent --> generate_report: action=report
query_news --> format_result: 查询资讯
analyze_url --> format_result: 分析链接
extract_keywords --> format_result: 提取关键词
generate_report --> format_result: 生成报告
format_result --> [*]: 格式化输出
note right of parse_intent
解析用户意图
支持的操作:
- query: 查询资讯
- analyze: 分析链接
- keywords: 提取关键词
- report: 生成报告
end note
```
**功能列表:**
- 资讯获取
- 内容分析
- 精美格式化展示
**API 端点:**
```
GET /subgraph/news/{action}
参数:
- action: query | analyze | keywords | report
- query: 查询内容
- user_id: 用户 ID
```
---
#### 4. 研究分析子图 (research/) - 规划中
- 联网搜索
- 报告生成
- 引用溯源
- 可视化图表
---
## 🚀 快速开始
详细启动指南请查看 [QUICKSTART.md](QUICKSTART.md)
### 方式一:Docker Compose(推荐)
```bash
# 1. 配置环境变量
cp .env.docker .env
# 编辑 .env 文件,填入真实的 API Key
# 2. 启动所有服务
docker compose -f docker/docker-compose.yml up -d --build
# 3. 访问应用
# 前端: http://127.0.0.1:8501
# 后端 API: http://127.0.0.1:8083
```
### 方式二:本地开发模式
```bash
# 1. 安装依赖
pip install -r requirement.txt
# 2. 配置环境变量
cp .env.docker .env
# 编辑 .env,根据本地/远程环境调整配置
# 3. 启动后端
python backend/app/backend.py
# 4. 启动前端(新终端)
streamlit run frontend/src/frontend_main.py
# 或者使用启动脚本(推荐)
./scripts/start.sh both
```
---
## 📖 使用指南
### 基础对话
直接在聊天框输入问题即可:
```
你好,请介绍一下自己
今天北京天气怎么样?
帮我总结一下 a.txt 的内容
```
### 工具调用示例
| 功能 | 示例提问 |
|------|---------|
| 🌤️ 天气查询 | "上海今天天气如何?" |
| 📄 读取文本 | "读取 a.txt 的内容" |
| 📑 解析 PDF | "总结 b.pdf 的主要内容" |
| 📊 Excel 数据 | "显示 c.xlsx 的数据" |
| 🌐 网页抓取 | "抓取 https://example.com 的内容" |
| 🔍 长期记忆 | "记住我喜欢吃川菜" → "我有什么饮食偏好?" |
### 多模型切换
1. 在左侧边栏选择模型:
- **智谱 GLM-4.7**:在线服务,速度快
- **DeepSeek Reasoner**:深度推理模型
- **本地 Gemma-4**:本地部署,隐私性好
2. 可随时切换,甚至在同一会话中
3. 点击 "🔄 新会话" 清空当前对话
---
## 🔧 开发指南
### 添加新工具
在 [backend/app/graph/graph_tools.py](file:///home/huang/Study/AIProject/Agent1/backend/app/graph/graph_tools.py) 中添加新的 `@tool` 装饰函数:
```python
@tool
def my_new_tool(param: str) -> str:
"""
工具描述(会显示给 LLM)
Args:
param: 参数说明
Returns:
返回值说明
"""
# 实现逻辑
return result
```
工具会自动注册到 `AVAILABLE_TOOLS` 列表中。
### 添加新模型
在 [backend/app/model_services/chat_services.py](file:///root/projects/ailine/backend/app/model_services/chat_services.py) 中添加新的服务提供者:
```python
class NewModelChatProvider(BaseServiceProvider[BaseChatModel]):
"""
新模型服务提供者
"""
def __init__(self, model: str = "new-model-name"):
super().__init__("new_model_chat")
self._model = model
def is_available(self) -> bool:
"""
检查新模型服务是否可用
"""
if not os.getenv("NEW_MODEL_API_KEY"):
logger.warning("NEW_MODEL_API_KEY 未配置")
return False
logger.info(f"新模型服务配置正确,准备使用: {self._model}")
return True
def get_service(self) -> BaseChatModel:
"""
获取新模型服务
"""
if self._service_instance is None:
from langchain_openai import ChatOpenAI
from pydantic import SecretStr
self._service_instance = ChatOpenAI(
base_url="https://api.new-model.com/v1",
api_key=SecretStr(os.getenv("NEW_MODEL_API_KEY")),
model=self._model,
temperature=0.1,
max_tokens=4096,
timeout=60.0,
max_retries=2,
streaming=True,
)
return self._service_instance
```
然后在 `CHAT_PROVIDERS` 字典中注册:
```python
CHAT_PROVIDERS: Dict[str, Callable[[], BaseServiceProvider[BaseChatModel]]] = {
"local": lambda: LocalVLLMChatProvider(),
"zhipu": lambda: ZhipuChatProvider(),
"deepseek": lambda: DeepSeekChatProvider(),
"new_model": lambda: NewModelChatProvider(), # 新增
}
```
### 使用模型服务
#### 生成式大模型
```python
from app.model_services import get_chat_service, get_all_chat_services
# 自动选择可用服务(优先本地,降级智谱,再降级 DeepSeek)
llm = get_chat_service()
# 获取所有可用模型(用于多模型切换)
all_llms = get_all_chat_services() # Dict[str, BaseChatModel]
```
#### 嵌入服务
```python
from app.model_services import get_embedding_service
# 自动选择可用服务(优先本地,降级智谱)
embeddings = get_embedding_service()
# 使用
vector = embeddings.embed_query("hello")
```
#### 重排服务
```python
from app.model_services import get_rerank_service
from app.rag.rerank import create_document_reranker
# 获取原始重排服务(仅计算分数)
rerank_service = get_rerank_service()
scores = rerank_service.compute_scores("query", ["doc1", "doc2"])
# 使用业务逻辑层(完整的文档重排)
reranker = create_document_reranker()
sorted_docs = reranker.compress_documents(docs, "query", top_n=5)
```
### Docker 部署
项目包含完整的 Docker 配置:
- **docker-compose.yml**:服务编排(Backend + Frontend,连接远程数据库)
- **Dockerfile.backend**:后端镜像构建
- **Dockerfile.frontend**:前端镜像构建
- **.gitea/workflows/deploy.yml**:CI/CD 自动化部署
详见 [QUICKSTART.md](QUICKSTART.md) 的 Docker 部署章节。
---
## ⚙️ 环境配置
### 配置文件说明
项目采用两层环境配置文件体系:
| 文件 | 用途 | 是否提交 Git |
|------|------|------------|
| `.env.docker` | Docker 部署模板 | ✅ 是 |
| `.env` | 实际使用的配置 | ❌ 否(已忽略) |
**使用方法:**
- **本地开发**:`cp .env.docker .env`,修改为本地服务地址
- **Docker 部署**:`cp .env.docker .env`,使用远程服务器地址
### 必需的环境变量
| 变量名 | 说明 | 本地开发示例 | Docker 部署示例 |
|--------|------|------------|----------------|
| `ZHIPUAI_API_KEY` | 智谱AI API密钥 | `your-api-key` | `your-api-key` |
| `DEEPSEEK_API_KEY` | DeepSeek API密钥 | `your-api-key` | `your-api-key` |
| `LLAMACPP_API_KEY` | llama.cpp API 密钥 | `token-abc123` | `token-abc123` |
| `LLM_API_KEY` | 主 LLM 服务 API 密钥 | `token-abc123` | `token-abc123` |
| `VLLM_BASE_URL` | LLM 服务地址 | `http://127.0.0.1:8081/v1` | `http://your-server:8081/v1` |
| `LLAMACPP_EMBEDDING_URL` | Embedding 服务地址 | `http://127.0.0.1:8082/v1` | `http://your-server:8082/v1` |
| `LLAMACPP_RERANKER_URL` | Rerank 服务地址 | `http://127.0.0.1:8083/v1` | `http://your-server:8083/v1` |
| `ZHIPU_EMBEDDING_MODEL` | 智谱嵌入模型(可选) | `embedding-3` | `embedding-3` |
| `ZHIPU_RERANK_MODEL` | 智谱重排模型(可选) | `rerank-2` | `rerank-2` |
| `ZHIPU_API_BASE` | 智谱 API 基础地址(可选) | `https://open.bigmodel.cn/api/paas/v4` | 同左 |
| `DB_URI` | PostgreSQL 连接字符串 | `postgresql://...@115.190.121.151:5432/langgraph_db` | 同左 |
| `QDRANT_URL` | Qdrant 向量数据库地址 | `http://115.190.121.151:6333` | 同左 |
| `QDRANT_API_KEY` | Qdrant API 密钥 | `your-api-key` | `your-api-key` |
| `QDRANT_COLLECTION_NAME` | Qdrant 集合名称 | `rag_documents` | `rag_documents` |
| `LOG_LEVEL` | 日志级别 | `INFO` | `WARNING` |
| `ENABLE_GRAPH_TRACE` | 是否启用图流转追踪 | `true` | `false` |
| `MEMORY_SUMMARIZE_INTERVAL` | 对话摘要生成间隔 | `10` | `10` |
### 注意事项
- ⚠️ **不要硬编码敏感信息**:所有 API Key 必须通过环境变量配置
- ⚠️ **远程服务依赖**:确保可以访问远程 PostgreSQL (115.190.121.151:5432) 和 Qdrant (115.190.121.151:6333)
- ⚠️ **修改后重启**:修改 `.env` 后,Docker 部署需要执行 `docker compose down && docker compose up -d --build`
---
## �️ 实现指南与最佳实践
### 1. RAG 知识库构建指南
#### 1.1 离线索引构建流程
```bash
# 步骤 1:准备文档
# 将文档放入指定目录(如 ./documents/)
# 支持格式:TXT, PDF, Markdown, DOCX
# 步骤 2:选择切分策略
# 根据文档类型选择:
# - 结构化文档(如手册、API 文档)→ Recursive Splitting
# - 非结构化文档(如文章、报告)→ Semantic Chunking
# - 长文档(如书籍、论文)→ Parent-Child Chunking
# 步骤 3:构建索引
cd rag_indexer
python cli.py build \
--input-dir ../documents \
--collection-name my_knowledge \
--splitter-type semantic \
--chunk-size 500
# 步骤 4:验证索引
python cli.py query \
--collection-name my_knowledge \
--query "如何配置系统?" \
--top-k 5
```
#### 1.2 切分策略选择建议
| 文档类型 | 推荐策略 | chunk_size | 说明 |
|---------|---------|------------|------|
| API 文档 | Recursive | 300-500 | 结构清晰,按章节切分 |
| 技术文章 | Semantic | 自适应 | 保持语义完整性 |
| 法律合同 | Parent-Child | 父:1000, 子:200 | 需要完整上下文 |
| 问答对 | Recursive | 200-300 | 短文本,精确匹配 |
| 产品手册 | Parent-Child | 父:1500, 子:300 | 长文档,跨章节检索 |
#### 1.3 Embedding 模型选择
| 模型 | 维度 | 语言 | 速度 | 精度 | 适用场景 |
|------|------|------|------|------|---------|
| bge-large-zh-v1.5 | 1024 | 中文 | 中 | 高 | 中文文档检索 |
| bge-m3 | 1024 | 多语言 | 中 | 高 | 多语言混合文档 |
| text-embedding-3-small | 1536 | 英文 | 快 | 中 | 英文文档检索 |
| gte-large | 1024 | 中文 | 慢 | 很高 | 高精度要求场景 |
### 2. 性能优化指南
#### 2.1 检索性能优化
```python
# 优化 1:调整检索参数
search_kwargs = {
"k": 20, # 召回数量(增加召回,提高精度)
"score_threshold": 0.3, # 相似度阈值(过滤低质量结果)
"fetch_k": 50, # 初始召回数量(用于 MMR 去重)
"lambda_mult": 0.7, # MMR 多样性参数(0=去重,1=不去重)
}
# 优化 2:使用缓存
from functools import lru_cache
@lru_cache(maxsize=1000)
def cached_retrieve(query: str) -> List[Document]:
"""缓存常见查询结果"""
return retriever.invoke(query)
# 优化 3:批量 Embedding
# 构建索引时,使用批量处理提高效率
batch_size = 32
for i in range(0, len(documents), batch_size):
batch = documents[i:i+batch_size]
embeddings = embedder.embed_documents([doc.page_content for doc in batch])
```
#### 2.2 LLM 调用优化
```python
# 优化 1:使用流式响应
# 减少首字延迟,提升用户体验
response = await llm.astream(messages)
async for chunk in response:
yield chunk.content
# 优化 2:控制上下文长度
MAX_CONTEXT_LENGTH = 4000
def truncate_context(context: str, max_length: int = MAX_CONTEXT_LENGTH) -> str:
"""截断上下文,保留开头和结尾"""
if len(context) <= max_length:
return context
half = max_length // 2
return context[:half] + "\n...\n" + context[-half:]
# 优化 3:Prompt 优化
# 使用结构化 Prompt,提高 LLM 理解能力
prompt_template = """
你是一个智能助手。请根据以下上下文回答用户问题。
上下文:
{context}
问题:{question}
回答要求:
1. 仅基于上下文回答,不要编造信息
2. 如果上下文中没有答案,明确告知用户
3. 引用上下文中的具体信息时,注明来源
"""
```
#### 2.3 数据库性能优化
```sql
-- PostgreSQL 优化
-- 1. 为对话历史表添加索引
CREATE INDEX idx_conversations_user_id ON conversations(user_id);
CREATE INDEX idx_conversations_created_at ON conversations(created_at);
-- 2. 为记忆摘要表添加索引
CREATE INDEX idx_summaries_user_id ON summaries(user_id);
-- 3. 定期清理旧数据(保留最近 90 天)
DELETE FROM conversations
WHERE created_at < NOW() - INTERVAL '90 days';
-- Qdrant 优化
-- 1. 使用 HNSW 索引(默认已启用)
-- 2. 调整 ef_construct 和 m 参数平衡速度和精度
-- 3. 定期优化集合
curl -X POST http://localhost:6333/collections/my_docs/actions \
-H "Content-Type: application/json" \
-d '{"actions": [{"optimize": {}}]}'
```
### 3. 扩展开发指南
#### 3.1 添加自定义工具
```python
# 在 backend/app/graph/graph_tools.py 中添加
from langchain_core.tools import tool
from typing import Optional
@tool
def calculate_expression(
expression: str,
precision: int = 2
) -> str:
"""
计算数学表达式的值。
支持基本运算:加减乘除、幂运算、括号。
Args:
expression: 数学表达式,如 "2 + 3 * 4"
precision: 结果精度(小数位数),默认 2
Returns:
计算结果
"""
try:
# 安全计算(避免 eval 的安全风险)
import ast
import operator
# 定义允许的操作符
ops = {
ast.Add: operator.add,
ast.Sub: operator.sub,
ast.Mult: operator.mul,
ast.Div: operator.truediv,
ast.Pow: operator.pow,
}
def eval_node(node):
if isinstance(node, ast.Num):
return node.n
elif isinstance(node, ast.BinOp):
left = eval_node(node.left)
right = eval_node(node.right)
return ops[type(node.op)](left, right)
else:
raise ValueError(f"不支持的操作: {type(node)}")
tree = ast.parse(expression, mode='eval')
result = eval_node(tree.body)
return f"{result:.{precision}f}"
except Exception as e:
return f"计算错误: {str(e)}"
# 工具会自动注册,无需手动添加到 AVAILABLE_TOOLS
```
#### 3.2 添加自定义 LLM 提供商
```python
# 在 backend/app/agent/llm_factory.py 中添加
from langchain_openai import ChatOpenAI
from langchain_core.language_models import BaseChatModel
class LLMFactory:
@staticmethod
def create_anthropic_model() -> BaseChatModel:
"""创建 Anthropic Claude 模型"""
api_key = os.getenv("ANTHROPIC_API_KEY")
if not api_key:
raise ValueError("ANTHROPIC_API_KEY 环境变量未设置")
return ChatAnthropic(
model="claude-3-sonnet-20240229",
temperature=0.1,
max_tokens=4096,
api_key=api_key,
)
# 注册到 CREATORS
CREATORS = {
"local": create_local,
"deepseek": create_deepseek,
"zhipu": create_zhipu,
"anthropic": create_anthropic_model, # 新增
}
```
#### 3.3 添加自定义 RAG 检索器
```python
# 在 backend/app/rag/retriever.py 中添加
from langchain_core.retrievers import BaseRetriever
from langchain_core.documents import Document
class HybridRetriever(BaseRetriever):
"""混合检索器:结合多种检索策略"""
dense_retriever: BaseRetriever
sparse_retriever: BaseRetriever
reranker: Optional[BaseReranker] = None
top_k: int = 5
def _get_relevant_documents(self, query: str) -> List[Document]:
# 并行检索
dense_docs = self.dense_retriever.invoke(query)
sparse_docs = self.sparse_retriever.invoke(query)
# RRF 融合
fused_docs = reciprocal_rank_fusion([dense_docs, sparse_docs])
# 重排序
if self.reranker:
fused_docs = self.reranker.compress_documents(fused_docs[:20], query)
return fused_docs[:self.top_k]
```
### 4. 部署最佳实践
#### 4.1 生产环境配置
```yaml
# docker-compose.prod.yml
version: '3.8'
services:
backend:
build:
context: .
dockerfile: docker/Dockerfile.backend
environment:
- LOG_LEVEL=WARNING # 生产环境降低日志级别
- ENABLE_GRAPH_TRACE=false # 关闭图追踪(提升性能)
- MEMORY_SUMMARIZE_INTERVAL=5 # 更频繁的摘要生成
deploy:
resources:
limits:
cpus: '2'
memory: 4G
reservations:
cpus: '1'
memory: 2G
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8083/health"]
interval: 30s
timeout: 10s
retries: 3
frontend:
build:
context: .
dockerfile: docker/Dockerfile.frontend
environment:
- STREAMLIT_SERVER_PORT=8501
- STREAMLIT_SERVER_HEADLESS=true
deploy:
resources:
limits:
cpus: '1'
memory: 2G
restart: unless-stopped
```
#### 4.2 监控与告警
```python
# 添加健康检查端点
from fastapi import FastAPI
from datetime import datetime
app = FastAPI()
@app.get("/health")
async def health_check():
"""健康检查端点"""
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"services": {
"postgresql": await check_postgresql(),
"qdrant": await check_qdrant(),
"llm": await check_llm_service(),
}
}
async def check_postgresql() -> bool:
try:
# 尝试连接数据库
async with asyncpg.connect(DB_URI) as conn:
await conn.fetchval("SELECT 1")
return True
except Exception:
return False
async def check_qdrant() -> bool:
try:
client = QdrantClient(url=QDRANT_URL)
client.get_collections()
return True
except Exception:
return False
```
#### 4.3 安全最佳实践
```python
# 1. 使用环境变量管理密钥
# 永远不要硬编码 API Key
import os
from typing import Optional
def get_api_key(env_var: str, default: Optional[str] = None) -> str:
api_key = os.getenv(env_var, default)
if not api_key:
raise ValueError(f"{env_var} 环境变量未设置")
return api_key
# 2. 输入验证
from pydantic import BaseModel, validator
class ChatRequest(BaseModel):
message: str
user_id: str
model: str = "zhipu"
@validator('message')
def validate_message(cls, v):
if len(v) > 4000:
raise ValueError("消息长度不能超过 4000 字符")
return v.strip()
@validator('model')
def validate_model(cls, v):
allowed_models = {"zhipu", "deepseek", "local"}
if v not in allowed_models:
raise ValueError(f"不支持的模型: {v}")
return v
# 3. 速率限制
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
@app.post("/api/chat")
@limiter.limit("10/minute") # 每分钟最多 10 次请求
async def chat(request: Request, chat_request: ChatRequest):
...
```
### 5. 调试与测试
#### 5.1 启用调试模式
```bash
# 启用详细日志
export LOG_LEVEL=DEBUG
# 启用 LangGraph 图追踪
export ENABLE_GRAPH_TRACE=true
# 查看图执行流程
# 启动后访问:http://localhost:8083/graph/trace
```
#### 5.2 单元测试示例
```python
# tests/test_rag.py
import pytest
from app.rag.fusion import reciprocal_rank_fusion
from langchain_core.documents import Document
def test_rrf_fusion():
# 准备测试数据
docs1 = [
Document(page_content="文档 A"),
Document(page_content="文档 B"),
]
docs2 = [
Document(page_content="文档 B"),
Document(page_content="文档 C"),
]
# 执行 RRF 融合
result = reciprocal_rank_fusion([docs1, docs2])
# 验证结果
assert len(result) == 3
assert result[0].page_content == "文档 B" # B 在两个列表中都出现,应该排第一
@pytest.mark.asyncio
async def test_retriever():
from app.rag.retriever import create_base_retriever
retriever = create_base_retriever(
collection_name="test_collection",
embeddings=mock_embeddings,
)
docs = await retriever.ainvoke("测试查询")
assert len(docs) > 0
```
---
## �� 故障排查
### 常见问题
**Q: 无法连接远程数据库?**
```bash
# 测试 PostgreSQL
psql -h 115.190.121.151 -U postgres -d langgraph_db -c "SELECT version();"
# 测试 Qdrant
curl http://115.190.121.151:6333/collections
```
**Q: 后端启动失败?**
- 确认端口 8083 未被占用
- 检查 `.env` 中的 API Key 是否正确
- 查看启动日志确认模型初始化成功
**Q: 模型切换后无响应?**
- 检查所选模型的配置是否正确
- 确认 vLLM 容器是否运行(如使用本地模型)
- 尝试切换到另一个模型
更多问题排查请查看 [QUICKSTART.md](QUICKSTART.md)
---
## 📝 许可证
本项目采用 MIT 许可证。详见 [LICENSE](LICENSE) 文件。
## 🤝 贡献
欢迎提交 Issue 和 Pull Request!