root 5a67a77c95
Some checks failed
构建并部署 AI Agent 服务 / deploy (push) Failing after 6m3s
refactor: 真正利用已有 RAG 代码重构 rag_nodes.py
- 真正导入和使用 backend/app/rag/tools.py
- 添加全局 RAG 工具管理(get/set_global_rag_tool)
- 集成 RAGPipeline,支持多路查询和重排序
- 兼容 rag_initializer.py 的初始化方式
- 移除模拟实现,使用真正的 RAG 功能
2026-04-26 11:25:01 +08:00
2026-04-22 13:59:33 +08:00
2026-04-21 23:15:35 +08:00
2026-04-21 16:27:05 +08:00
2026-04-12 01:42:34 +08:00
2026-04-20 23:11:07 +08:00

AI Agent - 智能助手系统

一个基于 LangGraph + FastAPI 的智能对话助手支持多模型切换、RAG 知识库检索、文件处理和网页抓取等功能。


📑 目录导航


🎯 核心功能

面向用户的功能

  • 💬 智能对话:支持多轮对话,自动记忆上下文
  • 🌤️ 天气查询:实时获取各地天气信息
  • 📄 文档处理:读取 TXT、PDF、Excel 等格式文件
  • 🌐 网页抓取:提取网页正文内容
  • 🔍 知识库检索RAG:基于向量数据库的智能问答
  • 🔄 多模型切换:前端可选择不同大语言模型

技术特性

  • 持久化记忆PostgreSQL 存储对话历史,重启不丢失
  • 高可用架构:模型自动降级,确保服务稳定
  • 前后端分离FastAPI 后端 + Streamlit 前端
  • Docker 部署:一键启动所有服务
  • 远程服务架构PostgreSQL 和 Qdrant 部署在远程服务器
  • 流式响应SSE 流式输出,提升用户体验
  • 模块化设计:清晰的代码分层,易于扩展和维护

🏗️ 技术架构

技术栈总览

层级 组件 技术选型 版本 说明
LLM 服务 云端模型 智谱 AI (glm-4.7-flash) v4.7 快速响应,适合日常对话
DeepSeek (deepseek-reasoner) v3 深度推理,适合复杂问题
本地模型 Gemma-4-E4B-it v4 本地部署,保护隐私
Embedding 向量嵌入 llama.cpp server latest 本地 embedding 服务,支持多种模型
Agent 框架 工作流编排 LangGraph + LangChain latest 状态机驱动的智能体工作流
向量数据库 向量检索 Qdrant v1.12+ 高性能向量相似度检索(远程服务器)
后端框架 API 服务 FastAPI + Uvicorn v0.115+ RESTful API + WebSocket + SSE 流式输出
前端框架 Web 界面 Streamlit v1.40+ 交互式对话界面,组件化设计
关系数据库 持久化存储 PostgreSQL v16 对话记忆持久化(远程服务器)
容器化 服务编排 Docker + Docker Compose v24+ 一键部署所有服务
CI/CD 自动化部署 Gitea Workflows latest 代码推送自动构建部署

系统架构流程图

graph TB
    User[用户浏览器] -->|HTTP/SSE| Frontend[Streamlit 前端 :8501]
    Frontend -->|REST API| Backend[FastAPI 后端 :8083]
    
    Backend --> AgentService[AIAgentService]
    
    AgentService -->|模型路由| LLMFactory[LLM Factory]
    LLMFactory -->|创建| Zhipu[智谱 GLM-4.7]
    LLMFactory -->|创建| DeepSeek[DeepSeek Reasoner]
    LLMFactory -->|创建| LocalGemma[本地 Gemma-4]
    
    AgentService -->|初始化| LangGraph[LangGraph 工作流引擎]
    
    LangGraph -->|节点1| RetrieveMemory[记忆检索]
    LangGraph -->|节点2| MemoryTrigger[记忆触发]
    LangGraph -->|节点3| LLMCall[LLM 调用]
    LangGraph -->|节点4| ToolNode[工具执行]
    LangGraph -->|节点5| Summarize[记忆摘要]
    LangGraph -->|节点6| Finalize[最终处理]
    
    ToolNode -->|调用| Tools[工具集合]
    Tools -->|天气| WeatherTool[天气查询]
    Tools -->|文件| FileTool[文件读取]
    Tools -->|网页| WebTool[网页抓取]
    Tools -->|RAG| RAGTool[知识库检索]
    
    RAGTool -->|检索| Qdrant[(Qdrant 向量库)]
    RAGTool -->|重排序| CrossEncoder[Cross-Encoder]
    
    RetrieveMemory -->|存储| PostgreSQL[(PostgreSQL)]
    Summarize -->|存储| PostgreSQL
    
    style User fill:#e1f5ff
    style Frontend fill:#fff4e1
    style Backend fill:#e8f5e9
    style LangGraph fill:#f3e5f5
    style PostgreSQL fill:#ffebee
    style Qdrant fill:#ffebee

LangGraph 工作流详细流程

stateDiagram-v2
    [*] --> RetrieveMemory: 用户输入消息
    
    RetrieveMemory --> MemoryTrigger: 检索历史记忆
    MemoryTrigger --> CheckMemory: 检查是否需要触发记忆
    
    CheckMemory --> LLMCall: 记忆充足
    CheckMemory --> Summarize: 需要生成摘要
    
    Summarize --> PostgreSQL: 保存摘要
    PostgreSQL --> LLMCall: 继续对话
    
    LLMCall --> CheckTools: LLM 输出
    
    CheckTools --> ToolNode: 需要调用工具
    CheckTools --> Finalize: 直接回复
    
    ToolNode --> ExecuteTool: 执行工具
    ExecuteTool --> LLMCall: 工具结果返回
    
    Finalize --> FormatResponse: 格式化响应
    FormatResponse --> [*]: SSE 流式输出

数据流向图

用户请求
   │
   ├─→ 前端 Streamlit
   │      │
   │      ├─→ 发送 HTTP POST /api/chat
   │      │
   │      └─→ 接收 SSE 流式响应
   │
   └─→ 后端 FastAPI
          │
          ├─→ AIAgentService.achat()
          │      │
          │      ├─→ 初始化 LangGraph 状态
          │      │      ├─→ messages: 对话历史
          │      │      ├─→ user_id: 用户标识
          │      │      └─→ metadata: 元数据
          │      │
          │      ├─→ 执行工作流
          │      │      ├─→ retrieve_memory: 从 PostgreSQL 检索历史
          │      │      ├─→ memory_trigger: 判断是否触发记忆
          │      │      ├─→ llm_call: 调用 LLM 生成响应
          │      │      ├─→ tool_node: 执行工具(如有需要)
          │      │      ├─→ summarize: 生成对话摘要
          │      │      └─→ finalize: 格式化最终响应
          │      │
          │      └─→ 返回 SSE 流
          │
          └─→ 持久化存储
                 ├─→ PostgreSQL: 对话历史和摘要
                 └─→ Qdrant: RAG 向量索引

项目结构

Agent1/
├── backend/
│   ├── app/
│   │   ├── __init__.py
│   │   ├── config.py              # 配置管理
│   │   ├── logger.py              # 日志工具
│   │   ├── backend.py             # FastAPI 后端应用
│   │   ├── agent/
│   │   │   ├── __init__.py
│   │   │   ├── service.py         # Agent 服务核心
│   │   │   ├── llm_factory.py     # LLM 工厂
│   │   │   ├── history.py         # 历史查询服务
│   │   │   ├── prompts.py         # 提示模板
│   │   │   └── rag_initializer.py # RAG 工具初始化
│   │   ├── graph/
│   │   │   ├── __init__.py
│   │   │   ├── graph_builder.py   # LangGraph 图构建器
│   │   │   ├── graph_tools.py     # 工具定义
│   │   │   ├── state.py           # 状态定义
│   │   │   └── retrieve_memory.py # 记忆检索
│   │   ├── nodes/
│   │   │   ├── __init__.py
│   │   │   ├── router.py          # 路由决策
│   │   │   ├── llm_call.py        # LLM 调用节点
│   │   │   ├── tool_call.py       # 工具执行节点
│   │   │   ├── retrieve_memory.py # 记忆检索节点
│   │   │   ├── summarize.py       # 记忆存储节点
│   │   │   ├── finalize.py        # 最终处理节点
│   │   │   └── memory_trigger.py  # 记忆触发节点
│   │   ├── memory/
│   │   │   ├── __init__.py
│   │   │   └── mem0_client.py     # Mem0 客户端封装
│   │   ├── rag/
│   │   │   ├── __init__.py
│   │   │   ├── retriever.py       # 检索器
│   │   │   ├── reranker.py        # 重排序
│   │   │   ├── query_transform.py # 查询转换
│   │   │   ├── pipeline.py        # RAG 流水线
│   │   │   ├── fusion.py          # RAG-Fusion
│   │   │   └── tools.py           # RAG 工具
│   │   └── utils/
│   │       └── __init__.py
│   └── rag_core/
│       ├── __init__.py
│       ├── client.py              # RAG 核心客户端
│       ├── embedders.py           # 嵌入模型
│       ├── vector_store.py        # 向量存储
│       ├── retriever_factory.py   # 检索器工厂
│       └── store/
│           ├── __init__.py
│           ├── factory.py         # 存储工厂
│           └── postgres.py        # PostgreSQL 存储
├── frontend/
│   ├── run.py                   # 前端启动脚本
│   ├── requirements.txt
│   └── src/
│       ├── __init__.py
│       ├── frontend_main.py       # Streamlit 主入口
│       ├── api_client.py          # API 客户端
│       ├── config.py              # 前端配置
│       ├── state.py               # 状态管理
│       ├── logger.py              # 日志
│       ├── utils.py               # 工具函数
│       └── components/
│           ├── __init__.py
│           ├── sidebar.py         # 侧边栏组件
│           ├── chat_area.py       # 聊天区域组件
│           └── info_panel.py      # 信息面板组件
├── rag_indexer/
│   ├── __init__.py
│   ├── cli.py                 # 命令行入口
│   ├── index_builder.py       # 索引构建器
│   ├── loaders.py             # 文档加载器
│   ├── splitters.py           # 文本切分器
│   └── test/                  # 测试脚本
├── docker/
│   ├── docker-compose.yml     # Docker 服务编排
│   ├── backend/
│   │   └── Dockerfile         # 后端镜像构建
│   ├── frontend/
│   │   └── Dockerfile         # 前端镜像构建
│   └── models/                # spaCy 模型文件
├── scripts/
│   └── start.sh               # 启动脚本
├── test/                      # 测试目录
│   ├── test_backend.py
│   ├── test_rag.py
│   ├── test_dqrant.py
│   └── test_rag_indexer_result.py
├── .gitea/
│   └── workflows/
│       └── deploy.yml         # CI/CD 自动化部署
├── requirement.txt            # Python 依赖
├── .env.docker                # Docker 环境变量模板
├── .gitignore
├── LICENSE
├── QUICKSTART.md              # 快速开始指南
└── user_docs/                 # 用户文档目录

🧠 核心算法与实现原理

1. RAG 检索算法

1.1 文本切分策略

项目支持三种文本切分策略,适应不同场景需求:

递归字符切分Recursive Character Splitting

