Skip to content

refactor: (scheduler) modularize handlers and search pipelines#1004

Merged
tangg555 merged 15 commits intoMemTensor:dev-20260202-v2.0.5from
fancyboi999:refactor/scheduler-architecture
Feb 6, 2026
Merged

refactor: (scheduler) modularize handlers and search pipelines#1004
tangg555 merged 15 commits intoMemTensor:dev-20260202-v2.0.5from
fancyboi999:refactor/scheduler-architecture

Conversation

@fancyboi999
Copy link
Contributor

@fancyboi999 fancyboi999 commented Feb 3, 2026

Description

Summary:

  • Extract scheduler handlers into dedicated modules and register them via a registry.
  • Split retriever into search/enhance/rerank/filter pipelines for clearer responsibilities.
  • Centralize text search logic so API and scheduler share the same implementation.

Problem:

  • Scheduler logic was monolithic and tightly coupled, hard to test/extend.
  • Search logic duplicated between scheduler and API.

Approach:

  • Introduce mem_scheduler/handlers with DI-style context.
  • Introduce retriever pipelines and keep SchedulerRetriever as a facade.
  • Add memos/search/search_service.py for shared text search.

Dependencies:

  • None.

Related Issue (Required): Fixes #1003

Type of change

Please delete options that are not relevant.

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Refactor (does not change functionality, e.g. code style improvements, linting)
  • Documentation update

How Has This Been Tested?

  • Unit Test
  • Test Script Or Test Steps (please provide)
  • Pipeline Automated API Test (please provide)

Test steps:

  1. Start server: cd src && python -m uvicorn memos.api.server_api:app --host 0.0.0.0 --port 8001
  2. Test endpoints:
    • GET /product/scheduler/allstatus
    • POST /product/add
    • POST /product/search (fast / fine / mixture)
    • POST /product/scheduler/wait
    • GET /product/scheduler/task_queue_status

Test Results:

Endpoint Status
GET /product/scheduler/allstatus ✅ 200 OK
POST /product/add ✅ 200 OK, data persisted to Neo4j
GET /product/scheduler/task_queue_status ✅ 200 OK
POST /product/search (fast) ✅ 200 OK, returns correct memories
POST /product/search (fine) ✅ 200 OK
POST /product/search (mixture) ✅ 200 OK
POST /product/scheduler/wait ✅ Works as expected
Unit tests (pytest) ✅ 61 passed
Ruff lint checks ✅ All passed

Checklist

  • I have performed a self-review of my own code | 我已自行检查了自己的代码
  • I have commented my code in hard-to-understand areas | 我已在难以理解的地方对代码进行了注释
  • I have added tests that prove my fix is effective or that my feature works | 我已添加测试以证明我的修复有效或功能正常
  • I have created related documentation issue/PR in MemOS-Docs (if applicable) | 我已在 MemOS-Docs 中创建了相关的文档 issue/PR(如果适用)
  • I have linked the issue to this PR (if applicable) | 我已将 issue 链接到此 PR(如果适用)
  • I have mentioned the person who will review this PR | 我已提及将审查此 PR 的人

Reviewer Checklist

- extract scheduler handlers into dedicated modules

- split retriever into pipelines (search/enhance/rerank/filter)

- centralize text search logic for API and scheduler

Refs MemTensor#1003
Copilot AI review requested due to automatic review settings February 3, 2026 11:35
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR refactors the scheduler and retrieval stack to make handler logic modular, split retrieval into clearer pipelines, and centralize text-search behavior so that the API and scheduler share a single implementation.

Changes:

  • Introduces a shared search_service module and updates API/scheduler callers to use centralized text-memory search.
  • Refactors SchedulerRetriever into composable search/enhancement/rerank/filter pipeline classes and extracts corresponding mixins from BaseScheduler.
  • Modularizes scheduler handlers into dedicated classes wired via a SchedulerHandlerRegistry and DI-style SchedulerHandlerContext.

Reviewed changes

