Files
ailine/backend/app/main_graph/nodes/memory_trigger.py
root 615b4b6eed
All checks were successful
构建并部署 AI Agent 服务 / deploy (push) Successful in 6m39s
修复状态兼容性问题:让旧节点同时支持 dict 和 dataclass
2026-05-01 22:45:42 +08:00

49 lines
1.7 KiB
Python

from typing import Any, Dict
from langchain_core.runnables.config import RunnableConfig
from app.main_graph.state import MessagesState
from app.memory.mem0_client import Mem0Client
from app.logger import info
def _get_attr(state, attr_name, default=None):
"""通用方法:兼容 dict 和 dataclass 两种状态格式"""
if isinstance(state, dict):
return state.get(attr_name, default)
else:
return getattr(state, attr_name, default)
# 全局变量,在 GraphBuilder 中注入
_mem0_client: Mem0Client = None
def set_mem0_client(client: Mem0Client):
global _mem0_client
_mem0_client = client
async def memory_trigger_node(state, config: RunnableConfig) -> Dict[str, Any]:
"""检测用户消息中的记忆指令,若命中则主动调用 Mem0 存储"""
if _mem0_client is None:
return {}
messages = _get_attr(state, "messages", [])
if not messages:
return {}
last_msg = messages[-1]
content = last_msg.content if hasattr(last_msg, 'content') else str(last_msg)
# 触发词(可自行扩展)
trigger_words = ["记住", "记下", "保存", "备忘", "记录下", "别忘了"]
if any(word in content for word in trigger_words):
user_id = config.get("metadata", {}).get("user_id", "default_user")
# 确保 Mem0 已初始化
if not _mem0_client._initialized:
await _mem0_client.initialize()
# 将用户消息作为事实来源提交给 Mem0
info(f"📌 检测到记忆指令,已主动触发 Mem0 存储")
mem0_messages = [{"role": "user", "content": content}]
await _mem0_client.add_memories(mem0_messages, user_id=user_id)
return {} # 不修改状态