算法思路:
  按优先级分隔符逐级切分:["\n\n", "\n", "。", "", "", " ", ""]
  ↓
  确保每个块不超过 chunk_size默认 500 字符)
  ↓
  保留 chunk_overlap默认 50 字符)避免上下文丢失
  
优势:简单高效,适合结构化文档
实现langchain_text_splitters.RecursiveCharacterTextSplitter

语义切分Semantic Chunking

算法思路:
  1. 将文档按句子边界切分
  2. 使用 Embedding 模型计算相邻句子的向量相似度
  3. 计算相似度变化率breakpoint threshold
  4. 在相似度骤降处切分(语义主题变化点)
  
阈值策略:
  - percentile百分位数默认取相似度分布的 95 百分位
  - standard_deviation标准差低于均值 1.5 标准差处切分
  - interquartile四分位距使用 IQR 方法检测异常点
  
优势:保持语义完整性,切分更自然
实现langchain_experimental.text_splitter.SemanticChunker

父子块切分Parent-Child Chunking

算法思路:
  父块大块1000 字符,保留完整上下文
    ↓
  子块(小块):语义切分,用于向量检索
    ↓
  建立映射关系child_id → parent_id
    ↓
  检索时:用子块检索,返回时扩展为父块上下文
  
检索流程:
  用户查询 → 向量检索匹配子块 → 通过映射获取父块 → 返回完整上下文
  
优势:检索精度高,上下文完整
实现:自定义 ParentChildSplitter 类

1.2 混合检索Dense + Sparse

检索架构:
┌─────────────────────────────────────────────┐
│              用户查询                         │
└──────────────────┬──────────────────────────┘
                   │
        ┌──────────┴──────────┐
        ↓                     ↓
┌───────────────┐    ┌───────────────┐
│  稠密向量检索   │    │  稀疏 BM25 检索 │
│  (语义相似)    │    │  (关键词匹配)   │
│               │    │               │
│  Embedding    │    │  Token 化     │
│  → 向量相似度  │    │  → 词频统计    │
└───────┬───────┘    └───────┬───────┘
        │                    │
        └──────────┬─────────┘
                   ↓
          ┌────────────────┐
          │   结果融合      │
          │  (RRF 算法)     │
          └────────┬───────┘
                   ↓
          ┌────────────────┐
          │  Cross-Encoder  │
          │   重排序        │
          └────────┬───────┘
                   ↓
          ┌────────────────┐
          │  Top-K 结果返回  │
          └────────────────┘

稠密向量检索Dense Retrieval

  • 使用 Embedding 模型将查询和文档映射到同一向量空间
  • 计算余弦相似度,返回语义最相似的文档
  • 适合:语义理解、同义词匹配、概念检索

稀疏向量检索Sparse Retrieval - BM25

  • 基于词频TF和逆文档频率IDF计算相关性
  • 适合:专有名词、精确匹配、术语检索

1.3 RRF 融合算法Reciprocal Rank Fusion

# 核心算法实现
def reciprocal_rank_fusion(doc_lists: List[List[Document]], k: int = 60) -> List[Document]:
    """
    RRF 公式RRF(d) = Σ (1 / (k + rank(d)))
    
    参数说明:
    - k: 平滑常数,通常设为 60
      - k 值越大,排名影响越小,结果越平滑
      - k 值越小,高排名文档优势越大
    
    算法步骤:
    1. 遍历每个检索结果列表
    2. 对每个文档,根据其排名计算 RRF 得分
    3. 累加同一文档在不同列表中的得分
    4. 按总得分降序排序
    
    示例:
    文档 A 在列表 1 中排名第 1在列表 2 中排名第 3
    RRF(A) = 1/(60+1) + 1/(60+3) = 0.0164 + 0.0159 = 0.0323
    
    文档 B 在列表 1 中排名第 2在列表 2 中排名第 2
    RRF(B) = 1/(60+2) + 1/(60+2) = 0.0161 + 0.0161 = 0.0322
    
    结果A 排名高于 B
    """
    doc_to_score: Dict[str, float] = {}
    doc_map: Dict[str, Document] = {}
    
    for docs in doc_lists:
        for rank, doc in enumerate(docs, start=1):
            doc_id = f"{doc.page_content[:200]}_{doc.metadata.get('source', '')}"
            if doc_id not in doc_map:
                doc_map[doc_id] = doc
            score = doc_to_score.get(doc_id, 0.0) + 1.0 / (k + rank)
            doc_to_score[doc_id] = score
    
    sorted_ids = sorted(doc_to_score.keys(), key=lambda x: doc_to_score[x], reverse=True)
    return [doc_map[doc_id] for doc_id in sorted_ids]

RRF 算法优势:

  • 无需归一化不同检索器的分数范围可能不同RRF 直接使用排名
  • 参数简单:仅需调整 k 值,默认 60 在大多数场景表现良好
  • 鲁棒性强:对异常值和噪声不敏感

1.4 Cross-Encoder 重排序

重排序流程:
┌──────────────────────────────────────────────┐
│  输入RRF 融合后的 Top-20 文档                │
└──────────────────┬───────────────────────────┘
                   │
                   ↓
┌──────────────────────────────────────────────┐
│  Cross-Encoder 模型bge-reranker-v2-m3      │
│                                              │
│  工作原理:                                    │
│  - 将 Query 和 Document 拼接输入模型           │
│  - "[CLS] Query [SEP] Document [SEP]"         │
│  - 通过 Transformer 计算交互注意力              │
│  - 输出相关性得分0-1                       │
│                                              │
│  与 Bi-Encoder 的区别:                        │
│  - Bi-Encoder分别编码计算余弦相似度    │
│  - Cross-Encoder联合编码计算交互得分   │
└──────────────────┬───────────────────────────┘
                   │
                   ↓
┌──────────────────────────────────────────────┐
│  按相关性得分排序,返回 Top-5                  │
└──────────────────────────────────────────────┘

