diff --git a/src/memos/api/handlers/formatters_handler.py b/src/memos/api/handlers/formatters_handler.py
index 29e376d33..e74f9d46b 100644
--- a/src/memos/api/handlers/formatters_handler.py
+++ b/src/memos/api/handlers/formatters_handler.py
@@ -146,7 +146,8 @@ def separate_knowledge_and_conversation_mem(memories: list[dict[str, Any]]):
     for item in memories:
         sources = item["metadata"]["sources"]
         if (
-            len(sources) > 0
+            item["metadata"]["memory_type"] != "RawFileMemory"
+            and len(sources) > 0
             and "type" in sources[0]
             and sources[0]["type"] == "file"
            and "content" in sources[0]
@@ -192,8 +193,7 @@ def rerank_knowledge_mem(
         key=lambda item: item.get("metadata", {}).get("relativity", 0.0),
         reverse=True,
     )
-
-    # TODO revoke sources replace memory value
+    # Replace the memory value with source.content for LongTermMemory, WorkingMemory, or UserMemory
     for item in reranked_knowledge_mem:
         item["memory"] = item["metadata"]["sources"][0]["content"]
         item["metadata"]["sources"] = []
diff --git a/src/memos/api/product_models.py b/src/memos/api/product_models.py
index b2f8a9fa3..76a54103f 100644
--- a/src/memos/api/product_models.py
+++ b/src/memos/api/product_models.py
@@ -392,8 +392,8 @@ class APISearchRequest(BaseRequest):
     )
     # Internal field for search memory type
     search_memory_type: str = Field(
-        "All",
-        description="Type of memory to search: All, WorkingMemory, LongTermMemory, UserMemory, OuterMemory, ToolSchemaMemory, ToolTrajectoryMemory",
+        "AllSummaryMemory",
+        description="Type of memory to search: All, WorkingMemory, LongTermMemory, UserMemory, OuterMemory, ToolSchemaMemory, ToolTrajectoryMemory, RawFileMemory, AllSummaryMemory",
     )

     # ==== Context ====
diff --git a/src/memos/mem_feedback/feedback.py b/src/memos/mem_feedback/feedback.py
index 1d199c6cb..5bc1b86ed 100644
--- a/src/memos/mem_feedback/feedback.py
+++ b/src/memos/mem_feedback/feedback.py
@@ -234,18 +234,15 @@ def _single_add_operation(
             to_add_memory.metadata.tags = new_memory_item.metadata.tags
             to_add_memory.memory = new_memory_item.memory
             to_add_memory.metadata.embedding = new_memory_item.metadata.embedding
-            to_add_memory.metadata.user_id = new_memory_item.metadata.user_id
-            to_add_memory.metadata.created_at = to_add_memory.metadata.updated_at = (
-                datetime.now().isoformat()
-            )
-            to_add_memory.metadata.background = new_memory_item.metadata.background
         else:
             to_add_memory = new_memory_item.model_copy(deep=True)
-            to_add_memory.metadata.created_at = to_add_memory.metadata.updated_at = (
-                datetime.now().isoformat()
-            )
-            to_add_memory.metadata.background = new_memory_item.metadata.background
+
+        to_add_memory.metadata.created_at = to_add_memory.metadata.updated_at = (
+            datetime.now().isoformat()
+        )
+        to_add_memory.metadata.background = new_memory_item.metadata.background
+        to_add_memory.metadata.sources = []

         to_add_memory.id = ""
         added_ids = self._retry_db_operation(
diff --git a/src/memos/mem_reader/multi_modal_struct.py b/src/memos/mem_reader/multi_modal_struct.py
index 9edcd0a55..6e6c1ff73 100644
--- a/src/memos/mem_reader/multi_modal_struct.py
+++ b/src/memos/mem_reader/multi_modal_struct.py
@@ -596,6 +596,7 @@ def _process_string_fine(
     ) -> list[TextualMemoryItem]:
         """
         Process fast mode memory items through LLM to generate fine mode memories.
+        Here, fast_memory_items are the raw chunk memory items, not the final memory items.
         """
         if not fast_memory_items:
             return []
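Aside: the search_memory_type default change above means plain searches now hit only summary-level memories (LongTermMemory + UserMemory); raw chunks must be requested explicitly. A minimal usage sketch, assuming APISearchRequest is importable as shown and that query is one of its fields (any other required fields are omitted here):

    from memos.api.product_models import APISearchRequest

    # Default scope is now "AllSummaryMemory" (LongTermMemory + UserMemory).
    req_default = APISearchRequest(query="quarterly report")

    # Raw file chunks must be requested explicitly, or via the umbrella "All".
    req_raw = APISearchRequest(query="quarterly report", search_memory_type="RawFileMemory")
    req_all = APISearchRequest(query="quarterly report", search_memory_type="All")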
""" if not fast_memory_items: return [] diff --git a/src/memos/mem_reader/read_multi_modal/file_content_parser.py b/src/memos/mem_reader/read_multi_modal/file_content_parser.py index fbc704d0b..c2dab6282 100644 --- a/src/memos/mem_reader/read_multi_modal/file_content_parser.py +++ b/src/memos/mem_reader/read_multi_modal/file_content_parser.py @@ -448,10 +448,6 @@ def parse_fast( if filename: content_parts.append(f"[Filename: {filename}]") - # If no content can be extracted, create a placeholder - if not content_parts: - content_parts.append("[File: unknown]") - # Combine content parts content = " ".join(content_parts) @@ -513,38 +509,6 @@ def parse_fast( ) memory_items.append(memory_item) - # If no chunks were created, create a placeholder - if not memory_items: - # Create source for placeholder (no chunk index since there are no chunks) - placeholder_source = self.create_source( - message, - info, - chunk_index=None, - chunk_total=0, - chunk_content=content, - file_url_flag=file_url_flag, - ) - memory_item = TextualMemoryItem( - memory=content, - metadata=TreeNodeTextualMemoryMetadata( - user_id=user_id, - session_id=session_id, - memory_type=memory_type, - status="activated", - tags=["mode:fast", "multimodal:file"], - key=_derive_key(content), - embedding=self.embedder.embed([content])[0], - usage=[], - sources=[placeholder_source], - background="", - confidence=0.99, - type="fact", - info=info_, - file_ids=file_ids, - ), - ) - memory_items.append(memory_item) - return memory_items def parse_fine( @@ -781,6 +745,31 @@ def _process_chunk(chunk_idx: int, chunk_text: str) -> TextualMemoryItem: logger.warning(f"[FileContentParser] Fallback to raw for chunk {chunk_idx}") return _make_fallback(chunk_idx, chunk_text) + def _relate_chunks(items: list[TextualMemoryItem]) -> None: + """ + Relate chunks to each other. 
+ """ + if len(items) <= 1: + return + + def get_chunk_idx(item: TextualMemoryItem) -> int: + """Extract chunk_idx from item's source metadata.""" + if item.metadata.sources and len(item.metadata.sources) > 0: + source = item.metadata.sources[0] + if source.file_info and isinstance(source.file_info, dict): + chunk_idx = source.file_info.get("chunk_index") + if chunk_idx is not None: + return chunk_idx + return float("inf") + + sorted_items = sorted(items, key=get_chunk_idx) + + # Relate adjacent items + for i in range(len(sorted_items) - 1): + sorted_items[i].metadata.following_id = sorted_items[i + 1].id + sorted_items[i + 1].metadata.preceding_id = sorted_items[i].id + return sorted_items + # Process chunks concurrently with progress bar memory_items = [] chunk_map = dict(valid_chunks) @@ -802,8 +791,24 @@ def _process_chunk(chunk_idx: int, chunk_text: str) -> TextualMemoryItem: chunk_idx = futures[future] try: node = future.result() - if node: - memory_items.append(node) + memory_items.append(node) + # save raw file + node_id = node.id + if node.memory != node.metadata.sources[0].content: + chunk_node = _make_memory_item( + value=node.metadata.sources[0].content, + mem_type="RawFileMemory", + tags=[ + "mode:fine", + "multimodal:file", + f"chunk:{chunk_idx + 1}/{total_chunks}", + ], + chunk_idx=chunk_idx, + chunk_content="", + ) + chunk_node.metadata.summary_id = node_id + memory_items.append(chunk_node) + except Exception as e: tqdm.write(f"[ERROR] Chunk {chunk_idx} failed: {e}") logger.error(f"[FileContentParser] Future failed for chunk {chunk_idx}: {e}") @@ -816,6 +821,14 @@ def _process_chunk(chunk_idx: int, chunk_text: str) -> TextualMemoryItem: logger.info( f"[FileContentParser] Completed processing {len(memory_items)}/{total_chunks} chunks" ) + rawfile_items = [ + memory for memory in memory_items if memory.metadata.memory_type == "RawFileMemory" + ] + mem_items = [ + memory for memory in memory_items if memory.metadata.memory_type != "RawFileMemory" + ] + related_rawfile_items = _relate_chunks(rawfile_items) + memory_items = mem_items + related_rawfile_items return memory_items or [ _make_memory_item( diff --git a/src/memos/mem_reader/read_multi_modal/user_parser.py b/src/memos/mem_reader/read_multi_modal/user_parser.py index 1c9afab65..268acd2fb 100644 --- a/src/memos/mem_reader/read_multi_modal/user_parser.py +++ b/src/memos/mem_reader/read_multi_modal/user_parser.py @@ -64,6 +64,10 @@ def create_source( part_type = part.get("type", "") if part_type == "text": text_contents.append(part.get("text", "")) + if part_type == "file": + file_info = part.get("file", {}) + file_data = file_info.get("file_data", "") + text_contents.append(file_data) # Detect overall language from all text content overall_lang = "en" diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index d4ac09cc3..a9bf6e366 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -1,6 +1,7 @@ import concurrent.futures import contextlib import json +import os import traceback from memos.configs.mem_scheduler import GeneralSchedulerConfig @@ -840,7 +841,7 @@ def _process_memories_with_reader( ) return - # Get the original memory items + # Get the original fast memory (raw chunk) items memory_items = [] for mem_id in mem_ids: try: @@ -893,14 +894,38 @@ def _process_memories_with_reader( # Add the enhanced memories back to the memory system if flattened_memories: - enhanced_mem_ids = text_mem.add(flattened_memories, 
diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py
index d4ac09cc3..a9bf6e366 100644
--- a/src/memos/mem_scheduler/general_scheduler.py
+++ b/src/memos/mem_scheduler/general_scheduler.py
@@ -1,6 +1,7 @@
 import concurrent.futures
 import contextlib
 import json
+import os
 import traceback

 from memos.configs.mem_scheduler import GeneralSchedulerConfig
@@ -840,7 +841,7 @@ def _process_memories_with_reader(
             )
             return

-        # Get the original memory items
+        # Get the original fast-mode (raw chunk) memory items
         memory_items = []
         for mem_id in mem_ids:
             try:
@@ -893,14 +894,38 @@ def _process_memories_with_reader(
                 # Add the enhanced memories back to the memory system
                 if flattened_memories:
-                    enhanced_mem_ids = text_mem.add(flattened_memories, user_name=user_name)
+                    mem_group = [
+                        memory
+                        for memory in flattened_memories
+                        if memory.metadata.memory_type != "RawFileMemory"
+                    ]
+                    enhanced_mem_ids = text_mem.add(mem_group, user_name=user_name)
                     logger.info(
                         f"Added {len(enhanced_mem_ids)} enhanced memories: {enhanced_mem_ids}"
                     )
+                    # Add raw file nodes and edges
+                    if os.getenv("SAVE_RAWFILE_NODE", "false").lower() == "true":
+                        raw_file_mem_group = [
+                            memory
+                            for memory in flattened_memories
+                            if memory.metadata.memory_type == "RawFileMemory"
+                        ]
+                        text_mem.add_rawfile_nodes_n_edges(
+                            raw_file_mem_group,
+                            enhanced_mem_ids,
+                            user_id=user_id,
+                            user_name=user_name,
+                        )
+
                     # Mark merged_from memories as archived when provided in memory metadata
+                    summary_memories = [
+                        memory
+                        for memory in flattened_memories
+                        if memory.metadata.memory_type != "RawFileMemory"
+                    ]
                     if self.mem_reader.graph_db:
-                        for memory in flattened_memories:
+                        for memory in summary_memories:
                             merged_from = (memory.metadata.info or {}).get("merged_from")
                             if merged_from:
                                 old_ids = (
@@ -923,7 +948,7 @@ def _process_memories_with_reader(
                 else:
                     # Check if any memory has merged_from but graph_db is unavailable
                     has_merged_from = any(
-                        (m.metadata.info or {}).get("merged_from") for m in flattened_memories
+                        (m.metadata.info or {}).get("merged_from") for m in summary_memories
                     )
                     if has_merged_from:
                         logger.warning(
diff --git a/src/memos/memories/textual/item.py b/src/memos/memories/textual/item.py
index a1c85033b..bd49c16a0 100644
--- a/src/memos/memories/textual/item.py
+++ b/src/memos/memories/textual/item.py
@@ -112,6 +112,7 @@ class TreeNodeTextualMemoryMetadata(TextualMemoryMetadata):
         "OuterMemory",
         "ToolSchemaMemory",
         "ToolTrajectoryMemory",
+        "RawFileMemory",
     ] = Field(default="WorkingMemory", description="Memory lifecycle type.")
     sources: list[SourceMessage] | None = Field(
         default=None, description="Multiple origins of the memory (e.g., URLs, notes)."
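The raw-file persistence in the scheduler above is opt-in via an environment variable; a minimal sketch of the gate as the scheduler reads it (only a case-insensitive "true" enables it; anything else, including unset, disables it):

    import os

    def save_rawfile_nodes_enabled() -> bool:
        return os.getenv("SAVE_RAWFILE_NODE", "false").lower() == "true"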
+ """ + rawfile_ids_local: list[str] = self.add( + raw_file_mem_group, + user_name=user_name, + ) + + from_ids = [] + to_ids = [] + types = [] + + for raw_file_mem in raw_file_mem_group: + # Add SUMMARY edge: memory -> raw file; raw file -> memory + if hasattr(raw_file_mem.metadata, "summary_id") and raw_file_mem.metadata.summary_id: + summary_id = raw_file_mem.metadata.summary_id + if summary_id in mem_ids: + from_ids.append(summary_id) + to_ids.append(raw_file_mem.id) + types.append("MATERIAL") + + from_ids.append(raw_file_mem.id) + to_ids.append(summary_id) + types.append("SUMMARY") + + # Add FOLLOWING edge: current chunk -> next chunk + if ( + hasattr(raw_file_mem.metadata, "following_id") + and raw_file_mem.metadata.following_id + ): + following_id = raw_file_mem.metadata.following_id + if following_id in rawfile_ids_local: + from_ids.append(raw_file_mem.id) + to_ids.append(following_id) + types.append("FOLLOWING") + + # Add PRECEDING edge: previous chunk -> current chunk + if ( + hasattr(raw_file_mem.metadata, "preceding_id") + and raw_file_mem.metadata.preceding_id + ): + preceding_id = raw_file_mem.metadata.preceding_id + if preceding_id in rawfile_ids_local: + from_ids.append(raw_file_mem.id) + to_ids.append(preceding_id) + types.append("PRECEDING") + + start_time = time.time() + self.add_graph_edges( + from_ids, + to_ids, + types, + user_name=user_name, + ) + end_time = time.time() + logger.info(f"[RawFile]Added {len(rawfile_ids_local)} chunks for user {user_id}") + logger.info( + f"[RawFile]Time taken to add edges: {end_time - start_time} seconds for {len(from_ids)} edges" + ) + + def add_graph_edges( + self, from_ids: list[str], to_ids: list[str], types: list[str], user_name: str | None = None + ) -> None: + """ + Add edges to the graph. + Args: + from_ids: List of source node IDs. + to_ids: List of target node IDs. + types: List of edge types. + user_name: Optional user name. 
+ """ + with ContextThreadPoolExecutor(max_workers=20) as executor: + futures = { + executor.submit( + self.graph_store.add_edge, from_id, to_id, edge_type, user_name=user_name + ) + for from_id, to_id, edge_type in zip(from_ids, to_ids, types, strict=False) + } + + for future in concurrent.futures.as_completed(futures): + try: + future.result() + except Exception as e: + logger.exception("Add edge error: ", exc_info=e) diff --git a/src/memos/memories/textual/tree_text_memory/organize/manager.py b/src/memos/memories/textual/tree_text_memory/organize/manager.py index c96d5a12a..1fff23c83 100644 --- a/src/memos/memories/textual/tree_text_memory/organize/manager.py +++ b/src/memos/memories/textual/tree_text_memory/organize/manager.py @@ -68,12 +68,14 @@ def __init__( self.current_memory_size = { "WorkingMemory": 0, "LongTermMemory": 0, + "RawFileMemory": 0, "UserMemory": 0, } if not memory_size: self.memory_size = { "WorkingMemory": 20, "LongTermMemory": 1500, + "RawFileMemory": 1500, "UserMemory": 480, } logger.info(f"MemorySize is {self.memory_size}") @@ -159,7 +161,11 @@ def _add_memories_batch( for memory in memories: working_id = str(uuid.uuid4()) - if memory.metadata.memory_type not in ("ToolSchemaMemory", "ToolTrajectoryMemory"): + if memory.metadata.memory_type not in ( + "ToolSchemaMemory", + "ToolTrajectoryMemory", + "RawFileMemory", + ): working_metadata = memory.metadata.model_copy( update={"memory_type": "WorkingMemory"} ).model_dump(exclude_none=True) @@ -176,8 +182,9 @@ def _add_memories_batch( "UserMemory", "ToolSchemaMemory", "ToolTrajectoryMemory", + "RawFileMemory", ): - graph_node_id = str(uuid.uuid4()) + graph_node_id = memory.id if hasattr(memory, "id") else str(uuid.uuid4()) metadata_dict = memory.metadata.model_dump(exclude_none=True) metadata_dict["updated_at"] = datetime.now().isoformat() @@ -310,7 +317,11 @@ def _process_memory(self, memory: TextualMemoryItem, user_name: str | None = Non working_id = str(uuid.uuid4()) with ContextThreadPoolExecutor(max_workers=2, thread_name_prefix="mem") as ex: - if memory.metadata.memory_type not in ("ToolSchemaMemory", "ToolTrajectoryMemory"): + if memory.metadata.memory_type not in ( + "ToolSchemaMemory", + "ToolTrajectoryMemory", + "RawFileMemory", + ): f_working = ex.submit( self._add_memory_to_db, memory, "WorkingMemory", user_name, working_id ) @@ -321,6 +332,7 @@ def _process_memory(self, memory: TextualMemoryItem, user_name: str | None = Non "UserMemory", "ToolSchemaMemory", "ToolTrajectoryMemory", + "RawFileMemory", ): f_graph = ex.submit( self._add_to_graph_memory, @@ -372,7 +384,7 @@ def _add_to_graph_memory( """ Generalized method to add memory to a graph-based memory type (e.g., LongTermMemory, UserMemory). 
""" - node_id = str(uuid.uuid4()) + node_id = memory.id if hasattr(memory, "id") else str(uuid.uuid4()) # Step 2: Add new node to graph metadata_dict = memory.metadata.model_dump(exclude_none=True) tags = metadata_dict.get("tags") or [] diff --git a/src/memos/memories/textual/tree_text_memory/retrieve/recall.py b/src/memos/memories/textual/tree_text_memory/retrieve/recall.py index 4541b118b..f159fa779 100644 --- a/src/memos/memories/textual/tree_text_memory/retrieve/recall.py +++ b/src/memos/memories/textual/tree_text_memory/retrieve/recall.py @@ -67,6 +67,7 @@ def retrieve( "UserMemory", "ToolSchemaMemory", "ToolTrajectoryMemory", + "RawFileMemory", ]: raise ValueError(f"Unsupported memory scope: {memory_scope}") diff --git a/src/memos/memories/textual/tree_text_memory/retrieve/searcher.py b/src/memos/memories/textual/tree_text_memory/retrieve/searcher.py index 8c30d74f3..c5249e61d 100644 --- a/src/memos/memories/textual/tree_text_memory/retrieve/searcher.py +++ b/src/memos/memories/textual/tree_text_memory/retrieve/searcher.py @@ -1,5 +1,7 @@ import traceback +from concurrent.futures import as_completed + from memos.context.context import ContextThreadPoolExecutor from memos.embedders.factory import OllamaEmbedder from memos.graph_dbs.factory import Neo4jGraphDB @@ -447,8 +449,8 @@ def _retrieve_from_long_term_and_user( else: cot_embeddings = query_embedding - with ContextThreadPoolExecutor(max_workers=2) as executor: - if memory_type in ["All", "LongTermMemory"]: + with ContextThreadPoolExecutor(max_workers=3) as executor: + if memory_type in ["All", "AllSummaryMemory", "LongTermMemory"]: tasks.append( executor.submit( self.graph_retriever.retrieve, @@ -464,7 +466,7 @@ def _retrieve_from_long_term_and_user( use_fast_graph=self.use_fast_graph, ) ) - if memory_type in ["All", "UserMemory"]: + if memory_type in ["All", "AllSummaryMemory", "UserMemory"]: tasks.append( executor.submit( self.graph_retriever.retrieve, @@ -480,10 +482,27 @@ def _retrieve_from_long_term_and_user( use_fast_graph=self.use_fast_graph, ) ) + if memory_type in ["All", "RawFileMemory"]: + tasks.append( + executor.submit( + self.graph_retriever.retrieve, + query=query, + parsed_goal=parsed_goal, + query_embedding=cot_embeddings, + top_k=top_k * 2, + memory_scope="RawFileMemory", + search_filter=search_filter, + search_priority=search_priority, + user_name=user_name, + id_filter=id_filter, + use_fast_graph=self.use_fast_graph, + ) + ) # Collect results from all tasks for task in tasks: results.extend(task.result()) + results = self._deduplicate_rawfile_results(results, user_name=user_name) return self.reranker.rerank( query=query, @@ -772,6 +791,54 @@ def _sort_and_trim( ) return final_items + @timed + def _deduplicate_rawfile_results(self, results, user_name: str | None = None): + """ + Deduplicate rawfile related memories by edge + """ + if not results: + return results + + summary_ids_to_remove = set() + rawfile_items = [item for item in results if item.metadata.memory_type == "RawFileMemory"] + if not rawfile_items: + return results + + with ContextThreadPoolExecutor(max_workers=min(len(rawfile_items), 10)) as executor: + futures = [ + executor.submit( + self.graph_store.get_edges, + rawfile_item.id, + type="SUMMARY", + direction="OUTGOING", + user_name=user_name, + ) + for rawfile_item in rawfile_items + ] + for future in as_completed(futures): + try: + edges = future.result() + for edge in edges: + summary_target_id = edge.get("to") + if summary_target_id: + summary_ids_to_remove.add(summary_target_id) + 
diff --git a/src/memos/multi_mem_cube/single_cube.py b/src/memos/multi_mem_cube/single_cube.py
index 426cf32be..a9d025583 100644
--- a/src/memos/multi_mem_cube/single_cube.py
+++ b/src/memos/multi_mem_cube/single_cube.py
@@ -815,16 +815,34 @@ def _process_text_mem(
         self.logger.info(f"Memory extraction completed for user {add_req.user_id}")

         # Add memories to text_mem
+        mem_group = [
+            memory for memory in flattened_local if memory.metadata.memory_type != "RawFileMemory"
+        ]
         mem_ids_local: list[str] = self.naive_mem_cube.text_mem.add(
-            flattened_local,
+            mem_group,
             user_name=user_context.mem_cube_id,
         )
+
         self.logger.info(
             f"Added {len(mem_ids_local)} memories for user {add_req.user_id} "
             f"in session {add_req.session_id}: {mem_ids_local}"
         )

-        # Schedule async/sync tasks
+        # Add raw file nodes and edges
+        if os.getenv("SAVE_RAWFILE_NODE", "false").lower() == "true" and extract_mode == "fine":
+            raw_file_mem_group = [
+                memory
+                for memory in flattened_local
+                if memory.metadata.memory_type == "RawFileMemory"
+            ]
+            self.naive_mem_cube.text_mem.add_rawfile_nodes_n_edges(
+                raw_file_mem_group,
+                mem_ids_local,
+                user_id=add_req.user_id,
+                user_name=user_context.mem_cube_id,
+            )
+
+        # Schedule async/sync tasks: the async path processes raw chunk memories; the sync path only sends messages
         self._schedule_memory_tasks(
             add_req=add_req,
             user_context=user_context,