From 93def1bded2b6cf919848af45883758e396a42a7 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Wed, 7 Jan 2026 19:21:22 +0800 Subject: [PATCH 1/3] Use env exchange name/type overrides for scheduler --- .../webservice_modules/rabbitmq_service.py | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py b/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py index 46b2ad3d1..96dec731a 100644 --- a/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py +++ b/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py @@ -14,7 +14,6 @@ from memos.mem_scheduler.general_modules.base import BaseSchedulerModule from memos.mem_scheduler.general_modules.misc import AutoDroppingQueue from memos.mem_scheduler.schemas.general_schemas import DIRECT_EXCHANGE_TYPE, FANOUT_EXCHANGE_TYPE -from memos.mem_scheduler.utils.misc_utils import is_cloud_env logger = get_logger(__name__) @@ -132,6 +131,19 @@ def initialize_rabbitmq( self.rabbitmq_exchange_type = self.rabbitmq_config.exchange_type logger.info(f"Using configured exchange type: {self.rabbitmq_exchange_type}") + env_exchange_name = os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") + env_exchange_type = os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_TYPE") + if env_exchange_name: + self.rabbitmq_exchange_name = env_exchange_name + logger.info( + f"Using env exchange name override: {self.rabbitmq_exchange_name}" + ) + if env_exchange_type: + self.rabbitmq_exchange_type = env_exchange_type + logger.info( + f"Using env exchange type override: {self.rabbitmq_exchange_type}" + ) + # Start connection process parameters = self.get_rabbitmq_connection_param() self.rabbitmq_connection = SelectConnection( @@ -313,15 +325,12 @@ def rabbitmq_publish_message(self, message: dict): if label == "knowledgeBaseUpdate": routing_key = "" - # Cloud environment override: applies to specific message types if MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME is set + # Env override: apply to all message types when MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME is set env_exchange_name = os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") - if is_cloud_env() and env_exchange_name and label in ["taskStatus", "knowledgeBaseUpdate"]: + if env_exchange_name: exchange_name = env_exchange_name - routing_key = "" # Routing key is always empty in cloud environment for these types - - # Specific diagnostic logging for messages affected by cloud environment settings logger.info( - f"[DIAGNOSTIC] Publishing {label} message in Cloud Env. " + f"[DIAGNOSTIC] Publishing {label} message with env exchange override. " f"Exchange: {exchange_name}, Routing Key: '{routing_key}'." ) logger.info(f" - Message Content: {json.dumps(message, indent=2, ensure_ascii=False)}") From 91fcc43feac317025211d0c339f74fad4ba3cd8f Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Wed, 7 Jan 2026 19:24:13 +0800 Subject: [PATCH 2/3] Default routing key to empty with env exchange override --- .../mem_scheduler/webservice_modules/rabbitmq_service.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py b/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py index 96dec731a..b648a21f2 100644 --- a/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py +++ b/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py @@ -327,8 +327,12 @@ def rabbitmq_publish_message(self, message: dict): # Env override: apply to all message types when MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME is set env_exchange_name = os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") + env_routing_key = os.getenv("MEMSCHEDULER_RABBITMQ_ROUTING_KEY") if env_exchange_name: exchange_name = env_exchange_name + routing_key = ( + env_routing_key if env_routing_key is not None and env_routing_key != "" else "" + ) logger.info( f"[DIAGNOSTIC] Publishing {label} message with env exchange override. " f"Exchange: {exchange_name}, Routing Key: '{routing_key}'." From 3aeb0b3abe2dbb23a535fd292fef22f696e57ed2 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Wed, 7 Jan 2026 19:38:24 +0800 Subject: [PATCH 3/3] Format rabbitmq_service after env override changes --- .../mem_scheduler/webservice_modules/rabbitmq_service.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py b/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py index b648a21f2..5a94d2af2 100644 --- a/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py +++ b/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py @@ -135,14 +135,10 @@ def initialize_rabbitmq( env_exchange_type = os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_TYPE") if env_exchange_name: self.rabbitmq_exchange_name = env_exchange_name - logger.info( - f"Using env exchange name override: {self.rabbitmq_exchange_name}" - ) + logger.info(f"Using env exchange name override: {self.rabbitmq_exchange_name}") if env_exchange_type: self.rabbitmq_exchange_type = env_exchange_type - logger.info( - f"Using env exchange type override: {self.rabbitmq_exchange_type}" - ) + logger.info(f"Using env exchange type override: {self.rabbitmq_exchange_type}") # Start connection process parameters = self.get_rabbitmq_connection_param()