From 9601ae9aa6341655d71ca55f5dc038edfc5e3b02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Wed, 14 Jan 2026 15:52:27 +0800 Subject: [PATCH 01/14] feat: add in mem-reader --- src/memos/api/handlers/component_init.py | 3 +- src/memos/mem_feedback/feedback.py | 3 +- src/memos/mem_reader/base.py | 23 ++++++++++++- src/memos/mem_reader/factory.py | 32 +++++++++++++++++-- src/memos/mem_reader/simple_struct.py | 3 ++ .../init_components_for_scheduler.py | 3 +- 6 files changed, 60 insertions(+), 7 deletions(-) diff --git a/src/memos/api/handlers/component_init.py b/src/memos/api/handlers/component_init.py index 56f8ac195..76af6decf 100644 --- a/src/memos/api/handlers/component_init.py +++ b/src/memos/api/handlers/component_init.py @@ -183,7 +183,8 @@ def init_server() -> dict[str, Any]: else None ) embedder = EmbedderFactory.from_config(embedder_config) - mem_reader = MemReaderFactory.from_config(mem_reader_config) + # Pass graph_db to mem_reader for recall operations (deduplication, conflict detection) + mem_reader = MemReaderFactory.from_config(mem_reader_config, graph_db=graph_db) reranker = RerankerFactory.from_config(reranker_config) feedback_reranker = RerankerFactory.from_config(feedback_reranker_config) internet_retriever = InternetRetrieverFactory.from_config( diff --git a/src/memos/mem_feedback/feedback.py b/src/memos/mem_feedback/feedback.py index 15d7c336a..1d199c6cb 100644 --- a/src/memos/mem_feedback/feedback.py +++ b/src/memos/mem_feedback/feedback.py @@ -76,7 +76,8 @@ def __init__(self, config: MemFeedbackConfig): self.llm: OpenAILLM | OllamaLLM | AzureLLM = LLMFactory.from_config(config.extractor_llm) self.embedder: OllamaEmbedder = EmbedderFactory.from_config(config.embedder) self.graph_store: PolarDBGraphDB = GraphStoreFactory.from_config(config.graph_db) - self.mem_reader = MemReaderFactory.from_config(config.mem_reader) + # Pass graph_store to mem_reader for recall operations (deduplication, conflict detection) + 
self.mem_reader = MemReaderFactory.from_config(config.mem_reader, graph_db=self.graph_store) self.is_reorganize = config.reorganize self.memory_manager: MemoryManager = MemoryManager( diff --git a/src/memos/mem_reader/base.py b/src/memos/mem_reader/base.py index 391270bcf..b34abf9a1 100644 --- a/src/memos/mem_reader/base.py +++ b/src/memos/mem_reader/base.py @@ -1,17 +1,38 @@ from abc import ABC, abstractmethod -from typing import Any +from typing import TYPE_CHECKING, Any from memos.configs.mem_reader import BaseMemReaderConfig from memos.memories.textual.item import TextualMemoryItem +if TYPE_CHECKING: + from memos.graph_dbs.base import BaseGraphDB + + class BaseMemReader(ABC): """MemReader interface class for reading information.""" + # Optional graph database for recall operations (for deduplication, conflict + # detection .etc) + graph_db: "BaseGraphDB | None" = None + @abstractmethod def __init__(self, config: BaseMemReaderConfig): """Initialize the MemReader with the given configuration.""" + def set_graph_db(self, graph_db: "BaseGraphDB | None") -> None: + """ + Set the graph database instance for recall operations. + + This enables the mem-reader to perform: + - Semantic deduplication: avoid storing duplicate memories + - Conflict detection: detect contradictions with existing memories + + Args: + graph_db: The graph database instance, or None to disable recall operations. 
+ """ + self.graph_db = graph_db + @abstractmethod def get_memory( self, scene_data: list, type: str, info: dict[str, Any], mode: str = "fast" diff --git a/src/memos/mem_reader/factory.py b/src/memos/mem_reader/factory.py index ff24e5c77..2749327bf 100644 --- a/src/memos/mem_reader/factory.py +++ b/src/memos/mem_reader/factory.py @@ -1,4 +1,4 @@ -from typing import Any, ClassVar +from typing import TYPE_CHECKING, Any, ClassVar, Optional from memos.configs.mem_reader import MemReaderConfigFactory from memos.mem_reader.base import BaseMemReader @@ -8,6 +8,10 @@ from memos.memos_tools.singleton import singleton_factory +if TYPE_CHECKING: + from memos.graph_dbs.base import BaseGraphDB + + class MemReaderFactory(BaseMemReader): """Factory class for creating MemReader instances.""" @@ -19,9 +23,31 @@ class MemReaderFactory(BaseMemReader): @classmethod @singleton_factory() - def from_config(cls, config_factory: MemReaderConfigFactory) -> BaseMemReader: + def from_config( + cls, + config_factory: MemReaderConfigFactory, + graph_db: Optional["BaseGraphDB | None"] = None, + ) -> BaseMemReader: + """ + Create a MemReader instance from configuration. + + Args: + config_factory: Configuration factory for the MemReader. + graph_db: Optional graph database instance for recall operations + (deduplication, conflict detection). Can also be set later + via reader.set_graph_db(). + + Returns: + Configured MemReader instance. 
+ """ backend = config_factory.backend if backend not in cls.backend_to_class: raise ValueError(f"Invalid backend: {backend}") reader_class = cls.backend_to_class[backend] - return reader_class(config_factory.config) + reader = reader_class(config_factory.config) + + # Set graph_db if provided (for recall operations) + if graph_db is not None: + reader.set_graph_db(graph_db) + + return reader diff --git a/src/memos/mem_reader/simple_struct.py b/src/memos/mem_reader/simple_struct.py index fa72bd063..a776d12f6 100644 --- a/src/memos/mem_reader/simple_struct.py +++ b/src/memos/mem_reader/simple_struct.py @@ -176,6 +176,9 @@ def __init__(self, config: SimpleStructMemReaderConfig): self.chat_window_max_tokens = getattr(self.config, "chat_window_max_tokens", 1024) self._count_tokens = count_tokens_text self.searcher = None + # Initialize graph_db as None, can be set later via set_graph_db for + # recall operations + self.graph_db = None def _make_memory_item( self, diff --git a/src/memos/mem_scheduler/general_modules/init_components_for_scheduler.py b/src/memos/mem_scheduler/general_modules/init_components_for_scheduler.py index 8fd60153d..3a12a9c79 100644 --- a/src/memos/mem_scheduler/general_modules/init_components_for_scheduler.py +++ b/src/memos/mem_scheduler/general_modules/init_components_for_scheduler.py @@ -305,7 +305,8 @@ def init_components() -> dict[str, Any]: ) llm = LLMFactory.from_config(llm_config) embedder = EmbedderFactory.from_config(embedder_config) - mem_reader = MemReaderFactory.from_config(mem_reader_config) + # Pass graph_db to mem_reader for recall operations (deduplication, conflict detection) + mem_reader = MemReaderFactory.from_config(mem_reader_config, graph_db=graph_db) reranker = RerankerFactory.from_config(reranker_config) feedback_reranker = RerankerFactory.from_config(feedback_reranker_config) internet_retriever = InternetRetrieverFactory.from_config( From 2c0d39520915270ad2daab7e6061e626c53716c6 Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Wed, 14 Jan 2026 20:45:02 +0800 Subject: [PATCH 02/14] feat: add merge from in mem-reader --- src/memos/graph_dbs/base.py | 2 +- src/memos/mem_reader/multi_modal_struct.py | 66 +++++++++++++++++++--- src/memos/mem_reader/simple_struct.py | 20 +++++-- src/memos/multi_mem_cube/single_cube.py | 1 + src/memos/templates/mem_reader_prompts.py | 59 +++++++++++++++++-- 5 files changed, 128 insertions(+), 20 deletions(-) diff --git a/src/memos/graph_dbs/base.py b/src/memos/graph_dbs/base.py index b76ed9d08..a8f8ff414 100644 --- a/src/memos/graph_dbs/base.py +++ b/src/memos/graph_dbs/base.py @@ -82,7 +82,7 @@ def get_node(self, id: str, include_embedding: bool = False) -> dict[str, Any] | @abstractmethod def get_nodes( - self, id: str, include_embedding: bool = False, **kwargs + self, ids: list, include_embedding: bool = False, **kwargs ) -> dict[str, Any] | None: """ Retrieve the metadata and memory of a list of nodes. diff --git a/src/memos/mem_reader/multi_modal_struct.py b/src/memos/mem_reader/multi_modal_struct.py index 3bf6d4927..0e8dd6d10 100644 --- a/src/memos/mem_reader/multi_modal_struct.py +++ b/src/memos/mem_reader/multi_modal_struct.py @@ -316,6 +316,7 @@ def _get_llm_response( custom_tags: list[str] | None = None, sources: list | None = None, prompt_type: str = "chat", + related_memories: str | None = None, ) -> dict: """ Override parent method to improve language detection by using actual text content @@ -326,6 +327,7 @@ def _get_llm_response( custom_tags: Optional custom tags sources: Optional list of SourceMessage objects to extract text content from prompt_type: Type of prompt to use ("chat" or "doc") + related_memories: related_memories in the graph Returns: LLM response dictionary @@ -360,7 +362,9 @@ def _get_llm_response( else: template = PROMPT_DICT["chat"][lang] examples = PROMPT_DICT["chat"][f"{lang}_example"] - prompt = template.replace("${conversation}", mem_str) + prompt = 
template.replace("${conversation}", mem_str).replace( + "${reference}", related_memories + ) custom_tags_prompt = ( PROMPT_DICT["custom_tags"][lang].replace("{custom_tags}", str(custom_tags)) @@ -418,6 +422,7 @@ def _process_string_fine( fast_memory_items: list[TextualMemoryItem], info: dict[str, Any], custom_tags: list[str] | None = None, + **kwargs, ) -> list[TextualMemoryItem]: """ Process fast mode memory items through LLM to generate fine mode memories. @@ -454,8 +459,36 @@ def _process_one_item(fast_item: TextualMemoryItem) -> list[TextualMemoryItem]: # Determine prompt type based on sources prompt_type = self._determine_prompt_type(sources) + # recall related memories + related_memories = None + memory_ids = [] + if self.graph_db: + if "user_name" in kwargs: + memory_ids = self.graph_db.search_by_embedding( + vector=self.embedder.embed(mem_str)[0], + top_k=10, + status="activated", + user_name=kwargs.get("user_name"), + filter={ + "or": [{"memory_type": "LongTermMemory"}, {"memory_type": "UserMemory"}] + }, + ) + memory_ids = set({r["id"] for r in memory_ids if r.get("id")}) + related_memories_list = self.graph_db.get_nodes( + list(memory_ids), + include_embedding=False, + user_name=kwargs.get("user_name"), + ) + related_memories = "\n".join( + ["{}: {}".format(mem["id"], mem["memory"]) for mem in related_memories_list] + ) + else: + logger.warning("user_name is null when graph_db exists") + try: - resp = self._get_llm_response(mem_str, custom_tags, sources, prompt_type) + resp = self._get_llm_response( + mem_str, custom_tags, sources, prompt_type, related_memories + ) except Exception as e: logger.error(f"[MultiModalFine] Error calling LLM: {e}") return fine_items @@ -469,6 +502,11 @@ def _process_one_item(fast_item: TextualMemoryItem) -> list[TextualMemoryItem]: .replace("长期记忆", "LongTermMemory") .replace("用户记忆", "UserMemory") ) + if "merged_from" in m: + for merged_id in m["merged_from"]: + if merged_id not in memory_ids: + logger.warning("merged id not 
valid!!!!!") + extra_kwargs["merged_from"] = m["merged_from"] # Create fine mode memory item (same as simple_struct) node = self._make_memory_item( value=m.get("value", ""), @@ -485,6 +523,11 @@ def _process_one_item(fast_item: TextualMemoryItem) -> list[TextualMemoryItem]: logger.error(f"[MultiModalFine] parse error: {e}") elif resp.get("value") and resp.get("key"): try: + if "merged_from" in resp: + for merged_id in resp["merged_from"]: + if merged_id not in memory_ids: + logger.warning("merged id not valid!!!!!") + extra_kwargs["merged_from"] = resp["merged_from"] # Create fine mode memory item (same as simple_struct) node = self._make_memory_item( value=resp.get("value", "").strip(), @@ -533,9 +576,7 @@ def _get_llm_tool_trajectory_response(self, mem_str: str) -> dict: return [] def _process_tool_trajectory_fine( - self, - fast_memory_items: list[TextualMemoryItem], - info: dict[str, Any], + self, fast_memory_items: list[TextualMemoryItem], info: dict[str, Any], **kwargs ) -> list[TextualMemoryItem]: """ Process tool trajectory memory items through LLM to generate fine mode memories. 
@@ -618,10 +659,10 @@ def _process_multi_modal_data( with ContextThreadPoolExecutor(max_workers=2) as executor: future_string = executor.submit( - self._process_string_fine, fast_memory_items, info, custom_tags + self._process_string_fine, fast_memory_items, info, custom_tags, **kwargs ) future_tool = executor.submit( - self._process_tool_trajectory_fine, fast_memory_items, info + self._process_tool_trajectory_fine, fast_memory_items, info, **kwargs ) # Collect results @@ -710,7 +751,12 @@ def get_scene_data_info(self, scene_data: list, type: str) -> list[list[Any]]: return scene_data def _read_memory( - self, messages: list[MessagesType], type: str, info: dict[str, Any], mode: str = "fine" + self, + messages: list[MessagesType], + type: str, + info: dict[str, Any], + mode: str = "fine", + **kwargs, ) -> list[list[TextualMemoryItem]]: list_scene_data_info = self.get_scene_data_info(messages, type) @@ -718,7 +764,9 @@ def _read_memory( # Process Q&A pairs concurrently with context propagation with ContextThreadPoolExecutor() as executor: futures = [ - executor.submit(self._process_multi_modal_data, scene_data_info, info, mode=mode) + executor.submit( + self._process_multi_modal_data, scene_data_info, info, mode=mode, **kwargs + ) for scene_data_info in list_scene_data_info ] for future in concurrent.futures.as_completed(futures): diff --git a/src/memos/mem_reader/simple_struct.py b/src/memos/mem_reader/simple_struct.py index a776d12f6..d97eb1b54 100644 --- a/src/memos/mem_reader/simple_struct.py +++ b/src/memos/mem_reader/simple_struct.py @@ -221,7 +221,7 @@ def _get_llm_response(self, mem_str: str, custom_tags: list[str] | None) -> dict lang = detect_lang(mem_str) template = PROMPT_DICT["chat"][lang] examples = PROMPT_DICT["chat"][f"{lang}_example"] - prompt = template.replace("${conversation}", mem_str) + prompt = template.replace("${conversation}", mem_str).replace("${reference}", "") custom_tags_prompt = ( 
PROMPT_DICT["custom_tags"][lang].replace("{custom_tags}", str(custom_tags)) @@ -393,7 +393,12 @@ def _process_transfer_chat_data( return chat_read_nodes def get_memory( - self, scene_data: SceneDataInput, type: str, info: dict[str, Any], mode: str = "fine" + self, + scene_data: SceneDataInput, + type: str, + info: dict[str, Any], + mode: str = "fine", + user_name: str | None = None, ) -> list[list[TextualMemoryItem]]: """ Extract and classify memory content from scene_data. @@ -412,6 +417,8 @@ def get_memory( - chunk_overlap: Overlap for small chunks (default: 50) mode: mem-reader mode, fast for quick process while fine for better understanding via calling llm + user_name: tha user_name would be inserted later into the + database, may be used in recall. Returns: list[list[TextualMemoryItem]] containing memory content with summaries as keys and original text as values Raises: @@ -435,7 +442,7 @@ def get_memory( # Backward compatibility, after coercing scene_data, we only tackle # with standard scene_data type: MessagesType standard_scene_data = coerce_scene_data(scene_data, type) - return self._read_memory(standard_scene_data, type, info, mode) + return self._read_memory(standard_scene_data, type, info, mode, user_name=user_name) def rewrite_memories( self, messages: list[dict], memory_list: list[TextualMemoryItem], user_only: bool = True @@ -561,7 +568,12 @@ def filter_hallucination_in_memories( return memory_list def _read_memory( - self, messages: list[MessagesType], type: str, info: dict[str, Any], mode: str = "fine" + self, + messages: list[MessagesType], + type: str, + info: dict[str, Any], + mode: str = "fine", + **kwargs, ) -> list[list[TextualMemoryItem]]: """ 1. 
raw file: diff --git a/src/memos/multi_mem_cube/single_cube.py b/src/memos/multi_mem_cube/single_cube.py index 6c3cc0cc7..0c5b4c87d 100644 --- a/src/memos/multi_mem_cube/single_cube.py +++ b/src/memos/multi_mem_cube/single_cube.py @@ -802,6 +802,7 @@ def _process_text_mem( "session_id": target_session_id, }, mode=extract_mode, + user_name=user_context.mem_cube_id, ) flattened_local = [mm for m in memories_local for mm in m] diff --git a/src/memos/templates/mem_reader_prompts.py b/src/memos/templates/mem_reader_prompts.py index 20f8150b7..134fe3956 100644 --- a/src/memos/templates/mem_reader_prompts.py +++ b/src/memos/templates/mem_reader_prompts.py @@ -143,7 +143,8 @@ "key": <字符串,唯一且简洁的记忆标题>, "memory_type": <字符串,"LongTermMemory" 或 "UserMemory">, "value": <详细、独立且无歧义的记忆陈述——若输入对话为英文,则用英文;若为中文,则用中文>, - "tags": <相关主题关键词列表(例如,["截止日期", "团队", "计划"])> + "tags": <相关主题关键词列表(例如,["截止日期", "团队", "计划"])>, + "merged_from": <需要被合并的参考记忆列表,当没有提供参考记忆时,不需要输出这个字段 > }, ... ], @@ -156,7 +157,7 @@ ${custom_tags_prompt} -示例: +示例1-无参考记忆: 对话: user: [2025年6月26日下午3:00]:嗨Jerry!昨天下午3点我和团队开了个会,讨论新项目。 assistant: 哦Tom!你觉得团队能在12月15日前完成吗? @@ -183,7 +184,7 @@ "summary": "Tom目前正专注于管理一个进度紧张的新项目。在2025年6月25日的团队会议后,他意识到原定2025年12月15日的截止日期可能无法实现,因为后端会延迟。由于担心测试时间不足,他接受了Jerry提出的延期建议。Tom计划在次日早上的会议上提出将截止日期推迟至2026年1月5日。他的行为反映出对时间线的担忧,以及积极、以团队为导向的问题解决方式。" } -对话: +示例2-无参考记忆: assistant: [2025年8月15日上午10:30]: 你提到的那本《深度工作》确实很适合你现在的情况。这本书讲了......(略),作者建议每天留出2-3 小时的专注时间块,期间关闭所有通知。考虑到你下周要交的报告,可以试试早上9点到11点这个时段。 @@ -202,25 +203,71 @@ } 注意:当对话仅有助手消息时,应使用"助手推荐"、"助手建议"等表述,而非将其错误归因为用户的陈述或计划。 -另一个中文示例(注意:当用户语言为中文时,您也需输出中文): +示例3-无参考记忆(注意:当用户语言为中文时,您也需输出中文): { "memory list": [ { "key": "项目会议", "memory_type": "LongTermMemory", "value": "在2025年6月25日下午3点,Tom与团队开会讨论了新项目,涉及时间表,并提出了对12月15日截止日期可行性的担忧。", - "tags": ["项目", "时间表", "会议", "截止日期"] + "tags": ["项目", "时间表", "会议", "截止日期"], + "merged_from": [ + "xxxx-xxxx-xxxx-xxxx-xxx", + "xxxx-xxxx-xxxx-xxxx-xx", + ], }, ... ], "summary": "Tom 目前专注于管理一个进度紧张的新项目..." 
} -请始终使用与对话相同的语言进行回复。 +注意,我们可能给出部分参考记忆,这部分记忆如果和新添加的记忆大量重复,合并记忆,并在输入中多一个`merged_from`字段指明合并的记忆; +新添加的记忆如果和参考记忆有强关联,可以在提取时适当参考(但一定不要捏造记忆,十分有把握再进行参考); +如果没有给出参考记忆、或参考记忆和新添加的记忆无关,直接忽略就好。 +示例4-带参考记忆: 对话: +user: [2026年1月13日] 冬天滑雪真的太快乐了!我打算这周末和朋友再滑一次! +assistant:[2026年1月13日] 听起来就很棒! +user: [2026年1月14日] 你还记得我的滑雪搭子吧?他叫Tom,我们每年都一起滑雪!这周也是! + +参考记忆: +[xxxx-xxxx-xxxx-xxxx-01]: 用户在2025年12月29日表达了对滑雪的狂热喜爱 +[xxxx-xxxx-xxxx-xxxx-06]: 用户的滑雪搭子叫Tom +[xxxx-xxxx-xxxx-xxxx-11]: 二世谷是用户多次去过的滑雪胜地,用户在比罗夫滑雪场认识了Tom并成为好朋友 +[xxxx-xxxx-xxxx-xxxx-12]: 用户2025年1月1日和助手讨论了滑雪装备,打算新买一个滑雪背包。 + +输出: +{ + "memory list": [ + { + "key": "用户冬季滑雪计划", + "memory_type": "UserMemory", + "value": "用户在2026年1月13日计划在周末与朋友Tom再次进行滑雪活动。", + "tags": ["滑雪", "运动偏好", "计划", "冬季活动"], + }, + { + "key": "用户的滑雪伙伴叫Tom", + "memory_type": "UserMemory", + "value": "用户在2026年1月14日再次提到其滑雪搭子Tom,并进一步说明他们每年都会一起滑雪。这一描述强化了双方长期稳定的滑雪伙伴关系,在原有记忆基础上补充了新的时间规律性信息。", + "tags": ["人际关系", "滑雪搭子", "长期习惯"], + "merged_from": [ + "xxxx-xxxx-xxxx-xxxx-06", + "xxxx-xxxx-xxxx-xxxx-11", + ], + } + ], + "summary": "用户近期再次强化了自己对滑雪的热爱,并在2026年1月13日明确表示冬季滑雪带来极大的快乐,同时计划于当周周末与朋友再度滑雪。这表明滑雪对用户而言仍然是一项高度重要的活动。此外,用户在2026年1月14日补充了关于其滑雪伙伴Tom的长期关系细节,强调两人每年都会结伴滑雪,进一步巩固了此人际关系在用户生活中的重要性。" +} + +您的任务: +待提取的对话: ${conversation} +参考记忆: +${reference} + +请始终使用与对话相同的语言进行回复。 您的输出:""" SIMPLE_STRUCT_DOC_READER_PROMPT = """You are an expert text analyst for a search and retrieval system. 
From 75c65906333d174d0fe903dfb05df4731081c46f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Wed, 14 Jan 2026 21:32:36 +0800 Subject: [PATCH 03/14] feat: set merge ids archived --- src/memos/mem_reader/multi_modal_struct.py | 12 +++++++---- src/memos/multi_mem_cube/single_cube.py | 25 ++++++++++++++++++++++ 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/src/memos/mem_reader/multi_modal_struct.py b/src/memos/mem_reader/multi_modal_struct.py index 0e8dd6d10..5ae2f2489 100644 --- a/src/memos/mem_reader/multi_modal_struct.py +++ b/src/memos/mem_reader/multi_modal_struct.py @@ -466,11 +466,15 @@ def _process_one_item(fast_item: TextualMemoryItem) -> list[TextualMemoryItem]: if "user_name" in kwargs: memory_ids = self.graph_db.search_by_embedding( vector=self.embedder.embed(mem_str)[0], - top_k=10, + top_k=20, status="activated", user_name=kwargs.get("user_name"), filter={ - "or": [{"memory_type": "LongTermMemory"}, {"memory_type": "UserMemory"}] + "or": [ + {"memory_type": "LongTermMemory"}, + {"memory_type": "UserMemory"}, + {"memory_type": "WorkingMemory"}, + ] }, ) memory_ids = set({r["id"] for r in memory_ids if r.get("id")}) @@ -506,7 +510,7 @@ def _process_one_item(fast_item: TextualMemoryItem) -> list[TextualMemoryItem]: for merged_id in m["merged_from"]: if merged_id not in memory_ids: logger.warning("merged id not valid!!!!!") - extra_kwargs["merged_from"] = m["merged_from"] + info_per_item["merged_from"] = m["merged_from"] # Create fine mode memory item (same as simple_struct) node = self._make_memory_item( value=m.get("value", ""), @@ -527,7 +531,7 @@ def _process_one_item(fast_item: TextualMemoryItem) -> list[TextualMemoryItem]: for merged_id in resp["merged_from"]: if merged_id not in memory_ids: logger.warning("merged id not valid!!!!!") - extra_kwargs["merged_from"] = resp["merged_from"] + info_per_item["merged_from"] = resp["merged_from"] # Create fine mode memory item (same as simple_struct) node = 
self._make_memory_item( value=resp.get("value", "").strip(), diff --git a/src/memos/multi_mem_cube/single_cube.py b/src/memos/multi_mem_cube/single_cube.py index 0c5b4c87d..6aea6997f 100644 --- a/src/memos/multi_mem_cube/single_cube.py +++ b/src/memos/multi_mem_cube/single_cube.py @@ -832,6 +832,31 @@ def _process_text_mem( sync_mode=sync_mode, ) + # Mark merged_from memories as archived when provided in add_req.info + for memory in flattened_local: + merged_from = (memory.metadata.info or {}).get("merged_from") + if merged_from: + old_ids = ( + merged_from if isinstance(merged_from, (list | tuple | set)) else [merged_from] + ) + if self.mem_reader and self.mem_reader.graph_db: + for old_id in old_ids: + try: + self.mem_reader.graph_db.update_node( + str(old_id), {"status": "archived"} + ) + self.logger.info( + f"[SingleCubeView] Archived merged_from memory: {old_id}" + ) + except Exception as e: + self.logger.warning( + f"[SingleCubeView] Failed to archive merged_from memory {old_id}: {e}" + ) + else: + self.logger.warning( + "[SingleCubeView] merged_from provided but graph_db is unavailable; skip archiving." + ) + text_memories = [ { "memory": memory.memory, From 6e80154780c471148ec4157f208fad2fb15fe331 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Wed, 14 Jan 2026 21:43:25 +0800 Subject: [PATCH 04/14] feat: update en mem-reader prompt --- src/memos/templates/mem_reader_prompts.py | 56 ++++++++++++++++++++--- 1 file changed, 50 insertions(+), 6 deletions(-) diff --git a/src/memos/templates/mem_reader_prompts.py b/src/memos/templates/mem_reader_prompts.py index 134fe3956..f2d15cfb8 100644 --- a/src/memos/templates/mem_reader_prompts.py +++ b/src/memos/templates/mem_reader_prompts.py @@ -28,7 +28,8 @@ "key": , "memory_type": , "value": , - "tags": + "tags": , + "merged_from": }, ... ], @@ -41,7 +42,7 @@ ${custom_tags_prompt} -Example: +Example 1 — No reference memories: Conversation: user: [June 26, 2025 at 3:00 PM]: Hi Jerry! 
Yesterday at 3 PM I had a meeting with my team about the new project. assistant: Oh Tom! Do you think the team can finish by December 15? @@ -69,7 +70,7 @@ "summary": "Tom is currently focused on managing a new project with a tight schedule. After a team meeting on June 25, 2025, he realized the original deadline of December 15 might not be feasible due to backend delays. Concerned about insufficient testing time, he welcomed Jerry’s suggestion of proposing an extension. Tom plans to raise the idea of shifting the deadline to January 5, 2026 in the next morning’s meeting. His actions reflect both stress about timelines and a proactive, team-oriented problem-solving approach." } -Dialogue: +Example 2 — No reference memories: assistant: [10:30 AM, August 15, 2025]: The book Deep Work you mentioned is indeed very suitable for your current situation. The book explains … (omitted). The author suggests setting aside 2–3 hours of focused work blocks each day and turning off all notifications during that time. Considering that you need to submit a report next week, you could try using the 9:00–11:00 AM time slot for focused work. @@ -89,7 +90,7 @@ Note: When the dialogue contains only assistant messages, phrasing such as “assistant recommended” or “assistant suggested” should be used, rather than incorrectly attributing the content to the user’s statements or plans. -Another Example in Chinese (注意: 当user的语言为中文时,你就需要也输出中文): +Example 3 — No reference memories (note: if the user’s language is Chinese, output must also be Chinese): { "memory list": [ { @@ -103,11 +104,54 @@ "summary": "Tom 目前专注于管理一个进度紧张的新项目..." } -Always respond in the same language as the conversation. +Note: We may provide partial reference memories. If newly extracted memories substantially overlap with reference memories, merge them and include a `merged_from` field indicating the merged memory IDs. 
+If newly extracted memories are strongly related to reference memories, you may appropriately reference them during extraction (but never fabricate memories — only reference them when you are very confident). +If no reference memories are provided, or if they are unrelated to the new memories, simply ignore them. -Conversation: +Example 4 — With reference memories: +Dialogue: +user: [January 13, 2026] Winter skiing is so much fun! I’m planning to go skiing again with friends this weekend! +assistant: [January 13, 2026] That sounds great! +user: [January 14, 2026] You remember my ski buddy, right? His name is Tom. We ski together every year — including this week! + +Reference memories: +[xxxx-xxxx-xxxx-xxxx-01]: The user expressed a strong passion for skiing on December 29, 2025 +[xxxx-xxxx-xxxx-xxxx-06]: The user’s ski buddy is named Tom +[xxxx-xxxx-xxxx-xxxx-11]: Niseko is a ski destination the user has visited multiple times; the user met Tom at Hirafu Ski Resort and became close friends +[xxxx-xxxx-xxxx-xxxx-12]: On January 1, 2025, the user discussed skiing equipment with the assistant and planned to buy a new ski backpack + +Output: +{ + "memory list": [ + { + "key": "User's winter skiing plan", + "memory_type": "UserMemory", + "value": "On January 13, 2026, the user planned to go skiing again over the weekend with their friend Tom.", + "tags": ["skiing", "sports preference", "plan", "winter activity"] + }, + { + "key": "User's ski partner is named Tom", + "memory_type": "UserMemory", + "value": "On January 14, 2026, the user again mentioned their ski partner Tom and further explained that they ski together every year. 
This statement reinforces their long-term and stable skiing partnership and adds new information about its regular annual pattern.", + "tags": ["interpersonal relationship", "ski partner", "long-term habit"], + "merged_from": [ + "xxxx-xxxx-xxxx-xxxx-06", + "xxxx-xxxx-xxxx-xxxx-11" + ] + } + ], + "summary": "The user recently reinforced their strong passion for skiing and, on January 13, 2026, explicitly stated that winter skiing brings them great joy and that they planned to ski again with a friend over the weekend. This indicates that skiing remains a highly significant activity in the user’s life. Additionally, on January 14, 2026, the user elaborated on their long-term relationship with their ski partner Tom, emphasizing that they ski together every year. This further solidifies the importance of this interpersonal relationship in the user’s personal experiences." +} + +Your task: +Dialogue to be extracted: ${conversation} +Reference memories: +${reference} + +Always respond in the same language as the conversation. + Your Output:""" SIMPLE_STRUCT_MEM_READER_PROMPT_ZH = """您是记忆提取专家。 From c3991dc237e1a1445d0a25d50c169add31b6c2e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Wed, 14 Jan 2026 21:59:11 +0800 Subject: [PATCH 05/14] fix: abstract --- src/memos/mem_reader/base.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/memos/mem_reader/base.py b/src/memos/mem_reader/base.py index b34abf9a1..c150051df 100644 --- a/src/memos/mem_reader/base.py +++ b/src/memos/mem_reader/base.py @@ -20,6 +20,7 @@ class BaseMemReader(ABC): def __init__(self, config: BaseMemReaderConfig): """Initialize the MemReader with the given configuration.""" + @abstractmethod def set_graph_db(self, graph_db: "BaseGraphDB | None") -> None: """ Set the graph database instance for recall operations. 
From 4f30e4458fdb38ed57e0355fea2669bbdad2de43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Wed, 14 Jan 2026 22:08:34 +0800 Subject: [PATCH 06/14] fix: set graph db bug --- src/memos/mem_reader/base.py | 1 - src/memos/mem_reader/simple_struct.py | 9 ++++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/memos/mem_reader/base.py b/src/memos/mem_reader/base.py index c150051df..87bf43b0f 100644 --- a/src/memos/mem_reader/base.py +++ b/src/memos/mem_reader/base.py @@ -32,7 +32,6 @@ def set_graph_db(self, graph_db: "BaseGraphDB | None") -> None: Args: graph_db: The graph database instance, or None to disable recall operations. """ - self.graph_db = graph_db @abstractmethod def get_memory( diff --git a/src/memos/mem_reader/simple_struct.py b/src/memos/mem_reader/simple_struct.py index d97eb1b54..6f4542c7a 100644 --- a/src/memos/mem_reader/simple_struct.py +++ b/src/memos/mem_reader/simple_struct.py @@ -5,7 +5,7 @@ import traceback from abc import ABC -from typing import Any, TypeAlias +from typing import TYPE_CHECKING, Any, TypeAlias from tqdm import tqdm @@ -16,6 +16,10 @@ from memos.embedders.factory import EmbedderFactory from memos.llms.factory import LLMFactory from memos.mem_reader.base import BaseMemReader + + +if TYPE_CHECKING: + from memos.graph_dbs.base import BaseGraphDB from memos.mem_reader.read_multi_modal import coerce_scene_data, detect_lang from memos.mem_reader.utils import ( count_tokens_text, @@ -180,6 +184,9 @@ def __init__(self, config: SimpleStructMemReaderConfig): # recall operations self.graph_db = None + def set_graph_db(self, graph_db: "BaseGraphDB | None") -> None: + self.graph_db = graph_db + def _make_memory_item( self, value: str, From c4ecfc804b41cdd6b1dd5868c7fb565be1049f55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Thu, 15 Jan 2026 11:37:20 +0800 Subject: [PATCH 07/14] fix: prompt build bug: replace None --- src/memos/mem_reader/multi_modal_struct.py | 
3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/memos/mem_reader/multi_modal_struct.py b/src/memos/mem_reader/multi_modal_struct.py index 5ae2f2489..a9b108f15 100644 --- a/src/memos/mem_reader/multi_modal_struct.py +++ b/src/memos/mem_reader/multi_modal_struct.py @@ -362,8 +362,9 @@ def _get_llm_response( else: template = PROMPT_DICT["chat"][lang] examples = PROMPT_DICT["chat"][f"{lang}_example"] + related_memories_str = related_memories if related_memories is not None else "" prompt = template.replace("${conversation}", mem_str).replace( - "${reference}", related_memories + "${reference}", related_memories_str ) custom_tags_prompt = ( From 3bc301c5e53b85c13269eaff96e7bb09fa567477 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Thu, 15 Jan 2026 12:04:28 +0800 Subject: [PATCH 08/14] fix: did not pass through username into mem-reader transfer function --- src/memos/mem_reader/multi_modal_struct.py | 13 +++++++------ src/memos/mem_reader/simple_struct.py | 1 + src/memos/mem_scheduler/general_scheduler.py | 1 + 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/memos/mem_reader/multi_modal_struct.py b/src/memos/mem_reader/multi_modal_struct.py index a9b108f15..94a85e845 100644 --- a/src/memos/mem_reader/multi_modal_struct.py +++ b/src/memos/mem_reader/multi_modal_struct.py @@ -694,9 +694,7 @@ def _process_multi_modal_data( @timed def _process_transfer_multi_modal_data( - self, - raw_node: TextualMemoryItem, - custom_tags: list[str] | None = None, + self, raw_node: TextualMemoryItem, custom_tags: list[str] | None = None, **kwargs ) -> list[TextualMemoryItem]: """ Process transfer for multimodal data. 
@@ -720,9 +718,11 @@ def _process_transfer_multi_modal_data( # Part A: call llm in parallel using thread pool with ContextThreadPoolExecutor(max_workers=2) as executor: future_string = executor.submit( - self._process_string_fine, [raw_node], info, custom_tags + self._process_string_fine, [raw_node], info, custom_tags, **kwargs + ) + future_tool = executor.submit( + self._process_tool_trajectory_fine, [raw_node], info, **kwargs ) - future_tool = executor.submit(self._process_tool_trajectory_fine, [raw_node], info) # Collect results fine_memory_items_string_parser = future_string.result() @@ -789,6 +789,7 @@ def fine_transfer_simple_mem( input_memories: list[TextualMemoryItem], type: str, custom_tags: list[str] | None = None, + **kwargs, ) -> list[list[TextualMemoryItem]]: if not input_memories: return [] @@ -799,7 +800,7 @@ def fine_transfer_simple_mem( with ContextThreadPoolExecutor() as executor: futures = [ executor.submit( - self._process_transfer_multi_modal_data, scene_data_info, custom_tags + self._process_transfer_multi_modal_data, scene_data_info, custom_tags, **kwargs ) for scene_data_info in input_memories ] diff --git a/src/memos/mem_reader/simple_struct.py b/src/memos/mem_reader/simple_struct.py index 6f4542c7a..3a13325c9 100644 --- a/src/memos/mem_reader/simple_struct.py +++ b/src/memos/mem_reader/simple_struct.py @@ -669,6 +669,7 @@ def fine_transfer_simple_mem( input_memories: list[TextualMemoryItem], type: str, custom_tags: list[str] | None = None, + **kwargs, ) -> list[list[TextualMemoryItem]]: if not input_memories: return [] diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index 9b19e9ecb..a0dd6e395 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -877,6 +877,7 @@ def _process_memories_with_reader( memory_items, type="chat", custom_tags=custom_tags, + user_name=user_name, ) except Exception as e: logger.warning(f"{e}: Fail to 
transfer mem: {memory_items}") From 734bc8d36e6882f21a1be68022de863b4ae06a22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Thu, 15 Jan 2026 12:15:05 +0800 Subject: [PATCH 09/14] feat: fix user_name not pass bug --- src/memos/mem_reader/multi_modal_struct.py | 1 + src/memos/mem_reader/simple_struct.py | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/memos/mem_reader/multi_modal_struct.py b/src/memos/mem_reader/multi_modal_struct.py index 94a85e845..086bea75d 100644 --- a/src/memos/mem_reader/multi_modal_struct.py +++ b/src/memos/mem_reader/multi_modal_struct.py @@ -398,6 +398,7 @@ def _get_llm_response( ], "summary": mem_str, } + logger.info(f"[MultiModalFine] Task {messages}, Result {response_json}") return response_json def _determine_prompt_type(self, sources: list) -> str: diff --git a/src/memos/mem_reader/simple_struct.py b/src/memos/mem_reader/simple_struct.py index 3a13325c9..349632b6a 100644 --- a/src/memos/mem_reader/simple_struct.py +++ b/src/memos/mem_reader/simple_struct.py @@ -361,7 +361,7 @@ def _build_fast_node(w): return chat_read_nodes def _process_transfer_chat_data( - self, raw_node: TextualMemoryItem, custom_tags: list[str] | None = None + self, raw_node: TextualMemoryItem, custom_tags: list[str] | None = None, **kwargs ): raw_memory = raw_node.memory response_json = self._get_llm_response(raw_memory, custom_tags) @@ -686,7 +686,7 @@ def fine_transfer_simple_mem( # Process Q&A pairs concurrently with context propagation with ContextThreadPoolExecutor() as executor: futures = [ - executor.submit(processing_func, scene_data_info, custom_tags) + executor.submit(processing_func, scene_data_info, custom_tags, **kwargs) for scene_data_info in input_memories ] for future in concurrent.futures.as_completed(futures): @@ -890,6 +890,6 @@ def _process_doc_data(self, scene_data_info, info, **kwargs): return doc_nodes def _process_transfer_doc_data( - self, raw_node: TextualMemoryItem, custom_tags: 
list[str] | None = None + self, raw_node: TextualMemoryItem, custom_tags: list[str] | None = None, **kwargs ): raise NotImplementedError From 7d2528be30b50900f8d99c36ee0b0fc22712438f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Thu, 15 Jan 2026 14:29:21 +0800 Subject: [PATCH 10/14] feat: tackle with merged nodes when async mode in scheduler --- src/memos/mem_reader/multi_modal_struct.py | 43 +++++++++++++----- src/memos/mem_scheduler/general_scheduler.py | 32 +++++++++++++ src/memos/multi_mem_cube/single_cube.py | 47 +++++++++++--------- 3 files changed, 90 insertions(+), 32 deletions(-) diff --git a/src/memos/mem_reader/multi_modal_struct.py b/src/memos/mem_reader/multi_modal_struct.py index 086bea75d..9bcd3f017 100644 --- a/src/memos/mem_reader/multi_modal_struct.py +++ b/src/memos/mem_reader/multi_modal_struct.py @@ -466,10 +466,12 @@ def _process_one_item(fast_item: TextualMemoryItem) -> list[TextualMemoryItem]: memory_ids = [] if self.graph_db: if "user_name" in kwargs: - memory_ids = self.graph_db.search_by_embedding( + similarity_threshold = kwargs.get("similarity_threshold", 0.5) + search_results = self.graph_db.search_by_embedding( vector=self.embedder.embed(mem_str)[0], - top_k=20, + top_k=100, status="activated", + threshold=similarity_threshold, user_name=kwargs.get("user_name"), filter={ "or": [ @@ -479,17 +481,38 @@ def _process_one_item(fast_item: TextualMemoryItem) -> list[TextualMemoryItem]: ] }, ) - memory_ids = set({r["id"] for r in memory_ids if r.get("id")}) - related_memories_list = self.graph_db.get_nodes( - list(memory_ids), - include_embedding=False, - user_name=kwargs.get("user_name"), - ) + memory_ids = set({r["id"] for r in search_results if r.get("id")}) + related_memories_list = [ + self.graph_db.get_node( + memory_id, + include_embedding=False, + ) + for memory_id in memory_ids + ] + + # Filter out nodes with tags containing "mode: fast" + filtered_memories_list = [] + for mem in related_memories_list: 
+ if mem: + metadata = mem.get("metadata", {}) + tags = metadata.get("tags", []) + if isinstance(tags, list): + # Filter out if tags contain "mode: fast" + if "mode:fast" not in tags: + filtered_memories_list.append(mem) + else: + filtered_memories_list.append(mem) + related_memories = "\n".join( - ["{}: {}".format(mem["id"], mem["memory"]) for mem in related_memories_list] + [ + "{}: {}".format(mem["id"], mem["memory"]) + for mem in filtered_memories_list + ] ) + memory_ids = set({mem["id"] for mem in filtered_memories_list if mem.get("id")}) else: logger.warning("user_name is null when graph_db exists") + memory_ids = set() try: resp = self._get_llm_response( @@ -511,7 +534,7 @@ def _process_one_item(fast_item: TextualMemoryItem) -> list[TextualMemoryItem]: if "merged_from" in m: for merged_id in m["merged_from"]: if merged_id not in memory_ids: - logger.warning("merged id not valid!!!!!") + logger.warning(f"merged id not valid!!!!!: {merged_id}") info_per_item["merged_from"] = m["merged_from"] # Create fine mode memory item (same as simple_struct) node = self._make_memory_item( diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index a0dd6e395..8755de281 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -898,6 +898,38 @@ def _process_memories_with_reader( f"Added {len(enhanced_mem_ids)} enhanced memories: {enhanced_mem_ids}" ) + # Mark merged_from memories as archived when provided in memory metadata + if self.mem_reader and self.mem_reader.graph_db: + for memory in flattened_memories: + merged_from = (memory.metadata.info or {}).get("merged_from") + if merged_from: + old_ids = ( + merged_from + if isinstance(merged_from, (list | tuple | set)) + else [merged_from] + ) + for old_id in old_ids: + try: + self.mem_reader.graph_db.update_node( + str(old_id), {"status": "archived"}, user_name=user_name + ) + logger.info( + f"[Scheduler] Archived 
merged_from memory: {old_id}" + ) + except Exception as e: + logger.warning( + f"[Scheduler] Failed to archive merged_from memory {old_id}: {e}" + ) + else: + # Check if any memory has merged_from but graph_db is unavailable + has_merged_from = any( + (m.metadata.info or {}).get("merged_from") for m in flattened_memories + ) + if has_merged_from: + logger.warning( + "[Scheduler] merged_from provided but graph_db is unavailable; skip archiving." + ) + # LOGGING BLOCK START # This block is replicated from _add_message_consumer to ensure consistent logging cloud_env = is_cloud_env() diff --git a/src/memos/multi_mem_cube/single_cube.py b/src/memos/multi_mem_cube/single_cube.py index 6aea6997f..ffe8fe989 100644 --- a/src/memos/multi_mem_cube/single_cube.py +++ b/src/memos/multi_mem_cube/single_cube.py @@ -833,29 +833,32 @@ def _process_text_mem( ) # Mark merged_from memories as archived when provided in add_req.info - for memory in flattened_local: - merged_from = (memory.metadata.info or {}).get("merged_from") - if merged_from: - old_ids = ( - merged_from if isinstance(merged_from, (list | tuple | set)) else [merged_from] - ) - if self.mem_reader and self.mem_reader.graph_db: - for old_id in old_ids: - try: - self.mem_reader.graph_db.update_node( - str(old_id), {"status": "archived"} - ) - self.logger.info( - f"[SingleCubeView] Archived merged_from memory: {old_id}" - ) - except Exception as e: - self.logger.warning( - f"[SingleCubeView] Failed to archive merged_from memory {old_id}: {e}" - ) - else: - self.logger.warning( - "[SingleCubeView] merged_from provided but graph_db is unavailable; skip archiving." 
+ if sync_mode == "sync" and extract_mode == "fine": + for memory in flattened_local: + merged_from = (memory.metadata.info or {}).get("merged_from") + if merged_from: + old_ids = ( + merged_from + if isinstance(merged_from, (list | tuple | set)) + else [merged_from] ) + if self.mem_reader and self.mem_reader.graph_db: + for old_id in old_ids: + try: + self.mem_reader.graph_db.update_node( + str(old_id), {"status": "archived"} + ) + self.logger.info( + f"[SingleCubeView] Archived merged_from memory: {old_id}" + ) + except Exception as e: + self.logger.warning( + f"[SingleCubeView] Failed to archive merged_from memory {old_id}: {e}" + ) + else: + self.logger.warning( + "[SingleCubeView] merged_from provided but graph_db is unavailable; skip archiving." + ) text_memories = [ { From 8b55d638ddf6209f0febec9661133995d3c2b338 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Thu, 15 Jan 2026 15:39:06 +0800 Subject: [PATCH 11/14] feat: modify repeat problem --- src/memos/mem_reader/multi_modal_struct.py | 286 +++++++++++++++------ src/memos/mem_reader/simple_struct.py | 2 +- src/memos/templates/mem_reader_prompts.py | 182 ++++++------- 3 files changed, 285 insertions(+), 185 deletions(-) diff --git a/src/memos/mem_reader/multi_modal_struct.py b/src/memos/mem_reader/multi_modal_struct.py index 9bcd3f017..92f3236ca 100644 --- a/src/memos/mem_reader/multi_modal_struct.py +++ b/src/memos/mem_reader/multi_modal_struct.py @@ -13,6 +13,7 @@ from memos.mem_reader.simple_struct import PROMPT_DICT, SimpleStructMemReader from memos.mem_reader.utils import parse_json_result from memos.memories.textual.item import TextualMemoryItem, TreeNodeTextualMemoryMetadata +from memos.templates.mem_reader_prompts import MEMORY_MERGE_PROMPT_EN, MEMORY_MERGE_PROMPT_ZH from memos.templates.tool_mem_prompts import TOOL_TRAJECTORY_PROMPT_EN, TOOL_TRAJECTORY_PROMPT_ZH from memos.types import MessagesType from memos.utils import timed @@ -316,7 +317,6 @@ def 
_get_llm_response( custom_tags: list[str] | None = None, sources: list | None = None, prompt_type: str = "chat", - related_memories: str | None = None, ) -> dict: """ Override parent method to improve language detection by using actual text content @@ -327,7 +327,6 @@ def _get_llm_response( custom_tags: Optional custom tags sources: Optional list of SourceMessage objects to extract text content from prompt_type: Type of prompt to use ("chat" or "doc") - related_memories: related_memories in the graph Returns: LLM response dictionary @@ -362,10 +361,7 @@ def _get_llm_response( else: template = PROMPT_DICT["chat"][lang] examples = PROMPT_DICT["chat"][f"{lang}_example"] - related_memories_str = related_memories if related_memories is not None else "" - prompt = template.replace("${conversation}", mem_str).replace( - "${reference}", related_memories_str - ) + prompt = template.replace("${conversation}", mem_str) custom_tags_prompt = ( PROMPT_DICT["custom_tags"][lang].replace("{custom_tags}", str(custom_tags)) @@ -419,6 +415,178 @@ def _determine_prompt_type(self, sources: list) -> str: return prompt_type + def _get_maybe_merged_memory( + self, + extracted_memory_dict: dict, + mem_text: str, + sources: list, + **kwargs, + ) -> dict: + """ + Check if extracted memory should be merged with similar existing memories. + If merge is needed, return merged memory dict with merged_from field. + Otherwise, return original memory dict. + + Args: + extracted_memory_dict: The extracted memory dict from LLM response + mem_text: The memory text content + sources: Source messages for language detection + **kwargs: Additional parameters (merge_similarity_threshold, etc.) 
+ + Returns: + Memory dict (possibly merged) with merged_from field if merged + """ + # If no graph_db or user_name, return original + if not self.graph_db or "user_name" not in kwargs: + return extracted_memory_dict + user_name = kwargs.get("user_name") + + # Detect language + lang = "en" + if sources: + for source in sources: + if hasattr(source, "lang") and source.lang: + lang = source.lang + break + elif isinstance(source, dict) and source.get("lang"): + lang = source.get("lang") + break + if lang is None: + lang = detect_lang(mem_text) + + # Search for similar memories + merge_threshold = kwargs.get("merge_similarity_threshold", 0.5) + + try: + search_results = self.graph_db.search_by_embedding( + vector=self.embedder.embed(mem_text)[0], + top_k=20, + status="activated", + threshold=merge_threshold, + user_name=user_name, + filter={ + "or": [ + {"memory_type": "LongTermMemory"}, + {"memory_type": "UserMemory"}, + {"memory_type": "WorkingMemory"}, + ] + }, + ) + + if not search_results: + # No similar memories found, return original + return extracted_memory_dict + + # Get full memory details + similar_memory_ids = [r["id"] for r in search_results if r.get("id")] + similar_memories_list = [ + self.graph_db.get_node(mem_id, include_embedding=False) + for mem_id in similar_memory_ids + ] + + # Filter out None and mode:fast memories + filtered_similar = [] + for mem in similar_memories_list: + if not mem: + continue + mem_metadata = mem.get("metadata", {}) + tags = mem_metadata.get("tags", []) + if isinstance(tags, list) and "mode:fast" in tags: + continue + filtered_similar.append( + { + "id": mem.get("id"), + "memory": mem.get("memory", ""), + } + ) + + if not filtered_similar: + # No valid similar memories, return original + return extracted_memory_dict + + # Create a temporary TextualMemoryItem for merge check + temp_memory_item = TextualMemoryItem( + memory=mem_text, + metadata=TreeNodeTextualMemoryMetadata( + user_id="", + session_id="", + 
memory_type=extracted_memory_dict.get("memory_type", "LongTermMemory"), + status="activated", + tags=extracted_memory_dict.get("tags", []), + key=extracted_memory_dict.get("key", ""), + ), + ) + + # Try to merge with LLM + merge_result = self._merge_memories_with_llm( + temp_memory_item, filtered_similar, lang=lang + ) + + if merge_result: + # Return merged memory dict + merged_dict = extracted_memory_dict.copy() + merged_dict["value"] = merge_result.get("value", mem_text) + merged_dict["merged_from"] = merge_result.get("merged_from", []) + logger.info( + f"[MultiModalFine] Merged memory with {len(merged_dict['merged_from'])} existing memories" + ) + return merged_dict + else: + # No merge needed, return original + return extracted_memory_dict + + except Exception as e: + logger.error(f"[MultiModalFine] Error in get_maybe_merged_memory: {e}") + # On error, return original + return extracted_memory_dict + + def _merge_memories_with_llm( + self, + new_memory: TextualMemoryItem, + similar_memories: list[dict], + lang: str = "en", + ) -> dict | None: + """ + Use LLM to merge new memory with similar existing memories. 
+ + Args: + new_memory: The newly extracted memory item + similar_memories: List of similar memories from graph_db (with id and memory fields) + lang: Language code ("en" or "zh") + + Returns: + Merged memory dict with merged_from field, or None if no merge needed + """ + if not similar_memories: + return None + + # Build merge prompt using template + similar_memories_text = "\n".join( + [f"[{mem['id']}]: {mem['memory']}" for mem in similar_memories] + ) + + merge_prompt_template = MEMORY_MERGE_PROMPT_ZH if lang == "zh" else MEMORY_MERGE_PROMPT_EN + merge_prompt = merge_prompt_template.format( + new_memory=new_memory.memory, + similar_memories=similar_memories_text, + ) + + try: + response_text = self.llm.generate([{"role": "user", "content": merge_prompt}]) + merge_result = parse_json_result(response_text) + + if merge_result.get("should_merge", False): + return { + "value": merge_result.get("value", new_memory.memory), + "merged_from": merge_result.get( + "merged_from", [mem["id"] for mem in similar_memories] + ), + } + except Exception as e: + logger.error(f"[MultiModalFine] Error in merge LLM call: {e}") + + return None + def _process_string_fine( self, fast_memory_items: list[TextualMemoryItem], @@ -461,63 +629,9 @@ def _process_one_item(fast_item: TextualMemoryItem) -> list[TextualMemoryItem]: # Determine prompt type based on sources prompt_type = self._determine_prompt_type(sources) - # recall related memories - related_memories = None - memory_ids = [] - if self.graph_db: - if "user_name" in kwargs: - similarity_threshold = kwargs.get("similarity_threshold", 0.5) - search_results = self.graph_db.search_by_embedding( - vector=self.embedder.embed(mem_str)[0], - top_k=100, - status="activated", - threshold=similarity_threshold, - user_name=kwargs.get("user_name"), - filter={ - "or": [ - {"memory_type": "LongTermMemory"}, - {"memory_type": "UserMemory"}, - {"memory_type": "WorkingMemory"}, - ] - }, - ) - memory_ids = set({r["id"] for r in search_results if 
r.get("id")}) - related_memories_list = [ - self.graph_db.get_node( - memory_id, - include_embedding=False, - ) - for memory_id in memory_ids - ] - - # Filter out nodes with tags containing "mode: fast" - filtered_memories_list = [] - for mem in related_memories_list: - if mem: - metadata = mem.get("metadata", {}) - tags = metadata.get("tags", []) - if isinstance(tags, list): - # Filter out if tags contain "mode: fast" - if "mode:fast" not in tags: - filtered_memories_list.append(mem) - else: - filtered_memories_list.append(mem) - - related_memories = "\n".join( - [ - "{}: {}".format(mem["id"], mem["memory"]) - for mem in filtered_memories_list - ] - ) - memory_ids = set({mem["id"] for mem in filtered_memories_list if mem.get("id")}) - else: - logger.warning("user_name is null when graph_db exists") - memory_ids = set() - + # ========== Stage 1: Normal extraction (without reference) ========== try: - resp = self._get_llm_response( - mem_str, custom_tags, sources, prompt_type, related_memories - ) + resp = self._get_llm_response(mem_str, custom_tags, sources, prompt_type) except Exception as e: logger.error(f"[MultiModalFine] Error calling LLM: {e}") return fine_items @@ -525,49 +639,59 @@ def _process_one_item(fast_item: TextualMemoryItem) -> list[TextualMemoryItem]: if resp.get("memory list", []): for m in resp.get("memory list", []): try: + # Check and merge with similar memories if needed + m_maybe_merged = self._get_maybe_merged_memory( + extracted_memory_dict=m, + mem_text=m.get("value", ""), + sources=sources, + **kwargs, + ) # Normalize memory_type (same as simple_struct) memory_type = ( - m.get("memory_type", "LongTermMemory") + m_maybe_merged.get("memory_type", "LongTermMemory") .replace("长期记忆", "LongTermMemory") .replace("用户记忆", "UserMemory") ) - if "merged_from" in m: - for merged_id in m["merged_from"]: - if merged_id not in memory_ids: - logger.warning(f"merged id not valid!!!!!: {merged_id}") - info_per_item["merged_from"] = m["merged_from"] - # 
Create fine mode memory item (same as simple_struct) node = self._make_memory_item( - value=m.get("value", ""), + value=m_maybe_merged.get("value", ""), info=info_per_item, memory_type=memory_type, - tags=m.get("tags", []), - key=m.get("key", ""), + tags=m_maybe_merged.get("tags", []), + key=m_maybe_merged.get("key", ""), sources=sources, # Preserve sources from fast item background=resp.get("summary", ""), **extra_kwargs, ) + # Add merged_from to info if present + if "merged_from" in m_maybe_merged: + node.metadata.info = node.metadata.info or {} + node.metadata.info["merged_from"] = m_maybe_merged["merged_from"] fine_items.append(node) except Exception as e: logger.error(f"[MultiModalFine] parse error: {e}") elif resp.get("value") and resp.get("key"): try: - if "merged_from" in resp: - for merged_id in resp["merged_from"]: - if merged_id not in memory_ids: - logger.warning("merged id not valid!!!!!") - info_per_item["merged_from"] = resp["merged_from"] - # Create fine mode memory item (same as simple_struct) + # Check and merge with similar memories if needed + resp_maybe_merged = self._get_maybe_merged_memory( + extracted_memory_dict=resp, + mem_text=resp.get("value", "").strip(), + sources=sources, + **kwargs, + ) node = self._make_memory_item( - value=resp.get("value", "").strip(), + value=resp_maybe_merged.get("value", "").strip(), info=info_per_item, memory_type="LongTermMemory", - tags=resp.get("tags", []), - key=resp.get("key", None), + tags=resp_maybe_merged.get("tags", []), + key=resp_maybe_merged.get("key", None), sources=sources, # Preserve sources from fast item background=resp.get("summary", ""), **extra_kwargs, ) + # Add merged_from to info if present + if "merged_from" in resp_maybe_merged: + node.metadata.info = node.metadata.info or {} + node.metadata.info["merged_from"] = resp_maybe_merged["merged_from"] fine_items.append(node) except Exception as e: logger.error(f"[MultiModalFine] parse error: {e}") diff --git 
a/src/memos/mem_reader/simple_struct.py b/src/memos/mem_reader/simple_struct.py index 349632b6a..3e33538e0 100644 --- a/src/memos/mem_reader/simple_struct.py +++ b/src/memos/mem_reader/simple_struct.py @@ -228,7 +228,7 @@ def _get_llm_response(self, mem_str: str, custom_tags: list[str] | None) -> dict lang = detect_lang(mem_str) template = PROMPT_DICT["chat"][lang] examples = PROMPT_DICT["chat"][f"{lang}_example"] - prompt = template.replace("${conversation}", mem_str).replace("${reference}", "") + prompt = template.replace("${conversation}", mem_str) custom_tags_prompt = ( PROMPT_DICT["custom_tags"][lang].replace("{custom_tags}", str(custom_tags)) diff --git a/src/memos/templates/mem_reader_prompts.py b/src/memos/templates/mem_reader_prompts.py index f2d15cfb8..f85c8632b 100644 --- a/src/memos/templates/mem_reader_prompts.py +++ b/src/memos/templates/mem_reader_prompts.py @@ -28,8 +28,7 @@ "key": , "memory_type": , "value": , - "tags": , - "merged_from": + "tags": }, ... ], @@ -42,7 +41,7 @@ ${custom_tags_prompt} -Example 1 — No reference memories: +Example: Conversation: user: [June 26, 2025 at 3:00 PM]: Hi Jerry! Yesterday at 3 PM I had a meeting with my team about the new project. assistant: Oh Tom! Do you think the team can finish by December 15? @@ -70,7 +69,7 @@ "summary": "Tom is currently focused on managing a new project with a tight schedule. After a team meeting on June 25, 2025, he realized the original deadline of December 15 might not be feasible due to backend delays. Concerned about insufficient testing time, he welcomed Jerry’s suggestion of proposing an extension. Tom plans to raise the idea of shifting the deadline to January 5, 2026 in the next morning’s meeting. His actions reflect both stress about timelines and a proactive, team-oriented problem-solving approach." } -Example 2 — No reference memories: +Dialogue: assistant: [10:30 AM, August 15, 2025]: The book Deep Work you mentioned is indeed very suitable for your current situation. 
The book explains … (omitted). The author suggests setting aside 2–3 hours of focused work blocks each day and turning off all notifications during that time. Considering that you need to submit a report next week, you could try using the 9:00–11:00 AM time slot for focused work. @@ -90,7 +89,7 @@ Note: When the dialogue contains only assistant messages, phrasing such as “assistant recommended” or “assistant suggested” should be used, rather than incorrectly attributing the content to the user’s statements or plans. -Example 3 — No reference memories (note: if the user’s language is Chinese, output must also be Chinese): +Another Example in Chinese (注意: 当user的语言为中文时,你就需要也输出中文): { "memory list": [ { @@ -104,54 +103,11 @@ "summary": "Tom 目前专注于管理一个进度紧张的新项目..." } -Note: We may provide partial reference memories. If newly extracted memories substantially overlap with reference memories, merge them and include a `merged_from` field indicating the merged memory IDs. -If newly extracted memories are strongly related to reference memories, you may appropriately reference them during extraction (but never fabricate memories — only reference them when you are very confident). -If no reference memories are provided, or if they are unrelated to the new memories, simply ignore them. - -Example 4 — With reference memories: -Dialogue: -user: [January 13, 2026] Winter skiing is so much fun! I’m planning to go skiing again with friends this weekend! -assistant: [January 13, 2026] That sounds great! -user: [January 14, 2026] You remember my ski buddy, right? His name is Tom. We ski together every year — including this week! 
- -Reference memories: -[xxxx-xxxx-xxxx-xxxx-01]: The user expressed a strong passion for skiing on December 29, 2025 -[xxxx-xxxx-xxxx-xxxx-06]: The user’s ski buddy is named Tom -[xxxx-xxxx-xxxx-xxxx-11]: Niseko is a ski destination the user has visited multiple times; the user met Tom at Hirafu Ski Resort and became close friends -[xxxx-xxxx-xxxx-xxxx-12]: On January 1, 2025, the user discussed skiing equipment with the assistant and planned to buy a new ski backpack - -Output: -{ - "memory list": [ - { - "key": "User's winter skiing plan", - "memory_type": "UserMemory", - "value": "On January 13, 2026, the user planned to go skiing again over the weekend with their friend Tom.", - "tags": ["skiing", "sports preference", "plan", "winter activity"] - }, - { - "key": "User's ski partner is named Tom", - "memory_type": "UserMemory", - "value": "On January 14, 2026, the user again mentioned their ski partner Tom and further explained that they ski together every year. This statement reinforces their long-term and stable skiing partnership and adds new information about its regular annual pattern.", - "tags": ["interpersonal relationship", "ski partner", "long-term habit"], - "merged_from": [ - "xxxx-xxxx-xxxx-xxxx-06", - "xxxx-xxxx-xxxx-xxxx-11" - ] - } - ], - "summary": "The user recently reinforced their strong passion for skiing and, on January 13, 2026, explicitly stated that winter skiing brings them great joy and that they planned to ski again with a friend over the weekend. This indicates that skiing remains a highly significant activity in the user’s life. Additionally, on January 14, 2026, the user elaborated on their long-term relationship with their ski partner Tom, emphasizing that they ski together every year. This further solidifies the importance of this interpersonal relationship in the user’s personal experiences." -} +Always respond in the same language as the conversation. 
-Your task: -Dialogue to be extracted: +Conversation: ${conversation} -Reference memories: -${reference} - -Always respond in the same language as the conversation. - Your Output:""" SIMPLE_STRUCT_MEM_READER_PROMPT_ZH = """您是记忆提取专家。 @@ -187,8 +143,7 @@ "key": <字符串,唯一且简洁的记忆标题>, "memory_type": <字符串,"LongTermMemory" 或 "UserMemory">, "value": <详细、独立且无歧义的记忆陈述——若输入对话为英文,则用英文;若为中文,则用中文>, - "tags": <相关主题关键词列表(例如,["截止日期", "团队", "计划"])>, - "merged_from": <需要被合并的参考记忆列表,当没有提供参考记忆时,不需要输出这个字段 > + "tags": <相关主题关键词列表(例如,["截止日期", "团队", "计划"])> }, ... ], @@ -201,7 +156,7 @@ ${custom_tags_prompt} -示例1-无参考记忆: +示例: 对话: user: [2025年6月26日下午3:00]:嗨Jerry!昨天下午3点我和团队开了个会,讨论新项目。 assistant: 哦Tom!你觉得团队能在12月15日前完成吗? @@ -228,7 +183,7 @@ "summary": "Tom目前正专注于管理一个进度紧张的新项目。在2025年6月25日的团队会议后,他意识到原定2025年12月15日的截止日期可能无法实现,因为后端会延迟。由于担心测试时间不足,他接受了Jerry提出的延期建议。Tom计划在次日早上的会议上提出将截止日期推迟至2026年1月5日。他的行为反映出对时间线的担忧,以及积极、以团队为导向的问题解决方式。" } -示例2-无参考记忆: +对话: assistant: [2025年8月15日上午10:30]: 你提到的那本《深度工作》确实很适合你现在的情况。这本书讲了......(略),作者建议每天留出2-3 小时的专注时间块,期间关闭所有通知。考虑到你下周要交的报告,可以试试早上9点到11点这个时段。 @@ -247,73 +202,28 @@ } 注意:当对话仅有助手消息时,应使用"助手推荐"、"助手建议"等表述,而非将其错误归因为用户的陈述或计划。 -示例3-无参考记忆(注意:当用户语言为中文时,您也需输出中文): +另一个中文示例(注意:当用户语言为中文时,您也需输出中文): { "memory list": [ { "key": "项目会议", "memory_type": "LongTermMemory", "value": "在2025年6月25日下午3点,Tom与团队开会讨论了新项目,涉及时间表,并提出了对12月15日截止日期可行性的担忧。", - "tags": ["项目", "时间表", "会议", "截止日期"], - "merged_from": [ - "xxxx-xxxx-xxxx-xxxx-xxx", - "xxxx-xxxx-xxxx-xxxx-xx", - ], + "tags": ["项目", "时间表", "会议", "截止日期"] }, ... ], "summary": "Tom 目前专注于管理一个进度紧张的新项目..." } -注意,我们可能给出部分参考记忆,这部分记忆如果和新添加的记忆大量重复,合并记忆,并在输入中多一个`merged_from`字段指明合并的记忆; -新添加的记忆如果和参考记忆有强关联,可以在提取时适当参考(但一定不要捏造记忆,十分有把握再进行参考); -如果没有给出参考记忆、或参考记忆和新添加的记忆无关,直接忽略就好。 +请始终使用与对话相同的语言进行回复。 -示例4-带参考记忆: 对话: -user: [2026年1月13日] 冬天滑雪真的太快乐了!我打算这周末和朋友再滑一次! -assistant:[2026年1月13日] 听起来就很棒! -user: [2026年1月14日] 你还记得我的滑雪搭子吧?他叫Tom,我们每年都一起滑雪!这周也是! 
- -参考记忆: -[xxxx-xxxx-xxxx-xxxx-01]: 用户在2025年12月29日表达了对滑雪的狂热喜爱 -[xxxx-xxxx-xxxx-xxxx-06]: 用户的滑雪搭子叫Tom -[xxxx-xxxx-xxxx-xxxx-11]: 二世谷是用户多次去过的滑雪胜地,用户在比罗夫滑雪场认识了Tom并成为好朋友 -[xxxx-xxxx-xxxx-xxxx-12]: 用户2025年1月1日和助手讨论了滑雪装备,打算新买一个滑雪背包。 - -输出: -{ - "memory list": [ - { - "key": "用户冬季滑雪计划", - "memory_type": "UserMemory", - "value": "用户在2026年1月13日计划在周末与朋友Tom再次进行滑雪活动。", - "tags": ["滑雪", "运动偏好", "计划", "冬季活动"], - }, - { - "key": "用户的滑雪伙伴叫Tom", - "memory_type": "UserMemory", - "value": "用户在2026年1月14日再次提到其滑雪搭子Tom,并进一步说明他们每年都会一起滑雪。这一描述强化了双方长期稳定的滑雪伙伴关系,在原有记忆基础上补充了新的时间规律性信息。", - "tags": ["人际关系", "滑雪搭子", "长期习惯"], - "merged_from": [ - "xxxx-xxxx-xxxx-xxxx-06", - "xxxx-xxxx-xxxx-xxxx-11", - ], - } - ], - "summary": "用户近期再次强化了自己对滑雪的热爱,并在2026年1月13日明确表示冬季滑雪带来极大的快乐,同时计划于当周周末与朋友再度滑雪。这表明滑雪对用户而言仍然是一项高度重要的活动。此外,用户在2026年1月14日补充了关于其滑雪伙伴Tom的长期关系细节,强调两人每年都会结伴滑雪,进一步巩固了此人际关系在用户生活中的重要性。" -} - -您的任务: -待提取的对话: ${conversation} -参考记忆: -${reference} - -请始终使用与对话相同的语言进行回复。 您的输出:""" + SIMPLE_STRUCT_DOC_READER_PROMPT = """You are an expert text analyst for a search and retrieval system. Your task is to process a document chunk and generate a single, structured JSON object. @@ -957,10 +867,76 @@ Important: Output **only** the JSON. No extra text. """ +MEMORY_MERGE_PROMPT_EN = """You are a memory consolidation expert. Given a new memory and similar existing memories, decide if they should be merged. 
+ +New Memory: +{new_memory} + +Similar Existing Memories: +{similar_memories} + +If the new memory substantially overlaps with or complements the existing memories, merge them into a single consolidated memory and return a JSON object with: +- "value": the merged memory content (preserving all unique information) +- "merged_from": list of IDs from similar_memories that were merged +- "should_merge": true + +If the new memory is distinct and should remain separate, return: +- "should_merge": false + +Return ONLY a valid JSON object, nothing else.""" + +MEMORY_MERGE_PROMPT_ZH = """你是一个记忆整合专家。给定一个新记忆和相似的现有记忆,判断它们是否应该合并。 + +示例: +新记忆: +用户的名字是Tom,用户喜欢滑雪,并计划周末去滑雪 + +相似的现有记忆: +xxxx-xxxx-xxxx-xxxx-01: 用户的名字是Tom +xxxx-xxxx-xxxx-xxxx-10: 用户喜欢滑雪 +xxxx-xxxx-xxxx-xxxx-11: 用户住在海边 + +应该的返回值: +{{ + "value": "用户的名字是Tom,用户喜欢滑雪", + "merged_from": ["xxxx-xxxx-xxxx-xxxx-01", "xxxx-xxxx-xxxx-xxxx-10"], + "should_merge": true +}} + +新记忆: +用户周天要参加一个聚会 + +相似的现有记忆: +xxxx-xxxx-xxxx-xxxx-01: 用户昨天读了一本书 + +应该的返回值: +{{ + "should_merge": false +}} + + +如果新记忆与现有记忆大量重叠或互补,将它们合并为一个整合的记忆,并返回一个JSON列表: +- "value": 合并后的记忆内容(保留所有独特信息) +- "merged_from": 被合并的相似记忆ID列表 +- "should_merge": true + +如果新记忆是独特的,应该保持独立,返回: +- "should_merge": false + +新记忆: +{new_memory} + +相似的现有记忆: +{similar_memories} + +只返回有效的JSON对象,不要其他内容。""" + # Prompt mapping for specialized tasks (e.g., hallucination filtering) PROMPT_MAPPING = { "hallucination_filter": SIMPLE_STRUCT_HALLUCINATION_FILTER_PROMPT, "rewrite": SIMPLE_STRUCT_REWRITE_MEMORY_PROMPT, "rewrite_user_only": SIMPLE_STRUCT_REWRITE_MEMORY_USER_ONLY_PROMPT, "add_before_search": SIMPLE_STRUCT_ADD_BEFORE_SEARCH_PROMPT, + "memory_merge_en": MEMORY_MERGE_PROMPT_EN, + "memory_merge_zh": MEMORY_MERGE_PROMPT_ZH, } From 4279603c8523cd4f77b2c0e3261f1be4f130ebaa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Thu, 15 Jan 2026 15:48:41 +0800 Subject: [PATCH 12/14] feat: modify multi modal struct --- src/memos/mem_reader/multi_modal_struct.py | 6 +++++- 1 
file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/memos/mem_reader/multi_modal_struct.py b/src/memos/mem_reader/multi_modal_struct.py index 92f3236ca..be9f02b22 100644 --- a/src/memos/mem_reader/multi_modal_struct.py +++ b/src/memos/mem_reader/multi_modal_struct.py @@ -455,7 +455,7 @@ def _get_maybe_merged_memory( lang = detect_lang(mem_text) # Search for similar memories - merge_threshold = kwargs.get("merge_similarity_threshold", 0.5) + merge_threshold = kwargs.get("merge_similarity_threshold", 0.3) try: search_results = self.graph_db.search_by_embedding( @@ -499,6 +499,10 @@ def _get_maybe_merged_memory( "memory": mem.get("memory", ""), } ) + logger.info( + f"Valid similar memories for {mem_text} is " + f"{len(filtered_similar)}: {filtered_similar}" + ) if not filtered_similar: # No valid similar memories, return original From 92c952ac41eb71077c869c05a10c099ce6aa1743 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Thu, 15 Jan 2026 15:53:30 +0800 Subject: [PATCH 13/14] feat: update en-version MEMORY_MERGE_PROMPT_EN --- src/memos/templates/mem_reader_prompts.py | 35 ++++++++++++++++++++--- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/src/memos/templates/mem_reader_prompts.py b/src/memos/templates/mem_reader_prompts.py index f85c8632b..4a451c842 100644 --- a/src/memos/templates/mem_reader_prompts.py +++ b/src/memos/templates/mem_reader_prompts.py @@ -869,11 +869,32 @@ MEMORY_MERGE_PROMPT_EN = """You are a memory consolidation expert. Given a new memory and similar existing memories, decide if they should be merged. 
-New Memory: -{new_memory} +Example: +New memory: +The user’s name is Tom, the user likes skiing, and plans to go skiing this weekend -Similar Existing Memories: -{similar_memories} +Similar existing memories: +xxxx-xxxx-xxxx-xxxx-01: The user’s name is Tom +xxxx-xxxx-xxxx-xxxx-10: The user likes skiing +xxxx-xxxx-xxxx-xxxx-11: The user lives by the sea + +Expected output: +{{ +“value”: “The user’s name is Tom, the user likes skiing”, +“merged_from”: [“xxxx-xxxx-xxxx-xxxx-01”, “xxxx-xxxx-xxxx-xxxx-10”], +“should_merge”: true +}} + +New memory: +The user is going to attend a party on Sunday + +Similar existing memories: +xxxx-xxxx-xxxx-xxxx-01: The user read a book yesterday + +Expected output: +{{ +“should_merge”: false +}} If the new memory substantially overlaps with or complements the existing memories, merge them into a single consolidated memory and return a JSON object with: - "value": the merged memory content (preserving all unique information) @@ -883,6 +904,12 @@ If the new memory is distinct and should remain separate, return: - "should_merge": false +New Memory: +{new_memory} + +Similar Existing Memories: +{similar_memories} + Return ONLY a valid JSON object, nothing else.""" MEMORY_MERGE_PROMPT_ZH = """你是一个记忆整合专家。给定一个新记忆和相似的现有记忆,判断它们是否应该合并。 From c69cbaa568835d1f5f599634e2a6b0ec627a157a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Thu, 15 Jan 2026 17:23:04 +0800 Subject: [PATCH 14/14] feat: add status to some graph db func; modify prompt --- src/memos/graph_dbs/base.py | 12 ++- src/memos/graph_dbs/neo4j.py | 28 ++++- src/memos/graph_dbs/polardb.py | 11 +- src/memos/templates/mem_reader_prompts.py | 119 +++++++++++++++++----- 4 files changed, 138 insertions(+), 32 deletions(-) diff --git a/src/memos/graph_dbs/base.py b/src/memos/graph_dbs/base.py index a8f8ff414..87a50d443 100644 --- a/src/memos/graph_dbs/base.py +++ b/src/memos/graph_dbs/base.py @@ -160,13 +160,17 @@ def search_by_embedding(self, vector: list[float], 
top_k: int = 5, **kwargs) -> """ @abstractmethod - def get_by_metadata(self, filters: list[dict[str, Any]]) -> list[str]: + def get_by_metadata( + self, filters: list[dict[str, Any]], status: str | None = None + ) -> list[str]: """ Retrieve node IDs that match given metadata filters. Args: filters (dict[str, Any]): A dictionary of attribute-value filters. Example: {"topic": "psychology", "importance": 2} + status (str, optional): Filter by status (e.g., 'activated', 'archived'). + If None, no status filter is applied. Returns: list[str]: Node IDs whose metadata match the filter conditions. @@ -239,13 +243,17 @@ def import_graph(self, data: dict[str, Any]) -> None: """ @abstractmethod - def get_all_memory_items(self, scope: str, include_embedding: bool = False) -> list[dict]: + def get_all_memory_items( + self, scope: str, include_embedding: bool = False, status: str | None = None + ) -> list[dict]: """ Retrieve all memory items of a specific memory_type. Args: scope (str): Must be one of 'WorkingMemory', 'LongTermMemory', or 'UserMemory'. include_embedding: with/without embedding + status (str, optional): Filter by status (e.g., 'activated', 'archived'). + If None, no status filter is applied. Returns: list[dict]: Full list of memory items under this scope. diff --git a/src/memos/graph_dbs/neo4j.py b/src/memos/graph_dbs/neo4j.py index 64aedc8f4..8698b6f73 100644 --- a/src/memos/graph_dbs/neo4j.py +++ b/src/memos/graph_dbs/neo4j.py @@ -916,6 +916,7 @@ def get_by_metadata( filter: dict | None = None, knowledgebase_ids: list[str] | None = None, user_name_flag: bool = True, + status: str | None = None, ) -> list[str]: """ TODO: @@ -933,6 +934,8 @@ def get_by_metadata( {"field": "tags", "op": "contains", "value": "AI"}, ... ] + status (str, optional): Filter by status (e.g., 'activated', 'archived'). + If None, no status filter is applied. Returns: list[str]: Node IDs whose metadata match the filter conditions. (AND logic). 
@@ -942,15 +945,20 @@ def get_by_metadata( - Can be used for faceted recall or prefiltering before embedding rerank. """ logger.info( - f"[get_by_metadata] filters: {filters},user_name: {user_name},filter: {filter},knowledgebase_ids: {knowledgebase_ids}" + f"[get_by_metadata] filters: {filters},user_name: {user_name},filter: {filter},knowledgebase_ids: {knowledgebase_ids},status: {status}" ) print( - f"[get_by_metadata] filters: {filters},user_name: {user_name},filter: {filter},knowledgebase_ids: {knowledgebase_ids}" + f"[get_by_metadata] filters: {filters},user_name: {user_name},filter: {filter},knowledgebase_ids: {knowledgebase_ids},status: {status}" ) user_name = user_name if user_name else self.config.user_name where_clauses = [] params = {} + # Add status filter if provided + if status: + where_clauses.append("n.status = $status") + params["status"] = status + for i, f in enumerate(filters): field = f["field"] op = f.get("op", "=") @@ -1272,8 +1280,10 @@ def import_graph(self, data: dict[str, Any], user_name: str | None = None) -> No def get_all_memory_items( self, scope: str, + include_embedding: bool = False, filter: dict | None = None, knowledgebase_ids: list[str] | None = None, + status: str | None = None, **kwargs, ) -> list[dict]: """ @@ -1281,18 +1291,21 @@ def get_all_memory_items( Args: scope (str): Must be one of 'WorkingMemory', 'LongTermMemory', or 'UserMemory'. + include_embedding (bool): Whether to include embedding in results. filter (dict, optional): Filter conditions with 'and' or 'or' logic for search results. Example: {"and": [{"id": "xxx"}, {"A": "yyy"}]} or {"or": [{"id": "xxx"}, {"A": "yyy"}]} - Returns: + knowledgebase_ids (list[str], optional): List of knowledgebase IDs to filter by. + status (str, optional): Filter by status (e.g., 'activated', 'archived'). + If None, no status filter is applied. Returns: list[dict]: Full list of memory items under this scope. 
""" logger.info( - f"[get_all_memory_items] scope: {scope},filter: {filter},knowledgebase_ids: {knowledgebase_ids}" + f"[get_all_memory_items] scope: {scope},filter: {filter},knowledgebase_ids: {knowledgebase_ids},status: {status}" ) print( - f"[get_all_memory_items] scope: {scope},filter: {filter},knowledgebase_ids: {knowledgebase_ids}" + f"[get_all_memory_items] scope: {scope},filter: {filter},knowledgebase_ids: {knowledgebase_ids},status: {status}" ) user_name = kwargs.get("user_name") if kwargs.get("user_name") else self.config.user_name @@ -1302,6 +1315,11 @@ def get_all_memory_items( where_clauses = ["n.memory_type = $scope"] params = {"scope": scope} + # Add status filter if provided + if status: + where_clauses.append("n.status = $status") + params["status"] = status + # Build user_name filter with knowledgebase_ids support (OR relationship) using common method user_name_conditions, user_name_params = self._build_user_name_and_kb_ids_conditions_cypher( user_name=user_name, diff --git a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py index e67f866ac..4b739bb0f 100644 --- a/src/memos/graph_dbs/polardb.py +++ b/src/memos/graph_dbs/polardb.py @@ -2823,6 +2823,7 @@ def get_all_memory_items( user_name: str | None = None, filter: dict | None = None, knowledgebase_ids: list | None = None, + status: str | None = None, ) -> list[dict]: """ Retrieve all memory items of a specific memory_type. @@ -2831,12 +2832,16 @@ def get_all_memory_items( scope (str): Must be one of 'WorkingMemory', 'LongTermMemory', or 'UserMemory'. include_embedding: with/without embedding user_name (str, optional): User name for filtering in non-multi-db mode + filter (dict, optional): Filter conditions with 'and' or 'or' logic for search results. + knowledgebase_ids (list, optional): List of knowledgebase IDs to filter by. + status (str, optional): Filter by status (e.g., 'activated', 'archived'). + If None, no status filter is applied. 
Returns: list[dict]: Full list of memory items under this scope. """ logger.info( - f"[get_all_memory_items] filter: {filter}, knowledgebase_ids: {knowledgebase_ids}" + f"[get_all_memory_items] filter: {filter}, knowledgebase_ids: {knowledgebase_ids}, status: {status}" ) user_name = user_name if user_name else self._get_config_value("user_name") @@ -2867,6 +2872,8 @@ def get_all_memory_items( if include_embedding: # Build WHERE clause with user_name/knowledgebase_ids and filter where_parts = [f"n.memory_type = '{scope}'"] + if status: + where_parts.append(f"n.status = '{status}'") if user_name_where: # user_name_where already contains parentheses if it's an OR condition where_parts.append(user_name_where) @@ -2927,6 +2934,8 @@ def get_all_memory_items( else: # Build WHERE clause with user_name/knowledgebase_ids and filter where_parts = [f"n.memory_type = '{scope}'"] + if status: + where_parts.append(f"n.status = '{status}'") if user_name_where: # user_name_where already contains parentheses if it's an OR condition where_parts.append(user_name_where) diff --git a/src/memos/templates/mem_reader_prompts.py b/src/memos/templates/mem_reader_prompts.py index 4a451c842..2a2df0e0b 100644 --- a/src/memos/templates/mem_reader_prompts.py +++ b/src/memos/templates/mem_reader_prompts.py @@ -867,52 +867,120 @@ Important: Output **only** the JSON. No extra text. """ -MEMORY_MERGE_PROMPT_EN = """You are a memory consolidation expert. Given a new memory and similar existing memories, decide if they should be merged. +MEMORY_MERGE_PROMPT_EN = """You are a memory consolidation expert. Given a new memory and a set of similar existing memories, determine whether they should be merged. + +Before generating the value, you must complete the following reasoning steps (done in internal reasoning, no need to output them): +1. Identify the “fact units” contained in the new memory, for example: +• Identity-type facts: name, occupation, place of residence, etc. 
+• Stable preference-type facts: things the user likes/dislikes long-term, frequently visited places, etc. +• Relationship-type facts: relationships with someone (friend, colleague, fixed activity partner, etc.) +• One-off event/plan-type facts: events on a specific day, temporary plans for this weekend, etc. +2. For each fact unit, determine: +• Which existing memories are expressing “the same kind of fact” +• Whether the corresponding fact in the new memory is just a “repeated confirmation” of that fact, rather than “new factual content” + +Merge rules (must be followed when generating value): +• The merged value: +• Must not repeat the same meaning (each fact should be described only once) +• Must not repeat the same fact just because it was mentioned multiple times or at different times +• Unless time itself changes the meaning (for example, “used to dislike → now likes”), do not keep specific time information +• If the new memory contains multiple different types of facts (for example: “name + hobby + plan for this weekend”): +• Merge only the fact types that overlap with the existing memories into the single value and leave the other facts out (for example: merge “name” and “hobby”, drop the one-off weekend plan); never emit more than one JSON object +• Do not force unrelated facts into the same value +• One-off events/plans (such as “going skiing this weekend”, “attending a party on Sunday”): +• If there is no directly related and complementary event memory in the existing memories, treat it as an independent memory and do not merge it with identity/stable preference-type memories +• Do not merge a “temporary plan” and a “long-term preference” into the same value just because they are related (e.g. a plan to ski vs. a long-term preference for skiing) + +Output format requirements: +• You must return a single JSON object. 
+• If a merge occurred: +• “value”: The merged memory content (only describe the final conclusion, preserving all “semantically unique” information, without repetition) +• “merged_from”: A list of IDs of the similar memories that were merged +• “should_merge”: true +• If the new memory cannot be merged with any existing memories, return: +• “should_merge”: false Example: New memory: -The user’s name is Tom, the user likes skiing, and plans to go skiing this weekend +The user’s name is Tom, the user likes skiing, and plans to go skiing this weekend. Similar existing memories: xxxx-xxxx-xxxx-xxxx-01: The user’s name is Tom xxxx-xxxx-xxxx-xxxx-10: The user likes skiing xxxx-xxxx-xxxx-xxxx-11: The user lives by the sea -Expected output: +Expected return value: {{ -“value”: “The user’s name is Tom, the user likes skiing”, -“merged_from”: [“xxxx-xxxx-xxxx-xxxx-01”, “xxxx-xxxx-xxxx-xxxx-10”], -“should_merge”: true +"value": "The user's name is Tom and the user likes skiing", +"merged_from": ["xxxx-xxxx-xxxx-xxxx-01", "xxxx-xxxx-xxxx-xxxx-10"], +"should_merge": true }} New memory: -The user is going to attend a party on Sunday +The user is going to attend a party on Sunday. Similar existing memories: -xxxx-xxxx-xxxx-xxxx-01: The user read a book yesterday +xxxx-xxxx-xxxx-xxxx-01: The user read a book yesterday. 
-Expected output: +Expected return value: {{ -“should_merge”: false +"should_merge": false }} -If the new memory substantially overlaps with or complements the existing memories, merge them into a single consolidated memory and return a JSON object with: -- "value": the merged memory content (preserving all unique information) -- "merged_from": list of IDs from similar_memories that were merged -- "should_merge": true +If the new memory largely overlaps with or complements the existing memories, merge them into an integrated memory and return a JSON object: +• “value”: The merged memory content +• “merged_from”: A list of IDs of the similar memories that were merged +• “should_merge”: true + +If the new memory is unique and should remain independent, return: +{{ +"should_merge": false +}} -If the new memory is distinct and should remain separate, return: -- "should_merge": false +You must only return a valid JSON object in the final output, and no additional content (no natural language explanations, no extra fields). -New Memory: +New memory: {new_memory} -Similar Existing Memories: +Similar existing memories: {similar_memories} -Return ONLY a valid JSON object, nothing else.""" +Only return a valid JSON object, and do not include any other content. +""" -MEMORY_MERGE_PROMPT_ZH = """你是一个记忆整合专家。给定一个新记忆和相似的现有记忆,判断它们是否应该合并。 +MEMORY_MERGE_PROMPT_ZH = """ +你是一个记忆整合专家。给定一个新记忆和相似的现有记忆,判断它们是否应该合并。 + +在生成 value 之前,必须先完成以下判断步骤(在内在推理中完成,不需要输出): +1. 识别新记忆中包含的「事实单元」,例如: + - 身份信息类:名字、职业、居住地等 + - 稳定偏好类:长期喜欢/不喜欢的事物、常去地点等 + - 关系类:与某人的关系(朋友、同事、固定搭子等) + - 一次性事件/计划类:某天要参加的活动、本周末的临时安排等 +2. 
对每个事实单元,判断: + - 哪些 existing memories 在表达“同一类事实”, + - 新记忆中对应的事实是否只是对该事实的「重复确认」,而不是“新的事实内容” + +合并规则(生成 value 时必须遵守): +- 合并后的 value: + - 不要重复表达同一语义(同一事实只描述一次) + - 不要因为多次提及或不同时间而重复同一事实 + - 除非时间本身改变了语义(例如“从不喜欢 → 现在开始喜欢”),否则不要保留具体时间信息 +- 如果新记忆中包含多个不同类型的事实(例如“名字 + 爱好 + 本周计划”): + - 不要合并就好 + - 不要把彼此无关的事实硬塞进同一个 value 中 +- 一次性事件/计划(如“本周末去滑雪”“周天参加聚会”): + - 如果 existing memories 中没有与之直接相关、可互补的事件记忆,则视为独立记忆,不要与身份/长期偏好类记忆合并 + - 不要因为它和某个长期偏好有关(例如喜欢滑雪),就把“临时计划”和“长期偏好”合在一个 value 里 + +输出格式要求: +- 你需要返回一个 JSON 对象。 +- 若发生了合并: + - "value": 合并后的记忆内容(只描述最终结论,保留所有「语义上独特」的信息,不重复) + - "merged_from": 被合并的相似记忆 ID 列表 + - "should_merge": true +- 若新记忆无法与现有记忆合并,返回: + - "should_merge": false 示例: 新记忆: @@ -941,14 +1009,17 @@ "should_merge": false }} - -如果新记忆与现有记忆大量重叠或互补,将它们合并为一个整合的记忆,并返回一个JSON列表: -- "value": 合并后的记忆内容(保留所有独特信息) +如果新记忆与现有记忆大量重叠或互补,将它们合并为一个整合的记忆,并返回一个JSON对象: +- "value": 合并后的记忆内容 - "merged_from": 被合并的相似记忆ID列表 - "should_merge": true 如果新记忆是独特的,应该保持独立,返回: -- "should_merge": false +{{ + "should_merge": false +}} + +最终只返回有效的 JSON 对象,不要任何额外内容(不要自然语言解释、不要多余字段)。 新记忆: {new_memory}