Files
ailine/backend/app/memory/mem0_client.py
root d6805d1db8
Some checks failed
构建并部署 AI Agent 服务 / deploy (push) Failing after 6m26s
修复重构后的导入错误和缺失模块
2026-04-29 17:23:20 +08:00

199 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Mem0 memory-layer client wrapper module.

Responsible for initializing Mem0 and for retrieving and storing memories.
"""
import asyncio
import time
import traceback
from typing import Optional, List, Dict

from app.config import (
    LLM_API_KEY, ZHIPUAI_API_KEY,
    VLLM_BASE_URL, QDRANT_URL, QDRANT_COLLECTION_NAME, QDRANT_API_KEY,
    LLAMACPP_EMBEDDING_URL, LLAMACPP_API_KEY,
    ZHIPU_EMBEDDING_MODEL, ZHIPU_API_BASE
)
from ..model_services import get_embedding_service
from app.logger import info, warning, error

# Kept after the project imports, as in the original file, so that the
# relative import-time ordering of app.config and mem0 is unchanged.
from mem0 import AsyncMemory
class Mem0Client:
    """Asynchronous wrapper around a Mem0 ``AsyncMemory`` instance.

    The client is created cheaply and connected later through
    :meth:`initialize`. Until initialization succeeds, ``self.mem0`` is
    ``None`` and the search/add methods return their "nothing happened"
    value (``[]`` / ``False``) instead of raising.
    """

    def __init__(self, llm_instance):
        """Create an uninitialized client.

        Args:
            llm_instance: LangChain LLM instance (intended for fact
                extraction). It is stored on ``self.llm``; the Mem0
                configuration built by this class does not reference it.
        """
        self.llm = llm_instance
        self.mem0: Optional[AsyncMemory] = None
        self._initialized = False

    @staticmethod
    def _openai_compatible(model, api_key, base_url) -> Dict:
        """Return a Mem0 component config that uses the ``openai`` provider."""
        return {
            "provider": "openai",
            "config": {
                "model": model,
                "api_key": api_key,
                "openai_base_url": base_url,
            },
        }

    def _select_embedder_config(self) -> Dict:
        """Pick the embedder config: local llama.cpp, then Zhipu, then a dummy."""
        # Imported at call time, exactly as the original code did.
        from ..model_services.embedding_services import (
            LocalLlamaCppEmbeddingProvider,
            ZhipuEmbeddingProvider,
        )

        if LocalLlamaCppEmbeddingProvider().is_available():
            info("✅ 使用本地 llama.cpp 作为 mem0 embedder")
            return self._openai_compatible(
                "Qwen3-Embedding-0.6B-Q8_0",
                LLAMACPP_API_KEY or "dummy-key",
                LLAMACPP_EMBEDDING_URL,
            )

        # The Zhipu provider is only constructed when the local one is unavailable.
        if ZhipuEmbeddingProvider().is_available():
            info("✅ 使用智谱 API 作为 mem0 embedder")
            return self._openai_compatible(
                ZHIPU_EMBEDDING_MODEL,
                ZHIPUAI_API_KEY,
                ZHIPU_API_BASE,
            )

        # Neither provider is reachable: fall back to a placeholder and warn.
        warning("⚠️ 没有可用的 embedder使用 dummy 配置")
        return self._openai_compatible(
            "text-embedding-ada-002",
            "dummy-key",
            "http://localhost:8080/v1",
        )

    @staticmethod
    def _build_mem0_config(embedder_config: Dict, embedding_dim: int) -> Dict:
        """Assemble the full Mem0 config (Qdrant store, LLM, embedder)."""
        return {
            "vector_store": {
                "provider": "qdrant",
                "config": {
                    "url": QDRANT_URL,
                    "api_key": QDRANT_API_KEY,
                    "collection_name": QDRANT_COLLECTION_NAME,
                    "embedding_model_dims": embedding_dim,
                },
            },
            "llm": {
                "provider": "openai",
                "config": {
                    "model": "gpt-3.5-turbo",  # generic model name
                    "api_key": LLM_API_KEY or ZHIPUAI_API_KEY or "dummy-key",
                    "openai_base_url": VLLM_BASE_URL or ZHIPU_API_BASE,
                    "temperature": 0.1,
                    "max_tokens": 2000,
                },
            },
            "embedder": embedder_config,
            "version": "v1.1",
        }

    async def _probe_connection(self) -> None:
        """Run a short test search; a failure is logged but never raised."""
        try:
            info("🔄 正在测试 Mem0 连接...")
            await asyncio.wait_for(
                self.mem0.search("ping", user_id="test", limit=1),
                timeout=10.0,
            )
            info("✅ Mem0 连接测试成功")
        except Exception as e:
            # Deliberately best-effort: the client is still considered usable.
            warning(f"⚠️ Mem0 连接测试遇到问题(但继续使用): {e}")

    async def initialize(self):
        """Initialize the Mem0 client and run a best-effort connection test.

        Does nothing when the client is already initialized. On any failure
        the error is logged, ``self.mem0`` is reset to ``None`` and
        ``self._initialized`` stays ``False`` so a later call can retry.
        No exception is propagated to the caller.
        """
        if self._initialized:
            return

        try:
            # Probe the embedding service once to learn the vector dimension.
            info("🔄 正在获取嵌入服务...")
            embeddings = get_embedding_service()
            # NOTE(review): embed_query is invoked synchronously here, so the
            # event loop is blocked for the duration of this call.
            embedding_dim = len(embeddings.embed_query("test"))
            info(f"✅ 嵌入服务可用,向量维度: {embedding_dim}")

            embedder_config = self._select_embedder_config()

            info("🔄 正在构建 Mem0 配置...")
            config = self._build_mem0_config(embedder_config, embedding_dim)

            info("🔄 正在初始化 Mem0 实例...")
            memory = AsyncMemory.from_config(config)
            # Some mem0 releases declare AsyncMemory.from_config as a
            # coroutine function. Awaiting only when needed supports both
            # shapes and avoids storing an un-awaited coroutine as the client.
            if asyncio.iscoroutine(memory):
                memory = await memory
            self.mem0 = memory
            info("✅ Mem0 配置加载成功")

            await self._probe_connection()

            self._initialized = True
            info("🎉 Mem0 初始化完成")
        except asyncio.TimeoutError:
            error("❌ Mem0 初始化超时")
            self.mem0 = None
            self._initialized = False
        except Exception as e:
            error(f"❌ Mem0 初始化失败: {e}")
            error(f"详细错误信息:\n{traceback.format_exc()}")
            self.mem0 = None
            self._initialized = False

    async def search_memories(self, query: str, user_id: str, limit: int = 5) -> List[str]:
        """Retrieve memories relevant to a query.

        Args:
            query: Query text.
            user_id: User ID.
            limit: Maximum number of results to request.

        Returns:
            List[str]: The non-empty ``"memory"`` values found under the
            ``"results"`` key of the Mem0 response. An empty list is returned
            when the client is not initialized, nothing matched, the search
            exceeded 30 seconds, or the search raised.
        """
        if not self.mem0:
            warning("⚠️ Mem0 未初始化,跳过记忆检索")
            return []

        try:
            memories = await asyncio.wait_for(
                self.mem0.search(query, user_id=user_id, limit=limit),
                timeout=30.0,
            )
            if memories and "results" in memories:
                facts = [
                    item["memory"]
                    for item in memories["results"]
                    if item.get("memory")
                ]
                if facts:
                    info(f"🔍 [记忆检索] Mem0 返回 {len(facts)} 条记忆")
                    return facts
            info("🔍 [记忆检索] 未找到相关记忆")
            return []
        except asyncio.TimeoutError:
            warning("⚠️ Mem0 检索超时 (30s),跳过本次记忆检索")
            return []
        except Exception as e:
            warning(f"⚠️ Mem0 检索失败: {e}")
            return []

    async def add_memories(self, messages, user_id) -> bool:
        """Store conversation messages in Mem0.

        Args:
            messages: Messages passed through to ``mem0.add``; must support
                ``len()`` because the count is logged.
            user_id: User ID the memories are stored under.

        Returns:
            bool: ``True`` when the add completed within 60 seconds;
            ``False`` when the client is not initialized, the call timed
            out, or the call raised.
        """
        if not self.mem0:
            return False

        # Monotonic clock: elapsed time must not be affected by wall-clock jumps.
        start = time.monotonic()
        try:
            info(f"📝 开始 Mem0 add消息数: {len(messages)}")
            await asyncio.wait_for(
                self.mem0.add(messages, user_id=user_id, metadata={"type": "conversation"}),
                timeout=60.0,
            )
            info(f"✅ Mem0 add 完成,耗时: {time.monotonic() - start:.2f}s")
            return True
        except asyncio.TimeoutError:
            error(f"❌ Mem0 记忆添加超时 (60s),已等待 {time.monotonic() - start:.2f}s")
            return False
        except Exception as e:
            # Mirror search_memories: report failure instead of propagating.
            error(f"❌ Mem0 记忆添加失败: {e}")
            return False