Skip to content

Vendor-agnostic messaging and event streaming abstraction for Go, covering queues, pub/sub and streams

License

Notifications You must be signed in to change notification settings

teamlify-devx/unimq

Repository files navigation

UniMQ

Vendor-agnostic messaging and event streaming abstraction for Go, covering queues, pub/sub and streams.

🌟 Features

  • Driver/Registry Pattern: Similar to GORM, import only the drivers you need
  • Zero Dependency Bloat: Each driver is a separate package with its own dependencies
  • Unified API: Same interface for all messaging systems
  • Auto-Registration: Drivers register themselves via init() functions
  • Type-Safe: Full Go type safety with generics-ready design
  • Context Support: Full context.Context support for timeouts and cancellation
  • Retry Mechanism: Built-in exponential backoff retry with jitter support
  • Circuit Breaker: Protect against cascading failures with automatic recovery
  • Automatic Reconnection: Auto-reconnect with subscription restoration
  • Dead Letter Queue: Automatic DLQ routing for failed messages
  • Health Monitoring: Comprehensive health checks with Kubernetes probes
  • Middleware System: Pluggable middleware for logging, metrics, tracing, rate limiting

📦 Supported Drivers

Driver Package Status
Apache Kafka drivers/kafka ✅ Ready
RabbitMQ drivers/rabbitmq ✅ Ready
NATS drivers/nats ✅ Ready
Redis Pub/Sub & Streams drivers/redis ✅ Ready
Google Cloud Pub/Sub drivers/gcppubsub ✅ Ready
AWS SQS drivers/awssqs ✅ Ready
Azure Service Bus drivers/azureservicebus ✅ Ready
Apache Pulsar drivers/pulsar ✅ Ready

🚀 Quick Start

Installation

go get github.com/teamlify-devx/unimq

Basic Usage

package main

import (
    "context"
    "log"

    "github.com/teamlify-devx/unimq"
    
    // Import ONLY the drivers you need!
    // This is the key to avoiding dependency bloat
    _ "github.com/teamlify-devx/unimq/drivers/kafka"
    _ "github.com/teamlify-devx/unimq/drivers/rabbitmq"
)