实现细节:

  • 使用远程 llama.cpp 服务部署重排序模型
  • 兼容 OpenAI Rerank API 格式
  • 超时保护60 秒超时,失败时降级为原始排序

2. LangGraph 工作流算法

2.1 状态机设计

# 核心状态定义
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]  # 对话历史(自动合并)
    user_id: str                              # 用户标识
    memory_context: str                       # 检索到的记忆上下文
    should_summarize: bool                    # 是否需要生成摘要
    tool_calls: list                          # 工具调用列表
    final_response: str                       # 最终响应

状态流转规则:

初始状态 → retrieve_memory → memory_trigger → [条件分支]
                                      ↓
                    ┌───────────────────┼───────────────────┐
                    ↓                   ↓                   ↓
              should_summarize     直接回复           需要工具
                    ↓                   ↓                   ↓
              summarize → save      finalize          tool_node
                    ↓                   ↓                   ↓
              llm_call ←───────────────┘               llm_call
                    ↓
                finalize

2.2 记忆管理算法

记忆检索策略:

1. 向量检索:将用户查询 Embedding在 PostgreSQL 中检索相似对话
2. 时间衰减:近期对话权重更高(可选)
3. 相关性过滤:仅返回相似度 > 阈值的记忆
4. 上下文窗口:限制记忆长度,避免超出 LLM 上下文限制

摘要生成策略:

触发条件:
  - 对话轮数超过阈值(默认 10 轮)
  - 记忆上下文长度超过限制
  
摘要生成:
  1. 提取最近 N 轮对话
  2. 调用 LLM 生成摘要
  3. 保存摘要到 PostgreSQL
  4. 清理旧对话记录
  
摘要格式:
  "用户在 {时间} 讨论了 {主题},关键信息:{要点}"

3. 多模型路由算法

模型选择逻辑:
┌─────────────────────────────────────────────┐
│  用户选择模型                                 │
└──────────────────┬──────────────────────────┘
                   │
        ┌──────────┼──────────┐
        ↓          ↓          ↓
    ┌──────┐  ┌──────┐  ┌──────┐
    │ zhipu│  │deep  │  │local │
    └──┬───┘  └──┬───┘  └──┬───┘
       │         │         │
       ↓         ↓         ↓
   ChatZhipu  ChatOpenAI ChatOpenAI
   (官方SDK)  (DeepSeek)  (vLLM/Gemma)
       │         │         │
       └─────────┼─────────┘
                 ↓
        ┌────────────────┐
        │  统一接口输出   │
        │  (BaseChatModel)│
        └────────────────┘

高可用降级策略:

1. 优先使用用户选择的模型
2. 如果模型不可用API 错误、超时等):
   - 尝试切换到备用模型
   - 记录错误日志
   - 返回降级提示给用户
3. 健康检查:定期检查各模型服务状态

4. SSE 流式响应算法

流式输出流程:
┌─────────────────────────────────────────────┐
│  LLM 生成 Token                              │
└──────────────────┬──────────────────────────┘
                   │
                   ↓
┌─────────────────────────────────────────────┐
│  事件格式化                                  │
│  data: {"content": "你", "type": "content"} │
│  data: {"content": "好", "type": "content"} │
│  data: {"content": "", "type": "content"} │
│  data: {"done": true, "type": "end"}        │
└──────────────────┬──────────────────────────┘
                   │
                   ↓
┌─────────────────────────────────────────────┐
│  HTTP Response Stream                        │
│  Content-Type: text/event-stream            │
│  Cache-Control: no-cache                    │
│  Connection: keep-alive                     │
└─────────────────────────────────────────────┘

实现要点:

  • 使用 asyncio.Queue 缓冲 Token
  • 生产者LLM 异步生成 Token 放入队列
  • 消费者FastAPI EventSourceResponse 从队列读取并推送
  • 超时保护30 秒无输出自动断开

📚 RAG 系统完整架构

离线索引构建 vs 在线检索生成

RAG 系统分为两个独立但协同的阶段:

┌──────────────────────────────────────────────────────────────────┐
│                    RAG 系统双阶段架构                              │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│  阶段一:离线索引构建 (rag_indexer)                               │
│  ┌────────────────────────────────────────────────────────┐     │
│  │  文档加载 → 文本切分 → Embedding → 向量存储              │     │
│  │  PDF/DOCX/TXT    递归/语义/父子块   llama.cpp       Qdrant    │     │
│  └────────────────────────────────────────────────────────┘     │
│                              │                                    │
│                              ▼                                    │
│  阶段二:在线检索生成 (backend/app/rag)                                   │
│  ┌────────────────────────────────────────────────────────┐     │
│  │  用户查询 → 查询改写 → 多路检索 → RRF 融合 → 重排序      │     │
│  │              MultiQuery    Dense+Sparse    Cross-Encoder     │     │
│  │                              │                              │     │
│  │                              ▼                              │     │
│  │                        LLM 生成回答                          │     │
│  └────────────────────────────────────────────────────────┘     │
│                                                                  │
└──────────────────────────────────────────────────────────────────┘

RAG 演进路线 (Roadmap)

核心算法: 近似最近邻搜索 (ANN, 常用 HNSW 算法)

算法原理:
  用户问题 → Embedding 模型 → 向量表示
    ↓
  计算与库中向量的余弦相似度
    ↓
  取距离最近的 K 个块返回
  
优缺点:
  ✅ 速度极快(毫秒级)
  ❌ 只能捕捉"语义相似",专有名词匹配差
  
实现代码:
  from app.rag.retriever import create_base_retriever
  
  retriever = create_base_retriever(
      collection_name="rag_documents",
      embeddings=embeddings,
      search_kwargs={"k": 20}
  )
  docs = retriever.invoke("什么是 RAG")

