diff --git a/.env b/.env index 67989cb..c525021 100644 --- a/.env +++ b/.env @@ -1,33 +1,55 @@ # ============================================================================= -# 本地开发环境配置 -# 用于 python app/backend.py 和 streamlit run frontend/frontend.py +# Agent1 环境配置文件 +# 用法: cp .env.example .env 然后修改配置值 # ============================================================================= # ----------------------------------------------------------------------------- -# AI 模型 API 密钥 +# AI 模型 API 密钥(必需 - 请填入真实值) # ----------------------------------------------------------------------------- ZHIPUAI_API_KEY=4d568a4367f1442bbc226cc0daf84566.44SsKVWkVIM2Mkeg DEEPSEEK_API_KEY=sk-e74b13ac778f4b7eb29afa418a14421e -VLLM_LOCAL_KEY=token-abc123 +LLAMACPP_API_KEY=token-abc123 # ----------------------------------------------------------------------------- -# vLLM 服务配置 +# llama.cpp 服务配置 # ----------------------------------------------------------------------------- -# 本地开发时,vLLM 通常在 localhost 运行 -VLLM_BASE_URL=http://localhost:8000/v1 +# 主 LLM 服务 (Gemma-4-E2B GGUF) - 端口 8081 +VLLM_BASE_URL=http://localhost:8081/v1 + +# Embedding 服务 (embeddinggemma-300M GGUF) - 端口 8082 +VLLM_EMBEDDING_URL=http://localhost:8082/v1 + +# ----------------------------------------------------------------------------- +# Mem0 记忆层配置 +# ----------------------------------------------------------------------------- +# ⭐ 注意:Mem0 现在直接复用主 LLM 实例,无需单独配置 +# Qdrant 向量数据库地址(远程服务器) +QDRANT_URL=http://115.190.121.151:6333 +QDRANT_COLLECTION_NAME=mem0_user_memories # ----------------------------------------------------------------------------- # 数据库配置 # ----------------------------------------------------------------------------- -# 本地开发时,数据库在 localhost 运行 -DB_URI=postgresql://postgres:mysecretpassword@localhost:5432/langgraph_db?sslmode=disable +# PostgreSQL 连接字符串(远程服务器) +DB_URI=postgresql://postgres:mysecretpassword@115.190.121.151:5432/langgraph_db?sslmode=disable # ----------------------------------------------------------------------------- # 前端配置 # ----------------------------------------------------------------------------- -# 本地开发时,后端也在 localhost 运行 -API_URL=http://localhost:8001/chat +# 后端 API 地址(本地开发使用 8003 端口,避免与 vLLM 冲突) +API_URL=http://localhost:8003/chat -# 本地开发 - 显示所有调试信息 +# ----------------------------------------------------------------------------- +# 应用行为配置 +# ----------------------------------------------------------------------------- +# 记忆提取间隔:每 N 轮对话执行一次记忆提取 +MEMORY_SUMMARIZE_INTERVAL=10 + +# 是否启用 Graph 执行追踪(调试用) +ENABLE_GRAPH_TRACE=true + +# ----------------------------------------------------------------------------- +# 日志配置 +# ----------------------------------------------------------------------------- LOG_LEVEL=DEBUG -DEBUG=true \ No newline at end of file +DEBUG=true diff --git a/.env.docker b/.env.docker index 33e062c..cd5e5b7 100644 --- a/.env.docker +++ b/.env.docker @@ -4,32 +4,49 @@ # ============================================================================= # ----------------------------------------------------------------------------- -# AI 模型 API 密钥(必需 - 请修改为真实值) +# AI 模型 API 密钥(必需 - 请填入真实值) # ----------------------------------------------------------------------------- -ZHIPUAI_API_KEY=your_zhipuai_api_key_here +ZHIPUAI_API_KEY=your_api_key_here DEEPSEEK_API_KEY=your_deepseek_api_key_here -VLLM_LOCAL_KEY=token-abc123 +LLAMACPP_API_KEY=token-abc123 # ----------------------------------------------------------------------------- -# vLLM 服务配置 +# llama.cpp 服务配置 # ----------------------------------------------------------------------------- -# Docker 部署时,如果 vLLM 在宿主机运行,使用 FRP 穿透地址或宿主机 IP -# 如果 vLLM 也在 Docker 中,使用 Docker 服务名或容器 IP -VLLM_BASE_URL=http://115.190.121.151:18000/v1 +# 主 LLM 服务 (Gemma-4-E2B GGUF) - 端口 8081 +VLLM_BASE_URL=http://localhost:8081/v1 + +# Embedding 服务 (embeddinggemma-300M GGUF) - 端口 8082 +VLLM_EMBEDDING_URL=http://localhost:8082/v1 + +# ----------------------------------------------------------------------------- +# Mem0 记忆层配置 +# ----------------------------------------------------------------------------- +# ⭐ 注意:Mem0 现在直接复用主 LLM 实例,无需单独配置 +# Qdrant 向量数据库(远程服务器上的独立容器) +QDRANT_URL=http://115.190.121.151:6333 +QDRANT_COLLECTION_NAME=mem0_user_memories # ----------------------------------------------------------------------------- # 数据库配置 # ----------------------------------------------------------------------------- -# Docker Compose 内部网络,使用服务名 'postgres' -DB_URI=postgresql://postgres:mysecretpassword@postgres:5432/langgraph_db?sslmode=disable +# PostgreSQL 连接字符串(远程服务器上的独立容器) +DB_URI=postgresql://postgres:mysecretpassword@115.190.121.151:5432/langgraph_db?sslmode=disable # ----------------------------------------------------------------------------- -# 前端配置(通过 docker-compose.yml 注入,此处仅作文档说明) +# 前端配置 # ----------------------------------------------------------------------------- -# 注意:API_URL 在 docker-compose.yml 中已配置为 http://backend:8001/chat -# 本地无需设置,Docker 容器启动时会自动注入 -# API_URL=http://backend:8001/chat +# Docker Compose 内部网络,使用服务名 'backend' +API_URL=http://backend:8001/chat -# 生产环境 - 仅显示关键信息 +# ----------------------------------------------------------------------------- +# 应用行为配置 +# ----------------------------------------------------------------------------- +MEMORY_SUMMARIZE_INTERVAL=10 +ENABLE_GRAPH_TRACE=false + +# ----------------------------------------------------------------------------- +# 日志配置 +# ----------------------------------------------------------------------------- LOG_LEVEL=WARNING DEBUG=false \ No newline at end of file diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..8c9b53e --- /dev/null +++ b/.env.example @@ -0,0 +1,69 @@ +# ============================================================================= +# Agent1 环境配置模板 +# 用法: cp .env.example .env 然后修改配置值 +# ============================================================================= + +# ----------------------------------------------------------------------------- +# AI 模型 API 密钥(必需 - 请填入真实值) +# ----------------------------------------------------------------------------- +ZHIPUAI_API_KEY=your_api_key_here +DEEPSEEK_API_KEY=your_deepseek_api_key_here +LLAMACPP_API_KEY=token-abc123 + +# ----------------------------------------------------------------------------- +# llama.cpp 服务配置 +# ----------------------------------------------------------------------------- +# 主 LLM 服务 (Gemma-4-E2B GGUF) +# 本地开发: http://localhost:8081/v1 +# Docker 部署: 根据实际部署调整 +VLLM_BASE_URL=http://localhost:8081/v1 + +# Embedding 服务 (embeddinggemma-300M GGUF) +# 本地开发: http://localhost:8082/v1 +VLLM_EMBEDDING_URL=http://localhost:8082/v1 + +# ----------------------------------------------------------------------------- +# Mem0 记忆层配置 +# ----------------------------------------------------------------------------- +# ⭐ 注意:Mem0 现在直接复用主 LLM 实例,无需单独配置 +# Qdrant 向量数据库地址 +# 本地开发: http://localhost:6333 +# 远程服务器: http://115.190.121.151:6333 +# Docker Compose: http://qdrant:6333 +QDRANT_URL=http://115.190.121.151:6333 +QDRANT_COLLECTION_NAME=mem0_user_memories + +# ----------------------------------------------------------------------------- +# 数据库配置 +# ----------------------------------------------------------------------------- +# PostgreSQL 连接字符串 +# 本地开发: postgresql://postgres:mysecretpassword@localhost:5432/langgraph_db?sslmode=disable +# 远程服务器: postgresql://postgres:mysecretpassword@115.190.121.151:5432/langgraph_db?sslmode=disable +# Docker Compose: postgresql://postgres:mysecretpassword@postgres:5432/langgraph_db?sslmode=disable +DB_URI=postgresql://postgres:mysecretpassword@115.190.121.151:5432/langgraph_db?sslmode=disable + +# ----------------------------------------------------------------------------- +# 前端配置 +# ----------------------------------------------------------------------------- +# 后端 API 地址 +# 本地开发: http://localhost:8001/chat +# Docker Compose: http://backend:8001/chat +API_URL=http://localhost:8001/chat + +# ----------------------------------------------------------------------------- +# 应用行为配置 +# ----------------------------------------------------------------------------- +# 记忆提取间隔:每 N 轮对话执行一次记忆提取(默认 10) +MEMORY_SUMMARIZE_INTERVAL=10 + +# 是否启用 Graph 执行追踪(调试用,默认 true) +ENABLE_GRAPH_TRACE=true + +# ----------------------------------------------------------------------------- +# 日志配置 +# ----------------------------------------------------------------------------- +# 日志级别: DEBUG, INFO, WARNING, ERROR, CRITICAL +LOG_LEVEL=INFO + +# 是否启用调试模式(默认 false) +DEBUG=false diff --git a/QUICKSTART.md b/QUICKSTART.md index 9c80f8c..f378ac8 100644 --- a/QUICKSTART.md +++ b/QUICKSTART.md @@ -22,11 +22,13 @@ vim .env # 或使用你喜欢的编辑器 **必需配置项**: - `ZHIPUAI_API_KEY` - 智谱 AI API 密钥(从 [智谱开放平台](https://open.bigmodel.cn/) 获取) -- `VLLM_LOCAL_KEY` - 本地 vLLM 服务认证 Token(与 vLLM 容器的 `--api-key` 参数一致) +- `LLAMACPP_API_KEY` - llama.cpp 服务认证 Token(与容器启动参数一致,默认 `token-abc123`) **可选配置项**: -- `VLLM_BASE_URL` - vLLM 服务地址(默认已配置为 FRP 穿透地址) -- `DB_URI` - PostgreSQL 连接字符串(默认已配置,使用 Docker 服务名 `postgres`) +- `VLLM_BASE_URL` - LLM 服务地址(默认已配置为 `http://localhost:8081/v1`) +- `VLLM_EMBEDDING_URL` - Embedding 服务地址(默认已配置为 `http://localhost:8082/v1`) +- `DB_URI` - PostgreSQL 连接字符串(默认已配置,使用远程服务器地址) +- `QDRANT_URL` - Qdrant 向量数据库地址(默认已配置,使用远程服务器地址) **注意**:Docker Compose 部署时,`API_URL` 由 `docker-compose.yml` 自动注入,无需在 `.env` 中配置。 @@ -70,60 +72,51 @@ docker compose down #### 前置要求 - Python 3.10+ -- Docker(用于 PostgreSQL) -#### 1. 启动 PostgreSQL - -```bash -docker run -d \ - --name postgres-langgraph \ - -e POSTGRES_PASSWORD=mysecretpassword \ - -e POSTGRES_DB=langgraph_db \ - -p 5432:5432 \ - -v ~/docker_volumes/postgres_data:/var/lib/postgresql/data \ - postgres:16 -``` - -#### 2. 安装依赖 +#### 1. 安装依赖 ```bash pip install -r requirement.txt ``` -#### 3. 配置环境变量 +#### 2. 配置环境变量 复制并编辑 `.env` 文件: ``` -# 基于 Docker 模板创建,然后修改为本地配置 +# 基于 Docker 模板创建 cp .env.docker .env vim .env ``` **本地开发需要修改以下配置**: -```env +``env ZHIPUAI_API_KEY=your_api_key_here -VLLM_LOCAL_KEY=token-abc123 +LLAMACPP_API_KEY=token-abc123 -# 本地开发时,vLLM 和数据库都在 localhost -VLLM_BASE_URL=http://localhost:8000/v1 -DB_URI=postgresql://postgres:mysecretpassword@localhost:5432/langgraph_db?sslmode=disable +# 本地开发时,llama.cpp 服务在 localhost +VLLM_BASE_URL=http://localhost:8081/v1 +VLLM_EMBEDDING_URL=http://localhost:8082/v1 + +# 数据库和向量存储使用远程服务器 +DB_URI=postgresql://postgres:mysecretpassword@115.190.121.151:5432/langgraph_db?sslmode=disable +QDRANT_URL=http://115.190.121.151:6333 # 本地开发时,后端也在 localhost -API_URL=http://localhost:8001/chat +API_URL=http://localhost:8003/chat ``` -#### 4. 启动服务 +#### 3. 启动服务 **终端 1 - 后端:** ```bash -python backend.py +python app/backend.py ``` **终端 2 - 前端:** ```bash -streamlit run frontend.py +cd frontend && streamlit run app.py ``` 浏览器自动打开前端页面(如果配置了 Nginx,访问 `http://your-domain.com`;否则访问 http://localhost:8501) @@ -136,7 +129,7 @@ streamlit run frontend.py | 文件 | 用途 | |------|------| -| `docker-compose.yml` | 服务编排配置 | +| `docker-compose.yml` | 服务编排配置(仅包含 backend 和 frontend) | | `Dockerfile.backend` | 后端镜像构建 | | `Dockerfile.frontend` | 前端镜像构建 | | `.gitea/workflows/deploy.yml` | CI/CD 自动化部署 | @@ -145,49 +138,32 @@ streamlit run frontend.py ```yaml services: - postgres: # PostgreSQL 数据库 - backend: # FastAPI 后端服务 + backend: # FastAPI 后端服务(连接远程 PostgreSQL 和 Qdrant) frontend: # Streamlit 前端界面 ``` **特性:** -- ✅ PostgreSQL 健康检查,确保数据库就绪后才启动后端 -- ✅ 数据持久化到 Docker volume +- ✅ 通过环境变量连接远程 PostgreSQL 和 Qdrant - ✅ 自动重启策略(`restart: unless-stopped`) -- ✅ 内部网络隔离,外部无法直接访问数据库 +- ✅ 内部网络隔离 ### 只更新特定服务 ```bash -# 只重新构建后端(不影响数据库) +# 只重新构建后端 docker compose up -d --build backend # 只重新启动前端 docker compose up -d frontend ``` -### 数据持久化 - -PostgreSQL 数据存储在命名 volume `pg_data` 中: - -```bash -# 查看 volume -docker volume ls | grep pg_data - -# 备份数据 -docker run --rm -v pg_data:/data -v $(pwd):/backup alpine tar czf /backup/pg_backup.tar.gz /data - -# 恢复数据 -docker run --rm -v pg_data:/data -v $(pwd):/backup alpine tar xzf /backup/pg_backup.tar.gz -C / -``` - --- ## 🔧 开发指南 ### 添加新工具 -在 `tools.py` 中添加: +在 `app/tools.py` 中添加: ```python @tool @@ -209,7 +185,7 @@ def my_new_tool(param: str) -> str: ### 添加新模型 -在 `agent.py` 中: +在 `app/agent.py` 中: ```python def _create_new_model_llm(self): @@ -227,7 +203,7 @@ model_configs = { } ``` -在前端 `frontend.py` 中添加选项: +在前端 `frontend/app.py` 中添加选项: ```python MODEL_OPTIONS = { @@ -246,11 +222,8 @@ docker compose exec backend bash # 查看实时日志 docker compose logs -f backend -# 检查数据库连接 -docker compose exec postgres psql -U postgres -d langgraph_db -c "\dt" - # 测试后端 API -curl http://localhost:8001/ +curl http://localhost:8001/health ``` --- @@ -267,11 +240,10 @@ curl http://localhost:8001/ **部署流程:** 1. 检出代码 -2. 安装 Python 依赖(验证用) -3. 准备环境变量 -4. 重新构建并启动前后端(不影响数据库) -5. 健康检查(等待后端就绪) -6. 清理无用 Docker 资源 +2. 准备环境变量 +3. 重新构建并启动前后端(不影响远程数据库) +4. 健康检查(等待后端就绪) +5. 清理无用 Docker 资源 **配置 Secrets:** @@ -285,23 +257,20 @@ curl http://localhost:8001/ ### 常见问题 -#### 1. PostgreSQL 连接失败 +#### 1. 无法连接远程数据库 ```bash -# 检查容器状态 -docker compose ps postgres +# 测试 PostgreSQL 连接 +psql -h 115.190.121.151 -U postgres -d langgraph_db -c "SELECT version();" -# 查看日志 -docker compose logs postgres - -# 测试连接 -docker compose exec postgres pg_isready -U postgres +# 测试 Qdrant 连接 +curl http://115.190.121.151:6333/collections ``` **解决方案:** -- 确认容器正在运行 -- 检查密码是否正确 -- 等待健康检查通过(约 10-30 秒) +- 确认远程服务器防火墙开放了 5432 和 6333 端口 +- 检查网络连接是否正常 +- 验证用户名和密码是否正确 #### 2. 后端启动失败 @@ -318,71 +287,32 @@ lsof -i :8001 - 端口 8001 被占用 - 依赖包缺失 -#### 3. 前端无法连接后端(NameResolutionError) +#### 3. 前端无法连接后端 **错误信息:** ``` -HTTPConnectionPool(host='backend', port=8001): Max retries exceeded with url: /chat -(Caused by NameResolutionError("HTTPConnection(host='backend', port=8001): Failed to resolve 'backend'")) +HTTPConnectionPool(host='backend', port=8001): Max retries exceeded ``` -**原因分析:** -- 前端容器和后端容器不在同一个 Docker 网络中 -- docker-compose.yml 中的服务名配置错误 -- 环境变量 `API_URL` 配置不正确 - **解决方案:** 1. **检查容器是否在同一网络中:** ```bash -# 查看所有 Docker 网络 -docker network ls - -# 检查 ai-network 网络中的容器 docker network inspect docker_ai-network ``` -2. **确认服务名正确:** +2. **验证环境变量配置:** ```bash -# 查看运行中的容器 -docker compose ps - -# 应该看到:ai-backend, ai-frontend, ai-postgres -``` - -3. **验证环境变量配置:** -```bash -# 进入前端容器检查环境变量 docker compose exec frontend env | grep API_URL - -# 应该输出:API_URL=http://backend:8001/chat ``` -4. **重启服务:** +3. **重启服务:** ```bash -# 完全停止并重新启动所有服务 docker compose down docker compose up -d --build - -# 查看启动日志 docker compose logs -f ``` -5. **测试网络连通性:** -```bash -# 从前端容器 ping 后端服务 -docker compose exec frontend ping backend - -# 从前端容器访问后端 API -docker compose exec frontend curl http://backend:8001/health -``` - -**重要提示:** -- Docker Compose 会自动创建名为 `<项目目录>_ai-network` 的网络 -- 容器间通过**服务名**(而非容器名)进行通信 -- 在 `docker-compose.yml` 中,服务名是 `backend`、`frontend`、`postgres` -- 确保所有服务都连接到同一个自定义网络(`ai-network`) - #### 4. 模型初始化失败 ```bash @@ -405,36 +335,21 @@ docker compose logs backend | grep -i "model\|error" 1. **检查 .env 文件格式:** ```bash -# 确保文件末尾没有多余字符(如 EOF) cat -A .env - -# 正确格式应该是每行一个变量,无多余空格或特殊字符 ``` 2. **验证环境变量已加载:** ```bash -# 检查后端容器的环境变量 docker compose exec backend env | grep ZHIPUAI_API_KEY - -# 检查前端容器的环境变量 docker compose exec frontend env | grep API_URL ``` 3. **重新构建容器:** ```bash -# 修改 .env 后需要重新创建容器 docker compose down docker compose up -d --build ``` -4. **确认 .env 文件位置:** -```bash -# .env 文件应该在项目根目录(与 docker-compose.yml 的父目录同级) -ls -la .env - -# docker-compose.yml 中使用了 context: .. ,所以 .env 应该在上一级目录 -``` - --- ## 📊 监控和维护 @@ -468,11 +383,11 @@ docker compose logs -f backend frontend ### 备份和恢复 ```bash -# 备份数据库 -docker compose exec postgres pg_dump -U postgres langgraph_db > backup.sql +# 备份远程数据库 +pg_dump -h 115.190.121.151 -U postgres langgraph_db > backup.sql # 恢复数据库 -cat backup.sql | docker compose exec -T postgres psql -U postgres langgraph_db +cat backup.sql | psql -h 115.190.121.151 -U postgres langgraph_db ``` --- @@ -491,14 +406,13 @@ cat backup.sql | docker compose exec -T postgres psql -U postgres langgraph_db - 启用 HTTPS - 配置日志轮转 - 设置资源限制(CPU、内存) -- 定期备份数据库 +- 定期备份远程数据库 --- ## 📞 获取帮助 - **完整文档**: [README.md](README.md) -- **RAG 示例**: `rag_example.py` - **报告问题**: 提交 Issue 并附上日志 --- diff --git a/README.md b/README.md index 07cb91c..c987327 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ - ✅ **高可用架构**:模型自动降级,确保服务稳定 - ✅ **前后端分离**:FastAPI 后端 + Streamlit 前端 - ✅ **Docker 部署**:一键启动所有服务 +- ✅ **远程服务架构**:PostgreSQL 和 Qdrant 部署在远程服务器 --- @@ -30,13 +31,13 @@ | 层级 | 技术选型 | 说明 | |------|---------|------| -| **LLM 服务** | 智谱 AI API / vLLM (Gemma-4) | 云端 API 或本地推理 | -| **Embedding** | 智谱 Embedding API | 向量嵌入(无需 PyTorch) | +| **LLM 服务** | 智谱 AI API / llama.cpp (Gemma-4 GGUF) | 云端 API 或本地推理 | +| **Embedding** | llama.cpp (embeddinggemma-300M GGUF) | 本地向量嵌入服务 | | **Agent 框架** | LangGraph + LangChain | 工作流编排 | -| **向量数据库** | ChromaDB / pgvector | RAG 知识检索 | +| **向量数据库** | Qdrant | RAG 知识检索(远程服务器) | | **后端框架** | FastAPI + Uvicorn | RESTful API + WebSocket | | **前端框架** | Streamlit | 交互式 Web 界面 | -| **数据库** | PostgreSQL 16 | 对话记忆持久化 | +| **数据库** | PostgreSQL 16 | 对话记忆持久化(远程服务器) | | **容器化** | Docker + Docker Compose | 服务编排 | ### 架构图 @@ -63,34 +64,52 @@ │ │ - Weather │ │ │ │ - File IO │ │ │ │ - Web Scrap│ │ -│ │ - RAG │ │ +│ │ - Memory │ │ │ └────────────┘ │ └────────┬─────────┘ │ - ┌────┴────┐ - ↓ ↓ -┌────────┐ ┌──────────┐ -│PostgreSQL│ │ChromaDB │ -│(记忆存储)│ │(向量检索)│ -└────────┘ └──────────┘ + ┌────┴────────────────────┐ + ↓ ↓ +┌──────────────┐ ┌──────────────┐ +│ PostgreSQL │ │ Qdrant │ +│ (远程服务器) │ │ (远程服务器) │ +│ 115.190... │ │ 115.190... │ +└──────────────┘ └──────────────┘ ``` ### 项目结构 ``` Agent1/ -├── agent.py # Agent 服务核心(多模型管理) -├── graph_builder.py # LangGraph 状态图构建器 -├── tools.py # 工具函数定义(@tool 装饰器) -├── backend.py # FastAPI 后端应用 -├── frontend.py # Streamlit 前端界面 -├── rag_example.py # RAG 实现示例(无 PyTorch) -├── docker-compose.yml # Docker 服务编排 -├── Dockerfile.backend # 后端镜像构建 -├── Dockerfile.frontend # 前端镜像构建 -├── requirement.txt # Python 依赖 -├── .env # 环境变量配置 -└── user_docs/ # 用户文档目录 +├── app/ +│ ├── __init__.py +│ ├── config.py # 配置管理 +│ ├── state.py # 状态定义 +│ ├── prompts.py # 提示模板 +│ ├── logger.py # 日志工具 +│ ├── tools.py # 工具函数定义 +│ ├── memory/ +│ │ ├── __init__.py +│ │ └── mem0_client.py # Mem0 客户端封装 +│ ├── nodes/ +│ │ ├── __init__.py +│ │ ├── router.py # 路由决策 +│ │ ├── llm_call.py # LLM 调用节点 +│ │ ├── tool_call.py # 工具执行节点 +│ │ ├── retrieve_memory.py # 记忆检索节点 +│ │ └── summarize.py # 记忆存储节点 +│ ├── graph_builder.py # LangGraph 图构建器 +│ ├── agent.py # Agent 服务核心 +│ └── backend.py # FastAPI 后端应用 +├── frontend/ +│ └── app.py # Streamlit 前端界面 +├── docker/ +│ ├── docker-compose.yml # Docker 服务编排 +│ ├── Dockerfile.backend # 后端镜像构建 +│ └── Dockerfile.frontend # 前端镜像构建 +├── requirement.txt # Python 依赖 +├── .env # 环境变量配置 +└── user_docs/ # 用户文档目录 ├── a.txt ├── b.pdf └── c.xlsx @@ -104,9 +123,9 @@ Agent1/ ### 方式一:Docker Compose(推荐) -``` +```bash # 1. 配置环境变量 -cp .env.example .env +cp .env.docker .env # 编辑 .env 文件,填入真实的 API Key # 2. 启动所有服务 @@ -121,21 +140,19 @@ docker compose -f docker/docker-compose.yml up -d --build ### 方式二:本地开发模式 -``` -# 1. 启动 PostgreSQL -docker run -d --name postgres-langgraph \ - -e POSTGRES_PASSWORD=mysecretpassword \ - -e POSTGRES_DB=langgraph_db \ - -p 5432:5432 postgres:16 - -# 2. 安装依赖 +```bash +# 1. 安装依赖 pip install -r requirement.txt +# 2. 配置环境变量 +cp .env.docker .env +# 编辑 .env,根据本地/远程环境调整配置 + # 3. 启动后端 -python backend.py +python app/backend.py # 4. 启动前端(新终端) -streamlit run frontend.py +cd frontend && streamlit run app.py ``` --- @@ -161,7 +178,7 @@ streamlit run frontend.py | 📑 解析 PDF | "总结 b.pdf 的主要内容" | | 📊 Excel 数据 | "显示 c.xlsx 的数据" | | 🌐 网页抓取 | "抓取 https://example.com 的内容" | -| 🔍 知识库检索 | "根据知识库回答:XXX" | +| 🔍 长期记忆 | "记住我喜欢吃川菜" → "我有什么饮食偏好?" | ### 多模型切换 @@ -179,9 +196,9 @@ streamlit run frontend.py ### 添加新工具 -在 `tools.py` 中添加新的 `@tool` 装饰函数: +在 `app/tools.py` 中添加新的 `@tool` 装饰函数: -``` +```python @tool def my_new_tool(param: str) -> str: """ @@ -201,9 +218,9 @@ def my_new_tool(param: str) -> str: ### 添加新模型 -在 `agent.py` 的 `initialize()` 方法中添加模型配置: +在 `app/agent.py` 的 `initialize()` 方法中添加模型配置: -``` +```python model_configs = { "zhipu": self._create_zhipu_llm, "local": self._create_local_llm, @@ -215,7 +232,7 @@ model_configs = { 项目包含完整的 Docker 配置: -- **docker-compose.yml**:服务编排(PostgreSQL + Backend + Frontend) +- **docker-compose.yml**:服务编排(Backend + Frontend,连接远程数据库) - **Dockerfile.backend**:后端镜像构建 - **Dockerfile.frontend**:前端镜像构建 - **.gitea/workflows/deploy.yml**:CI/CD 自动化部署 @@ -228,54 +245,62 @@ model_configs = { ### 配置文件说明 -项目使用两个环境配置文件: +项目采用三层环境配置文件体系: | 文件 | 用途 | 是否提交 Git | |------|------|------------| +| `.env.example` | 配置模板 | ✅ 是 | | `.env` | 实际使用的配置 | ❌ 否(已忽略) | | `.env.docker` | Docker 部署模板 | ✅ 是 | **使用方法:** -- **本地开发**:手动创建 `.env`,配置 `localhost` 相关地址 -- **Docker 部署**:`cp .env.docker .env`,然后修改 API Key +- **本地开发**:`cp .env.example .env`,修改为 localhost 相关地址 +- **Docker 部署**:`cp .env.docker .env`,使用远程服务器地址 ### 必需的环境变量 -代码中所有使用 `os.getenv()` 的地方都必须在 `.env` 文件中定义: - | 变量名 | 说明 | 本地开发示例 | Docker 部署示例 | |--------|------|------------|----------------| | `ZHIPUAI_API_KEY` | 智谱 AI API 密钥 | `your_key_here` | `your_key_here` | -| `VLLM_LOCAL_KEY` | vLLM 认证 Token | `token-abc123` | `token-abc123` | -| `VLLM_BASE_URL` | vLLM 服务地址 | `http://localhost:8000/v1` | `http://115.190.121.151:18000/v1` | -| `DB_URI` | PostgreSQL 连接字符串 | `postgresql://...@localhost:5432/...` | `postgresql://...@postgres:5432/...` | -| `API_URL` | 后端 API 地址 | `http://localhost:8001/chat` | (由 docker-compose.yml 注入) | +| `DEEPSEEK_API_KEY` | DeepSeek API 密钥 | `your_key_here` | `your_key_here` | +| `LLAMACPP_API_KEY` | llama.cpp 认证 Token | `token-abc123` | `token-abc123` | +| `VLLM_BASE_URL` | LLM 服务地址 | `http://localhost:8081/v1` | `http://localhost:8081/v1` | +| `VLLM_EMBEDDING_URL` | Embedding 服务地址 | `http://localhost:8082/v1` | `http://localhost:8082/v1` | +| `QDRANT_URL` | Qdrant 地址 | `http://115.190.121.151:6333` | `http://115.190.121.151:6333` | +| `DB_URI` | PostgreSQL 连接字符串 | `postgresql://...@115.190.121.151:5432/...` | `postgresql://...@115.190.121.151:5432/...` | +| `API_URL` | 后端 API 地址 | `http://localhost:8003/chat` | (由 docker-compose.yml 注入) | ### 配置示例 #### 本地开发 (.env) -```bash +``` ZHIPUAI_API_KEY=your_api_key_here -VLLM_LOCAL_KEY=token-abc123 -VLLM_BASE_URL=http://localhost:8000/v1 -DB_URI=postgresql://postgres:mysecretpassword@localhost:5432/langgraph_db?sslmode=disable -API_URL=http://localhost:8001/chat +DEEPSEEK_API_KEY=your_deepseek_api_key_here +LLAMACPP_API_KEY=token-abc123 +VLLM_BASE_URL=http://localhost:8081/v1 +VLLM_EMBEDDING_URL=http://localhost:8082/v1 +QDRANT_URL=http://115.190.121.151:6333 +DB_URI=postgresql://postgres:mysecretpassword@115.190.121.151:5432/langgraph_db?sslmode=disable +API_URL=http://localhost:8003/chat ``` #### Docker 部署 (.env.docker) -```bash +``` ZHIPUAI_API_KEY=your_api_key_here -VLLM_LOCAL_KEY=token-abc123 -VLLM_BASE_URL=http://115.190.121.151:18000/v1 -DB_URI=postgresql://postgres:mysecretpassword@postgres:5432/langgraph_db?sslmode=disable -# API_URL 在 docker-compose.yml 中配置为 http://backend:8001/chat +DEEPSEEK_API_KEY=your_deepseek_api_key_here +LLAMACPP_API_KEY=token-abc123 +VLLM_BASE_URL=http://localhost:8081/v1 +VLLM_EMBEDDING_URL=http://localhost:8082/v1 +QDRANT_URL=http://115.190.121.151:6333 +DB_URI=postgresql://postgres:mysecretpassword@115.190.121.151:5432/langgraph_db?sslmode=disable +# API_URL 在 docker-compose.yml 中配置为 http://backend:8003/chat ``` ### 注意事项 - ⚠️ **不要硬编码敏感信息**:所有 API Key 必须通过环境变量配置 -- ⚠️ **Docker 网络差异**:容器内使用服务名(如 `postgres`、`backend`),本地使用 `localhost` +- ⚠️ **远程服务依赖**:确保可以访问远程 PostgreSQL (115.190.121.151:5432) 和 Qdrant (115.190.121.151:6333) - ⚠️ **修改后重启**:修改 `.env` 后,Docker 部署需要执行 `docker compose down && docker compose up -d --build` --- @@ -284,13 +309,13 @@ DB_URI=postgresql://postgres:mysecretpassword@postgres:5432/langgraph_db?sslmode ### 常见问题 -**Q: 无法连接 PostgreSQL?** +**Q: 无法连接远程数据库?** ```bash -# 检查容器状态 -docker ps | grep postgres +# 测试 PostgreSQL +psql -h 115.190.121.151 -U postgres -d langgraph_db -c "SELECT version();" -# 查看日志 -docker logs postgres-langgraph +# 测试 Qdrant +curl http://115.190.121.151:6333/collections ``` **Q: 后端启动失败?** diff --git a/app/agent.py b/app/agent.py index 7a0e99d..9aad623 100644 --- a/app/agent.py +++ b/app/agent.py @@ -6,16 +6,15 @@ AI Agent 服务类 - 支持多模型动态切换 import os from dotenv import load_dotenv from langchain_community.chat_models import ChatZhipuAI -from langchain_core.messages import HumanMessage from langchain_openai import ChatOpenAI from pydantic import SecretStr +from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver # 本地模块 from app.graph_builder import GraphBuilder, GraphContext from app.tools import AVAILABLE_TOOLS, TOOLS_BY_NAME from app.logger import debug, info, warning, error -from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver -from langgraph.store.postgres.aio import AsyncPostgresStore + load_dotenv() @@ -23,15 +22,13 @@ load_dotenv() class AIAgentService: """异步 AI Agent 服务,支持多模型动态切换,使用外部传入的 checkpointer""" - def __init__(self, checkpointer: AsyncPostgresSaver, store: AsyncPostgresStore): + def __init__(self, checkpointer: AsyncPostgresSaver): """ 初始化服务 Args: checkpointer: 已经初始化的 AsyncPostgresSaver 实例 - store: 已经初始化的 AsyncPostgresStore 实例 """ self.checkpointer = checkpointer - self.store = store self.graphs = {} # 存储不同模型对应的 graph 实例 def _create_zhipu_llm(self): @@ -68,19 +65,19 @@ class AIAgentService: # vLLM 服务地址:优先从环境变量读取,适配 Docker、FRP 穿透和本地开发 vllm_base_url = os.getenv( "VLLM_BASE_URL", - "http://115.190.121.151:18000/v1" + "http://localhost:8081/v1" ) return ChatOpenAI( base_url=vllm_base_url, - api_key=SecretStr(os.getenv("VLLM_LOCAL_KEY", "")), + api_key=SecretStr(os.getenv("LLAMACPP_API_KEY", "token-abc123")), model="gemma-4-E2B-it", timeout=60.0, # 请求超时时间(秒) max_retries=2, # 失败后自动重试次数 ) async def initialize(self): - """预编译所有模型的 graph(使用传入的 checkpointer 和 store)""" + """预编译所有模型的 graph(使用传入的 checkpointer)""" model_configs = { "zhipu": self._create_zhipu_llm, "deepseek": self._create_deepseek_llm, @@ -92,7 +89,7 @@ class AIAgentService: info(f"🔄 正在初始化模型 '{model_name}'...") llm = llm_creator() builder = GraphBuilder(llm, AVAILABLE_TOOLS, TOOLS_BY_NAME).build() - graph = builder.compile(checkpointer=self.checkpointer, store=self.store) + graph = builder.compile(checkpointer=self.checkpointer) self.graphs[model_name] = graph info(f"✅ 模型 '{model_name}' 初始化成功") except Exception as e: @@ -121,14 +118,27 @@ class AIAgentService: "elapsed_time": float # 调用耗时(秒) } """ + # 尝试使用指定模型,如果不可用则循环尝试其他模型 if model not in self.graphs: - fallback_model = next(iter(self.graphs.keys())) - warning(f"警告: 模型 '{model}' 不可用,已切换到 '{fallback_model}'") - model = fallback_model + warning(f"警告: 模型 '{model}' 不可用,尝试切换到其他可用模型") + found = False + for available_model in self.graphs.keys(): + try: + # 这里可以添加额外的模型可用性检查逻辑 + model = available_model + found = True + info(f"已切换到可用模型: '{model}'") + break + except Exception as e: + warning(f"模型 '{available_model}' 也不可用: {str(e)}") + continue + + if not found: + raise RuntimeError(f"错误: 没有任何可用的模型。当前注册的模型: {list(self.graphs.keys())}") graph = self.graphs[model] config = {"configurable": {"thread_id": thread_id}} - input_state = {"messages": [HumanMessage(content=message)]} + input_state = {"messages": [{"role": "user", "content": message}]} context = GraphContext(user_id=user_id) result = await graph.ainvoke(input_state, config=config, context=context) diff --git a/app/backend.py b/app/backend.py index 8ddf33c..fa7fdd5 100644 --- a/app/backend.py +++ b/app/backend.py @@ -12,7 +12,6 @@ from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect, Depe from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver -from langgraph.store.postgres.aio import AsyncPostgresStore from app.agent import AIAgentService from app.logger import debug, info, warning, error @@ -23,23 +22,19 @@ load_dotenv() # 优先级:环境变量 DB_URI > Docker 内部服务名 > 本地开发地址 DB_URI = os.getenv( "DB_URI", - "postgresql://postgres:mysecretpassword@localhost:5432/langgraph_db?sslmode=disable" + "postgresql://postgres:mysecretpassword@ai-postgres:5432/langgraph_db?sslmode=disable" ) @asynccontextmanager async def lifespan(app: FastAPI): """应用生命周期管理:创建并注入全局服务""" - # 1. 创建数据库连接池并初始化表 - async with ( - AsyncPostgresSaver.from_conn_string(DB_URI) as checkpointer, - AsyncPostgresStore.from_conn_string(DB_URI) as store - ): + # 1. 创建数据库连接池并初始化表(仅 checkpointer) + async with AsyncPostgresSaver.from_conn_string(DB_URI) as checkpointer: await checkpointer.setup() - await store.setup() # 2. 构建 AI Agent 服务 - agent_service = AIAgentService(checkpointer,store) + agent_service = AIAgentService(checkpointer) await agent_service.initialize() # 3. 将服务实例存入 app.state @@ -155,4 +150,6 @@ async def websocket_endpoint( if __name__ == "__main__": import uvicorn - uvicorn.run(app, host="0.0.0.0", port=8001) \ No newline at end of file + # 使用环境变量或默认端口 8003(避免与 vLLM 的 8001 端口冲突) + port = int(os.getenv("BACKEND_PORT", "8003")) + uvicorn.run(app, host="0.0.0.0", port=port) diff --git a/app/config.py b/app/config.py new file mode 100644 index 0000000..1c72e8e --- /dev/null +++ b/app/config.py @@ -0,0 +1,23 @@ +""" +环境变量集中管理模块 +所有配置项统一定义,避免散落在各个文件中 +""" + +import os + + +# ========== Graph 执行追踪配置 ========== +# 是否启用 Graph 流转追踪(通过环境变量控制) +ENABLE_GRAPH_TRACE = os.getenv("ENABLE_GRAPH_TRACE", "true").lower() == "true" + +# ========== 记忆提取配置 ========== +# 记忆提取间隔:每 N 轮对话生成一次摘要 +MEMORY_SUMMARIZE_INTERVAL = int(os.getenv("MEMORY_SUMMARIZE_INTERVAL", "10")) + +# ========== Mem0 记忆层配置 ========== +# Qdrant 向量数据库地址 +QDRANT_URL = os.getenv("QDRANT_URL", "http://localhost:6333") +QDRANT_COLLECTION_NAME = os.getenv("QDRANT_COLLECTION_NAME", "mem0_user_memories") + +# vLLM Embedding 服务地址 (用于 Mem0 的向量化) +VLLM_EMBEDDING_URL = os.getenv("VLLM_EMBEDDING_URL", "http://localhost:8082/v1") diff --git a/app/graph_builder.py b/app/graph_builder.py index 4708156..cc72d84 100644 --- a/app/graph_builder.py +++ b/app/graph_builder.py @@ -1,95 +1,27 @@ """ -LangGraph 状态图构建模块 - 完全面向对象风格,无嵌套函数 +LangGraph 状态图构建模块 - 精简版,仅负责组装图 +所有节点逻辑已拆分到独立模块 """ -import operator -import asyncio -import time -import os -from typing import Literal, Annotated, Any from langchain_core.language_models import BaseLLM -from langchain_core.messages import AnyMessage, AIMessage, ToolMessage, SystemMessage -from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder -from langchain_core.runnables import RunnableLambda from langgraph.graph import StateGraph, START, END -from typing_extensions import TypedDict -from langgraph.store.postgres.aio import AsyncPostgresStore -from langgraph.runtime import Runtime -from dataclasses import dataclass -import uuid -from langchain_core.prompt_values import ChatPromptValue + # 本地模块 -from app.logger import debug, info, warning, error +from app.state import MessagesState, GraphContext +from app.nodes import ( + create_llm_call_node, + create_tool_call_node, + create_retrieve_memory_node, + create_summarize_node, + should_continue +) +from app.memory import Mem0Client -# 是否启用 Graph 流转追踪(通过环境变量控制) -ENABLE_GRAPH_TRACE = os.getenv("ENABLE_GRAPH_TRACE", "true").lower() == "true" - - -class MessagesState(TypedDict): - """对话状态类型定义""" - messages: Annotated[list[AnyMessage], operator.add] - llm_calls: int - memory_context:str - last_token_usage: dict # 本次调用的 token 使用详情 - last_elapsed_time: float # 本次调用耗时(秒) - -@dataclass -class GraphContext: - user_id: str - # 可扩展更多上下文信息 - -def _log_state_change(node_name: str, state: MessagesState, prefix: str = "进入"): - """ - 通用辅助函数:打印节点状态变化 - - Args: - node_name: 节点名称 - state: 当前状态 - prefix: 前缀("进入" 或 "离开") - """ - if not ENABLE_GRAPH_TRACE: - return - - messages = state.get("messages", []) - msg_count = len(messages) - last_msg = messages[-1] if messages else None - last_info = "" - if last_msg: - content_preview = str(last_msg.content)[:100].replace("\n", " ") - last_info = f"{last_msg.type.upper()}: {content_preview}" - info(f"🔄 [{node_name}] {prefix} | 消息数:{msg_count} | 最后一条:{last_info}") - -def _print_llm_input(prompt_value: ChatPromptValue) -> ChatPromptValue: - """ - RunnableLambda 回调函数:打印格式化后发送给 LLM 的完整消息 - - Args: - prompt_value: ChatPromptValue 对象,包含格式化后的消息列表 - - Returns: - 原样返回 prompt_value,不影响链式调用 - """ - if not ENABLE_GRAPH_TRACE: - return prompt_value - - messages = prompt_value.messages # ChatPromptValue 提供 .messages 属性 - - debug("\n" + "=" * 80) - debug("📤 [LLM输入] 格式化后发送给大模型的完整消息:") - debug(f" 总消息数: {len(messages)}") - debug("-" * 80) - for i, msg in enumerate(messages): - content_preview = str(msg.content) # 完整输出 - debug(f" [{i}] {msg.type.upper():10s}: {content_preview}") - debug( "\n"+"=" * 80 + "\n") - - return prompt_value - class GraphBuilder: - """LangGraph 状态图构建器 - 所有节点均为类方法""" + """LangGraph 状态图构建器 - 仅负责组装图""" - def __init__(self, llm: BaseLLM, tools: list, tools_by_name: dict[str, Any]): + def __init__(self, llm: BaseLLM, tools: list, tools_by_name: dict): """ 初始化构建器 @@ -101,304 +33,44 @@ class GraphBuilder: self.llm = llm self.tools = tools self.tools_by_name = tools_by_name - self._llm_with_tools = llm.bind_tools(tools) - self._prompt = self._create_prompt() - self._chain = ( - self._prompt - | RunnableLambda(_print_llm_input) - | self._llm_with_tools - ) - - @staticmethod - def _create_prompt() -> ChatPromptTemplate: - """创建系统提示模板(静态方法,无需访问实例)""" - system_template = ( - "你是一个个人生活助手和数据分析助手,请使用中文交流。\n\n" - "【用户背景信息】\n" - "以下是对当前用户的已知信息和长期记忆,你必须优先采纳并在回答中体现:\n" - "{memory_context}\n" - "若包含姓名、偏好等个人信息,请自然融入回应(例如称呼名字、提及偏好)。\n\n" - "【可用工具与使用规则】\n" - "- 获取温度/天气:`get_current_temperature`\n" - "- 读取文本文件:`read_local_file`(限定目录 `./user_docs`)\n" - "- 读取PDF摘要:`read_pdf_summary`(限定目录 `./user_docs`)\n" - "- 读取Excel表格:`read_excel_as_markdown`(限定目录 `./user_docs`)\n" - "- 抓取网页内容:`fetch_webpage_content`\n" - "工具调用时请直接返回所需参数,无需额外说明。\n\n" - "【回答要求(必须遵守)】\n" - "1. 回答必须简洁、直接,禁止描述任何思考过程或内心活动。\n" - "2. 优先利用已知用户信息进行个性化回复。\n" - "3. 若无信息可依,礼貌询问或提供通用帮助。" - ) - return ChatPromptTemplate.from_messages([ - ("system", system_template), - MessagesPlaceholder(variable_name="messages") - ]) - - async def call_llm(self, state: MessagesState, runtime: Runtime[GraphContext]) -> dict: - """ - LLM 调用节点(异步方法) - 注意:因为 self._chain.invoke 是同步方法,使用 run_in_executor 避免阻塞事件循环 - """ - _log_state_change("llm_call", state, "进入") - memory_context = state.get("memory_context", "暂无用户信息") - loop = asyncio.get_event_loop() - start_time = time.time() - - try: - response = await loop.run_in_executor( - None, - lambda: self._chain.invoke({ - "messages": state["messages"], - "memory_context": memory_context - }) - ) - - elapsed_time = time.time() - start_time - - # 提取 token 用量(兼容不同 LLM 提供商的元数据格式) - token_usage = {} - input_tokens = 0 - output_tokens = 0 - - # 尝试从 response_metadata 中提取 - if hasattr(response, 'response_metadata') and response.response_metadata: - meta = response.response_metadata - if 'token_usage' in meta: - token_usage = meta['token_usage'] - elif 'usage' in meta: - token_usage = meta['usage'] - - # 尝试从 additional_kwargs 中提取 - if not token_usage and hasattr(response, 'additional_kwargs'): - add_kwargs = response.additional_kwargs - if 'llm_output' in add_kwargs and 'token_usage' in add_kwargs['llm_output']: - token_usage = add_kwargs['llm_output']['token_usage'] - - # 提取具体的 token 数值 - if token_usage: - input_tokens = token_usage.get('prompt_tokens', token_usage.get('input_tokens', 0)) - output_tokens = token_usage.get('completion_tokens', token_usage.get('output_tokens', 0)) - - # 打印响应统计信息 - info(f"⏱️ [LLM统计] 调用耗时: {elapsed_time:.2f}秒") - info(f"📊 [LLM统计] Token用量: 输入={input_tokens}, 输出={output_tokens}, 总计={input_tokens + output_tokens}") - if token_usage: - debug(f"📋 [LLM统计] 详细用量: {token_usage}") - - # 打印 LLM 的完整输出 - debug("\n" + "="*80) - debug("📥 [LLM输出] 大模型返回的完整响应:") - debug(f" 消息类型: {response.type.upper()}") - debug(f" 内容长度: {len(str(response.content))} 字符") - debug("-"*80) - debug(f"{response.content}") - debug("="*80 + "\n") - - result = { - "messages": [response], - "llm_calls": state.get('llm_calls', 0) + 1, - "last_token_usage": token_usage, - "last_elapsed_time": elapsed_time - } - - _log_state_change("llm_call", {**state, **result}, "离开") - return result - - except Exception as e: - elapsed_time = time.time() - start_time - error(f"\n❌ [LLM错误] 调用失败 (耗时: {elapsed_time:.2f}秒)") - error(f" 错误类型: {type(e).__name__}") - error(f" 错误信息: {str(e)}") - import traceback - traceback.print_exc() - debug("="*80 + "\n") - - # 返回一个友好的错误消息 - error_response = AIMessage( - content="抱歉,模型暂时无法响应,可能是网络超时或服务繁忙,请稍后再试。" - ) - error_result = { - "messages": [error_response], - "llm_calls": state.get('llm_calls', 0), - "last_token_usage": {}, - "last_elapsed_time": elapsed_time - } - - _log_state_change("llm_call", state, "离开(异常)") - return error_result - - async def call_tools(self, state: MessagesState, runtime: Runtime[GraphContext]) -> dict: - """ - 工具执行节点(异步方法) - 对于每个工具调用,在线程池中执行同步工具函数 - """ - _log_state_change("tool_node", state, "进入") - - last_message = state['messages'][-1] - if not isinstance(last_message, AIMessage) or not last_message.tool_calls: - _log_state_change("tool_node", state, "离开(无工具调用)") - return {"messages": []} - - results = [] - loop = asyncio.get_event_loop() - - info(f"🛠️ [工具调用] 准备执行 {len(last_message.tool_calls)} 个工具") - - for tool_call in last_message.tool_calls: - tool_name = tool_call["name"] - tool_args = tool_call["args"] - tool_id = tool_call["id"] - tool_func = self.tools_by_name.get(tool_name) - - debug(f" ├─ 调用工具: {tool_name} 参数: {tool_args}") - - if tool_func is None: - err_msg = f"Tool {tool_name} not found" - debug(f" └─ ❌ {err_msg}") - results.append(ToolMessage(content=err_msg, tool_call_id=tool_id)) - continue - - try: - # 修复闭包问题:将变量作为默认参数传入 lambda - # 如果工具支持异步 (ainvoke),优先使用异步调用 - if hasattr(tool_func, 'ainvoke'): - observation = await tool_func.ainvoke(tool_args) - else: - observation = await loop.run_in_executor( - None, - lambda args=tool_args: tool_func.invoke(args) # 默认参数捕获当前值 - ) - - # 字符打印 - result_preview = str(observation).replace("\n", " ") - debug(f" └─ ✅ 结果: {result_preview}") - results.append(ToolMessage(content=str(observation), tool_call_id=tool_id)) - except Exception as e: - debug(f" └─ ❌ 异常: {e}") - results.append(ToolMessage(content=f"Error: {e}", tool_call_id=tool_id)) - - info(f"🛠️ [工具调用] 执行完成,返回 {len(results)} 条 ToolMessage") - - result = {"messages": results} - _log_state_change("tool_node", {**state, **result}, "离开") - return result - - @staticmethod - def should_continue(state: MessagesState) -> Literal['tool_node', 'save_memory', 'END']: - """决定下一步:工具调用、保存记忆还是结束""" - last_message = state["messages"][-1] - - # 1. 如果需要调用工具,优先进入工具节点 - if isinstance(last_message, AIMessage) and last_message.tool_calls: - if ENABLE_GRAPH_TRACE: - info(f"🔀 [路由决策] 检测到 {len(last_message.tool_calls)} 个工具调用 → 转向 'tool_node'") - return 'tool_node' - - # 2. 如果是 AI 的最终回复,可以考虑进入记忆保存节点(可增加判断逻辑) - # 这里简单处理:只要没有工具调用,且是 AI 消息,就尝试保存记忆。 - if isinstance(last_message, AIMessage): - if ENABLE_GRAPH_TRACE: - info(f"🔀 [路由决策] 收到 AI 最终回复(无工具调用) → 转向 'save_memory'") - return 'save_memory' - - # 3. 其他情况(如只有用户消息)直接结束 - if ENABLE_GRAPH_TRACE: - info(f"🔀 [路由决策] 非 AI 消息(如纯用户消息) → 结束流程") - return 'END' - - async def retrieve_memory(self, state: MessagesState, runtime: Runtime[GraphContext]) -> dict: - """搜索并返回长期记忆""" - _log_state_change("retrieve_memory", state, "进入") - - user_id = runtime.context.user_id - namespace = ("memories", user_id) - query = str(state["messages"][-1].content) - - debug(f"\n{'='*60}") - debug(f"🔎 [记忆检索] 开始检索") - debug(f" ├─ 用户ID: {user_id}") - debug(f" ├─ 命名空间: {namespace}") - debug(f" ├─ 查询内容: '{query}'") - debug(f" └─ 消息总数: {len(state['messages'])}") - - try: - memories = await runtime.store.asearch(namespace, query=query) - debug(f"✅ [记忆检索] 检索完成,找到 {len(memories)} 条相关记忆") - - if memories: - memory_text = "\n".join([m.value["data"] for m in memories]) - debug(f"📚 [记忆内容]") - for i, memory in enumerate(memories, 1): - debug(f" [{i}] {memory.value['data']}") - debug(f"{'='*60}\n") - result = {"memory_context": memory_text} - _log_state_change("retrieve_memory", {**state, **result}, "离开") - return result - else: - debug(f"⚠️ [记忆检索] 未找到相关记忆") - debug(f"{'='*60}\n") - result = {"memory_context": ""} - _log_state_change("retrieve_memory", {**state, **result}, "离开") - return result - - except Exception as e: - error(f"❌ [记忆检索] 检索失败: {e}") - import traceback - traceback.print_exc() - debug(f"{'='*60}\n") - result = {"memory_context": ""} - _log_state_change("retrieve_memory", {**state, **result}, "离开(异常)") - return result - - async def save_memory(self, state: MessagesState, runtime: Runtime[GraphContext]) -> dict: - """尝试从对话中提取并保存长期记忆""" - _log_state_change("save_memory", state, "进入") - - # 获取最后一条用户消息(通常是要记住的内容的来源) - user_messages = [msg for msg in state["messages"] if msg.type == "human"] - if not user_messages: - _log_state_change("save_memory", state, "离开(无用户消息)") - return {} - - last_user_msg = user_messages[-1].content.lower() - - # 简单触发逻辑:包含"记住"或"保存"等关键词 - if any(keyword in last_user_msg for keyword in ["记住", "保存", "别忘了"]): - # 提取记忆内容(这里仅作示例,实际可用 LLM 提取) - memory_content = f"用户说过:{last_user_msg}" - user_id = runtime.context.user_id - namespace = ("memories", user_id) - await runtime.store.aput(namespace, str(uuid.uuid4()), {"data": memory_content}) - info(f"✅ 长期记忆已保存:{memory_content}") - - _log_state_change("save_memory", state, "离开") - return {} + # ⭐ 创建 Mem0 客户端(懒加载,首次使用时初始化) + self.mem0_client = Mem0Client(llm) def build(self) -> StateGraph: """ - 构建未编译的状态图(返回 StateGraph 实例) - 图中节点直接使用实例方法 call_llm, call_tools + 构建未编译的状态图 + + Returns: + StateGraph 实例 """ - builder = StateGraph(MessagesState,context_schema=GraphContext) - builder.add_node("retrieve_memory", self.retrieve_memory) - builder.add_node("llm_call", self.call_llm) - builder.add_node("tool_node", self.call_tools) - builder.add_node("save_memory", self.save_memory) + builder = StateGraph(MessagesState, context_schema=GraphContext) + + # ⭐ 通过工厂函数创建节点(依赖注入) + retrieve_memory_node = create_retrieve_memory_node(self.mem0_client) + llm_call_node = create_llm_call_node(self.llm, self.tools) + tool_call_node = create_tool_call_node(self.tools_by_name) + summarize_node = create_summarize_node(self.mem0_client) + + # 添加节点 + builder.add_node("retrieve_memory", retrieve_memory_node) + builder.add_node("llm_call", llm_call_node) + builder.add_node("tool_node", tool_call_node) + builder.add_node("summarize", summarize_node) + # 添加边 builder.add_edge(START, "retrieve_memory") builder.add_edge("retrieve_memory", "llm_call") builder.add_conditional_edges( "llm_call", - self.should_continue, + should_continue, { "tool_node": "tool_node", - "save_memory": "save_memory", + "summarize": "summarize", 'END': END } ) builder.add_edge("tool_node", "llm_call") - builder.add_edge("save_memory", END) + builder.add_edge("summarize", END) - return builder \ No newline at end of file + return builder diff --git a/app/memory/__init__.py b/app/memory/__init__.py new file mode 100644 index 0000000..29117a6 --- /dev/null +++ b/app/memory/__init__.py @@ -0,0 +1,7 @@ +""" +Mem0 记忆层模块 +""" + +from app.memory.mem0_client import Mem0Client + +__all__ = ["Mem0Client"] diff --git a/app/memory/mem0_client.py b/app/memory/mem0_client.py new file mode 100644 index 0000000..eec47d6 --- /dev/null +++ b/app/memory/mem0_client.py @@ -0,0 +1,144 @@ +""" +Mem0 记忆层客户端封装模块 +负责 Mem0 的初始化、检索和存储 +""" + +import os +from typing import Optional, List, Dict, Any +from mem0 import AsyncMemory + +# 本地模块 +from app.config import QDRANT_URL, QDRANT_COLLECTION_NAME, VLLM_EMBEDDING_URL +from app.logger import info, warning, error + + +class Mem0Client: + """Mem0 异步客户端封装类""" + + def __init__(self, llm_instance): + """ + 初始化 Mem0 客户端 + + Args: + llm_instance: LangChain LLM 实例(用于事实提取) + """ + self.llm = llm_instance + self.mem0: Optional[AsyncMemory] = None + self._initialized = False + + async def initialize(self): + """异步初始化 Mem0 客户端""" + if self._initialized: + return + + try: + # 检查 Qdrant 是否可达 (可选) + import requests + try: + resp = requests.get(f"{QDRANT_URL}/collections", timeout=2) + if resp.status_code == 200: + info(f"✅ Qdrant 服务正常: {QDRANT_URL}") + except Exception: + warning(f"⚠️ 无法连接到 Qdrant: {QDRANT_URL},Mem0 将尝试自动连接") + + config = { + # 向量存储:复用 Qdrant 实例 + "vector_store": { + "provider": "qdrant", + "config": { + "collection_name": QDRANT_COLLECTION_NAME, + "host": QDRANT_URL.split("://")[1].split(":")[0] if "://" in QDRANT_URL else "localhost", + "port": int(QDRANT_URL.split(":")[-1]) if ":" in QDRANT_URL.split("://")[-1] else 6333, + "embedding_model_dims": 768, # embeddinggemma-300m 输出 768 维 + } + }, + # 事实提取 LLM:直接复用传入的 LangChain 实例 + "llm": { + "provider": "langchain", + "config": { + "model": self.llm # 直接传入 LangChain 模型实例 + } + }, + # Embedding:指向 vLLM 服务 + "embedder": { + "provider": "openai", + "embedding_dims": 768, # 关键:将维度参数提升到顶层 + "config": { + "model": "google/embeddinggemma-300m", + "api_key": "EMPTY", + "api_base": VLLM_EMBEDDING_URL, + # 注意:不要在此处传递 dimensions 参数,避免与 vLLM v0.7.2 不兼容 + } + }, + "version": "v1.1" + } + + self.mem0 = AsyncMemory.from_config(config) + self._initialized = True + info(f"✅ Mem0 初始化成功 (Embedding: vLLM@8002, Vector: Qdrant, LLM: 复用现有实例)") + + except Exception as e: + error(f"❌ Mem0 初始化失败: {e}") + import traceback + traceback.print_exc() + self.mem0 = None + + async def search_memories(self, query: str, user_id: str, limit: int = 5) -> List[str]: + """ + 检索相关记忆 + + Args: + query: 查询文本 + user_id: 用户 ID + limit: 返回结果数量限制 + + Returns: + List[str]: 记忆事实列表 + """ + if not self.mem0: + warning("⚠️ Mem0 未初始化,跳过记忆检索") + return [] + + try: + memories = await self.mem0.search(query, user_id=user_id, limit=limit) + + if memories and "results" in memories: + facts = [m["memory"] for m in memories["results"] if m.get("memory")] + if facts: + info(f"🔍 [记忆检索] Mem0 返回 {len(facts)} 条记忆") + return facts + + info("🔍 [记忆检索] 未找到相关记忆") + return [] + + except Exception as e: + warning(f"⚠️ Mem0 检索失败: {e}") + return [] + + async def add_memories(self, messages: List[Dict[str, str]], user_id: str) -> bool: + """ + 添加记忆(自动提取事实并存储) + + Args: + messages: 消息列表,格式为 [{"role": "user/assistant/system", "content": "..."}] + user_id: 用户 ID + + Returns: + bool: 是否成功 + """ + if not self.mem0: + warning("⚠️ Mem0 未初始化,跳过记忆添加") + return False + + try: + result = await self.mem0.add( + messages, + user_id=user_id, + metadata={"type": "conversation"} + ) + info(f"📝 [记忆添加] 已提交给 Mem0 进行事实提取") + return True + + except Exception as e: + error(f"❌ Mem0 记忆添加失败: {e}") + return False diff --git a/app/nodes/__init__.py b/app/nodes/__init__.py new file mode 100644 index 0000000..371a973 --- /dev/null +++ b/app/nodes/__init__.py @@ -0,0 +1,17 @@ +""" +节点模块 - 导出所有 LangGraph 节点函数 +""" + +from app.nodes.router import should_continue +from app.nodes.llm_call import create_llm_call_node +from app.nodes.tool_call import create_tool_call_node +from app.nodes.retrieve_memory import create_retrieve_memory_node +from app.nodes.summarize import create_summarize_node + +__all__ = [ + "should_continue", + "create_llm_call_node", + "create_tool_call_node", + "create_retrieve_memory_node", + "create_summarize_node", +] diff --git a/app/nodes/llm_call.py b/app/nodes/llm_call.py new file mode 100644 index 0000000..78a3e7f --- /dev/null +++ b/app/nodes/llm_call.py @@ -0,0 +1,139 @@ +""" +LLM 调用节点模块 +负责调用大语言模型并处理响应 +""" + +import asyncio +import time +from typing import Any, Dict +from langchain_core.language_models import BaseLLM +from langchain_core.messages import AIMessage +from langchain_core.runnables import RunnableLambda +from langgraph.runtime import Runtime + +# 本地模块 +from app.state import MessagesState, GraphContext +from app.prompts import create_system_prompt +from app.utils.logging import log_state_change, print_llm_input +from app.logger import debug, info, error + + +def create_llm_call_node(llm: BaseLLM, tools: list): + """ + 工厂函数:创建 LLM 调用节点 + + Args: + llm: LangChain LLM 实例 + tools: 工具列表 + + Returns: + 异步节点函数 + """ + # 构建调用链 + prompt = create_system_prompt() + llm_with_tools = llm.bind_tools(tools) + chain = prompt | RunnableLambda(print_llm_input) | llm_with_tools + + async def call_llm(state: MessagesState, runtime: Runtime[GraphContext]) -> Dict[str, Any]: + """ + LLM 调用节点(异步方法) + + Args: + state: 当前对话状态 + runtime: LangGraph 运行时上下文 + + Returns: + 更新后的状态字典 + """ + log_state_change("llm_call", state, "进入") + + memory_context = state.get("memory_context", "暂无用户信息") + loop = asyncio.get_event_loop() + start_time = time.time() + + try: + response = await loop.run_in_executor( + None, + lambda: chain.invoke({ + "messages": state["messages"], + "memory_context": memory_context + }) + ) + + elapsed_time = time.time() - start_time + + # 提取 token 用量(兼容不同 LLM 提供商的元数据格式) + token_usage = {} + input_tokens = 0 + output_tokens = 0 + + # 尝试从 response_metadata 中提取 + if hasattr(response, 'response_metadata') and response.response_metadata: + meta = response.response_metadata + if 'token_usage' in meta: + token_usage = meta['token_usage'] + elif 'usage' in meta: + token_usage = meta['usage'] + + # 尝试从 additional_kwargs 中提取 + if not token_usage and hasattr(response, 'additional_kwargs'): + add_kwargs = response.additional_kwargs + if 'llm_output' in add_kwargs and 'token_usage' in add_kwargs['llm_output']: + token_usage = add_kwargs['llm_output']['token_usage'] + + # 提取具体的 token 数值 + if token_usage: + input_tokens = token_usage.get('prompt_tokens', token_usage.get('input_tokens', 0)) + output_tokens = token_usage.get('completion_tokens', token_usage.get('output_tokens', 0)) + + # 打印响应统计信息 + info(f"⏱️ [LLM统计] 调用耗时: {elapsed_time:.2f}秒") + info(f"📊 [LLM统计] Token用量: 输入={input_tokens}, 输出={output_tokens}, 总计={input_tokens + output_tokens}") + if token_usage: + debug(f"📋 [LLM统计] 详细用量: {token_usage}") + + # 打印 LLM 的完整输出 + debug("\n" + "="*80) + debug("📥 [LLM输出] 大模型返回的完整响应:") + debug(f" 消息类型: {response.type.upper()}") + debug(f" 内容长度: {len(str(response.content))} 字符") + debug("-"*80) + debug(f"{response.content}") + debug("="*80 + "\n") + + result = { + "messages": [response], + "llm_calls": state.get('llm_calls', 0) + 1, + "last_token_usage": token_usage, + "last_elapsed_time": elapsed_time, + "turns_since_last_summary": state.get('turns_since_last_summary', 0) + 1 # 递增计数器 + } + + log_state_change("llm_call", {**state, **result}, "离开") + return result + + except Exception as e: + elapsed_time = time.time() - start_time + error(f"\n❌ [LLM错误] 调用失败 (耗时: {elapsed_time:.2f}秒)") + error(f" 错误类型: {type(e).__name__}") + error(f" 错误信息: {str(e)}") + import traceback + traceback.print_exc() + debug("="*80 + "\n") + + # 返回一个友好的错误消息 + error_response = AIMessage( + content="抱歉,模型暂时无法响应,可能是网络超时或服务繁忙,请稍后再试。" + ) + error_result = { + "messages": [error_response], + "llm_calls": state.get('llm_calls', 0), + "last_token_usage": {}, + "last_elapsed_time": elapsed_time, + "turns_since_last_summary": state.get('turns_since_last_summary', 0) + 1 # 即使出错也递增计数器 + } + + log_state_change("llm_call", state, "离开(异常)") + return error_result + + return call_llm diff --git a/app/nodes/retrieve_memory.py b/app/nodes/retrieve_memory.py new file mode 100644 index 0000000..4f63b65 --- /dev/null +++ b/app/nodes/retrieve_memory.py @@ -0,0 +1,75 @@ +""" +记忆检索节点模块 +负责从 Mem0 检索相关长期记忆 +""" + +from typing import Any, Dict +from langgraph.runtime import Runtime + +# 本地模块 +from app.state import MessagesState, GraphContext +from app.memory.mem0_client import Mem0Client +from app.utils.logging import log_state_change +from app.logger import debug + + +def create_retrieve_memory_node(mem0_client: Mem0Client): + """ + 工厂函数:创建记忆检索节点 + + Args: + mem0_client: Mem0 客户端实例 + + Returns: + 异步节点函数 + """ + + async def retrieve_memory(state: MessagesState, runtime: Runtime[GraphContext]) -> Dict[str, Any]: + """ + 记忆检索节点 - 使用 Mem0 + + Args: + state: 当前对话状态 + runtime: LangGraph 运行时上下文 + + Returns: + 包含 memory_context 的状态更新 + """ + log_state_change("retrieve_memory", state, "进入") + + user_id = runtime.context.user_id + + # 兼容 dict 和对象两种消息格式 + last_msg = state["messages"][-1] + if isinstance(last_msg, dict): + query = str(last_msg.get("content", "")) + else: + query = str(last_msg.content) + memory_text_parts = [] + + # 确保 Mem0 已初始化(懒加载) + if not mem0_client._initialized: + await mem0_client.initialize() + + if mem0_client.mem0: + try: + # 异步调用 Mem0 语义检索 + facts = await mem0_client.search_memories(query, user_id=user_id, limit=5) + + if facts: + memory_text_parts.append(f"【相关长期记忆】\n" + "\n".join(f"- {f}" for f in facts)) + else: + debug("🔍 [记忆检索] 未找到相关记忆") + except Exception as e: + from app.logger import warning + warning(f"⚠️ Mem0 检索失败: {e}") + else: + from app.logger import warning + warning("⚠️ Mem0 未初始化,跳过记忆检索") + + memory_context = "\n\n".join(memory_text_parts) if memory_text_parts else "暂无用户信息" + result = {"memory_context": memory_context} + log_state_change("retrieve_memory", {**state, **result}, "离开") + return result + + return retrieve_memory diff --git a/app/nodes/router.py b/app/nodes/router.py new file mode 100644 index 0000000..3dad4d0 --- /dev/null +++ b/app/nodes/router.py @@ -0,0 +1,48 @@ +""" +路由决策节点 +根据当前状态决定下一步走向 +""" + +from typing import Literal +from langchain_core.messages import AIMessage + +# 本地模块 +from app.config import ENABLE_GRAPH_TRACE, MEMORY_SUMMARIZE_INTERVAL +from app.state import MessagesState +from app.logger import info + + +def should_continue(state: MessagesState) -> Literal['tool_node', 'summarize', 'END']: + """ + 决定下一步:工具调用、生成摘要还是结束 + + Args: + state: 当前对话状态 + + Returns: + 下一个节点名称或 END + """ + last_message = state["messages"][-1] + + # 1. 如果需要调用工具,优先进入工具节点 + if isinstance(last_message, AIMessage) and last_message.tool_calls: + if ENABLE_GRAPH_TRACE: + info(f"🔀 [路由决策] 检测到 {len(last_message.tool_calls)} 个工具调用 → 转向 'tool_node'") + return 'tool_node' + + # 2. 如果是 AI 的最终回复,判断是否达到摘要生成阈值 + if isinstance(last_message, AIMessage): + turns = state.get("turns_since_last_summary", 0) + if turns >= MEMORY_SUMMARIZE_INTERVAL: + if ENABLE_GRAPH_TRACE: + info(f"🔀 [路由决策] 收到 AI 最终回复,已达摘要阈值({turns}/{MEMORY_SUMMARIZE_INTERVAL}) → 转向 'summarize'") + return 'summarize' + else: + if ENABLE_GRAPH_TRACE: + info(f"🔀 [路由决策] 收到 AI 最终回复,未达摘要阈值({turns}/{MEMORY_SUMMARIZE_INTERVAL}) → 结束流程") + return 'END' + + # 3. 其他情况(如只有用户消息)直接结束 + if ENABLE_GRAPH_TRACE: + info(f"🔀 [路由决策] 非 AI 消息(如纯用户消息) → 结束流程") + return 'END' diff --git a/app/nodes/summarize.py b/app/nodes/summarize.py new file mode 100644 index 0000000..bbf68c8 --- /dev/null +++ b/app/nodes/summarize.py @@ -0,0 +1,86 @@ +""" +记忆存储节点模块 +负责将对话历史提交给 Mem0 进行事实提取和存储 +""" + +from typing import Any, Dict +from langgraph.runtime import Runtime + +# 本地模块 +from app.state import MessagesState, GraphContext +from app.memory.mem0_client import Mem0Client +from app.utils.logging import log_state_change +from app.logger import debug, info, error, warning + + +def create_summarize_node(mem0_client: Mem0Client): + """ + 工厂函数:创建记忆存储节点 + + Args: + mem0_client: Mem0 客户端实例 + + Returns: + 异步节点函数 + """ + + async def summarize_conversation(state: MessagesState, runtime: Runtime[GraphContext]) -> Dict[str, Any]: + """ + 记忆存储节点 - 使用 Mem0 + + Args: + state: 当前对话状态 + runtime: LangGraph 运行时上下文 + + Returns: + 重置计数器的状态更新 + """ + log_state_change("summarize", state, "进入") + + messages = state["messages"] + if len(messages) < 4: + debug("📝 [记忆添加] 对话过短,跳过") + return {"turns_since_last_summary": 0} + + user_id = runtime.context.user_id + + # 确保 Mem0 已初始化(懒加载) + if not mem0_client._initialized: + await mem0_client.initialize() + + # 将整个对话历史转换为 Mem0 需要的消息格式 + mem0_messages = [] + for msg in messages: + # 兼容 dict 和对象两种格式 + if isinstance(msg, dict): + msg_type = msg.get("type", "") + msg_content = msg.get("content", "") + else: + msg_type = getattr(msg, 'type', '') + msg_content = getattr(msg, 'content', '') + + if msg_type == "human": + mem0_messages.append({"role": "user", "content": msg_content}) + elif msg_type == "ai": + mem0_messages.append({"role": "assistant", "content": msg_content}) + elif msg_type == "tool": + mem0_messages.append({"role": "system", "content": f"[工具返回] {msg_content}"}) + + if mem0_client.mem0: + try: + # 异步调用 Mem0 自动提取并存储事实 + success = await mem0_client.add_memories( + mem0_messages, + user_id=user_id + ) + if success: + info(f"📝 [记忆添加] 已提交给 Mem0 进行事实提取") + except Exception as e: + error(f"❌ Mem0 记忆添加失败: {e}") + else: + warning("⚠️ Mem0 未初始化,跳过记忆添加") + + log_state_change("summarize", state, "离开") + return {"turns_since_last_summary": 0} + + return summarize_conversation diff --git a/app/nodes/tool_call.py b/app/nodes/tool_call.py new file mode 100644 index 0000000..348abc2 --- /dev/null +++ b/app/nodes/tool_call.py @@ -0,0 +1,90 @@ +""" +工具执行节点模块 +负责执行 AI 调用的工具函数 +""" + +import asyncio +from typing import Any, Dict +from langchain_core.messages import AIMessage, ToolMessage +from langgraph.runtime import Runtime + +# 本地模块 +from app.state import MessagesState, GraphContext +from app.utils.logging import log_state_change +from app.logger import debug, info + + +def create_tool_call_node(tools_by_name: Dict[str, Any]): + """ + 工厂函数:创建工具执行节点 + + Args: + tools_by_name: 名称到工具函数的映射字典 + + Returns: + 异步节点函数 + """ + + async def call_tools(state: MessagesState, runtime: Runtime[GraphContext]) -> Dict[str, Any]: + """ + 工具执行节点(异步方法) + + Args: + state: 当前对话状态 + runtime: LangGraph 运行时上下文 + + Returns: + 包含 ToolMessage 的状态更新 + """ + log_state_change("tool_node", state, "进入") + + last_message = state['messages'][-1] + if not isinstance(last_message, AIMessage) or not last_message.tool_calls: + log_state_change("tool_node", state, "离开(无工具调用)") + return {"messages": []} + + results = [] + loop = asyncio.get_event_loop() + + info(f"🛠️ [工具调用] 准备执行 {len(last_message.tool_calls)} 个工具") + + for tool_call in last_message.tool_calls: + tool_name = tool_call["name"] + tool_args = tool_call["args"] + tool_id = tool_call["id"] + tool_func = tools_by_name.get(tool_name) + + debug(f" ├─ 调用工具: {tool_name} 参数: {tool_args}") + + if tool_func is None: + err_msg = f"Tool {tool_name} not found" + debug(f" └─ ❌ {err_msg}") + results.append(ToolMessage(content=err_msg, tool_call_id=tool_id)) + continue + + try: + # 修复闭包问题:将变量作为默认参数传入 lambda + # 如果工具支持异步 (ainvoke),优先使用异步调用 + if hasattr(tool_func, 'ainvoke'): + observation = await tool_func.ainvoke(tool_args) + else: + observation = await loop.run_in_executor( + None, + lambda args=tool_args: tool_func.invoke(args) # 默认参数捕获当前值 + ) + + # 字符打印 + result_preview = str(observation).replace("\n", " ") + debug(f" └─ ✅ 结果: {result_preview}") + results.append(ToolMessage(content=str(observation), tool_call_id=tool_id)) + except Exception as e: + debug(f" └─ ❌ 异常: {e}") + results.append(ToolMessage(content=f"Error: {e}", tool_call_id=tool_id)) + + info(f"🛠️ [工具调用] 执行完成,返回 {len(results)} 条 ToolMessage") + + result = {"messages": results} + log_state_change("tool_node", {**state, **result}, "离开") + return result + + return call_tools diff --git a/app/prompts.py b/app/prompts.py new file mode 100644 index 0000000..f585539 --- /dev/null +++ b/app/prompts.py @@ -0,0 +1,38 @@ +""" +提示模板管理模块 +所有系统提示和对话模板统一定义 +""" + +from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder + + +def create_system_prompt() -> ChatPromptTemplate: + """ + 创建系统提示模板 + + Returns: + ChatPromptTemplate: 包含系统指令和消息占位符的提示模板 + """ + system_template = ( + "你是一个个人生活助手和数据分析助手,请使用中文交流。\n\n" + "【用户背景信息】\n" + "以下是对当前用户的已知信息和长期记忆,你必须优先采纳并在回答中体现:\n" + "{memory_context}\n" + "若包含姓名、偏好等个人信息,请自然融入回应(例如称呼名字、提及偏好)。\n\n" + "【可用工具与使用规则】\n" + "- 获取温度/天气:`get_current_temperature`\n" + "- 读取文本文件:`read_local_file`(限定目录 `./user_docs`)\n" + "- 读取PDF摘要:`read_pdf_summary`(限定目录 `./user_docs`)\n" + "- 读取Excel表格:`read_excel_as_markdown`(限定目录 `./user_docs`)\n" + "- 抓取网页内容:`fetch_webpage_content`\n" + "工具调用时请直接返回所需参数,无需额外说明。\n\n" + "【回答要求(必须遵守)】\n" + "1. 回答必须简洁、直接,禁止描述任何思考过程或内心活动。\n" + "2. 优先利用已知用户信息进行个性化回复。\n" + "3. 若无信息可依,礼貌询问或提供通用帮助。" + ) + + return ChatPromptTemplate.from_messages([ + ("system", system_template), + MessagesPlaceholder(variable_name="messages") + ]) diff --git a/app/state.py b/app/state.py new file mode 100644 index 0000000..61463fe --- /dev/null +++ b/app/state.py @@ -0,0 +1,27 @@ +""" +LangGraph 状态定义模块 +包含 MessagesState 和 GraphContext +""" + +import operator +from typing import Annotated, Any +from typing_extensions import TypedDict +from dataclasses import dataclass +from langchain_core.messages import AnyMessage + + +class MessagesState(TypedDict): + """对话状态类型定义""" + messages: Annotated[list[AnyMessage], operator.add] + llm_calls: int + memory_context: str + last_token_usage: dict # 本次调用的 token 使用详情 + last_elapsed_time: float # 本次调用耗时(秒) + turns_since_last_summary: int # 距离上次生成摘要的轮数 + + +@dataclass +class GraphContext: + """图执行上下文""" + user_id: str + # 可扩展更多上下文信息 diff --git a/app/utils/__init__.py b/app/utils/__init__.py new file mode 100644 index 0000000..faa27e5 --- /dev/null +++ b/app/utils/__init__.py @@ -0,0 +1,7 @@ +""" +工具模块 +""" + +from app.utils.logging import log_state_change, print_llm_input + +__all__ = ["log_state_change", "print_llm_input"] diff --git a/app/utils/logging.py b/app/utils/logging.py new file mode 100644 index 0000000..659303c --- /dev/null +++ b/app/utils/logging.py @@ -0,0 +1,61 @@ +""" +LangGraph 节点日志工具模块 +提供状态流转追踪和 LLM 输入输出打印功能 +""" + +from app.config import ENABLE_GRAPH_TRACE +from app.logger import debug, info + + +def log_state_change(node_name: str, state: dict, prefix: str = "进入"): + """ + 记录状态变化日志 + + Args: + node_name: 节点名称 + state: 当前状态 + prefix: 日志前缀("进入" 或 "离开") + """ + from app.logger import info + + messages = state.get("messages", []) + msg_count = len(messages) + last_msg = messages[-1] if messages else None + last_info = "" + if last_msg: + # 兼容 dict 和对象两种格式 + if isinstance(last_msg, dict): + content_preview = str(last_msg.get("content", ""))[:100].replace("\n", " ") + msg_type = last_msg.get("type", "unknown") + else: + content_preview = str(last_msg.content)[:100].replace("\n", " ") + msg_type = getattr(last_msg, 'type', 'unknown') + last_info = f"{msg_type.upper()}: {content_preview}" + info(f"🔄 [{node_name}] {prefix} | 消息数:{msg_count} | 最后一条:{last_info}") + + +def print_llm_input(prompt_value): + """ + RunnableLambda 回调函数:打印格式化后发送给 LLM 的完整消息 + + Args: + prompt_value: ChatPromptValue 对象,包含格式化后的消息列表 + + Returns: + 原样返回 prompt_value,不影响链式调用 + """ + if not ENABLE_GRAPH_TRACE: + return prompt_value + + messages = prompt_value.messages # ChatPromptValue 提供 .messages 属性 + + debug("\n" + "=" * 80) + debug("📤 [LLM输入] 格式化后发送给大模型的完整消息:") + debug(f" 总消息数: {len(messages)}") + debug("-" * 80) + for i, msg in enumerate(messages): + content_preview = str(msg.content) # 完整输出 + debug(f" [{i}] {msg.type.upper():10s}: {content_preview}") + debug("\n" + "=" * 80 + "\n") + + return prompt_value diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index fd3ec46..9e0fb44 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -1,23 +1,6 @@ services: - postgres: - image: postgres:16 - container_name: ai-postgres - environment: - POSTGRES_PASSWORD: mysecretpassword # 请替换为强密码 - POSTGRES_DB: langgraph_db - volumes: - - pg_data:/var/lib/postgresql/data - networks: - - ai-network - healthcheck: - test: [ "CMD-SHELL", "pg_isready -U postgres" ] - interval: 10s - timeout: 5s - retries: 5 - restart: unless-stopped - # 如需外部访问数据库,取消下面注释 - # ports: - # - "5432:5432" + # ⭐ PostgreSQL 和 Qdrant 已迁移到远程服务器 (115.190.121.151) + # 不再需要在本地 Docker Compose 中运行这些服务 backend: build: @@ -27,15 +10,18 @@ services: environment: - ZHIPUAI_API_KEY=${ZHIPUAI_API_KEY} - VLLM_LOCAL_KEY=${VLLM_LOCAL_KEY} - - DB_URI=postgresql://postgres:mysecretpassword@postgres:5432/langgraph_db?sslmode=disable + # ⭐ 使用远程服务器地址 + - DB_URI=postgresql://postgres:mysecretpassword@115.190.121.151:5432/langgraph_db?sslmode=disable + - QDRANT_URL=http://115.190.121.151:6333 + - QDRANT_COLLECTION_NAME=user_memories + - EMBEDDING_MODEL=text-embedding-3-small + - MEMORY_SUMMARIZE_INTERVAL=${MEMORY_SUMMARIZE_INTERVAL:-10} volumes: - ../data/user_docs:/app/data/user_docs # 挂载文档目录 - ../logs:/app/logs networks: - ai-network - depends_on: - postgres: - condition: service_healthy + # ⭐ 移除对 postgres 和 qdrant 的依赖 restart: unless-stopped ports: - "8001:8001" @@ -60,5 +46,7 @@ networks: ai-network: driver: bridge -volumes: - pg_data: +# ⭐ PostgreSQL 和 Qdrant 已迁移到远程服务器,不再需要本地卷 +# volumes: +# pg_data: +# qdrant_storage: diff --git a/requirement.txt b/requirement.txt index 0e62cf3..5a479e2 100644 --- a/requirement.txt +++ b/requirement.txt @@ -1,5 +1,4 @@ # Core -transformers>=4.35.0 # 仅用于分词器,不需要模型推理 pypdf>=3.0.0 pandas>=2.0.0 requests>=2.31.0 @@ -12,9 +11,13 @@ langchain-community>=0.0.10 langchain-core>=0.1.0 langchain-openai>=0.0.5 langchain-text-splitters>=0.1.0 +langchain-qdrant>=0.1.0 # Qdrant 向量存储集成 # Vector Database -chromadb>=0.4.0 # 轻量级向量数据库,可选 torch 但不强制 +qdrant-client>=1.7.0 # Qdrant 客户端 + +# Mem0 (Memory Layer) +mem0ai>=0.1.0 # LangGraph langgraph>=0.0.30 diff --git a/scripts/start.sh b/scripts/start.sh index 80934b2..c826917 100755 --- a/scripts/start.sh +++ b/scripts/start.sh @@ -88,11 +88,22 @@ check_config() { check_warn "VLLM_LOCAL_KEY 未配置(如不使用本地模型可忽略)" fi - # 检查 DB_URI - if grep -q "^DB_URI=" "$PROJECT_DIR/.env" 2>/dev/null; then - check_pass "DB_URI 已配置" + # 检查 DB_URI (远程服务器) + if grep -q "^DB_URI=.*115.190.121.151" "$PROJECT_DIR/.env" 2>/dev/null; then + check_pass "DB_URI 已配置(远程服务器)" + elif grep -q "^DB_URI=" "$PROJECT_DIR/.env" 2>/dev/null; then + check_warn "DB_URI 已配置(非远程服务器地址)" else - check_warn "DB_URI 未配置(将使用默认值)" + check_fail "DB_URI 未配置" + fi + + # 检查 QDRANT_URL (远程服务器) + if grep -q "^QDRANT_URL=.*115.190.121.151" "$PROJECT_DIR/.env" 2>/dev/null; then + check_pass "QDRANT_URL 已配置(远程服务器)" + elif grep -q "^QDRANT_URL=" "$PROJECT_DIR/.env" 2>/dev/null; then + check_warn "QDRANT_URL 已配置(非远程服务器地址)" + else + check_fail "QDRANT_URL 未配置" fi # 3. 检查 Docker 环境 @@ -130,6 +141,31 @@ check_config() { fi done + # 5. 检查远程服务连接 + echo "" + echo "🌐 检查远程服务连接..." + + # 测试 PostgreSQL 连接 + if command -v psql &> /dev/null; then + # 注意:这里假设密码为 mysecretpassword,如果不同需调整或从 env 读取 + if PGPASSWORD=mysecretpassword psql -h 115.190.121.151 -U postgres -d langgraph_db -c "SELECT 1;" &> /dev/null; then + check_pass "PostgreSQL 远程连接正常 (115.190.121.151:5432)" + else + check_fail "PostgreSQL 远程连接失败" + echo " 提示: 检查网络连接和防火墙设置" + fi + else + check_warn "psql 客户端未安装,跳过 PostgreSQL 连接测试" + fi + + # 测试 Qdrant 连接 + if curl -s http://115.190.121.151:6333/collections &> /dev/null; then + check_pass "Qdrant 远程连接正常 (115.190.121.151:6333)" + else + check_fail "Qdrant 远程连接失败" + echo " 提示: 检查网络连接和防火墙设置" + fi + # 总结 echo "" echo "==========================================" @@ -150,90 +186,101 @@ check_config() { } # ============================================================================= -# Docker 容器检查函数 +# Docker 容器检查函数(仅检查 llama.cpp 服务) # ============================================================================= -check_vllm() { - echo -e "${BLUE}🔍 检查 vLLM 容器...${NC}" - if ! docker ps --format '{{.Names}}' | grep -q "^gemma4-server$"; then - echo -e "${YELLOW}⚠️ vLLM 容器未运行${NC}" +check_llamacpp() { + echo -e "${BLUE}🔍 检查 llama.cpp LLM 容器...${NC}" + if ! docker ps --format '{{.Names}}' | grep -q "^gemma4-llamacpp-server$"; then + echo -e "${YELLOW}⚠️ llama.cpp LLM 容器未运行${NC}" return 1 else - echo -e "${GREEN}✓ vLLM 容器正在运行 (gemma4-server)${NC}" + echo -e "${GREEN}✓ llama.cpp LLM 容器正在运行 (gemma4-llamacpp-server)${NC}" return 0 fi } -check_postgres() { - echo -e "${BLUE}🔍 检查 PostgreSQL 容器...${NC}" - if ! docker ps --format '{{.Names}}' | grep -q "^postgres-langgraph$"; then - echo -e "${YELLOW}⚠️ PostgreSQL 容器未运行${NC}" +check_embedding() { + echo -e "${BLUE}🔍 检查 llama.cpp Embedding 容器...${NC}" + if ! docker ps --format '{{.Names}}' | grep -q "^embedding-server$"; then + echo -e "${YELLOW}⚠️ llama.cpp Embedding 容器未运行${NC}" return 1 else - echo -e "${GREEN}✓ PostgreSQL 容器正在运行 (postgres-langgraph)${NC}" + echo -e "${GREEN}✓ llama.cpp Embedding 容器正在运行 (embedding-server)${NC}" return 0 fi } # ============================================================================= -# 启动 Docker 依赖服务 +# 启动 Docker 依赖服务(llama.cpp) # ============================================================================= -start_vllm() { - echo -e "${BLUE}🚀 启动 vLLM 容器...${NC}" +start_llamacpp() { + echo -e "${BLUE}🚀 启动 llama.cpp LLM 容器...${NC}" # 检查模型文件 - if [ ! -d "/home/huang/Study/AIModel/gemma-4-E2B-it" ]; then - echo -e "${RED}✗ 错误:模型目录不存在: /home/huang/Study/AIModel/gemma-4-E2B-it${NC}" + if [ ! -f "/home/huang/Study/AIModel/GGUF/Gemma-4-E2B-Uncensored-HauhauCS-Aggressive-Q6_K_P.gguf" ]; then + echo -e "${RED}✗ 错误:LLM 模型文件不存在${NC}" + exit 1 + fi + + if [ ! -f "/home/huang/Study/AIModel/GGUF/mmproj-Gemma-4-E2B-Uncensored-HauhauCS-Aggressive-f16.gguf" ]; then + echo -e "${RED}✗ 错误:多模态投影文件不存在${NC}" exit 1 fi docker run -d \ - --name gemma4-server \ + --name gemma4-llamacpp-server \ + --restart=unless-stopped \ --group-add=video \ - --cap-add=SYS_PTRACE \ - --security-opt seccomp=unconfined \ --device=/dev/kfd \ --device=/dev/dri \ - -v /home/huang/Study/AIModel/gemma-4-E2B-it:/models/gemma-4-E2B-it \ - -e VLLM_ROCM_USE_AITER=0 \ - -e HF_TOKEN="${HF_TOKEN:-}" \ - -p 8000:8000 \ - --ipc=host \ - --entrypoint vllm \ - my-vllm-gemma4:working \ - serve /models/gemma-4-E2B-it \ - --served-model-name gemma-4-E2B-it \ - --dtype auto \ - --api-key token-abc123 \ - --trust-remote-code \ - --port 8000 \ - --gpu-memory-utilization 0.85 \ - --max-model-len 8192 + -v /home/huang/Study/AIModel/GGUF:/models \ + -p 8081:8080 \ + ghcr.io/ggml-org/llama.cpp:server-rocm \ + -m /models/Gemma-4-E2B-Uncensored-HauhauCS-Aggressive-Q6_K_P.gguf \ + --mmproj /models/mmproj-Gemma-4-E2B-Uncensored-HauhauCS-Aggressive-f16.gguf \ + --host 0.0.0.0 \ + --port 8080 \ + -ngl 99 - echo -e "${GREEN}✓ vLLM 容器已启动${NC}" + echo -e "${GREEN}✓ llama.cpp LLM 容器已启动 (端口 8081)${NC}" echo -e "${YELLOW}⏳ 等待模型加载(可能需要几分钟)...${NC}" - sleep 10 + sleep 15 } -start_postgres() { - echo -e "${BLUE}🚀 启动 PostgreSQL 容器...${NC}" +start_embedding() { + echo -e "${BLUE}🚀 启动 llama.cpp Embedding 容器...${NC}" + + # 检查模型文件 + if [ ! -f "/home/huang/Study/AIModel/GGUF/embeddinggemma-300M-Q8_0.gguf" ]; then + echo -e "${RED}✗ 错误:Embedding 模型文件不存在${NC}" + exit 1 + fi docker run -d \ - --name postgres-langgraph \ - -e POSTGRES_PASSWORD=mysecretpassword \ - -e POSTGRES_DB=langgraph_db \ - -p 5432:5432 \ - -v ~/docker_volumes/postgres_data:/var/lib/postgresql/data \ - postgres:16 + --name embedding-server \ + --restart=unless-stopped \ + --group-add=video \ + --device=/dev/kfd \ + --device=/dev/dri \ + -v /home/huang/Study/AIModel/GGUF:/models \ + -p 8082:8080 \ + ghcr.io/ggml-org/llama.cpp:server-rocm \ + -m /models/embeddinggemma-300M-Q8_0.gguf \ + --host 0.0.0.0 \ + --port 8080 \ + -ngl 99 \ + --embeddings \ + -c 512 - echo -e "${GREEN}✓ PostgreSQL 容器已启动${NC}" - sleep 3 + echo -e "${GREEN}✓ llama.cpp Embedding 容器已启动 (端口 8082)${NC}" + sleep 5 } # ============================================================================= # 启动 Python 服务 # ============================================================================= start_backend() { - echo -e "\n${BLUE}🚀 启动后端服务 (端口 8001)...${NC}" + echo -e "\n${BLUE}🚀 启动后端服务 (端口 8003)...${NC}" cd "$PROJECT_DIR" # 加载 .env 文件中的环境变量 @@ -242,6 +289,7 @@ start_backend() { set +a export PYTHONPATH="$PROJECT_DIR" + export BACKEND_PORT=8003 python app/backend.py & BACKEND_PID=$! echo -e "${GREEN}✓ 后端服务已启动 (PID: $BACKEND_PID)${NC}" @@ -263,7 +311,6 @@ start_frontend() { echo -e "${GREEN}✓ 前端服务已启动 (PID: $FRONTEND_PID)${NC}" echo -e "${GREEN}✓ 访问地址:${NC}" echo -e " 本地开发: http://localhost:8501" - echo -e " Nginx代理: http://your-domain.com" } # ============================================================================= @@ -271,30 +318,28 @@ start_frontend() { # ============================================================================= docker_up() { echo -e "${BLUE}🐳 使用 Docker Compose 启动所有服务...${NC}" - cd "$PROJECT_DIR" + cd "$PROJECT_DIR/docker" # 检查 .env 文件 - if [ ! -f ".env" ]; then + if [ ! -f "../.env" ]; then echo -e "${RED}✗ 错误:.env 文件不存在${NC}" echo " 请先复制配置文件:" echo " cp .env.docker .env # 服务器部署" - echo " 或" - echo " cp .env.local .env # 本地开发" exit 1 fi - docker compose -f docker/docker-compose.yml up -d --build + docker compose up -d --build echo -e "\n${GREEN}✓ Docker Compose 服务已启动${NC}" - echo -e "${BLUE}📊 查看服务状态:${NC} docker compose -f docker/docker-compose.yml ps" - echo -e "${BLUE}📝 查看日志:${NC} docker compose -f docker/docker-compose.yml logs -f" + echo -e "${BLUE}📊 查看服务状态:${NC} docker compose ps" + echo -e "${BLUE}📝 查看日志:${NC} docker compose logs -f" echo -e "${BLUE}🌐 访问应用:${NC} http://localhost:8501" } docker_down() { echo -e "${BLUE}🛑 停止 Docker Compose 服务...${NC}" - cd "$PROJECT_DIR" - docker compose -f docker/docker-compose.yml down + cd "$PROJECT_DIR/docker" + docker compose down echo -e "${GREEN}✓ 服务已停止${NC}" } @@ -312,8 +357,8 @@ cleanup() { echo -e "${GREEN}✓ 前端服务已停止${NC}" fi echo -e "${YELLOW}💡 提示:Docker 容器需要手动停止${NC}" - echo -e " 停止 vLLM: docker stop gemma4-server" - echo -e " 停止 PostgreSQL: docker stop postgres-langgraph" + echo -e " 停止 llama.cpp LLM: docker stop gemma4-llamacpp-server" + echo -e " 停止 llama.cpp Embedding: docker stop embedding-server" echo -e " 或使用: $0 docker-down" exit 0 } @@ -331,8 +376,8 @@ case "${1:-help}" in backend) check_config || exit 1 - check_vllm || start_vllm - check_postgres || start_postgres + check_llamacpp || start_llamacpp + check_embedding || start_embedding start_backend echo -e "\n${GREEN}后端服务正在运行,按 Ctrl+C 停止${NC}" wait $BACKEND_PID @@ -347,8 +392,8 @@ case "${1:-help}" in both) check_config || exit 1 - check_vllm || start_vllm - check_postgres || start_postgres + check_llamacpp || start_llamacpp + check_embedding || start_embedding start_backend start_frontend echo -e "\n${GREEN}所有服务正在运行,按 Ctrl+C 停止 Python 服务${NC}"