Copilot reviewed 27 out of 27 changed files in this pull request and generated 8 comments.

Show a summary per file
File Description
src/memos/search/search_service.py Adds SearchContext and search_text_memories to centralize text-memory search arguments and behavior.
src/memos/search/__init__.py Exposes the new search service types/functions as public API.
src/memos/multi_mem_cube/single_cube.py Replaces inline text search logic with calls to search_text_memories for fast search.
src/memos/mem_scheduler/optimized_scheduler.py Switches scheduler search paths to use centralized search functions and shared SearchContext.
src/memos/mem_scheduler/memory_manage_modules/search_pipeline.py Extracts the search step of the scheduler retriever into a standalone SearchPipeline.
src/memos/mem_scheduler/memory_manage_modules/retriever.py Refactors SchedulerRetriever into a façade composed of search/enhancement/rerank/filter pipelines.
src/memos/mem_scheduler/memory_manage_modules/rerank_pipeline.py Encapsulates LLM-based memory reranking and combined memory post-processing in RerankPipeline.
src/memos/mem_scheduler/memory_manage_modules/filter_pipeline.py Wraps MemoryFilter operations in a dedicated FilterPipeline.
src/memos/mem_scheduler/memory_manage_modules/enhancement_pipeline.py Moves enhancement and recall logic into EnhancementPipeline, including batching and parallelization.
src/memos/mem_scheduler/handlers/registry.py Adds SchedulerHandlerRegistry that instantiates handler classes and builds the label-to-handler dispatch map.
src/memos/mem_scheduler/handlers/query_handler.py Implements QueryMessageHandler to log user queries and re-label them to memory-update tasks.
src/memos/mem_scheduler/handlers/pref_add_handler.py Implements PrefAddMessageHandler to process preference-add messages concurrently and write to PreferenceTextMemory.
src/memos/mem_scheduler/handlers/memory_update_handler.py Extracts long-memory update flow and retrieval triggering logic into MemoryUpdateHandler.
src/memos/mem_scheduler/handlers/mem_reorganize_handler.py Adds MemReorganizeMessageHandler to process memory reorganization and emit merge logs.
src/memos/mem_scheduler/handlers/mem_read_handler.py Adds MemReadMessageHandler to drive mem-reader processing and downstream knowledge-base logs.
src/memos/mem_scheduler/handlers/feedback_handler.py Implements FeedbackMessageHandler to process feedback payloads and emit KB update logs in cloud environments.
src/memos/mem_scheduler/handlers/context.py Defines SchedulerHandlerContext and SchedulerHandlerServices to inject scheduler dependencies into handlers.
src/memos/mem_scheduler/handlers/base.py Introduces BaseSchedulerHandler to store the injected handler context.
src/memos/mem_scheduler/handlers/answer_handler.py Implements AnswerMessageHandler to log assistant responses as messages.
src/memos/mem_scheduler/handlers/add_handler.py Extracts add-memory logging (local vs cloud KB) into AddMessageHandler.
src/memos/mem_scheduler/handlers/__init__.py Re-exports handler context and registry for use by the scheduler.
src/memos/mem_scheduler/general_scheduler.py Simplifies GeneralScheduler to assemble handler context/registry and register handlers instead of inlined consumers; delegates memory ops to mixins.
src/memos/mem_scheduler/base_scheduler.py Refactors core scheduler behavioral methods into separate mixins and updates BaseScheduler inheritance accordingly.
src/memos/mem_scheduler/base_mixins/web_log_ops.py Moves web-log publishing and normalization logic into BaseSchedulerWebLogMixin.
src/memos/mem_scheduler/base_mixins/queue_ops.py Moves message submission, dispatch, consumer loop, and queue monitoring into BaseSchedulerQueueMixin.
src/memos/mem_scheduler/base_mixins/memory_ops.py Moves working/activation memory management and monitoring helpers into BaseSchedulerMemoryMixin.
src/memos/mem_scheduler/base_mixins/__init__.py Re-exports the new scheduler base mixins.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

