From 675dacf91c7a3bdd904b670d2a81a6a6d4ec6f7b Mon Sep 17 00:00:00 2001 From: Manpreet Singh Date: Tue, 31 Jul 2018 19:43:26 -0700 Subject: [PATCH] Prevent Fetcher from wrongfully discarding PartitionRecords in compacted topics --- kafka/consumer/fetcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 6ec1b71ed..14f0c2a8a 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -365,7 +365,7 @@ def _append(self, drained, part, max_records): elif fetch_offset == position: # we are ensured to have at least one record since we already checked for emptiness part_records = part.take(max_records) - next_offset = part_records[-1].offset + 1 + next_offset = part.fetch_offset log.log(0, "Returning fetched records at offset %d for assigned" " partition %s and update position to %s", position,