优化memory、rag和embedding模块
Some checks failed
构建并部署 AI Agent 服务 / deploy (push) Failing after 5m5s

This commit is contained in:
2026-04-29 10:52:01 +08:00
parent da4a7b0dd4
commit 223d1c9afd
4 changed files with 113 additions and 84 deletions

View File

@@ -16,6 +16,7 @@ import asyncio
from typing import Optional, List, Dict from typing import Optional, List, Dict
from mem0 import AsyncMemory from mem0 import AsyncMemory
class Mem0Client: class Mem0Client:
"""Mem0 异步客户端封装类""" """Mem0 异步客户端封装类"""
@@ -37,34 +38,36 @@ class Mem0Client:
try: try:
# 获取可用的 embedding 服务并确定维度 # 获取可用的 embedding 服务并确定维度
info("🔄 正在获取嵌入服务...")
embeddings = get_embedding_service() embeddings = get_embedding_service()
test_embedding = embeddings.embed_query("test") test_embedding = embeddings.embed_query("test")
embedding_dim = len(test_embedding) embedding_dim = len(test_embedding)
info(f"✅ 嵌入服务可用,向量维度: {embedding_dim}")
# 构建正确的 embedder 配置 - 根据我们的降级机制 # 构建 embedder 配置 - 改进的方法
# 首先我们需要判断哪个服务实际可用 # 检查本地 provider
from ..model_services.embedding_services import LocalLlamaCppEmbeddingProvider, ZhipuEmbeddingProvider from ..model_services.embedding_services import LocalLlamaCppEmbeddingProvider, ZhipuEmbeddingProvider
embedder_config = None embedder_config = None
# 检查本地服务
local_provider = LocalLlamaCppEmbeddingProvider() local_provider = LocalLlamaCppEmbeddingProvider()
if local_provider.is_available(): if local_provider.is_available():
info("✅ 使用本地 llama.cpp 作为 mem0 embedder") info("✅ 使用本地 llama.cpp 作为 mem0 embedder")
embedder_config = { embedder_config = {
"provider": "openai", "provider": "openai",
"config": { "config": {
"model": "Qwen3-Embedding-0.6B-Q8_0", "model": "Qwen3-Embedding-0.6B-Q8_0",
"api_key": LLAMACPP_API_KEY or "dummy", "api_key": LLAMACPP_API_KEY or "dummy-key",
"openai_base_url": LLAMACPP_EMBEDDING_URL, "openai_base_url": LLAMACPP_EMBEDDING_URL,
} }
} }
else: else:
# 尝试使用智谱 # 检查智谱
zhipu_provider = ZhipuEmbeddingProvider() zhipu_provider = ZhipuEmbeddingProvider()
if zhipu_provider.is_available(): if zhipu_provider.is_available():
info("✅ 使用智谱 API 作为 mem0 embedder") info("✅ 使用智谱 API 作为 mem0 embedder")
# 注意mem0 可能不直接支持智谱,这里我们暂时还是用 openai 兼容方式 # 使用自定义 embedder 或者 openai 兼容方式
# 或者需要自定义 embedder # 注意:这里我们使用一个特殊的配置方法
embedder_config = { embedder_config = {
"provider": "openai", "provider": "openai",
"config": { "config": {
@@ -74,18 +77,19 @@ class Mem0Client:
} }
} }
else: else:
# 都不可用,使用 dummy 配置 # 都不可用,使用 dummy 配置并警告
warning("⚠️ 没有可用的 embedder使用 dummy 配置") warning("⚠️ 没有可用的 embedder使用 dummy 配置")
embedder_config = { embedder_config = {
"provider": "openai", "provider": "openai",
"config": { "config": {
"model": "dummy", "model": "text-embedding-ada-002",
"api_key": "dummy", "api_key": "dummy-key",
"openai_base_url": "http://localhost:8080/v1", "openai_base_url": "http://localhost:8080/v1",
} }
} }
# Mem0 配置 # Mem0 配置 - 简化配置,先确保能启动
info("🔄 正在构建 Mem0 配置...")
config = { config = {
"vector_store": { "vector_store": {
"provider": "qdrant", "provider": "qdrant",
@@ -99,9 +103,9 @@ class Mem0Client:
"llm": { "llm": {
"provider": "openai", "provider": "openai",
"config": { "config": {
"model": "LLM_MODEL", "model": "gpt-3.5-turbo", # 使用一个通用的模型名
"api_key": LLM_API_KEY, "api_key": LLM_API_KEY or ZHIPUAI_API_KEY or "dummy-key",
"openai_base_url": VLLM_BASE_URL, "openai_base_url": VLLM_BASE_URL or ZHIPU_API_BASE,
"temperature": 0.1, "temperature": 0.1,
"max_tokens": 2000, "max_tokens": 2000,
} }
@@ -110,27 +114,31 @@ class Mem0Client:
"version": "v1.1" "version": "v1.1"
} }
info("🔄 正在初始化 Mem0 实例...")
self.mem0 = AsyncMemory.from_config(config) self.mem0 = AsyncMemory.from_config(config)
info("✅ Mem0 配置加载成功,开始连接测试...") info("✅ Mem0 配置加载成功")
# 实际连接测试 # 尝试进行连接测试,但失败不会阻止初始化
try: try:
info("🔄 正在测试 Mem0 连接...")
# 使用短超时的测试
await asyncio.wait_for( await asyncio.wait_for(
self.mem0.search("ping", user_id="test", limit=1), self.mem0.search("ping", user_id="test", limit=1),
timeout=30.0 timeout=10.0
) )
info("✅ Mem0 实际连接测试成功,初始化完成") info("✅ Mem0 连接测试成功")
except Exception as e: except Exception as e:
warning(f"⚠️ Mem0 连接测试遇到问题,但仍继续初始化: {e}") warning(f"⚠️ Mem0 连接测试遇到问题(但继续使用): {e}")
self._initialized = True self._initialized = True
info("🎉 Mem0 初始化完成")
except asyncio.TimeoutError: except asyncio.TimeoutError:
error("❌ Mem0 连接测试超时 (10s),请检查 Qdrant 或 Embedding 服务响应") error("❌ Mem0 初始化超时")
self.mem0 = None self.mem0 = None
self._initialized = False self._initialized = False
except Exception as e: except Exception as e:
error(f"❌ Mem0 初始化或连接测试失败: {e}") error(f"❌ Mem0 初始化失败: {e}")
import traceback import traceback
error(f"详细错误信息:\n{traceback.format_exc()}") error(f"详细错误信息:\n{traceback.format_exc()}")
self.mem0 = None self.mem0 = None

View File

@@ -4,24 +4,11 @@ Qdrant 向量检索器模块
提供基于 Qdrant 的基础向量检索和混合检索Dense + Sparse功能。 提供基于 Qdrant 的基础向量检索和混合检索Dense + Sparse功能。
核心原理: 核心原理:
- 基础检索:将查询文本转换为向量,在 Qdrant 中进行近似最近邻ANN搜索 - 直接使用统一的 get_embedding_service(),已内置降级机制
使用余弦相似度返回最相似的 k 个文档。
- 混合检索:结合稠密向量检索(语义相似)和 BM25 稀疏向量检索(关键词匹配),
通过加权或分数融合提高召回精度。
使用示例: 使用示例:
>>> from rag_core import LlamaCppEmbedder >>> from app.rag.retriever import create_base_retriever
>>> embedder = LlamaCppEmbedder() >>> retriever = create_base_retriever(collection_name="my_docs")
>>> embeddings = embedder.as_langchain_embeddings()
>>>
>>> # 创建基础检索器
>>> retriever = create_base_retriever(
... collection_name="my_docs",
... embeddings=embeddings,
... search_kwargs={"k": 10}
... )
>>>
>>> # 执行检索
>>> docs = retriever.invoke("什么是 RAG") >>> docs = retriever.invoke("什么是 RAG")
""" """
@@ -32,8 +19,10 @@ from langchain_qdrant import QdrantVectorStore
from langchain_core.embeddings import Embeddings from langchain_core.embeddings import Embeddings
from langchain_core.retrievers import BaseRetriever from langchain_core.retrievers import BaseRetriever
from rag_core import QDRANT_URL, QDRANT_API_KEY, LlamaCppEmbedder from rag_core import QDRANT_URL, QDRANT_API_KEY
from rag_core.client import create_qdrant_client as create_core_qdrant_client from rag_core.client import create_qdrant_client as create_core_qdrant_client
from app.model_services import get_embedding_service
from app.logger import info, warning
# 模块级常量 # 模块级常量
DEFAULT_SEARCH_K = 20 DEFAULT_SEARCH_K = 20
@@ -44,31 +33,24 @@ def create_base_retriever(
collection_name: str, collection_name: str,
search_kwargs: Dict[str, Any] | None = None, search_kwargs: Dict[str, Any] | None = None,
client: QdrantClient | None = None, client: QdrantClient | None = None,
embeddings: Embeddings | None = None,
) -> BaseRetriever: ) -> BaseRetriever:
""" """
创建基础向量检索器(仅稠密向量检索) 创建基础向量检索器(仅稠密向量检索)
该检索器使用嵌入模型将查询转为向量,在 Qdrant 集合中执行 ANN 搜索,
返回语义上最相似的文档块。
Args: Args:
collection_name: Qdrant 集合名称(需预先创建并索引)。 collection_name: Qdrant 集合名称
search_kwargs: 搜索参数,可包含: search_kwargs: 搜索参数
- k (int): 返回的文档数量,默认 20。 client: 可选的 Qdrant 客户端
- score_threshold (float): 相似度阈值,仅返回高于此分数的文档。 embeddings: 可选的嵌入模型(默认使用 get_embedding_service()
- filter (dict): Qdrant 过滤条件。
若为 None则使用默认值 {"k": 20}。
client: 可选的 Qdrant 客户端实例。若未提供,将自动创建。
Returns: Returns:
BaseRetriever 实例,可直接调用 .invoke(query) 或 .ainvoke(query) 检索 LangChain 兼容的检索
Raises:
ValueError: 如果集合不存在或嵌入模型无效。
""" """
# 嵌入模型 # 默认使用统一嵌入服务(已内置降级机制)
embedder = LlamaCppEmbedder() if embeddings is None:
embeddings = embedder.as_langchain_embeddings() embeddings = get_embedding_service()
info("✅ 使用统一嵌入服务(本地 llama.cpp → 智谱 API 自动降级)")
# 合并默认搜索参数 # 合并默认搜索参数
merged_search_kwargs = {"k": DEFAULT_SEARCH_K} merged_search_kwargs = {"k": DEFAULT_SEARCH_K}
@@ -79,14 +61,13 @@ def create_base_retriever(
if client is None: if client is None:
client = create_core_qdrant_client() client = create_core_qdrant_client()
# 验证集合是否存在(可选,便于提前发现问题) # 验证集合是否存在
try: try:
client.get_collection(collection_name) client.get_collection(collection_name)
except UnexpectedResponse as e: except UnexpectedResponse as e:
if e.status_code == 404: if e.status_code == 404:
raise ValueError( warning(f"⚠️ Qdrant 集合 '{collection_name}' 不存在,请先创建并索引文档")
f"Qdrant 集合 '{collection_name}' 不存在,请先创建并索引文档。" raise ValueError(f"Qdrant 集合 '{collection_name}' 不存在")
)
raise raise
# 构建向量存储 # 构建向量存储
@@ -96,7 +77,6 @@ def create_base_retriever(
embedding=embeddings, embedding=embeddings,
) )
# 返回检索器
return vector_store.as_retriever(search_kwargs=merged_search_kwargs) return vector_store.as_retriever(search_kwargs=merged_search_kwargs)
@@ -106,6 +86,7 @@ def create_hybrid_retriever(
sparse_k: int = 10, sparse_k: int = 10,
score_threshold: float | None = DEFAULT_SCORE_THRESHOLD, score_threshold: float | None = DEFAULT_SCORE_THRESHOLD,
client: QdrantClient | None = None, client: QdrantClient | None = None,
embeddings: Embeddings | None = None,
) -> BaseRetriever: ) -> BaseRetriever:
""" """
创建混合检索器(稠密向量 + BM25 稀疏向量)。 创建混合检索器(稠密向量 + BM25 稀疏向量)。
@@ -122,6 +103,7 @@ def create_hybrid_retriever(
sparse_k: 稀疏向量检索返回数量,默认 10。 sparse_k: 稀疏向量检索返回数量,默认 10。
score_threshold: 相似度阈值,默认 0.3。 score_threshold: 相似度阈值,默认 0.3。
client: 可选的 Qdrant 客户端实例。 client: 可选的 Qdrant 客户端实例。
embeddings: 可选的嵌入模型实例。若未提供,将自动获取统一嵌入服务。
Returns: Returns:
BaseRetriever 实例,配置了混合搜索参数。 BaseRetriever 实例,配置了混合搜索参数。
@@ -139,6 +121,7 @@ def create_hybrid_retriever(
collection_name=collection_name, collection_name=collection_name,
search_kwargs=search_kwargs, search_kwargs=search_kwargs,
client=client, client=client,
embeddings=embeddings,
) )

View File

@@ -1,49 +1,88 @@
""" """
嵌入模型包装器,用于 llama.cpp 服务 嵌入模型包装器 - 直接使用统一嵌入服务
支持自动降级(本地 llama.cpp → 智谱),由 get_embedding_service() 内部处理
""" """
import os import sys
from .config import LLAMACPP_EMBEDDING_URL, LLAMACPP_API_KEY import logging
import httpx
from typing import List from typing import List
from pathlib import Path
# 添加父目录到路径,支持从 app.model_services 导入
backend_root = Path(__file__).parent.parent
if str(backend_root) not in sys.path:
sys.path.insert(0, str(backend_root))
from .config import LLAMACPP_EMBEDDING_URL, LLAMACPP_API_KEY
from langchain_core.embeddings import Embeddings from langchain_core.embeddings import Embeddings
logger = logging.getLogger(__name__)
class LlamaCppEmbedder: class LlamaCppEmbedder:
"""通过 OpenAI 兼容 API 封装 llama.cpp 嵌入服务。""" """
嵌入器包装类 - 直接使用统一的 get_embedding_service()
降级逻辑完全由 app.model_services 处理
"""
def __init__(self, model: str = "Qwen3-Embedding-0.6B-Q8_0"): def __init__(self, model: str = "Qwen3-Embedding-0.6B-Q8_0", use_fallback: bool = True):
""" """
Args: Args:
model: 嵌入模型名称,默认 "Qwen3-Embedding-0.6B-Q8_0" model: 嵌入模型名称(向后兼容,现在实际使用统一服务)
use_fallback: 是否使用降级机制(保留参数,现在始终为 True
""" """
self.base_url = LLAMACPP_EMBEDDING_URL
self.api_key = LLAMACPP_API_KEY
self.model = model self.model = model
print(f"初始化 base_url: { self.base_url}") self._fallback_embeddings = None
# 直接获取统一嵌入服务
try:
from app.model_services import get_embedding_service
self._fallback_embeddings = get_embedding_service()
logger.info("✅ 统一嵌入服务加载成功")
except Exception as e:
logger.warning(f"⚠️ 无法加载统一嵌入服务: {e}")
# 保留向后兼容的初始化
self.base_url = LLAMACPP_EMBEDDING_URL
self.api_key = LLAMACPP_API_KEY
def as_langchain_embeddings(self) -> Embeddings: def as_langchain_embeddings(self) -> Embeddings:
"""创建 LangChain 兼容的嵌入实例""" """创建 LangChain 兼容的嵌入实例"""
if self._fallback_embeddings:
logger.info("✅ 使用统一嵌入服务(已内置降级机制)")
return self._fallback_embeddings
# 向后兼容,仅在统一服务不可用时使用传统方式
logger.warning("⚠️ 统一服务不可用,使用传统模式(不推荐)")
return _LlamaCppLangchainAdapter(self) return _LlamaCppLangchainAdapter(self)
def embed_documents(self, texts: List[str]) -> List[List[float]]: def embed_documents(self, texts: List[str]) -> List[List[float]]:
"""嵌入一批文档""" """嵌入一批文档"""
if self._fallback_embeddings:
return self._fallback_embeddings.embed_documents(texts)
# 向后兼容
return self._call_embedding_api(texts) return self._call_embedding_api(texts)
def embed_query(self, text: str) -> List[List[float]]: def embed_query(self, text: str) -> List[float]:
"""嵌入单个查询""" """嵌入单个查询"""
if self._fallback_embeddings:
return self._fallback_embeddings.embed_query(text)
# 向后兼容
return self._call_embedding_api([text])[0] return self._call_embedding_api([text])[0]
def get_embedding_dimension(self) -> int: def get_embedding_dimension(self) -> int:
"""通过嵌入测试字符串获取嵌入维度""" """通过嵌入测试字符串获取嵌入维度"""
test_embedding = self.embed_query("test") test_embedding = self.embed_query("test")
return len(test_embedding) return len(test_embedding)
def _call_embedding_api(self, texts: List[str]) -> List[List[float]]: def _call_embedding_api(self, texts: List[str]) -> List[List[float]]:
"""直接调用 llama.cpp 嵌入 API。""" """仅作为向后兼容的备用方法"""
import httpx
if not hasattr(self, 'base_url') or not self.base_url:
raise ValueError("LLAMACPP_EMBEDDING_URL 未配置且统一服务不可用")
headers = {"Content-Type": "application/json"} headers = {"Content-Type": "application/json"}
if self.api_key: if self.api_key:
headers["Authorization"] = f"Bearer {self.api_key}" headers["Authorization"] = f"Bearer {self.api_key}"
@@ -52,7 +91,6 @@ class LlamaCppEmbedder:
if not base.endswith("/v1"): if not base.endswith("/v1"):
base = base + "/v1" base = base + "/v1"
payload = { payload = {
"input": texts, "input": texts,
"model": self.model, "model": self.model,
@@ -76,7 +114,7 @@ class LlamaCppEmbedder:
class _LlamaCppLangchainAdapter(Embeddings): class _LlamaCppLangchainAdapter(Embeddings):
"""将 LlamaCppEmbedder 适配为 LangChain Embeddings 接口。""" """仅作为向后兼容的适配器"""
def __init__(self, embedder: LlamaCppEmbedder): def __init__(self, embedder: LlamaCppEmbedder):
self._embedder = embedder self._embedder = embedder
@@ -84,5 +122,5 @@ class _LlamaCppLangchainAdapter(Embeddings):
def embed_documents(self, texts: List[str]) -> List[List[float]]: def embed_documents(self, texts: List[str]) -> List[List[float]]:
return self._embedder.embed_documents(texts) return self._embedder.embed_documents(texts)
def embed_query(self, text: str) -> List[List[float]]: def embed_query(self, text: str) -> List[float]:
return self._embedder.embed_query(text) return self._embedder.embed_query(text)

View File

@@ -87,8 +87,8 @@ services:
environment: environment:
# Docker 内部网络使用服务名 'backend' 解析后端服务 # Docker 内部网络使用服务名 'backend' 解析后端服务
- API_URL=http://backend:8079/chat - API_URL=http://backend:8079/chat
volumes: # volumes:
- ../frontend/src:/app/src # 挂载源代码目录,修改立即生效 # - ../frontend/src:/app/src # 挂载源代码目录,修改立即生效
ports: ports:
- "8501:8501" - "8501:8501"
networks: networks: