Vendor-agnostic messaging and event streaming abstraction for Go, covering queues, pub/sub and streams.
- Driver/Registry Pattern: Similar to GORM, import only the drivers you need
- Zero Dependency Bloat: Each driver is a separate package with its own dependencies
- Unified API: Same interface for all messaging systems
- Auto-Registration: Drivers register themselves via `init()` functions
- Type-Safe: Full Go type safety with generics-ready design
- Context Support: Full context.Context support for timeouts and cancellation
- Retry Mechanism: Built-in exponential backoff retry with jitter support
- Circuit Breaker: Protect against cascading failures with automatic recovery
- Automatic Reconnection: Auto-reconnect with subscription restoration
- Dead Letter Queue: Automatic DLQ routing for failed messages
- Health Monitoring: Comprehensive health checks with Kubernetes probes
- Middleware System: Pluggable middleware for logging, metrics, tracing, rate limiting
| Driver | Package | Status |
|---|---|---|
| Apache Kafka | `drivers/kafka` | ✅ Ready |
| RabbitMQ | `drivers/rabbitmq` | ✅ Ready |
| NATS | `drivers/nats` | ✅ Ready |
| Redis Pub/Sub & Streams | `drivers/redis` | ✅ Ready |
| Google Cloud Pub/Sub | `drivers/gcppubsub` | ✅ Ready |
| AWS SQS | `drivers/awssqs` | ✅ Ready |
| Azure Service Bus | `drivers/azureservicebus` | ✅ Ready |
| Apache Pulsar | `drivers/pulsar` | ✅ Ready |
go get github.com/teamlify-devx/unimq
package main
import (
"context"
"log"
"github.com/teamlify-devx/unimq"
// Import ONLY the drivers you need!
// This is the key to avoiding dependency bloat
_ "github.com/teamlify-devx/unimq/drivers/kafka"
_ "github.com/teamlify-devx/unimq/drivers/rabbitmq"
)
func main() {
ctx := context.Background()
// Create a Kafka client
client, err := unimq.Open(ctx, "kafka", &unimq.Config{
Brokers: []string{"localhost:9092"},
GroupID: "my-consumer-group",
})
if err != nil {
log.Fatal(err)
}
defer client.Close()
// Publish a message
err = client.Publish(ctx, "my-topic", []byte("Hello, World!"))
if err != nil {
log.Fatal(err)
}
// Subscribe to messages
err = client.Subscribe(ctx, "my-topic", func(ctx context.Context, msg *unimq.Message) error {
log.Printf("Received: %s", string(msg.Data))
return msg.Ack()
})
if err != nil {
log.Fatal(err)
}
}
UniMQ follows a Driver/Registry pattern similar to GORM's database drivers:
unimq/
├── queue.go # Core interface & registry
├── drivers/
│ ├── kafka/ # Kafka driver (imports IBM/sarama)
│ ├── rabbitmq/ # RabbitMQ driver (imports amqp091-go)
│ ├── nats/ # NATS driver (imports nats-io/nats.go)
│ ├── redis/ # Redis driver (imports go-redis)
│ ├── gcppubsub/ # GCP Pub/Sub driver
│ ├── awssqs/ # AWS SQS driver
│ ├── azureservicebus/ # Azure Service Bus driver
│ └── pulsar/ # Apache Pulsar driver
└── example/
└── test/
├── publisher/ # Publisher CLI example
└── consumer/ # Consumer CLI example
- Registration: Each driver package has an `init()` function that registers itself:
// drivers/kafka/kafka.go
func init() {
unimq.Register(&KafkaDriver{})
}
- Import: Users import only the drivers they need:
import (
_ "github.com/teamlify-devx/unimq/drivers/kafka" // Only Kafka dependencies
_ "github.com/teamlify-devx/unimq/drivers/rabbitmq" // Only RabbitMQ dependencies
)
- Open: Use the unified `Open` function to create clients:
client, err := unimq.Open(ctx, "kafka", config)
type Queue interface {
// Publishing
Publish(ctx context.Context, topic string, data []byte) error
PublishMessage(ctx context.Context, topic string, msg *Message) error
PublishBatch(ctx context.Context, topic string, messages [][]byte) error
// Subscribing
Subscribe(ctx context.Context, topic string, handler MessageHandler) error
SubscribeMultiple(ctx context.Context, topics []string, handler MessageHandler) error
// Connection management
Close() error
Ping(ctx context.Context) error
Driver() string
}
type Message struct {
ID string // Unique message identifier
Topic string // Topic/queue name
Data []byte // Message payload
Headers map[string]string // Message headers/metadata
Timestamp int64 // Message timestamp
Partition int32 // Partition (for Kafka)
Offset int64 // Offset (for Kafka)
}
// Acknowledgment
msg.Ack() // Acknowledge message
msg.Nack(requeue bool) // Negative acknowledge
type Config struct {
Brokers []string // Broker addresses
URL string // Connection URL
Topic string // Default topic
GroupID string // Consumer group ID
ClientID string // Client identifier
Username string // Authentication
Password string // Authentication
TLS bool // Enable TLS
TLSSkipVerify bool // Skip TLS verification
CertFile string // TLS certificate
KeyFile string // TLS key
CAFile string // CA certificate
Extra map[string]interface{} // Driver-specific options
}
Kafka:
config := &unimq.Config{
Brokers: []string{"localhost:9092"},
GroupID: "my-group",
Extra: map[string]interface{}{
"auto.offset.reset": "earliest",
"acks": -1, // all
"compression.type": "snappy",
},
}
RabbitMQ:
config := &unimq.Config{
URL: "amqp://guest:guest@localhost:5672/",
Extra: map[string]interface{}{
"exchange": "my-exchange",
"exchange.type": "topic",
"queue.durable": true,
"prefetch_count": 10,
},
}
NATS:
config := &unimq.Config{
URL: "nats://localhost:4222",
GroupID: "my-queue-group",
Extra: map[string]interface{}{
"jetstream": true,
"stream.name": "my-stream",
},
}
Redis:
config := &unimq.Config{
URL: "redis://localhost:6379",
Extra: map[string]interface{}{
"streams.enabled": true, // Use Redis Streams
"streams.consumer_group": "my-group",
"db": 0,
},
}
AWS SQS:
config := &unimq.Config{
Extra: map[string]interface{}{
"region": "us-east-1",
"visibility_timeout": 30,
"wait_time_seconds": 20,
},
}
Google Cloud Pub/Sub:
config := &unimq.Config{
ClientID: "my-project-id",
GroupID: "my-subscription",
Extra: map[string]interface{}{
"credentials_file": "/path/to/credentials.json",
"subscription.ack_deadline": 10,
},
}
Azure Service Bus:
config := &unimq.Config{
URL: "my-namespace.servicebus.windows.net",
Username: "my-shared-access-key-name",
Password: "my-shared-access-key",
Extra: map[string]interface{}{
"subscription.name": "my-subscription",
},
}
Apache Pulsar:
config := &unimq.Config{
URL: "pulsar://localhost:6650",
GroupID: "my-subscription",
Extra: map[string]interface{}{
"subscription.type": "shared",
"subscription.initial_position": "earliest",
},
}
UniMQ includes a built-in retry mechanism with exponential backoff and jitter support.
// Wrap any queue with retry logic
client, err := unimq.Open(ctx, "kafka", config)
if err != nil {
log.Fatal(err)
}
// Wrap with default retry policy (3 retries, exponential backoff)
retryableClient := unimq.WrapWithRetry(client, unimq.DefaultRetryPolicy())
// All operations now have automatic retry
err = retryableClient.Publish(ctx, "topic", []byte("message"))
// Default policy: 3 retries, 100ms initial interval, 2x multiplier
policy := unimq.DefaultRetryPolicy()
// No retry
policy := unimq.NoRetryPolicy()
// Aggressive retry for critical operations
policy := unimq.AggressiveRetryPolicy() // 10 retries, 50ms initial
// Custom policy
policy := &unimq.RetryPolicy{
MaxRetries: 5, // Max retry attempts (-1 = infinite)
InitialInterval: 100 * time.Millisecond,
MaxInterval: 30 * time.Second, // Cap for backoff
Multiplier: 2.0, // Exponential factor
RandomizationFactor: 0.5, // Jitter (0-1)
RetryIf: func(err error) bool { // Custom retry condition
return !errors.Is(err, unimq.ErrNonRetryableError)
},
OnRetry: func(attempt int, err error, nextInterval time.Duration) {
log.Printf("Retry attempt %d after error: %v (next in %v)", attempt, err, nextInterval)
},
}
// Retry the initial connection
client, err := unimq.OpenWithRetry(ctx, "kafka", config, unimq.AggressiveRetryPolicy())
// Or open and wrap in one call
retryableClient, err := unimq.OpenWithRetryWrapped(ctx, "kafka", config,
unimq.AggressiveRetryPolicy(), // Connection retry policy
unimq.DefaultRetryPolicy(), // Operation retry policy
)
// Wrap message handlers with retry logic
handler := unimq.RetryableMessageHandler(policy, func(ctx context.Context, msg *unimq.Message) error {
// Process message - will be retried on error
return processMessage(msg)
})
client.Subscribe(ctx, "topic", handler)
// Use Retry directly for any operation
err := unimq.Retry(ctx, policy, func() error {
return someOperation()
})
// With result
result, err := unimq.RetryWithResult(ctx, policy, func() (MyType, error) {
return someOperationWithResult()
})
With default settings (InitialInterval: 100ms, Multiplier: 2.0, MaxInterval: 10s):
| Attempt | Interval (without jitter) |
|---|---|
| 1 | 100ms |
| 2 | 200ms |
| 3 | 400ms |
| 4 | 800ms |
| 5 | 1.6s |
| 6 | 3.2s |
| 7 | 6.4s |
| 8+ | 10s (capped) |
With RandomizationFactor: 0.5, actual intervals will be ±50% of these values to prevent thundering herd.
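For reference, the schedule above follows the standard exponential-backoff formula: interval = InitialInterval × Multiplier^(attempt−1), capped at MaxInterval, then scaled by a random jitter factor. A minimal self-contained sketch of that computation (an illustration of the technique, not UniMQ's internal code):

```go
package main

import (
	"fmt"
	"math"
	"math/rand"
	"time"
)

// backoffInterval reproduces the table above: initial * multiplier^(attempt-1),
// capped at max, then scaled by a random factor in [1-randomization, 1+randomization].
func backoffInterval(attempt int, initial, max time.Duration, multiplier, randomization float64) time.Duration {
	d := float64(initial) * math.Pow(multiplier, float64(attempt-1))
	if d > float64(max) {
		d = float64(max)
	}
	jitter := 1 + randomization*(2*rand.Float64()-1) // e.g. ±50% for 0.5
	return time.Duration(d * jitter)
}

func main() {
	for attempt := 1; attempt <= 8; attempt++ {
		fmt.Printf("attempt %d: %v\n", attempt,
			backoffInterval(attempt, 100*time.Millisecond, 10*time.Second, 2.0, 0.5))
	}
}
```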
UniMQ includes a circuit breaker to protect against cascading failures when a service is unavailable.
┌─────────────────────────────────────────┐
│ │
▼ │
┌─────────┐ failure threshold ┌────────┐ │
│ CLOSED │ ─────────────────▶ │ OPEN │ │
└─────────┘ └────────┘ │
▲ │ │
│ │ timeout
│ success threshold ▼ │
│ ┌───────────┐ │
└──────────────────────── │ HALF-OPEN │ ─┘
failure └───────────┘
- Closed: Normal operation, requests pass through
- Open: Circuit is tripped, requests fail fast with `ErrCircuitOpen`
- Half-Open: After timeout, limited requests test if service recovered
// Wrap any queue with circuit breaker
client, _ := unimq.Open(ctx, "kafka", config)
cbClient := unimq.WrapWithCircuitBreaker(client, unimq.DefaultCircuitBreakerConfig())
// Operations are now protected
err := cbClient.Publish(ctx, "topic", data)
if errors.Is(err, unimq.ErrCircuitOpen) {
log.Println("Circuit is open, service unavailable")
}
config := &unimq.CircuitBreakerConfig{
Name: "kafka-producer", // Identifier for logging
FailureThreshold: 5, // Failures before opening
SuccessThreshold: 2, // Successes to close from half-open
Timeout: 30 * time.Second, // Time before half-open
MaxHalfOpenRequests: 3, // Concurrent requests in half-open
// Optional: Custom success checker
IsSuccessful: func(err error) bool {
// Don't count 404 as failure
return err == nil || errors.Is(err, ErrNotFound)
},
// Optional: State change callback
OnStateChange: func(name, from, to string) {
log.Printf("Circuit %s: %s -> %s", name, from, to)
},
// Optional: Fallback when circuit is open
Fallback: func(ctx context.Context, err error) error {
return getCachedResponse()
},
}
publishConfig := &unimq.CircuitBreakerConfig{
FailureThreshold: 3,
Timeout: 10 * time.Second,
}
consumeConfig := &unimq.CircuitBreakerConfig{
FailureThreshold: 5,
Timeout: 30 * time.Second,
}
cbClient := unimq.WrapWithCircuitBreakers(client, publishConfig, consumeConfig)
cb := cbClient.PublishCircuitBreaker()
// Check state
fmt.Println("State:", cb.State()) // closed, open, or half-open
// Get counts
failures, successes := cb.Counts()
// Manually trip (for testing/maintenance)
cb.Trip()
// Manually reset
cb.Reset()
Combine retry and circuit breaker for maximum resilience:
// Create a resilient client with both protections
resilientClient := unimq.WrapWithResilience(
client,
unimq.DefaultRetryPolicy(),
unimq.DefaultCircuitBreakerConfig(),
)
// Or open with full resilience in one call
resilientClient, err := unimq.OpenWithResilience(ctx, "kafka", config,
unimq.AggressiveRetryPolicy(), // For connection and operations
unimq.DefaultCircuitBreakerConfig(), // Circuit breaker settings
)
// Operations are now:
// 1. Protected by circuit breaker (fail fast if open)
// 2. Retried with exponential backoff on transient failures
err = resilientClient.Publish(ctx, "topic", data)
- Circuit Closed: Operations retry on failure with exponential backoff
- Circuit Open: Operations fail immediately with `ErrCircuitOpen` (no retry)
- Circuit Half-Open: Limited operations test recovery, retry on transient failures (see the sketch below)
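A short sketch of this interplay, reusing the `resilientClient` from the snippet above (`queueLocally` is a hypothetical application-level fallback): `ErrCircuitOpen` means the call failed fast and was not retried, while any other error has already been retried with backoff.

```go
err := resilientClient.Publish(ctx, "orders", data)
switch {
case err == nil:
	// Published, possibly after transparent retries.
case errors.Is(err, unimq.ErrCircuitOpen):
	// Circuit is open: fail-fast, no retries were attempted.
	queueLocally(data) // hypothetical local fallback
default:
	// Transient failures were retried with exponential backoff and still failed.
	log.Printf("publish failed after retries: %v", err)
}
```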
UniMQ provides automatic reconnection with subscription restoration when the connection is lost.
// Open with automatic reconnection
client, err := unimq.OpenWithReconnect(ctx, "kafka", config, unimq.DefaultReconnectConfig())
if err != nil {
log.Fatal(err)
}
defer client.Close()
// Subscriptions are automatically restored after reconnection
client.Subscribe(ctx, "topic", handler)
// Publish - will wait for reconnection if disconnected
client.Publish(ctx, "topic", data)
config := &unimq.ReconnectConfig{
InitialInterval: 1 * time.Second, // Initial wait before first retry
MaxInterval: 30 * time.Second, // Maximum backoff interval
Multiplier: 2.0, // Exponential backoff multiplier
MaxReconnects: -1, // -1 = infinite retries
PingInterval: 10 * time.Second, // Health check frequency
PingTimeout: 5 * time.Second, // Health check timeout
// Optional callbacks
OnStateChange: func(from, to string) {
log.Printf("Connection: %s -> %s", from, to)
},
OnReconnect: func(attempt int, err error) {
log.Printf("Reconnecting (attempt %d): %v", attempt, err)
},
OnReconnected: func(attempt int) {
log.Printf("Reconnected after %d attempts", attempt)
},
OnReconnectFailed: func(err error) {
log.Printf("Reconnection failed: %v", err)
},
}
// Check connection status
state := client.State()
// "connected", "disconnected", "reconnecting", "closed"
isConnected := client.IsConnected()
// Get last error
err := client.LastError()
// Get reconnection attempt count
count := client.ReconnectCount()
// Force reconnection (for testing or maintenance)
client.ForceReconnect()
// Clear stored subscriptions
client.ClearSubscriptions()
// Get underlying queue
underlyingQueue := client.Unwrap()
// Wrap an existing queue with reconnection
existingClient, _ := unimq.Open(ctx, "kafka", config)
reconnectableClient := unimq.WrapWithReconnect(
existingClient,
"kafka", // driver name needed for reconnection
config, // config needed for reconnection
unimq.DefaultReconnectConfig(),
)
Combine all resilience features (Retry + Circuit Breaker + Reconnection) for maximum reliability:
// Open with full resilience
client, err := unimq.OpenFullyResilient(ctx, "kafka", config,
unimq.DefaultRetryPolicy(), // Retry configuration
unimq.DefaultCircuitBreakerConfig(), // Circuit breaker configuration
unimq.DefaultReconnectConfig(), // Reconnection configuration
)
if err != nil {
log.Fatal(err)
}
defer client.Close()
// All operations now have:
// 1. Automatic reconnection with subscription restoration
// 2. Circuit breaker protection
// 3. Retry with exponential backoff
err = client.Publish(ctx, "topic", data)
Request
│
▼
┌──────────────────┐
│ Retry Logic │ ◄── Exponential backoff with jitter
└────────┬─────────┘
│
▼
┌──────────────────┐
│ Circuit Breaker │ ◄── Fail fast when service is down
└────────┬─────────┘
│
▼
┌──────────────────┐
│ Reconnectable │ ◄── Auto-reconnect on connection loss
│ Queue │
└────────┬─────────┘
│
▼
┌──────────────────┐
│ Actual Driver │ ◄── Kafka, RabbitMQ, etc.
└──────────────────┘
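The same stack can be assembled by hand from the wrappers shown earlier, with reconnection closest to the driver and retry outermost. A sketch, assuming each wrapper returns a value usable as a `Queue` (in practice `OpenFullyResilient` does this composition for you):

```go
// Manual composition sketch using the wrappers documented above.
base, err := unimq.Open(ctx, "kafka", config)
if err != nil {
	log.Fatal(err)
}
// Innermost: reconnection sits directly above the driver.
reconnecting := unimq.WrapWithReconnect(base, "kafka", config, unimq.DefaultReconnectConfig())
// Middle: circuit breaker fails fast when the service is down.
protected := unimq.WrapWithCircuitBreaker(reconnecting, unimq.DefaultCircuitBreakerConfig())
// Outermost: retry with exponential backoff on transient errors.
client := unimq.WrapWithRetry(protected, unimq.DefaultRetryPolicy())

err = client.Publish(ctx, "topic", data)
```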
UniMQ provides automatic Dead Letter Queue routing for messages that fail processing after multiple retries.
// Wrap a queue with DLQ support
client, _ := unimq.Open(ctx, "kafka", config)
dlqClient := unimq.WrapWithDLQ(client, unimq.DefaultDLQConfig())
// Subscribe - failed messages auto-route to DLQ after retries
dlqClient.Subscribe(ctx, "orders", func(ctx context.Context, msg *unimq.Message) error {
if err := processOrder(msg); err != nil {
return err // Will retry, then DLQ after max retries
}
return nil // Success - message acknowledged
})
config := &unimq.DLQConfig{
// Topic naming
TopicSuffix: ".dlq", // orders -> orders.dlq
TopicPrefix: "dlq.", // orders -> dlq.orders
FixedTopic: "global-dlq", // All failed messages to one topic
// Retry behavior
MaxRetries: 3, // Retries before DLQ (0 = no retries)
RetryInterval: 1 * time.Second, // Wait between retries
// Message content
IncludeOriginalMessage: true, // Include original data in DLQ
// Callbacks
OnRetry: func(msg *unimq.Message, attempt int, err error) {
log.Printf("Retry %d for message %s: %v", attempt, msg.ID, err)
},
OnDLQ: func(msg *unimq.DLQMessage) {
log.Printf("Message sent to DLQ: %s (retries: %d)",
msg.OriginalMessageID, msg.RetryCount)
alertOps(msg)
},
// Custom DLQ condition
ShouldDLQ: func(err error) bool {
// Don't DLQ validation errors
return !errors.Is(err, ErrValidation)
},
}
When a message is sent to DLQ, it includes metadata:
{
"original_topic": "orders",
"original_message_id": "msg-123",
"original_data": "eyJvcmRlcl9pZCI6IDEyM30=",
"original_headers": {"key": "value"},
"error": "database connection failed",
"retry_count": 3,
"first_failed_at": "2024-01-15T10:00:00Z",
"last_failed_at": "2024-01-15T10:00:05Z",
"sent_to_dlq_at": "2024-01-15T10:00:05Z",
"consumer_id": "consumer-1"
}
// Create a DLQ consumer
dlqConsumer := unimq.NewDLQConsumer(dlqQueue, mainQueue, config)
// Subscribe to DLQ with access to both DLQ metadata and original message
dlqConsumer.Subscribe(ctx, "orders.dlq", func(ctx context.Context,
dlqMsg *unimq.DLQMessage, originalMsg *unimq.Message) error {
log.Printf("Failed message: topic=%s, error=%s, retries=%d",
dlqMsg.OriginalTopic, dlqMsg.Error, dlqMsg.RetryCount)
// Fix and reprocess, or alert, or archive
return nil
})
// Reprocess all DLQ messages back to original topics
dlqConsumer.Reprocess(ctx, "orders.dlq", nil)
// Reprocess with filter
dlqConsumer.Reprocess(ctx, "orders.dlq", func(msg *unimq.DLQMessage) bool {
// Only reprocess messages that failed due to timeout
return strings.Contains(msg.Error, "timeout")
})
// Reprocess in batches
dlqConsumer.ReprocessBatch(ctx, "orders.dlq", 100, nil)
// Get statistics
processed, failed, sentToDLQ := dlqClient.Stats()
// Get DLQ topic name
dlqTopic := dlqClient.DLQTopic("orders") // "orders.dlq"
// Set consumer ID for tracking
dlqClient.SetConsumerID("order-processor-1")
// Use separate queues for main and DLQ
dlqClient := unimq.WrapWithSeparateDLQ(mainQueue, dlqQueue, config)
Message Received
│
▼
┌─────────────┐
│ Process │
└──────┬──────┘
│
Success?
╱ ╲
Yes No
│ │
▼ ▼
Ack Retry?
╱ ╲
Yes No (max reached)
│ │
▼ ▼
Wait & Send to DLQ
Retry │
▼
Ack Original
UniMQ provides comprehensive health monitoring with Kubernetes-compatible liveness and readiness probes.
// Wrap a queue with health monitoring
client, _ := unimq.Open(ctx, "kafka", config)
healthyClient := unimq.WrapWithHealth(client, unimq.DefaultHealthCheckConfig())
defer healthyClient.Close()
// Health checks run automatically in background
// Check current status
if healthyClient.IsHealthy() {
fmt.Println("Queue is healthy")
}
// Get detailed health info
details := healthyClient.GetHealth()
fmt.Printf("Status: %s, Uptime: %v\n", details.Status, details.Uptime)config := &unimq.HealthCheckConfig{
Enabled: true, // Enable health monitoring
Interval: 10 * time.Second, // Check interval
Timeout: 5 * time.Second, // Check timeout
FailureThreshold: 3, // Failures before unhealthy
SuccessThreshold: 1, // Successes before healthy
InitialDelay: 5 * time.Second, // Delay before first check
// Callbacks
OnHealthChange: func(from, to string, details *unimq.HealthDetails) {
log.Printf("Health changed: %s -> %s", from, to)
if to == unimq.HealthStatusUnhealthy {
alertOps("Queue is unhealthy!")
}
},
OnHealthCheck: func(details *unimq.HealthDetails) {
metrics.RecordHealthCheck(details)
},
// Custom health checks
CustomChecks: []unimq.HealthChecker{
unimq.NewHealthCheck("database", func(ctx context.Context) error {
return db.Ping(ctx)
}),
unimq.NewHealthCheck("redis", func(ctx context.Context) error {
return redis.Ping(ctx).Err()
}),
},
}
| Status | Description |
|---|---|
| `healthy` | All components working |
| `degraded` | Some issues, but operational |
| `unhealthy` | Critical failure |
| `unknown` | Not yet checked |
// Get HTTP handlers for Kubernetes probes
handler := healthyClient.HTTPHandler()
mux := http.NewServeMux()
handler.RegisterRoutes(mux, "")
// Registers:
// /health - Full health details
// /live - Kubernetes liveness probe
// /ready - Kubernetes readiness probe
http.ListenAndServe(":8080", mux)Kubernetes Deployment:
livenessProbe:
httpGet:
path: /live
port: 8080
initialDelaySeconds: 10
periodSeconds: 5
readinessProbe:
httpGet:
path: /ready
port: 8080
initialDelaySeconds: 5
periodSeconds: 5
{
"status": "healthy",
"timestamp": "2024-01-15T10:00:00Z",
"uptime_ns": 3600000000000,
"components": {
"queue": {
"name": "queue",
"status": "healthy",
"last_checked": "2024-01-15T10:00:00Z",
"duration_ns": 15000000
},
"database": {
"name": "database",
"status": "healthy",
"last_checked": "2024-01-15T10:00:00Z"
}
},
"total_checks": 360,
"successful_checks": 359,
"failed_checks": 1,
"is_live": true,
"is_ready": true,
"driver": "kafka",
"metadata": {
"version": "1.0.0",
"environment": "production"
}
}
// Create standalone health monitor
monitor := unimq.NewHealthMonitor(queue, config)
monitor.Start()
defer monitor.Stop()
// Immediate check
details, err := monitor.Check(ctx)
// Add/remove checks dynamically
monitor.AddCheck(unimq.NewHealthCheck("external-api", checkFunc))
monitor.RemoveCheck("external-api")
// Set custom metadata
monitor.SetMetadata("version", "1.0.0")
monitor.SetMetadata("pod", os.Getenv("POD_NAME"))Monitor multiple queues together:
aggregate := unimq.NewAggregateHealthMonitor()
aggregate.Add("orders-queue", ordersMonitor)
aggregate.Add("notifications-queue", notificationsMonitor)
// Get combined health status
details := aggregate.GetHealth()
// HTTP handler for aggregate health
http.HandleFunc("/health", aggregate.HTTPHandler())┌─────────────────────────────────────────────────────┐
│ Health Monitor │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Queue │ │ Database │ │ Redis │ │
│ │ Ping │ │ Check │ │ Check │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │
│ └───────────────┴───────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────┐ │
│ │ Evaluate Status │ │
│ │ (all pass = healthy) │ │
│ └───────────┬───────────┘ │
│ │ │
│ ┌────────────┼────────────┐ │
│ ▼ ▼ ▼ │
│ OnChange /health /live /ready │
│ Callback JSON Kubernetes │
└─────────────────────────────────────────────────────┘
UniMQ provides a flexible middleware system for both publish and consume operations.
// Wrap queue with middleware support
client, _ := unimq.Open(ctx, "kafka", config)
mq := unimq.WrapWithMiddleware(client)
// Add publish middleware
mq.UsePublish(unimq.LoggingMiddleware(nil))
mq.UsePublish(unimq.MetricsMiddleware(&metricsConfig))
mq.UsePublish(unimq.RateLimiterMiddleware(nil))
// Add consume middleware
mq.UseConsume(unimq.LoggingConsumeMiddleware(nil))
mq.UseConsume(unimq.RecoveryMiddleware(onPanic))
mq.UseConsume(unimq.TimeoutMiddleware(30 * time.Second))
| Middleware | Type | Description |
|---|---|---|
| `LoggingMiddleware` | Publish | Log publish operations |
| `LoggingConsumeMiddleware` | Consume | Log consume operations |
| `MetricsMiddleware` | Publish | Track publish metrics |
| `MetricsConsumeMiddleware` | Consume | Track consume metrics |
| `RateLimiterMiddleware` | Publish | Rate limit publishing |
| `TracingMiddleware` | Publish | Distributed tracing |
| `TracingConsumeMiddleware` | Consume | Distributed tracing |
| `RecoveryMiddleware` | Consume | Panic recovery |
| `TimeoutMiddleware` | Consume | Handler timeout |
| `RetryConsumeMiddleware` | Consume | Retry failed handlers |
| `FilterMiddleware` | Consume | Filter messages |
| `DeduplicationMiddleware` | Consume | Skip duplicate messages |
| `ValidationMiddleware` | Consume | Validate messages |
| `TransformMiddleware` | Consume | Transform messages |
config := &unimq.LoggingConfig{
Logger: log.Printf, // Custom logger
LogPublish: true, // Log publishes
LogConsume: true, // Log consumes
LogErrors: true, // Only log errors
IncludeData: false, // Include message data
MaxDataLength: 100, // Truncate data
}
mq.UsePublish(unimq.LoggingMiddleware(config))
mq.UseConsume(unimq.LoggingConsumeMiddleware(config))
// Prometheus example
config := &unimq.MetricsConfig{
OnPublish: func(topic string, duration time.Duration, err error, batchSize int) {
publishCounter.WithLabelValues(topic).Inc()
publishDuration.WithLabelValues(topic).Observe(duration.Seconds())
if err != nil {
publishErrors.WithLabelValues(topic).Inc()
}
},
OnConsume: func(topic string, duration time.Duration, err error) {
consumeCounter.WithLabelValues(topic).Inc()
consumeDuration.WithLabelValues(topic).Observe(duration.Seconds())
if err != nil {
consumeErrors.WithLabelValues(topic).Inc()
}
},
}
mq.UsePublish(unimq.MetricsMiddleware(config))
mq.UseConsume(unimq.MetricsConsumeMiddleware(config))
config := &unimq.RateLimiterConfig{
Rate: 100, // 100 requests/second
Burst: 10, // Burst size
PerTopic: true, // Per-topic limits
WaitOnLimit: true, // Wait instead of error
MaxWait: time.Second, // Max wait time
OnLimited: func(topic string) {
log.Printf("Rate limited on topic: %s", topic)
},
}
mq.UsePublish(unimq.RateLimiterMiddleware(config))
config := &unimq.TracingConfig{
ServiceName: "order-service",
StartSpan: func(ctx context.Context, name string) (context.Context, interface{}) {
ctx, span := tracer.Start(ctx, name)
return ctx, span
},
FinishSpan: func(span interface{}, err error) {
s := span.(trace.Span)
if err != nil {
s.RecordError(err)
}
s.End()
},
InjectHeaders: func(ctx context.Context, headers map[string]string) {
propagator.Inject(ctx, propagation.MapCarrier(headers))
},
ExtractHeaders: func(headers map[string]string) context.Context {
return propagator.Extract(context.Background(),
propagation.MapCarrier(headers))
},
}
mq.UsePublish(unimq.TracingMiddleware(config))
mq.UseConsume(unimq.TracingConsumeMiddleware(config))
mq.UseConsume(unimq.RecoveryMiddleware(func(topic string, msg *unimq.Message, recovered interface{}) {
log.Printf("PANIC in handler: topic=%s, msg=%s, panic=%v",
topic, msg.ID, recovered)
alertOps(recovered)
}))
// Handler must complete within 30 seconds
mq.UseConsume(unimq.TimeoutMiddleware(30 * time.Second))
config := &unimq.DeduplicationConfig{
TTL: time.Hour, // Remember IDs for 1 hour
MaxSize: 10000, // Max IDs to remember
GetKey: func(msg *unimq.Message) string {
return msg.ID // Use message ID
},
}
mq.UseConsume(unimq.DeduplicationMiddleware(config))
// Only process high-priority messages
mq.UseConsume(unimq.FilterMiddleware(func(msg *unimq.Message) bool {
return msg.Headers["priority"] == "high"
}))
mq.UseConsume(unimq.ValidationMiddleware(func(msg *unimq.Message) error {
if len(msg.Data) == 0 {
return errors.New("empty message")
}
if msg.Headers["version"] != "2" {
return errors.New("unsupported version")
}
return nil
}))
mq.UseConsume(unimq.TransformMiddleware(func(msg *unimq.Message) *unimq.Message {
// Decompress data
msg.Data = decompress(msg.Data)
// Add processing timestamp
msg.Headers["processed_at"] = time.Now().String()
return msg
}))
// Custom publish middleware
customPublish := func(next unimq.PublishFunc) unimq.PublishFunc {
return func(ctx context.Context, pctx *unimq.PublishContext) error {
// Before publish
log.Printf("Publishing to %s", pctx.Topic)
err := next(ctx, pctx)
// After publish
if err != nil {
log.Printf("Publish failed: %v", err)
}
return err
}
}
// Custom consume middleware
customConsume := func(next unimq.MessageHandler) unimq.MessageHandler {
return func(ctx context.Context, msg *unimq.Message) error {
// Before processing
start := time.Now()
err := next(ctx, msg)
// After processing
log.Printf("Processed in %v", time.Since(start))
return err
}
}
mq.UsePublish(customPublish)
mq.UseConsume(customConsume)
// Chain multiple middleware into one
publishChain := unimq.ChainPublishMiddleware(
unimq.LoggingMiddleware(nil),
unimq.MetricsMiddleware(metricsConfig),
unimq.RateLimiterMiddleware(nil),
)
consumeChain := unimq.ChainConsumeMiddleware(
unimq.RecoveryMiddleware(nil),
unimq.TimeoutMiddleware(30*time.Second),
unimq.DeduplicationMiddleware(nil),
)
mq.UsePublish(publishChain)
mq.UseConsume(consumeChain)
Publish Request
│
▼
┌─────────────────────────────────────┐
│ Middleware 1 (before) │
│ ↓ │
│ Middleware 2 (before) │
│ ↓ │
│ Middleware 3 (before) │
│ ↓ │
│ ┌─────────────────────────────┐ │
│ │ Actual Publish/Consume │ │
│ └─────────────────────────────┘ │
│ ↓ │
│ Middleware 3 (after) │
│ ↓ │
│ Middleware 2 (after) │
│ ↓ │
│ Middleware 1 (after) │
└─────────────────────────────────────┘
│
▼
Response
# Start services (Kafka, RabbitMQ, etc.)
cd example/test
docker-compose up -d
# Run publisher
cd publisher
go run main.go -p kafka -t test-topic -m "Hello World" -c 10
# Run consumer (in another terminal)
cd consumer
go run main.go -p kafka -t test-topic
Publisher:
- `-p`: Provider name (required)
- `-t`: Topic name (default: test-topic)
- `-m`: Message content
- `-c`: Number of messages
- `--batch`: Enable batch mode
- `--brokers`: Broker addresses
- `--url`: Connection URL
Consumer:
- `-p`: Provider name (required)
- `-t`: Topic name (default: test-topic)
- `--timeout`: Timeout in seconds (0 = unlimited)
- `--max`: Maximum messages to consume
- `--auto-ack`: Auto-acknowledge messages
- `--group`: Consumer group ID
Contributions are welcome! Please feel free to submit a Pull Request.
- Create a new package under `drivers/`
- Implement the `Driver` interface (see the sketch below)
- Register via an `init()` function
- Add tests and documentation
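As a starting point, here is a skeleton of what a new driver package can look like. The exact `Driver` method set is not shown in this README, so the `Name`/`Open` signatures below are assumptions for illustration only (check `queue.go` for the real interface); the `init()` registration mirrors the Kafka driver snippet above, and `newMyQueue` is a hypothetical constructor.

```go
// drivers/mydriver/mydriver.go — illustrative skeleton only.
package mydriver

import (
	"context"

	"github.com/teamlify-devx/unimq"
)

type MyDriver struct{}

func init() {
	// Self-register, exactly like drivers/kafka does.
	unimq.Register(&MyDriver{})
}

// Name is assumed to return the string users pass to unimq.Open.
func (d *MyDriver) Name() string { return "mydriver" }

// Open is assumed to build a unimq.Queue from the shared Config.
func (d *MyDriver) Open(ctx context.Context, cfg *unimq.Config) (unimq.Queue, error) {
	// Connect using cfg.URL / cfg.Brokers and return a type that
	// implements the Queue interface documented above.
	return newMyQueue(ctx, cfg) // hypothetical constructor
}
```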
MIT License - see LICENSE for details.