From 4c119073bc2ab536994b992c96ee2afdbad56538 Mon Sep 17 00:00:00 2001 From: root <953994191@qq.com> Date: Sat, 9 May 2026 01:51:18 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=BE=93=E5=87=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2362 ++--------------- backend/app/backend.py | 92 +- backend/app/core/__init__.py | 7 +- backend/app/core/formatter.py | 564 ++-- backend/app/core/retry_utils.py | 332 --- backend/app/core/stream_finalizer.py | 220 ++ backend/app/core/web_search.py | 40 +- backend/app/middleware/__init__.py | 15 + backend/app/middleware/response_formatter.py | 91 + backend/app/subgraphs/contact/nodes.py | 12 +- backend/app/subgraphs/dictionary/nodes.py | 13 +- backend/app/subgraphs/news_analysis/nodes.py | 10 +- backend/app/templates/__init__.py | 9 + backend/app/templates/conversation_summary.md | 26 + backend/app/templates/error_notification.md | 21 + backend/app/templates/knowledge_summary.md | 48 + backend/app/templates/tool_result.md | 24 + backend/app/templates/web_search_result.md | 28 + 18 files changed, 973 insertions(+), 2941 deletions(-) delete mode 100644 backend/app/core/retry_utils.py create mode 100644 backend/app/core/stream_finalizer.py create mode 100644 backend/app/middleware/__init__.py create mode 100644 backend/app/middleware/response_formatter.py create mode 100644 backend/app/templates/__init__.py create mode 100644 backend/app/templates/conversation_summary.md create mode 100644 backend/app/templates/error_notification.md create mode 100644 backend/app/templates/knowledge_summary.md create mode 100644 backend/app/templates/tool_result.md create mode 100644 backend/app/templates/web_search_result.md diff --git a/README.md b/README.md index 068bf1f..b39408d 100644 --- a/README.md +++ b/README.md @@ -1,2176 +1,284 @@ # AI Agent - 智能助手系统 -一个基于 LangGraph + FastAPI 的智能对话助手,支持多模型切换、联网搜索、可视化图表、以及多个专业子图模块(通讯录、词典、资讯分析)等功能。 +基于 LangGraph + FastAPI 的智能对话助手,支持多模型切换、RAG 检索、联网搜索、子图系统等。 --- -## 📑 目录导航 +## 目录 + +- [核心功能](#核心功能) +- [快速开始](#快速开始) +- [项目结构](#项目结构) +- [API 接口](#api-接口) +- [配置说明](#配置说明) -- [核心功能](#-核心功能) - 面向用户的功能和技术特性 -- [使用指南](#-使用指南) - 基础对话、工具调用、多模型切换 -- [技术架构](#️-技术架构) - 技术栈、系统架构图、工作流流程图 -- [模型服务使用情况](#55-模型服务使用情况详解) - 模型选型、Token估算、成本分析 -- [核心算法与实现原理](#-核心算法与实现原理) - LangGraph 工作流、多模型路由、SSE 流式响应 -- [快速开始](#-快速开始) - Docker 和本地部署指南 -- [开发指南](#-开发指南) - 添加工具、添加模型、Docker 部署 -- [实现指南与最佳实践](#️-实现指南与最佳实践) - 性能优化、扩展开发、部署实践 -- [环境配置](#️-环境配置) - 配置文件、环境变量 -- [故障排查](#-故障排查) - 常见问题 --- -## 🎯 核心功能 +## 核心功能 -### 面向用户的功能 +### 对话能力 -- 💬 **智能对话**:支持多轮对话,自动记忆上下文 -- 🌐 **联网搜索**:免费使用 DuckDuckGo 搜索,无需 API Key,支持引用溯源 -- 📊 **可视化图表**:支持 Mermaid 图表和 matplotlib 图表生成 -- 🔄 **多模型切换**:前端可选择不同大语言模型 -- 📇 **通讯录管理**:子图模块,联系人 CRUD、邮件处理 -- 📖 **智能词典**:子图模块,翻译、生词本、专业术语提取 -- 📰 **资讯分析**:子图模块,资讯获取、内容分析、格式化展示 +| 功能 | 说明 | +|------|------| +| 智能对话 | 多轮对话,自动记忆上下文 | +| 多模型切换 | 前端可选择不同 LLM | +| 流式响应 | SSE 流式输出,打字机效果 | +| 持久化记忆 | PostgreSQL 存储对话历史 | + +### 工具系统 + +| 工具 | 说明 | +|------|------| +| RAG 检索 | 知识库检索,带置信度评估 | +| 联网搜索 | DuckDuckGo 免费搜索 | +| 通讯录 | 联系人管理、邮件处理 | +| 词典 | 翻译、术语提取、生词本 | +| 资讯分析 | 新闻检索、关键词提取 | ### 技术特性 -- ✅ **持久化记忆**:PostgreSQL 存储对话历史,重启不丢失 -- ✅ **高可用架构**:模型服务自动降级,确保服务稳定 -- ✅ **前后端分离**:FastAPI 后端 + Streamlit 前端 -- ✅ **Docker 部署**:一键启动所有服务 -- ✅ **远程服务架构**:PostgreSQL 和 Qdrant 部署在远程服务器 -- ✅ **流式响应**:SSE 流式输出,提升用户体验 -- ✅ **模块化设计**:清晰的代码分层,易于扩展和维护 -- ✅ **模型服务层**:统一的 Embedding、Rerank、Chat 服务接口,支持自动降级 -- ✅ **子图系统**:模块化的子图架构,共享公共工具(意图理解、人工审核、格式化输出) -- ✅ **公共工具库**:联网搜索、可视化图表等通用工具,所有子图和主图均可使用 -- ✅ **React 模式** ⭐:Reasoning → Acting → Observing 循环,LLM 先思考再行动,支持多次工具调用 -- ✅ **混合路由架构** ⭐⭐:前置快速路由(规则分流 + 轻量级意图分类)+ 完整 React 循环(兜底) -- ✅ **双模型服务** ⭐:get_chat_service()(大模型)+ get_small_llm_service()(轻量级模型) -- ✅ **自动升级机制**:快速路径失败时,自动回到完整 React 循环 -- ✅ **向后兼容**:可通过 use_hybrid_router=True/False 切换混合路由/纯 React 模式 +- **模块化架构**:清晰的代码分层,易于扩展 +- **模型降级**:多模型自动降级,保证高可用 +- **子图系统**:独立的工作流模块,按需调用 +- **格式化输出**:统一的 Markdown 格式化,支持模板 --- +## 快速开始 ---- +### 环境要求 -## 📖 使用指南 +- Python 3.10+ +- PostgreSQL(远程或本地) +- Qdrant 向量数据库(远程或本地) -### 基础对话 - -直接在聊天框输入问题即可: - -``` -你好,请介绍一下自己 -帮我写一个 Python 脚本 -``` - -### 主要功能 - -| 功能 | 说明 | 示例提问 | -|------|------|---------| -| 🧠 混合路由智能分流 | 自动判断任务类型,选择最佳路径 | 自然对话即可 | -| ⚡ 快速路径 | 闲聊、RAG查询、工具调用可走快速路径 | "你好"、"什么是 RAG" | -| 🔄 React 推理循环 | 复杂任务走完整的思考-行动-观察循环 | "帮我分析一下这个文档" | -| 🌐 联网搜索 | 免费 DuckDuckGo 搜索 | "今天北京天气怎么样?" | -| 📚 RAG 知识库检索 | 检索本地知识库 | "如何配置系统?" | -| 📇 通讯录管理 | 联系人 CRUD、邮件处理 | "帮我查看一下张三的联系方式" | -| 📖 智能词典 | 翻译、生词本、专业术语提取 | "帮我翻译这句话" | -| 📰 资讯分析 | 资讯获取、内容分析 | "帮我分析一下这篇新闻" | -| 📊 可视化图表 | 支持 Mermaid 图表生成 | "帮我画一个流程图" | - -### 多模型切换 - -1. 在左侧边栏选择模型: - - **智谱 GLM-4**:在线服务,速度快 - - **DeepSeek V3**:深度推理模型 - - **OpenAI GPT-4o-mini**:通用对话模型 - - **本地 Qwen3.5-9B**:本地部署,隐私性好 - -2. 可随时切换,甚至在同一会话中 - -3. 点击 "🔄 新会话" 清空当前对话 - ---- - -## 🏗️ 技术架构 - -### 1. 技术栈总览 - -| 层级 | 组件 | 技术选型 | 说明 | -|------|------|---------|------| -| **Agent 框架** | 工作流编排 | LangGraph + LangChain | 状态机驱动的智能体工作流 | -| **后端框架** | API 服务 | FastAPI + Uvicorn | RESTful API + SSE 流式输出 | -| **前端框架** | Web 界面 | Streamlit | 交互式对话界面 | -| **关系数据库** | 持久化存储 | PostgreSQL | 对话记忆持久化(远程服务器) | -| **向量数据库** | 向量检索 | Qdrant | 高性能向量相似度检索(远程服务器) | -| **容器化** | 服务编排 | Docker + Docker Compose | 一键部署所有服务 | -| **CI/CD** | 自动化部署 | Gitea Workflows | 代码推送自动构建部署 | -| **LLM 服务** | 云端模型 | 智谱 AI (glm-4-plus) | 快速响应,适合日常对话 | -| | | DeepSeek (deepseek-chat-v3) | 深度推理,适合复杂问题 | -| | | OpenAI (gpt-4o-mini) | 通用对话 | -| | 本地模型 | Qwen3.5-9B.Q4_K_M | 本地部署 GGUF 格式 | -| **模型服务层** | Chat 服务 | chat_services.py | 统一的生成式大模型接口 | -| | Embedding 服务 | embedding_services.py | 统一的嵌入模型接口 | -| | Rerank 服务 | rerank_services.py | 统一的重排序接口 | -| **Embedding** | 向量嵌入 | llama.cpp server | 本地 embedding 服务 (:18001) | - ---- - -### 2. 系统全景图 - -展示系统各组件之间的高层交互关系,隐藏执行细节。 - -```mermaid -graph TB - subgraph UserLayer["👤 用户层"] - Browser["浏览器"] - Streamlit["Streamlit 前端
:8501"] - end - - subgraph AppLayer["⚙️ 应用服务层"] - FastAPI["FastAPI 后端
:8079"] - Agent["AIAgentService
(智能体协调)"] - end - - subgraph EngineLayer["🧠 智能体引擎"] - LangGraph["LangGraph 工作流引擎
路由 / React / 工具调用"] - end - - subgraph ServicesLayer["🧩 领域服务"] - RAG["RAG 检索服务"] - Tools["工具集
(搜索、通讯录、词典等)"] - end - - subgraph ModelLayer["🤖 模型层"] - LLM["LLM 服务
(多模型降级链)"] - Embedding["Embedding 服务
:18001"] - Rerank["Rerank 服务
:18002"] - end - - subgraph DataLayer["💾 数据层"] - Qdrant["Qdrant 向量库"] - PostgreSQL["PostgreSQL"] - end - - Browser --> Streamlit - Streamlit --> FastAPI - FastAPI --> Agent - Agent --> LangGraph - LangGraph --> RAG - LangGraph --> Tools - LangGraph --> LLM - RAG --> Embedding - RAG --> Rerank - RAG --> Qdrant - Agent --> PostgreSQL - LangGraph --> PostgreSQL - - style UserLayer fill:#e1f5ff - style AppLayer fill:#fff4e1 - style EngineLayer fill:#ffe0b2 - style ServicesLayer fill:#e8f5e9 - style ModelLayer fill:#f3e5f5 - style DataLayer fill:#ffebee -``` - ---- - -### 3. 智能体引擎(主图) - -#### 3.1 核心流程 - -```mermaid -graph TB - Start([开始]) --> Retrieve["检索长期记忆"] - Retrieve --> Trigger["记忆触发"] - Trigger --> Init["初始化状态"] - Init --> Route{"混合路由
hybrid_router"} - - Route -->|闲聊/简单问答| Fast["⚡ 快速路径
(fast_*)"] - Route -->|知识检索| FastRAG["⚡ 快速 RAG"] - Route -->|工具调用| FastTool["⚡ 快速工具"] - Route -->|复杂任务| React["🔄 React 循环"] - - React --> Reason["推理"] - Reason --> Action["选择行动"] - Action -->|需要检索| RAGNode["RAG 检索"] - Action -->|搜索| Web["联网搜索"] - Action -->|通讯录| Contact["通讯录子图"] - Action -->|词典| Dict["词典子图"] - Action -->|资讯| News["资讯子图"] - Action -->|LLM| LLMCall["LLM 生成"] - - RAGNode --> Observe["观察结果"] - Web --> Observe - Contact --> Observe - Dict --> Observe - News --> Observe - LLMCall --> Observe - Observe -->|未完成| Reason - Observe -->|完成| Final["生成最终回复"] - - Fast --> Final - FastRAG --> Final - FastTool --> Final - Final --> End([结束]) - - style Route fill:#fff3e0,stroke:#ff9800,stroke-width:3px - style React fill:#f3e5f5,stroke:#7e57c2,stroke-width:2px -``` - -#### 3.2 路由策略 - -- **闲聊 / 简单问答** → `fast_chitchat`:直接调用 LLM 生成回复 -- **知识检索** → `fast_rag`:RAG 检索后生成回复 -- **工具调用** → `fast_tool`:直接执行原子工具操作 -- **复杂任务** → `React 循环`:推理 → 行动 → 观察的多轮迭代 - -#### 3.3 React 循环详解 - -当任务需要多步推理时,引擎进入 React 循环: - -1. **推理**:LLM 分析当前状态,决定下一步行动 -2. **行动**:执行工具调用(RAG 检索、联网搜索、子图操作等) -3. **观察**:收集行动结果,更新状态 -4. 若任务未完成,回到步骤 1;否则退出循环,生成最终回复 - ---- - -### 4. 子图系统 - -每个子图都是独立的模块化工作流,由主图的 React 循环按需调用。 - -#### 4.1 通讯录子图 - -支持联系人管理、邮件草稿生成与人工审核发送。 - -```mermaid -graph LR - Start([START]) --> Intent["parse_intent"] - Intent --> List["list_contacts"] - Intent --> Add["add_contact"] - Intent --> EmailList["list_emails"] - Intent --> GenEmail["generate_email_draft"] - Intent --> Sniff["sniff_contacts"] - GenEmail --> Human["human_review"] - Human --> Send["send_email"] - Send --> Format["format_result"] - List --> Format - Add --> Format - EmailList --> Format - Sniff --> Format - Format --> End([END]) -``` - -#### 4.2 词典子图 - -支持单词查询、翻译、术语提取、每日单词、单词本管理。 - -```mermaid -graph LR - Start([START]) --> Intent["parse_intent"] - Intent --> Query["query_word"] - Intent --> Trans["translate_text"] - Intent --> Terms["extract_terms"] - Intent --> Daily["get_daily_word"] - Intent --> Lookup["lookup_word_book"] - Intent --> AddWord["add_to_word_book"] - Query --> Format["format_result"] - Trans --> Format - Terms --> Format - Daily --> Format - Lookup --> Format - AddWord --> Format - Format --> End([END]) -``` - -#### 4.3 资讯分析子图 - -支持新闻检索、URL 分析、关键词提取与报告生成。 - -```mermaid -graph LR - Start([START]) --> Intent["parse_intent"] - Intent --> QueryNews["query_news"] - Intent --> AnalyzeUrl["analyze_url"] - Intent --> Keywords["extract_keywords"] - Intent --> Report["generate_report"] - QueryNews --> Format["format_result"] - AnalyzeUrl --> Format - Keywords --> Format - Report --> Format - Format --> End([END]) -``` - ---- - -### 5. RAG 系统 - -#### 5.1 离线索引 - -文档导入、切分、嵌入生成到存入向量库的完整流程。 - -```mermaid -flowchart TB - subgraph Input["文档输入"] - DocSource["文档源
PDF/DOCX/TXT/Markdown"] - end - - subgraph Load["文档加载"] - Loader["Unstructured / PyMuPDF / TextLoader"] - end - - subgraph Split["文本切分"] - Recursive["RecursiveCharacterTextSplitter
按分隔符递归切分"] - Semantic["SemanticChunker
基于语义相似度"] - ParentChild["ParentChildSplitter
父子块切分"] - end - - subgraph Embed["嵌入生成"] - Dense["稠密向量
Qwen3-Embedding-0.6B"] - Sparse["稀疏向量 BM25
FastEmbed"] - end - - subgraph Store["向量存储"] - QdrantStore["Qdrant
HNSW 索引 + 稀疏索引"] - end - - DocSource --> Loader - Loader --> Recursive - Loader --> Semantic - Loader --> ParentChild - Recursive --> Dense - Semantic --> Dense - ParentChild --> Dense - Recursive --> Sparse - Semantic --> Sparse - ParentChild --> Sparse - Dense --> QdrantStore - Sparse --> QdrantStore - - style Input fill:#e3f2fd - style Load fill:#fff3e0 - style Split fill:#f3e5f5 - style Embed fill:#e8f5e9 - style Store fill:#ffebee -``` - -**技术组件说明:** - -| 组件 | 技术选型 | 说明 | -|------|---------|------| -| 文档加载 | Unstructured / PyMuPDF / TextLoader | 支持多种文档格式 | -| 文本切分 | RecursiveCharacterTextSplitter | 默认 500 字符,按分隔符递归切分 | -| 语义切分 | SemanticChunker | 基于 Embedding 相似度自动切分 | -| 父子切分 | ParentChildSplitter | 大块存储上下文,小块用于检索 | -| 稠密嵌入 | Qwen3-Embedding-0.6B-Q8_0 | llama.cpp server (:18001) | -| 稀疏嵌入 | FastEmbed BM25 | 本地计算,无需额外服务 | -| 向量存储 | Qdrant | HNSW 索引,高性能 ANN 检索 | - -#### 5.2 在线检索 - -用户查询经过改写、混合检索、融合、重排序,最终由 LLM 生成回答。 - -```mermaid -flowchart TB - subgraph Input["查询输入"] - Query["用户查询"] - end - - subgraph Processing["查询处理"] - Rewrite["查询改写
(LLM 生成多角度查询)"] - end - - subgraph Retrieval["混合检索"] - Parallel["并行检索"] - DenseRet["稠密向量检索"] - SparseRet["稀疏 BM25 检索"] - end - - subgraph Fusion["结果融合"] - RRF["RRF 融合
(Qdrant 服务端融合)"] - end - - subgraph Rerank["重排序"] - CrossEncoder["Cross-Encoder 重排
bge-reranker-v2-m3"] - end - - subgraph Generation["生成"] - LLMGen["LLM 生成回答"] - end - - Query --> Rewrite --> Parallel - Parallel --> DenseRet - Parallel --> SparseRet - DenseRet --> RRF - SparseRet --> RRF - RRF --> CrossEncoder --> LLMGen - - style Input fill:#e3f2fd - style Processing fill:#fff3e0 - style Retrieval fill:#f3e5f5 - style Fusion fill:#e8f5e9 - style Rerank fill:#ffebee - style Generation fill:#fff3e0 -``` - -**技术组件说明:** - -| 阶段 | 技术选型 | 说明 | -|------|---------|------| -| 查询改写 | MultiQuery | LLM 生成 3~5 个多角度查询 | -| 稠密检索 | Qwen3-Embedding | 余弦相似度向量检索 | -| 稀疏检索 | FastEmbed BM25 | TF-IDF 词频统计检索 | -| 结果融合 | Qdrant Fusion API | 服务端 RRF 融合,减少数据传输 | -| 重排序 | bge-reranker-v2-m3 | Cross-Encoder 交互编码,精度更高 | -| LLM 生成 | chat_services | 统一的大模型服务接口 | - ---- - - ---- - -### 5.5. 模型服务使用情况详解 - -#### 5.5.1. 模型服务架构总览 - -本项目采用**分层模型策略**,根据任务复杂度选择不同能力和成本的模型: - -| 模型类型 | 用途 | 主要来源 | 成本考量 | -|---------|------|---------|---------| -| 小模型 (Small LLM) | 意图分类、路由决策、查询改写 | 本地模型 / DeepSeek小模型 | 低成本,高频率 | -| 大模型 (Main LLM) | 对话生成、推理、工具调用 | 智谱 / DeepSeek / 本地 | 高能力,低频率 | -| Embedding模型 | 文本向量化、语义检索 | 本地 llama.cpp / 智谱API | 批量处理 | -| Rerank模型 | 检索结果重排序 | 硅基流动 / 智谱API | 精准排序 | -| Sparse模型 | BM25稀疏检索 | FastEmbed本地 | 关键词匹配 | - -#### 5.5.2. 小模型使用场景及Token估算 - -**小模型**主要用于高频率、低复杂度的任务: - -| 使用场景 | 位置文件 | 用途描述 | 单次Token估算 | 调用频率 | -|---------|---------|---------|-------------|---------| -| 意图分类 (1) | `app/core/intent_classifier.py` | 判断用户意图类型 | ~300输入 + ~50输出 | 每轮对话1次 | -| 意图分类 (2) | `app/main_graph/nodes/hybrid_router.py` | 混合路由决策 | ~200输入 + ~50输出 | 每轮对话1次 | -| 闲聊回复 | `app/main_graph/nodes/fast_paths.py` | 快速回复问候语 | ~50输入 + ~30输出 | 按需调用 | - -**Token估算说明**: -- 单次意图分类:总计 ~350-600 tokens -- 小模型成本通常是大模型的 1/10 - 1/100 -- 每日1000次对话,小模型仅消耗 ~350k-600k tokens - -#### 5.5.3. 大模型使用场景及Token估算 - -**大模型**用于核心对话生成和复杂推理: - -| 使用场景 | 位置文件 | 用途描述 | 单次Token估算 | 调用频率 | -|---------|---------|---------|-------------|---------| -| RAG查询改写 | `app/main_graph/utils/rag_initializer.py` | 生成多角度查询 | ~100输入 + ~150输出 | RAG调用时 | -| 主对话生成 | `app/main_graph/nodes/llm_call.py` | 用户查询响应 | ~500-2000输入 + ~200-1000输出 | 每轮对话1次 | -| React推理 | `app/main_graph/nodes/reasoning.py` | 任务分解与规划 | ~300-1000输入 + ~100-500输出 | 复杂任务多次 | -| 记忆摘要 | `app/memory/mem0_client.py` | 长期记忆压缩 | ~500-2000输入 + ~200-500输出 | 每N轮对话1次 | - -**Token估算说明**: -- 普通对话:总计 ~1000-3000 tokens -- RAG查询:额外 ~250 tokens -- 复杂多步推理:可能额外增加 500-3000 tokens -- 每日1000次对话,大模型预计消耗 1M-3M tokens - -#### 5.5.4. Embedding模型使用场景 - -**Embedding模型**用于语义检索和向量存储: - -| 使用场景 | 位置文件 | 用途描述 | 估算 | -|---------|---------|---------|------| -| RAG文档索引 | `rag_indexer/index_builder.py` | 文档分片向量化 | 每个文档片段1次 | -| 在线检索 | `app/rag/retriever.py` | 查询向量化 + 相似度检索 | 每次检索1次 | -| 记忆向量化 | `app/memory/mem0_client.py` | 记忆内容向量化存储 | 每次记忆更新1次 | - -**Embedding说明**: -- 向量维度:1024 (Qwen3-Embedding-0.6B) 或 2048 (智谱 embedding-3) -- 批量处理:建议使用 batch_size=10-20 提高效率 -- 本地优先:优先使用 llama.cpp 服务,降低API调用成本 - -#### 5.5.5. Rerank模型使用场景 - -**Rerank模型**用于检索结果精细化排序: - -| 使用场景 | 位置文件 | 用途描述 | 估算 | -|---------|---------|---------|------| -| RAG结果重排 | `app/rag/rerank.py` | 提升检索相关性 | 每次检索调用 | -| 混合检索重排 | `app/rag/retriever.py` | 稀疏+稠密结果融合排序 | 每次检索调用 | - -**Rerank说明**: -- 通常在 RRF 融合后使用,进一步提升精准度 -- 重排数量建议:rerank_top_n=3-10 -- 成本权衡:rerank 会增加额外调用成本,但精度提升明显 - -#### 5.5.6. 模型服务选型参考对比 - -为方便不同部署场景选择,提供以下模型选型参考: - -| 维度 | 本地优先方案 | 云端优先方案 | 混合方案 | -|------|------------|------------|---------| -| **小模型** | Qwen3.5-9B (本地) | DeepSeek-Chat (API) | 本地+DeepSeek降级 | -| **大模型** | Qwen3.5-9B (本地) | 智谱 GLM-4 / DeepSeek | 本地+云端降级链 | -| **Embedding** | Qwen3-Embedding-0.6B (本地llama.cpp) | 智谱 embedding-2 | 本地优先,智谱降级 | -| **Rerank** | (可选本地) | 硅基流动 bge-reranker-v2-m3 | 硅基流动API | -| **Sparse** | FastEmbed BM25 (本地) | FastEmbed BM25 (本地) | 本地 | - -**成本参考对比**(每1M tokens,仅作示例): - -| 模型 | 输入成本 | 输出成本 | 适用场景 | -|------|---------|---------|---------| -| **本地模型** | ~0元 | ~0元 | 有GPU机器,隐私敏感 | -| **DeepSeek-Chat** | ~¥0.5 | ~¥1.0 | 通用推理,成本适中 | -| **智谱 GLM-4** | ~¥1.0 | ~¥2.0 | 高质量对话 | -| **智谱 embedding-2** | ~¥0.2 | - | 向量嵌入 | -| **硅基流动 Rerank** | ~¥0.3 | - | 精准重排 | - -**部署建议**: -- **个人/测试**:全云端方案,快速上手 -- **小团队**:小模型本地,大模型云端降级 -- **企业/隐私敏感**:全本地部署,或使用私有API -- **生产环境**:核心能力本地+云端降级链,保证高可用 - -#### 5.5.7. 模型服务降级链路设计 - -本项目所有模型服务都设计了**自动降级链路**,保证服务高可用: - -| 服务类型 | 主服务 | 降级服务 | -|---------|--------|---------| -| 对话生成 | 本地模型 → 智谱GLM-4 → DeepSeek | -| Embedding | 本地llama.cpp → 智谱embedding-2 | -| Rerank | 硅基流动 → 智谱rerank-2 | - -降级逻辑实现在 `app/model_services/base.py: FallbackServiceChain`,对上层业务透明。 - ---- - -### 6. 模型服务层 - -#### 6.1 多模型降级链 - -当首选模型调用失败时,自动依次尝试备用模型,保证服务可用性。 - -```mermaid -graph LR - Start([API 请求]) --> Zhipu["智谱 GLM-4"] - Zhipu -->|失败| DeepSeek["DeepSeek V3"] - DeepSeek -->|失败| OpenAI["OpenAI GPT-4o"] - OpenAI -->|失败| Local["本地 Qwen3.5-9B"] - Local --> Response([返回响应]) - style Start fill:#e1f5ff - style Response fill:#c8e6c9 -``` - -**降级策略:** 云端模型按优先级依次尝试,最终回退到本地模型,确保无单点故障。 - -#### 6.2 统一服务接口 - -所有模型调用均通过以下三个统一接口访问,上层业务不感知具体模型: - -- **`chat_services`**:对话生成 -- **`embedding_services`**:文本向量化 -- **`rerank_services`**:搜索结果重排序 - ---- - -### 7. 数据存储 - -| 存储 | 用途 | 访问方式 | -|------|------|---------| -| **PostgreSQL** | 对话历史、长期记忆 | 远程服务器,SQLAlchemy ORM | -| **Qdrant** | 文档向量、知识库 | 远程服务器,gRPC/HTTP API | - ---- - -### 数据流向图 - -``` -用户请求 - │ - ├─→ 前端 Streamlit - │ │ - │ ├─→ 发送 HTTP POST /api/chat - │ │ - │ └─→ 接收 SSE 流式响应 - │ - └─→ 后端 FastAPI - │ - ├─→ AIAgentService.achat() - │ │ - │ ├─→ 初始化 LangGraph 状态 - │ │ ├─→ messages: 对话历史 - │ │ ├─→ user_id: 用户标识 - │ │ └─→ metadata: 元数据 - │ │ - │ └─→ 执行工作流(混合路由 + React 循环) - │ ├─→ retrieve_memory: 从 PostgreSQL 检索历史 - │ ├─→ memory_trigger: 判断是否触发记忆 - │ ├─→ init_state: 初始化状态 - │ ├─→ hybrid_router: 混合路由决策 - │ │ ├─→ fast_chitchat: 闲聊快速路径 - │ │ ├─→ fast_rag: RAG 快速路径 - │ │ ├─→ fast_tool: 工具快速路径 - │ │ └─→ react_loop: React 循环(兜底) - │ │ - │ ├─→ React 循环(react_reason 节点) - │ │ ├─→ rag_retrieve: RAG 检索 - │ │ ├─→ web_search: 联网搜索 - │ │ ├─→ contact_subgraph: 通讯录子图 - │ │ ├─→ dictionary_subgraph: 词典子图 - │ │ ├─→ news_analysis_subgraph: 资讯子图 - │ │ └─→ llm_call: LLM 调用 - │ │ - │ ├─→ summarize: 生成对话摘要(如需要) - │ └─→ finalize: 格式化最终响应 - │ - └─→ 持久化存储 - ├─→ PostgreSQL: 对话历史和摘要 - └─→ Qdrant: RAG 向量索引 -``` - -### 项目结构 - -``` -Agent1/ -├── backend/ -│ ├── app/ -│ │ ├── __init__.py -│ │ ├── config.py # 配置管理 -│ │ ├── logger.py # 日志工具 -│ │ ├── backend.py # FastAPI 后端应用(含子图 API) -│ │ ├── README.md # 后端目录文档 -│ │ │ -│ │ ├── core/ # ⭐ 核心模块 - 基类和通用工具 -│ │ │ ├── __init__.py -│ │ │ ├── state_base.py # 子图状态基类 -│ │ │ ├── intent.py # 意图理解(React 模式) -│ │ │ ├── intent_classifier.py # 意图分类器 -│ │ │ ├── formatter.py # 格式化输出工具 -│ │ │ ├── human_review.py # 人工审核节点 -│ │ │ ├── web_search.py # ⭐ 联网搜索工具(DuckDuckGo) -│ │ │ └── visualization.py # ⭐ 可视化图表工具(Mermaid + matplotlib) -│ │ │ -│ │ ├── agent/ # ⭐ Agent 服务层 -│ │ │ ├── __init__.py -│ │ │ ├── agent_service.py # Agent 服务核心(使用 chat_services) -│ │ │ ├── history.py # 历史查询服务 -│ │ │ └── prompts.py # 提示词模板 -│ │ │ -│ │ ├── main_graph/ # ⭐ 主图 - LangGraph 主流程(混合路由 + React 循环) -│ │ │ ├── __init__.py -│ │ │ ├── state.py # 主图状态定义 MainGraphState -│ │ │ ├── graph.py # LangGraph 组件导出 -│ │ │ ├── config.py # 主图配置 -│ │ │ ├── nodes/ # 主图节点 -│ │ │ │ ├── __init__.py -│ │ │ │ ├── _utils.py # 节点公共工具 -│ │ │ │ ├── reasoning.py # React 推理节点 -│ │ │ │ ├── hybrid_router.py # 混合路由节点 -│ │ │ │ ├── fast_paths.py # 快速路径节点 -│ │ │ │ ├── llm_call.py # LLM 调用节点 -│ │ │ │ ├── routing.py # 路由决策(init_state, route_by_reasoning) -│ │ │ │ ├── rag_nodes.py # RAG 检索节点 -│ │ │ │ ├── web_search.py # 联网搜索节点 -│ │ │ │ ├── retrieve_memory.py # 记忆检索节点 -│ │ │ │ ├── memory_trigger.py # 记忆触发节点 -│ │ │ │ ├── summarize.py # 记忆摘要节点 -│ │ │ │ ├── finalize.py # 最终处理节点 -│ │ │ │ ├── tool_call.py # 工具执行节点 -│ │ │ │ └── error_handling.py # 错误处理节点 -│ │ │ ├── tools/ # 主图工具 -│ │ │ │ ├── __init__.py -│ │ │ │ ├── common_tools.py # 通用工具 -│ │ │ │ ├── subgraph_tools.py # 子图调用工具 -│ │ │ │ └── graph_tools.py # 图工具 -│ │ │ └── utils/ # 主图工具函数 -│ │ │ ├── __init__.py -│ │ │ ├── main_graph_builder.py # 主图构建器 -│ │ │ ├── rag_initializer.py # RAG 初始化 -│ │ │ └── retry_utils.py # 重试工具 -│ │ │ -│ │ ├── subgraphs/ # ⭐ 子图模块 -│ │ │ ├── __init__.py -│ │ │ ├── contact/ # 通讯录子图 -│ │ │ │ ├── state.py # 状态定义 -│ │ │ │ ├── nodes.py # 节点实现 -│ │ │ │ ├── graph.py # 图构建 -│ │ │ │ ├── api_client.py # API 客户端 -│ │ │ │ └── __init__.py -│ │ │ ├── dictionary/ # 词典子图 -│ │ │ │ ├── state.py # 状态定义 -│ │ │ │ ├── nodes.py # 节点实现 -│ │ │ │ ├── graph.py # 图构建 -│ │ │ │ ├── api_client.py # API 客户端 -│ │ │ │ └── __init__.py -│ │ │ └── news_analysis/ # 资讯分析子图 -│ │ │ ├── state.py # 状态定义 -│ │ │ ├── nodes.py # 节点实现 -│ │ │ ├── graph.py # 图构建 -│ │ │ ├── api_client.py # API 客户端 -│ │ │ └── __init__.py -│ │ │ -│ │ ├── model_services/ # 模型服务层 -│ │ │ ├── __init__.py -│ │ │ ├── base.py # 基类:BaseServiceProvider, FallbackServiceChain, SingletonServiceManager -│ │ │ ├── chat_services.py # 生成式大模型服务 -│ │ │ ├── embedding_services.py # 嵌入模型服务 -│ │ │ └── rerank_services.py # 重排序服务 -│ │ │ -│ │ ├── mcp/ # MCP 模块 -│ │ │ ├── __init__.py -│ │ │ ├── mcp_manager.py # MCP 管理器 -│ │ │ ├── mcp_client.py # MCP 客户端 -│ │ │ ├── adapters/ # MCP 适配器 -│ │ │ │ ├── __init__.py -│ │ │ │ ├── base_adapter.py -│ │ │ │ ├── contact_adapter.py -│ │ │ │ ├── dictionary_adapter.py -│ │ │ │ └── news_adapter.py -│ │ │ └── mcp_example.py -│ │ │ -│ │ ├── memory/ # 记忆模块 -│ │ │ ├── __init__.py -│ │ │ └── mem0_client.py # Mem0 客户端封装 -│ │ │ -│ │ ├── rag/ # RAG 模块 -│ │ │ ├── __init__.py -│ │ │ ├── retriever.py # 检索器 -│ │ │ ├── rerank.py # 重排序业务逻辑(使用 rerank_services) -│ │ │ ├── query_transform.py # 查询转换 -│ │ │ ├── pipeline.py # RAG 流水线 -│ │ │ ├── fusion.py # RAG-Fusion -│ │ │ ├── tools.py # RAG 工具 -│ │ │ └── evaluate.py # RAG 评估 -│ │ │ -│ │ ├── db/ # 数据库模块 -│ │ │ ├── __init__.py -│ │ │ ├── base.py # 数据库基类 -│ │ │ ├── models.py # 数据模型 -│ │ │ └── init_db.py # 数据库初始化 -│ │ │ -│ │ └── utils/ # 工具模块 -│ │ ├── __init__.py -│ │ └── logging.py # 日志工具 -│ └── rag_core/ # ⭐ RAG 核心库(统一组件) -│ ├── __init__.py -│ ├── config.py # RAG 配置 -│ ├── client.py # RAG 核心客户端 -│ ├── embedders.py # 嵌入模型 -│ ├── sparse_embedder.py # BM25 稀疏嵌入 -│ ├── vector_store.py # 向量存储(Dense + Sparse) -│ └── doc_store.py # 文档存储 -├── frontend/ -│ ├── run.py # 前端启动脚本 -│ ├── requirements.txt -│ └── src/ -│ ├── __init__.py -│ ├── frontend_main.py # Streamlit 主入口 -│ ├── api_client.py # API 客户端(含子图调用) -│ ├── config.py # 前端配置 -│ ├── state.py # 状态管理 -│ ├── logger.py # 日志 -│ ├── utils.py # 工具函数 -│ └── components/ -│ ├── __init__.py -│ ├── sidebar.py # 侧边栏组件 -│ ├── chat_area.py # 聊天区域组件 -│ └── info_panel.py # 信息面板组件 -├── rag_indexer/ -│ ├── __init__.py -│ ├── cli.py # 命令行入口 -│ ├── index_builder.py # 索引构建器 -│ ├── loaders.py # 文档加载器 -│ ├── splitters.py # 文本切分器 -│ └── test/ # 测试脚本 -├── docker/ -│ ├── docker-compose.yml # Docker 服务编排 -│ ├── backend/ -│ │ └── Dockerfile # 后端镜像构建 -│ ├── frontend/ -│ │ └── Dockerfile # 前端镜像构建 -│ └── models/ # spaCy 模型文件 -├── scripts/ -│ └── start.sh # 启动脚本 -├── test/ # 测试目录 -│ ├── test_backend.py -│ ├── test_rag.py -│ ├── test_qdrant.py -│ └── test_rag_indexer_result.py -├── .gitea/ -│ └── workflows/ -│ └── deploy.yml # CI/CD 自动化部署 -├── requirement.txt # Python 依赖 -├── .env.docker # Docker 环境变量模板 -├── .gitignore -├── LICENSE -├── QUICKSTART.md # 快速开始指南 -└── user_docs/ # 用户文档目录 -``` - ---- - -## 🧠 核心算法与实现原理 - -### 1. RAG 检索算法 - -项目采用稠密 + 稀疏混合检索架构,结合 RRF 融合和 Cross-Encoder 重排序,实现高精度知识库问答。 - -**核心特性:** -- 三种文本切分策略:递归字符切分、语义切分、父子块切分 -- 稠密向量检索(Embedding)+ 稀疏 BM25 检索 -- RRF 融合算法实现多检索源结果合并 -- Cross-Encoder 重排序提升相关性 - -**详细文档:** -- 算法原理详见 [backend/docs/RAG_ALGORITHM.md](backend/docs/RAG_ALGORITHM.md) -- 系统架构详见 [backend/docs/RAG_ARCHITECTURE.md](backend/docs/RAG_ARCHITECTURE.md) - ---- - -### LangGraph 工作流详细流程 - -#### 1. 混合路由 + React 循环架构 ⭐⭐ - -**设计理念**:混合路由(Hybrid Router)作为前置决策,快速路径处理简单任务,React 循环作为复杂任务的兜底方案。 - -``` -┌─────────────────────────────────────────────────────────────────────────────┐ -│ 主图执行流程 │ -├─────────────────────────────────────────────────────────────────────────────┤ -│ │ -│ ┌─────────────────────────────────────────────────────────────────────┐ │ -│ │ 阶段1: 记忆检索 (retrieve_memory) │ │ -│ │ - 从 PostgreSQL 检索用户历史对话 │ │ -│ │ - 生成 memory_context 供后续使用 │ │ -│ └─────────────────────────────────────────────────────────────────────┘ │ -│ ↓ │ -│ ┌─────────────────────────────────────────────────────────────────────┐ │ -│ │ 阶段2: 记忆触发 (memory_trigger) │ │ -│ │ - 判断是否需要激活记忆上下文 │ │ -│ └─────────────────────────────────────────────────────────────────────┘ │ -│ ↓ │ -│ ┌─────────────────────────────────────────────────────────────────────┐ │ -│ │ 阶段3: 初始化状态 (init_state) │ │ -│ │ - 初始化 MainGraphState │ │ -│ │ - 设置 user_query、messages 等 │ │ -│ └─────────────────────────────────────────────────────────────────────┘ │ -│ ↓ │ -│ ┌─────────────────────────────────────────────────────────────────────┐ │ -│ │ 阶段4: 混合路由 (hybrid_router) ⭐ │ │ -│ │ - 规则分流:闲聊关键词、子图关键词(<5ms) │ │ -│ │ - LLM 分类:使用轻量级模型进行意图分类(chitchat/knowledge/tool) │ │ -│ │ - 输出决策:fast_chitchat / fast_rag / fast_tool / react_loop │ │ -│ └─────────────────────────────────────────────────────────────────────┘ │ -│ ↓ │ -│ ┌──────────────────────┼──────────────────────┐ │ -│ ↓ ↓ ↓ │ -│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ -│ │快速路径 │ │快速路径 │ │快速路径 │ │ -│ │fast_* │ │fast_rag │ │fast_tool│ │ -│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ -│ │ │ │ │ -│ │ ┌────────────────┘ ┌──────────────┘ │ -│ │ │ │ │ -│ ↓ ↓ ↓ │ -│ ┌─────────────────────────────────────────────────────────────┐ │ -│ │ 阶段5: React 循环 (react_reason) │ │ -│ │ - 调用 app/core/intent.py 的 react_reason_async() │ │ -│ │ - 使用 app/model_services/ 获取 chat 服务 │ │ -│ │ - 推理下一步动作(rag_retrieve/web_search/子图/llm_call) │ │ -│ └─────────────────────────────────────────────────────────────┘ │ -│ ↓ │ -│ ┌─────────────────────────────────────────────────────────────────────┐ │ -│ │ React 循环内节点 │ │ -│ │ rag_retrieve → react_reason(回到推理) │ │ -│ │ web_search → react_reason(回到推理) │ │ -│ │ contact_subgraph → react_reason(回到推理) │ │ -│ │ dictionary_subgraph → react_reason(回到推理) │ │ -│ │ news_analysis_subgraph → react_reason(回到推理) │ │ -│ │ handle_error → react_reason(错误处理后继续推理) │ │ -│ │ llm_call → 退出循环,进入完成阶段 │ │ -│ └─────────────────────────────────────────────────────────────────────┘ │ -│ ↓ │ -│ ┌─────────────────────────────────────────────────────────────────────┐ │ -│ │ 阶段6: LLM 调用 (llm_call) │ │ -│ │ - 调用主 LLM 生成最终回答 │ │ -│ │ - 使用 llm.bind_tools(tools) 绑定工具 │ │ -│ │ - 支持流式输出到前端 │ │ -│ └─────────────────────────────────────────────────────────────────────┘ │ -│ ↓ │ -│ ┌─────────────────────────────────────────────────────────────────────┐ │ -│ │ 阶段7: 记忆摘要 (summarize) / 最终处理 (finalize) │ │ -│ │ - 对话轮数 >= 5 时触发摘要 │ │ -│ │ - 保存对话到 PostgreSQL │ │ -│ └─────────────────────────────────────────────────────────────────────┘ │ -│ │ -└─────────────────────────────────────────────────────────────────────────────┘ -``` - -#### 2. React 推理循环详解 - -React 推理循环使用 `app/core/intent.py` 中的 `react_reason_async()` 函数: - -``` -┌─────────────────────────────────────────────────────────────────┐ -│ React 推理循环 │ -├─────────────────────────────────────────────────────────────────┤ -│ │ -│ ┌───────────────────────────────────────────────────────────┐ │ -│ │ 1. Reasoning (推理) - react_reason 节点 │ │ -│ │ - 调用 react_reason_async() │ │ -│ │ - 传入上下文:retrieved_docs、reasoning_history、 │ │ -│ │ previous_actions、messages、errors │ │ -│ │ - LLM 决定下一步 action │ │ -│ │ - 记录到 reasoning_history │ │ -│ └───────────────────────────────────────────────────────────┘ │ -│ ↓ │ -│ ┌───────────────────────────────────────────────────────────┐ │ -│ │ 2. Acting (行动) │ │ -│ │ - rag_retrieve: RAG 检索 │ │ -│ │ - web_search: 联网搜索 │ │ -│ │ - contact_subgraph: 通讯录子图 │ │ -│ │ - dictionary_subgraph: 词典子图 │ │ -│ │ - news_analysis_subgraph: 资讯分析子图 │ │ -│ │ - handle_error: 错误处理 │ │ -│ └───────────────────────────────────────────────────────────┘ │ -│ ↓ │ -│ ┌───────────────────────────────────────────────────────────┐ │ -│ │ 3. Observing (观察) / 循环 │ │ -│ │ - 工具结果返回给 react_reason │ │ -│ │ - 再次推理下一步 │ │ -│ │ - 最多 10 次循环 (max_steps=10) │ │ -│ │ - 或直到推理决定 llm_call │ │ -│ └───────────────────────────────────────────────────────────┘ │ -│ ↓ │ -│ ┌───────────────────────────────────────────────────────────┐ │ -│ │ 4. 退出条件 │ │ -│ │ - action == llm_call: 退出循环,进入 llm_call 节点 │ │ -│ │ - max_steps 达到: 强制退出到 llm_call │ │ -│ │ - 错误累积过多: 进入 handle_error │ │ -│ └───────────────────────────────────────────────────────────┘ │ -│ │ -└─────────────────────────────────────────────────────────────────┘ -``` - -**关键实现点**: -1. **`react_reason_async()`** - 在 `app/core/intent.py` 中,使用 chat_services 获取 LLM 进行推理 -2. **`route_by_reasoning`** - 路由函数,根据推理结果决定下一步节点 -3. **循环边** - 工具节点执行后回到 react_reason 继续推理 -4. **自动升级** - 快速路径失败时,回到 react_reason 继续推理 - -#### 2.2 状态机设计 - -```python -# 核心状态定义 -class MainGraphState(TypedDict): - messages: Annotated[list, add_messages] # 对话历史(自动合并) - user_id: str # 用户标识 - user_query: str # 用户查询 - memory_context: str # 检索到的记忆上下文 - memory_triggered: bool # 记忆是否触发 - should_summarize: bool # 是否需要生成摘要 - retrieved_docs: list # RAG 检索到的文档 - reasoning_history: list # React 推理历史 - previous_actions: list # 之前的动作 - errors: list # 错误列表 - current_step: int # 当前步骤 - max_steps: int # 最大步骤数 - llm_override: str # LLM 覆盖 - final_response: str # 最终响应 -``` - -**状态流转规则**: -``` -初始状态 → retrieve_memory → memory_trigger → init_state → hybrid_router - ↓ - ┌───────────────────────────┼───────────────────────────┐ - ↓ ↓ ↓ - fast_chitchat fast_rag fast_tool - ↓ ↓ ↓ - [成功→llm_call / 失败→react_reason] [成功→llm_call / 失败→react_reason] [成功→llm_call / 失败→react_reason] - ↓ - ┌───────────────────────────────────────────────┘ - ↓ - react_reason - ↓ - ┌──────────────┼──────────────┬──────────────┬──────────────┬──────────────┬──────────────┐ - ↓ ↓ ↓ ↓ ↓ ↓ ↓ - rag_retrieve web_search contact_subgraph dictionary_subgraph news_analysis_subgraph handle_error llm_call - ↓ ↓ ↓ ↓ ↓ ↓ ↓ - react_reason ←────────┴──────────────┴──────────────┴──────────────┴──────────────┴──────────────┘ - ↓ - llm_call - ↓ - summarize - ↓ - finalize -``` - -#### 2.3 记忆管理算法 - -**记忆检索策略:** -``` -1. 向量检索:将用户查询 Embedding,在 PostgreSQL 中检索相似对话 -2. 时间衰减:近期对话权重更高(可选) -3. 相关性过滤:仅返回相似度 > 阈值的记忆 -4. 上下文窗口:限制记忆长度,避免超出 LLM 上下文限制 -``` - -**摘要生成策略:** -``` -触发条件: - - 对话轮数超过阈值(默认 10 轮) - - 记忆上下文长度超过限制 - -摘要生成: - 1. 提取最近 N 轮对话 - 2. 调用 LLM 生成摘要 - 3. 保存摘要到 PostgreSQL - 4. 清理旧对话记录 - -摘要格式: - "用户在 {时间} 讨论了 {主题},关键信息:{要点}" -``` - -### 3. 多模型路由算法 - -``` -模型选择逻辑: -┌─────────────────────────────────────────────┐ -│ 用户选择模型 │ -└──────────────────┬──────────────────────────┘ - │ - ┌──────────┼──────────┬──────────┐ - ↓ ↓ ↓ ↓ - ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ - │ zhipu│ │deep │ │openai│ │local │ - └──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘ - │ │ │ │ - ↓ ↓ ↓ ↓ - ChatZhipu ChatOpenAI ChatOpenAI ChatOpenAI - (官方SDK) (DeepSeek) (OpenAI) (llama.cpp/Qwen) - │ │ │ │ - └─────────┴─────────┴─────────┘ - ↓ - ┌────────────────┐ - │ 统一接口输出 │ - │ (BaseChatModel)│ - └────────────────┘ -``` - -**高可用降级策略:** -``` -1. 优先使用用户选择的模型 -2. 如果模型不可用(API 错误、超时等): - - 尝试切换到备用模型 - - 记录错误日志 - - 返回降级提示给用户 -3. 健康检查:定期检查各模型服务状态 -``` - -### 4. SSE 流式响应算法 - -``` -流式输出流程: -┌─────────────────────────────────────────────┐ -│ LLM 生成 Token │ -└──────────────────┬──────────────────────────┘ - │ - ↓ -┌─────────────────────────────────────────────┐ -│ 事件格式化 │ -│ data: {"content": "你", "type": "content"} │ -│ data: {"content": "好", "type": "content"} │ -│ data: {"content": "!", "type": "content"} │ -│ data: {"done": true, "type": "end"} │ -└──────────────────┬──────────────────────────┘ - │ - ↓ -┌─────────────────────────────────────────────┐ -│ HTTP Response Stream │ -│ Content-Type: text/event-stream │ -│ Cache-Control: no-cache │ -│ Connection: keep-alive │ -└─────────────────────────────────────────────┘ -``` - -**实现要点:** -- 使用 `asyncio.Queue` 缓冲 Token -- 生产者:LLM 异步生成 Token 放入队列 -- 消费者:FastAPI `EventSourceResponse` 从队列读取并推送 -- 超时保护:30 秒无输出自动断开 - ---- - -## 📚 RAG 系统完整架构 - -### 离线索引构建 vs 在线检索生成 - -RAG 系统分为两个独立但协同的阶段: - -``` -┌──────────────────────────────────────────────────────────────────┐ -│ RAG 系统双阶段架构 │ -├──────────────────────────────────────────────────────────────────┤ -│ │ -│ 阶段一:离线索引构建 (rag_indexer) │ -│ ┌────────────────────────────────────────────────────────┐ │ -│ │ 文档加载 → 文本切分 → Embedding → 向量存储 │ │ -│ │ PDF/DOCX/TXT 递归/语义/父子块 llama.cpp Qdrant │ │ -│ └────────────────────────────────────────────────────────┘ │ -│ │ │ -│ ▼ │ -│ 阶段二:在线检索生成 (backend/app/rag) │ -│ ┌────────────────────────────────────────────────────────┐ │ -│ │ 用户查询 → 查询改写 → 多路检索 → RRF 融合 → 重排序 │ │ -│ │ MultiQuery Dense+Sparse Cross-Encoder │ │ -│ │ │ │ │ -│ │ ▼ │ │ -│ │ LLM 生成回答 │ │ -│ └────────────────────────────────────────────────────────┘ │ -│ │ -└──────────────────────────────────────────────────────────────────┘ -``` - -### RAG 演进路线 (Roadmap) - -#### Level 1: 基础向量搜索 (Basic Similarity Search) - -**核心算法**: 近似最近邻搜索 (ANN, 常用 HNSW 算法) - -``` -算法原理: - 用户问题 → Embedding 模型 → 向量表示 - ↓ - 计算与库中向量的余弦相似度 - ↓ - 取距离最近的 K 个块返回 - -优缺点: - ✅ 速度极快(毫秒级) - ❌ 只能捕捉"语义相似",专有名词匹配差 - -实现代码: - from backend.app.rag.retriever import create_base_retriever - - retriever = create_base_retriever( - collection_name="rag_documents", - embeddings=embeddings, - search_kwargs={"k": 20} - ) - docs = retriever.invoke("什么是 RAG?") -``` - -#### Level 2: 混合检索与重排序 (Hybrid Search + Reranker) - -**1. 基础召回(混合检索)** - -``` -算法原理: - 稠密向量检索(语义相似)+ BM25 稀疏检索(关键词匹配) - ↓ - 两路结果并行获取,等待融合 - -实现代码: - from backend.app.rag.retriever import create_hybrid_retriever - - retriever = create_hybrid_retriever( - collection_name="rag_documents", - embeddings=embeddings, - dense_k=10, - sparse_k=10, - score_threshold=0.3 - ) -``` - -**2. 二次精排(Cross-Encoder)** - -``` -算法原理: - 不同于双塔模型(分别算向量再求距离) - 交叉编码器将"问题 + 文档"拼接后整体输入 Transformer - 由模型直接输出 0~1 的相关性得分,精度极高 - -实现代码: - from backend.app.rag.reranker import LLaMaCPPReranker - - reranker = LLaMaCPPReranker( - base_url="http://127.0.0.1:8083", - api_key="your-api-key", - top_n=5 - ) - sorted_docs = reranker.compress_documents(documents, query) -``` - -#### Level 3: RAG-Fusion (多路改写与倒数排名融合) - -**1. 多路查询改写** - -``` -算法原理: - 克服用户初始提问词不达意或视角受限的问题 - 通过 LLM 将单一问题改写为多个不同角度的查询 - -实现代码: - from backend.app.rag.query_transform import MultiQueryGenerator - - generator = MultiQueryGenerator(llm=llm, num_queries=3) - queries = await generator.agenerate("如何申请项目资金?") - # 返回:["如何申请项目资金?", "项目资金申请流程是什么?", "申请项目经费需要哪些步骤?"] -``` - -**2. 倒数排名融合(RRF)** - -``` -算法原理: - RRF (Reciprocal Rank Fusion) 是一种无需评分归一化的融合算法 - 公式:RRF_score(d) = Σ 1/(k + rank_q(d)) - 有效避免某一极端检索结果主导全局 - -实现代码: - from backend.app.rag.fusion import reciprocal_rank_fusion - - # 多个查询的检索结果 - doc_lists = [result1, result2, result3] - fused_docs = reciprocal_rank_fusion(doc_lists, k=60) -``` - -#### Level 4: Agentic RAG / Self-RAG (智能体与自我反思) - -``` -核心原理: - 基于 LangGraph 的 ReAct (Reasoning and Acting) 状态机路由 - 大模型并非每次都去死板地执行检索,而是先判断问题: - "这是闲聊?还是需要查知识库?" - -工作流程: - ┌──────────┐ ┌──────────────┐ ┌──────────┐ ┌────────┐ - │ User │────>│ LangGraph │────>│ RAG_Tool │────>│ Qdrant │ - │ │ │ Agent │ │ │ │ │ - │ "公司报 │ │ 思考: 这是 │ │ ToolCall │ │ RAG- │ - │ 销流程?"│ │ 内部规章问题 │ │ search_ │ │ Fusion │ - │ │ │ 需要查资料 │ │ knowledge│ │ & 混合 │ - │ │<────│ 资料充分, │<────│ 返回最相 │<────│ 检索 │ - │ "根据知 │ │ 开始撰写回答 │ │ 关5条规定 │ │ Cross- │ - │ 识库规定 │ │ │ │ │ │ Encoder│ - │ ..." │ │ │ │ │ │ 重排 │ - └────────── └────────────── └──────────┘ └────────┘ - -实现代码: - from backend.app.rag.tools import search_knowledge_base - from backend.app.main_graph.utils.main_graph_builder import MainGraphBuilder - - # 构建图 - builder = MainGraphBuilder() - graph = builder.build_graph().compile(checkpointer=checkpointer) -``` - -#### Level 5: GraphRAG 集成 (基于图和关系的 RAG) - -``` -核心原理: - 结合知识图谱的结构化关系和向量检索的语义相似度 - 解决跨文档复杂关系推理问题 - -实现代码: - from langchain_community.graphs import Neo4jGraph - from langchain_experimental.graph_transformers import LLMGraphTransformer - - # 实体关系抽取 - transformer = LLMGraphTransformer(llm=local_llm) - graph_documents = transformer.convert_to_graph_documents(documents) - - # 存储到图数据库 - graph = Neo4jGraph(url="bolt://localhost:7687") - graph.add_graph_documents(graph_documents) -``` - -### 检索策略对比 - -| 策略 | 优点 | 缺点 | 适用场景 | -|:-----|:-----|:-----|:---------| -| **基础向量检索** | 速度快,语义理解好 | 专有名词匹配差 | 通用问答 | -| **混合检索** | 语义 + 关键词匹配 | 需要配置稀疏向量 | 专业术语查询 | -| **多路改写 + RRF** | 搜索面广,结果稳定 | 延迟略高 | 复杂问题 | -| **重排序** | 精度高 | 依赖额外模型 | 最终精排 | -| **Agentic RAG** | 智能决策,灵活 | 实现复杂 | 生产环境 | -| **GraphRAG** | 关系推理能力强 | 需要图数据库 | 知识密集型场景 | - -### 切分策略对比 - -| 策略 | 原理 | 优点 | 缺点 | 适用场景 | -|:-----|:-----|:-----|:-----|:---------| -| **递归字符** | 按分隔符递归切分 | 速度快,实现简单 | 可能截断语义 | 简单文档 | -| **语义切分** | 基于句子相似度阈值 | 语义连贯性好 | 需要 Embedding 模型 | 专业文档 | -| **父子块** | 大块存储+小块检索 | 检索精准+上下文完整 | 存储复杂度高 | 生产环境 | - ---- - -## 🧩 子图系统架构 - -### 设计理念 - -子图系统采用模块化设计,每个子图是一个独立的 LangGraph 工作流,共享公共工具库。 - -``` -┌───────────────────────────────────────────────────────────────────┐ -│ 子图系统架构 │ -├───────────────────────────────────────────────────────────────────┤ -│ │ -│ ┌───────────────────────────────────────────────────────────┐ │ -│ │ 公共工具库 (common/) │ │ -│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌──────┐ │ │ -│ │ │ state_base │ │ intent │ │ formatter │ │human │ │ │ -│ │ │ (状态基类) │ │ (React模式) │ │ (格式化) │ │review│ │ │ -│ │ └─────────────┘ └─────────────┘ └─────────────┘ └──────┘ │ │ -│ └───────────────────────────────────────────────────────────┘ │ -│ │ │ -│ ┌───────────────┼───────────────┐ │ -│ ↓ ↓ ↓ │ -│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ -│ │ 通讯录子图 │ │ 词典子图 │ │ 资讯分析子图 │ │ -│ │ (contact/) │ │ (dictionary/) │ │ (news_analysis) │ │ -│ ├─────────────────┤ ├─────────────────┤ ├─────────────────┤ │ -│ │ state.py │ │ state.py │ │ state.py │ │ -│ │ nodes.py │ │ nodes.py │ │ nodes.py │ │ -│ │ graph.py │ │ graph.py │ │ graph.py │ │ -│ │ api_client.py │ │ api_client.py │ │ api_client.py │ │ -│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │ -│ │ │ -│ ▼ │ -│ FastAPI 子图端点 │ -│ (backend/app/backend.py) │ -└───────────────────────────────────────────────────────────────────┘ -``` - ---- - -### 子图架构总览 - -```mermaid -graph TB - subgraph "主图 MainGraph" - MainState[主图 MainState
用户输入 + 上下文] - IntentRouter[意图分类器
intent.py] - MainLLM[主 LLM 节点
普通对话] - end - - subgraph "通讯录子图 ContactSubgraph" - ContactState[ContactState
联系人状态] - ContactNodes[内部节点
parse_intent
add_contact
list_contacts
generate_draft
human_review
should_continue] - ContactDB[(PostgreSQL 联系人)] - end - - subgraph "词典子图 DictionarySubgraph" - DictState[DictionaryState
词典状态] - DictNodes[内部节点
translate
lookup_word
extract_terms
daily_word
lookup_word_book
add_to_word_book] - DictDB[(PostgreSQL 生词本)] - end - - subgraph "资讯分析子图 NewsSubgraph" - NewsState[NewsAnalysisState
资讯状态] - NewsNodes[内部节点
query_news
analyze_url
extract_keywords
generate_report] - NewsQdrant[(Qdrant 向量检索)] - end - - MainState -->|用户查询| IntentRouter - IntentRouter -->|contact| ContactState - IntentRouter -->|dictionary| DictState - IntentRouter -->|news| NewsState - IntentRouter -->|chat| MainLLM - - ContactState -->|调用| ContactNodes - ContactNodes -->|读写| ContactDB - ContactNodes -->|返回结果| MainState - - DictState -->|调用| DictNodes - DictNodes -->|读写| DictDB - DictNodes -->|返回结果| MainState - - NewsState -->|调用| NewsNodes - NewsNodes -->|检索| NewsQdrant - NewsNodes -->|返回结果| MainState - - style MainState fill:#e3f2fd - style IntentRouter fill:#ffe0b2 - style ContactState fill:#c8e6c9 - style DictState fill:#e1bee7 - style NewsState fill:#ffcdd2 -``` - ---- - -### 状态传送机制 - -```mermaid -sequenceDiagram - participant User as 用户 - participant Frontend as 前端 - participant Backend as 后端 API - participant Main as 主图 - participant Intent as 意图分类器 - participant Subgraph as 子图 - participant DB as 数据库 - - User->>Frontend: 输入查询 - Frontend->>Backend: POST /subgraph/{type}/{action} - Backend->>Main: 初始化 MainState - Main->>Intent: 调用意图分类 - Intent-->>Main: 返回子图类型 (contact/dictionary/news/chat) - - alt 是子图请求 - Main->>Subgraph: 传递状态 - activate Subgraph - Subgraph->>Subgraph: 解析意图 (parse_intent) - Subgraph->>DB: 读取/写入数据 - DB-->>Subgraph: 返回数据 - Subgraph->>Subgraph: 执行业务逻辑 - Subgraph->>Subgraph: 格式化结果 (format_result) - Subgraph-->>Main: 返回结果状态 - deactivate Subgraph - else 是普通对话 - Main->>Main: 调用主 LLM - end - - Main->>Main: 合并状态 - Main-->>Backend: 返回最终结果 - Backend-->>Frontend: JSON 响应 - Frontend-->>User: 显示结果 -``` - ---- - -### 核心工具说明 - -#### 1. state_base.py - 状态基类 -提供类型安全的状态基类,所有子图状态继承此类。 - -#### 2. intent.py - 意图理解(React 模式) -智能决策节点,判断是否需要调用 RAG、是否需要重新检索、是否需要调用工具。 - -```python -# 核心功能 -- 意图分类:闲聊 / 查询 / 任务 -- RAG 决策:是否需要检索知识库 -- 检索策略:是否需要多路检索 / 是否需要重新检索 -``` - -#### 3. formatter.py - 格式化输出工具 -使用 Jinja2 模板提供美观的 Markdown 格式化输出。 - -#### 4. human_review.py - 人工审核节点 -使用 LangGraph 的 interrupt 机制实现人工审核流程,支持: -- 确定 / 取消 / 继续 三种操作 -- 状态持久化 -- 异步等待 - ---- - -### 子图开发指南 - -#### 创建新子图的步骤 - -1. **创建子图目录** -```bash -mkdir backend/app/subgraphs/my_subgraph -``` - -2. **创建状态定义 (state.py)** -```python -from typing_extensions import TypedDict -from backend.app.core.state_base import BaseSubgraphState - -class MySubgraphState(BaseSubgraphState): - \"\"\" - 我的子图状态 - \"\"\" - my_field: str - my_list: list[str] -``` - -3. **实现节点 (nodes.py)** -```python -from langgraph.graph import StateGraph -from .state import MySubgraphState - -def my_node(state: MySubgraphState) -> MySubgraphState: - \"\"\" - 我的节点 - \"\"\" - # 实现逻辑 - return state -``` - -4. **构建图 (graph.py)** -```python -from langgraph.graph import StateGraph, END -from .state import MySubgraphState -from .nodes import my_node - -def build_my_subgraph() -> StateGraph: - \"\"\" - 构建我的子图 - \"\"\" - graph = StateGraph(MySubgraphState) - - graph.add_node("my_node", my_node) - graph.set_entry_point("my_node") - graph.add_edge("my_node", END) - - return graph.compile() -``` - -5. **添加 API 客户端 (api_client.py)**(可选) -用于与外部 API 交互。 - ---- - -### 已实现的子图 - -#### 1. 通讯录子图 (contact/) - -**详细流程图:** -```mermaid -stateDiagram-v2 - [*] --> parse_intent: 开始 - - parse_intent --> list_contacts: action=list - parse_intent --> add_contact: action=add - parse_intent --> list_emails: action=list_emails - parse_intent --> generate_email_draft: action=generate_email - parse_intent --> sniff_contacts: action=sniff - - list_contacts --> format_result: 返回联系人列表 - add_contact --> format_result: 添加成功 - list_emails --> format_result: 返回邮件列表 - sniff_contacts --> format_result: 嗅探完成 - - generate_email_draft --> human_review: 生成草稿 - human_review --> send_email: action=approve - human_review --> format_result: action=reject - - send_email --> format_result: 邮件已发送 - - format_result --> [*]: 格式化输出 - - note right of parse_intent - 解析用户意图 - 支持的操作: - - list: 列出联系人 - - add: 添加联系人 - - list_emails: 列出邮件 - - generate_email: 生成邮件草稿 - - sniff: 智能嗅探 - end note - - note right of human_review - 人工审核节点 - 使用 LangGraph interrupt - 支持三种操作: - - approve: 审核通过,发送邮件 - - reject: 审核拒绝,返回结果 - - modify: 审核修改,编辑内容 - end note -``` - -**功能列表:** -- 联系人 CRUD(增删改查) -- 邮件读取与审核 -- 外发邮件 -- 智能嗅探 -- 精美格式化展示 - -**API 端点:** -``` -GET /subgraph/contact/{action} -参数: -- action: list | add | list_emails | generate_email | sniff -- query: 查询内容 -- user_id: 用户 ID -``` - ---- - -#### 2. 词典子图 (dictionary/) - -**详细流程图:** -```mermaid -stateDiagram-v2 - [*] --> parse_intent: 开始 - - parse_intent --> query_word: action=query - parse_intent --> translate_text: action=translate - parse_intent --> extract_terms: action=extract - parse_intent --> get_daily_word: action=daily - parse_intent --> lookup_word_book: action=lookup - parse_intent --> add_to_word_book: action=add - - query_word --> format_result: 查询单词 - translate_text --> format_result: 翻译文本 - extract_terms --> format_result: 提取专业术语 - get_daily_word --> format_result: 每日一词 - lookup_word_book --> format_result: 查询生词本 - add_to_word_book --> format_result: 添加到生词本 - - format_result --> [*]: 格式化输出 - - note right of parse_intent - 解析用户意图 - 支持的操作: - - query: 查询单词 - - translate: 翻译文本 - - extract: 提取专业术语 - - daily: 每日一词 - - lookup: 查询生词本 - - add: 添加到生词本 - end note -``` - -**功能列表:** -- 翻译、查词 -- 每日一词 -- 专业名词提炼 -- 生词本管理 -- 精美格式化展示 - -**API 端点:** -``` -GET /subgraph/dictionary/{action} -参数: -- action: query | translate | extract | daily | lookup | add -- query: 查询内容 -- user_id: 用户 ID -``` - ---- - -#### 3. 资讯分析子图 (news_analysis/) - -**详细流程图:** -```mermaid -stateDiagram-v2 - [*] --> parse_intent: 开始 - - parse_intent --> query_news: action=query - parse_intent --> analyze_url: action=analyze - parse_intent --> extract_keywords: action=keywords - parse_intent --> generate_report: action=report - - query_news --> format_result: 查询资讯 - analyze_url --> format_result: 分析链接 - extract_keywords --> format_result: 提取关键词 - generate_report --> format_result: 生成报告 - - format_result --> [*]: 格式化输出 - - note right of parse_intent - 解析用户意图 - 支持的操作: - - query: 查询资讯 - - analyze: 分析链接 - - keywords: 提取关键词 - - report: 生成报告 - end note -``` - -**功能列表:** -- 资讯查询 -- 链接分析 -- 关键词提取 -- 报告生成 -- 精美格式化展示 - -**API 端点:** -``` -GET /subgraph/news/{action} -参数: -- action: query | analyze | keywords | report -- query: 查询内容 -- user_id: 用户 ID -``` - ---- - -#### 4. 研究分析子图 (research/) - 未实现 -以下功能尚未实现: -- 联网搜索 -- 报告生成(资讯分析子图已提供基础报告生成) -- 引用溯源 -- 可视化图表 - ---- - -## 🚀 快速开始 - -详细启动指南请查看 [QUICKSTART.md](QUICKSTART.md) - -### 方式一:Docker Compose(推荐) +### 安装依赖 ```bash -# 1. 配置环境变量 -cp .env.docker .env -# 编辑 .env 文件,填入真实的 API Key - -# 2. 启动所有服务 -docker compose -f docker/docker-compose.yml up -d --build - -# 3. 访问应用 -# 前端: http://127.0.0.1:8501 -# 后端 API: http://127.0.0.1:8083 +cd backend +pip install -r requirements.txt ``` -### 方式二:本地开发模式 +### 配置环境变量 ```bash -# 1. 安装依赖 -pip install -r requirement.txt +cp .env.example .env +# 编辑 .env 填写必要的配置 +``` -# 2. 配置环境变量 -cp .env.docker .env -# 编辑 .env,根据本地/远程环境调整配置 +### 启动服务 -# 3. 启动后端 -python backend/app/backend.py - -# 4. 启动前端(新终端) -streamlit run frontend/src/frontend_main.py - -# 或者使用启动脚本(推荐) -./scripts/start.sh both +```bash +cd backend +python -m app.backend +# 服务运行在 http://localhost:8079 ``` --- -## 🔧 开发指南 +## 项目结构 -### 添加新工具 - -在 [backend/app/main_graph/tools/common_tools.py](file:///home/huang/Study/AIProject/Agent1/backend/app/main_graph/tools/common_tools.py) 中添加新的 `@tool` 装饰函数: - -```python -from langchain_core.tools import tool -from typing import Optional - -@tool -def my_new_tool(param: str) -> str: - """ - 工具描述(会显示给 LLM) - - Args: - param: 参数说明 - - Returns: - 返回值说明 - """ - # 实现逻辑 - return result +``` +backend/app/ +├── agent/ # Agent 服务层 +│ ├── agent_service.py # 主服务 +│ ├── service_config.py # 配置构建 +│ ├── stream_handler.py # 流式处理 +│ ├── stream_context.py # 流式上下文 +│ ├── history.py # 历史查询 +│ └── prompts.py # 提示词 +│ +├── main_graph/ # 主图(LangGraph 工作流) +│ ├── main_graph_builder.py # 图构建器 +│ ├── state.py # 状态定义 +│ └── nodes/ # 节点实现 +│ ├── agent.py # 推理节点 +│ ├── tools.py # 工具执行 +│ ├── finalize.py # 后处理 +│ ├── memory_trigger.py # 记忆触发 +│ ├── retrieve_memory.py # 记忆检索 +│ └── summarize.py # 记忆摘要 +│ +├── tools/ # 工具定义 +│ ├── rag.py # RAG 检索工具 +│ ├── web_search.py # 联网搜索工具 +│ └── subgraph.py # 子图调用工具 +│ +├── subgraphs/ # 子图模块 +│ ├── contact/ # 通讯录子图 +│ ├── dictionary/ # 词典子图 +│ └── news_analysis/ # 资讯分析子图 +│ +├── core/ # 核心工具 +│ ├── formatter.py # 格式化工具 +│ ├── stream_finalizer.py # 流式格式化 +│ ├── web_search.py # 搜索工具类 +│ ├── visualization.py # 可视化图表 +│ └── human_review.py # 人工审核 +│ +├── rag/ # RAG 检索系统 +│ ├── pipeline.py # 检索流水线 +│ ├── retriever.py # 检索器 +│ ├── rerank.py # 重排 +│ ├── fusion.py # 结果融合 +│ └── query_transform.py # 查询改写 +│ +├── model_services/ # 模型服务层 +│ ├── chat_services.py # 对话服务 +│ ├── embedding_services.py # 嵌入服务 +│ └── rerank_services.py # 重排服务 +│ +├── memory/ # 记忆系统 +│ └── mem0_client.py # Mem0 客户端 +│ +├── templates/ # 输出模板 +│ ├── error_notification.md # 错误提示 +│ ├── web_search_result.md # 搜索结果 +│ ├── knowledge_summary.md # 知识总结 +│ └── conversation_summary.md # 对话摘要 +│ +├── middleware/ # 中间件 +│ └── response_formatter.py # 响应格式化 +│ +├── db/ # 数据库 +│ ├── models.py # 数据模型 +│ └── init_db.py # 初始化 +│ +├── mcp/ # MCP 协议 +│ ├── mcp_client.py # MCP 客户端 +│ ├── mcp_manager.py # MCP 管理器 +│ └── adapters/ # 适配器 +│ +├── config.py # 配置管理 +├── logger.py # 日志工具 +└── backend.py # FastAPI 应用 ``` -然后在 [backend/app/main_graph/tools/graph_tools.py](file:///home/huang/Study/AIProject/Agent1/backend/app/main_graph/tools/graph_tools.py) 的 `AVAILABLE_TOOLS` 列表中注册。 +--- -### 添加新模型 +## API 接口 -在 [backend/app/model_services/chat_services.py](file:///home/huang/Study/AIProject/Agent1/backend/app/model_services/chat_services.py) 中添加新的服务提供者: +### 对话接口 -```python -class NewModelChatProvider(BaseServiceProvider[BaseChatModel]): - """ - 新模型服务提供者 - """ +#### 同步对话 - def __init__(self, model: str = "new-model-name"): - super().__init__("new_model_chat") - self._model = model +```http +POST /chat +Content-Type: application/json - def is_available(self) -> bool: - """ - 检查新模型服务是否可用 - """ - if not os.getenv("NEW_MODEL_API_KEY"): - logger.warning("NEW_MODEL_API_KEY 未配置") - return False - logger.info(f"新模型服务配置正确,准备使用: {self._model}") - return True - - def get_service(self) -> BaseChatModel: - """ - 获取新模型服务 - """ - if self._service_instance is None: - from langchain_openai import ChatOpenAI - from pydantic import SecretStr - - self._service_instance = ChatOpenAI( - base_url="https://api.new-model.com/v1", - api_key=SecretStr(os.getenv("NEW_MODEL_API_KEY")), - model=self._model, - temperature=0.1, - max_tokens=4096, - timeout=60.0, - max_retries=2, - streaming=True, - ) - return self._service_instance -``` - -然后在 `CHAT_PROVIDERS` 字典中注册: - -```python -CHAT_PROVIDERS: Dict[str, Callable[[], BaseServiceProvider[BaseChatModel]]] = { - "local": lambda: LocalChatProvider(), - "zhipu": lambda: ZhipuChatProvider(), - "deepseek": lambda: DeepSeekChatProvider(), - "openai": lambda: OpenAIChatProvider(), - "new_model": lambda: NewModelChatProvider(), # 新增 +{ + "message": "你好", + "thread_id": "可选的会话ID", + "model": "local", + "user_id": "default_user" } ``` -### 添加新的子图 - -#### 1. 创建子图目录结构 - -在 `backend/app/subgraphs/` 下创建新的子图目录: - -``` -backend/app/subgraphs/ -└── my_subgraph/ - ├── __init__.py - ├── state.py # 子图状态定义 - ├── nodes.py # 子图节点实现 - ├── graph.py # 子图构建 - └── api_client.py # 外部 API 客户端(可选) +响应: +```json +{ + "reply": "你好!有什么可以帮助你的?", + "thread_id": "生成的会话ID", + "model_used": "local", + "input_tokens": 100, + "output_tokens": 50, + "total_tokens": 150, + "elapsed_time": 1.23 +} ``` -#### 2. 定义子图状态 +#### 流式对话 -在 `state.py` 中定义: +```http +POST /chat/stream +Content-Type: application/json + +{ + "message": "你好", + "model": "local" +} +``` + +响应为 SSE 流式数据: +``` +data: {"type": "llm_token", "node": "agent", "token": "你"} + +data: {"type": "llm_token", "node": "agent", "token": "好"} + +data: {"type": "done", "model_used": "local"} + +data: [DONE] +``` + +### 历史接口 + +```http +GET /threads?user_id=xxx&limit=10 +GET /thread/{thread_id}/messages +GET /thread/{thread_id}/summary +``` + +### 审核接口 + +```http +GET /reviews/pending +POST /reviews/{review_id}/approve +POST /reviews/{review_id}/reject +``` + +--- + +## 配置说明 + +### 环境变量 + +| 变量 | 说明 | 示例 | +|------|------|------| +| `DB_URI` | PostgreSQL 连接串 | `postgresql://user:pass@host:5432/db` | +| `QDRANT_URL` | Qdrant 地址 | `http://localhost:6333` | +| `VLLM_BASE_URL` | 本地 LLM 地址 | `http://localhost:8000/v1` | +| `DEEPSEEK_API_KEY` | DeepSeek API Key | `sk-xxx` | +| `ZHIPUAI_API_KEY` | 智谱 API Key | `xxx` | + +### 模板定制 + +输出模板位于 `app/templates/`,可直接编辑 `.md` 文件调整格式: + +```markdown +## ⚠️ 操作失败 + +**错误类型**: {{ error_type }} +**错误详情**: {{ error_message }} + +### 💡 建议操作 + +{{ suggestions }} +``` + +--- + +## 格式化输出 + +项目提供统一的格式化工具: ```python -from typing import TypedDict, Annotated, Literal -from langgraph.graph.message import add_messages +from backend.app.core import get_formatter -class MySubgraphState(TypedDict): - """子图状态""" - messages: Annotated[list, add_messages] - user_id: str - query: str - result: str - step: Literal["init", "process", "format", "end"] +fmt = get_formatter() + +# 渲染错误模板 +error_out = fmt.render_error( + error_type="超时", + error_message="连接超时", + suggestions=["重试", "切换模型"] +) + +# 渲染自定义模板 +output = fmt.render("my_template", key="value") + +# 流式结束后追加格式化内容 +from backend.app.core import create_finalizer +finalizer = create_finalizer() +finalizer.add_table([{"姓名": "张三"}]) +append_content = finalizer.build_append() ``` - -#### 3. 实现子图节点 - -在 `nodes.py` 中实现节点函数: - -```python -from .state import MySubgraphState - -def process_query(state: MySubgraphState) -> MySubgraphState: - """处理查询""" - query = state["query"] - # 处理逻辑 - return { - "step": "format", - "result": "处理结果" - } - -def format_output(state: MySubgraphState) -> MySubgraphState: - """格式化输出""" - result = state["result"] - return { - "step": "end", - "result": f"格式化后的结果: {result}" - } -``` - -#### 4. 构建子图 - -在 `graph.py` 中构建: - -```python -from langgraph.graph import StateGraph, END -from .state import MySubgraphState -from .nodes import process_query, format_output - -def create_my_subgraph() -> StateGraph: - """创建子图""" - graph = StateGraph(MySubgraphState) - - graph.add_node("process_query", process_query) - graph.add_node("format_output", format_output) - - graph.set_entry_point("process_query") - - graph.add_edge("process_query", "format_output") - graph.add_edge("format_output", END) - - return graph -``` - -#### 5. 在主图中注册子图工具 - -在 [backend/app/main_graph/tools/subgraph_tools.py](file:///home/huang/Study/AIProject/Agent1/backend/app/main_graph/tools/subgraph_tools.py) 中添加子图调用工具: - -```python -@tool -async def my_subgraph_tool(query: str) -> str: - """ - 我的子图工具描述 - - Args: - query: 用户查询 - - Returns: - 子图执行结果 - """ - # 调用子图逻辑 - return result -``` - -#### 6. 在 React Reason 中添加路由 - -在 [backend/app/core/intent.py](file:///home/huang/Study/AIProject/Agent1/backend/app/core/intent.py) 的 `react_reason_async` 函数中添加对子图工具的支持。 - -### Docker 部署 - -项目包含完整的 Docker 配置: - -- **docker-compose.yml**:服务编排(Backend + Frontend,连接远程数据库) -- **docker/Dockerfile.backend**:后端镜像构建 -- **docker/Dockerfile.frontend**:前端镜像构建 -- **.gitea/workflows/deploy.yml**:CI/CD 自动化部署 - -详见 [QUICKSTART.md](QUICKSTART.md) 的 Docker 部署章节。 - ---- - -## ⚙️ 环境配置 - -### 配置文件说明 - -项目采用两层环境配置文件体系: - -| 文件 | 用途 | 是否提交 Git | -|------|------|------------| -| `.env.docker` | Docker 部署模板 | ✅ 是 | -| `.env` | 实际使用的配置 | ❌ 否(已忽略) | - -**使用方法:** - -- **本地开发**:`cp .env.docker .env`,修改为本地服务地址 -- **Docker 部署**:`cp .env.docker .env`,使用远程服务器地址 - -### 重要配置(必需) - -| 变量名 | 说明 | 示例值 | -|--------|------|-------| -| `ZHIPUAI_API_KEY` | 智谱AI API密钥 | `your-api-key` | -| `DEEPSEEK_API_KEY` | DeepSeek API密钥 | `your-api-key` | -| `LLAMACPP_API_KEY` | llama.cpp API密钥 | `your-api-key` | -| `VLLM_BASE_URL` | 主 LLM 服务地址 | `http://127.0.0.1:18000/v1` | -| `LLAMACPP_EMBEDDING_URL` | Embedding 服务地址 | `http://127.0.0.1:18001/v1` | -| `LLAMACPP_RERANKER_URL` | Rerank 服务地址 | `http://127.0.0.1:18002/v1` | -| `DB_HOST` | PostgreSQL 主机 | `115.190.121.151` | -| `DB_PORT` | PostgreSQL 端口 | `5432` | -| `DB_USER` | PostgreSQL 用户名 | `postgres` | -| `DB_PASSWORD` | PostgreSQL 密码 | `your-password` | -| `DB_NAME` | PostgreSQL 数据库名 | `langgraph_db` | -| `QDRANT_URL` | Qdrant 向量数据库地址 | `http://115.190.121.151:6333` | -| `QDRANT_API_KEY` | Qdrant API 密钥 | `your-api-key` | - -### 其他配置(有默认值) - -| 变量名 | 说明 | 默认值 | -|--------|------|-------| -| `BACKEND_PORT` | 后端服务端口 | `8079` | -| `API_URL` | 前端调用后端地址 | `http://backend:8079/chat` | -| `MEMORY_SUMMARIZE_INTERVAL` | 对话摘要生成间隔 | `10` | -| `ENABLE_GRAPH_TRACE` | 是否启用图追踪 | `true` | -| `FASTEMBED_CACHE_PATH` | FastEmbed 缓存路径 | `/app/fastembed_cache` | -| `RAG_COLLECTION_NAME` | RAG 集合名称 | `rag_documents` | -| `RAG_STRATEGY` | RAG 切分策略 | `parent-child` | -| `RAG_STORAGE_TYPE` | RAG 存储类型 | `postgres` | -| `LOG_LEVEL` | 日志级别 | `DEBUG` | -| `DEBUG` | 调试模式 | `true` | - -### 注意事项 - -- ⚠️ **不要硬编码敏感信息**:所有 API Key 必须通过环境变量配置 -- ⚠️ **远程服务依赖**:确保可以访问远程 PostgreSQL (115.190.121.151:5432) 和 Qdrant (115.190.121.151:6333) -- ⚠️ **修改后重启**:修改 `.env` 后,Docker 部署需要执行 `docker compose down && docker compose up -d --build` - ---- - -## 🔍 故障排查 - -### 常见问题 - -**Q: 无法连接远程数据库?** -```bash -# 测试 PostgreSQL -psql -h 115.190.121.151 -U postgres -d langgraph_db -c "SELECT version();" - -# 测试 Qdrant -curl http://115.190.121.151:6333/collections -``` - -**Q: 后端启动失败?** -- 确认端口 8079 未被占用 -- 检查 `.env` 中的 API Key 是否正确 -- 查看启动日志确认模型初始化成功 - -**Q: 模型切换后无响应?** -- 检查所选模型的配置是否正确 -- 确认 llama.cpp 服务是否运行(如使用本地模型) -- 尝试切换到另一个模型 - -**Q: 混合路由异常?** -- 检查 `ENABLE_GRAPH_TRACE=true` 查看详细执行流程 -- 确认快速路径工具是否正确注册 -- 查看 React Reason 节点的输出 - -更多问题排查请查看 [QUICKSTART.md](QUICKSTART.md) - ---- - -## 📝 许可证 - -本项目采用 MIT 许可证。详见 [LICENSE](LICENSE) 文件。 - -## 🤝 贡献 - -欢迎提交 Issue 和 Pull Request! - ---- -### TODO - -## 1.llm的BUG修复 -看了你的代码,目前的核心问题在于**推理节点和回答节点的职责虽然分开了,但实现上仍有混淆**,导致流程混乱。下面帮你梳理清楚。 - ---- - -## 当前架构分析 - -### ✅ 正确的部分 -- **`llm_call` 节点**已经使用了**无工具模型**(`models[model_name]`,没有 `bind_tools`),只负责根据上下文生成最终回答。这是对的。 -- **`AIAgentService`** 中,`process_message_stream` 处理了 `tool_calls` 事件,说明你预料到推理 LLM 可能产生 tool call。 - -### ❌ 混乱的根源 -- **推理 LLM 很可能仍然绑定了工具**。 - 在 `build_react_main_graph(chat_services=self.chat_services, tools=self.tools, ...)` 中,你很可能在构建推理节点时,将 `tools` 传给了推理 LLM(`llm.bind_tools(tools)`)。这会导致: - 1. 推理 LLM 直接输出 OpenAI 格式的 function call,而不是你期望的 `{"action": "RETRIEVE_RAG"}` 文本。 - 2. LangGraph 会自动解析这些 tool call 并执行(如果工具节点被正确连接),这就绕过了你自己的路由逻辑(条件边)。 - 3. 你的流式处理代码(`_handle_message_chunk`)收到了 `tool_calls`,但只是把它们当作事件转发给前端,并没有真正由你的代码去执行工具,导致工具调用可能由 LangGraph 自动完成,或者完全丢失。 -- **流式处理中的 `_handle_message_chunk`** 处理了 `tool_calls`,但实际上这些 tool call 如果被 LangGraph 自动执行了,前端会看到工具调用,但之后突然就跳到了回答,中间缺乏透明控制。如果推理 LLM 输出的是文本动作(比如 JSON),则 `tool_calls` 永远不会出现,这段处理逻辑就成了死代码。 - ---- - -## 应该实现的目标架构(也是你一贯讨论的) -``` -推理节点 (react_reason) - ├─ 使用不带工具绑定的 LLM - ├─ Prompt 中列出可用动作:RETRIEVE_RAG, WEB_SEARCH, DIRECT_RESPONSE - ├─ 输出结构化 JSON:{"action": "...", "query": "...", "reasoning": "..."} - └─ 你的代码解析 JSON → 修改 state.next_action - -条件边 (route_by_reasoning) - ├─ 读取 state.next_action - ├─ 映射到工具节点:rag_retrieve, web_search_node - └─ 或直接进入 llm_call 节点 - -工具节点 (rag_retrieve / web_search_node) - ├─ 执行实际检索/搜索 - └─ 将结果写回 state.rag_context, state.retrieved_docs - -回答节点 (llm_call) - ├─ 使用无工具 LLM - ├─ 基于 state.rag_context 生成最终回答 - └─ 绝不调用任何工具 -``` - ---- - -## 需要现在修改的地方(按优先级) - -### 1. 推理节点:移除工具绑定,改为文本决策 -在 `build_react_main_graph` 中(或在推理节点的构造代码里),确保推理 LLM 是通过 `prompt | llm` 而不是 `prompt | llm.bind_tools(tools)` 调用的。 -**做法**: -- 推理节点的 system prompt 里列出可用动作及格式要求。 -- 推理 LLM 只输出 JSON,例如: - ```json - {"action": "RETRIEVE_RAG", "search_query": "吕布 事迹 三国演义"} - ``` -- 在推理节点后增加一个解析函数,将 JSON 解析为具体的 `action` 和参数,更新到 state 中。 - -### 2. 条件边:根据解析的 `action` 干净路由 -确保 `route_by_reasoning` 使用解析后的 `state.next_action`(字符串)进行路由,而不是再去检查 `history` 或 `retrieved_docs`。同时将**置信度阈值判断**放在路由之前或放在推理节点的 prompt 中(让 LLM 决策时就遵守规则),避免在条件边里重复判断。 - -### 3. llm_call 节点:保持当前的无工具状态 ✅ -当前已正确,无需改动。唯一建议:确保 `models[model_name]` 确实没有绑定工具。 - -### 4. 流式处理:简化事件处理 -如果推理节点不再输出 tool call,可以将 `_handle_message_chunk` 中的 tool_call 处理分支删除或注释,避免混淆。 -未来如果你想展示推理过程,可以发送 custom event(如你现在做的 react_reasoning 事件)。 - ---- - -## 你现在应该做的具体步骤 -1. **检查 `build_react_main_graph` 函数**,找到推理节点的创建代码,确认是否调用了 `llm.bind_tools()`。如果调用了,改为 `llm`,并更新 prompt 为 JSON 输出。 -2. **确保推理节点的 prompt 包含以下内容**: - - 当前状态(RAG 置信度、尝试次数、已有的检索结果摘要) - - 决策规则(置信度阈值、最大重试次数等) - - 要求输出纯 JSON(不要代码块标记),格式为 `{"action": "...", "args": {...}}` -3. **在推理节点返回后,添加一个解析函数**,提取 `action` 和参数,设置 `state.next_action`、`state.rag_query` 等字段。 -4. **修改条件边**,直接根据 `state.next_action` 跳转。 -5. **测试**:运行“吕布的事迹?”查询,应该看到推理节点输出 `RETRIEVE_RAG`,然后 `rag_retrieve` 执行,再次推理(如果置信度低),或者直接 `DIRECT_RESPONSE` → `llm_call` 生成回答。最终回答应基于检索到的吕布相关文本,而不是无关片段。 - ---- - -## 总结 -**当前逻辑有问题**,主要是因为推理节点可能仍绑定了工具,导致 tool call 自动执行,打乱了你的路由控制。按上述方案调整后,Agent 的决策和执行会变得透明、可控,职责分明。如果你需要,我可以帮你重写推理节点的核心逻辑。 - -## 2.优化:实现推理验证 -1. 在 React 循环中增加“验证”步骤 -在推理 LLM 输出 DIRECT_RESPONSE 后,不直接返回给用户,而是先进入一个 validate_answer 节点: - -text -推理节点 → DIRECT_RESPONSE → validate_answer → 合格?→ 返回用户 - ↓ 不合格 - 重新规划动作(如重新检索) -验证内容:检查回答是否自洽、引用依据是否充分、是否回答了用户问题等。 - -2. 使用 LLM 自省(Self-Reflection) -在 validate_answer 节点里调用一个专门的校验 LLM(可以是轻量模型),给它这样的 prompt: - -text -你是一个严格的校验员。请检查以下回答是否满足要求: - -【用户问题】 -{user_query} - -【检索到的资料】 -{rag_context} - -【生成的回答】 -{llm_response} - -请判断: -1. 回答是否基于给定的资料? -2. 回答是否直接回应了用户问题? -3. 回答是否存在事实错误或逻辑漏洞? - -输出 JSON:{"pass": true/false, "reason": "..."} -如果 pass = false,则退回推理节点重新规划(如重新检索或联网搜索)。 - -3. 在 System Prompt 里要求推理节点评估回答质量 -你可以在推理节点的 prompt 里增加一条规则: - -text -当你决定 DIRECT_RESPONSE 并收到 llm_call 的回答后,必须自我检查: -- 回答是否与检索到的资料一致? -- 是否回答了用户核心问题? -如果发现不一致或遗漏,必须重新规划。 -这相当于把反思逻辑融入了推理循环。 \ No newline at end of file diff --git a/backend/app/backend.py b/backend/app/backend.py index ae63204..c6611d8 100644 --- a/backend/app/backend.py +++ b/backend/app/backend.py @@ -3,6 +3,7 @@ FastAPI 后端 - 支持动态模型切换,使用 PostgreSQL 持久化记忆 采用依赖注入模式,优雅管理资源生命周期 """ +import asyncio import warnings # 抑制 WebSocket 弃用警告(websockets 库升级导致,uvicorn 尚未跟进) warnings.filterwarnings("ignore", category=DeprecationWarning, module="websockets") @@ -42,6 +43,7 @@ from backend.app.subgraphs.news_analysis.api_client import NewsAPIClient from .db.init_db import init_subgraph_tables from .db.models import ContactRepository, DictionaryRepository, NewsRepository from backend.app.logger import info, error +from backend.app.core import get_formatter @asynccontextmanager async def lifespan(app: FastAPI): @@ -189,30 +191,35 @@ async def chat_endpoint( ) except Exception as e: error(f"同步响应异常: {e}") - - # === 兜底输出机制 === + + # === 统一错误格式化 === error_message = str(e) - is_timeout_error = any(keyword in error_message.lower() for keyword in + is_timeout_error = any(keyword in error_message.lower() for keyword in ["timeout", "timed out", "超时", "connection", "unavailable", "不可用"]) - - # 1. 自我介绍 - intro_text = "你好!我是 AI 智能助手,我可以帮你处理各种问题,包括查询通讯录、词典翻译、新闻分析、知识库检索、联网搜索等。\n\n" - - # 2. 错误信息(红色突出) - error_display = f"**⚠️ 当前遇到问题**\n\n```diff\n- {error_message}\n```\n\n" - - # 3. 模型切换提示(如果是超时/不可用错误) - switch_hint = "" + + formatter = get_formatter() + if is_timeout_error: - switch_hint = "💡 **提示**:当前模型可能响应超时或不可用,请尝试手动切换到其他模型(如 DeepSeek、智谱AI等)。\n\n" - - # 4. 组合完整兜底回复 - fallback_text = intro_text + error_display + switch_hint - + error_reply = formatter.render_error( + error_type="模型响应超时", + error_message=error_message, + suggestions=[ + "当前模型可能响应超时或不可用", + "请尝试手动切换到其他模型(如 DeepSeek、智谱AI等)", + "或稍后重试" + ], + ) + else: + error_reply = formatter.render_error( + error_type="处理异常", + error_message=error_message, + suggestions=["请稍后重试", "如果问题持续存在,请联系管理员"], + ) + actual_model = request.model if request.model in agent_service.graphs else next(iter(agent_service.graphs.keys())) - + return ChatResponse( - reply=fallback_text, + reply=error_reply, thread_id=thread_id, model_used=actual_model, input_tokens=0, @@ -273,33 +280,36 @@ async def chat_stream_endpoint( yield "data: [DONE]\n\n" except Exception as e: error(f"流式响应异常: {e}") - - # === 兜底输出机制 === + + # === 统一错误格式化 === error_message = str(e) - is_timeout_error = any(keyword in error_message.lower() for keyword in + is_timeout_error = any(keyword in error_message.lower() for keyword in ["timeout", "timed out", "超时", "connection", "unavailable", "不可用"]) - - # 1. 自我介绍 - intro_text = "你好!我是 AI 智能助手,我可以帮你处理各种问题,包括查询通讯录、词典翻译、新闻分析、知识库检索、联网搜索等。\n\n" - - # 2. 错误信息(红色突出) - error_display = f"**⚠️ 当前遇到问题**\n\n```diff\n- {error_message}\n```\n\n" - - # 3. 模型切换提示(如果是超时/不可用错误) - switch_hint = "" + + formatter = get_formatter() + if is_timeout_error: - switch_hint = "💡 **提示**:当前模型可能响应超时或不可用,请尝试手动切换到其他模型(如 DeepSeek、智谱AI等)。\n\n" - - # 4. 组合完整兜底回复 - fallback_text = intro_text + error_display + switch_hint - - # 5. 以 llm_token 方式发送兜底回复,模拟打字机效果 - for char in fallback_text: - yield f"data: {json.dumps({'type': 'llm_token', 'node': 'fallback', 'token': char}, ensure_ascii=False)}\n\n" + error_reply = formatter.render_error( + error_type="模型响应超时", + error_message=error_message, + suggestions=[ + "当前模型可能响应超时或不可用", + "请尝试手动切换到其他模型(如 DeepSeek、智谱AI等)", + ], + ) + else: + error_reply = formatter.render_error( + error_type="处理异常", + error_message=error_message, + suggestions=["请稍后重试"], + ) + + # 以 llm_token 方式发送错误回复,模拟打字机效果 + for char in error_reply: + yield f"data: {json.dumps({'type': 'llm_token', 'node': 'error', 'token': char}, ensure_ascii=False)}\n\n" import asyncio await asyncio.sleep(0.01) - - # 6. 发送错误事件 + yield f"data: {json.dumps({'type': 'error', 'message': error_message}, ensure_ascii=False)}\n\n" yield "data: [DONE]\n\n" diff --git a/backend/app/core/__init__.py b/backend/app/core/__init__.py index 1bfbf19..7384b41 100644 --- a/backend/app/core/__init__.py +++ b/backend/app/core/__init__.py @@ -1,6 +1,7 @@ """核心模块 - 基类和通用工具""" -from .formatter import MarkdownFormatter +from .formatter import MarkdownFormatter, OutputRenderer, get_formatter +from .stream_finalizer import StreamFinalizer, create_finalizer from .state_base import BaseState from .human_review import ( ReviewManager, @@ -23,6 +24,10 @@ from .visualization import ( __all__ = [ "MarkdownFormatter", + "OutputRenderer", + "get_formatter", + "StreamFinalizer", + "create_finalizer", "BaseState", "ReviewManager", "InMemoryReviewStore", diff --git a/backend/app/core/formatter.py b/backend/app/core/formatter.py index 951de8d..d6639dc 100644 --- a/backend/app/core/formatter.py +++ b/backend/app/core/formatter.py @@ -1,482 +1,238 @@ """ 格式化输出工具模块 -提供基于 Jinja2 模板的 Markdown 格式化输出能力 -功能: -1. TemplateManager - 模板管理器,支持加载和渲染 Jinja2 模板 -2. MarkdownFormatter - Markdown 格式化工具,提供常用格式(表格、列表、引用等) -3. OutputRenderer - 输出渲染器,统一接口生成最终输出 -4. PresetTemplates - 预置模板(对话摘要、报告、列表等) +提供统一的 Markdown 格式化能力: +1. MarkdownFormatter - 静态方法生成 Markdown 元素 +2. OutputRenderer - 模板渲染 + 全局单例 +3. get_formatter() - 获取全局单例 """ -import os +from typing import Dict, List, Any, Optional from pathlib import Path -from typing import Dict, List, Any, Optional, Union -from dataclasses import dataclass -from abc import ABC, abstractmethod -# 尝试导入 Jinja2,如果没有则提供基础实现 -try: - from jinja2 import Template as JinjaTemplate, Environment, BaseLoader - HAS_JINJA2 = True -except ImportError: - HAS_JINJA2 = False +from backend.app.logger import info, warning +from backend.app.templates import TEMPLATES_DIR -class BaseFormatter(ABC): - """格式化器基类""" - - @abstractmethod - def format(self, data: Any) -> str: - """格式化数据为字符串""" - pass +# ========== Markdown 格式化器 ========== +class MarkdownFormatter: + """Markdown 格式化工具(静态方法)""" -class MarkdownFormatter(BaseFormatter): - """Markdown 格式化工具""" - @staticmethod def table(data: List[Dict[str, Any]], headers: Optional[List[str]] = None) -> str: - """ - 生成 Markdown 表格 - - Args: - data: 数据列表,每个元素是一个字典 - headers: 表头列表,如果为 None 则使用字典的键 - - Returns: - Markdown 表格字符串 - """ + """生成 Markdown 表格""" if not data: return "" - + if headers is None: headers = list(data[0].keys()) if data else [] - + if not headers: return "" - + lines = [] - - # 表头行 header_line = "| " + " | ".join(str(h) for h in headers) + " |" lines.append(header_line) - - # 分隔线 separator_line = "| " + " | ".join("---" for _ in headers) + " |" lines.append(separator_line) - - # 数据行 + for row in data: row_values = [str(row.get(h, "")) for h in headers] - row_line = "| " + " | ".join(row_values) + " |" - lines.append(row_line) - + lines.append("| " + " | ".join(row_values) + " |") + return "\n".join(lines) - + @staticmethod def bullet_list(items: List[str], indent: int = 0) -> str: - """ - 生成无序列表 - - Args: - items: 列表项 - indent: 缩进层级 - - Returns: - Markdown 无序列表字符串 - """ + """生成无序列表""" indent_str = " " * indent return "\n".join(f"{indent_str}- {item}" for item in items) - + @staticmethod def numbered_list(items: List[str], start: int = 1, indent: int = 0) -> str: - """ - 生成有序列表 - - Args: - items: 列表项 - start: 起始编号 - indent: 缩进层级 - - Returns: - Markdown 有序列表字符串 - """ + """生成有序列表""" indent_str = " " * indent return "\n".join(f"{indent_str}{i}. {item}" for i, item in enumerate(items, start=start)) - + @staticmethod def quote(text: str, author: Optional[str] = None) -> str: - """ - 生成引用块 - - Args: - text: 引用文本 - author: 作者(可选) - - Returns: - Markdown 引用块字符串 - """ - quoted_lines = "\n".join(f"> {line}" for line in text.split("\n")) + """生成引用块""" + quoted = "\n".join(f"> {line}" for line in text.split("\n")) if author: - quoted_lines += f"\n> — {author}" - return quoted_lines - + quoted += f"\n> — {author}" + return quoted + @staticmethod def code(code: str, language: str = "") -> str: - """ - 生成代码块 - - Args: - code: 代码内容 - language: 语言标识符 - - Returns: - Markdown 代码块字符串 - """ + """生成代码块""" return f"```{language}\n{code}\n```" - + @staticmethod def heading(text: str, level: int = 1) -> str: - """ - 生成标题 - - Args: - text: 标题文本 - level: 标题级别(1-6) - - Returns: - Markdown 标题字符串 - """ + """生成标题""" level = max(1, min(6, level)) return f"{'#' * level} {text}" - + @staticmethod def link(text: str, url: str) -> str: - """ - 生成链接 - - Args: - text: 链接文本 - url: 链接地址 - - Returns: - Markdown 链接字符串 - """ + """生成链接""" return f"[{text}]({url})" - + @staticmethod def bold(text: str) -> str: """生成粗体""" return f"**{text}**" - + @staticmethod def italic(text: str) -> str: """生成斜体""" return f"*{text}*" - + @staticmethod def divider() -> str: """生成分割线""" return "---" - - def format(self, data: Any) -> str: - """实现基类方法,根据数据类型自动选择格式化方式""" + + @staticmethod + def format(data: Any) -> str: + """根据数据类型自动选择格式化方式""" if isinstance(data, list): - if len(data) > 0 and isinstance(data[0], dict): - return self.table(data) - else: - return self.bullet_list([str(item) for item in data]) + if data and isinstance(data[0], dict): + return MarkdownFormatter.table(data) + return MarkdownFormatter.bullet_list([str(item) for item in data]) elif isinstance(data, dict): - return self.table([data]) - else: - return str(data) + return MarkdownFormatter.table([data]) + return str(data) -@dataclass -class Template: - """模板数据类""" - name: str - content: str - description: str = "" - - -class DictLoader(BaseLoader): - """字典模板加载器 - - 用于从内存字典中加载模板 - """ - - def __init__(self, templates: Dict[str, str]): - self.templates = templates - - def get_source(self, environment, template): - if template not in self.templates: - raise TemplateNotFound(template) - source = self.templates[template] - return source, None, lambda: True - +# ========== 模板加载器 ========== class TemplateManager: - """Jinja2 模板管理器""" - + """模板管理器,支持从文件加载 .md 或 .jinja 模板""" + def __init__(self, template_dir: Optional[Path] = None): - """ - 初始化模板管理器 - - Args: - template_dir: 模板目录路径 - """ - self._templates: Dict[str, Template] = {} - self.template_dir = template_dir - self._env: Optional[Environment] = None - - if HAS_JINJA2: - self._env = Environment(loader=DictLoader({})) - - def _refresh_env(self) -> None: - """刷新 Jinja2 环境""" - if HAS_JINJA2 and self._env is not None: - template_dict = {name: t.content for name, t in self._templates.items()} - self._env = Environment(loader=DictLoader(template_dict)) - - def add_template(self, name: str, content: str, description: str = "") -> None: - """ - 添加模板 - - Args: - name: 模板名称 - content: 模板内容 - description: 模板描述 - """ - self._templates[name] = Template(name=name, content=content, description=description) - self._refresh_env() - - def load_template(self, name: str, file_path: Path) -> None: - """ - 从文件加载模板 - - Args: - name: 模板名称 - file_path: 模板文件路径 - """ - if file_path.exists(): - content = file_path.read_text(encoding='utf-8') - self.add_template(name, content, f"从文件加载: {file_path}") - - def get_template(self, name: str) -> Optional[Template]: - """ - 获取模板 - - Args: - name: 模板名称 - - Returns: - 模板对象,如果不存在返回 None - """ + self.template_dir = template_dir or TEMPLATES_DIR + self._templates: Dict[str, str] = {} + self._load_templates() + + def _load_templates(self) -> None: + """从模板目录加载所有 .md 和 .jinja 文件""" + if not self.template_dir.exists(): + warning(f"[Formatter] 模板目录不存在: {self.template_dir}") + return + + for tmpl_file in self.template_dir.glob("*.md"): + name = tmpl_file.stem + self._templates[name] = tmpl_file.read_text(encoding="utf-8") + info(f"[Formatter] 加载模板: {name}") + + for tmpl_file in self.template_dir.glob("*.jinja"): + name = tmpl_file.stem + self._templates[name] = tmpl_file.read_text(encoding="utf-8") + info(f"[Formatter] 加载模板: {name}") + + def get(self, name: str) -> Optional[str]: + """获取模板内容""" return self._templates.get(name) - - def render(self, template_name: str, context: Dict[str, Any]) -> str: - """ - 渲染模板 - - Args: - template_name: 模板名称 - context: 渲染上下文 - - Returns: - 渲染后的字符串 - """ - template = self.get_template(template_name) - if template is None: - raise ValueError(f"模板不存在: {template_name}") - - return self.render_string(template.content, context) - - def render_string(self, template_string: str, context: Dict[str, Any]) -> str: - """ - 渲染模板字符串 - - Args: - template_string: 模板字符串 - context: 渲染上下文 - - Returns: - 渲染后的字符串 - """ - if HAS_JINJA2 and self._env is not None: - try: - jinja_template = self._env.from_string(template_string) - return jinja_template.render(**context) - except Exception: - # 如果 Jinja2 渲染失败,使用简单替换 - pass - - # 简单的字符串替换作为备选方案 - result = template_string + + def render(self, name: str, context: Dict[str, Any]) -> str: + """渲染模板""" + template = self.get(name) + if not template: + raise ValueError(f"模板不存在: {name}") + + return self._render_string(template, context) + + def _render_string(self, template_str: str, context: Dict[str, Any]) -> str: + """渲染模板字符串""" + # 尝试 Jinja2 + try: + from jinja2 import Template + tmpl = Template(template_str) + return tmpl.render(**context) + except ImportError: + pass + + # 简单替换作为兜底 + result = template_str for key, value in context.items(): - result = result.replace(f"{{{{{key}}}}}", str(value)) - result = result.replace(f"{{{{ {key} }}}}", str(value)) + str_val = str(value) + result = result.replace(f"{{{{ {key} }}}}", str_val) + result = result.replace(f"{{{{{key}}}}}", str_val) + # Handle conditional blocks {{#if key}}...{{/if}} + import re + result = re.sub(r"\{\{#if\s+" + re.escape(key) + r"\}\}(.*?)\{\{/if\}\}", + str_val and r"\1" or "", result, flags=re.DOTALL) + # Handle each loops {{#each key}}...{{/each}} + if isinstance(value, list): + pattern = r"\{\{#each\s+" + re.escape(key) + r"\}\}(.*?)\{\{/each\}\}" + matches = re.findall(pattern, result, flags=re.DOTALL) + for match in matches: + rendered_items = [] + for idx, item in enumerate(value): + item_str = match + if isinstance(item, dict): + for k, v in item.items(): + item_str = item_str.replace("{{ " + k + " }}", str(v)) + item_str = item_str.replace("{{" + k + "}}", str(v)) + item_str = item_str.replace("{{ @" + k + " }}", str(idx + 1)) + item_str = item_str.replace("{{ @index }}", str(idx + 1)) + else: + item_str = item_str.replace("{{ this }}", str(item)) + rendered_items.append(item_str) + result = re.sub(pattern, "\n".join(rendered_items), result, flags=re.DOTALL) return result -class PresetTemplates: - """预置模板集合""" - - @staticmethod - def conversation_summary() -> str: - """对话摘要模板""" - return """# 对话摘要 - -**时间**: {{ timestamp }} - -**参与者**: {{ participants }} - ---- - -## 对话要点 -{{ bullet_list(points) }} - ---- - -## 总结 -{{ summary }} -""" - - @staticmethod - def research_report() -> str: - """研究报告模板""" - return """# {{ title }} - -**日期**: {{ date }} -**作者**: {{ author }} - ---- - -## 摘要 -{{ summary }} - ---- - -## 发现 -{{ bullet_list(findings) }} - ---- - -## 数据来源 -{{ sources }} -""" - - @staticmethod - def task_list() -> str: - """任务列表模板""" - return """# 任务列表 - -**更新时间**: {{ update_time }} - ---- - -## 待办 -{{ numbered_list(todos) }} - ---- - -## 已完成 -{{ numbered_list(completed) }} -""" - - @staticmethod - def data_summary() -> str: - """数据摘要模板""" - return """# 数据摘要 - -**生成时间**: {{ timestamp }} - ---- - -## 数据概览 -{{ table(data_overview) }} - ---- - -## 关键指标 -{{ bullet_list(metrics) }} -""" - +# ========== 输出渲染器 ========== class OutputRenderer: - """输出渲染器""" - - def __init__(self, template_manager: Optional[TemplateManager] = None): + """ + 输出渲染器 - 统一接口渲染模板 + 使用全局单例,通过 get_formatter() 获取 + """ + + def __init__(self, template_dir: Optional[Path] = None): + self._templates = TemplateManager(template_dir) + self.md = MarkdownFormatter() + + def render(self, template_name: str, **context) -> str: """ - 初始化输出渲染器 - + 渲染模板 + Args: - template_manager: 模板管理器 + template_name: 模板名称(不含扩展名) + **context: 渲染上下文 """ - self.template_manager = template_manager or TemplateManager() - self.markdown = MarkdownFormatter() - - # 自动注册预置模板 - self._register_presets() - - def _register_presets(self) -> None: - """注册预置模板""" - self.template_manager.add_template( - "conversation_summary", - PresetTemplates.conversation_summary(), - "对话摘要模板" - ) - self.template_manager.add_template( - "research_report", - PresetTemplates.research_report(), - "研究报告模板" - ) - self.template_manager.add_template( - "task_list", - PresetTemplates.task_list(), - "任务列表模板" - ) - self.template_manager.add_template( - "data_summary", - PresetTemplates.data_summary(), - "数据摘要模板" - ) - - def render(self, template_name: str, context: Dict[str, Any]) -> str: - """ - 使用模板渲染输出 - - Args: - template_name: 模板名称 - context: 渲染上下文 - - Returns: - 渲染后的字符串 - """ - # 将格式化工具注入上下文 - render_context = context.copy() - render_context["bullet_list"] = self.markdown.bullet_list - render_context["numbered_list"] = self.markdown.numbered_list - render_context["table"] = self.markdown.table - render_context["quote"] = self.markdown.quote - render_context["code"] = self.markdown.code - render_context["heading"] = self.markdown.heading - render_context["link"] = self.markdown.link - render_context["bold"] = self.markdown.bold - render_context["italic"] = self.markdown.italic - render_context["divider"] = self.markdown.divider - - return self.template_manager.render(template_name, render_context) - + return self._templates.render(template_name, context) + def render_plain(self, data: Any) -> str: - """ - 直接格式化数据为 Markdown - - Args: - data: 数据 - - Returns: - 格式化后的字符串 - """ - return self.markdown.format(data) + """直接格式化数据为 Markdown""" + return self.md.format(data) + + def render_error(self, error_type: str, error_message: str = "", + suggestions: Optional[List[str]] = None, + retry_count: int = 0, max_retries: Optional[int] = None) -> str: + """渲染错误通知模板""" + context = { + "error_type": error_type, + "error_message": error_message, + "suggestions": self.md.bullet_list(suggestions or ["请稍后重试"]), + "retry_count": retry_count, + "max_retries": max_retries, + } + return self.render("error_notification", **context) + + +# ========== 全局单例 ========== + +_formatter: Optional[OutputRenderer] = None + + +def get_formatter() -> OutputRenderer: + """获取全局 OutputRenderer 单例""" + global _formatter + if _formatter is None: + _formatter = OutputRenderer() + return _formatter diff --git a/backend/app/core/retry_utils.py b/backend/app/core/retry_utils.py deleted file mode 100644 index 69c44a5..0000000 --- a/backend/app/core/retry_utils.py +++ /dev/null @@ -1,332 +0,0 @@ -""" -超时和重试工具模块 -为 React 模式提供超时控制和重试机制 -""" - -import time -import asyncio -from functools import wraps -from typing import Callable, Any, Optional, Type, Tuple, Union -from dataclasses import dataclass, field -from enum import Enum, auto - - -class RetryStrategy(Enum): - """重试策略""" - FIXED = auto() # 固定间隔 - EXPONENTIAL = auto() # 指数退避 - LINEAR = auto() # 线性增长 - - -@dataclass -class RetryConfig: - """重试配置""" - max_retries: int = 3 # 最大重试次数 - base_delay: float = 1.0 # 基础延迟(秒) - max_delay: float = 10.0 # 最大延迟(秒) - strategy: RetryStrategy = RetryStrategy.EXPONENTIAL - timeout: Optional[float] = 30.0 # 单次调用超时(秒) - recoverable_exceptions: Tuple[Type[Exception], ...] = field( - default_factory=lambda: (Exception,) - ) - unrecoverable_exceptions: Tuple[Type[Exception], ...] = field( - default_factory=tuple - ) - - -@dataclass -class RetryResult: - """重试结果""" - success: bool - result: Any = None - error: Optional[Exception] = None - retry_count: int = 0 - total_time: float = 0.0 - timed_out: bool = False - - -# ========== 同步重试装饰器 ========== -def with_retry( - config: Optional[RetryConfig] = None, - max_retries: int = 3, - timeout: Optional[float] = 30.0, - base_delay: float = 1.0, - on_retry: Optional[Callable[[int, Exception], None]] = None -): - """ - 同步重试装饰器 - - Args: - config: 重试配置对象 - max_retries: 最大重试次数(如果没有 config) - timeout: 单次调用超时(秒) - base_delay: 基础延迟(秒) - on_retry: 重试回调函数(retry_count, exception) - """ - if config is None: - config = RetryConfig( - max_retries=max_retries, - timeout=timeout, - base_delay=base_delay - ) - - def decorator(func: Callable) -> Callable: - @wraps(func) - def wrapper(*args, **kwargs) -> RetryResult: - start_time = time.time() - last_error = None - - for attempt in range(config.max_retries + 1): - try: - # 执行函数(带超时) - if config.timeout: - # 使用信号量或线程实现超时(简化版) - result = func(*args, **kwargs) - else: - result = func(*args, **kwargs) - - # 成功 - total_time = time.time() - start_time - return RetryResult( - success=True, - result=result, - retry_count=attempt, - total_time=total_time - ) - - except Exception as e: - last_error = e - - # 检查是否是不可恢复的异常 - if isinstance(e, config.unrecoverable_exceptions): - break - - # 检查是否达到最大重试次数 - if attempt >= config.max_retries: - break - - # 计算延迟 - delay = _calculate_delay(attempt, config) - - # 回调通知 - if on_retry: - on_retry(attempt + 1, e) - - # 等待 - time.sleep(delay) - - # 所有重试都失败 - total_time = time.time() - start_time - return RetryResult( - success=False, - error=last_error, - retry_count=config.max_retries, - total_time=total_time - ) - - return wrapper - return decorator - - -# ========== 异步重试装饰器 ========== -def with_async_retry( - config: Optional[RetryConfig] = None, - max_retries: int = 3, - timeout: Optional[float] = 30.0, - base_delay: float = 1.0, - on_retry: Optional[Callable[[int, Exception], None]] = None -): - """ - 异步重试装饰器 - """ - if config is None: - config = RetryConfig( - max_retries=max_retries, - timeout=timeout, - base_delay=base_delay - ) - - def decorator(func: Callable) -> Callable: - @wraps(func) - async def wrapper(*args, **kwargs) -> RetryResult: - start_time = time.time() - last_error = None - - for attempt in range(config.max_retries + 1): - try: - # 执行函数(带超时) - if config.timeout: - result = await asyncio.wait_for( - func(*args, **kwargs), - timeout=config.timeout - ) - else: - result = await func(*args, **kwargs) - - # 成功 - total_time = time.time() - start_time - return RetryResult( - success=True, - result=result, - retry_count=attempt, - total_time=total_time - ) - - except asyncio.TimeoutError as e: - last_error = e - timed_out = True - - except Exception as e: - last_error = e - timed_out = False - - # 检查是否是不可恢复的异常 - if isinstance(e, config.unrecoverable_exceptions): - break - - # 检查是否达到最大重试次数 - if attempt >= config.max_retries: - break - - # 计算延迟 - delay = _calculate_delay(attempt, config) - - # 回调通知 - if on_retry: - on_retry(attempt + 1, last_error) - - # 等待 - await asyncio.sleep(delay) - - # 所有重试都失败 - total_time = time.time() - start_time - return RetryResult( - success=False, - error=last_error, - retry_count=config.max_retries, - total_time=total_time, - timed_out=isinstance(last_error, asyncio.TimeoutError) - ) - - return wrapper - return decorator - - -# ========== 辅助函数 ========== -def _calculate_delay(attempt: int, config: RetryConfig) -> float: - """计算延迟时间""" - if config.strategy == RetryStrategy.FIXED: - delay = config.base_delay - elif config.strategy == RetryStrategy.LINEAR: - delay = config.base_delay * (attempt + 1) - elif config.strategy == RetryStrategy.EXPONENTIAL: - delay = config.base_delay * (2 ** attempt) - else: - delay = config.base_delay - - # 不超过最大延迟 - return min(delay, config.max_delay) - - -# ========== 为 React 节点设计的超时重试包装器 ========== -def create_retry_wrapper_for_node( - node_func: Callable, - node_name: str, - max_retries: int = 2, - timeout: float = 30.0 -): - """ - 为 React 节点创建带重试和超时的包装器 - - Args: - node_func: 原始节点函数 - node_name: 节点名称(用于错误标识) - max_retries: 最大重试次数 - timeout: 单次执行超时 - - Returns: 包装后的节点函数 - """ - config = RetryConfig( - max_retries=max_retries, - timeout=timeout, - strategy=RetryStrategy.EXPONENTIAL - ) - - @wraps(node_func) - def wrapped_node(state): - # 记录开始时间 - start_time = time.time() - - # 重试循环 - last_error = None - for attempt in range(config.max_retries + 1): - try: - # 执行节点 - result = node_func(state) - - # 检查节点是否报告了错误 - if hasattr(state, "current_error") and state.current_error: - # 节点内部报告了错误,继续重试 - last_error = Exception(state.current_error.error_message) - if attempt < config.max_retries: - delay = _calculate_delay(attempt, config) - time.sleep(delay) - continue - - # 成功 - return result - - except Exception as e: - last_error = e - - if attempt >= config.max_retries: - break - - # 等待后重试 - delay = _calculate_delay(attempt, config) - time.sleep(delay) - - # 所有重试都失败,更新状态错误信息 - from backend.app.main_graph.state import ErrorRecord, ErrorSeverity - - error_record = ErrorRecord( - error_type=f"{node_name}TimeoutError", - error_message=str(last_error) if last_error else f"{node_name} 执行超时", - severity=ErrorSeverity.ERROR, - source=node_name, - retry_count=config.max_retries, - max_retries=config.max_retries, - context={ - "timeout": timeout, - "total_time": time.time() - start_time - } - ) - - if hasattr(state, "errors"): - state.errors.append(error_record) - if hasattr(state, "current_error"): - state.current_error = error_record - if hasattr(state, "error_message"): - state.error_message = str(last_error) - if hasattr(state, "current_phase"): - state.current_phase = "error_handling" - - return state - - return wrapped_node - - -# ========== 预配置的 RAG 重试配置 ========== -RAG_RETRY_CONFIG = RetryConfig( - max_retries=2, - timeout=60.0, # RAG 可以容忍稍长的超时 - base_delay=2.0, - strategy=RetryStrategy.EXPONENTIAL -) - -# ========== 预配置的子图重试配置 ========== -SUBGRAPH_RETRY_CONFIG = RetryConfig( - max_retries=1, # 子图通常不适合多次重试 - timeout=120.0, # 子图执行时间较长 - base_delay=3.0 -) diff --git a/backend/app/core/stream_finalizer.py b/backend/app/core/stream_finalizer.py new file mode 100644 index 0000000..0ed51f4 --- /dev/null +++ b/backend/app/core/stream_finalizer.py @@ -0,0 +1,220 @@ +""" +流式输出格式化器 + +在流式输出结束后,追加格式化结构(表格、引用等) +解决流式输出与模板渲染的冲突 +""" + +from typing import List, Dict, Any, Optional +from dataclasses import dataclass + +from backend.app.core.formatter import get_formatter + + +@dataclass +class StreamAppend: + """流式追加内容""" + type: str # "table" | "quote" | "list" | "divider" | "text" + content: Any + + +class StreamFinalizer: + """ + 流式输出格式化器 + + 在流式输出结束后追加结构化内容: + - 表格 + - 引用块 + - 分割线 + - 文本 + + 使用方式: + 1. 流式输出主体内容 + 2. 调用 build_append() 获取追加内容 + 3. 发送追加事件到前端 + """ + + def __init__(self): + self.formatter = get_formatter() + self.md = self.formatter.md + self._appends: List[StreamAppend] = [] + + def reset(self): + """重置追加队列""" + self._appends = [] + return self + + def add_table(self, data: List[Dict], headers: Optional[List[str]] = None): + """添加表格""" + self._appends.append(StreamAppend( + type="table", + content={"data": data, "headers": headers} + )) + return self + + def add_quote(self, text: str, author: Optional[str] = None): + """添加引用块""" + self._appends.append(StreamAppend( + type="quote", + content={"text": text, "author": author} + )) + return self + + def add_list(self, items: List[str], numbered: bool = False): + """添加列表""" + self._appends.append(StreamAppend( + type="list", + content={"items": items, "numbered": numbered} + )) + return self + + def add_divider(self): + """添加分割线""" + self._appends.append(StreamAppend(type="divider", content=None)) + return self + + def add_text(self, text: str): + """添加文本""" + self._appends.append(StreamAppend(type="text", content=text)) + return self + + def add_knowledge_summary(self, topic: str, summary: str, + key_points: Optional[List[Dict]] = None, + table_data: Optional[List[Dict]] = None, + sources: Optional[List[Dict]] = None): + """添加知识总结(使用模板)""" + table = "" + if table_data: + table = self.md.table(table_data) + + self._appends.append(StreamAppend( + type="template", + content={ + "name": "knowledge_summary", + "context": { + "topic": topic, + "timestamp": self._now(), + "summary": summary, + "key_points": key_points or [], + "table_data": table_data, + "table": table, + "sources": sources or [], + } + } + )) + return self + + def add_web_results(self, query: str, results: List[Dict]): + """添加搜索结果""" + self._appends.append(StreamAppend( + type="template", + content={ + "name": "web_search_result", + "context": { + "query": query, + "result_count": len(results), + "results": results, + } + } + )) + return self + + def build_append(self) -> str: + """ + 构建追加内容 + + Returns: + 格式化后的追加文本 + """ + if not self._appends: + return "" + + lines = [] + lines.append("") # 空行分隔 + lines.append(self.md.divider()) + lines.append("") + + for append in self._appends: + if append.type == "table": + lines.append(self.md.table(append.content["data"], append.content.get("headers"))) + elif append.type == "quote": + lines.append(self.md.quote(append.content["text"], append.content.get("author"))) + elif append.type == "list": + if append.content["numbered"]: + lines.append(self.md.numbered_list(append.content["items"])) + else: + lines.append(self.md.bullet_list(append.content["items"])) + elif append.type == "divider": + lines.append(self.md.divider()) + elif append.type == "text": + lines.append(append.content) + elif append.type == "template": + template_name = append.content["name"] + context = append.content["context"] + lines.append(self.formatter.render(template_name, **context)) + + lines.append("") + + return "\n".join(lines) + + def build_events(self) -> List[Dict[str, Any]]: + """ + 构建追加事件列表(用于流式发送) + + Returns: + 事件列表,每项包含 type 和 content + """ + if not self._appends: + return [] + + events = [] + for append in self._appends: + if append.type == "table": + events.append({ + "type": "append_table", + "content": self.md.table(append.content["data"], append.content.get("headers")) + }) + elif append.type == "quote": + events.append({ + "type": "append_quote", + "content": self.md.quote(append.content["text"], append.content.get("author")) + }) + elif append.type == "list": + events.append({ + "type": "append_list", + "content": self.md.numbered_list(append.content["items"]) if append.content["numbered"] + else self.md.bullet_list(append.content["items"]) + }) + elif append.type == "divider": + events.append({ + "type": "append_divider", + "content": self.md.divider() + }) + elif append.type == "text": + events.append({ + "type": "append_text", + "content": append.content + }) + elif append.type == "template": + template_name = append.content["name"] + context = append.content["context"] + events.append({ + "type": "append_template", + "template": template_name, + "content": self.formatter.render(template_name, **context) + }) + + return events + + @staticmethod + def _now() -> str: + """获取当前时间""" + from datetime import datetime + return datetime.now().strftime('%Y-%m-%d %H:%M:%S') + + +# ========== 便捷函数 ========== + +def create_finalizer() -> StreamFinalizer: + """创建流式格式化器""" + return StreamFinalizer() diff --git a/backend/app/core/web_search.py b/backend/app/core/web_search.py index 2ae09df..170ab07 100644 --- a/backend/app/core/web_search.py +++ b/backend/app/core/web_search.py @@ -158,12 +158,13 @@ class WebSearchTool: return results - def format_search_results(self, results: List[SearchResult]) -> str: + def format_search_results(self, results: List[SearchResult], query: str = "") -> str: """ - 格式化搜索结果(带引用溯源) + 格式化搜索结果(使用模板渲染) Args: results: 搜索结果列表 + query: 搜索关键词 Returns: 格式化后的 Markdown 文本 @@ -171,22 +172,27 @@ class WebSearchTool: if not results: return "未找到相关搜索结果" - lines = ["## 🔍 联网搜索结果\n"] + from backend.app.core import get_formatter + formatter = get_formatter() - for idx, result in enumerate(results, 1): - lines.append(f"### [{idx}] {result.title}") - lines.append(f"- 🔗 来源:[{result.url}]({result.url})") - lines.append(f"- 📝 摘要:{result.snippet}") - lines.append(f"- 📅 时间:{result.timestamp.strftime('%Y-%m-%d %H:%M:%S')}") - lines.append("") + # 转换为字典列表供模板使用 + result_dicts = [] + for r in results: + result_dicts.append({ + "title": r.title, + "url": r.url, + "snippet": r.snippet, + "source": r.source, + "timestamp": r.timestamp.strftime('%Y-%m-%d %H:%M:%S') if r.timestamp else "", + }) - lines.append("---") - lines.append("💡 **引用溯源说明**:") - lines.append("- 以上搜索结果均标注了来源链接") - lines.append("- 使用方括号数字标识引用(如 [1]、[2])") - lines.append("- 可通过链接追溯原始信息") - - return "\n".join(lines) + return formatter.render( + "web_search_result", + query=query, + result_count=len(results), + results=result_dicts, + citation_note="💡 **引用溯源说明**:以上搜索结果均标注了来源链接,可通过链接追溯原始信息。" + ) # 单例实例 @@ -214,4 +220,4 @@ def web_search(query: str, max_results: int = 5) -> str: """ tool = get_web_search_tool() results = tool.search(query, max_results) - return tool.format_search_results(results) + return tool.format_search_results(results, query=query) diff --git a/backend/app/middleware/__init__.py b/backend/app/middleware/__init__.py new file mode 100644 index 0000000..a43ba59 --- /dev/null +++ b/backend/app/middleware/__init__.py @@ -0,0 +1,15 @@ +""" +中间件模块 +""" + +from .response_formatter import ( + ResponseFormatterMiddleware, + format_error_response, + format_success_response, +) + +__all__ = [ + "ResponseFormatterMiddleware", + "format_error_response", + "format_success_response", +] diff --git a/backend/app/middleware/response_formatter.py b/backend/app/middleware/response_formatter.py new file mode 100644 index 0000000..3492cca --- /dev/null +++ b/backend/app/middleware/response_formatter.py @@ -0,0 +1,91 @@ +""" +响应格式化中间件 + +自动将 API 响应中的字符串或错误信息格式化为统一风格 +""" + +from starlette.middleware.base import BaseHTTPMiddleware +from starlette.requests import Request +from starlette.responses import JSONResponse +from typing import Callable + +from backend.app.core import get_formatter + + +class ResponseFormatterMiddleware(BaseHTTPMiddleware): + """ + 响应格式化中间件 + + 功能: + 1. 统一响应包装 + 2. 错误信息格式化 + 3. 调试信息注入(可选) + """ + + async def dispatch(self, request: Request, call_next: Callable): + response = await call_next(request) + return response + + +def format_error_response( + error_type: str, + error_message: str, + suggestions: list = None, + retry_count: int = 0, + max_retries: int = None +) -> str: + """ + 格式化错误响应 + + Args: + error_type: 错误类型 + error_message: 错误详情 + suggestions: 建议操作列表 + retry_count: 已重试次数 + max_retries: 最大重试次数 + + Returns: + 格式化后的 Markdown 文本 + """ + formatter = get_formatter() + return formatter.render_error( + error_type=error_type, + error_message=error_message, + suggestions=suggestions, + retry_count=retry_count, + max_retries=max_retries + ) + + +def format_success_response( + content: str, + title: str = None, + include_footer: bool = True +) -> str: + """ + 格式化成功响应 + + Args: + content: 内容 + title: 可选标题 + include_footer: 是否包含页脚 + + Returns: + 格式化后的 Markdown 文本 + """ + formatter = get_formatter() + md = formatter.md + + lines = [] + if title: + lines.append(md.heading(title, 2)) + lines.append("") + + lines.append(content) + + if include_footer: + lines.append("") + lines.append("---") + lines.append("*以上内容由 AI Agent 生成*") + + return "\n".join(lines) diff --git a/backend/app/subgraphs/contact/nodes.py b/backend/app/subgraphs/contact/nodes.py index fcd0c29..71b0366 100644 --- a/backend/app/subgraphs/contact/nodes.py +++ b/backend/app/subgraphs/contact/nodes.py @@ -7,10 +7,9 @@ Contact Subgraph Nodes - Using Common Tools from typing import Dict, Any from datetime import datetime -# 公共工具 -from backend.app.core import MarkdownFormatter +from backend.app.core import get_formatter -from .state import ContactState +from .state import ContactState, ContactAction, Contact from .api_client import ContactAPIClient @@ -121,11 +120,12 @@ def create_contact_nodes(contact_api: ContactAPIClient): async def format_result(state: ContactState) -> ContactState: """ - 格式化结果节点(使用公共工具) + 格式化结果节点(使用全局 Formatter) """ state.current_phase = "formatting" - - md = MarkdownFormatter() + + fmt = get_formatter() + md = fmt.md output_lines = [] output_lines.append("┌───────────────────────────────────┐") diff --git a/backend/app/subgraphs/dictionary/nodes.py b/backend/app/subgraphs/dictionary/nodes.py index b28756c..71a51ec 100644 --- a/backend/app/subgraphs/dictionary/nodes.py +++ b/backend/app/subgraphs/dictionary/nodes.py @@ -7,10 +7,7 @@ from typing import Dict, Any, List from datetime import datetime import random -# 公共工具 -from backend.app.core import ( - MarkdownFormatter -) +from backend.app.core import get_formatter from .state import ( DictionaryState, @@ -172,12 +169,12 @@ def add_to_word_book(state: DictionaryState) -> DictionaryState: def format_result(state: DictionaryState) -> DictionaryState: """ - 格式化结果节点(使用公共工具) - 生成友好的 Markdown 输出 + 格式化结果节点(使用全局 Formatter) """ state.current_phase = "formatting" - - md = MarkdownFormatter() + + fmt = get_formatter() + md = fmt.md output_lines = [] # 标题 diff --git a/backend/app/subgraphs/news_analysis/nodes.py b/backend/app/subgraphs/news_analysis/nodes.py index 44a9b48..7765543 100644 --- a/backend/app/subgraphs/news_analysis/nodes.py +++ b/backend/app/subgraphs/news_analysis/nodes.py @@ -6,8 +6,7 @@ News Analysis Subgraph Nodes - Using Common Tools from typing import Dict, Any from datetime import datetime -# 公共工具 -from backend.app.core import MarkdownFormatter +from backend.app.core import get_formatter from .state import ( NewsAnalysisState, @@ -104,11 +103,12 @@ def generate_report(state: NewsAnalysisState) -> NewsAnalysisState: def format_result(state: NewsAnalysisState) -> NewsAnalysisState: """ - 格式化结果节点(使用公共工具) + 格式化结果节点(使用全局 Formatter) """ state.current_phase = "formatting" - - md = MarkdownFormatter() + + fmt = get_formatter() + md = fmt.md output_lines = [] output_lines.append("┌───────────────────────────────────┐") diff --git a/backend/app/templates/__init__.py b/backend/app/templates/__init__.py new file mode 100644 index 0000000..a1d11c1 --- /dev/null +++ b/backend/app/templates/__init__.py @@ -0,0 +1,9 @@ +""" +模板目录 - 存放可编辑的输出模板 +""" + +from pathlib import Path + +TEMPLATES_DIR = Path(__file__).parent + +__all__ = ["TEMPLATES_DIR"] diff --git a/backend/app/templates/conversation_summary.md b/backend/app/templates/conversation_summary.md new file mode 100644 index 0000000..19c0f82 --- /dev/null +++ b/backend/app/templates/conversation_summary.md @@ -0,0 +1,26 @@ +# 对话摘要 + +**时间**: {{ timestamp }} +{% if participants %} +**参与者**: {{ participants }} +{% endif %} + +--- + +## 📋 对话要点 + +{{ bullet_list(points) }} + +--- + +## 📝 总结 + +{{ summary }} + +--- + +{% if next_steps %} +## ➡️ 下一步 + +{{ bullet_list(next_steps) }} +{% endif %} diff --git a/backend/app/templates/error_notification.md b/backend/app/templates/error_notification.md new file mode 100644 index 0000000..2bf80d6 --- /dev/null +++ b/backend/app/templates/error_notification.md @@ -0,0 +1,21 @@ +## ⚠️ 操作失败 + +**错误类型**: {{ error_type }} + +{% if error_message %} +**错误详情**: {{ error_message }} +{% endif %} + +--- + +### 💡 建议操作 + +{{ suggestions }} + +--- + +{% if retry_count %} +> 已重试 {{ retry_count }} {% if max_retries %}/ 最多 {{ max_retries }} 次{% endif %} +{% endif %} + +*如果问题持续存在,请联系管理员或稍后重试* diff --git a/backend/app/templates/knowledge_summary.md b/backend/app/templates/knowledge_summary.md new file mode 100644 index 0000000..14bb213 --- /dev/null +++ b/backend/app/templates/knowledge_summary.md @@ -0,0 +1,48 @@ +# 📚 知识总结 + +**主题**: {{ topic }} +**生成时间**: {{ timestamp }} + +--- + +## 📋 内容概览 + +{{ summary }} + +--- + +{% if key_points %} +## 🔑 关键要点 + +{% for point in key_points %} +### {{ loop.index }}. {{ point.title }} + +{{ point.description }} +{% endfor %} +{% endif %} + +--- + +{% if table_data %} +## 📊 数据表格 + +{{ table }} +{% endif %} + +--- + +{% if sources %} +## 📖 参考来源 + +{% for source in sources %} +- [{{ source.title }}]({{ source.url }}) +{% endfor %} +{% endif %} + +--- + +{% if next_steps %} +## ➡️ 后续建议 + +{{ suggestions }} +{% endif %} diff --git a/backend/app/templates/tool_result.md b/backend/app/templates/tool_result.md new file mode 100644 index 0000000..1ebdb62 --- /dev/null +++ b/backend/app/templates/tool_result.md @@ -0,0 +1,24 @@ +# 工具执行结果 + +**工具**: {{ tool_name }} +**状态**: {{ status }} + +--- + +{{ content }} + +--- + +{% if metadata %} +### 📊 执行信息 + +| 项目 | 值 | +|------|-----| +{% for key, value in metadata.items() %} +| {{ key }} | {{ value }} | +{% endfor %} +{% endif %} + +{% if duration %} +*执行耗时: {{ duration }}ms* +{% endif %} diff --git a/backend/app/templates/web_search_result.md b/backend/app/templates/web_search_result.md new file mode 100644 index 0000000..c6c81b2 --- /dev/null +++ b/backend/app/templates/web_search_result.md @@ -0,0 +1,28 @@ +## 🔍 搜索结果 + +{% if query %} +**查询**: {{ query }} +{% endif %} + +{% if result_count %} +找到 {{ result_count }} 条相关结果 +{% endif %} + +--- + +{% for item in results %} +### [{{ loop.index }}] {{ item.title }} + +- **来源**: [{{ item.url }}]({{ item.url }}) +- **摘要**: {{ item.snippet }} +{% if item.source %} +- **来源网站**: {{ item.source }} +{% endif %} + +{% endfor %} + +--- + +{% if citation_note %} +{{ citation_note }} +{% endif %}