A lightweight, high-performance data streaming bridge that connects sources to sinks with powerful transformation capabilities.
mstream simplifies building data pipelines by providing a configurable bridge between systems like MongoDB, Kafka, and PubSub. It handles format conversion, schema validation, and data transformation out of the box, allowing you to focus on your data logic rather than glue code.
- Universal Connectivity: Stream data between MongoDB, Kafka, and Google Cloud PubSub.
- Zero-Code Transformations: Convert between BSON, JSON, and Avro formats automatically.
- Powerful Middleware: Transform data in-flight using HTTP services or embedded Rhai scripts.
- Schema Validation: Enforce data quality with Avro schema validation and filtering.
- High Performance: Optimized batch processing with zero-copy handling.
- Simple Configuration: Define your entire pipeline in a single TOML file.
- Build the project:
make
- Create a configuration file:
cp mstream-config.toml.example mstream-config.toml
- Start the full stack using the Makefile helper:
make docker-up
Or build a release binary directly with Cargo:
cargo build --release
Create a mstream-config.toml file to define your pipeline. Here is a simple example that streams changes from a MongoDB collection to a Kafka topic:
# 1. Define Services
[[services]]
provider = "mongodb"
name = "mongo-local"
connection_string = "mongodb://localhost:27017"
db_name = "app_db"
[[services]]
provider = "kafka"
name = "kafka-local"
"bootstrap.servers" = "localhost:9092"
# 2. Define Connector
[[connectors]]
enabled = true
name = "users-sync"
# Source: Watch 'users' collection
source = { service_name = "mongo-local", resource = "users", output_encoding = "json" }
# Sink: Publish to 'users-topic'
sinks = [
{ service_name = "kafka-local", resource = "users-topic", output_encoding = "json" }
]
Ready-made configs:
- mongo_to_kafka.toml (MongoDB → Kafka)
- kafka_to_mongo.toml (Kafka → MongoDB)
Run mstream with your configuration:
# All together
cargo build --release
cp target/release/mstream .
cp mstream-config.toml.example ./mstream-config.toml
# update the config as needed
RUST_LOG=info ./mstream
# Or run with Docker Compose (recommended)
cp mstream-config.toml.example ./mstream-config.toml
# update the config as needed
make docker-up
make db-init-rpl-set
mstream keeps job definitions and runtime state in an internal job-lifecycle store so that restarts are predictable. By default the store is in-memory, but you can opt into a persistent backend by pointing the [system] section at a managed service:
[system.job_lifecycle]
service_name = "mongodb-source"
resource = "job-lifecycle"
startup_state = "seed_from_file" # other options: "force_from_file", "keep"
- service_name references an existing service entry (e.g. MongoDB) that will host the lifecycle collection/table.
- resource is the database collection/table used to store job metadata.
- startup_state controls how jobs are reconciled when mstream boots:
  - force_from_file: stop everything and start only the connectors listed in the config file.
  - seed_from_file: initialize the store from the file if it is empty; otherwise resume the previously persisted state.
  - keep: ignore the file and resume whatever is already stored (handy for API-driven workflows).
With a persistent store in place you can safely restart the server without losing job intent, resume jobs automatically, or keep them paused until you explicitly restart them via the API. If you omit [system.job_lifecycle], mstream falls back to its in-memory job store.
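For example, a job that was left stopped can be resumed later through the REST API described further below. A minimal sketch, assuming the default API port (8787) and the users-sync connector from the earlier example:
curl -X POST http://localhost:8787/jobs/users-sync/restart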
Services created through the config file or the /services API can survive restarts by enabling a persistent service registry. Point the registry at a MongoDB service and tell mstream where to read the encryption key that protects the stored definitions:
[system]
# Optional: override where the AES-256 key is stored; defaults to ./mstream.key
encryption_key_path = "./mstream-services.key"
[system.service_lifecycle]
service_name = "mongodb-source"
resource = "mstream-services"- service_name must reference an existing MongoDB service; mstream reuses the same client and database.
- resource is the collection that will hold the encrypted service documents.
- encryption_key_path is the 32-byte AES-256 key file. If it is missing, mstream will generate it with strict file permissions; back it up because it is required to decrypt persisted services.
- Instead of storing the key on disk, you can set the MSTREAM_ENC_KEY environment variable to the hex-encoded key (handy for container secret managers).
If you skip [system.service_lifecycle], service definitions remain in-memory and will be rebuilt from the config file on each restart.
With service_lifecycle configured, service definitions that are registered via config or API calls are written to MongoDB, restored on startup, and stay available to jobs even if the process restarts.
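If you go the environment-variable route, the key is simply 32 random bytes in hex form. A minimal sketch, assuming openssl is available on the host:
# Generate a 32-byte key (64 hex characters) and hand it to mstream
export MSTREAM_ENC_KEY=$(openssl rand -hex 32)
RUST_LOG=info ./mstream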
graph LR
Source[Source] --> |Event| Middleware[Middleware]
Middleware --> |Transformed Event| Sink[Sink]
subgraph Pipeline
Source
Middleware
Sink
end
- Source: The origin of data (e.g., a MongoDB Change Stream, Kafka topic).
- Sink: The destination for data (e.g., a Kafka topic, HTTP endpoint).
- Middleware: Optional processing steps that sit between source and sink to transform, enrich, or filter data.
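To make these roles concrete, the sketch below wires all three stages into a single connector. It assumes the mongo-local and kafka-local services from the Quick Start example plus a script-engine UDF service like the one shown later; the orders resources are purely illustrative.
[[connectors]]
enabled = true
name = "orders-pipeline"
# Source: watch the 'orders' collection
source = { service_name = "mongo-local", resource = "orders", output_encoding = "json" }
# Middleware: transform each event with a Rhai script
middlewares = [
  { service_name = "script-engine", resource = "enrich.rhai", output_encoding = "json" }
]
# Sink: publish the transformed events to Kafka
sinks = [
  { service_name = "kafka-local", resource = "orders-topic", output_encoding = "json" }
]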
mstream uses a clear input/output encoding model:
- Input Encoding: The format data is received in (e.g., avro from Kafka).
- Output Encoding: The format data is transformed to before the next step (e.g., json for a sink).
[[connectors]]
# Read Avro from Kafka -> Convert to JSON -> Write to MongoDB as BSON
source = { service_name = "kafka", resource = "events", input_encoding = "avro", output_encoding = "json" }
sinks = [
{ service_name = "mongo", resource = "events_archive", output_encoding = "bson" }
]
For high-throughput scenarios, enable batch processing to group events:
[[connectors]]
name = "high-volume-sync"
batch = { kind = "count", size = 100 }
source = { service_name = "mongo", resource = "logs", output_encoding = "bson" }
sinks = [
{ service_name = "kafka", resource = "logs-batch", output_encoding = "json" }
]
Important Considerations:
- MongoDB Requirements: When using batching with a MongoDB sink, the output encoding must be set to bson to enable optimized bulk writes (see the sketch after this list).
- General Batching: Batching is supported for all sinks. For non-MongoDB sinks (Kafka, PubSub), the batch is typically emitted as an array of events in the configured output encoding (e.g., a JSON array).
- Middleware: Middleware services must be capable of handling batched data (arrays) in the format provided.
- Schema Validation: Schema validation is applied to each event in the batch individually before the batch is passed on.
- Resource Usage: The batch size directly affects memory usage and should be tuned based on available resources.
- Latency: For time-sensitive events, lower batch sizes may be preferable to reduce latency.
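As a counterpart to the example above, here is a minimal sketch of a batched connector that writes into MongoDB; in line with the MongoDB requirement, the sink's output encoding is bson. The service and resource names are illustrative and assume a Kafka service and a MongoDB service defined under [[services]].
[[connectors]]
name = "events-archive-batch"
batch = { kind = "count", size = 100 }
source = { service_name = "kafka-local", resource = "events", input_encoding = "json", output_encoding = "json" }
sinks = [
  { service_name = "mongo-local", resource = "events_archive", output_encoding = "bson" }
]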
Transform data on the fly using HTTP services or embedded scripts.
Send events to an external API for processing:
middlewares = [
{ service_name = "my-api", resource = "enrich-user", output_encoding = "json" }
]
Write custom transformation logic in Rhai scripts for high-performance, safe execution:
[[services]]
provider = "udf"
name = "script-engine"
engine = { kind = "rhai" }
script_path = "./scripts"
[[connectors]]
middlewares = [
{ service_name = "script-engine", resource = "anonymize.rhai", output_encoding = "json" }
]
Example Script (anonymize.rhai):
fn transform(data, attributes) {
// Mask email address
if "email" in data {
data.email = mask_email(data.email);
}
// Add timestamp
data.processed_at = timestamp_ms();
result(data, attributes)
}
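For illustration, a hypothetical input document
{ "email": "jane@example.com", "plan": "pro" }
would leave this script as something like
{ "email": "j***@example.com", "plan": "pro", "processed_at": 1735689600000 }
where the exact masked value depends on the mask_email helper and the timestamp is whatever timestamp_ms() returns at processing time.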
| Type | Service | Notes |
|---|---|---|
| Source | MongoDB | Change Streams (v6.0+) |
| Source | Kafka | Consumer groups, offset management |
| Source | Google PubSub | Subscriptions |
| Sink | MongoDB | Insert/Update/Delete operations |
| Sink | Kafka | Producer |
| Sink | Google PubSub | Publisher |
| Sink | HTTP | POST requests |
mstream exposes a REST API for monitoring and management (default port: 8787).
- GET /jobs: List all jobs along with their desired and current state.
- POST /jobs: Start a new job by providing a connector configuration.
- POST /jobs/{name}/restart: Restart an existing job using its stored pipeline definition.
- POST /jobs/{name}/stop: Stop a running job (the configuration remains stored for future use).
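For example, with curl against the default port, you can list jobs and stop the users-sync connector from the earlier example:
# List all jobs with their desired and current state
curl http://localhost:8787/jobs
# Stop a running job; its configuration remains stored
curl -X POST http://localhost:8787/jobs/users-sync/stop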
- GET /services: List all configured services and their usage.
- POST /services: Add a new service.
- DELETE /services/{name}: Remove a service (if not in use).
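For example, listing the configured services with curl (default port assumed):
curl http://localhost:8787/services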
Service List Response Example:
[
{
"provider": "mongodb",
"name": "etl-test-data",
"connection_string": "*****",
"db_name": "etl-tests",
"used_by_jobs": [
"integration-test"
]
},
{
"provider": "kafka",
"name": "kafka-prod",
"bootstrap.servers": "kafka:9092",
"sasl.password": "*****",
"used_by_jobs": []
}
]
When using Google Cloud PubSub as a sink, mstream adds the following attributes to the message:
| Attribute | Description |
|---|---|
| operation_type | Event type: insert, update, delete |
| database | MongoDB database name (for Mongo sources) |
| collection | MongoDB collection name (for Mongo sources) |
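Putting these together, a message published for a MongoDB insert might carry attributes along these lines (the database and collection values are illustrative, and the message payload itself is omitted):
{
  "attributes": {
    "operation_type": "insert",
    "database": "app_db",
    "collection": "users"
  }
}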
When batch processing is enabled for a MongoDB sink, events are stored as a single document with an items array:
{
"items": [
{ "id": 1, "name": "Event 1" },
{ "id": 2, "name": "Event 2" }
]
}
The schema_id field is optional in most cases and follows these rules:
- Avro encoding: A schema reference is required whenever Avro encoding is used.
- Schema inheritance: If no schema_id is specified, the component will use the schema defined at the most recent preceding step.
- Source schema: If a schema is defined at the source, it applies to all subsequent steps unless overridden.
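As an illustration of these rules, the sketch below declares a schema once at the source and lets the Avro sink inherit it. The schema_id value and the service names are assumptions for the example, not real identifiers.
[[connectors]]
name = "avro-passthrough"
# schema_id declared at the source is inherited by the later steps
source = { service_name = "kafka-local", resource = "events", input_encoding = "avro", output_encoding = "avro", schema_id = "events-v1" }
sinks = [
  # No schema_id here: the sink falls back to the schema defined at the source
  { service_name = "pubsub-local", resource = "events-topic", output_encoding = "avro" }
]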
| Source Format | Target Format | Notes |
|---|---|---|
| BSON | BSON | Direct passthrough |
| BSON | JSON | Serializes BSON to JSON |
| BSON | Avro | Requires schema_id with Avro schema |
| JSON | JSON | Passthrough |
| JSON | BSON | Parses JSON to BSON |
| JSON | Avro | Requires schema_id with Avro schema |
| Avro | Avro | Validates against schema if provided |
| Avro | JSON | Deserializes Avro to JSON |
| Avro | BSON | Converts Avro records to BSON documents |
# Spawn mongo cluster in docker
make docker-db-up
make db-check
# Run the app with debug logging
make run-debug
# Run unit tests
make unit-tests
# Run integration tests (requires local mongo and GCP setup)
make integration-tests
This project is licensed under the MIT License - see the LICENSE file for details.