Level 2: 混合检索与重排序 (Hybrid Search + Reranker)

1. 基础召回(混合检索)

算法原理:
  稠密向量检索(语义相似)+ BM25 稀疏检索(关键词匹配)
    ↓
  两路结果并行获取,等待融合
  
实现代码:
  from app.rag.retriever import create_hybrid_retriever
  
  retriever = create_hybrid_retriever(
      collection_name="rag_documents",
      embeddings=embeddings,
      dense_k=10,
      sparse_k=10,
      score_threshold=0.3
  )

2. 二次精排Cross-Encoder

算法原理:
  不同于双塔模型(分别算向量再求距离)
  交叉编码器将"问题 + 文档"拼接后整体输入 Transformer
  由模型直接输出 0~1 的相关性得分,精度极高
  
实现代码:
  from app.rag.reranker import LLaMaCPPReranker
  
  reranker = LLaMaCPPReranker(
      base_url="http://127.0.0.1:8083",
      api_key="your-api-key",
      top_n=5
  )
  sorted_docs = reranker.compress_documents(documents, query)

Level 3: RAG-Fusion (多路改写与倒数排名融合)

1. 多路查询改写

算法原理:
  克服用户初始提问词不达意或视角受限的问题
  通过 LLM 将单一问题改写为多个不同角度的查询
  
实现代码:
  from app.rag.query_transform import MultiQueryGenerator
  
  generator = MultiQueryGenerator(llm=llm, num_queries=3)
  queries = await generator.agenerate("如何申请项目资金?")
  # 返回:["如何申请项目资金?", "项目资金申请流程是什么?", "申请项目经费需要哪些步骤?"]

2. 倒数排名融合RRF

算法原理:
  RRF (Reciprocal Rank Fusion) 是一种无需评分归一化的融合算法
  公式RRF_score(d) = Σ 1/(k + rank_q(d))
  有效避免某一极端检索结果主导全局
  
实现代码:
  from app.rag.fusion import reciprocal_rank_fusion
  
  # 多个查询的检索结果
  doc_lists = [result1, result2, result3]
  fused_docs = reciprocal_rank_fusion(doc_lists, k=60)

Level 4: Agentic RAG / Self-RAG (智能体与自我反思)

核心原理:
  基于 LangGraph 的 ReAct (Reasoning and Acting) 状态机路由
  大模型并非每次都去死板地执行检索,而是先判断问题:
  "这是闲聊?还是需要查知识库?"

工作流程:
  ┌──────────┐     ┌──────────────┐     ┌──────────┐     ┌────────┐
  │   User   │────>│ LangGraph    │────>│ RAG_Tool │────>│ Qdrant │
  │          │     │ Agent        │     │          │     │        │
  │ "公司报  │     │ 思考: 这是   │     │ ToolCall │     │ RAG-   │
  │ 销流程?"│     │ 内部规章问题 │     │ search_  │     │ Fusion │
  │          │     │ 需要查资料   │     │ knowledge│     │ & 混合 │
  │          │<────│ 资料充分,   │<────│ 返回最相 │<────│ 检索   │
  │ "根据知  │     │ 开始撰写回答 │     │ 关5条规定 │     │ Cross- │
  │ 识库规定 │     │              │     │          │     │ Encoder│
  │ ..."     │     │              │     │          │     │ 重排   │
  └──────────     └──────────────     └──────────┘     └────────┘

实现代码:
  from app.rag.tools import search_knowledge_base
  from app.graph.graph_builder import GraphBuilder
  
  # 将 RAG 工具添加到工具列表
  tools = AVAILABLE_TOOLS + [search_knowledge_base]
  
  # 构建图
  builder = GraphBuilder(llm, tools, tools_by_name)
  graph = builder.build().compile(checkpointer=checkpointer)

Level 5: GraphRAG 集成 (基于图和关系的 RAG)

核心原理:
  结合知识图谱的结构化关系和向量检索的语义相似度
  解决跨文档复杂关系推理问题
  
实现代码:
  from langchain_community.graphs import Neo4jGraph
  from langchain_experimental.graph_transformers import LLMGraphTransformer
  
  # 实体关系抽取
  transformer = LLMGraphTransformer(llm=local_llm)
  graph_documents = transformer.convert_to_graph_documents(documents)
  
  # 存储到图数据库
  graph = Neo4jGraph(url="bolt://localhost:7687")
  graph.add_graph_documents(graph_documents)

检索策略对比

策略 优点 缺点 适用场景
基础向量检索 速度快,语义理解好 专有名词匹配差 通用问答
混合检索 语义 + 关键词匹配 需要配置稀疏向量 专业术语查询
多路改写 + RRF 搜索面广,结果稳定 延迟略高 复杂问题
重排序 精度高 依赖额外模型 最终精排
Agentic RAG 智能决策,灵活 实现复杂 生产环境
GraphRAG 关系推理能力强 需要图数据库 知识密集型场景

切分策略对比

策略 原理 优点 缺点 适用场景
递归字符 按分隔符递归切分 速度快,实现简单 可能截断语义 简单文档
语义切分 基于句子相似度阈值 语义连贯性好 需要 Embedding 模型 专业文档
父子块 大块存储+小块检索 检索精准+上下文完整 存储复杂度高 生产环境

<EFBFBD><EFBFBD> 快速开始

详细启动指南请查看 QUICKSTART.md

方式一Docker Compose推荐

# 1. 配置环境变量
cp .env.docker .env
# 编辑 .env 文件,填入真实的 API Key

# 2. 启动所有服务
docker compose -f docker/docker-compose.yml up -d --build

# 3. 访问应用
# 前端: http://127.0.0.1:8501
# 后端 API: http://127.0.0.1:8083

方式二:本地开发模式

# 1. 安装依赖
pip install -r requirement.txt

# 2. 配置环境变量
cp .env.docker .env
# 编辑 .env根据本地/远程环境调整配置

# 3. 启动后端
python backend/app/backend.py

# 4. 启动前端(新终端)
streamlit run frontend/src/frontend_main.py

# 或者使用启动脚本(推荐)
./scripts/start.sh both

📖 使用指南

基础对话

直接在聊天框输入问题即可:

你好,请介绍一下自己
今天北京天气怎么样?
帮我总结一下 a.txt 的内容

工具调用示例

功能 示例提问
🌤️ 天气查询 "上海今天天气如何?"
📄 读取文本 "读取 a.txt 的内容"
📑 解析 PDF "总结 b.pdf 的主要内容"
📊 Excel 数据 "显示 c.xlsx 的数据"
🌐 网页抓取 "抓取 https://example.com 的内容"
🔍 长期记忆 "记住我喜欢吃川菜" → "我有什么饮食偏好?"

多模型切换

  1. 在左侧边栏选择模型:

    • 智谱 GLM-4.7:在线服务,速度快
    • DeepSeek Reasoner:深度推理模型
    • 本地 Gemma-4:本地部署,隐私性好
  2. 可随时切换,甚至在同一会话中

  3. 点击 "🔄 新会话" 清空当前对话


🔧 开发指南

添加新工具

backend/app/graph/graph_tools.py 中添加新的 @tool 装饰函数:

@tool
def my_new_tool(param: str) -> str:
    """
    工具描述(会显示给 LLM
    
    Args:
        param: 参数说明
        
    Returns:
        返回值说明
    """
    # 实现逻辑
    return result

工具会自动注册到 AVAILABLE_TOOLS 列表中。

添加新模型

backend/app/agent/llm_factory.py 中添加模型创建方法:

@staticmethod
def create_new_model():
    api_key = os.getenv("NEW_MODEL_API_KEY")
    return ChatOpenAI(
        base_url="https://api.new-model.com/v1",
        api_key=SecretStr(api_key),
        model="model-name",
        temperature=0.1,
        streaming=True,
    )

然后在 CREATORS 字典中注册:

CREATORS = {
    "local": create_local,
    "deepseek": create_deepseek,
    "zhipu": create_zhipu,
    "new_model": create_new_model,  # 新增
}

Docker 部署

项目包含完整的 Docker 配置:

  • docker-compose.yml服务编排Backend + Frontend连接远程数据库
  • Dockerfile.backend:后端镜像构建
  • Dockerfile.frontend:前端镜像构建
  • .gitea/workflows/deploy.ymlCI/CD 自动化部署

详见 QUICKSTART.md 的 Docker 部署章节。


⚙️ 环境配置

配置文件说明

项目采用两层环境配置文件体系:

文件 用途 是否提交 Git
.env.docker Docker 部署模板
.env 实际使用的配置 否(已忽略)

使用方法:

  • 本地开发cp .env.docker .env,修改为本地服务地址
  • Docker 部署cp .env.docker .env,使用远程服务器地址

必需的环境变量

变量名 说明 本地开发示例 Docker 部署示例
ZHIPUAI_API_KEY 智谱AI API密钥 your-api-key your-api-key
DEEPSEEK_API_KEY DeepSeek API密钥 your-api-key your-api-key
LLAMACPP_API_KEY llama.cpp API 密钥 token-abc123 token-abc123
VLLM_BASE_URL LLM 服务地址 http://127.0.0.1:8081/v1 http://your-server:8081/v1
LLAMACPP_EMBEDDING_URL Embedding 服务地址 http://127.0.0.1:8082/v1 http://your-server:8082/v1
DB_URI PostgreSQL 连接字符串 postgresql://...@115.190.121.151:5432/langgraph_db 同左
QDRANT_URL Qdrant 向量数据库地址 http://115.190.121.151:6333 同左
LOG_LEVEL 日志级别 INFO WARNING
ENABLE_GRAPH_TRACE 是否启用图流转追踪 true false
MEMORY_SUMMARIZE_INTERVAL 对话摘要生成间隔 10 10

注意事项

  • ⚠️ 不要硬编码敏感信息:所有 API Key 必须通过环境变量配置
  • ⚠️ 远程服务依赖:确保可以访问远程 PostgreSQL (115.190.121.151:5432) 和 Qdrant (115.190.121.151:6333)
  • ⚠️ 修改后重启:修改 .envDocker 部署需要执行 docker compose down && docker compose up -d --build

<EFBFBD> 实现指南与最佳实践

1. RAG 知识库构建指南

1.1 离线索引构建流程

# 步骤 1准备文档
# 将文档放入指定目录(如 ./documents/
# 支持格式TXT, PDF, Markdown, DOCX

# 步骤 2选择切分策略
# 根据文档类型选择:
# - 结构化文档如手册、API 文档)→ Recursive Splitting
# - 非结构化文档(如文章、报告)→ Semantic Chunking
# - 长文档(如书籍、论文)→ Parent-Child Chunking

# 步骤 3构建索引
cd rag_indexer
python cli.py build \
    --input-dir ../documents \
    --collection-name my_knowledge \
    --splitter-type semantic \
    --chunk-size 500

# 步骤 4验证索引
python cli.py query \
    --collection-name my_knowledge \
    --query "如何配置系统?" \
    --top-k 5

1.2 切分策略选择建议

文档类型 推荐策略 chunk_size 说明
API 文档 Recursive 300-500 结构清晰,按章节切分
技术文章 Semantic 自适应 保持语义完整性
法律合同 Parent-Child 父:1000, 子:200 需要完整上下文
问答对 Recursive 200-300 短文本,精确匹配
产品手册 Parent-Child 父:1500, 子:300 长文档,跨章节检索

