""" Mem0 记忆层客户端封装模块 负责 Mem0 的初始化、检索和存储 """ import os from typing import Optional, List, Dict, Any from mem0 import AsyncMemory # 本地模块 from app.config import QDRANT_URL, QDRANT_COLLECTION_NAME, VLLM_EMBEDDING_URL from app.logger import info, warning, error class Mem0Client: """Mem0 异步客户端封装类""" def __init__(self, llm_instance): """ 初始化 Mem0 客户端 Args: llm_instance: LangChain LLM 实例(用于事实提取) """ self.llm = llm_instance self.mem0: Optional[AsyncMemory] = None self._initialized = False async def initialize(self): """异步初始化 Mem0 客户端""" if self._initialized: return try: # 检查 Qdrant 是否可达 (可选) import requests try: resp = requests.get(f"{QDRANT_URL}/collections", timeout=2) if resp.status_code == 200: info(f"✅ Qdrant 服务正常: {QDRANT_URL}") except Exception: warning(f"⚠️ 无法连接到 Qdrant: {QDRANT_URL},Mem0 将尝试自动连接") config = { # 向量存储:复用 Qdrant 实例 "vector_store": { "provider": "qdrant", "config": { "collection_name": QDRANT_COLLECTION_NAME, "host": QDRANT_URL.split("://")[1].split(":")[0] if "://" in QDRANT_URL else "localhost", "port": int(QDRANT_URL.split(":")[-1]) if ":" in QDRANT_URL.split("://")[-1] else 6333, "embedding_model_dims": 768, # embeddinggemma-300m 输出 768 维 } }, # 事实提取 LLM:直接复用传入的 LangChain 实例 "llm": { "provider": "langchain", "config": { "model": self.llm # 直接传入 LangChain 模型实例 } }, # Embedding:指向 vLLM 服务 "embedder": { "provider": "openai", "embedding_dims": 768, # 关键:将维度参数提升到顶层 "config": { "model": "google/embeddinggemma-300m", "api_key": "EMPTY", "api_base": VLLM_EMBEDDING_URL, # 注意:不要在此处传递 dimensions 参数,避免与 vLLM v0.7.2 不兼容 } }, "version": "v1.1" } self.mem0 = AsyncMemory.from_config(config) self._initialized = True info(f"✅ Mem0 初始化成功 (Embedding: vLLM@8002, Vector: Qdrant, LLM: 复用现有实例)") except Exception as e: error(f"❌ Mem0 初始化失败: {e}") import traceback traceback.print_exc() self.mem0 = None async def search_memories(self, query: str, user_id: str, limit: int = 5) -> List[str]: """ 检索相关记忆 Args: query: 查询文本 user_id: 用户 ID limit: 返回结果数量限制 Returns: List[str]: 记忆事实列表 """ if not self.mem0: warning("⚠️ Mem0 未初始化,跳过记忆检索") return [] try: memories = await self.mem0.search(query, user_id=user_id, limit=limit) if memories and "results" in memories: facts = [m["memory"] for m in memories["results"] if m.get("memory")] if facts: info(f"🔍 [记忆检索] Mem0 返回 {len(facts)} 条记忆") return facts info("🔍 [记忆检索] 未找到相关记忆") return [] except Exception as e: warning(f"⚠️ Mem0 检索失败: {e}") return [] async def add_memories(self, messages: List[Dict[str, str]], user_id: str) -> bool: """ 添加记忆(自动提取事实并存储) Args: messages: 消息列表,格式为 [{"role": "user/assistant/system", "content": "..."}] user_id: 用户 ID Returns: bool: 是否成功 """ if not self.mem0: warning("⚠️ Mem0 未初始化,跳过记忆添加") return False try: result = await self.mem0.add( messages, user_id=user_id, metadata={"type": "conversation"} ) info(f"📝 [记忆添加] 已提交给 Mem0 进行事实提取") return True except Exception as e: error(f"❌ Mem0 记忆添加失败: {e}") return False