Arroyo's DLQ design requires all in-flight messages to be held in memory in the "DLQ buffer", just in case they might be DLQed. As a result, the consumer's memory usage scales with the number of uncommitted messages.
That number is hard to reason about. It is theoretically bounded, since we don't want to accept unbounded consumer lag, but in practice any strategy can delay committing by an arbitrary amount, even though it "shouldn't" do that.
Because there is no easy-to-understand upper bound on the number of uncommitted messages, the memory usage of the DLQ is hard to predict.
Right now we apply DLQ buffer size limits, trimming the buffer whenever it gets too full. This degrades the functionality and availability of the DLQ mechanism in arroyo.
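
For concreteness, here is a minimal sketch of what such a size-limited buffer looks like. All names are hypothetical and this is not arroyo's actual implementation; it only illustrates how trimming loses the ability to DLQ the evicted messages:

```python
from collections import OrderedDict
from typing import Optional, Tuple


class BoundedDlqBuffer:
    """Toy model of a size-limited DLQ buffer: in-flight payloads are kept
    keyed by (partition, offset), and the oldest entries are trimmed once
    the limit is exceeded -- trimmed messages can no longer be DLQed."""

    def __init__(self, max_size: int) -> None:
        self._max_size = max_size
        self._buffer: "OrderedDict[Tuple[int, int], bytes]" = OrderedDict()

    def add(self, partition: int, offset: int, payload: bytes) -> None:
        self._buffer[(partition, offset)] = payload
        while len(self._buffer) > self._max_size:
            # This is the trimming step that degrades DLQ availability.
            self._buffer.popitem(last=False)

    def pop(self, partition: int, offset: int) -> Optional[bytes]:
        # Returns None if the payload was already trimmed away.
        return self._buffer.pop((partition, offset), None)
```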
We have two other options to constrain the DLQ memory growth:
- continue to use DLQ buffer size limits, but apply backpressure whenever the buffer is too full
- redesign DLQ management so that a buffer is not necessary -- instead, original message payloads are fetched from Kafka whenever needed
DLQ buffer applies backpressure
The problem I see with this is that the DLQ buffer size limit becomes performance-critical: for it not to be the bottleneck, it has to be tuned appropriately. It is another thing to think about when trying to burn down backlogs.
We already find it hard to reason about how, why and when there are a lot of uncommitted offsets (hence why we can't predict memory usage) -- so tuning the DLQ buffer limit for maximum throughput is likely to be difficult as well.
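
A minimal sketch of what this option could look like follows. The exception and limit names are made up; in arroyo, the signal would presumably be surfaced the same way as MessageRejected, so the upstream strategy retries the submit later instead of data being dropped:

```python
class DlqBufferFull(Exception):
    """Hypothetical signal meaning "stop feeding me messages for now"."""


MAX_BUFFERED_MESSAGES = 10_000  # hypothetical limit, needs per-deployment tuning


def buffer_for_dlq(buffer: dict, partition: int, offset: int, payload: bytes) -> None:
    # Instead of trimming the buffer when it is full, refuse new work and
    # let backpressure propagate until committed offsets free up space.
    if len(buffer) >= MAX_BUFFERED_MESSAGES:
        raise DlqBufferFull()
    buffer[(partition, offset)] = payload
```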
Fetching on-demand
The basic idea is to remove the DLQ buffer, since all the original data already exists in Kafka. For this, each consumer connects to Kafka with a second consumer group which, on start (see the sketch after the list):
- assigns itself the same partitions as the main consumer group (no automatic rebalancing; we piggyback on the partition assignment of the main consumer group and call `subscribe` with a list of `TopicPartition` instead of a topic)
- pauses the consumer until a DLQ request comes in, then seeks to the partition/offset contained in that DLQ request, reads out the message and produces it to the DLQ
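
A minimal sketch of that secondary consumer, written against confluent-kafka directly rather than arroyo's consumer interface. Broker address, group id and DLQ topic name are placeholders, and `assign()` is used here as the confluent-kafka way of taking an explicit partition list instead of subscribing to a topic:

```python
from typing import List, Optional

from confluent_kafka import Consumer, Producer, TopicPartition

# Hypothetical configuration, for illustration only.
consumer = Consumer(
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "my-consumer-dlq-fetcher",  # secondary consumer group
        "enable.auto.commit": False,
    }
)
producer = Producer({"bootstrap.servers": "localhost:9092"})


def start(assigned_partitions: List[TopicPartition]) -> None:
    # Mirror the main consumer group's assignment manually; no rebalancing.
    consumer.assign(assigned_partitions)
    consumer.pause(assigned_partitions)


def handle_dlq_request(topic: str, partition: int, offset: int) -> None:
    # Jump to the requested message, read it, and forward it to the DLQ topic.
    tp = TopicPartition(topic, partition, offset)
    consumer.resume([tp])
    consumer.seek(tp)
    msg = consumer.poll(timeout=10.0)
    if msg is not None and msg.error() is None:
        producer.produce("my-dlq-topic", value=msg.value(), key=msg.key())
        producer.flush()
    consumer.pause([tp])
```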
This way there is no DLQ buffer, but potentially a lot of seek() calls, and those might be slow.
- Classic DLQ usage assumes low throughput requirements -- seeking is not supposed to happen a lot
- In the BLQ/stale messages use case, stale messages usually have consecutive offsets, meaning that no seeking is necessary there
- In the extreme case of every second message being DLQed (high throughput requirement, but no consecutive offsets), we could define a "seek threshold": a minimum gap between consecutive DLQ requests' offsets that is necessary before we call seek() (see the sketch below)
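
A rough sketch of such a threshold, reusing the confluent-kafka consumer from the previous sketch. `SEEK_THRESHOLD` and `fetch_at` are made-up names, and the threshold value would need benchmarking:

```python
from typing import Optional

from confluent_kafka import TopicPartition

SEEK_THRESHOLD = 100  # hypothetical: minimum offset gap that justifies a seek()


def fetch_at(consumer, topic: str, partition: int, target: int, last: Optional[int]):
    """Return the message at `target`, only calling seek() when the gap from
    the previously fetched offset `last` is large or when going backwards;
    otherwise poll forward and discard the intermediate messages."""
    if last is None or target < last or target - last >= SEEK_THRESHOLD:
        consumer.seek(TopicPartition(topic, partition, target))
    while True:
        msg = consumer.poll(timeout=10.0)
        if msg is None:
            return None  # fetch timed out
        if msg.error() is not None:
            continue
        if msg.offset() >= target:
            return msg
```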