Kafka Consumer Lag Exporter — Know when your consumers fall behind, before it becomes a problem.
Inspired by kafka-lag-exporter (archived 2024). Built with Vert.x and Micrometer.
Scales to large clusters: monitors thousands of consumer groups in ~50 MB of heap. Request batching with configurable delays prevents overwhelming brokers — fetch offsets for 500+ groups without spiking cluster CPU.
Consumer lag is the gap between what Kafka has produced and what your consumers have processed. Left unmonitored, growing lag leads to:
- Stale data in downstream systems
- Memory pressure as consumers struggle to catch up
- Silent failures when consumer groups die without alerts
Klag continuously monitors all consumer groups and exposes metrics to your observability stack.
| Feature | Why It Matters |
|---|---|
| Lag velocity | Know if lag is growing or shrinking — catch problems before they escalate |
| Time-based lag estimation | See lag in seconds/minutes, not just message counts |
| Hot partition detection | Find partitions with uneven load causing bottlenecks |
| Consumer group state tracking | Alert on Rebalancing, Dead, or Empty states |
| Request batching | Safely monitor large clusters without overwhelming brokers |
| Stale group cleanup | Automatically stops reporting deleted/inactive groups |
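To make "statistically abnormal throughput" concrete, one plausible detection rule is a z-score over per-partition throughput. The sketch below is an assumption for illustration only — the threshold, partition loads, and the statistic itself are invented, and the source does not specify which method Klag uses:

```python
from statistics import mean, stdev

def hot_partitions(throughput, z_threshold=2.0):
    """Flag partitions whose throughput sits more than z_threshold
    sample standard deviations above the mean across the topic.
    Illustrative only; not necessarily the statistic Klag uses."""
    mu, sigma = mean(throughput.values()), stdev(throughput.values())
    if sigma == 0:
        return []  # perfectly even load: nothing is "hot"
    return [p for p, t in throughput.items() if (t - mu) / sigma > z_threshold]

# Partition 5 receives roughly 4x the load of its peers.
load = {0: 100, 1: 102, 2: 98, 3: 101, 4: 99, 5: 400}
print(hot_partitions(load))  # [5]
```

A rule like this flags skewed keys early: a single hot partition caps the throughput of the whole consumer group, since one consumer must drain it alone.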
- Prometheus endpoint
- Datadog
- OTLP (OpenTelemetry) — works with Grafana Cloud, New Relic, etc.
- (planned) Prometheus Push Gateway, StatsD, Google Stackdriver
```shell
docker run -e KAFKA_BOOTSTRAP_SERVERS=kafka:9092 \
  -e METRICS_REPORTER=prometheus \
  -p 8888:8888 \
  themoah/klag
```

Metrics available at http://localhost:8888/metrics
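The endpoint serves Prometheus exposition format. A quick sketch of pulling one metric out of that output (the sample lines and label values below are invented; Micrometer's Prometheus registry typically renders dotted meter names like `klag.consumer.lag` with underscores):

```python
import re

# Invented example lines in Prometheus exposition format, shaped like
# what the /metrics endpoint might return.
sample = """\
klag_consumer_lag{consumer_group="orders",topic="events",partition="0"} 1523.0
klag_consumer_lag{consumer_group="orders",topic="events",partition="1"} 87.0
"""

def parse(text, metric):
    """Return {label-string: value} for every sample of `metric`."""
    pattern = re.compile(rf'^{metric}\{{(?P<labels>[^}}]*)\}}\s+(?P<value>\S+)$')
    out = {}
    for line in text.splitlines():
        m = pattern.match(line)
        if m:
            out[m.group("labels")] = float(m.group("value"))
    return out

lags = parse(sample, "klag_consumer_lag")
total = sum(lags.values())  # 1610.0 across both partitions
```

In practice you would point Prometheus (or any exposition-format scraper) at the endpoint rather than parsing it by hand; this just shows the shape of the data.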
| Metric | Description |
|---|---|
| `klag.consumer.lag` | Current lag per partition (also `.sum`, `.max`, `.min` aggregations) |
| `klag.consumer.lag.velocity` | Rate of change — positive means falling behind |
| `klag.consumer.lag.time_ms` | Estimated time (ms) to process current lag at current rate |
| `klag.consumer.lag.time_to_close_seconds` | Estimated seconds until lag reaches zero (only when catching up) |
| `klag.consumer.group.state` | Group health: Stable, Rebalancing, Dead, Empty |
| `klag.hot_partition` | Partitions with statistically abnormal throughput |
| `klag.hot_partition.lag` | Lag on hot partitions specifically |
| `klag.topic.partitions` | Partition count per topic |
| `klag.partition.log_end_offset` | Latest offset per partition |
| `klag.consumer.committed_offset` | Last committed offset per consumer |
All metrics are tagged with `consumer_group`, `topic`, and `partition` where applicable.
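The velocity and time-to-close metrics boil down to simple arithmetic on two lag samples. A minimal sketch mirroring the metric definitions above (the interval and lag values are invented; this is not Klag's actual implementation):

```python
def lag_velocity(lag_prev, lag_now, interval_s):
    """Messages/second: positive means the group is falling behind."""
    return (lag_now - lag_prev) / interval_s

def time_to_close_seconds(lag_now, velocity):
    """Seconds until lag reaches zero; only meaningful while catching up."""
    if velocity >= 0:
        return None  # lag is flat or growing: it never closes at this rate
    return lag_now / -velocity

# Two samples taken 60 s apart: lag fell from 1200 to 900 messages.
v = lag_velocity(1200, 900, 60)      # -5.0 msg/s, i.e. catching up
eta = time_to_close_seconds(900, v)  # 180.0 s until fully caught up
```

This is why velocity is worth alerting on before absolute lag: a small lag with a large positive velocity is a bigger problem than a large lag that is shrinking.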
```shell
helm install klag ./charts/klag \
  --set kafka.bootstrapServers="kafka-broker:9092"
```

With SASL authentication:

```shell
helm install klag ./charts/klag \
  --set kafka.bootstrapServers="kafka:9092" \
  --set kafka.securityProtocol="SASL_SSL" \
  --set kafka.saslMechanism="PLAIN" \
  --set kafka.saslJaasConfig="org.apache.kafka.common.security.plain.PlainLoginModule required username='user' password='pass';"
```

See charts/klag/README.md for full configuration options.
```shell
docker run --env-file .env themoah/klag
```

Sample .env file:

```shell
# Kafka connection
KAFKA_BOOTSTRAP_SERVERS=instance.gcp.confluent.cloud:9092
KAFKA_SECURITY_PROTOCOL=SASL_SSL
KAFKA_SASL_MECHANISM=PLAIN
KAFKA_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=${SASL_USERNAME} password=${SASL_PASSWORD};"

# Metrics
METRICS_REPORTER=prometheus
METRICS_INTERVAL_MS=30000
METRICS_GROUP_FILTER=*

# Optional: JVM metrics
METRICS_JVM_ENABLED=true
```

Configure via src/main/resources/application.properties or environment variables:
| Variable | Default | Description |
|---|---|---|
| `KAFKA_BOOTSTRAP_SERVERS` | `localhost:9092` | Kafka broker addresses |
| `KAFKA_REQUEST_TIMEOUT_MS` | `30000` | Request timeout |
| `KAFKA_CHUNK_COUNT` | `1` | Split offset requests into N batches |
| `KAFKA_CHUNK_DELAY_MS` | `0` | Delay (ms) between batches |
| `METRICS_REPORTER` | `none` | `prometheus`, `datadog`, or `otlp` |
| `METRICS_INTERVAL_MS` | `60000` | How often to collect metrics |
| `METRICS_GROUP_FILTER` | `*` | Glob pattern to filter consumer groups |
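How `KAFKA_CHUNK_COUNT` and `KAFKA_CHUNK_DELAY_MS` interact can be sketched as follows. This is illustrative Python, not Klag's actual Java code; the group names and the `fetch` callback are invented:

```python
import time

def fetch_offsets_chunked(groups, chunk_count, chunk_delay_ms, fetch):
    """Split `groups` into chunk_count roughly equal batches and call
    `fetch` on each, pausing chunk_delay_ms between batches so the
    broker sees several small requests instead of one large burst."""
    size = -(-len(groups) // chunk_count)  # ceiling division
    results = {}
    for i in range(0, len(groups), size):
        results.update(fetch(groups[i:i + size]))
        if i + size < len(groups):
            time.sleep(chunk_delay_ms / 1000)
    return results

# 500 groups in 5 batches of 100, 200 ms apart: one burst becomes five.
groups = [f"group-{i}" for i in range(500)]
offsets = fetch_offsets_chunked(groups, 5, 200, lambda batch: {g: 0 for g in batch})
```

With the defaults (`KAFKA_CHUNK_COUNT=1`, `KAFKA_CHUNK_DELAY_MS=0`) all offsets are fetched in a single pass; raising both spreads the load on large clusters.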
See CLAUDE.md for the complete configuration reference.
Klag requires read-only access to monitor consumer lag. It uses only the Kafka Admin Client API with DESCRIBE permissions — no write or alter access needed.
| Resource | Name | Permission | Operations |
|---|---|---|---|
| CLUSTER | kafka-cluster | DESCRIBE | Health check, list consumer groups |
| TOPIC | `*` or prefixed | DESCRIBE | Get partition info and offsets |
| GROUP | `*` or prefixed | DESCRIBE | Get group state and committed offsets |
Monitor all groups and topics:

```shell
# Cluster permissions (required)
kafka-acls --bootstrap-server <broker> \
  --add --allow-principal User:<klag-user> \
  --operation Describe --cluster

# All topics
kafka-acls --bootstrap-server <broker> \
  --add --allow-principal User:<klag-user> \
  --operation Describe --topic '*'

# All consumer groups
kafka-acls --bootstrap-server <broker> \
  --add --allow-principal User:<klag-user> \
  --operation Describe --group '*'
```

Monitor specific prefix only:
```shell
# Cluster permissions (required for listConsumerGroups)
kafka-acls --bootstrap-server <broker> \
  --add --allow-principal User:<klag-user> \
  --operation Describe --cluster

# Topics with prefix
kafka-acls --bootstrap-server <broker> \
  --add --allow-principal User:<klag-user> \
  --operation Describe --topic 'myapp-' \
  --resource-pattern-type prefixed

# Consumer groups with prefix
kafka-acls --bootstrap-server <broker> \
  --add --allow-principal User:<klag-user> \
  --operation Describe --group 'myapp-' \
  --resource-pattern-type prefixed
```

Create a service account with the following ACLs using the Confluent CLI:
Monitor all groups and topics:

```shell
# Set your cluster ID
CLUSTER_ID=<your-cluster-id>
SERVICE_ACCOUNT=<service-account-id>

# Cluster permissions
confluent kafka acl create --allow --service-account $SERVICE_ACCOUNT \
  --operations describe --cluster-scope

# All topics
confluent kafka acl create --allow --service-account $SERVICE_ACCOUNT \
  --operations describe --topic '*'

# All consumer groups
confluent kafka acl create --allow --service-account $SERVICE_ACCOUNT \
  --operations describe --consumer-group '*'
```

Monitor specific prefix only:
```shell
CLUSTER_ID=<your-cluster-id>
SERVICE_ACCOUNT=<service-account-id>

# Cluster permissions (required for listConsumerGroups)
confluent kafka acl create --allow --service-account $SERVICE_ACCOUNT \
  --operations describe --cluster-scope

# Topics with prefix
confluent kafka acl create --allow --service-account $SERVICE_ACCOUNT \
  --operations describe --topic 'myapp-' --prefix

# Consumer groups with prefix
confluent kafka acl create --allow --service-account $SERVICE_ACCOUNT \
  --operations describe --consumer-group 'myapp-' --prefix
```

Note: The `METRICS_GROUP_FILTER` environment variable provides application-level filtering, but cluster DESCRIBE permission is always required since `listConsumerGroups()` queries all groups before filtering.
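The glob semantics of `METRICS_GROUP_FILTER` can be illustrated with Python's `fnmatch` (the group names below are invented, and the actual matcher in Klag may differ in detail):

```python
from fnmatch import fnmatch

groups = ["myapp-orders", "myapp-payments", "infra-mirrormaker"]

# A filter of "myapp-*" keeps only the application's own groups;
# the default "*" matches everything.
matched = [g for g in groups if fnmatch(g, "myapp-*")]
print(matched)  # ['myapp-orders', 'myapp-payments']
```

Even with a narrow filter, Klag still lists every group on the cluster first and drops non-matching ones afterwards, which is why the cluster-level DESCRIBE ACL cannot be omitted.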
Requires Java 21.
```shell
./gradlew clean test      # Run tests
./gradlew clean assemble  # Build fat JAR
./gradlew clean run       # Run with hot-reload
```

```shell
./scripts/test-helm-chart.sh
```

```shell
# Full test: create cluster, install chart, validate, cleanup
./scripts/local-k8s-test.sh

# Auto-install dependencies (kind, helm, kubectl) via Homebrew
./scripts/local-k8s-test.sh --auto-install

# Keep cluster running after test
./scripts/local-k8s-test.sh --skip-cleanup

# Cleanup only
./scripts/local-k8s-test.sh --cleanup
```

Prerequisites: Docker Desktop, kind, helm, kubectl (use --auto-install to install via Homebrew).
- Fork the repository
- Create a feature branch
- Run tests before submitting:

  ```shell
  ./gradlew test                # Java tests
  ./scripts/test-helm-chart.sh  # Helm chart tests
  ```
- Submit a pull request