1.3 Embedding 模型选择

模型 维度 语言 速度 精度 适用场景
bge-large-zh-v1.5 1024 中文 中文文档检索
bge-m3 1024 多语言 多语言混合文档
text-embedding-3-small 1536 英文 英文文档检索
gte-large 1024 中文 很高 高精度要求场景

2. 性能优化指南

2.1 检索性能优化

# 优化 1调整检索参数
search_kwargs = {
    "k": 20,              # 召回数量(增加召回,提高精度)
    "score_threshold": 0.3,  # 相似度阈值(过滤低质量结果)
    "fetch_k": 50,        # 初始召回数量(用于 MMR 去重)
    "lambda_mult": 0.7,   # MMR 多样性参数0=去重1=不去重)
}

# 优化 2使用缓存
from functools import lru_cache

@lru_cache(maxsize=1000)
def cached_retrieve(query: str) -> List[Document]:
    """缓存常见查询结果"""
    return retriever.invoke(query)

# 优化 3批量 Embedding
# 构建索引时,使用批量处理提高效率
batch_size = 32
for i in range(0, len(documents), batch_size):
    batch = documents[i:i+batch_size]
    embeddings = embedder.embed_documents([doc.page_content for doc in batch])

2.2 LLM 调用优化

# 优化 1使用流式响应
# 减少首字延迟,提升用户体验
response = await llm.astream(messages)
async for chunk in response:
    yield chunk.content

# 优化 2控制上下文长度
MAX_CONTEXT_LENGTH = 4000

def truncate_context(context: str, max_length: int = MAX_CONTEXT_LENGTH) -> str:
    """截断上下文,保留开头和结尾"""
    if len(context) <= max_length:
        return context
    half = max_length // 2
    return context[:half] + "\n...\n" + context[-half:]

# 优化 3Prompt 优化
# 使用结构化 Prompt提高 LLM 理解能力
prompt_template = """
你是一个智能助手。请根据以下上下文回答用户问题。

上下文:
{context}

问题:{question}

回答要求:
1. 仅基于上下文回答,不要编造信息
2. 如果上下文中没有答案,明确告知用户
3. 引用上下文中的具体信息时,注明来源
"""

2.3 数据库性能优化

-- PostgreSQL 优化
-- 1. 为对话历史表添加索引
CREATE INDEX idx_conversations_user_id ON conversations(user_id);
CREATE INDEX idx_conversations_created_at ON conversations(created_at);

-- 2. 为记忆摘要表添加索引
CREATE INDEX idx_summaries_user_id ON summaries(user_id);

-- 3. 定期清理旧数据(保留最近 90 天)
DELETE FROM conversations 
WHERE created_at < NOW() - INTERVAL '90 days';

-- Qdrant 优化
-- 1. 使用 HNSW 索引(默认已启用)
-- 2. 调整 ef_construct 和 m 参数平衡速度和精度
-- 3. 定期优化集合
curl -X POST http://localhost:6333/collections/my_docs/actions \
  -H "Content-Type: application/json" \
  -d '{"actions": [{"optimize": {}}]}'

3. 扩展开发指南

3.1 添加自定义工具

# 在 backend/app/graph/graph_tools.py 中添加

from langchain_core.tools import tool
from typing import Optional

@tool
def calculate_expression(
    expression: str,
    precision: int = 2
) -> str:
    """
    计算数学表达式的值。
    
    支持基本运算:加减乘除、幂运算、括号。
    
    Args:
        expression: 数学表达式,如 "2 + 3 * 4"
        precision: 结果精度(小数位数),默认 2
        
    Returns:
        计算结果
    """
    try:
        # 安全计算(避免 eval 的安全风险)
        import ast
        import operator
        
        # 定义允许的操作符
        ops = {
            ast.Add: operator.add,
            ast.Sub: operator.sub,
            ast.Mult: operator.mul,
            ast.Div: operator.truediv,
            ast.Pow: operator.pow,
        }
        
        def eval_node(node):
            if isinstance(node, ast.Num):
                return node.n
            elif isinstance(node, ast.BinOp):
                left = eval_node(node.left)
                right = eval_node(node.right)
                return ops[type(node.op)](left, right)
            else:
                raise ValueError(f"不支持的操作: {type(node)}")
        
        tree = ast.parse(expression, mode='eval')
        result = eval_node(tree.body)
        return f"{result:.{precision}f}"
    except Exception as e:
        return f"计算错误: {str(e)}"

# 工具会自动注册,无需手动添加到 AVAILABLE_TOOLS

3.2 添加自定义 LLM 提供商

# 在 backend/app/agent/llm_factory.py 中添加

from langchain_openai import ChatOpenAI
from langchain_core.language_models import BaseChatModel

class LLMFactory:
    @staticmethod
    def create_anthropic_model() -> BaseChatModel:
        """创建 Anthropic Claude 模型"""
        api_key = os.getenv("ANTHROPIC_API_KEY")
        if not api_key:
            raise ValueError("ANTHROPIC_API_KEY 环境变量未设置")
        
        return ChatAnthropic(
            model="claude-3-sonnet-20240229",
            temperature=0.1,
            max_tokens=4096,
            api_key=api_key,
        )
    
    # 注册到 CREATORS
    CREATORS = {
        "local": create_local,
        "deepseek": create_deepseek,
        "zhipu": create_zhipu,
        "anthropic": create_anthropic_model,  # 新增
    }

3.3 添加自定义 RAG 检索器

# 在 backend/app/rag/retriever.py 中添加

from langchain_core.retrievers import BaseRetriever
from langchain_core.documents import Document

