Skip to content

i2y/calanque

Repository files navigation

Calanque

CloudEvents-native Pull Eventing for Kubernetes

Calanque is a pull-based event consumption platform for Kubernetes, designed as a complement to push-based systems like Knative Eventing.

Features

  • Pull-based consumption - Consumers fetch events when ready
  • CloudEvents native - Full CloudEvents 1.0 compliance
  • Dual protocol - gRPC/ConnectRPC and HTTP (batch) APIs
  • Retry with backoff - Configurable exponential backoff
  • Dead Letter Queue - Failed messages go to DLQ after max retries
  • Kubernetes native - CRD-based declarative configuration
  • franz-go powered - High-performance Kafka client

Architecture

flowchart LR
    subgraph K8s[Kubernetes]
        subgraph CRDs[CRDs]
            SubCRD[Subscription]
            IngressCRD[Ingress]
        end

        Operator[Calanque Operator]

        subgraph Ingress[Ingress Deployment]
            IngressAPI[HTTP API]
            Producer[franz-go producer]
        end

        subgraph Consumer[Consumer Deployment]
            ConsumerAPI[gRPC/ConnectRPC<br/>+ HTTP API]
            ConsumerFranz[franz-go consumer]
        end

        subgraph Kafka[Kafka]
            Topic[topic]
            DLQ[DLQ]
        end

        ExtSystem[External System]
        App[Your Application]
    end

    CRDs --> Operator
    Operator --> Ingress
    Operator --> Consumer

    ExtSystem -->|POST CloudEvents| IngressAPI
    IngressAPI --> Producer
    Producer -->|publish| Topic

    Topic -->|consume| ConsumerFranz
    ConsumerFranz <-->|failed| DLQ
    ConsumerAPI -->|pull when ready| App
Loading

Quick Start

1. Install CRDs

kubectl apply -f config/crd/calanque.io_subscriptions.yaml
kubectl apply -f config/crd/calanque.io_brokers.yaml
kubectl apply -f config/crd/calanque.io_triggers.yaml

2. Choose Your Pattern

Calanque supports two approaches:

Option A: Subscription (Simple)

  • One resource = one consumer
  • Manual consumer group management

Option B: Broker + Trigger (Recommended for multi-consumer)

  • Broker defines Kafka connection once
  • Triggers subscribe to broker
  • Automatic consumer group management
  • Easy fan-out and competing consumers

Simple Subscription

For simple use cases where you just need one consumer, use Subscription directly:

apiVersion: calanque.io/v1alpha1
kind: Subscription
metadata:
  name: order-processor
spec:
  backend:
    type: kafka
    kafka:
      bootstrapServers:
        - kafka:9092
      topic: orders

  consumerGroup: order-processor-group

  pull:
    batchSize: 10
    batchTimeout: "5s"
    maxInflight: 100

  consumer:
    http:
      port: 8080
    grpc:
      port: 50051

  retry:
    maxAttempts: 3
    backoff:
      initial: "1s"
      max: "30s"
      multiplier: 2.0

  deadLetterQueue:
    kafka:
      topic: orders-dlq

Pattern Examples

Fan-out Pattern (Everyone gets every message)

flowchart LR
    Topic[orders topic]
    Analytics[analytics<br/>gets order #1]
    Shipping[shipping<br/>gets order #1]
    Inventory[inventory<br/>gets order #1]

    Topic --> Analytics
    Topic --> Shipping
    Topic --> Inventory
Loading
apiVersion: calanque.io/v1alpha1
kind: Broker
metadata:
  name: orders
spec:
  backend:
    type: kafka
    kafka:
      bootstrapServers: ["kafka:9092"]
      topic: orders
---
# Each trigger gets ALL messages (different consumer groups)
apiVersion: calanque.io/v1alpha1
kind: Trigger
metadata:
  name: analytics
spec:
  broker: orders
  # consumerGroup omitted → auto-generated → fan-out
  subscriber:
    http: {port: 8080}
---
apiVersion: calanque.io/v1alpha1
kind: Trigger
metadata:
  name: shipping
spec:
  broker: orders
  subscriber:
    http: {port: 8080}
---
apiVersion: calanque.io/v1alpha1
kind: Trigger
metadata:
  name: inventory
spec:
  broker: orders
  subscriber:
    http: {port: 8080}

Competing Consumers Pattern (Load balancing)

flowchart LR
    Topic[tasks topic]
    W1[worker-1<br/>gets task A]
    W2[worker-2<br/>gets task B]
    W3[worker-3<br/>gets task C]

    Topic -.->|task A| W1
    Topic -.->|task B| W2
    Topic -.->|task C| W3
Loading
apiVersion: calanque.io/v1alpha1
kind: Broker
metadata:
  name: tasks
spec:
  backend:
    type: kafka
    kafka:
      bootstrapServers: ["kafka:9092"]
      topic: heavy-tasks
---
# All workers share the SAME consumerGroup → competing
apiVersion: calanque.io/v1alpha1
kind: Trigger
metadata:
  name: worker-1
spec:
  broker: tasks
  consumerGroup: task-workers  # ← Same group!
  subscriber:
    http: {port: 8080}
---
apiVersion: calanque.io/v1alpha1
kind: Trigger
metadata:
  name: worker-2
spec:
  broker: tasks
  consumerGroup: task-workers  # ← Same group!
  subscriber:
    http: {port: 8080}
---
apiVersion: calanque.io/v1alpha1
kind: Trigger
metadata:
  name: worker-3
spec:
  broker: tasks
  consumerGroup: task-workers  # ← Same group!
  subscriber:
    http: {port: 8080}

Combined Pattern (Fan-out + Competing)

# Analytics gets ALL orders (fan-out)
# Processors share orders (competing)

apiVersion: calanque.io/v1alpha1
kind: Trigger
metadata:
  name: analytics
spec:
  broker: orders
  # No consumerGroup → unique → gets ALL
  subscriber:
    http: {port: 8080}
---
apiVersion: calanque.io/v1alpha1
kind: Trigger
metadata:
  name: processor-1
spec:
  broker: orders
  consumerGroup: order-processors  # Shared
  subscriber:
    http: {port: 8080}
---
apiVersion: calanque.io/v1alpha1
kind: Trigger
metadata:
  name: processor-2
spec:
  broker: orders
  consumerGroup: order-processors  # Shared
  subscriber:
    http: {port: 8080}

Publishing Events (Ingress)

Ingress is the publishing side of Calanque. While Subscription/Trigger consume events FROM Kafka, Ingress publishes events TO Kafka.

flowchart LR
    Ext[External System]
    Ingress[Ingress<br/>HTTP POST]
    Kafka[Kafka Topic]
    Sub[Subscription<br/>/ Trigger]

    Ext -->|CloudEvents| Ingress
    Ingress -->|publish| Kafka
    Kafka -->|consume| Sub
Loading

Ingress Example

apiVersion: calanque.io/v1alpha1
kind: Ingress
metadata:
  name: order-ingress
spec:
  backend:
    type: kafka
    kafka:
      bootstrapServers:
        - kafka:9092
      topic: orders
      sasl:
        mechanism: SCRAM-SHA-256
        secretRef:
          name: kafka-credentials
      tls:
        enabled: true

  http:
    port: 8080
    path: /events

Publish CloudEvents

# Single event (structured format)
curl -X POST "http://order-ingress-ingress:8080/events" \
  -H "Content-Type: application/cloudevents+json" \
  -d '{
    "specversion": "1.0",
    "type": "order.created",
    "source": "/orders",
    "id": "order-123",
    "data": {"orderId": "123", "amount": 1000}
  }'

# Single event (binary format)
curl -X POST "http://order-ingress-ingress:8080/events" \
  -H "Content-Type: application/json" \
  -H "ce-specversion: 1.0" \
  -H "ce-type: order.created" \
  -H "ce-source: /orders" \
  -H "ce-id: order-123" \
  -d '{"orderId": "123", "amount": 1000}'

# Batch events
curl -X POST "http://order-ingress-ingress:8080/events" \
  -H "Content-Type: application/cloudevents-batch+json" \
  -d '[
    {"specversion":"1.0","type":"order.created","source":"/orders","id":"1","data":{}},
    {"specversion":"1.0","type":"order.created","source":"/orders","id":"2","data":{}}
  ]'

Consuming Events

While Ingress publishes events TO Kafka, Subscription/Trigger consume events FROM Kafka. Your application pulls events when ready.

Pull Events (HTTP)

# Pull events
curl "http://order-processor-consumer:8080/events?subscription=order-processor&maxEvents=10"

# Response: application/cloudevents-batch+json
# [
#   {"specversion":"1.0","type":"order.created","id":"123",...},
#   ...
# ]

# Acknowledge
curl -X POST "http://order-processor-consumer:8080/ack?subscription=order-processor" \
  -H "Content-Type: application/json" \
  -d '{"messageIds": ["orders-0-100", "orders-0-101"]}'

# Negative acknowledge (retry)
curl -X POST "http://order-processor-consumer:8080/nack?subscription=order-processor" \
  -H "Content-Type: application/json" \
  -d '{"messageIds": ["orders-0-102"], "requeue": true}'

# Negative acknowledge (send to DLQ)
curl -X POST "http://order-processor-consumer:8080/nack?subscription=order-processor" \
  -H "Content-Type: application/json" \
  -d '{"messageIds": ["orders-0-103"], "requeue": false}'

Python Client (Example)

See examples/python/client.py for a full example client implementation.

API Reference

Consumer HTTP API

Endpoint Method Description
/events GET Pull batch of CloudEvents
/ack POST Acknowledge messages
/nack POST Negative acknowledge (retry/DLQ)
/health GET Health check

Query Parameters for GET /events:

  • subscription (required): Subscription name
  • maxEvents (default: 10): Maximum events to return
  • timeout (default: 5s): Long-poll timeout

Ingress HTTP API

Endpoint Method Description
/events POST Publish CloudEvent(s)
/health GET Health check
/healthz GET Kubernetes liveness probe
/readyz GET Kubernetes readiness probe

Content-Types for POST /events:

  • application/cloudevents+json: Single CloudEvent (structured format)
  • application/json with ce-* headers: Single CloudEvent (binary format)
  • application/cloudevents-batch+json: Batch of CloudEvents

ConnectRPC API

// Consumer services
service PullService {
  rpc Pull(PullRequest) returns (PullResponse);
  rpc StreamPull(stream StreamPullRequest) returns (stream StreamPullResponse);
  rpc Ack(AckRequest) returns (AckResponse);
  rpc Nack(NackRequest) returns (NackResponse);
}

service HealthService {
  rpc Check(CheckRequest) returns (CheckResponse);
}

// Ingress service
service PublishService {
  rpc Publish(PublishRequest) returns (PublishResponse);
  rpc PublishBatch(PublishBatchRequest) returns (PublishBatchResponse);
}

Configuration Reference

SubscriptionSpec

Field Type Description
backend.type string Backend type: kafka, sqs, pubsub
backend.kafka.bootstrapServers []string Kafka broker addresses
backend.kafka.topic string Source topic
backend.kafka.sasl object SASL authentication
backend.kafka.tls object TLS configuration
consumerGroup string Consumer group ID
pull.batchSize int32 Max events per pull (default: 10)
pull.batchTimeout string Pull timeout (default: 5s)
pull.maxInflight int32 Max unacked messages (default: 100)
pull.initialOffset string "earliest" or "latest"
consumer.grpc.port int32 gRPC/ConnectRPC port (default: 50051)
consumer.http.port int32 HTTP port (default: 8080)
consumer.http.path string Pull endpoint path (default: /events)
retry.maxAttempts int32 Max retry attempts (default: 3)
retry.backoff.initial string Initial backoff (default: 1s)
retry.backoff.max string Max backoff (default: 30s)
retry.backoff.multiplier float64 Backoff multiplier (default: 2.0)
deadLetterQueue.kafka.topic string DLQ topic name

IngressSpec

Field Type Description
backend.type string Backend type: kafka, sqs, pubsub
backend.kafka.bootstrapServers []string Kafka broker addresses
backend.kafka.topic string Target topic
backend.kafka.sasl object SASL authentication
backend.kafka.tls object TLS configuration
http.port int32 HTTP port (default: 8080)
http.path string CloudEvents endpoint path (default: /events)

BrokerSpec

Field Type Description
backend.type string Backend type: kafka, sqs, pubsub
backend.kafka.bootstrapServers []string Kafka broker addresses
backend.kafka.topic string Topic name
backend.kafka.sasl object SASL authentication
backend.kafka.tls object TLS configuration

TriggerSpec

Field Type Description
broker string Name of the Broker to subscribe to
consumerGroup string Consumer group (optional, for competing consumers)
filter.types []string Filter by event types
filter.source string Filter by event source (regex)
filter.expression string CEL expression for advanced filtering
pull.batchSize int32 Max events per pull (default: 10)
pull.batchTimeout string Pull timeout (default: 5s)
subscriber.grpc.port int32 gRPC/ConnectRPC port (default: 50051)
subscriber.http.port int32 HTTP port (default: 8080)
retry.maxAttempts int32 Max retry attempts (default: 3)
deadLetterQueue.kafka.topic string DLQ topic name

Comparison with Knative Eventing

Feature Knative Eventing (Push) Calanque (Pull)
Delivery Model Push to HTTP endpoint Consumer pulls
Backpressure Limited (webhook timeout) Native (pull when ready)
Batch Processing Not supported Native batch pull
Retry Built-in Built-in with backoff
DLQ Supported Supported
Scale to Zero Supported Not applicable
Protocol HTTP only gRPC/ConnectRPC + HTTP

Development

Local Development Environment

Calanque provides a complete local development environment using a local Kubernetes cluster and Tilt.

Prerequisites

  • Local Kubernetes cluster (Docker Desktop, kind, minikube, Rancher Desktop, etc.)
  • Tilt: brew install tilt-dev/tap/tilt
  • uv: brew install uv

Note: The default configuration uses Docker Desktop Kubernetes. For other environments, you may need to modify Tiltfile and docker-compose.yml.

Quick Start

# Start Kafka (KRaft mode, no Zookeeper)
docker-compose up -d

# Start Tilt
tilt up

Open http://localhost:10350 in your browser to see the Tilt UI.

Architecture

flowchart TB
    subgraph K8s[Local Kubernetes]
        Operator[Operator]
        Consumer[Consumer Pod<br/>:50051 gRPC/ConnectRPC, :8080 HTTP]
        Ingress[Ingress Pod<br/>:8080 HTTP]

        Operator --> Consumer
        Operator --> Ingress
    end

    subgraph Docker[Docker Compose - Host]
        Kafka[Kafka :9092]
        KafkaUI[Kafka UI :8090]
    end

    Consumer <--> Kafka
    Ingress <--> Kafka
Loading

Testing Workflow

  1. Wait for all resources to be ready (green in Tilt UI)
  2. Click pf-http-gateway to enable Ingress port-forward
  3. Click test-send-http to send CloudEvents via Ingress
  4. View events in Kafka UI: http://localhost:8090
  5. Click pf-consumer then test-consume to fetch events

Endpoints

Service URL
Kafka (host) localhost:29092
Kafka UI http://localhost:8090
Consumer HTTP http://localhost:8080 (via port-forward)
Ingress HTTP http://localhost:8081 (via port-forward)

Code Generation

Calanque uses two types of code generation:

Protocol Buffers (Connect-Go)

When you modify proto/calanque.proto:

buf generate

This generates files in the gen/ directory:

  • gen/calanque.pb.go - Protocol Buffers type definitions
  • gen/calanquev1connect/ - Connect-Go service handlers

Kubernetes CRD (DeepCopy)

When you modify CRD type definitions in api/v1alpha1/:

make generate

This generates api/v1alpha1/zz_generated.deepcopy.go.

Development Workflow

  1. Modify proto file → buf generate
  2. Modify CRD types → make generate
  3. Build → make build

Building

# Build operator
make build

# Build container images
make docker-build

# Deploy to cluster
make deploy

Roadmap

  • Kafka backend (franz-go)
  • gRPC/ConnectRPC Pull API
  • HTTP Pull API (CloudEvents batch)
  • Retry with exponential backoff
  • Dead Letter Queue
  • Broker / Trigger (fan-out & competing consumers)
  • CEL filter expressions
  • OpenTelemetry integration
  • SDK/client libraries (Python, Go, TypeScript, etc.)
  • AWS SQS backend
  • Google Pub/Sub backend
  • NATS JetStream backend

Backend Support Matrix

Different backends have different native capabilities:

Backend Fan-out Competing Consumers Notes
Kafka ✓ Native ✓ Native Use consumerGroup to control
SQS ✗ Not supported ✓ Native (default) For fan-out, use SNS → multiple SQS
Pub/Sub ✓ Native ✓ Native Subscription = consumer group
NATS JS ✓ Native ✓ Native Durable consumer name = group

Why SQS doesn't support fan-out in Calanque

SQS is designed as a point-to-point queue. Each message is delivered to one consumer only (competing consumers pattern).

For fan-out with SQS, AWS expects you to use SNS:

flowchart LR
    SNS[SNS Topic]
    SQS1[SQS: orders-analytics]
    SQS2[SQS: orders-shipping]
    SQS3[SQS: orders-inventory]
    PS1[Subscription A]
    PS2[Subscription B]
    PS3[Subscription C]

    SNS --> SQS1 --> PS1
    SNS --> SQS2 --> PS2
    SNS --> SQS3 --> PS3
Loading

This is the AWS-native way, so Calanque doesn't try to reinvent it. Configure fan-out in AWS, then create separate Subscriptions for each SQS queue.

Backend-specific CRD examples

Kafka (supports both patterns):

spec:
  backend:
    type: kafka
    kafka:
      bootstrapServers: ["kafka:9092"]
      topic: orders
  consumerGroup: my-group  # Controls fan-out vs competing

SQS (competing consumers only):

spec:
  sqs:
    queueUrl: https://sqs.ap-northeast-1.amazonaws.com/123456789/orders
    region: ap-northeast-1
    # No consumerGroup - SQS is always competing consumers

License

Apache 2.0

About

Calanque is a pull-based event consumption for Kubernetes

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published