- Fix TC001/TC002/TC003: move type-only imports into TYPE_CHECKING blocks
- Fix RUF059: prefix unused variables with underscore
- Fix typos: "Memorires" -> "Memories", "exeption" -> "exception"
- Remove self-assignment: `text_mem_base = text_mem_base`
- Remove unused `user_context` param from `build_search_context`
- Restore original `QUERY_TASK_LABEL` in activation memory update
- Apply ruff format to all modified files
@CaralHsi CaralHsi changed the base branch from main to dev-20260209-v2.0.6 February 4, 2026 02:53
- json-encode list/dict fields for Redis XADD

- decode chat_history safely when reading from streams
- avoid private _memory_update_consumer call

- delegate mem update handler to built-in handler
@fancyboi999
Copy link
Contributor Author

fancyboi999 commented Feb 4, 2026

更新说明(本次追加修复):

  1. 修复 Redis Stream xadd 不接受 list 的问题
  • 在 ScheduleMessageItem.to_dict() 中对 list/dict 做 JSON 序列化,避免 redis.exceptions.DataError
  • from_dict 里对 chat_history 做安全反序列化
  • 复测:examples/mem_scheduler/api_w_scheduler.py 可正常跑通(不再报 list 错误)
  1. 修复 examples 在重构后的调用方式
  • try_schedule_modules.py 改为调用公开 handler(MEM_UPDATE_TASK_LABEL)替代私有 _memory_update_consumer
  • memos_w_scheduler.py 代理到默认内置 handler,避免直接访问 mem_scheduler.retriever

fancyboi999 and others added 8 commits February 4, 2026 16:18
- apply X | Y form to satisfy UP038
- align datetime line breaks with ruff format
…r-stage2

This commit merges key modularization benefits and bug fixes from the
refactor-scheduler-stage2 branch into fancy-scheduler:
Modularize activation memory logic into ActivationMemoryManager
Introduce SchedulerSearchService for unified memory search coordination
Extract filtering and reranking logic into MemoryPostProcessor
Maintain intentional search scope of LongTermMemory and UserMemory in SearchPipeline and SchedulerSearchService
Update BaseScheduler to initialize and manage lifecycle of new modules
Refactor BaseSchedulerMemoryMixin to delegate tasks to specialized managers
Rename abbreviation 'ctx' to 'scheduler_context' in GeneralScheduler and SchedulerHandlerRegistry to improve code readability and clarity.
- Sync orchestrator config removal when unregistering handlers in dispatcher
- Fix missing TaskPriorityLevel import in dispatcher
- Fix register_handlers signature in BaseSchedulerQueueMixin
- Fix handler registry imports and initialization map
- Fix relative imports in handlers package
- prefix unused unpacked vars with underscore

- apply ruff format changes
@CaralHsi CaralHsi changed the title refactor(scheduler): modularize handlers and search pipelines refactor: (scheduler) modularize handlers and search pipelines Feb 6, 2026
@CaralHsi CaralHsi changed the base branch from dev-20260209-v2.0.6 to dev-20260202-v2.0.5 February 6, 2026 06:22
@CaralHsi CaralHsi changed the base branch from dev-20260202-v2.0.5 to dev-20260209-v2.0.6 February 6, 2026 06:26
@CaralHsi CaralHsi changed the base branch from dev-20260209-v2.0.6 to dev-20260202-v2.0.5 February 6, 2026 06:29
glin1993@outlook.com and others added 3 commits February 6, 2026 14:40
- Update MemReadMessageHandler to extract user_context from message and pass it to _process_memories_with_reader and transfer_mem.
- Update PrefAddMessageHandler to extract user_context from message and pass it to pref_mem.add.
- This ensures user context information is available during memory reading and preference adding operations.
@tangg555 tangg555 merged commit 5e17c52 into MemTensor:dev-20260202-v2.0.5 Feb 6, 2026
16 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

fix: Scheduler architecture is monolithic and tightly coupled (hard to test/extend)

2 participants