From d5fc3e30d2e76b7332ddb5c8aba568cd1b749c98 Mon Sep 17 00:00:00 2001 From: chentang Date: Tue, 20 Jan 2026 15:43:55 +0800 Subject: [PATCH 1/5] fix:component_init will initialize redis module when setting use_redis to false --- src/memos/api/handlers/component_init.py | 29 +++-- .../mem_scheduler/optimized_scheduler.py | 122 +++++++++++++++--- .../orm_modules/api_redis_model.py | 5 +- 3 files changed, 121 insertions(+), 35 deletions(-) diff --git a/src/memos/api/handlers/component_init.py b/src/memos/api/handlers/component_init.py index 76af6decf..bfbd6271d 100644 --- a/src/memos/api/handlers/component_init.py +++ b/src/memos/api/handlers/component_init.py @@ -133,19 +133,22 @@ def init_server() -> dict[str, Any]: logger.info("Initializing MemOS server components...") # Initialize Redis client first as it is a core dependency for features like scheduler status tracking - try: - from memos.mem_scheduler.orm_modules.api_redis_model import APIRedisDBManager - - redis_client = APIRedisDBManager.load_redis_engine_from_env() - if redis_client: - logger.info("Redis client initialized successfully.") - else: - logger.error( - "Failed to initialize Redis client. Check REDIS_HOST etc. in environment variables." - ) - except Exception as e: - logger.error(f"Failed to initialize Redis client: {e}", exc_info=True) - redis_client = None # Ensure redis_client exists even on failure + if os.getenv("MEMSCHEDULER_USE_REDIS_QUEUE", "False").lower() == "true": + try: + from memos.mem_scheduler.orm_modules.api_redis_model import APIRedisDBManager + + redis_client = APIRedisDBManager.load_redis_engine_from_env() + if redis_client: + logger.info("Redis client initialized successfully.") + else: + logger.error( + "Failed to initialize Redis client. Check REDIS_HOST etc. in environment variables." + ) + except Exception as e: + logger.error(f"Failed to initialize Redis client: {e}", exc_info=True) + redis_client = None # Ensure redis_client exists even on failure + else: + redis_client = None # Get default cube configuration default_cube_config = APIConfig.get_default_cube_config() diff --git a/src/memos/mem_scheduler/optimized_scheduler.py b/src/memos/mem_scheduler/optimized_scheduler.py index 7007f8418..fa8c36ac6 100644 --- a/src/memos/mem_scheduler/optimized_scheduler.py +++ b/src/memos/mem_scheduler/optimized_scheduler.py @@ -11,6 +11,11 @@ from memos.mem_cube.navie import NaiveMemCube from memos.mem_scheduler.general_modules.api_misc import SchedulerAPIModule from memos.mem_scheduler.general_scheduler import GeneralScheduler +from memos.mem_scheduler.schemas.api_schemas import ( + APIMemoryHistoryEntryItem, + APISearchHistoryManager, + TaskRunningStatus, +) from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem from memos.mem_scheduler.schemas.task_schemas import ( API_MIX_SEARCH_TASK_LABEL, @@ -42,11 +47,16 @@ def __init__(self, config: GeneralSchedulerConfig): self.history_memory_turns = int(os.getenv("API_SEARCH_HISTORY_TURNS", 5)) self.session_counter = OrderedDict() self.max_session_history = 5 + self.local_history_manager: dict[str, APISearchHistoryManager] = {} + + if self.config.use_redis_queue: + self.api_module = SchedulerAPIModule( + window_size=self.window_size, + history_memory_turns=self.history_memory_turns, + ) + else: + self.api_module = None - self.api_module = SchedulerAPIModule( - window_size=self.window_size, - history_memory_turns=self.history_memory_turns, - ) self.register_handlers( { API_MIX_SEARCH_TASK_LABEL: self._api_mix_search_message_consumer, @@ -134,6 +144,19 @@ def mix_search_memories( f"Mix searching memories for user {search_req.user_id} with query: {search_req.query}" ) + if not self.config.use_redis_queue: + logger.warning("Redis queue is not enabled, falling back to fast search.") + memories = self.search_memories( + search_req=search_req, + user_context=user_context, + mem_cube=self.mem_cube, + mode=SearchMode.FAST, + ) + return [ + format_textual_memory_item(item, include_embedding=search_req.dedup == "sim") + for item in memories + ] + # Get mem_cube for fast search target_session_id = search_req.session_id if not target_session_id: @@ -164,11 +187,22 @@ def mix_search_memories( ) # Try to get pre-computed memories if available - history_memories = self.api_module.get_history_memories( - user_id=search_req.user_id, - mem_cube_id=user_context.mem_cube_id, - turns=self.history_memory_turns, - ) + if self.api_module: + history_memories = self.api_module.get_history_memories( + user_id=search_req.user_id, + mem_cube_id=user_context.mem_cube_id, + turns=self.history_memory_turns, + ) + else: + # Use local list + key = f"search_history:{search_req.user_id}:{user_context.mem_cube_id}" + if key in self.local_history_manager: + history_memories = self.local_history_manager[key].get_history_memories( + turns=self.history_memory_turns + ) + else: + history_memories = [] + logger.info(f"Found {len(history_memories)} history memories.") # if history memories can directly answer @@ -247,17 +281,65 @@ def update_search_memories_to_redis( ] formatted_memories = memories_to_store["formatted_memories"] - # Sync search data to Redis - self.api_module.sync_search_data( - item_id=msg.item_id, - user_id=search_req["user_id"], - mem_cube_id=user_context["mem_cube_id"], - query=search_req["query"], - memories=memories, - formatted_memories=formatted_memories, - session_id=session_id, - conversation_turn=session_turn, - ) + # Sync search data + if self.api_module: + self.api_module.sync_search_data( + item_id=msg.item_id, + user_id=search_req["user_id"], + mem_cube_id=user_context["mem_cube_id"], + query=search_req["query"], + memories=memories, + formatted_memories=formatted_memories, + session_id=session_id, + conversation_turn=session_turn, + ) + else: + # Local sync + user_id = search_req["user_id"] + mem_cube_id = user_context["mem_cube_id"] + key = f"search_history:{user_id}:{mem_cube_id}" + + if key not in self.local_history_manager: + self.local_history_manager[key] = APISearchHistoryManager( + window_size=self.window_size + ) + + search_history = self.local_history_manager[key] + + # Update existing entry or add new + success = search_history.update_entry_by_item_id( + item_id=msg.item_id, + query=search_req["query"], + formatted_memories=formatted_memories, + task_status=TaskRunningStatus.COMPLETED, + session_id=session_id, + memories=memories, + ) + + if not success: + # Add new + entry_item = APIMemoryHistoryEntryItem( + item_id=msg.item_id, + query=search_req["query"], + formatted_memories=formatted_memories, + memories=memories, + task_status=TaskRunningStatus.COMPLETED, + session_id=session_id, + conversation_turn=session_turn, + ) + search_history.completed_entries.append(entry_item) + + # Sort by created_time to ensure chronological order + search_history.completed_entries.sort(key=lambda x: x.created_time) + + # Maintain window size + if len(search_history.completed_entries) > search_history.window_size: + search_history.completed_entries = search_history.completed_entries[ + -search_history.window_size : + ] + + if msg.item_id in search_history.running_item_ids: + search_history.running_item_ids.remove(msg.item_id) def _api_mix_search_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: """ diff --git a/src/memos/mem_scheduler/orm_modules/api_redis_model.py b/src/memos/mem_scheduler/orm_modules/api_redis_model.py index 04cd7e833..546dcc956 100644 --- a/src/memos/mem_scheduler/orm_modules/api_redis_model.py +++ b/src/memos/mem_scheduler/orm_modules/api_redis_model.py @@ -428,7 +428,8 @@ def load_redis_engine_from_env(env_file_path: str | None = None) -> Any: logger.info(f"Loaded environment variables from {env_file_path}") else: logger.warning( - f"Environment file not found: {env_file_path}, using current environment variables" + f"Environment file not found: {env_file_path}, using current environment variables", + stack_info=True, ) else: logger.info("Using current environment variables (no env_file_path provided)") @@ -513,5 +514,5 @@ def load_redis_engine_from_env(env_file_path: str | None = None) -> Any: except Exception as e: error_msg = f"Failed to create Redis connection from environment variables: {e}" - logger.error(error_msg) + logger.error(error_msg, stack_info=True) raise DatabaseError(error_msg) from e From 79fdc38788d1fbef559175438dbe2ea7cc632553 Mon Sep 17 00:00:00 2001 From: chentang Date: Tue, 20 Jan 2026 15:52:50 +0800 Subject: [PATCH 2/5] fix: memories without sources will throw an error --- src/memos/api/handlers/formatters_handler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/memos/api/handlers/formatters_handler.py b/src/memos/api/handlers/formatters_handler.py index 29e376d33..6e1d9d1b6 100644 --- a/src/memos/api/handlers/formatters_handler.py +++ b/src/memos/api/handlers/formatters_handler.py @@ -144,7 +144,7 @@ def separate_knowledge_and_conversation_mem(memories: list[dict[str, Any]]): knowledge_mem = [] conversation_mem = [] for item in memories: - sources = item["metadata"]["sources"] + sources = item.get("metadata", {}).get("sources", []) if ( len(sources) > 0 and "type" in sources[0] @@ -199,7 +199,7 @@ def rerank_knowledge_mem( item["metadata"]["sources"] = [] for item in conversation_mem: - item["metadata"]["sources"] = [] + item.setdefault("metadata", {})["sources"] = [] # deduplicate: remove items with duplicate memory content original_count = len(reranked_knowledge_mem) From d84a61460bfb5f200c5f951b16dbafcfb5a390a7 Mon Sep 17 00:00:00 2001 From: chentang Date: Tue, 20 Jan 2026 16:39:31 +0800 Subject: [PATCH 3/5] refactor: limit consumer side of task queueing --- src/memos/mem_scheduler/base_scheduler.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index 4c9310cbb..5ab524128 100644 --- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -964,6 +964,14 @@ def _message_consumer(self) -> None: # Original local queue logic while self._running: # Use a running flag for graceful shutdown try: + # Check dispatcher thread pool status to avoid overloading + if self.enable_parallel_dispatch and self.dispatcher: + running_tasks = self.dispatcher.get_running_task_count() + if running_tasks >= self.dispatcher.max_workers: + # Thread pool is full, wait and retry + time.sleep(self._consume_interval) + continue + # Get messages in batches based on consume_batch setting messages = self.memos_message_queue.get_messages(batch_size=self.consume_batch) From d960c754083f49c49a867e647d2f43a2f9bd6b5c Mon Sep 17 00:00:00 2001 From: chentang Date: Tue, 20 Jan 2026 17:30:22 +0800 Subject: [PATCH 4/5] fix: revise search priority and serach filter for mix search --- src/memos/mem_scheduler/optimized_scheduler.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/memos/mem_scheduler/optimized_scheduler.py b/src/memos/mem_scheduler/optimized_scheduler.py index fa8c36ac6..0861d8079 100644 --- a/src/memos/mem_scheduler/optimized_scheduler.py +++ b/src/memos/mem_scheduler/optimized_scheduler.py @@ -114,7 +114,9 @@ def search_memories( target_session_id = search_req.session_id if not target_session_id: target_session_id = "default_session" - search_filter = {"session_id": search_req.session_id} if search_req.session_id else None + + search_priority = {"session_id": search_req.session_id} if search_req.session_id else None + search_filter = search_req.filter # Create MemCube and perform search search_results = mem_cube.text_mem.search( @@ -124,6 +126,7 @@ def search_memories( mode=mode, manual_close_internet=not search_req.internet_search, search_filter=search_filter, + search_priority=search_priority, info={ "user_id": search_req.user_id, "session_id": target_session_id, @@ -145,7 +148,10 @@ def mix_search_memories( ) if not self.config.use_redis_queue: - logger.warning("Redis queue is not enabled, falling back to fast search.") + logger.warning( + "Redis queue is not enabled. Running in degraded mode: " + "FAST search only, no history memory reranking, no async updates." + ) memories = self.search_memories( search_req=search_req, user_context=user_context, From 240ee3a6d8d4e355916e9fc1de38cb3e652bd318 Mon Sep 17 00:00:00 2001 From: chentang Date: Tue, 20 Jan 2026 19:55:32 +0800 Subject: [PATCH 5/5] fix: remove local history manager, and make mixture search degrade to fast search when api_module is None --- .../mem_scheduler/optimized_scheduler.py | 98 +++---------------- 1 file changed, 16 insertions(+), 82 deletions(-) diff --git a/src/memos/mem_scheduler/optimized_scheduler.py b/src/memos/mem_scheduler/optimized_scheduler.py index 0861d8079..497d19ac6 100644 --- a/src/memos/mem_scheduler/optimized_scheduler.py +++ b/src/memos/mem_scheduler/optimized_scheduler.py @@ -11,11 +11,6 @@ from memos.mem_cube.navie import NaiveMemCube from memos.mem_scheduler.general_modules.api_misc import SchedulerAPIModule from memos.mem_scheduler.general_scheduler import GeneralScheduler -from memos.mem_scheduler.schemas.api_schemas import ( - APIMemoryHistoryEntryItem, - APISearchHistoryManager, - TaskRunningStatus, -) from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem from memos.mem_scheduler.schemas.task_schemas import ( API_MIX_SEARCH_TASK_LABEL, @@ -47,7 +42,6 @@ def __init__(self, config: GeneralSchedulerConfig): self.history_memory_turns = int(os.getenv("API_SEARCH_HISTORY_TURNS", 5)) self.session_counter = OrderedDict() self.max_session_history = 5 - self.local_history_manager: dict[str, APISearchHistoryManager] = {} if self.config.use_redis_queue: self.api_module = SchedulerAPIModule( @@ -114,7 +108,6 @@ def search_memories( target_session_id = search_req.session_id if not target_session_id: target_session_id = "default_session" - search_priority = {"session_id": search_req.session_id} if search_req.session_id else None search_filter = search_req.filter @@ -193,22 +186,11 @@ def mix_search_memories( ) # Try to get pre-computed memories if available - if self.api_module: - history_memories = self.api_module.get_history_memories( - user_id=search_req.user_id, - mem_cube_id=user_context.mem_cube_id, - turns=self.history_memory_turns, - ) - else: - # Use local list - key = f"search_history:{search_req.user_id}:{user_context.mem_cube_id}" - if key in self.local_history_manager: - history_memories = self.local_history_manager[key].get_history_memories( - turns=self.history_memory_turns - ) - else: - history_memories = [] - + history_memories = self.api_module.get_history_memories( + user_id=search_req.user_id, + mem_cube_id=user_context.mem_cube_id, + turns=self.history_memory_turns, + ) logger.info(f"Found {len(history_memories)} history memories.") # if history memories can directly answer @@ -287,65 +269,17 @@ def update_search_memories_to_redis( ] formatted_memories = memories_to_store["formatted_memories"] - # Sync search data - if self.api_module: - self.api_module.sync_search_data( - item_id=msg.item_id, - user_id=search_req["user_id"], - mem_cube_id=user_context["mem_cube_id"], - query=search_req["query"], - memories=memories, - formatted_memories=formatted_memories, - session_id=session_id, - conversation_turn=session_turn, - ) - else: - # Local sync - user_id = search_req["user_id"] - mem_cube_id = user_context["mem_cube_id"] - key = f"search_history:{user_id}:{mem_cube_id}" - - if key not in self.local_history_manager: - self.local_history_manager[key] = APISearchHistoryManager( - window_size=self.window_size - ) - - search_history = self.local_history_manager[key] - - # Update existing entry or add new - success = search_history.update_entry_by_item_id( - item_id=msg.item_id, - query=search_req["query"], - formatted_memories=formatted_memories, - task_status=TaskRunningStatus.COMPLETED, - session_id=session_id, - memories=memories, - ) - - if not success: - # Add new - entry_item = APIMemoryHistoryEntryItem( - item_id=msg.item_id, - query=search_req["query"], - formatted_memories=formatted_memories, - memories=memories, - task_status=TaskRunningStatus.COMPLETED, - session_id=session_id, - conversation_turn=session_turn, - ) - search_history.completed_entries.append(entry_item) - - # Sort by created_time to ensure chronological order - search_history.completed_entries.sort(key=lambda x: x.created_time) - - # Maintain window size - if len(search_history.completed_entries) > search_history.window_size: - search_history.completed_entries = search_history.completed_entries[ - -search_history.window_size : - ] - - if msg.item_id in search_history.running_item_ids: - search_history.running_item_ids.remove(msg.item_id) + # Sync search data to Redis + self.api_module.sync_search_data( + item_id=msg.item_id, + user_id=search_req["user_id"], + mem_cube_id=user_context["mem_cube_id"], + query=search_req["query"], + memories=memories, + formatted_memories=formatted_memories, + session_id=session_id, + conversation_turn=session_turn, + ) def _api_mix_search_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: """