Splitr is a lightweight, high-performance Spring Boot library designed to implement the Request-Response pattern over Kafka. It allows microservices to execute synchronous, typed queries across distributed boundaries while maintaining idempotency and resilience.
Splitr supports three distinct communication patterns: Sync Request-Response, Async Broadcast (Kafka), and In-Memory Events.
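The Sync Request-Response model bridges the gap between asynchronous messaging and synchronous execution requirements: the publisher sends its request through Kafka, blocks the calling thread, and is released when the consumer posts the result back to the publisher's callback URL over HTTP.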
sequenceDiagram
participant User
participant ServiceA as Service-A (Publisher)
participant Kafka as Kafka Topic
participant ServiceB as Service-B (Consumer)
User->>ServiceA: GET /orders/1 (Query)
ServiceA->>ServiceA: Register ID in SyncRegistry
ServiceA->>Kafka: Send Request (ID, callbackUrl, payload)
Note over ServiceA: Thread Blocked (Waiting...)
Kafka->>ServiceB: Consume Message
ServiceB->>ServiceB: Idempotency Check & Handling
ServiceB->>ServiceA: HTTP POST (callbackUrl/query) with Result
ServiceA->>ServiceA: Match ID & Complete Promise
Note over ServiceA: Thread Released
ServiceA->>User: 200 OK (Data)
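The blocking step in the diagram above (register an ID, wait, complete on callback) can be sketched with one `CompletableFuture` per request. This is a minimal illustration under stated assumptions, not Splitr's actual `SyncRegistry`: the class name `SyncRegistrySketch` and its methods are hypothetical, results are simplified to `String`, and `orTimeout` requires Java 9 or later.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a sync registry; not the library's real class. */
class SyncRegistrySketch {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a new correlation ID before the request is sent to Kafka. */
    String register() {
        String id = UUID.randomUUID().toString();
        pending.put(id, new CompletableFuture<>());
        return id;
    }

    /**
     * Blocks the caller until the callback arrives. On timeout, join() throws a
     * CompletionException whose cause is a TimeoutException.
     */
    String await(String id, long timeoutMs) {
        CompletableFuture<String> future = pending.get(id);
        if (future == null) {
            throw new IllegalStateException("Unknown request id: " + id);
        }
        try {
            return future.orTimeout(timeoutMs, TimeUnit.MILLISECONDS).join();
        } finally {
            pending.remove(id); // always release the slot, even on timeout
        }
    }

    /** Called from the HTTP callback; returns false if nobody is waiting for this ID. */
    boolean complete(String id, String result) {
        CompletableFuture<String> future = pending.get(id);
        return future != null && future.complete(result);
    }
}
```

In this sketch a publisher would call `register()`, send the request with the returned ID, and then call `await(id, timeoutMs)`, while the callback endpoint would call `complete(id, result)`.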
The Async Broadcast pattern is used to broadcast state changes across the system. It follows a "fire-and-forget" approach: the publisher expects no response from consumers, and multiple services can listen to the same event independently.
graph LR
Publisher[Service-A Publisher] -- "publish(Event)" --> Topic((Kafka Event Topic))
Topic -- "Broadcast" --> Consumer1[Service-B: Handler 1]
Topic -- "Broadcast" --> Consumer2[Service-C: Handler 1]
Topic -- "Broadcast" --> Consumer3[Service-C: Handler 2]
subgraph Service-C Logic
Consumer2
Consumer3
end
In-Memory Events are designed for internal application logic triggers. They execute entirely in memory within the same JVM, providing the benefits of loose coupling without network overhead.
sequenceDiagram
participant App as Business Logic
participant DEBus as DomainEventBus
participant H1 as Local Handler A
participant H2 as Local Handler B
App->>DEBus: arise(DomainEvent)
Note right of DEBus: "ARISE!" - Local Dispatch
par Execute Handlers
DEBus->>H1: handle(event)
DEBus->>H2: handle(event)
end
DEBus-->>App: Execution Finished
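The local dispatch shown above can be sketched as a map from event class to handlers. This is an illustrative assumption, not the real `DomainEventBus`: the names `LocalEventBusSketch`, `subscribe`, and `OrderDomainEventSketch` are hypothetical, and handlers here run one after another on the caller's thread, whereas the diagram marks them as parallel.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical event type used only for this sketch. */
record OrderDomainEventSketch(String id) {}

/** Hypothetical in-memory bus; the library's own bus may work differently. */
class LocalEventBusSketch {
    private final Map<Class<?>, List<Consumer<Object>>> handlers = new ConcurrentHashMap<>();

    /** Registers a handler for one exact event class. */
    <E> void subscribe(Class<E> type, Consumer<? super E> handler) {
        handlers.computeIfAbsent(type, k -> new CopyOnWriteArrayList<>())
                .add(event -> handler.accept(type.cast(event)));
    }

    /**
     * Dispatches the event to every handler registered for its class and
     * returns the number of handlers invoked. No network call is involved.
     */
    int arise(Object event) {
        List<Consumer<Object>> matched = handlers.getOrDefault(event.getClass(), List.of());
        matched.forEach(h -> h.accept(event));
        return matched.size();
    }
}
```

Because `arise` returns only after every handler has run, the caller sees the same "Execution Finished" point as in the diagram.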
// Publisher: Blocking call
String result = queryBus.publishSync(new OrderQuery("123"), String.class);

// Consumer: Implementation
@Component
public class OrderQueryHandler extends BaseQueryHandler<OrderQuery> {
    @Override
    public String handle(OrderQuery q) { return "Data: " + q.orderId(); }
}

// Publisher: Blocking call
String result = commandBus.publishSync(new OrderCommand("123"), String.class);

// Consumer: Implementation
@Component
public class OrderCommandHandler extends BaseCommandHandler<OrderCommand> {
    @Override
    public String handle(OrderCommand command) { return "Data: " + command.orderId(); }
}

// Publisher: Fire-and-forget to Kafka
eventBus.publish(new OrderProcessedEvent("123"));

// Consumer: Implementation
@Component
public class OrderEventHandler extends BaseEventHandler<OrderProcessedEvent> {
    @Override
    public void onEvent(OrderProcessedEvent payload) {
        log.info("OrderProcessedEvent in OrderEventHandler processed with id: {}", payload.orderId());
    }
}

// Publisher: Trigger local handlers
domainEventBus.arise(new OrderDomainEvent("123"));

// Consumer: Local Listener
@Component
public class InventoryHandler extends BaseDomainEventHandler<OrderDomainEvent> {
    @Override
    public void onEvent(OrderDomainEvent event) {
        log.info("Adjusting local inventory for: {}", event.getId());
    }
}

a45db290-b9d4-4cb4-a5cd-ae19bd352b1b

{
  "id": "a45db290-b9d4-4cb4-a5cd-ae19bd352b1b",
  "type": "tr.kontas.splitr.test.CreateOrderCommand",
  "payload": "{\"productName\":\"Laptop\",\"quantity\":1}",
  "callbackUrl": "http://localhost:8083/internal/query/callback",
  "sentAtEpochMs": 1766863517525,
  "timeoutMs": 3000,
  "sync": true
}

{
  "__TypeId__": "tr.kontas.splitr.dto.CommandRequest"
}

- Synchronous over Kafka: Blocking local threads for remote responses, mimicking REST over message brokers.
- Automatic Dispatching: Just implement `BaseQueryHandler<T>`, and Splitr handles the routing.
- Idempotency Engine: Built-in LRU cache to prevent "at-least-once" delivery side effects.
- Type Safety: Full support for polymorphic queries via Jackson Type Headers.
- Configurable Timeouts: Global or per-request timeout management.
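To illustrate the idempotency item in the list above, here is a minimal sketch of an LRU-bounded "seen IDs" store built on `LinkedHashMap`. It is an assumption about how such a cache could work, not Splitr's internal store; `LruIdempotencyStoreSketch` and `markIfNew` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical LRU-bounded idempotency store; not the library's implementation. */
class LruIdempotencyStoreSketch {
    private final Map<String, Boolean> seen;

    LruIdempotencyStoreSketch(int maxSize) {
        // accessOrder=true keeps entries ordered from least to most recently used.
        this.seen = new LinkedHashMap<>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > maxSize; // evict the least recently used ID
            }
        };
    }

    /** Returns true the first time an ID is seen, false while it is still cached. */
    synchronized boolean markIfNew(String messageId) {
        if (seen.get(messageId) != null) {
            return false; // duplicate delivery: the handler should be skipped
        }
        seen.put(messageId, Boolean.TRUE);
        return true;
    }
}
```

A consumer would call `markIfNew(request.id())` before invoking the handler. Note the trade-off of a bounded cache: once an ID has been evicted, a late redelivery of that message is treated as new.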
Enable the bus and specify where to receive results.
splitr:
  publisher:
    enabled: true
  callback-url: http://service-a:8080/internal/query/callback
  bus:
    kafka:
      topic: tr.kontas.splitr.query.topic
    default-timeout: 5000 # ms
Enable the processor and set idempotency limits.
splitr:
  consumer:
    enabled: true
  idempotency:
    max-size: 1000 # maximum number of stored keys
| Property | Default | Description |
|---|---|---|
| `splitr.kafka.publisher.enabled` | `false` | Enables QueryBus and Callback endpoint. |
| `splitr.kafka.consumer.enabled` | `false` | Enables Kafka listeners and Dispatcher. |
| `splitr.rabbit.publisher.enabled` | `false` | Enables QueryBus and Callback endpoint. |
| `splitr.rabbit.consumer.enabled` | `false` | Enables RabbitMQ listeners and Dispatcher. |
| `splitr.inmemory.enabled` | `false` | Enables in-memory listeners, Dispatcher, and Bus. |
| `splitr.callback-url` | - (Required) | The HTTP endpoint for the publisher's callback, e.g. `http://localhost:${server.port}/internal/%s/callback`, where `%s` is replaced by either `command` or `query`. |
| `splitr.bus.default-timeout` | `10000` | Default wait time in ms for sync queries and commands. |
| `splitr.bus.kafka.command.topic` | `tr.kontas.splitr.command.topic` | Kafka command topic. |
| `splitr.bus.kafka.query.topic` | `tr.kontas.splitr.query.topic` | Kafka query topic. |
| `splitr.bus.kafka.event.topic` | `tr.kontas.splitr.event.topic` | Kafka event topic. |
| `splitr.rabbit.command.queue` | `tr.kontas.splitr.command.queue` | RabbitMQ command queue. |
| `splitr.rabbit.event.queue` | `tr.kontas.splitr.event.queue` | RabbitMQ event queue. |
| `splitr.rabbit.query.queue` | `tr.kontas.splitr.query.queue` | RabbitMQ query queue. |
| `splitr.idempotency.max-size` | `100` | Maximum number of idempotency keys kept in the LruStore. |
| `splitr.idempotency.ttl-ms` | `100` | TTL in milliseconds for idempotency keys in the LruStore. |
| `splitr.registry.max-size` | `10000` | Maximum number of waiting threads tracked by SyncRegistry. |
| `splitr.registry.cleanup-interval-ms` | `10000` | Interval in milliseconds at which SyncRegistry clears expired threads. |
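The `%s` placeholder in `splitr.callback-url` can be illustrated with plain `String.format`. This is only a sketch of the substitution described in the table; the helper `CallbackUrlSketch.resolve` is a hypothetical name, and how Splitr performs the replacement internally is not confirmed here.

```java
/** Hypothetical helper showing how a callback-url template could be resolved. */
class CallbackUrlSketch {
    /** Replaces %s in the template with "command" or "query". */
    static String resolve(String template, String kind) {
        if (!kind.equals("command") && !kind.equals("query")) {
            throw new IllegalArgumentException("kind must be 'command' or 'query': " + kind);
        }
        // A template without %s is returned unchanged, since String.format
        // ignores surplus arguments.
        return String.format(template, kind);
    }
}
```

For example, `resolve("http://localhost:8083/internal/%s/callback", "query")` yields `http://localhost:8083/internal/query/callback`.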
- Query Bus: Distributed request-response pattern.
- Command Bus: Asynchronous command dispatching.
- Event Bus: Pub/Sub broadcast support for events over queue/bus.
  - Note: Fan-out pattern implementation. Multiple listeners for a single event with independent consumer groups.
- Domain Events InMemory Bus: Pub/Sub broadcast support for domain events.
- Dead Letter Queue (DLQ): Automatic failure routing to `.DLT` topics for commands and events.
  - Note: Catch-all error handling in listeners to prevent infinite retry loops and partition blocking.
- DLQ Retry Jobs: Scheduled background jobs to consume from DLQ and re-publish to main topics.
  - Customization: User-defined Cron Expressions for retry intervals.
  - Logic: Smart back-off strategy; use `x-retry-count` headers to prevent "poison pill" messages from circulating forever.
- Saga Support: Orchestration-based distributed transaction management.
  - Note: State machine implementation to manage compensations (undo operations) when a step in the flow fails.
- Idempotency Guard: Distributed store to prevent duplicate processing.
- Circuit Breaker: Resilience4j integration to protect the bus.
  - Note: Automatically trip the circuit if the `callback-url` or target microservice is down, preventing resource exhaustion.
- Payload Encryption: Optional AES encryption for sensitive Kafka record data.
  - Note: Field-level or full-body encryption to ensure PII (Personally Identifiable Information) security at rest in Kafka brokers.
- Schema Validation: JSR-303 (Hibernate Validator) support for incoming DTOs.
  - Note: Fail-fast mechanism; validate the command/query payload before it ever touches the Kafka topic.
- Distributed Tracing: Micrometer/Brave/Zipkin integration.
  - Note: Propagation of `Span-ID` and `Trace-ID` across different services to visualize the entire request flow.
- Metrics & Dashboards: Micrometer-based Prometheus metrics.
  - Note: Real-time tracking of "Bus Throughput", "Average Response Latency", and "DLQ Error Rates".
- Audit Log: Persistent storage for all dispatched messages.
  - Note: A separate database or search index (Elasticsearch) to search historical commands and see who triggered what, when.
- Kafka Transport: Primary high-throughput transport layer.
- RabbitMQ Transport: AMQP-based alternative.
  - Note: Support for environments where lightweight broker logic and complex routing (Exchange types) are preferred.
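The retry-count idea from the DLQ Retry Jobs item above could look roughly like the following. This is a speculative sketch of one possible policy, not an existing Splitr API: `RetryPolicySketch`, `nextAttempt`, and the representation of headers as a `Map<String, String>` are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical retry guard based on an x-retry-count header. */
class RetryPolicySketch {
    static final String RETRY_HEADER = "x-retry-count";
    private final int maxRetries;

    RetryPolicySketch(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /**
     * Returns the headers for the next re-publish with the counter incremented,
     * or an empty Optional when the message has used up its retries and should
     * stay in the dead-letter topic.
     */
    Optional<Map<String, String>> nextAttempt(Map<String, String> headers) {
        int count = Integer.parseInt(headers.getOrDefault(RETRY_HEADER, "0"));
        if (count >= maxRetries) {
            return Optional.empty(); // poison pill: stop circulating it
        }
        Map<String, String> next = new HashMap<>(headers);
        next.put(RETRY_HEADER, String.valueOf(count + 1));
        return Optional.of(next);
    }
}
```

A scheduled retry job would call `nextAttempt` for each dead-lettered record and re-publish only when a value is present.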
Developed and maintained by BurakKontas. Feel free to submit issues or pull requests to improve the bus performance or add new features.