diff --git a/content/en/open_source/modules/mem_chat.md b/content/en/open_source/modules/mem_chat.md
new file mode 100644
index 00000000..25ba6012
--- /dev/null
+++ b/content/en/open_source/modules/mem_chat.md
@@ -0,0 +1,180 @@
+---
+title: MemChat
+desc: MemChat is your "memory diplomat". It coordinates user input, memory retrieval, and LLM generation to create coherent conversations with long-term memory.
+---
+
+## 1. Introduction
+
+**MemChat** is the conversation control center of MemOS.
+
+It is not just a chat interface, but a bridge connecting "instant conversation" and "long-term memory". During interactions with users, MemChat retrieves relevant background information from the MemCube (Memory Cube) in real time, builds context, and crystallizes new conversation content into new memories. With it, your Agent no longer has a "goldfish memory"; it becomes a truly intelligent companion that understands the past and keeps growing.
+
+---
+
+## 2. Core Capabilities
+
+### Memory-Augmented Chat
+Before answering a user question, MemChat automatically retrieves relevant Textual Memory from MemCube and injects it into the Prompt. This enables the Agent to answer based on past interaction history or knowledge bases, rather than relying solely on the LLM's pre-trained knowledge.
+
+### Auto-Memorization
+After each conversation round, MemChat uses the Extractor LLM to automatically extract valuable information from the conversation flow (such as user preferences and factual knowledge) and store it in MemCube. The entire process is fully automated and requires no manual user intervention.
+
+### Context Management
+Automatically manages the conversation history window (`max_turns_window`). When a conversation grows too long, MemChat trims old context while relying on retrieved long-term memory to keep the conversation coherent, effectively working around the LLM context-window limit.
+
+### Flexible Configuration
+Supports configurable toggles for different types of memory (textual memory, activation memory, etc.) to adapt to different application scenarios.
+
+---
+
+## 3. Code Structure
+
+Core logic is located under `memos/src/memos/mem_chat/`.
+
+* **`simple.py`**: **Default implementation (SimpleMemChat)**. An out-of-the-box REPL (Read-Eval-Print Loop) implementation containing the complete "retrieve -> generate -> store" loop logic.
+* **`base.py`**: **Interface definition (BaseMemChat)**. Defines the basic behavior of MemChat, such as the `run()` method and the `mem_cube` property.
+* **`factory.py`**: **Factory class**. Instantiates concrete MemChat objects based on the configuration (`MemChatConfig`).
+
+---
+
+## 4. Key Interface
+
+The main interaction entry point is the `MemChat` class (typically created by `MemChatFactory`).
+
+### 4.1 Initialization
+First create a configuration object, then create an instance through the factory method. After creation, you must mount a `MemCube` instance on `mem_chat.mem_cube`.
+
+### 4.2 `run()`
+Starts an interactive command-line conversation loop. Suitable for development and debugging, it handles user input, calls memory retrieval, generates replies, and prints output.
+
+### 4.3 Properties
+* **`mem_cube`**: The associated MemCube object. MemChat reads and writes memories through it.
+* **`chat_llm`**: The LLM instance used to generate replies.
+
+---
+
+## 5. Workflow
+
+A typical conversation round in MemChat includes the following steps:
+
+1. **Receive Input**: Get user text input.
+2. 
**Memory Recall**: (If `enable_textual_memory` is enabled) Use user input as Query to retrieve Top-K relevant memories from `mem_cube.text_mem`. +3. **Prompt Construction**: Concatenate system prompt, retrieved memories, and recent conversation history into a complete Prompt. +4. **Generate Response**: Call `chat_llm` to generate a reply. +5. **Memorization**: (If `enable_textual_memory` is enabled) Send this round's conversation (User + Assistant) to `mem_cube`'s extractor, extract new memories and store them in the database. + +--- + +## 6. Development Example + +Below is a complete code example showing how to configure MemChat and mount a MemCube based on Qdrant and OpenAI. + +### 6.1 Code Implementation + +```python +import os +import sys + +# Ensure src module can be imported +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../src"))) + +from memos.configs.mem_chat import MemChatConfigFactory +from memos.configs.mem_cube import GeneralMemCubeConfig +from memos.mem_chat.factory import MemChatFactory +from memos.mem_cube.general import GeneralMemCube + +def get_mem_chat_config() -> MemChatConfigFactory: + """Generate MemChat configuration""" + return MemChatConfigFactory.model_validate( + { + "backend": "simple", + "config": { + "user_id": "user_123", + "chat_llm": { + "backend": "openai", + "config": { + "model_name_or_path": os.getenv("MOS_CHAT_MODEL", "gpt-4o"), + "temperature": 0.8, + "max_tokens": 1024, + "api_key": os.getenv("OPENAI_API_KEY"), + "api_base": os.getenv("OPENAI_API_BASE"), + }, + }, + "max_turns_window": 20, + "top_k": 5, + "enable_textual_memory": True, # Enable explicit memory + }, + } + ) + +def get_mem_cube_config() -> GeneralMemCubeConfig: + """Generate MemCube configuration""" + return GeneralMemCubeConfig.model_validate( + { + "user_id": "user03alice", + "cube_id": "user03alice/mem_cube_tree", + "text_mem": { + "backend": "general_text", + "config": { + "cube_id": "user03alice/mem_cube_general", + "extractor_llm": { + "backend": "openai", + "config": { + "model_name_or_path": os.getenv("MOS_CHAT_MODEL", "gpt-4o"), + "api_key": os.getenv("OPENAI_API_KEY"), + "api_base": os.getenv("OPENAI_API_BASE"), + }, + }, + "vector_db": { + "backend": "qdrant", + "config": { + "collection_name": "user03alice_mem_cube_general", + "vector_dimension": 1024, + }, + }, + "embedder": { + "backend": os.getenv("MOS_EMBEDDER_BACKEND", "universal_api"), + "config": { + "provider": "openai", + "api_key": os.getenv("MOS_EMBEDDER_API_KEY", "EMPTY"), + "model_name_or_path": os.getenv("MOS_EMBEDDER_MODEL", "bge-m3"), + "base_url": os.getenv("MOS_EMBEDDER_API_BASE"), + }, + }, + }, + }, + } + ) + +def main(): + print("Initializing MemChat...") + mem_chat = MemChatFactory.from_config(get_mem_chat_config()) + + print("Initializing MemCube...") + mem_cube = GeneralMemCube(get_mem_cube_config()) + + # Critical step: mount the memory cube + mem_chat.mem_cube = mem_cube + + print("Starting Chat Session...") + try: + mem_chat.run() + finally: + print("Saving memory cube...") + mem_chat.mem_cube.dump("new_cube_path") + +if __name__ == "__main__": + main() +``` + +--- + +## 7. Configuration Description + +When configuring `MemChatConfigFactory`, the following parameters are crucial: + +* **`user_id`**: Required. Used to identify the current user in the conversation, ensuring memory isolation. +* **`chat_llm`**: Chat model configuration. Recommend using a capable model (such as GPT-4o) for better reply quality and instruction-following ability. 
+* **`enable_textual_memory`**: `True` / `False`. Whether to enable textual memory. If enabled, the system will perform retrieval before conversation and storage after conversation. +* **`max_turns_window`**: Integer. Number of conversation turns to retain in history. History beyond this limit will be truncated, relying on long-term memory to supplement context. +* **`top_k`**: Integer. How many most relevant memory fragments to retrieve from the memory library and inject into the Prompt each time. \ No newline at end of file diff --git a/content/en/open_source/modules/mem_cube.md b/content/en/open_source/modules/mem_cube.md index 12544356..beb27e4e 100644 --- a/content/en/open_source/modules/mem_cube.md +++ b/content/en/open_source/modules/mem_cube.md @@ -1,23 +1,26 @@ --- -title: MemCube Overview -desc: "`MemCube` is the core organizational unit in MemOS, designed to encapsulate and manage all types of memory for a user or agent. It provides a unified interface for loading, saving, and operating on multiple memory modules, making it easy to build, share, and deploy memory-augmented applications." +title: MemCube +desc: "`MemCube` is your memory container that manages three types of memories: textual memory, activation memory, and parametric memory. It provides a simple interface for loading, saving, and operating on multiple memory modules, making it easy to build, save, and share memory-augmented applications." --- -## What is a MemCube? -A **MemCube** is a container that bundles three major types of memory: +## What is MemCube? -- **Textual Memory** (e.g., `GeneralTextMemory`, `TreeTextMemory`): For storing and retrieving unstructured or structured text knowledge. -- **Activation Memory** (e.g., `KVCacheMemory`): For storing key-value caches to accelerate LLM inference and context reuse. -- **Parametric Memory** (e.g., `LoRAMemory`): For storing model adaptation parameters (like LoRA weights). +**MemCube** contains three major types of memory: -Each memory type is independently configurable and can be swapped or extended as needed. +- **Textual Memory**: Stores text knowledge, supporting semantic search and knowledge management. +- **Activation Memory**: Stores intermediate reasoning results, accelerating LLM responses. +- **Parametric Memory**: Stores model adaptation weights, used for personalization. + +Each memory type can be independently configured and flexibly combined based on application needs. ## Structure -A MemCube is defined by a configuration (see `GeneralMemCubeConfig`), which specifies the backend and settings for each memory type. The typical structure is: +MemCube is defined by a configuration (see `GeneralMemCubeConfig`), which specifies the backend and settings for each memory type. The typical structure is: ``` MemCube + ├── user_id + ├── cube_id ├── text_mem: TextualMemory ├── act_mem: ActivationMemory └── para_mem: ParametricMemory @@ -35,7 +38,7 @@ Starting from MemOS 2.0, runtime operations (add/search) should go through the * ### SingleCubeView -Operates on a single MemCube. Use when you have one logical memory space. +Use this to manage a single MemCube. When you only need one memory space. ```python from memos.multi_mem_cube.single_cube import SingleCubeView @@ -59,7 +62,7 @@ view.search_memories(search_request) ### CompositeCubeView -Operates on multiple MemCubes. Fan-out operations to multiple SingleCubeViews and aggregate results. +Use this to manage multiple MemCubes. When you need unified operations across multiple memory spaces. 
```python from memos.multi_mem_cube.composite_cube import CompositeCubeView @@ -76,15 +79,19 @@ results = composite.search_memories(search_request) # Results contain cube_id field to identify source ``` -### API Request Fields +## API Request Fields + +When using the View architecture for add/search operations, specify these parameters: + +| Field | Type | Description | +| :--- | :--- | :--- | +| `writable_cube_ids` | `list[str]` | Target cubes for add operations. Can specify multiple; the system will write to all targets in parallel. | +| `readable_cube_ids` | `list[str]` | Target cubes for search operations. Can search across multiple cubes; results include source information. | +| `async_mode` | `str` | Execution mode: `"sync"` for synchronous processing (wait for results), `"async"` for asynchronous processing (push to background queue, return task ID immediately). | -| Field | Description | -| --------------------- | ------------------------------------------------------------------ | -| `writable_cube_ids` | Target cubes for add operations | -| `readable_cube_ids` | Target cubes for search operations | -| `async_mode` | `"async"` (scheduler enabled) or `"sync"` (scheduler disabled) | +## Core Methods (`GeneralMemCube`) -## API Summary (`GeneralMemCube`) +**GeneralMemCube** is the standard implementation of MemCube, managing all system memories through a unified interface. Here are the main methods to complete memory lifecycle management. ### Initialization @@ -95,23 +102,23 @@ mem_cube = GeneralMemCube(config) ### Static Data Operations -| Method | Description | -| ----------------------------------------- | --------------------------------------------------------- | -| `init_from_dir(dir)` | Load a MemCube from a local directory | -| `init_from_remote_repo(repo, base_url)` | Load a MemCube from remote repo (e.g., Hugging Face) | -| `load(dir)` | Load all memories from a directory into existing instance | -| `dump(dir)` | Save all memories to a directory for persistence | +| Method | Description | +| :--- | :--- | +| `init_from_dir(dir)` | Load a MemCube from a local directory | +| `init_from_remote_repo(repo, base_url)` | Load a MemCube from a remote repository (e.g., Hugging Face) | +| `load(dir)` | Load all memories from a directory into the existing instance | +| `dump(dir)` | Save all memories to a directory for persistence | -## File Storage +## File Structure -A MemCube directory contains: +A MemCube directory contains the following files, with each file corresponding to a memory type: - `config.json` (MemCube configuration) - `textual_memory.json` (textual memory) - `activation_memory.pickle` (activation memory) - `parametric_memory.adapter` (parametric memory) -## Example Usage +## Usage Examples ### Export Example (dump_cube.py) @@ -156,15 +163,15 @@ result = view.add_memories(APIADDRequest( )) print(f"✓ Added {len(result)} memories") -# 4. Export specific cube_id data +# 4. 
Export data for the specific cube_id output_dir = "tmp/mem_cube_dump" if os.path.exists(output_dir): shutil.rmtree(output_dir) os.makedirs(output_dir, exist_ok=True) -# Export graph data (only data for current cube_id) +# Export graph data (only data for the current cube_id) json_data = naive.text_mem.graph_store.export_graph( - include_embedding=True, # Include embeddings for semantic search + include_embedding=True, # Include embeddings to support semantic search user_name=EXAMPLE_CUBE_ID, # Filter by cube_id ) @@ -187,7 +194,7 @@ print(f"✓ Saved to: {memory_file}") ### Import and Search Example (load_cube.py) -> **Note on Embeddings**: The sample data uses **bge-m3** model with **1024 dimensions**. If your environment uses a different embedding model or dimension, semantic search after import may be inaccurate or fail. Ensure your `.env` configuration matches the embedding settings used during export. +> **Embedding Compatibility Note**: The sample data uses the **bge-m3** model with **1024 dimensions**. If your environment uses a different embedding model or dimension, semantic search after import may be inaccurate or fail. Ensure your `.env` configuration matches the embedding settings used during export. ```python import json @@ -273,6 +280,6 @@ The old approach of directly calling `mem_cube.text_mem.get_all()` is deprecated ## Developer Notes -* MemCube enforces schema consistency for safe loading/dumping -* Each memory type is pluggable and independently tested -* See `/tests/mem_cube/` for integration tests and usage patterns +* MemCube enforces schema consistency to ensure safe loading and dumping +* Each memory type can be independently configured, tested, and extended +* See `/tests/mem_cube/` for integration tests and usage examples \ No newline at end of file diff --git a/content/en/open_source/modules/mem_feedback.md b/content/en/open_source/modules/mem_feedback.md index dbed0106..ea7879a0 100644 --- a/content/en/open_source/modules/mem_feedback.md +++ b/content/en/open_source/modules/mem_feedback.md @@ -1,6 +1,6 @@ --- title: MemFeedback -desc: "MemFeedback enables your Agent to understand 'You remembered it wrong' and automatically correct the memory database. It is a key component for self-evolving memory." +desc: MemFeedback is your "memory error notebook". It enables your Agent to understand 'You remembered it wrong' and automatically correct the memory database. It is a key component for achieving self-evolving memory. --- ## 1. Introduction @@ -9,7 +9,7 @@ desc: "MemFeedback enables your Agent to understand 'You remembered it wrong' an In long-term memory systems, the biggest headache is often not "forgetting," but "remembering wrong and unable to change." When a user says, "No, my birthday is tomorrow" or "Change the project code to X," simple RAG systems are usually helpless. -MemFeedback can understand these natural language instructions, accurately locate conflicting memories in the database, and execute atomic correction operations (such as archiving old memories and writing new ones). With it, your Agent can correct errors and learn continuously during interactions, just like a human. +MemFeedback can understand these natural language instructions, automatically locate conflicting memories in the database, and execute atomic correction operations (such as archiving old memories and writing new ones). With it, your Agent can correct errors and learn continuously during interactions, just like a human. 
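+The snippet below sketches what such a correction looks like in code. It is a minimal illustration rather than the authoritative API: `feedback_server` is assumed to be an already-initialized MemFeedback instance, while the parameter names follow the `process_feedback()` table later on this page.
+
+```python
+# Minimal sketch, assuming `feedback_server` was initialized elsewhere
+# (e.g., via the server components); parameter names follow the table below.
+feedback_server.process_feedback(
+    user_id="user_123",
+    user_name="user_123_cube",  # Cube ID
+    chat_history=[
+        {"role": "user", "content": "When is the project review?"},
+        {"role": "assistant", "content": "It is on Friday, in Beijing."},
+    ],
+    feedback_content="No, the location is Shanghai, not Beijing.",
+    retrieved_memory_ids=["mem_9f3a", "mem_77c2"],  # IDs retrieved in the previous RAG round
+)
+```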
--- @@ -23,7 +23,7 @@ When the user points out a factual error. The system will not brutally delete th ### Addition If the user just supplements new information that does not conflict with old memories, it is simple—directly save it as a new node in the memory database. -### Keyword Replacement +### Keyword Replacement (Global Refactor) Similar to "Global Refactor" in an IDE. For example, if the user says, "Change 'Zhang San' to 'Li Si' in all documents," the system will combine the Reranker to automatically determine the scope of affected documents and update all relevant memories in batches. ### Preference Evolution @@ -50,7 +50,7 @@ There is only one main entry point: `process_feedback()`. It is usually called a | Parameter | Description | | :--- | :--- | -| `user_id` / `user_name` | User ID and Cube ID. | +| `user_id` / `user_name` | User identification and Cube ID. | | `chat_history` | Conversation history, letting LLM know what you talked about. | | `feedback_content` | The feedback sentence from the user (e.g., "No, it's 5 o'clock"). | | **`retrieved_memory_ids`** | **Required (Strongly Recommended)**. Pass in the memory IDs retrieved in the previous RAG round. This gives the system a "target," telling it which memory to correct. If not passed, the system has to search again in the massive memory, which is slow and prone to errors. | diff --git a/content/en/open_source/modules/mem_reader.md b/content/en/open_source/modules/mem_reader.md index 8d0d92bc..cd053a8a 100644 --- a/content/en/open_source/modules/mem_reader.md +++ b/content/en/open_source/modules/mem_reader.md @@ -1,11 +1,11 @@ --- title: "MemReader" -desc: "MemReader is MemOS's “memory translator”. It translates messy user inputs (chat, documents, images) into structured memory fragments the system can understand." +desc: MemReader is your "memory translator". It translates messy user inputs (chat, documents, images) into structured memory fragments the system can understand. --- ## 1. Overview -When building AI applications, we often run into this problem: users send all kinds of things—casual chat messages, PDF documents, and images. **MemReader** turns these raw inputs (Raw Data) into standard memory blocks (Memory Items) with embeddings and metadata. +When building AI applications, we often run into this problem: users send all kinds of things—casual chat messages, PDF documents, and images. **MemReader** turns these raw inputs (Raw Data) into standard memory blocks (Memory Items) with embeddings and metadata by "chewing" and "digesting" them. In short, it does three things: 1. **Normalization**: Whether you send a string or JSON, it first converts everything into a standard format. @@ -16,28 +16,28 @@ In short, it does three things: ## 2. Core Modes -MemReader provides two modes, corresponding to the needs for “speed” and “accuracy”: +MemReader provides two modes, corresponding to the needs for "speed" and "accuracy": ### ⚡ Fast Mode (speed first) * **Characteristics**: **Does not call an LLM**, only performs chunking and embeddings. * **Use cases**: * Users are sending messages quickly and the system needs millisecond-level responses. - * You only need to keep “snapshots” of the conversation, without deep understanding. -* **Output**: raw text chunks + vector index. + * You only need to keep "snapshots" of the conversation, without deep understanding. +* **Output**: raw text chunks + vector index + provenance tracking (Sources). 
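+Before moving on to Fine Mode, here is an illustrative sketch of consuming a Fast Mode result. The nested `list[list[TextualMemoryItem]]` return shape is documented later on this page; the `get_memory` arguments are elided because the full call appears in the usage examples below.
+
+```python
+# Illustrative only -- see the usage examples later on this page for the
+# real get_memory() call; this sketch just shows the nested return shape.
+memories = reader.get_memory(...)
+
+for window in memories:        # outer list: one entry per conversation window
+    for item in window:        # inner list: memory items extracted from that window
+        print(item.memory)     # in Fast Mode: the raw text chunk ("snapshot")
+```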
### 🧠 Fine Mode (carefully crafted) * **Characteristics**: **Calls an LLM** for deeper analysis. * **Use cases**: * Long-term memory writing (needs key facts extracted). * Document analysis (needs core ideas summarized). - * Multimodal understanding (needs to understand what’s in an image). -* **Output**: structured facts + summary (Background) + provenance tracking (Provenance). + * Multimodal understanding (needs to understand what's in an image). +* **Output**: structured facts + key information extraction (Key) + background (Background) + vector index + provenance tracking (Sources) + multimodal details. --- ## 3. Code Structure -MemReader’s code structure is straightforward and mainly includes: +MemReader's code structure is straightforward and mainly includes: * **`base.py`**: defines the interface contract that all Readers must follow. * **`simple_struct.py`**: **the most commonly used implementation**. Focuses on pure-text conversations and local documents; lightweight and efficient. @@ -52,7 +52,7 @@ MemReader’s code structure is straightforward and mainly includes: | :--- | :--- | :--- | | **Only process plain text chats** | `SimpleStructMemReader` | Simple, direct, and performant. | | **Need to handle images and file links** | `MultiModalStructMemReader` | Built-in multimodal parsing. | -| **Upgrade from Fast to Fine** | Any Reader’s `fine_transfer` method | Supports a progressive “store first, refine later” strategy. | +| **Upgrade from Fast to Fine** | Any Reader's `fine_transfer` method | Supports a progressive "store first, refine later" strategy. | --- @@ -60,7 +60,7 @@ MemReader’s code structure is straightforward and mainly includes: ### Unified Factory: `MemReaderFactory` -Don’t instantiate readers directly; using the factory pattern is best practice: +Don't instantiate readers directly; using the factory pattern is best practice: ```python from memos.configs.mem_reader import MemReaderConfigFactory @@ -84,8 +84,12 @@ memories = reader.get_memory( ) ``` -* **Return value**: `list[list[TextualMemoryItem]]`. - * Why a nested list? Because a long conversation may be split into multiple windows. The outer list represents windows, and the inner list represents memory items extracted from that window. +**Return value**: `list[list[TextualMemoryItem]]` + +:::note +Why a nested list? +Because a long conversation may be split into multiple windows (Window). The outer list represents windows, and the inner list represents memory items extracted from that window. +::: --- @@ -149,7 +153,7 @@ memories = multimodal_reader.get_memory( ### Scenario 3: Progressive optimization (Fine Transfer) -For better UX, you can first store the conversation quickly in Fast mode, then “refine” it into Fine memories when the system is idle. +For better UX, you can first store the conversation quickly in Fast mode, then "refine" it into Fine memories when the system is idle. ```python # 1. Store quickly first (millisecond-level) @@ -173,6 +177,5 @@ refined_memories = reader.fine_transfer_simple_mem( In `.env` or configuration files, you can adjust these key parameters: * **`chat_window_max_tokens`**: **sliding window size**. Default is 1024. It determines how much context is packed together for processing. Too small may lose context; too large may exceed the LLM token limit. -* **`remove_prompt_example`**: **whether to remove examples from the prompt**. Set to True if you want to save tokens; set to False if extraction quality is not good (keep few-shot examples). 
-* **`direct_markdown_hostnames`** (multimodal only): **hostname allowlist**. If a file URL’s hostname is in this list (e.g., `raw.githubusercontent.com`), the Reader treats it as Markdown text directly instead of trying OCR or conversion, which is more efficient. - +* **`remove_prompt_example`**: **whether to remove examples from the prompt**. True = save tokens but may reduce extraction quality; False = keep few-shot examples for better accuracy but consume more tokens. +* **`direct_markdown_hostnames`** (multimodal only): **hostname allowlist**. If a file URL's hostname is in this list (e.g., `raw.githubusercontent.com`), the Reader treats it as Markdown text directly instead of trying OCR or conversion, which is more efficient. \ No newline at end of file diff --git a/content/en/open_source/modules/mem_scheduler.md b/content/en/open_source/modules/mem_scheduler.md index 40ff31f0..e16f6f13 100644 --- a/content/en/open_source/modules/mem_scheduler.md +++ b/content/en/open_source/modules/mem_scheduler.md @@ -1,81 +1,495 @@ --- -title: "MemScheduler: The Scheduler for Memory Organization" -desc: "`MemScheduler` is a concurrent memory management system parallel running with the MemOS system, which coordinates memory operations between working memory, long-term memory, and activation memory in AI systems. It handles memory retrieval, updates, and compaction through event-driven scheduling.
This system is particularly suited for conversational agents and reasoning systems requiring dynamic memory management." +title: "MemScheduler" +desc: MemScheduler is your "memory organization scheduler". It asynchronously manages memory flow and updates in the background, coordinating interactions between working memory, long-term memory, and activation memory, enabling conversational systems to dynamically organize and utilize memories. --- + ## Key Features -- 🚀 **Concurrent operation** with MemOS system -- 🧠 **Multi-memory coordination** (Working/Long-Term/User memory) -- ⚡ **Event-driven scheduling** for memory operations -- 🔍 **Efficient retrieval** of relevant memory items -- 📊 **Comprehensive monitoring** of memory usage -- 📝 **Detailed logging** for debugging and analysis -- -## Memory Scheduler Architecture +- 🚀 **Concurrent operation with MemOS system**: Runs in independent threads/processes without blocking main business logic. +- 🧠 **Multi-memory coordination**: Intelligently manages the flow of working memory, long-term memory, and user-personalized memory. +- ⚡ **Event-driven scheduling**: Asynchronous task distribution based on message queues (Redis/Local). +- 🔍 **Efficient retrieval**: Integrated vector and graph retrieval for quick location of relevant memories. +- 📊 **Comprehensive monitoring**: Real-time monitoring of memory utilization, task queue status, and scheduling latency. +- 📝 **Detailed logging**: Full-chain tracing of memory operations for debugging and system analysis. + +## MemScheduler Architecture + +`MemScheduler` adopts a three-layer modular architecture: + +### Scheduling Layer (Core) +1. **Scheduler (Router)**: Intelligent message router that dispatches tasks to corresponding handlers based on message types (e.g., `QUERY`, `ANSWER`, `MEM_UPDATE`). +2. **Message Processing**: Event-driven business logic through messages with specific labels, defining message formats and processing rules. + +### Execution Layer (Guarantee) +3. **Task Queue**: Supports both Redis Stream (production) and Local Queue (development/testing) modes, providing asynchronous task buffering and persistence. +4. **Memory Management**: Executes read/write, compression, forgetting, and type conversion operations on three-layer memory (Working/Long-term/User). +5. **Retrieval System**: Hybrid retrieval module combining user intent, scenario management, and keyword matching for quick memory location. + +### Support Layer (Auxiliary) +6. **Monitoring**: Tracks task accumulation, processing latency, and memory health status. +7. **Logging**: Maintains full-chain memory operation logs for debugging and analysis. + +## MemScheduler Initialization + +In the MemOS architecture, `MemScheduler` is initialized as part of the server components during startup. + +### Initialization in Server Router + +In `src/memos/api/routers/server_router.py`, the scheduler is automatically loaded through the `init_server()` function: + +```python +from memos.api import handlers +from memos.api.handlers.base_handler import HandlerDependencies +from memos.mem_scheduler.base_scheduler import BaseScheduler +from memos.mem_scheduler.utils.status_tracker import TaskStatusTracker + +# ... other imports ... + +# 1. 
Initialize all server components (including DB, LLM, Memory, Scheduler) +# init_server() reads environment variables and initializes global singleton components +components = handlers.init_server() + +# Create dependency container for handlers +dependencies = HandlerDependencies.from_init_server(components) + +# Initialize handlers... +# search_handler = SearchHandler(dependencies) +# ... + +# 2. Get the scheduler instance from the components dictionary +# The scheduler is already initialized and started inside init_server (if enabled) +mem_scheduler: BaseScheduler = components["mem_scheduler"] + +# 3. Users can also get other scheduling-related components from components (optional, for custom task handling) +# redis_client is used for direct Redis operations or monitoring task status +redis_client = components["redis_client"] +# ... +``` + +## Scheduling Tasks and Data Models + +The scheduler distributes and executes tasks through a message-driven approach. This section introduces supported task types, message structures, and execution logs. + +### Message Types and Handlers + +The scheduler dispatches and executes tasks by registering specific task labels (Label) with handlers (Handler). The following are the default supported scheduling tasks in the current version (based on `GeneralScheduler` and `OptimizedScheduler`): + +| Message Label | Constant | Handler Method | Description | +| :--- | :--- | :--- | :--- | +| `query` | `QUERY_TASK_LABEL` | `_query_message_consumer` | Processes user queries, triggers intent recognition, memory retrieval, and converts them to memory update tasks. | +| `answer` | `ANSWER_TASK_LABEL` | `_answer_message_consumer` | Processes AI responses and logs conversations. | +| `mem_update` | `MEM_UPDATE_TASK_LABEL` | `_memory_update_consumer` | Core task. Executes the long-term memory update process, including extracting Query Keywords, updating Monitor, retrieving relevant memories, and replacing Working Memory. | +| `add` | `ADD_TASK_LABEL` | `_add_message_consumer` | Handles logging of new memory additions (supports local and cloud logs). | +| `mem_read` | `MEM_READ_TASK_LABEL` | `_mem_read_message_consumer` | Deep processing and importing external memory content using `MemReader`. | +| `mem_organize` | `MEM_ORGANIZE_TASK_LABEL` | `_mem_reorganize_message_consumer` | Triggers memory reorganization and merge operations. | +| `pref_add` | `PREF_ADD_TASK_LABEL` | `_pref_add_message_consumer` | Handles extraction and addition of user preference memory (Preference Memory). | +| `mem_feedback` | `MEM_FEEDBACK_TASK_LABEL` | `_mem_feedback_message_consumer` | Processes user feedback for correcting or reinforcing preferences. | +| `api_mix_search` | `API_MIX_SEARCH_TASK_LABEL` | `_api_mix_search_message_consumer` | (OptimizedScheduler only) Executes asynchronous hybrid search tasks combining fast and fine retrieval. | + +### Message Data Structure (ScheduleMessageItem) + +The scheduler uses a unified `ScheduleMessageItem` structure to pass messages in the queue. + +> **Note**: The `mem_cube` object itself is not directly included in the message model; instead, it is resolved by the scheduler at runtime through `mem_cube_id`. 
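+For orientation, constructing and submitting such a message looks like the sketch below (a condensed version of the full example under "Scheduling Function Examples"; the field-by-field reference follows in the table):
+
+```python
+from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
+
+# Only the required fields are set here; item_id, timestamp, etc. are auto-generated.
+msg = ScheduleMessageItem(
+    user_id="user_123",
+    mem_cube_id="cube_001",
+    label="query",
+    content="What is my dog's name?",
+)
+mem_scheduler.submit_messages([msg])  # mem_scheduler: the singleton obtained from init_server()
+```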
+
+| Field | Type | Description | Default/Remarks |
+| :--- | :--- | :--- | :--- |
+| `item_id` | `str` | Unique message identifier (UUID) | Auto-generated |
+| `user_id` | `str` | Associated user ID | (Required) |
+| `mem_cube_id` | `str` | Associated Memory Cube ID | (Required) |
+| `label` | `str` | Task label (e.g., `query`, `mem_update`) | (Required) |
+| `content` | `str` | Message payload (typically a JSON string or plain text) | (Required) |
+| `timestamp` | `datetime` | Message submission time | Auto-generated (UTC now) |
+| `session_id` | `str` | Session ID for context isolation | `""` |
+| `trace_id` | `str` | Trace ID for full-chain log association | Auto-generated |
+| `user_name` | `str` | User display name | `""` |
+| `task_id` | `str` | Business-level task ID (for associating multiple messages) | `None` |
+| `info` | `dict` | Additional custom context information | `None` |
+| `stream_key` | `str` | (Internal use) Redis Stream key name | `""` |
+
+### Execution Log Structure (ScheduleLogForWebItem)
+
+The scheduler generates structured log messages for frontend display or persistent storage.
+
+| Field | Type | Description | Remarks |
+| :--- | :--- | :--- | :--- |
+| `item_id` | `str` | Unique log entry identifier | Auto-generated |
+| `task_id` | `str` | Associated parent task ID | Optional |
+| `user_id` | `str` | User ID | (Required) |
+| `mem_cube_id` | `str` | Memory Cube ID | (Required) |
+| `label` | `str` | Log category (e.g., `addMessage`, `addMemory`) | (Required) |
+| `log_content` | `str` | Brief log description text | (Required) |
+| `from_memory_type` | `str` | Source memory area | e.g., `UserInput`, `LongTermMemory` |
+| `to_memory_type` | `str` | Destination memory area | e.g., `WorkingMemory` |
+| `memcube_log_content` | `list[dict]` | Structured detailed content | Contains the specific memory text, reference IDs, etc. |
+| `metadata` | `list[dict]` | Memory item metadata | Contains confidence, status, tags, etc. |
+| `status` | `str` | Task status | e.g., `completed`, `failed` |
+| `timestamp` | `datetime` | Log creation time | Auto-generated |
+| `current_memory_sizes` | `MemorySizes` | Snapshot of the current memory count in each area | For monitoring dashboard display |
+| `memory_capacities` | `MemoryCapacities` | Memory capacity limits for each area | For monitoring dashboard display |
+
+## Scheduling Function Examples
+
+### 1. Message Processing and Custom Handlers
+
+The scheduler's most powerful feature is support for registering custom message handlers. You can define a specific message type (e.g., `MY_CUSTOM_TASK`) and write a function to handle it.
+
+```python
+import time
+import uuid
+from datetime import datetime
+
+# 1. Import necessary type definitions and the scheduler instance
+# Note: mem_scheduler must be imported from server_router, as it is a global singleton
+from memos.api.routers.server_router import mem_scheduler
+from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
+
+# Define a custom task label
+MY_TASK_LABEL = "MY_CUSTOM_TASK"
+
+
+# Define a handler function
+def my_task_handler(messages: list[ScheduleMessageItem]):
+    """
+    Function to handle custom tasks
+    """
+    for msg in messages:
+        print(f"⚡️ [Handler] Received task: {msg.item_id}")
+        print(f"📦 Content: {msg.content}")
+        # Execute your business logic here, e.g., call an LLM, write to a database, trigger other tasks, etc.
+
+
+# 2. 
Register the handler to the scheduler +# This step mounts your custom logic to the scheduling system +mem_scheduler.register_handlers({ + MY_TASK_LABEL: my_task_handler +}) + +# 3. Submit a task +task = ScheduleMessageItem( + item_id=str(uuid.uuid4()), + user_id="user_123", + mem_cube_id="cube_001", + label=MY_TASK_LABEL, + content="This is a test message", + timestamp=datetime.now() +) + +# If the scheduler is not started, the task will be queued for processing +# or in local queue mode may require calling mem_scheduler.start() first +mem_scheduler.submit_messages([task]) + +print(f"Task submitted: {task.item_id}") + +# Prevent scheduler main process from exiting prematurely +time.sleep(10) +``` + +### 2. Redis Queue vs Local Queue + +- **Local Queue**: + - **Use case**: Unit tests, simple single-machine scripts. + - **Characteristics**: Fast, but data is lost after process restart; does not support multi-process/multi-instance sharing. + - **Configuration**: `MOS_SCHEDULER_USE_REDIS_QUEUE=false` + +- **Redis Queue (Redis Stream)**: + - **Use case**: Production environment, distributed deployment. + - **Characteristics**: Data persistence, supports consumer groups allowing multiple scheduler instances to handle tasks together (load balancing). + - **Configuration**: `MOS_SCHEDULER_USE_REDIS_QUEUE=true` + - **Debugging**: Use the `show_redis_status.py` script to check queue accumulation. + +## Comprehensive Application Scenarios + +### Scenario 1: Basic Conversation Flow and Memory Update + +The following is a complete example demonstrating how to initialize the environment, register custom logic, simulate conversation flow, and trigger memory updates. + +```python +import asyncio +import json +import os +import sys +import time +from pathlib import Path + +# --- Environment Setup --- +# 1. Add project root to sys.path to ensure memos module can be imported +FILE_PATH = Path(__file__).absolute() +BASE_DIR = FILE_PATH.parent.parent.parent +sys.path.insert(0, str(BASE_DIR)) + +# 2. Set necessary environment variables (simulating .env configuration) +os.environ["ENABLE_CHAT_API"] = "true" +os.environ["MOS_ENABLE_SCHEDULER"] = "true" +# Choose between Redis or Local queue +os.environ["MOS_SCHEDULER_USE_REDIS_QUEUE"] = "false" + +# --- Import Components --- +# Note: Importing server_router triggers component initialization, +# ensure environment variables are set before this import +from memos.api.product_models import APIADDRequest, ChatPlaygroundRequest +from memos.api.routers.server_router import ( + add_handler, + chat_stream_playground, + mem_scheduler, # mem_scheduler here is already an initialized singleton +) +from memos.log import get_logger +from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem +from memos.mem_scheduler.schemas.task_schemas import ( + MEM_UPDATE_TASK_LABEL, + QUERY_TASK_LABEL, +) + +logger = get_logger(__name__) + +# Global variable for demonstrating memory retrieval results +working_memories = [] + +# --- Custom Handlers --- + +def custom_query_handler(messages: list[ScheduleMessageItem]): + """ + Handle user query messages: + 1. Print query content + 2. 
Convert message to MEM_UPDATE task, triggering memory retrieval/update process + """ + for msg in messages: + print(f"\n[Scheduler 🟢] Received user query: {msg.content}") + + # Copy message and change label to MEM_UPDATE, a common "task chaining" pattern + new_msg = msg.model_copy(update={"label": MEM_UPDATE_TASK_LABEL}) + + # Submit new task back to scheduler + mem_scheduler.submit_messages([new_msg]) + + +def custom_mem_update_handler(messages: list[ScheduleMessageItem]): + """ + Handle memory update tasks: + 1. Use retriever to find relevant memories + 2. Update global working memory list + """ + global working_memories + search_args = {} + top_k = 2 + + for msg in messages: + print(f"[Scheduler 🔵] Retrieving memories for query...") + # Call core retrieval functionality + results = mem_scheduler.retriever.search( + query=msg.content, + user_id=msg.user_id, + mem_cube_id=msg.mem_cube_id, + mem_cube=mem_scheduler.current_mem_cube, + top_k=top_k, + method=mem_scheduler.search_method, + search_args=search_args, + ) + + # Simulate working memory update + working_memories.extend(results) + working_memories = working_memories[-5:] # Keep the latest 5 + + for mem in results: + # Print retrieved memory fragments + print(f" ↳ [Memory Found]: {mem.memory[:50]}...") + +# --- Mock Business Data --- + +def get_mock_data(): + """Generate mock conversation data""" + conversations = [ + {"role": "user", "content": "I just adopted a golden retriever puppy named Max."}, + {"role": "assistant", "content": "That's exciting! Max is a great name."}, + {"role": "user", "content": "He loves peanut butter treats but I am allergic to nuts."}, + {"role": "assistant", "content": "Noted. Peanut butter for Max, no nuts for you."}, + ] + + questions = [ + {"question": "What is my dog's name?", "category": "Pet"}, + {"question": "What am I allergic to?", "category": "Allergy"}, + ] + return conversations, questions + +# --- Main Flow --- + +async def run_demo(): + print("==== MemScheduler Demo Start ====") + conversations, questions = get_mock_data() + + user_id = "demo_user_001" + mem_cube_id = "cube_demo_001" + + print(f"1. Initialize user memory library ({user_id})...") + # Use API Handler to add initial memories (synchronous mode) + add_req = APIADDRequest( + user_id=user_id, + writable_cube_ids=[mem_cube_id], + messages=conversations, + async_mode="sync", + ) + add_handler.handle_add_memories(add_req) + print(" Memory addition completed.") + + print("\n2. Start conversation testing (triggering background scheduling tasks)...") + for item in questions: + query = item["question"] + print(f"\n>> User: {query}") + + # Initiate chat request + chat_req = ChatPlaygroundRequest( + user_id=user_id, + query=query, + readable_cube_ids=[mem_cube_id], + writable_cube_ids=[mem_cube_id], + ) + + # Get streaming response + response = chat_stream_playground(chat_req) + + # Handle streaming output (simplified) + full_answer = "" + buffer = "" + async for chunk in response.body_iterator: + if isinstance(chunk, bytes): + chunk = chunk.decode("utf-8") + buffer += chunk + while "\n\n" in buffer: + msg, buffer = buffer.split("\n\n", 1) + for line in msg.split("\n"): + if line.startswith("data: "): + try: + data = json.loads(line[6:]) + if data.get("type") == "text": + full_answer += data["data"] + except: pass + + print(f">> AI: {full_answer}") + + # Wait a moment for background scheduler to process tasks and print logs + await asyncio.sleep(1) + +if __name__ == "__main__": + # 1. 
Register our custom handlers + # This will override or add to the default scheduling logic + mem_scheduler.register_handlers( + { + QUERY_TASK_LABEL: custom_query_handler, + MEM_UPDATE_TASK_LABEL: custom_mem_update_handler, + } + ) + + # 2. Ensure scheduler is started + if not mem_scheduler._running: + mem_scheduler.start() + + try: + asyncio.run(run_demo()) + except KeyboardInterrupt: + pass + finally: + # Prevent scheduler main process from exiting prematurely + time.sleep(10) -The `MemScheduler` system is structured around several key components: + print("\n==== Stopping scheduler ====") + mem_scheduler.stop() +``` -1. **Message Handling**: Processes incoming messages through a dispatcher with labeled handlers -2. **Memory Management**: Manages different memory types (Working, Long-Term, User) -3. **Retrieval System**: Efficiently retrieves relevant memory items based on context -4. **Monitoring**: Tracks memory usage, frequencies, and triggers updates -5. **Dispatcher (Router)**: Trigger different memory reorganization strategies by checking messages from MemOS systems. -6. **Logging**: Maintains logs of memory operations for debugging and analysis +### Scenario 2: Concurrent Asynchronous Tasks and Checkpoint Restart (Redis) -## Message Processing +This example demonstrates how to use Redis queues to achieve concurrent asynchronous task processing and checkpoint restart functionality. Running this example requires Redis environment configuration. -The scheduler processes messages through a dispatcher with dedicated handlers: +```python +from pathlib import Path +from time import sleep -### Message Types +from memos.api.routers.server_router import mem_scheduler +from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem -| Message Type | Handler Method | Description | -|--------------|---------------------------------|--------------------------------------------| -| `QUERY_LABEL` | `_query_message_consume` | Handles user queries and triggers retrieval | -| `ANSWER_LABEL`| `_answer_message_consume` | Processes answers and updates memory usage | -### Schedule Message Structure +# Debug: Print scheduler configuration +print("=== Scheduler Configuration Debug ===") +print(f"Scheduler type: {type(mem_scheduler).__name__}") +print(f"Config: {mem_scheduler.config}") +print(f"use_redis_queue: {mem_scheduler.use_redis_queue}") +print(f"Queue type: {type(mem_scheduler.memos_message_queue).__name__}") +print(f"Queue maxsize: {getattr(mem_scheduler.memos_message_queue, 'maxsize', 'N/A')}") +print("=====================================\n") -The scheduler processes messages from its queue using the following format: +queue = mem_scheduler.memos_message_queue -ScheduleMessageItem: -| Field | Type | Description | -|---------------|----------------------|-----------------------------------------------| -| `item_id` | `str` | UUID (auto-generated) for unique identification | -| `user_id` | `str` | Identifier for the associated user | -| `mem_cube_id` | `str` | Identifier for the memory cube | -| `label` | `str` | Message label (e.g., `QUERY_LABEL`, `ANSWER_LABEL`) | -| `mem_cube` | `GeneralMemCube|str` | Memory cube object or reference | -| `content` | `str` | Message content | -| `timestamp` | `datetime` | Time when the message was submitted | +# Define handler function +def my_test_handler(messages: list[ScheduleMessageItem]): + print(f"My test handler received {len(messages)} messages: {[one.item_id for one in messages]}") + for msg in messages: + # Create file based on task_id (use 
item_id as numeric ID 0..99) + task_id = str(msg.item_id) + file_path = tmp_dir / f"{task_id}.txt" + try: + sleep(5) + file_path.write_text(f"Task {task_id} processed.\n") + print(f"writing {file_path} done") + except Exception as e: + print(f"Failed to write {file_path}: {e}") -Meanwhile the scheduler will send the scheduling messages by following structures. +def submit_tasks(): + mem_scheduler.memos_message_queue.clear() -ScheduleLogForWebItem: + # Create 100 messages (task_id 0..99) + users = ["user_A", "user_B"] + messages_to_send = [ + ScheduleMessageItem( + item_id=str(i), + user_id=users[i % 2], + mem_cube_id="test_mem_cube", + label=TEST_HANDLER_LABEL, + content=f"Create file for task {i}", + ) + for i in range(100) + ] + # Batch submit messages and print completion info + print(f"Submitting {len(messages_to_send)} messages to the scheduler...") + mem_scheduler.memos_message_queue.submit_messages(messages_to_send) + print(f"Task submission done! tasks in queue: {mem_scheduler.get_tasks_status()}") -| Field | Type | Description | Default Value | -|------------------------|--------------------|-----------------------------------------------------------------------------|----------------------------------------| -| `item_id` | `str` | Unique log entry identifier (UUIDv4) | Auto-generated (`uuid4()`) | -| `user_id` | `str` | Associated user identifier | (Required) | -| `mem_cube_id` | `str` | Linked memory cube ID | (Required) | -| `label` | `str` | Log category identifier | (Required) | -| `from_memory_type` | `str` | Source memory partition
Possible values:
- `"LongTermMemory"`
- `"UserMemory"`
- `"WorkingMemory"` | (Required) | -| `to_memory_type` | `str` | Destination memory partition | (Required) | -| `log_content` | `str` | Detailed log message | (Required) | -| `current_memory_sizes` | `MemorySizes` | Current memory utilization |
DEFAULT_MEMORY_SIZES = {
"long_term_memory_size": -1,
"user_memory_size": -1,
"working_memory_size": -1,
"transformed_act_memory_size": -1
}
| -| `memory_capacities` | `MemoryCapacities` | Memory partition limits |
DEFAULT_MEMORY_CAPACITIES = {
"long_term_memory_capacity": 10000,
"user_memory_capacity": 10000,
"working_memory_capacity": 20,
"transformed_act_memory_capacity": -1
}
| -| `timestamp` | `datetime` | Log creation time | Auto-set (`datetime.now`) | -## Execution Example +# Register handler function +TEST_HANDLER_LABEL = "test_handler" +mem_scheduler.register_handlers({TEST_HANDLER_LABEL: my_test_handler}) -`examples/mem_scheduler/schedule_w_memos.py` is a demonstration script showcasing how to utilize the `MemScheduler` module. It illustrates memory management and retrieval within conversational contexts. +# 5 second restart +mem_scheduler.orchestrator.tasks_min_idle_ms[TEST_HANDLER_LABEL] = 5_000 -### Code Functionality Overview +tmp_dir = Path("./tmp") +tmp_dir.mkdir(exist_ok=True) -This script demonstrates two methods for initializing and using the memory scheduler: +# Test stop and restart: if tmp has >1 files, skip submission and print info +existing_count = len(list(Path("tmp").glob("*.txt"))) if Path("tmp").exists() else 0 +if existing_count > 1: + print(f"Skip submission: found {existing_count} files in tmp (>1), continue processing") +else: + submit_tasks() -1. **Automatic Initialization**: Configures the scheduler via configuration files -2. **Manual Initialization**: Explicitly creates and configures scheduler components +# Wait until tmp has 100 files or timeout +poll_interval = 1 +expected = 100 +tmp_dir = Path("tmp") +tasks_status = mem_scheduler.get_tasks_status() +mem_scheduler.print_tasks_status(tasks_status=tasks_status) +while ( + mem_scheduler.get_tasks_status()["remaining"] != 0 + or mem_scheduler.get_tasks_status()["running"] != 0 +): + count = len(list(tmp_dir.glob("*.txt"))) if tmp_dir.exists() else 0 + tasks_status = mem_scheduler.get_tasks_status() + mem_scheduler.print_tasks_status(tasks_status=tasks_status) + print(f"[Monitor] Files in tmp: {count}/{expected}") + sleep(poll_interval) +print(f"[Result] Final files in tmp: {len(list(tmp_dir.glob('*.txt')))})") -The script simulates a pet-related conversation between a user and an assistant, demonstrating how memory scheduler manages conversation history and retrieves relevant information. +# Stop scheduler +sleep(20) +print("Stopping the scheduler...") +mem_scheduler.stop() +``` \ No newline at end of file diff --git a/content/en/open_source/modules/mos/overview.md b/content/en/open_source/modules/mos/overview.md index 2caaca5f..ef8f4e4c 100644 --- a/content/en/open_source/modules/mos/overview.md +++ b/content/en/open_source/modules/mos/overview.md @@ -3,38 +3,36 @@ title: MemOS API Development Guide (Components & Handlers Architecture) desc: MemOS v2.0 adopts a more modular and decoupled architecture. The legacy MOS class is deprecated; Components + Handlers is now recommended for development. --- -This architecture separates "system construction" (Components) from "business logic execution" (Handlers), making the system easier to extend, test, and maintain. +This architecture separates "system components" (Components) from "business logic execution" (Handlers), making the system easier to extend, test, and maintain. ## 1. Core Concepts -### 1.1 Components +### 1.1 Components (Core Components) -Components are the "brain" and "infrastructure" of MemOS. They are initialized when the server starts (via `init_server()`) and reused throughout the system lifecycle. +Components are the "organs" of MemOS. They are initialized when the server starts (via `init_server()`) and reused throughout the system lifecycle. Core components include: #### Core Memory Components -1. 
**MemCube**: A memory container used to isolate memories across different users / different cubes, and to manage multiple memory modules in a unified way. -2. **MemReader**: A memory processor that parses user inputs (chat, documents, images) into memory items that the system can write. -3. **MemScheduler**: A scheduler that turns time-consuming tasks such as memory writes, index building, and memory organization into asynchronous and concurrent jobs. -4. **MemChat**: A conversation controller responsible for the dialogue loop of "retrieve memory -> generate response -> write new memory". -5. **MemFeedback**: Correction and feedback that converts users' natural-language feedback into safe updates to the memory store. +1. **MemCube**: A memory container that isolates memories across different users and application scenarios, managing multiple memory modules in a unified way. +2. **MemReader**: A memory processor that parses user inputs (chat, documents, images) into standardized memory items that the system can persist. +3. **MemScheduler**: A background scheduler that handles asynchronous processing of memory operations—storage, indexing, and organization—supporting concurrent task execution. +4. **MemChat**: A conversation controller responsible for orchestrating the memory-augmented dialogue loop: "retrieve memory → generate response → store new memory". +5. **MemFeedback**: A memory correction engine that understands users' natural-language feedback and performs atomic-level updates to memories (correction, addition, replacement). -### 1.2 Handlers +### 1.2 Handlers (Business Processors) -Handlers are the "hands" of MemOS. They encapsulate concrete business logic and complete tasks by calling Components. +Handlers are the "brain" of MemOS. They encapsulate concrete business logic by coordinating and calling the capabilities of Components to complete user-facing tasks. -Main Handlers include: - -## Core Handlers Overview +#### Core Handlers Overview | Handler | Purpose | Key Methods | | :--- | :--- | :--- | | **AddHandler** | Add memories (chat / documents / text) | `handle_add_memories` | | **SearchHandler** | Search memories (semantic retrieval) | `handle_search_memories` | | **ChatHandler** | Chat (with memory augmentation) | `handle_chat_complete`, `handle_chat_stream` | -| **FeedbackHandler** | Feedback (correct memories / human-in-the-loop) | `handle_feedback_memories` | +| **FeedbackHandler** | Feedback (correct memories / human feedback) | `handle_feedback_memories` | | **MemoryHandler** | Manage (get details / delete) | `handle_get_memory`, `handle_delete_memories` | | **SchedulerHandler** | Scheduling (query async task status) | `handle_scheduler_status`, `handle_scheduler_wait` | | **SuggestionHandler** | Suggestions (generate recommended questions) | `handle_get_suggestion_queries` | @@ -44,65 +42,64 @@ Main Handlers include: ### 2.1 Initialization Initialization is the foundation of system startup. All Handlers rely on a unified component registry and dependency-injection mechanism. -- Component loading (`init_server`): The system first initializes all core components, including the LLM, storage layers (vector DB, graph DB), the scheduler, and various Memory Cubes. -- Dependency injection (`HandlerDependencies`): To keep the code decoupled and testable, all components are wrapped into a `HandlerDependencies` object. 
Handlers receive this dependency container during instantiation, so they can access resources such as `naive_mem_cube`, `mem_reader`, or `feedback_server` on demand, without hard-coding initialization logic inside the Handler. +- Component loading (`init_server`): When the system starts, it initializes all core components, including the LLM, storage layers (vector DB, graph DB), scheduler, and various Memory Cubes. +- Dependency injection (`HandlerDependencies`): To ensure loose coupling and testability, all components are wrapped into a `HandlerDependencies` container. When a Handler is instantiated, it receives this container and can access needed resources—such as `naive_mem_cube`, `mem_reader`, or `feedback_server`—without duplicating initialization logic. ### 2.2 Add Memories (AddHandler) -AddHandler is the primary entry point that converts external information into system memories. It supports chats, file uploads, and plain text. Besides writing, it also takes on part of the feedback routing responsibility. +AddHandler is the brain's "memory intake instruction", responsible for converting external information into system memories. It handles not only intake and conversion of various information types, but also automatically recognizes feedback and routes it to dedicated feedback processing. - Core capabilities: - - Multimodal support: Processes user message lists (Messages) and converts them into internal memory objects. - - Sync and async modes: Controlled via `async_mode`. In production, "async" is recommended: tasks are pushed to a background queue and executed by the Scheduler, and the API returns immediately with a task_id. For debugging, use "sync" to block until completion. - - Automatic feedback routing: If the request sets `is_feedback=True`, the Handler automatically takes the last user message as feedback content and routes it to the feedback logic, instead of adding it as a normal new memory. - - Multi-target writes: You can specify multiple target cubes via `writable_cube_ids`. If multiple targets are provided, the Handler builds a CompositeCubeView and fans out write tasks in parallel. If only one target is provided, it uses a lightweight SingleCubeView. + - Multimodal support: Processes user conversations, documents, images, and other input types, converting them into standardized memory objects. + - Sync and async modes: Controlled via `async_mode`. **Sync mode** ("sync"): processes immediately and blocks until completion, suitable for debugging. **Async mode** ("async"): pushes tasks to a background queue for concurrent processing by MemScheduler, returns a task ID immediately, suitable for production to improve response speed. + - Automatic feedback routing: If the request sets `is_feedback=True`, the Handler automatically extracts the last user message as feedback content and routes it to MemFeedback processing, instead of adding it as a normal memory. + - Multi-target writes: Supports writing to multiple MemCubes simultaneously. When multiple targets are specified, the system processes all write tasks in parallel; when only one target is specified, it uses a lightweight approach. ### 2.3 Search Memories (SearchHandler) -SearchHandler provides semantic memory retrieval and is a key component for RAG (Retrieval-Augmented Generation). +SearchHandler is the brain's "memory retrieval instruction", providing semantic-based intelligent memory query capabilities and serving as a key component for RAG (Retrieval-Augmented Generation). 

### 2.3 Search Memories (SearchHandler)
-SearchHandler provides semantic memory retrieval and is a key component for RAG (Retrieval-Augmented Generation).
+SearchHandler is the brain's "memory retrieval", providing semantic memory queries and serving as a key component for RAG (Retrieval-Augmented Generation).

- Core capabilities:
-  - Semantic retrieval: Uses embedding-based similarity to recall relevant memories by meaning rather than simple keyword matching.
-  - Flexible search scope: With `readable_cube_ids`, callers can precisely control the search context (for example, search only within a specific user's memory, or search across public memories).
-  - Multi-strategy support: The underlying implementation supports multiple search strategies (such as fast, fine, or mixture) to balance latency and recall quality.
-  - Deep search integration: Can integrate `deepsearch_agent` to handle more complex retrieval requests that require multi-step reasoning.
+  - Semantic retrieval: Uses embedding-based similarity to recall relevant memories by meaning, understanding user intent more accurately than simple keyword matching (see the toy example after this list).
+  - Flexible search scope: Callers can scope retrieval via `readable_cube_ids`, for example searching only within a specific user's memory or across shared public memories, meeting different privacy and business needs.
+  - Multiple retrieval modes: Choose between speed and accuracy per scenario: **fast mode** suits latency-sensitive use, **fine mode** suits cases that demand the highest retrieval accuracy, and **mixed mode** balances both.
+  - Multi-step reasoning retrieval: For complex questions, a deep-search path performs multiple rounds of understanding and retrieval to progressively converge on the most relevant memories.
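+
+To illustrate scoped semantic retrieval, here is a toy, self-contained example. The store, the embeddings, and the function signature are all invented for illustration; a real SearchHandler delegates to the vector database and embedder, and its fine and mixed modes add further ranking steps.
+
+```python
+from math import sqrt
+
+
+def cosine(a: list[float], b: list[float]) -> float:
+    norm = sqrt(sum(x * x for x in a)) * sqrt(sum(y * y for y in b))
+    return sum(x * y for x, y in zip(a, b)) / norm if norm else 0.0
+
+
+# Toy memory store: (cube_id, text, embedding). Real embeddings come from the embedder.
+MEMORIES = [
+    ("user03alice/mem_cube_general", "Alice prefers tea over coffee", [0.9, 0.1]),
+    ("public/shared_cube", "The office moved to Shanghai", [0.1, 0.9]),
+]
+
+
+def handle_search_memories(query_vec: list[float], readable_cube_ids: set[str],
+                           top_k: int = 5) -> list[str]:
+    """Illustrative fast-mode search: scope by cube, then rank by similarity."""
+    scoped = [m for m in MEMORIES if m[0] in readable_cube_ids]
+    ranked = sorted(scoped, key=lambda m: cosine(query_vec, m[2]), reverse=True)
+    return [text for _, text, _ in ranked[:top_k]]
+
+
+print(handle_search_memories([1.0, 0.0], {"user03alice/mem_cube_general"}))
+```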

### 2.4 Chat (ChatHandler)
-ChatHandler is the orchestrator of upper-layer business logic. It does not store data directly; instead, it composes other Handlers to complete end-to-end chat tasks.
+ChatHandler is the brain's "dialogue coordinator", turning a user's chat request into a complete business flow. It does not operate on memories directly; instead, it orchestrates other Handlers to complete end-to-end dialogue tasks.

- Core capabilities:
-  - Orchestration: Automatically chains the full process of "retrieve -> generate -> store". It calls SearchHandler for context, calls the LLM to generate a response, then calls AddHandler to save the newly produced dialogue as memory.
-  - Context management: Assembles `history` (past conversation) and `query` (current question) to ensure the AI understands the complete context.
-  - Streaming and non-streaming: Supports standard responses (APIChatCompleteRequest) and streaming responses (Stream) to match different frontend interaction needs.
-  - Notification integration: Optionally integrates `online_bot` (for example, a DingTalk bot) to push notifications after responses are generated.
+  - Orchestration: Automatically executes the complete dialogue loop of "retrieve memory → generate response → store memory" (see the sketch after this list). Each user query benefits from historical memories for smarter responses, and each dialogue round is crystallized as new memory, achieving "chat-as-learning".
+  - Context management: Assembles `history` (past conversation) and `query` (current question) so the LLM understands the complete dialogue context without losing information.
+  - Multiple interaction modes: Supports both a standard request-response mode and a streaming mode. Standard mode suits short answers, while streaming suits long replies, matching different frontend interaction needs.
+  - Message push (optional): Can automatically push results to third-party platforms (such as DingTalk) after a response is generated, enabling multi-channel integration.
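+
+The orchestration loop can be sketched as follows. All class and function names here are stubs invented for illustration; the real ChatHandler wires SearchHandler, the chat LLM, and AddHandler together through the dependency container.
+
+```python
+class StubSearch:
+    def handle_search_memories(self, query: str) -> list[str]:
+        return ["User's name is Alice"]
+
+
+class StubAdd:
+    def handle_add_memories(self, messages: list[dict]) -> None:
+        print(f"stored {len(messages)} messages as new memory")
+
+
+class StubLLM:
+    def generate(self, prompt: str) -> str:
+        return f"(reply grounded in: {prompt!r})"
+
+
+search, add, llm = StubSearch(), StubAdd(), StubLLM()
+
+
+def handle_chat_complete(query: str, history: list[dict]) -> str:
+    """Illustrative 'retrieve -> generate -> store' loop."""
+    # 1. Retrieve: recall relevant long-term memories for this query.
+    memories = search.handle_search_memories(query)
+    # 2. Generate: assemble memories + history + current query into one prompt.
+    prompt = "\n".join(memories + [m["content"] for m in history] + [query])
+    answer = llm.generate(prompt)
+    # 3. Store: crystallize this dialogue round as new memory (async in production).
+    add.handle_add_memories([{"role": "user", "content": query},
+                             {"role": "assistant", "content": answer}])
+    return answer
+
+
+print(handle_chat_complete("What's my name?", history=[]))
+```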

### 2.5 Feedback and Correction (FeedbackHandler)
-FeedbackHandler is the system's "self-correction" mechanism. It allows users to intervene in the AI's behavior to improve future retrieval and generation.
+FeedbackHandler is the brain's "feedback corrector": it understands users' natural-language feedback on the AI's performance, then automatically locates and corrects the relevant memory content.

- Core capabilities:
-  - Memory correction: When users point out mistakes (for example, "the meeting location is Shanghai, not Beijing"), the Handler updates or marks the old memory nodes based on the feedback content.
-  - Positive and negative feedback: Supports upvote and downvote signals to adjust the weight or credibility of specific memories.
-  - Precise targeting: In addition to conversation-history-based feedback, it supports `retrieved_memory_ids` so you can correct specific retrieved items, improving feedback effectiveness.
+  - Memory correction: When users point out AI errors (such as "the meeting location is Shanghai, not Beijing"), the Handler automatically updates or marks the old memories. The system uses version management rather than direct deletion, keeping the modification history traceable.
+  - Positive and negative feedback: Users can upvote or downvote specific memories; the system adjusts their weight and credibility accordingly, making subsequent retrieval more accurate.
+  - Precise targeting: Supports two feedback modes: automatic conflict detection based on dialogue history, and direct targeting via `retrieved_memory_ids`, where users specify exactly which memories to correct, improving feedback effectiveness and accuracy.

### 2.6 Memory Management (MemoryHandler)
-MemoryHandler provides low-level CRUD capabilities for memory data. It is mainly used for admin backends or cleanup tools.
+MemoryHandler is the brain's "memory manager", providing low-level CRUD capabilities for memory data, primarily for system admin backends or data-cleanup scenarios.

- Core capabilities:
-  - Fine-grained management: Unlike AddHandler's business-level writes, this Handler can fetch a single memory by `memory_id` or perform physical deletion.
-  - Direct dependency access: Some operations need to interact with the underlying `naive_mem_cube` component directly, bypassing business wrappers to provide the most efficient data operations.
+  - Fine-grained management: Unlike AddHandler's business-level writes, this Handler fetches the details of a single memory or performs physical deletion by memory ID. These direct operations bypass business-logic wrappers and are mainly for debugging, auditing, or system cleanup.
+  - Direct backend access: Some management operations interact directly with the underlying memory component (`naive_mem_cube`) to provide the most efficient, lowest-latency data operations for system maintenance.

### 2.7 Scheduler Status (SchedulerHandler)
-SchedulerHandler monitors the lifecycle of all async tasks in the system and is an important part of system observability.
+SchedulerHandler is the brain's "task monitor", tracking the real-time execution status of all async tasks so users can follow background task progress and results.

- Core capabilities:
-  - Status tracking: With a Redis backend, it tracks real-time task states (Queued, Running, Completed, Failed).
-  - Result fetching: For async tasks, clients can poll progress via this API and retrieve the final result or error when the task completes.
-  - Debugging support: Provides utilities such as `handle_scheduler_wait` to force async flows into synchronous waits in test scripts, which is useful for integration tests.
+  - Status tracking: Tracks task status in real time (queued, running, completed, failed). This matters for async-mode users who need to know when their tasks complete.
+  - Result fetching: Provides a task result query interface. When an async task completes, users can fetch its final result, or the error information explaining why it failed.
+  - Sync wait (debugging tool): In tests and integration runs, `handle_scheduler_wait` can force async flows into synchronous waits, letting developers debug async code as if it were synchronous.

-### 2.8 Suggested Next Questions (SuggestionHandler)
-SuggestionHandler improves interaction by predicting users' potential intent and generating "recommended questions" (Next Query Suggestion).
+### 2.8 Suggested Questions (SuggestionHandler)
+SuggestionHandler is the brain's "suggestion generator", predicting users' potential needs and proactively recommending related questions to help users explore system capabilities and discover topics of interest.

- Core capabilities:
  - Dual-mode generation:
-    - Conversation-based: If `message` (recent conversation records) is provided, the system analyzes the context and generates 3 follow-up questions closely related to the current topic.
-    - Memory-based: If there is no conversation context, the system uses `naive_mem_cube` to quickly retrieve the user's "recent memories" and generates questions related to the user's recent life/work status.
-  - Multi-language support: Built-in Chinese and English prompt templates switch the language style automatically based on the `language` parameter.
-
+    - Conversation-based suggestions: When recent conversation records are provided via `message`, the system analyzes the dialogue context, infers likely follow-up topics, and generates 3 related recommended questions (a dispatch sketch follows this list).
+    - Memory-based suggestions: When there is no conversation context, the system infers the user's interests and status from recent memories, generating questions related to the user's recent life or work. This suits conversation openers or topic transitions.
+  - Multi-language support: Recommended questions automatically follow the `language` parameter, with built-in Chinese and English prompt templates, improving the experience for different users.
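+
+The dual-mode dispatch can be sketched as below. Everything here is illustrative: the stub memory lookup and the placeholder return values stand in for the real prompt templates and LLM call inside SuggestionHandler.
+
+```python
+def fetch_recent_memories() -> list[str]:
+    """Stub for the naive_mem_cube lookup used when no conversation is given."""
+    return ["planning a trip to Kyoto", "learning Rust"]
+
+
+def handle_get_suggestion_queries(message: list[dict] | None = None,
+                                  language: str = "en") -> list[str]:
+    """Illustrative dual-mode dispatch, not the real SuggestionHandler signature."""
+    if message:
+        # Conversation-based: condition the prompt on the most recent turns.
+        context = " ".join(m["content"] for m in message[-4:])
+    else:
+        # Memory-based: fall back to the user's recent memories.
+        context = "; ".join(fetch_recent_memories())
+    # The real handler fills a built-in Chinese or English prompt template and
+    # asks the LLM for 3 questions; here we just return labeled placeholders.
+    label = "推荐问题" if language == "zh" else "suggested question"
+    return [f"{label} {i + 1} (based on: {context})" for i in range(3)]
+
+
+print(handle_get_suggestion_queries(language="en"))
+```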