CloudEvents-native Pull Eventing for Kubernetes
Calanque is a pull-based event consumption platform for Kubernetes, designed as a complement to push-based systems like Knative Eventing.
- **Pull-based consumption** - Consumers fetch events when ready
- **CloudEvents native** - Full CloudEvents 1.0 compliance
- **Dual protocol** - gRPC/ConnectRPC and HTTP (batch) APIs
- **Retry with backoff** - Configurable exponential backoff
- **Dead Letter Queue** - Failed messages go to a DLQ after max retries
- **Kubernetes native** - CRD-based declarative configuration
- **franz-go powered** - High-performance Kafka client
```mermaid
flowchart LR
    subgraph K8s[Kubernetes]
        subgraph CRDs[CRDs]
            SubCRD[Subscription]
            IngressCRD[Ingress]
        end
        Operator[Calanque Operator]
        subgraph Ingress[Ingress Deployment]
            IngressAPI[HTTP API]
            Producer[franz-go producer]
        end
        subgraph Consumer[Consumer Deployment]
            ConsumerAPI[gRPC/ConnectRPC<br/>+ HTTP API]
            ConsumerFranz[franz-go consumer]
        end
        subgraph Kafka[Kafka]
            Topic[topic]
            DLQ[DLQ]
        end
        ExtSystem[External System]
        App[Your Application]
    end
    CRDs --> Operator
    Operator --> Ingress
    Operator --> Consumer
    ExtSystem -->|POST CloudEvents| IngressAPI
    IngressAPI --> Producer
    Producer -->|publish| Topic
    Topic -->|consume| ConsumerFranz
    ConsumerFranz <-->|failed| DLQ
    ConsumerAPI -->|pull when ready| App
```
```bash
kubectl apply -f config/crd/calanque.io_subscriptions.yaml
kubectl apply -f config/crd/calanque.io_brokers.yaml
kubectl apply -f config/crd/calanque.io_triggers.yaml
```

Calanque supports two approaches:
**Option A: Subscription (simple)**

- One resource = one consumer
- Manual consumer group management

**Option B: Broker + Trigger (recommended for multiple consumers)**

- A Broker defines the Kafka connection once
- Triggers subscribe to the broker
- Automatic consumer group management
- Easy fan-out and competing consumers
For simple use cases where you just need one consumer, use Subscription directly:
```yaml
apiVersion: calanque.io/v1alpha1
kind: Subscription
metadata:
  name: order-processor
spec:
  backend:
    type: kafka
    kafka:
      bootstrapServers:
        - kafka:9092
      topic: orders
  consumerGroup: order-processor-group
  pull:
    batchSize: 10
    batchTimeout: "5s"
    maxInflight: 100
  consumer:
    http:
      port: 8080
    grpc:
      port: 50051
  retry:
    maxAttempts: 3
    backoff:
      initial: "1s"
      max: "30s"
      multiplier: 2.0
  deadLetterQueue:
    kafka:
      topic: orders-dlq
```
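The three `backoff` fields describe a standard exponential schedule: the delay starts at `initial`, is multiplied by `multiplier` after each failed attempt, and is capped at `max`. A minimal Go sketch of that arithmetic (illustrative only, not Calanque source):

```go
package main

import (
	"fmt"
	"time"
)

// backoffDelay returns the delay before retry attempt n (0-based):
// initial * multiplier^n, capped at max — mirroring the
// retry.backoff fields in the Subscription spec above.
func backoffDelay(initial, max time.Duration, multiplier float64, attempt int) time.Duration {
	d := initial
	for i := 0; i < attempt; i++ {
		d = time.Duration(float64(d) * multiplier)
		if d > max {
			return max
		}
	}
	return d
}

func main() {
	// initial: "1s", max: "30s", multiplier: 2.0 → 1s, 2s, 4s
	for attempt := 0; attempt < 3; attempt++ {
		fmt.Printf("retry %d after %v\n", attempt+1,
			backoffDelay(time.Second, 30*time.Second, 2.0, attempt))
	}
}
```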
With Broker + Trigger, each Trigger in its own consumer group receives every event (fan-out):

```mermaid
flowchart LR
    Topic[orders topic]
    Analytics[analytics<br/>gets order #1]
    Shipping[shipping<br/>gets order #1]
    Inventory[inventory<br/>gets order #1]
    Topic --> Analytics
    Topic --> Shipping
    Topic --> Inventory
```
```yaml
apiVersion: calanque.io/v1alpha1
kind: Broker
metadata:
  name: orders
spec:
  backend:
    type: kafka
    kafka:
      bootstrapServers: ["kafka:9092"]
      topic: orders
---
# Each trigger gets ALL messages (different consumer groups)
apiVersion: calanque.io/v1alpha1
kind: Trigger
metadata:
  name: analytics
spec:
  broker: orders
  # consumerGroup omitted → auto-generated → fan-out
  subscriber:
    http: {port: 8080}
---
apiVersion: calanque.io/v1alpha1
kind: Trigger
metadata:
  name: shipping
spec:
  broker: orders
  subscriber:
    http: {port: 8080}
---
apiVersion: calanque.io/v1alpha1
kind: Trigger
metadata:
  name: inventory
spec:
  broker: orders
  subscriber:
    http: {port: 8080}
```
Triggers that share the same consumer group instead compete for events, so each task is delivered to exactly one worker:

```mermaid
flowchart LR
    Topic[tasks topic]
    W1[worker-1<br/>gets task A]
    W2[worker-2<br/>gets task B]
    W3[worker-3<br/>gets task C]
    Topic -.->|task A| W1
    Topic -.->|task B| W2
    Topic -.->|task C| W3
```
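Competing consumption here is plain Kafka consumer-group semantics, which the consumer deployment applies via franz-go. As a standalone illustration (not Calanque source), this is what sharing a group ID means in franz-go:

```go
// Standalone franz-go sketch: every process that starts with the same
// ConsumerGroup splits the topic's partitions among the group, so each
// record is handled by exactly one member.
package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	client, err := kgo.NewClient(
		kgo.SeedBrokers("kafka:9092"),
		kgo.ConsumerGroup("task-workers"), // same group → competing consumers
		kgo.ConsumeTopics("heavy-tasks"),
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	for {
		fetches := client.PollFetches(context.Background())
		if errs := fetches.Errors(); len(errs) > 0 {
			panic(errs[0].Err)
		}
		fetches.EachRecord(func(r *kgo.Record) {
			fmt.Printf("partition %d offset %d: %s\n", r.Partition, r.Offset, r.Value)
		})
	}
}
```

Calanque expresses the same grouping declaratively: Triggers that set the same `consumerGroup` behave like members of one group.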
```yaml
apiVersion: calanque.io/v1alpha1
kind: Broker
metadata:
  name: tasks
spec:
  backend:
    type: kafka
    kafka:
      bootstrapServers: ["kafka:9092"]
      topic: heavy-tasks
---
# All workers share the SAME consumerGroup → competing
apiVersion: calanque.io/v1alpha1
kind: Trigger
metadata:
  name: worker-1
spec:
  broker: tasks
  consumerGroup: task-workers  # ← Same group!
  subscriber:
    http: {port: 8080}
---
apiVersion: calanque.io/v1alpha1
kind: Trigger
metadata:
  name: worker-2
spec:
  broker: tasks
  consumerGroup: task-workers  # ← Same group!
  subscriber:
    http: {port: 8080}
---
apiVersion: calanque.io/v1alpha1
kind: Trigger
metadata:
  name: worker-3
spec:
  broker: tasks
  consumerGroup: task-workers  # ← Same group!
  subscriber:
    http: {port: 8080}
```

The two patterns can be mixed against a single Broker:

```yaml
# Analytics gets ALL orders (fan-out)
# Processors share orders (competing)
apiVersion: calanque.io/v1alpha1
kind: Trigger
metadata:
  name: analytics
spec:
  broker: orders
  # No consumerGroup → unique → gets ALL
  subscriber:
    http: {port: 8080}
---
apiVersion: calanque.io/v1alpha1
kind: Trigger
metadata:
  name: processor-1
spec:
  broker: orders
  consumerGroup: order-processors  # Shared
  subscriber:
    http: {port: 8080}
---
apiVersion: calanque.io/v1alpha1
kind: Trigger
metadata:
  name: processor-2
spec:
  broker: orders
  consumerGroup: order-processors  # Shared
  subscriber:
    http: {port: 8080}
```

Ingress is the publishing side of Calanque. While Subscription/Trigger consume events FROM Kafka, Ingress publishes events TO Kafka.
```mermaid
flowchart LR
    Ext[External System]
    Ingress[Ingress<br/>HTTP POST]
    Kafka[Kafka Topic]
    Sub[Subscription<br/>/ Trigger]
    Ext -->|CloudEvents| Ingress
    Ingress -->|publish| Kafka
    Kafka -->|consume| Sub
```
```yaml
apiVersion: calanque.io/v1alpha1
kind: Ingress
metadata:
  name: order-ingress
spec:
  backend:
    type: kafka
    kafka:
      bootstrapServers:
        - kafka:9092
      topic: orders
      sasl:
        mechanism: SCRAM-SHA-256
        secretRef:
          name: kafka-credentials
      tls:
        enabled: true
  http:
    port: 8080
    path: /events
```

```bash
# Single event (structured format)
curl -X POST "http://order-ingress-ingress:8080/events" \
  -H "Content-Type: application/cloudevents+json" \
  -d '{
    "specversion": "1.0",
    "type": "order.created",
    "source": "/orders",
    "id": "order-123",
    "data": {"orderId": "123", "amount": 1000}
  }'

# Single event (binary format)
curl -X POST "http://order-ingress-ingress:8080/events" \
  -H "Content-Type: application/json" \
  -H "ce-specversion: 1.0" \
  -H "ce-type: order.created" \
  -H "ce-source: /orders" \
  -H "ce-id: order-123" \
  -d '{"orderId": "123", "amount": 1000}'

# Batch events
curl -X POST "http://order-ingress-ingress:8080/events" \
  -H "Content-Type: application/cloudevents-batch+json" \
  -d '[
    {"specversion":"1.0","type":"order.created","source":"/orders","id":"1","data":{}},
    {"specversion":"1.0","type":"order.created","source":"/orders","id":"2","data":{}}
  ]'
```

While Ingress publishes events TO Kafka, Subscription/Trigger consume events FROM Kafka. Your application pulls events when ready.
```bash
# Pull events
curl "http://order-processor-consumer:8080/events?subscription=order-processor&maxEvents=10"

# Response: application/cloudevents-batch+json
# [
#   {"specversion":"1.0","type":"order.created","id":"123",...},
#   ...
# ]

# Acknowledge
curl -X POST "http://order-processor-consumer:8080/ack?subscription=order-processor" \
  -H "Content-Type: application/json" \
  -d '{"messageIds": ["orders-0-100", "orders-0-101"]}'

# Negative acknowledge (retry)
curl -X POST "http://order-processor-consumer:8080/nack?subscription=order-processor" \
  -H "Content-Type: application/json" \
  -d '{"messageIds": ["orders-0-102"], "requeue": true}'

# Negative acknowledge (send to DLQ)
curl -X POST "http://order-processor-consumer:8080/nack?subscription=order-processor" \
  -H "Content-Type: application/json" \
  -d '{"messageIds": ["orders-0-103"], "requeue": false}'
```

See `examples/python/client.py` for a full example client implementation.
**Consumer HTTP API**

| Endpoint | Method | Description |
|---|---|---|
| `/events` | GET | Pull a batch of CloudEvents |
| `/ack` | POST | Acknowledge messages |
| `/nack` | POST | Negative acknowledge (retry/DLQ) |
| `/health` | GET | Health check |

Query parameters for `GET /events`:

- `subscription` (required): Subscription name
- `maxEvents` (default: 10): Maximum events to return
- `timeout` (default: 5s): Long-poll timeout
**Ingress HTTP API**

| Endpoint | Method | Description |
|---|---|---|
| `/events` | POST | Publish CloudEvent(s) |
| `/health` | GET | Health check |
| `/healthz` | GET | Kubernetes liveness probe |
| `/readyz` | GET | Kubernetes readiness probe |

Content types for `POST /events`:

- `application/cloudevents+json`: single CloudEvent (structured format)
- `application/json` with `ce-*` headers: single CloudEvent (binary format)
- `application/cloudevents-batch+json`: batch of CloudEvents
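From application code, binary mode is just a POST with `ce-*` headers. A minimal Go sketch using only the standard library, assuming the in-cluster service name from the Ingress example above:

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// Binary-mode CloudEvent: attributes travel as ce-* headers,
	// the HTTP body is the raw event data.
	body := bytes.NewBufferString(`{"orderId": "123", "amount": 1000}`)
	req, err := http.NewRequest(http.MethodPost, "http://order-ingress-ingress:8080/events", body)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("ce-specversion", "1.0")
	req.Header.Set("ce-type", "order.created")
	req.Header.Set("ce-source", "/orders")
	req.Header.Set("ce-id", "order-123")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}
```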
The gRPC/ConnectRPC surface is defined in `proto/calanque.proto`:

```protobuf
// Consumer services
service PullService {
  rpc Pull(PullRequest) returns (PullResponse);
  rpc StreamPull(stream StreamPullRequest) returns (stream StreamPullResponse);
  rpc Ack(AckRequest) returns (AckResponse);
  rpc Nack(NackRequest) returns (NackResponse);
}

service HealthService {
  rpc Check(CheckRequest) returns (CheckResponse);
}

// Ingress service
service PublishService {
  rpc Publish(PublishRequest) returns (PublishResponse);
  rpc PublishBatch(PublishBatchRequest) returns (PublishBatchResponse);
}
```
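For typed access, the generated Connect-Go client can be used instead of raw HTTP. This is a hedged sketch: the package paths follow the `gen/` layout described under code generation below, and the request/response field names are assumptions, since the full proto messages aren't shown here.

```go
// Sketch of a Connect-Go PullService client. Module paths and
// request/response field names are assumed, not confirmed.
package main

import (
	"context"
	"log"
	"net/http"

	"connectrpc.com/connect"

	calanquev1 "example.com/calanque/gen"        // assumed module path
	"example.com/calanque/gen/calanquev1connect" // assumed module path
)

func main() {
	client := calanquev1connect.NewPullServiceClient(
		http.DefaultClient,
		"http://order-processor-consumer:50051",
	)
	resp, err := client.Pull(context.Background(), connect.NewRequest(&calanquev1.PullRequest{
		Subscription: "order-processor", // field names assumed
		MaxEvents:    10,
	}))
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("pulled %d events", len(resp.Msg.Events)) // Events field assumed
}
```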
**Subscription spec**

| Field | Type | Description |
|---|---|---|
| `backend.type` | string | Backend type: `kafka`, `sqs`, `pubsub` |
| `backend.kafka.bootstrapServers` | []string | Kafka broker addresses |
| `backend.kafka.topic` | string | Source topic |
| `backend.kafka.sasl` | object | SASL authentication |
| `backend.kafka.tls` | object | TLS configuration |
| `consumerGroup` | string | Consumer group ID |
| `pull.batchSize` | int32 | Max events per pull (default: 10) |
| `pull.batchTimeout` | string | Pull timeout (default: 5s) |
| `pull.maxInflight` | int32 | Max unacked messages (default: 100) |
| `pull.initialOffset` | string | `earliest` or `latest` |
| `consumer.grpc.port` | int32 | gRPC/ConnectRPC port (default: 50051) |
| `consumer.http.port` | int32 | HTTP port (default: 8080) |
| `consumer.http.path` | string | Pull endpoint path (default: `/events`) |
| `retry.maxAttempts` | int32 | Max retry attempts (default: 3) |
| `retry.backoff.initial` | string | Initial backoff (default: 1s) |
| `retry.backoff.max` | string | Max backoff (default: 30s) |
| `retry.backoff.multiplier` | float64 | Backoff multiplier (default: 2.0) |
| `deadLetterQueue.kafka.topic` | string | DLQ topic name |
**Ingress spec**

| Field | Type | Description |
|---|---|---|
| `backend.type` | string | Backend type: `kafka`, `sqs`, `pubsub` |
| `backend.kafka.bootstrapServers` | []string | Kafka broker addresses |
| `backend.kafka.topic` | string | Target topic |
| `backend.kafka.sasl` | object | SASL authentication |
| `backend.kafka.tls` | object | TLS configuration |
| `http.port` | int32 | HTTP port (default: 8080) |
| `http.path` | string | CloudEvents endpoint path (default: `/events`) |
**Broker spec**

| Field | Type | Description |
|---|---|---|
| `backend.type` | string | Backend type: `kafka`, `sqs`, `pubsub` |
| `backend.kafka.bootstrapServers` | []string | Kafka broker addresses |
| `backend.kafka.topic` | string | Topic name |
| `backend.kafka.sasl` | object | SASL authentication |
| `backend.kafka.tls` | object | TLS configuration |
**Trigger spec**

| Field | Type | Description |
|---|---|---|
| `broker` | string | Name of the Broker to subscribe to |
| `consumerGroup` | string | Consumer group (optional, for competing consumers) |
| `filter.types` | []string | Filter by event types |
| `filter.source` | string | Filter by event source (regex) |
| `filter.expression` | string | CEL expression for advanced filtering |
| `pull.batchSize` | int32 | Max events per pull (default: 10) |
| `pull.batchTimeout` | string | Pull timeout (default: 5s) |
| `subscriber.grpc.port` | int32 | gRPC/ConnectRPC port (default: 50051) |
| `subscriber.http.port` | int32 | HTTP port (default: 8080) |
| `retry.maxAttempts` | int32 | Max retry attempts (default: 3) |
| `deadLetterQueue.kafka.topic` | string | DLQ topic name |
| Feature | Knative Eventing (Push) | Calanque (Pull) |
|---|---|---|
| Delivery Model | Push to HTTP endpoint | Consumer pulls |
| Backpressure | Limited (webhook timeout) | Native (pull when ready) |
| Batch Processing | Not supported | Native batch pull |
| Retry | Built-in | Built-in with backoff |
| DLQ | Supported | Supported |
| Scale to Zero | Supported | Not applicable |
| Protocol | HTTP only | gRPC/ConnectRPC + HTTP |
Calanque provides a complete local development environment using a local Kubernetes cluster and Tilt.
- Local Kubernetes cluster (Docker Desktop, kind, minikube, Rancher Desktop, etc.)
- Tilt: `brew install tilt-dev/tap/tilt`
- uv: `brew install uv`

> Note: The default configuration uses Docker Desktop Kubernetes. For other environments, you may need to modify `Tiltfile` and `docker-compose.yml`.
```bash
# Start Kafka (KRaft mode, no ZooKeeper)
docker-compose up -d

# Start Tilt
tilt up
```

Open http://localhost:10350 in your browser to see the Tilt UI.
```mermaid
flowchart TB
    subgraph K8s[Local Kubernetes]
        Operator[Operator]
        Consumer[Consumer Pod<br/>:50051 gRPC/ConnectRPC, :8080 HTTP]
        Ingress[Ingress Pod<br/>:8080 HTTP]
        Operator --> Consumer
        Operator --> Ingress
    end
    subgraph Docker[Docker Compose - Host]
        Kafka[Kafka :9092]
        KafkaUI[Kafka UI :8090]
    end
    Consumer <--> Kafka
    Ingress <--> Kafka
```
- Wait for all resources to be ready (green in the Tilt UI)
- Click `pf-http-gateway` to enable the Ingress port-forward
- Click `test-send-http` to send CloudEvents via Ingress
- View events in the Kafka UI: http://localhost:8090
- Click `pf-consumer`, then `test-consume` to fetch events
| Service | URL |
|---|---|
| Kafka (host) | localhost:29092 |
| Kafka UI | http://localhost:8090 |
| Consumer HTTP | http://localhost:8080 (via port-forward) |
| Ingress HTTP | http://localhost:8081 (via port-forward) |
Calanque uses two types of code generation:

**Protocol Buffers (Connect-Go)**

When you modify `proto/calanque.proto`:

```bash
buf generate
```

This generates files in the `gen/` directory:

- `gen/calanque.pb.go` - Protocol Buffers type definitions
- `gen/calanquev1connect/` - Connect-Go service handlers

**Kubernetes CRD (DeepCopy)**

When you modify CRD type definitions in `api/v1alpha1/`:

```bash
make generate
```

This generates `api/v1alpha1/zz_generated.deepcopy.go`.

- Modify proto file → `buf generate`
- Modify CRD types → `make generate`
- Build → `make build`
```bash
# Build operator
make build

# Build container images
make docker-build

# Deploy to cluster
make deploy
```

- Kafka backend (franz-go)
- gRPC/ConnectRPC Pull API
- HTTP Pull API (CloudEvents batch)
- Retry with exponential backoff
- Dead Letter Queue
- Broker / Trigger (fan-out & competing consumers)
- CEL filter expressions
- OpenTelemetry integration
- SDK/client libraries (Python, Go, TypeScript, etc.)
- AWS SQS backend
- Google Pub/Sub backend
- NATS JetStream backend
Different backends have different native capabilities:
| Backend | Fan-out | Competing Consumers | Notes |
|---|---|---|---|
| Kafka | ✓ Native | ✓ Native | Use `consumerGroup` to control |
| SQS | ✗ Not supported | ✓ Native (default) | For fan-out, use SNS → multiple SQS |
| Pub/Sub | ✓ Native | ✓ Native | Subscription = consumer group |
| NATS JS | ✓ Native | ✓ Native | Durable consumer name = group |
SQS is designed as a point-to-point queue. Each message is delivered to one consumer only (competing consumers pattern).
For fan-out with SQS, AWS expects you to use SNS:
```mermaid
flowchart LR
    SNS[SNS Topic]
    SQS1[SQS: orders-analytics]
    SQS2[SQS: orders-shipping]
    SQS3[SQS: orders-inventory]
    PS1[Subscription A]
    PS2[Subscription B]
    PS3[Subscription C]
    SNS --> SQS1 --> PS1
    SNS --> SQS2 --> PS2
    SNS --> SQS3 --> PS3
```
This is the AWS-native way, so Calanque doesn't try to reinvent it. Configure fan-out in AWS, then create separate Subscriptions for each SQS queue.
**Kafka (supports both patterns):**

```yaml
spec:
  backend:
    type: kafka
    kafka:
      bootstrapServers: ["kafka:9092"]
      topic: orders
  consumerGroup: my-group  # Controls fan-out vs competing
```

**SQS (competing consumers only):**
```yaml
spec:
  sqs:
    queueUrl: https://sqs.ap-northeast-1.amazonaws.com/123456789/orders
    region: ap-northeast-1
  # No consumerGroup - SQS is always competing consumers
```

Apache 2.0