diff --git a/src/memos/api/handlers/component_init.py b/src/memos/api/handlers/component_init.py index 76af6decf..bfbd6271d 100644 --- a/src/memos/api/handlers/component_init.py +++ b/src/memos/api/handlers/component_init.py @@ -133,19 +133,22 @@ def init_server() -> dict[str, Any]: logger.info("Initializing MemOS server components...") # Initialize Redis client first as it is a core dependency for features like scheduler status tracking - try: - from memos.mem_scheduler.orm_modules.api_redis_model import APIRedisDBManager - - redis_client = APIRedisDBManager.load_redis_engine_from_env() - if redis_client: - logger.info("Redis client initialized successfully.") - else: - logger.error( - "Failed to initialize Redis client. Check REDIS_HOST etc. in environment variables." - ) - except Exception as e: - logger.error(f"Failed to initialize Redis client: {e}", exc_info=True) - redis_client = None # Ensure redis_client exists even on failure + if os.getenv("MEMSCHEDULER_USE_REDIS_QUEUE", "False").lower() == "true": + try: + from memos.mem_scheduler.orm_modules.api_redis_model import APIRedisDBManager + + redis_client = APIRedisDBManager.load_redis_engine_from_env() + if redis_client: + logger.info("Redis client initialized successfully.") + else: + logger.error( + "Failed to initialize Redis client. Check REDIS_HOST etc. in environment variables." + ) + except Exception as e: + logger.error(f"Failed to initialize Redis client: {e}", exc_info=True) + redis_client = None # Ensure redis_client exists even on failure + else: + redis_client = None # Get default cube configuration default_cube_config = APIConfig.get_default_cube_config() diff --git a/src/memos/api/handlers/formatters_handler.py b/src/memos/api/handlers/formatters_handler.py index 29e376d33..6e1d9d1b6 100644 --- a/src/memos/api/handlers/formatters_handler.py +++ b/src/memos/api/handlers/formatters_handler.py @@ -144,7 +144,7 @@ def separate_knowledge_and_conversation_mem(memories: list[dict[str, Any]]): knowledge_mem = [] conversation_mem = [] for item in memories: - sources = item["metadata"]["sources"] + sources = item.get("metadata", {}).get("sources", []) if ( len(sources) > 0 and "type" in sources[0] @@ -199,7 +199,7 @@ def rerank_knowledge_mem( item["metadata"]["sources"] = [] for item in conversation_mem: - item["metadata"]["sources"] = [] + item.setdefault("metadata", {})["sources"] = [] # deduplicate: remove items with duplicate memory content original_count = len(reranked_knowledge_mem) diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index 4c9310cbb..5ab524128 100644 --- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -964,6 +964,14 @@ def _message_consumer(self) -> None: # Original local queue logic while self._running: # Use a running flag for graceful shutdown try: + # Check dispatcher thread pool status to avoid overloading + if self.enable_parallel_dispatch and self.dispatcher: + running_tasks = self.dispatcher.get_running_task_count() + if running_tasks >= self.dispatcher.max_workers: + # Thread pool is full, wait and retry + time.sleep(self._consume_interval) + continue + # Get messages in batches based on consume_batch setting messages = self.memos_message_queue.get_messages(batch_size=self.consume_batch) diff --git a/src/memos/mem_scheduler/optimized_scheduler.py b/src/memos/mem_scheduler/optimized_scheduler.py index 7007f8418..497d19ac6 100644 --- a/src/memos/mem_scheduler/optimized_scheduler.py +++ b/src/memos/mem_scheduler/optimized_scheduler.py @@ -43,10 +43,14 @@ def __init__(self, config: GeneralSchedulerConfig): self.session_counter = OrderedDict() self.max_session_history = 5 - self.api_module = SchedulerAPIModule( - window_size=self.window_size, - history_memory_turns=self.history_memory_turns, - ) + if self.config.use_redis_queue: + self.api_module = SchedulerAPIModule( + window_size=self.window_size, + history_memory_turns=self.history_memory_turns, + ) + else: + self.api_module = None + self.register_handlers( { API_MIX_SEARCH_TASK_LABEL: self._api_mix_search_message_consumer, @@ -104,7 +108,8 @@ def search_memories( target_session_id = search_req.session_id if not target_session_id: target_session_id = "default_session" - search_filter = {"session_id": search_req.session_id} if search_req.session_id else None + search_priority = {"session_id": search_req.session_id} if search_req.session_id else None + search_filter = search_req.filter # Create MemCube and perform search search_results = mem_cube.text_mem.search( @@ -114,6 +119,7 @@ def search_memories( mode=mode, manual_close_internet=not search_req.internet_search, search_filter=search_filter, + search_priority=search_priority, info={ "user_id": search_req.user_id, "session_id": target_session_id, @@ -134,6 +140,22 @@ def mix_search_memories( f"Mix searching memories for user {search_req.user_id} with query: {search_req.query}" ) + if not self.config.use_redis_queue: + logger.warning( + "Redis queue is not enabled. Running in degraded mode: " + "FAST search only, no history memory reranking, no async updates." + ) + memories = self.search_memories( + search_req=search_req, + user_context=user_context, + mem_cube=self.mem_cube, + mode=SearchMode.FAST, + ) + return [ + format_textual_memory_item(item, include_embedding=search_req.dedup == "sim") + for item in memories + ] + # Get mem_cube for fast search target_session_id = search_req.session_id if not target_session_id: diff --git a/src/memos/mem_scheduler/orm_modules/api_redis_model.py b/src/memos/mem_scheduler/orm_modules/api_redis_model.py index 04cd7e833..546dcc956 100644 --- a/src/memos/mem_scheduler/orm_modules/api_redis_model.py +++ b/src/memos/mem_scheduler/orm_modules/api_redis_model.py @@ -428,7 +428,8 @@ def load_redis_engine_from_env(env_file_path: str | None = None) -> Any: logger.info(f"Loaded environment variables from {env_file_path}") else: logger.warning( - f"Environment file not found: {env_file_path}, using current environment variables" + f"Environment file not found: {env_file_path}, using current environment variables", + stack_info=True, ) else: logger.info("Using current environment variables (no env_file_path provided)") @@ -513,5 +514,5 @@ def load_redis_engine_from_env(env_file_path: str | None = None) -> Any: except Exception as e: error_msg = f"Failed to create Redis connection from environment variables: {e}" - logger.error(error_msg) + logger.error(error_msg, stack_info=True) raise DatabaseError(error_msg) from e