Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 16 additions & 13 deletions src/memos/api/handlers/component_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,19 +133,22 @@ def init_server() -> dict[str, Any]:
logger.info("Initializing MemOS server components...")

# Initialize Redis client first as it is a core dependency for features like scheduler status tracking
try:
from memos.mem_scheduler.orm_modules.api_redis_model import APIRedisDBManager

redis_client = APIRedisDBManager.load_redis_engine_from_env()
if redis_client:
logger.info("Redis client initialized successfully.")
else:
logger.error(
"Failed to initialize Redis client. Check REDIS_HOST etc. in environment variables."
)
except Exception as e:
logger.error(f"Failed to initialize Redis client: {e}", exc_info=True)
redis_client = None # Ensure redis_client exists even on failure
if os.getenv("MEMSCHEDULER_USE_REDIS_QUEUE", "False").lower() == "true":
try:
from memos.mem_scheduler.orm_modules.api_redis_model import APIRedisDBManager

redis_client = APIRedisDBManager.load_redis_engine_from_env()
if redis_client:
logger.info("Redis client initialized successfully.")
else:
logger.error(
"Failed to initialize Redis client. Check REDIS_HOST etc. in environment variables."
)
except Exception as e:
logger.error(f"Failed to initialize Redis client: {e}", exc_info=True)
redis_client = None # Ensure redis_client exists even on failure
else:
redis_client = None

# Get default cube configuration
default_cube_config = APIConfig.get_default_cube_config()
Expand Down
4 changes: 2 additions & 2 deletions src/memos/api/handlers/formatters_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def separate_knowledge_and_conversation_mem(memories: list[dict[str, Any]]):
knowledge_mem = []
conversation_mem = []
for item in memories:
sources = item["metadata"]["sources"]
sources = item.get("metadata", {}).get("sources", [])
if (
len(sources) > 0
and "type" in sources[0]
Expand Down Expand Up @@ -199,7 +199,7 @@ def rerank_knowledge_mem(
item["metadata"]["sources"] = []

for item in conversation_mem:
item["metadata"]["sources"] = []
item.setdefault("metadata", {})["sources"] = []

# deduplicate: remove items with duplicate memory content
original_count = len(reranked_knowledge_mem)
Expand Down
8 changes: 8 additions & 0 deletions src/memos/mem_scheduler/base_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -964,6 +964,14 @@ def _message_consumer(self) -> None:
# Original local queue logic
while self._running: # Use a running flag for graceful shutdown
try:
# Check dispatcher thread pool status to avoid overloading
if self.enable_parallel_dispatch and self.dispatcher:
running_tasks = self.dispatcher.get_running_task_count()
if running_tasks >= self.dispatcher.max_workers:
# Thread pool is full, wait and retry
time.sleep(self._consume_interval)
continue

# Get messages in batches based on consume_batch setting

messages = self.memos_message_queue.get_messages(batch_size=self.consume_batch)
Expand Down
32 changes: 27 additions & 5 deletions src/memos/mem_scheduler/optimized_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,14 @@ def __init__(self, config: GeneralSchedulerConfig):
self.session_counter = OrderedDict()
self.max_session_history = 5

self.api_module = SchedulerAPIModule(
window_size=self.window_size,
history_memory_turns=self.history_memory_turns,
)
if self.config.use_redis_queue:
self.api_module = SchedulerAPIModule(
window_size=self.window_size,
history_memory_turns=self.history_memory_turns,
)
else:
self.api_module = None

self.register_handlers(
{
API_MIX_SEARCH_TASK_LABEL: self._api_mix_search_message_consumer,
Expand Down Expand Up @@ -104,7 +108,8 @@ def search_memories(
target_session_id = search_req.session_id
if not target_session_id:
target_session_id = "default_session"
search_filter = {"session_id": search_req.session_id} if search_req.session_id else None
search_priority = {"session_id": search_req.session_id} if search_req.session_id else None
search_filter = search_req.filter

# Create MemCube and perform search
search_results = mem_cube.text_mem.search(
Expand All @@ -114,6 +119,7 @@ def search_memories(
mode=mode,
manual_close_internet=not search_req.internet_search,
search_filter=search_filter,
search_priority=search_priority,
info={
"user_id": search_req.user_id,
"session_id": target_session_id,
Expand All @@ -134,6 +140,22 @@ def mix_search_memories(
f"Mix searching memories for user {search_req.user_id} with query: {search_req.query}"
)

if not self.config.use_redis_queue:
logger.warning(
"Redis queue is not enabled. Running in degraded mode: "
"FAST search only, no history memory reranking, no async updates."
)
memories = self.search_memories(
search_req=search_req,
user_context=user_context,
mem_cube=self.mem_cube,
mode=SearchMode.FAST,
)
return [
format_textual_memory_item(item, include_embedding=search_req.dedup == "sim")
for item in memories
]

# Get mem_cube for fast search
target_session_id = search_req.session_id
if not target_session_id:
Expand Down
5 changes: 3 additions & 2 deletions src/memos/mem_scheduler/orm_modules/api_redis_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,8 @@ def load_redis_engine_from_env(env_file_path: str | None = None) -> Any:
logger.info(f"Loaded environment variables from {env_file_path}")
else:
logger.warning(
f"Environment file not found: {env_file_path}, using current environment variables"
f"Environment file not found: {env_file_path}, using current environment variables",
stack_info=True,
)
else:
logger.info("Using current environment variables (no env_file_path provided)")
Expand Down Expand Up @@ -513,5 +514,5 @@ def load_redis_engine_from_env(env_file_path: str | None = None) -> Any:

except Exception as e:
error_msg = f"Failed to create Redis connection from environment variables: {e}"
logger.error(error_msg)
logger.error(error_msg, stack_info=True)
raise DatabaseError(error_msg) from e