func main() {
    ctx := context.Background()

    // Create a Kafka client
    client, err := unimq.Open(ctx, "kafka", &unimq.Config{
        Brokers: []string{"localhost:9092"},
        GroupID: "my-consumer-group",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Publish a message
    err = client.Publish(ctx, "my-topic", []byte("Hello, World!"))
    if err != nil {
        log.Fatal(err)
    }

    // Subscribe to messages
    err = client.Subscribe(ctx, "my-topic", func(ctx context.Context, msg *unimq.Message) error {
        log.Printf("Received: %s", string(msg.Data))
        return msg.Ack()
    })
    if err != nil {
        log.Fatal(err)
    }
}

🏗️ Architecture

UniMQ follows a Driver/Registry pattern similar to GORM's database drivers:

unimq/
├── queue.go              # Core interface & registry
├── drivers/
│   ├── kafka/           # Kafka driver (imports IBM/sarama)
│   ├── rabbitmq/        # RabbitMQ driver (imports amqp091-go)
│   ├── nats/            # NATS driver (imports nats-io/nats.go)
│   ├── redis/           # Redis driver (imports go-redis)
│   ├── gcppubsub/       # GCP Pub/Sub driver
│   ├── awssqs/          # AWS SQS driver
│   ├── azureservicebus/ # Azure Service Bus driver
│   └── pulsar/          # Apache Pulsar driver
└── example/
    └── test/
        ├── publisher/   # Publisher CLI example
        └── consumer/    # Consumer CLI example

How It Works

  1. Registration: Each driver package has an init() function that registers itself:
// drivers/kafka/kafka.go
func init() {
    unimq.Register(&KafkaDriver{})
}
  1. Import: Users import only the drivers they need:
import (
    _ "github.com/teamlify-devx/unimq/drivers/kafka"    // Only Kafka dependencies
    _ "github.com/teamlify-devx/unimq/drivers/rabbitmq" // Only RabbitMQ dependencies
)
  1. Open: Use the unified Open function to create clients:
client, err := unimq.Open(ctx, "kafka", config)

📖 API Reference

Core Interface

type Queue interface {
    // Publishing
    Publish(ctx context.Context, topic string, data []byte) error
    PublishMessage(ctx context.Context, topic string, msg *Message) error
    PublishBatch(ctx context.Context, topic string, messages [][]byte) error

    // Subscribing
    Subscribe(ctx context.Context, topic string, handler MessageHandler) error
    SubscribeMultiple(ctx context.Context, topics []string, handler MessageHandler) error

    // Connection management
    Close() error
    Ping(ctx context.Context) error
    Driver() string
}

Message Structure

type Message struct {
    ID        string            // Unique message identifier
    Topic     string            // Topic/queue name
    Data      []byte            // Message payload
    Headers   map[string]string // Message headers/metadata
    Timestamp int64             // Message timestamp
    Partition int32             // Partition (for Kafka)
    Offset    int64             // Offset (for Kafka)
}

// Acknowledgment
msg.Ack()              // Acknowledge message
msg.Nack(requeue bool) // Negative acknowledge

Configuration

type Config struct {
    Brokers       []string               // Broker addresses
    URL           string                 // Connection URL
    Topic         string                 // Default topic
    GroupID       string                 // Consumer group ID
    ClientID      string                 // Client identifier
    Username      string                 // Authentication
    Password      string                 // Authentication
    TLS           bool                   // Enable TLS
    TLSSkipVerify bool                   // Skip TLS verification
    CertFile      string                 // TLS certificate
    KeyFile       string                 // TLS key
    CAFile        string                 // CA certificate
    Extra         map[string]interface{} // Driver-specific options
}

🔧 Driver-Specific Configuration

Kafka

config := &unimq.Config{
    Brokers: []string{"localhost:9092"},
    GroupID: "my-group",
    Extra: map[string]interface{}{
        "auto.offset.reset": "earliest",
        "acks":              -1, // all
        "compression.type":  "snappy",
    },
}

RabbitMQ

config := &unimq.Config{
    URL: "amqp://guest:guest@localhost:5672/",
    Extra: map[string]interface{}{
        "exchange":       "my-exchange",
        "exchange.type":  "topic",
        "queue.durable":  true,
        "prefetch_count": 10,
    },
}

NATS

config := &unimq.Config{
    URL:     "nats://localhost:4222",
    GroupID: "my-queue-group",
    Extra: map[string]interface{}{
        "jetstream":    true,
        "stream.name":  "my-stream",
    },
}

Redis

config := &unimq.Config{
    URL: "redis://localhost:6379",
    Extra: map[string]interface{}{
        "streams.enabled":        true, // Use Redis Streams
        "streams.consumer_group": "my-group",
        "db":                     0,
    },
}

AWS SQS

config := &unimq.Config{
    Extra: map[string]interface{}{
        "region":             "us-east-1",
        "visibility_timeout": 30,
        "wait_time_seconds":  20,
    },
}

GCP Pub/Sub

config := &unimq.Config{
    ClientID: "my-project-id",
    GroupID:  "my-subscription",
    Extra: map[string]interface{}{
        "credentials_file":        "/path/to/credentials.json",
        "subscription.ack_deadline": 10,
    },
}

Azure Service Bus

config := &unimq.Config{
    URL:      "my-namespace.servicebus.windows.net",
    Username: "my-shared-access-key-name",
    Password: "my-shared-access-key",
    Extra: map[string]interface{}{
        "subscription.name": "my-subscription",
    },
}

Pulsar

config := &unimq.Config{
    URL:     "pulsar://localhost:6650",
    GroupID: "my-subscription",
    Extra: map[string]interface{}{
        "subscription.type":            "shared",
        "subscription.initial_position": "earliest",
    },
}

🔄 Retry Mechanism

UniMQ includes a built-in retry mechanism with exponential backoff and jitter support.

Basic Retry

// Wrap any queue with retry logic
client, err := unimq.Open(ctx, "kafka", config)
if err != nil {
    log.Fatal(err)
}

// Wrap with default retry policy (3 retries, exponential backoff)
retryableClient := unimq.WrapWithRetry(client, unimq.DefaultRetryPolicy())

// All operations now have automatic retry
err = retryableClient.Publish(ctx, "topic", []byte("message"))

Retry Policies

// Default policy: 3 retries, 100ms initial interval, 2x multiplier
policy := unimq.DefaultRetryPolicy()

// No retry
policy := unimq.NoRetryPolicy()

// Aggressive retry for critical operations
policy := unimq.AggressiveRetryPolicy() // 10 retries, 50ms initial

// Custom policy
policy := &unimq.RetryPolicy{
    MaxRetries:          5,                    // Max retry attempts (-1 = infinite)
    InitialInterval:     100 * time.Millisecond,
    MaxInterval:         30 * time.Second,     // Cap for backoff
    Multiplier:          2.0,                  // Exponential factor
    RandomizationFactor: 0.5,                  // Jitter (0-1)
    RetryIf: func(err error) bool {           // Custom retry condition
        return !errors.Is(err, unimq.ErrNonRetryableError)
    },
    OnRetry: func(attempt int, err error, nextInterval time.Duration) {
        log.Printf("Retry attempt %d after error: %v (next in %v)", attempt, err, nextInterval)
    },
}

Retry with Connection

// Retry the initial connection
client, err := unimq.OpenWithRetry(ctx, "kafka", config, unimq.AggressiveRetryPolicy())

// Or open and wrap in one call
retryableClient, err := unimq.OpenWithRetryWrapped(ctx, "kafka", config,
    unimq.AggressiveRetryPolicy(),  // Connection retry policy
    unimq.DefaultRetryPolicy(),     // Operation retry policy
)

Retryable Message Handler

// Wrap message handlers with retry logic
handler := unimq.RetryableMessageHandler(policy, func(ctx context.Context, msg *unimq.Message) error {
    // Process message - will be retried on error
    return processMessage(msg)
})

client.Subscribe(ctx, "topic", handler)

Low-Level Retry Function

// Use Retry directly for any operation
err := unimq.Retry(ctx, policy, func() error {
    return someOperation()
})

// With result
result, err := unimq.RetryWithResult(ctx, policy, func() (MyType, error) {
    return someOperationWithResult()
})

Exponential Backoff Behavior

With default settings (InitialInterval: 100ms, Multiplier: 2.0, MaxInterval: 10s):

Attempt Interval (without jitter)
1 100ms
2 200ms
3 400ms
4 800ms
5 1.6s
6 3.2s
7 6.4s
8+ 10s (capped)

With RandomizationFactor: 0.5, actual intervals will be ±50% of these values to prevent thundering herd.

🔌 Circuit Breaker

UniMQ includes a circuit breaker to protect against cascading failures when a service is unavailable.

Circuit Breaker States

     ┌─────────────────────────────────────────┐
     │                                         │
     ▼                                         │
┌─────────┐  failure threshold  ┌────────┐    │
│ CLOSED  │ ─────────────────▶  │  OPEN  │    │
└─────────┘                     └────────┘    │
     ▲                               │        │
     │                               │ timeout
     │ success threshold             ▼        │
     │                         ┌───────────┐  │
     └──────────────────────── │ HALF-OPEN │ ─┘
                  failure      └───────────┘
  • Closed: Normal operation, requests pass through
  • Open: Circuit is tripped, requests fail fast with ErrCircuitOpen
  • Half-Open: After timeout, limited requests test if service recovered

Basic Usage

// Wrap any queue with circuit breaker
client, _ := unimq.Open(ctx, "kafka", config)
cbClient := unimq.WrapWithCircuitBreaker(client, unimq.DefaultCircuitBreakerConfig())

// Operations are now protected
err := cbClient.Publish(ctx, "topic", data)
if errors.Is(err, unimq.ErrCircuitOpen) {
    log.Println("Circuit is open, service unavailable")
}

Configuration

config := &unimq.CircuitBreakerConfig{
    Name:                "kafka-producer",     // Identifier for logging
    FailureThreshold:    5,                    // Failures before opening
    SuccessThreshold:    2,                    // Successes to close from half-open
    Timeout:             30 * time.Second,     // Time before half-open
    MaxHalfOpenRequests: 3,                    // Concurrent requests in half-open
    
    // Optional: Custom success checker
    IsSuccessful: func(err error) bool {
        // Don't count 404 as failure
        return err == nil || errors.Is(err, ErrNotFound)
    },
    
    // Optional: State change callback
    OnStateChange: func(name, from, to string) {
        log.Printf("Circuit %s: %s -> %s", name, from, to)
    },
    
    // Optional: Fallback when circuit is open
    Fallback: func(ctx context.Context, err error) error {
        return getCachedResponse()
    },
}

Separate Circuit Breakers for Publish/Consume

publishConfig := &unimq.CircuitBreakerConfig{
    FailureThreshold: 3,
    Timeout:          10 * time.Second,
}

consumeConfig := &unimq.CircuitBreakerConfig{
    FailureThreshold: 5,
    Timeout:          30 * time.Second,
}

cbClient := unimq.WrapWithCircuitBreakers(client, publishConfig, consumeConfig)

Manual Control

cb := cbClient.PublishCircuitBreaker()

// Check state
fmt.Println("State:", cb.State()) // closed, open, or half-open

// Get counts
failures, successes := cb.Counts()

// Manually trip (for testing/maintenance)
cb.Trip()

// Manually reset
cb.Reset()

🛡️ Resilient Queue (Retry + Circuit Breaker)

Combine retry and circuit breaker for maximum resilience:

// Create a resilient client with both protections
resilientClient := unimq.WrapWithResilience(
    client,
    unimq.DefaultRetryPolicy(),
    unimq.DefaultCircuitBreakerConfig(),
)

// Or open with full resilience in one call
resilientClient, err := unimq.OpenWithResilience(ctx, "kafka", config,
    unimq.AggressiveRetryPolicy(),       // For connection and operations
    unimq.DefaultCircuitBreakerConfig(), // Circuit breaker settings
)

// Operations are now:
// 1. Protected by circuit breaker (fail fast if open)
// 2. Retried with exponential backoff on transient failures
err = resilientClient.Publish(ctx, "topic", data)

Resilience Behavior

  1. Circuit Closed: Operations retry on failure with exponential backoff
  2. Circuit Open: Operations fail immediately with ErrCircuitOpen (no retry)
  3. Circuit Half-Open: Limited operations test recovery, retry on transient failures

🔄 Automatic Reconnection

UniMQ provides automatic reconnection with subscription restoration when the connection is lost.

Basic Usage

// Open with automatic reconnection
client, err := unimq.OpenWithReconnect(ctx, "kafka", config, unimq.DefaultReconnectConfig())
if err != nil {
    log.Fatal(err)
}
defer client.Close()

// Subscriptions are automatically restored after reconnection
client.Subscribe(ctx, "topic", handler)

// Publish - will wait for reconnection if disconnected
client.Publish(ctx, "topic", data)

Configuration

config := &unimq.ReconnectConfig{
    InitialInterval: 1 * time.Second,   // Initial wait before first retry
    MaxInterval:     30 * time.Second,  // Maximum backoff interval
    Multiplier:      2.0,               // Exponential backoff multiplier
    MaxReconnects:   -1,                // -1 = infinite retries
    PingInterval:    10 * time.Second,  // Health check frequency
    PingTimeout:     5 * time.Second,   // Health check timeout

    // Optional callbacks
    OnStateChange: func(from, to string) {
        log.Printf("Connection: %s -> %s", from, to)
    },
    OnReconnect: func(attempt int, err error) {
        log.Printf("Reconnecting (attempt %d): %v", attempt, err)
    },
    OnReconnected: func(attempt int) {
        log.Printf("Reconnected after %d attempts", attempt)
    },
    OnReconnectFailed: func(err error) {
        log.Printf("Reconnection failed: %v", err)
    },
}

Connection States

// Check connection status
state := client.State()
// "connected", "disconnected", "reconnecting", "closed"

isConnected := client.IsConnected()

// Get last error
err := client.LastError()

// Get reconnection attempt count
count := client.ReconnectCount()

Manual Control

// Force reconnection (for testing or maintenance)
client.ForceReconnect()

// Clear stored subscriptions
client.ClearSubscriptions()

// Get underlying queue
underlyingQueue := client.Unwrap()

Wrap Existing Queue

// Wrap an existing queue with reconnection
existingClient, _ := unimq.Open(ctx, "kafka", config)
reconnectableClient := unimq.WrapWithReconnect(
    existingClient, 
    "kafka",  // driver name needed for reconnection
    config,   // config needed for reconnection
    unimq.DefaultReconnectConfig(),
)

🏆 Fully Resilient Queue

Combine all resilience features (Retry + Circuit Breaker + Reconnection) for maximum reliability:

// Open with full resilience
client, err := unimq.OpenFullyResilient(ctx, "kafka", config,
    unimq.DefaultRetryPolicy(),         // Retry configuration
    unimq.DefaultCircuitBreakerConfig(), // Circuit breaker configuration  
    unimq.DefaultReconnectConfig(),      // Reconnection configuration
)
if err != nil {
    log.Fatal(err)
}
defer client.Close()

// All operations now have:
// 1. Automatic reconnection with subscription restoration
// 2. Circuit breaker protection
// 3. Retry with exponential backoff

err = client.Publish(ctx, "topic", data)

Resilience Layers

Request
   │
   ▼
┌──────────────────┐
│  Retry Logic     │  ◄── Exponential backoff with jitter
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│ Circuit Breaker  │  ◄── Fail fast when service is down
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│  Reconnectable   │  ◄── Auto-reconnect on connection loss
│     Queue        │
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│  Actual Driver   │  ◄── Kafka, RabbitMQ, etc.
└──────────────────┘

💀 Dead Letter Queue (DLQ)

UniMQ provides automatic Dead Letter Queue routing for messages that fail processing after multiple retries.

Basic Usage

// Wrap a queue with DLQ support
client, _ := unimq.Open(ctx, "kafka", config)
dlqClient := unimq.WrapWithDLQ(client, unimq.DefaultDLQConfig())

// Subscribe - failed messages auto-route to DLQ after retries
dlqClient.Subscribe(ctx, "orders", func(ctx context.Context, msg *unimq.Message) error {
    if err := processOrder(msg); err != nil {
        return err // Will retry, then DLQ after max retries
    }
    return nil // Success - message acknowledged
})

Configuration

config := &unimq.DLQConfig{
    // Topic naming
    TopicSuffix: ".dlq",           // orders -> orders.dlq
    TopicPrefix: "dlq.",           // orders -> dlq.orders
    FixedTopic:  "global-dlq",     // All failed messages to one topic

    // Retry behavior
    MaxRetries:    3,                    // Retries before DLQ (0 = no retries)
    RetryInterval: 1 * time.Second,      // Wait between retries

    // Message content
    IncludeOriginalMessage: true,        // Include original data in DLQ

    // Callbacks
    OnRetry: func(msg *unimq.Message, attempt int, err error) {
        log.Printf("Retry %d for message %s: %v", attempt, msg.ID, err)
    },
    OnDLQ: func(msg *unimq.DLQMessage) {
        log.Printf("Message sent to DLQ: %s (retries: %d)", 
            msg.OriginalMessageID, msg.RetryCount)
        alertOps(msg)
    },

    // Custom DLQ condition
    ShouldDLQ: func(err error) bool {
        // Don't DLQ validation errors
        return !errors.Is(err, ErrValidation)
    },
}

DLQ Message Format

When a message is sent to DLQ, it includes metadata:

{
    "original_topic": "orders",
    "original_message_id": "msg-123",
    "original_data": "eyJvcmRlcl9pZCI6IDEyM30=",
    "original_headers": {"key": "value"},
    "error": "database connection failed",
    "retry_count": 3,
    "first_failed_at": "2024-01-15T10:00:00Z",
    "last_failed_at": "2024-01-15T10:00:05Z",
    "sent_to_dlq_at": "2024-01-15T10:00:05Z",
    "consumer_id": "consumer-1"
}

Processing DLQ Messages

// Create a DLQ consumer
dlqConsumer := unimq.NewDLQConsumer(dlqQueue, mainQueue, config)

// Subscribe to DLQ with access to both DLQ metadata and original message
dlqConsumer.Subscribe(ctx, "orders.dlq", func(ctx context.Context, 
    dlqMsg *unimq.DLQMessage, originalMsg *unimq.Message) error {
    
    log.Printf("Failed message: topic=%s, error=%s, retries=%d",
        dlqMsg.OriginalTopic, dlqMsg.Error, dlqMsg.RetryCount)
    
    // Fix and reprocess, or alert, or archive
    return nil
})

Reprocessing DLQ Messages

// Reprocess all DLQ messages back to original topics
dlqConsumer.Reprocess(ctx, "orders.dlq", nil)

// Reprocess with filter
dlqConsumer.Reprocess(ctx, "orders.dlq", func(msg *unimq.DLQMessage) bool {
    // Only reprocess messages that failed due to timeout
    return strings.Contains(msg.Error, "timeout")
})

// Reprocess in batches
dlqConsumer.ReprocessBatch(ctx, "orders.dlq", 100, nil)

DLQ Utilities

// Get statistics
processed, failed, sentToDLQ := dlqClient.Stats()

// Get DLQ topic name
dlqTopic := dlqClient.DLQTopic("orders") // "orders.dlq"

// Set consumer ID for tracking
dlqClient.SetConsumerID("order-processor-1")

// Use separate queues for main and DLQ
dlqClient := unimq.WrapWithSeparateDLQ(mainQueue, dlqQueue, config)

DLQ Flow

Message Received
       │
       ▼
┌─────────────┐
│   Process   │
└──────┬──────┘
       │
   Success?
    ╱     ╲
   Yes     No
   │       │
   ▼       ▼
  Ack   Retry?
         ╱   ╲
       Yes    No (max reached)
        │         │
        ▼         ▼
     Wait &   Send to DLQ
     Retry        │
                  ▼
               Ack Original

🏥 Health Monitoring

UniMQ provides comprehensive health monitoring with Kubernetes-compatible liveness and readiness probes.

Basic Usage

// Wrap a queue with health monitoring
client, _ := unimq.Open(ctx, "kafka", config)
healthyClient := unimq.WrapWithHealth(client, unimq.DefaultHealthCheckConfig())
defer healthyClient.Close()

// Health checks run automatically in background
// Check current status
if healthyClient.IsHealthy() {
    fmt.Println("Queue is healthy")
}

// Get detailed health info
details := healthyClient.GetHealth()
fmt.Printf("Status: %s, Uptime: %v\n", details.Status, details.Uptime)

Configuration

config := &unimq.HealthCheckConfig{
    Enabled:          true,                  // Enable health monitoring
    Interval:         10 * time.Second,      // Check interval
    Timeout:          5 * time.Second,       // Check timeout
    FailureThreshold: 3,                     // Failures before unhealthy
    SuccessThreshold: 1,                     // Successes before healthy
    InitialDelay:     5 * time.Second,       // Delay before first check

    // Callbacks
    OnHealthChange: func(from, to string, details *unimq.HealthDetails) {
        log.Printf("Health changed: %s -> %s", from, to)
        if to == unimq.HealthStatusUnhealthy {
            alertOps("Queue is unhealthy!")
        }
    },
    OnHealthCheck: func(details *unimq.HealthDetails) {
        metrics.RecordHealthCheck(details)
    },

    // Custom health checks
    CustomChecks: []unimq.HealthChecker{
        unimq.NewHealthCheck("database", func(ctx context.Context) error {
            return db.Ping(ctx)
        }),
        unimq.NewHealthCheck("redis", func(ctx context.Context) error {
            return redis.Ping(ctx).Err()
        }),
    },
}

Health Status Values

Status Description
healthy All components working
degraded Some issues, but operational
unhealthy Critical failure
unknown Not yet checked

Kubernetes Integration

// Get HTTP handlers for Kubernetes probes
handler := healthyClient.HTTPHandler()

mux := http.NewServeMux()
handler.RegisterRoutes(mux, "")
// Registers:
//   /health - Full health details
//   /live   - Kubernetes liveness probe
//   /ready  - Kubernetes readiness probe

http.ListenAndServe(":8080", mux)

Kubernetes Deployment:

livenessProbe:
  httpGet:
    path: /live
    port: 8080
  initialDelaySeconds: 10
  periodSeconds: 5

readinessProbe:
  httpGet:
    path: /ready
    port: 8080
  initialDelaySeconds: 5
  periodSeconds: 5

Health Details Response

{
    "status": "healthy",
    "timestamp": "2024-01-15T10:00:00Z",
    "uptime_ns": 3600000000000,
    "components": {
        "queue": {
            "name": "queue",
            "status": "healthy",
            "last_checked": "2024-01-15T10:00:00Z",
            "duration_ns": 15000000
        },
        "database": {
            "name": "database",
            "status": "healthy",
            "last_checked": "2024-01-15T10:00:00Z"
        }
    },
    "total_checks": 360,
    "successful_checks": 359,
    "failed_checks": 1,
    "is_live": true,
    "is_ready": true,
    "driver": "kafka",
    "metadata": {
        "version": "1.0.0",
        "environment": "production"
    }
}

Manual Health Monitor

// Create standalone health monitor
monitor := unimq.NewHealthMonitor(queue, config)
monitor.Start()
defer monitor.Stop()

// Immediate check
details, err := monitor.Check(ctx)

// Add/remove checks dynamically
monitor.AddCheck(unimq.NewHealthCheck("external-api", checkFunc))
monitor.RemoveCheck("external-api")

// Set custom metadata
monitor.SetMetadata("version", "1.0.0")
monitor.SetMetadata("pod", os.Getenv("POD_NAME"))

Aggregate Health Monitoring

Monitor multiple queues together:

aggregate := unimq.NewAggregateHealthMonitor()
aggregate.Add("orders-queue", ordersMonitor)
aggregate.Add("notifications-queue", notificationsMonitor)

// Get combined health status
details := aggregate.GetHealth()

// HTTP handler for aggregate health
http.HandleFunc("/health", aggregate.HTTPHandler())

Health Flow

┌─────────────────────────────────────────────────────┐
│                 Health Monitor                       │
│                                                      │
│   ┌──────────┐    ┌──────────┐    ┌──────────┐     │
│   │  Queue   │    │ Database │    │  Redis   │     │
│   │  Ping    │    │  Check   │    │  Check   │     │
│   └────┬─────┘    └────┬─────┘    └────┬─────┘     │
│        │               │               │            │
│        └───────────────┴───────────────┘            │
│                        │                            │
│                        ▼                            │
│            ┌───────────────────────┐               │
│            │  Evaluate Status      │               │
│            │  (all pass = healthy) │               │
│            └───────────┬───────────┘               │
│                        │                            │
│           ┌────────────┼────────────┐              │
│           ▼            ▼            ▼              │
│       OnChange    /health      /live /ready        │
│       Callback     JSON        Kubernetes          │
└─────────────────────────────────────────────────────┘

🔌 Middleware System

UniMQ provides a flexible middleware system for both publish and consume operations.

Basic Usage

// Wrap queue with middleware support
client, _ := unimq.Open(ctx, "kafka", config)
mq := unimq.WrapWithMiddleware(client)

// Add publish middleware
mq.UsePublish(unimq.LoggingMiddleware(nil))
mq.UsePublish(unimq.MetricsMiddleware(&metricsConfig))
mq.UsePublish(unimq.RateLimiterMiddleware(nil))

// Add consume middleware
mq.UseConsume(unimq.LoggingConsumeMiddleware(nil))
mq.UseConsume(unimq.RecoveryMiddleware(onPanic))
mq.UseConsume(unimq.TimeoutMiddleware(30 * time.Second))

Built-in Middleware

Middleware Type Description
LoggingMiddleware Publish Log publish operations
LoggingConsumeMiddleware Consume Log consume operations
MetricsMiddleware Publish Track publish metrics
MetricsConsumeMiddleware Consume Track consume metrics
RateLimiterMiddleware Publish Rate limit publishing
TracingMiddleware Publish Distributed tracing
TracingConsumeMiddleware Consume Distributed tracing
RecoveryMiddleware Consume Panic recovery
TimeoutMiddleware Consume Handler timeout
RetryConsumeMiddleware Consume Retry failed handlers
FilterMiddleware Consume Filter messages
DeduplicationMiddleware Consume Skip duplicate messages
ValidationMiddleware Consume Validate messages
TransformMiddleware Consume Transform messages

Logging Middleware

config := &unimq.LoggingConfig{
    Logger:        log.Printf,           // Custom logger
    LogPublish:    true,                  // Log publishes
    LogConsume:    true,                  // Log consumes
    LogErrors:     true,                  // Only log errors
    IncludeData:   false,                 // Include message data
    MaxDataLength: 100,                   // Truncate data
}

mq.UsePublish(unimq.LoggingMiddleware(config))
mq.UseConsume(unimq.LoggingConsumeMiddleware(config))

Metrics Middleware

// Prometheus example
config := &unimq.MetricsConfig{
    OnPublish: func(topic string, duration time.Duration, err error, batchSize int) {
        publishCounter.WithLabelValues(topic).Inc()
        publishDuration.WithLabelValues(topic).Observe(duration.Seconds())
        if err != nil {
            publishErrors.WithLabelValues(topic).Inc()
        }
    },
    OnConsume: func(topic string, duration time.Duration, err error) {
        consumeCounter.WithLabelValues(topic).Inc()
        consumeDuration.WithLabelValues(topic).Observe(duration.Seconds())
        if err != nil {
            consumeErrors.WithLabelValues(topic).Inc()
        }
    },
}

mq.UsePublish(unimq.MetricsMiddleware(config))
mq.UseConsume(unimq.MetricsConsumeMiddleware(config))

Rate Limiting

config := &unimq.RateLimiterConfig{
    Rate:        100,                     // 100 requests/second
    Burst:       10,                      // Burst size
    PerTopic:    true,                    // Per-topic limits
    WaitOnLimit: true,                    // Wait instead of error
    MaxWait:     time.Second,             // Max wait time
    OnLimited: func(topic string) {
        log.Printf("Rate limited on topic: %s", topic)
    },
}

mq.UsePublish(unimq.RateLimiterMiddleware(config))

Tracing (OpenTelemetry Example)

config := &unimq.TracingConfig{
    ServiceName: "order-service",
    StartSpan: func(ctx context.Context, name string) (context.Context, interface{}) {
        ctx, span := tracer.Start(ctx, name)
        return ctx, span
    },
    FinishSpan: func(span interface{}, err error) {
        s := span.(trace.Span)
        if err != nil {
            s.RecordError(err)
        }
        s.End()
    },
    InjectHeaders: func(ctx context.Context, headers map[string]string) {
        propagator.Inject(ctx, propagation.MapCarrier(headers))
    },
    ExtractHeaders: func(headers map[string]string) context.Context {
        return propagator.Extract(context.Background(), 
            propagation.MapCarrier(headers))
    },
}

mq.UsePublish(unimq.TracingMiddleware(config))
mq.UseConsume(unimq.TracingConsumeMiddleware(config))

Recovery Middleware

mq.UseConsume(unimq.RecoveryMiddleware(func(topic string, msg *unimq.Message, recovered interface{}) {
    log.Printf("PANIC in handler: topic=%s, msg=%s, panic=%v", 
        topic, msg.ID, recovered)
    alertOps(recovered)
}))

Timeout Middleware

// Handler must complete within 30 seconds
mq.UseConsume(unimq.TimeoutMiddleware(30 * time.Second))

Deduplication Middleware

config := &unimq.DeduplicationConfig{
    TTL:     time.Hour,           // Remember IDs for 1 hour
    MaxSize: 10000,               // Max IDs to remember
    GetKey: func(msg *unimq.Message) string {
        return msg.ID            // Use message ID
    },
}

mq.UseConsume(unimq.DeduplicationMiddleware(config))

Filter Middleware

// Only process high-priority messages
mq.UseConsume(unimq.FilterMiddleware(func(msg *unimq.Message) bool {
    return msg.Headers["priority"] == "high"
}))

Validation Middleware

mq.UseConsume(unimq.ValidationMiddleware(func(msg *unimq.Message) error {
    if len(msg.Data) == 0 {
        return errors.New("empty message")
    }
    if msg.Headers["version"] != "2" {
        return errors.New("unsupported version")
    }
    return nil
}))

Transform Middleware

mq.UseConsume(unimq.TransformMiddleware(func(msg *unimq.Message) *unimq.Message {
    // Decompress data
    msg.Data = decompress(msg.Data)
    // Add processing timestamp
    msg.Headers["processed_at"] = time.Now().String()
    return msg
}))

Custom Middleware

// Custom publish middleware
customPublish := func(next unimq.PublishFunc) unimq.PublishFunc {
    return func(ctx context.Context, pctx *unimq.PublishContext) error {
        // Before publish
        log.Printf("Publishing to %s", pctx.Topic)
        
        err := next(ctx, pctx)
        
        // After publish
        if err != nil {
            log.Printf("Publish failed: %v", err)
        }
        return err
    }
}

// Custom consume middleware
customConsume := func(next unimq.MessageHandler) unimq.MessageHandler {
    return func(ctx context.Context, msg *unimq.Message) error {
        // Before processing
        start := time.Now()
        
        err := next(ctx, msg)
        
        // After processing
        log.Printf("Processed in %v", time.Since(start))
        return err
    }
}

mq.UsePublish(customPublish)
mq.UseConsume(customConsume)

Chaining Middleware

// Chain multiple middleware into one
publishChain := unimq.ChainPublishMiddleware(
    unimq.LoggingMiddleware(nil),
    unimq.MetricsMiddleware(metricsConfig),
    unimq.RateLimiterMiddleware(nil),
)

consumeChain := unimq.ChainConsumeMiddleware(
    unimq.RecoveryMiddleware(nil),
    unimq.TimeoutMiddleware(30*time.Second),
    unimq.DeduplicationMiddleware(nil),
)

mq.UsePublish(publishChain)
mq.UseConsume(consumeChain)

Middleware Execution Order

Publish Request
       │
       ▼
┌─────────────────────────────────────┐
│     Middleware 1 (before)           │
│         ↓                           │
│     Middleware 2 (before)           │
│         ↓                           │
│     Middleware 3 (before)           │
│         ↓                           │
│  ┌─────────────────────────────┐    │
│  │    Actual Publish/Consume    │    │
│  └─────────────────────────────┘    │
│         ↓                           │
│     Middleware 3 (after)            │
│         ↓                           │
│     Middleware 2 (after)            │
│         ↓                           │
│     Middleware 1 (after)            │
└─────────────────────────────────────┘
       │
       ▼
    Response

🧪 Examples

Running the Examples

# Start services (Kafka, RabbitMQ, etc.)
cd example/test
docker-compose up -d

# Run publisher
cd publisher
go run main.go -p kafka -t test-topic -m "Hello World" -c 10

# Run consumer (in another terminal)
cd consumer
go run main.go -p kafka -t test-topic

Available CLI Flags

Publisher:

  • -p: Provider name (required)
  • -t: Topic name (default: test-topic)
  • -m: Message content
  • -c: Number of messages
  • --batch: Enable batch mode
  • --brokers: Broker addresses
  • --url: Connection URL

Consumer:

  • -p: Provider name (required)
  • -t: Topic name (default: test-topic)
  • --timeout: Timeout in seconds (0 = unlimited)
  • --max: Maximum messages to consume
  • --auto-ack: Auto-acknowledge messages
  • --group: Consumer group ID

🤝 Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Adding a New Driver

  1. Create a new package under drivers/
  2. Implement the Driver interface
  3. Register via init() function
  4. Add tests and documentation

📄 License

MIT License - see LICENSE for details.