class HybridRetriever(BaseRetriever):
    """混合检索器:结合多种检索策略"""
    
    dense_retriever: BaseRetriever
    sparse_retriever: BaseRetriever
    reranker: Optional[BaseReranker] = None
    top_k: int = 5
    
    def _get_relevant_documents(self, query: str) -> List[Document]:
        # 并行检索
        dense_docs = self.dense_retriever.invoke(query)
        sparse_docs = self.sparse_retriever.invoke(query)
        
        # RRF 融合
        fused_docs = reciprocal_rank_fusion([dense_docs, sparse_docs])
        
        # 重排序
        if self.reranker:
            fused_docs = self.reranker.compress_documents(fused_docs[:20], query)
        
        return fused_docs[:self.top_k]

4. 部署最佳实践

4.1 生产环境配置

# docker-compose.prod.yml
version: '3.8'

services:
  backend:
    build:
      context: .
      dockerfile: docker/Dockerfile.backend
    environment:
      - LOG_LEVEL=WARNING  # 生产环境降低日志级别
      - ENABLE_GRAPH_TRACE=false  # 关闭图追踪(提升性能)
      - MEMORY_SUMMARIZE_INTERVAL=5  # 更频繁的摘要生成
    deploy:
      resources:
        limits:
          cpus: '2'
          memory: 4G
        reservations:
          cpus: '1'
          memory: 2G
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8083/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  frontend:
    build:
      context: .
      dockerfile: docker/Dockerfile.frontend
    environment:
      - STREAMLIT_SERVER_PORT=8501
      - STREAMLIT_SERVER_HEADLESS=true
    deploy:
      resources:
        limits:
          cpus: '1'
          memory: 2G
    restart: unless-stopped

4.2 监控与告警

# 添加健康检查端点
from fastapi import FastAPI
from datetime import datetime

app = FastAPI()

@app.get("/health")
async def health_check():
    """健康检查端点"""
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "services": {
            "postgresql": await check_postgresql(),
            "qdrant": await check_qdrant(),
            "llm": await check_llm_service(),
        }
    }

async def check_postgresql() -> bool:
    try:
        # 尝试连接数据库
        async with asyncpg.connect(DB_URI) as conn:
            await conn.fetchval("SELECT 1")
        return True
    except Exception:
        return False

async def check_qdrant() -> bool:
    try:
        client = QdrantClient(url=QDRANT_URL)
        client.get_collections()
        return True
    except Exception:
        return False

4.3 安全最佳实践

# 1. 使用环境变量管理密钥
# 永远不要硬编码 API Key
import os
from typing import Optional

def get_api_key(env_var: str, default: Optional[str] = None) -> str:
    api_key = os.getenv(env_var, default)
    if not api_key:
        raise ValueError(f"{env_var} 环境变量未设置")
    return api_key

# 2. 输入验证
from pydantic import BaseModel, validator

class ChatRequest(BaseModel):
    message: str
    user_id: str
    model: str = "zhipu"
    
    @validator('message')
    def validate_message(cls, v):
        if len(v) > 4000:
            raise ValueError("消息长度不能超过 4000 字符")
        return v.strip()
    
    @validator('model')
    def validate_model(cls, v):
        allowed_models = {"zhipu", "deepseek", "local"}
        if v not in allowed_models:
            raise ValueError(f"不支持的模型: {v}")
        return v

# 3. 速率限制
from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)

@app.post("/api/chat")
@limiter.limit("10/minute")  # 每分钟最多 10 次请求
async def chat(request: Request, chat_request: ChatRequest):
    ...

5. 调试与测试

5.1 启用调试模式

# 启用详细日志
export LOG_LEVEL=DEBUG

# 启用 LangGraph 图追踪
export ENABLE_GRAPH_TRACE=true

# 查看图执行流程
# 启动后访问http://localhost:8083/graph/trace

5.2 单元测试示例

# tests/test_rag.py
import pytest
from app.rag.fusion import reciprocal_rank_fusion
from langchain_core.documents import Document

def test_rrf_fusion():
    # 准备测试数据
    docs1 = [
        Document(page_content="文档 A"),
        Document(page_content="文档 B"),
    ]
    docs2 = [
        Document(page_content="文档 B"),
        Document(page_content="文档 C"),
    ]
    
    # 执行 RRF 融合
    result = reciprocal_rank_fusion([docs1, docs2])
    
    # 验证结果
    assert len(result) == 3
    assert result[0].page_content == "文档 B"  # B 在两个列表中都出现,应该排第一

@pytest.mark.asyncio
async def test_retriever():
    from app.rag.retriever import create_base_retriever
    
    retriever = create_base_retriever(
        collection_name="test_collection",
        embeddings=mock_embeddings,
    )
    
    docs = await retriever.ainvoke("测试查询")
    assert len(docs) > 0

<EFBFBD><EFBFBD> 故障排查

常见问题

Q: 无法连接远程数据库?

# 测试 PostgreSQL
psql -h 115.190.121.151 -U postgres -d langgraph_db -c "SELECT version();"

# 测试 Qdrant
curl http://115.190.121.151:6333/collections

Q: 后端启动失败?

  • 确认端口 8083 未被占用
  • 检查 .env 中的 API Key 是否正确
  • 查看启动日志确认模型初始化成功

Q: 模型切换后无响应?

  • 检查所选模型的配置是否正确
  • 确认 vLLM 容器是否运行(如使用本地模型)
  • 尝试切换到另一个模型

更多问题排查请查看 QUICKSTART.md


📝 许可证

本项目采用 MIT 许可证。详见 LICENSE 文件。

🤝 贡献

欢迎提交 Issue 和 Pull Request

Description
No description provided
Readme Apache-2.0 60 MiB
Languages
Python 91.9%
TypeScript 4.5%
Shell 2.8%
Dockerfile 0.8%