From ae6ee2ec2caf2105b5ac69b32b5219af787c7781 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 14 Mar 2025 11:03:05 +0800 Subject: [PATCH 01/18] 1 --- .../mledger/impl/AckSetPositionImpl.java | 5 +++ .../bookkeeper/mledger/impl/AckSetState.java | 7 ++++ .../mledger/impl/AckSetStateUtil.java | 15 +++++++ .../mledger/impl/ManagedCursorImpl.java | 17 +++++++- .../pulsar/broker/service/Consumer.java | 19 +++++---- .../pulsar/broker/service/PendingAcksMap.java | 8 +++- .../persistent/PersistentSubscription.java | 40 ++++++++++++++++++- 7 files changed, 98 insertions(+), 13 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetPositionImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetPositionImpl.java index 22a99eb3607eb..6bfd3ffe64e5a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetPositionImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetPositionImpl.java @@ -19,6 +19,8 @@ package org.apache.bookkeeper.mledger.impl; import java.util.Optional; +import lombok.Getter; +import lombok.Setter; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; @@ -31,6 +33,9 @@ public class AckSetPositionImpl implements Position, AckSetState { protected final long ledgerId; protected final long entryId; protected volatile long[] ackSet; + @Getter + @Setter + private int messagesCountAcked; public AckSetPositionImpl(long ledgerId, long entryId, long[] ackSet) { this.ledgerId = ledgerId; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetState.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetState.java index 03ff50c1c7fe5..73c29a18868ef 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetState.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetState.java @@ -26,6 +26,9 @@ * ackSet state and to extract the state. */ public interface AckSetState { + + int MESSAGES_COUNT_ACKED_THAT_REQUESTED = -1; + /** * Get the ackSet bitset information encoded as a long array. * @return the ackSet @@ -38,6 +41,10 @@ public interface AckSetState { */ void setAckSet(long[] ackSet); + void setMessagesCountAcked(int messagesCountAcked); + + int getMessagesCountAcked(); + /** * Check if the ackSet is set. * @return true if the ackSet is set, false otherwise diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetStateUtil.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetStateUtil.java index 11ab520b68e92..e71fb61ae4707 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetStateUtil.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetStateUtil.java @@ -56,6 +56,21 @@ public static long[] getAckSetArrayOrNull(Position position) { return maybeGetAckSetState(position).map(AckSetState::getAckSet).orElse(null); } + public static void setMessagesCountAcked(Position position, int messagesCountAcked) { + Optional ackSetState = maybeGetAckSetState(position); + if (ackSetState.isPresent()) { + ackSetState.get().setMessagesCountAcked(messagesCountAcked); + } + } + + public static int getMessagesCountAcked(Position position) { + Optional ackSetState = maybeGetAckSetState(position); + if (ackSetState.isPresent()) { + return ackSetState.get().getMessagesCountAcked(); + } + return 0; + } + /** * Get the AckSetState instance from the position. * @param position position which contains the AckSetState diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index e73699564a218..c2cd4a54ab1f0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -2392,7 +2392,16 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb long[] ackSet = AckSetStateUtil.getAckSetArrayOrNull(position); if (ackSet == null) { if (batchDeletedIndexes != null) { - batchDeletedIndexes.remove(position); + BitSet bitSet = batchDeletedIndexes.remove(position); + if (bitSet != null) { + AckSetStateUtil.setMessagesCountAcked(position, bitSet.cardinality()); + } else { + AckSetStateUtil.setMessagesCountAcked(position, + AckSetState.MESSAGES_COUNT_ACKED_THAT_REQUESTED); + } + } else { + AckSetStateUtil.setMessagesCountAcked(position, + AckSetState.MESSAGES_COUNT_ACKED_THAT_REQUESTED); } // Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will // make the RangeSet recognize the "continuity" between adjacent Positions. @@ -2416,7 +2425,13 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb final var givenBitSet = BitSet.valueOf(ackSet); final var bitSet = batchDeletedIndexes.computeIfAbsent(position, __ -> givenBitSet); if (givenBitSet != bitSet) { + int messageUnAckedBefore = bitSet.cardinality(); bitSet.and(givenBitSet); + int messageUnAckedAfter = bitSet.cardinality(); + AckSetStateUtil.setMessagesCountAcked(position, messageUnAckedBefore - messageUnAckedAfter); + } else { + AckSetStateUtil.setMessagesCountAcked(position, + AckSetState.MESSAGES_COUNT_ACKED_THAT_REQUESTED); } if (bitSet.isEmpty()) { Position previousPosition = ledger.getPreviousPosition(position); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 61f9d5c86b32f..dd9f48757b353 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -534,7 +534,7 @@ public CompletableFuture messageAcked(CommandAck ack) { } position = AckSetStateUtil.createPositionWithAckSet(msgId.getLedgerId(), msgId.getEntryId(), ackSets); } else { - position = PositionFactory.create(msgId.getLedgerId(), msgId.getEntryId()); + position = AckSetStateUtil.createPositionWithAckSet(msgId.getLedgerId(), msgId.getEntryId(), null); // TODO 用 null 合理吗? } if (ack.hasTxnidMostBits() && ack.hasTxnidLeastBits()) { @@ -589,12 +589,13 @@ private CompletableFuture individualAckNormal(CommandAck ack, Map individualAckNormal(CommandAck ack, Map individualAckWithTransaction(CommandAck ack) { if (AckSetStateUtil.hasAckSet(positionLongMutablePair.getLeft())) { if (((PersistentSubscription) subscription) .checkIsCanDeleteConsumerPendingAck(positionLongMutablePair.left)) { - removePendingAcks(ackOwnerConsumer, positionLongMutablePair.left); + //removePendingAcks(ackOwnerConsumer, positionLongMutablePair.left); } } })); @@ -762,7 +763,7 @@ private void checkAckValidationError(CommandAck ack, Position position) { private boolean checkCanRemovePendingAcksAndHandle(Consumer ackOwnedConsumer, Position position, MessageIdData msgId) { if (Subscription.isIndividualAckMode(subType) && msgId.getAckSetsCount() == 0) { - return removePendingAcks(ackOwnedConsumer, position); + //return removePendingAcks(ackOwnedConsumer, position); } return false; } @@ -1126,6 +1127,7 @@ public void redeliverUnacknowledgedMessages(long consumerEpoch) { }); for (Position p : pendingPositions) { + // TODO 这里没有处理unack messages pendingAcks.remove(p.getLedgerId(), p.getEntryId()); } @@ -1148,6 +1150,7 @@ public void redeliverUnacknowledgedMessages(List messageIds) { IntIntPair pendingAck = pendingAcks.get(position.getLedgerId(), position.getEntryId()); if (pendingAck != null) { int unAckedCount = (int) getUnAckedCountForBatchIndexLevelEnabled(position, pendingAck.leftInt()); + // TODO redeliver 和 ack 并发? pendingAcks.remove(position.getLedgerId(), position.getEntryId()); totalRedeliveryMessages += unAckedCount; pendingPositions.add(position); @@ -1183,7 +1186,7 @@ public Subscription getSubscription() { return subscription; } - private int addAndGetUnAckedMsgs(Consumer consumer, int ackedMessages) { + public int addAndGetUnAckedMsgs(Consumer consumer, int ackedMessages) { int unackedMsgs = 0; if (isPersistentTopic && Subscription.isIndividualAckMode(subType)) { subscription.addUnAckedMessages(ackedMessages); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java index 7a728a037dc62..c716db55e2d79 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java @@ -304,11 +304,15 @@ public boolean remove(long ledgerId, long entryId, int batchSize, int stickyKeyH * @return true if the pending ack was removed, false otherwise */ public boolean remove(long ledgerId, long entryId) { + return removeAndReturn(ledgerId, entryId) != null; + } + + public IntIntPair removeAndReturn(long ledgerId, long entryId) { try { writeLock.lock(); Long2ObjectSortedMap ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { - return false; + return null; } IntIntPair removedEntry = ledgerMap.remove(entryId); boolean removed = removedEntry != null; @@ -319,7 +323,7 @@ public boolean remove(long ledgerId, long entryId) { if (removed && ledgerMap.isEmpty()) { pendingAcks.remove(ledgerId); } - return removed; + return removedEntry; } finally { writeLock.unlock(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 97b4dc06d0837..cc35971f50c35 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import io.netty.buffer.ByteBuf; +import it.unimi.dsi.fastutil.ints.IntIntPair; import java.io.IOException; import java.util.Collections; import java.util.LinkedHashMap; @@ -51,7 +52,10 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentFindCursorPositionException; import org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.bookkeeper.mledger.ScanOutcome; +import org.apache.bookkeeper.mledger.impl.AckSetState; +import org.apache.bookkeeper.mledger.impl.AckSetStateUtil; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.pulsar.broker.ServiceConfiguration; @@ -93,6 +97,7 @@ import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.stats.PositionInPendingAckStats; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.collections.BitSetRecyclable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -437,7 +442,7 @@ public void acknowledgeMessage(List positions, AckType ackType, Map { if ((cursor.isMessageDeleted(position))) { @@ -517,15 +522,46 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { private final DeleteCallback deleteCallback = new DeleteCallback() { @Override public void deleteComplete(Object context) { + List positions = (List) context; if (log.isDebugEnabled()) { // The value of the param "context" is a position. log.debug("[{}][{}] Deleted message at {}", topicName, subName, context); } + for (Position position : positions) { + for (Consumer consumer : getConsumers()) { + IntIntPair intIntPair = consumer.getPendingAcks() + .get(position.getLedgerId(), position.getEntryId()); + if (intIntPair != null) { + int messagesCountAcked = AckSetStateUtil.getMessagesCountAcked(position); + long[] ackSet = AckSetStateUtil.getAckSetArrayOrNull(position); + if (messagesCountAcked == 0) { + break; + } + if (AckSetState.MESSAGES_COUNT_ACKED_THAT_REQUESTED != messagesCountAcked) { + log.info("===> 1 consumer: {}, unack: {}, ack: {}", consumer.consumerName(), consumer.getUnackedMessages(), messagesCountAcked); + consumer.addAndGetUnAckedMsgs(consumer, -messagesCountAcked); + } else { + if (ackSet != null) { + BitSetRecyclable bitSet = BitSetRecyclable.create().resetWords(ackSet); + log.info("===> 2 consumer: {}, unack: {}, ack: {}", consumer.consumerName(), consumer.getUnackedMessages(), bitSet.cardinality() - intIntPair.firstInt()); + consumer.addAndGetUnAckedMsgs(consumer, + bitSet.cardinality() - intIntPair.firstInt()); + } else { + log.info("===> 3 consumer: {}, unack: {}, ack: {}", consumer.consumerName(), consumer.getUnackedMessages(), intIntPair.firstInt()); + consumer.addAndGetUnAckedMsgs(consumer, -intIntPair.firstInt()); + } + } + // TODO removePendingAcks(ackOwnerConsumer, position); + break; + } + } + } // Signal the dispatchers to give chance to take extra actions if (dispatcher != null) { dispatcher.afterAckMessages(null, context); } - notifyTheMarkDeletePositionMoveForwardIfNeeded((Position) context); + // TODO fix the issue. + notifyTheMarkDeletePositionMoveForwardIfNeeded(PositionFactory.create(-1, -1)); } @Override From 37a998f9a000067fa7ff8fd721879b1fa2f7f060 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 14 Mar 2025 13:14:22 +0800 Subject: [PATCH 02/18] - --- .../mledger/impl/AckSetPositionImpl.java | 6 +++++ .../bookkeeper/mledger/impl/AckSetState.java | 4 +++ .../mledger/impl/AckSetStateUtil.java | 15 +++++++++++ .../mledger/impl/ManagedCursorImpl.java | 2 ++ .../persistent/PersistentSubscription.java | 26 ++++++++++++------- 5 files changed, 44 insertions(+), 9 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetPositionImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetPositionImpl.java index 6bfd3ffe64e5a..1e81f5bebcdc4 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetPositionImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetPositionImpl.java @@ -36,6 +36,8 @@ public class AckSetPositionImpl implements Position, AckSetState { @Getter @Setter private int messagesCountAcked; + @Getter + private boolean allMessagesAcked; public AckSetPositionImpl(long ledgerId, long entryId, long[] ackSet) { this.ledgerId = ledgerId; @@ -68,6 +70,10 @@ public Position getNext() { } } + public void markAllMessagesAcked() { + this.allMessagesAcked = true; + } + @Override public String toString() { return ledgerId + ":" + entryId + " (ackSet " + (ackSet == null ? "is null" : diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetState.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetState.java index 73c29a18868ef..08731fb536990 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetState.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetState.java @@ -45,6 +45,10 @@ public interface AckSetState { int getMessagesCountAcked(); + void markAllMessagesAcked(); + + boolean isAllMessagesAcked(); + /** * Check if the ackSet is set. * @return true if the ackSet is set, false otherwise diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetStateUtil.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetStateUtil.java index e71fb61ae4707..9c4065d89e4c2 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetStateUtil.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetStateUtil.java @@ -71,6 +71,21 @@ public static int getMessagesCountAcked(Position position) { return 0; } + public static void markAllMessagesAcked(Position position) { + Optional ackSetState = maybeGetAckSetState(position); + if (ackSetState.isPresent()) { + ackSetState.get().markAllMessagesAcked(); + } + } + + public static boolean isAllMessagesAcked(Position position) { + Optional ackSetState = maybeGetAckSetState(position); + if (ackSetState.isPresent()) { + return ackSetState.get().isAllMessagesAcked(); + } + return true; + } + /** * Get the AckSetState instance from the position. * @param position position which contains the AckSetState diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index c2cd4a54ab1f0..7cb5b3be430cb 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -2391,6 +2391,7 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb } long[] ackSet = AckSetStateUtil.getAckSetArrayOrNull(position); if (ackSet == null) { + AckSetStateUtil.markAllMessagesAcked(position); if (batchDeletedIndexes != null) { BitSet bitSet = batchDeletedIndexes.remove(position); if (bitSet != null) { @@ -2434,6 +2435,7 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb AckSetState.MESSAGES_COUNT_ACKED_THAT_REQUESTED); } if (bitSet.isEmpty()) { + AckSetStateUtil.markAllMessagesAcked(position); Position previousPosition = ledger.getPreviousPosition(position); individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), previousPosition.getEntryId(), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index cc35971f50c35..97e7d7d8db702 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -529,30 +529,38 @@ public void deleteComplete(Object context) { } for (Position position : positions) { for (Consumer consumer : getConsumers()) { - IntIntPair intIntPair = consumer.getPendingAcks() - .get(position.getLedgerId(), position.getEntryId()); + long[] ackSet = AckSetStateUtil.getAckSetArrayOrNull(position); + IntIntPair intIntPair = null; + if (ackSet != null) { + intIntPair = consumer.getPendingAcks() + .get(position.getLedgerId(), position.getEntryId()); + } else { + intIntPair = consumer.getPendingAcks() + .removeAndReturn(position.getLedgerId(), position.getEntryId()); + } if (intIntPair != null) { int messagesCountAcked = AckSetStateUtil.getMessagesCountAcked(position); - long[] ackSet = AckSetStateUtil.getAckSetArrayOrNull(position); - if (messagesCountAcked == 0) { - break; - } +// if (messagesCountAcked == 0) { +// break; +// } if (AckSetState.MESSAGES_COUNT_ACKED_THAT_REQUESTED != messagesCountAcked) { - log.info("===> 1 consumer: {}, unack: {}, ack: {}", consumer.consumerName(), consumer.getUnackedMessages(), messagesCountAcked); + log.info("===> 1 consumer: {}, unack: {}, ack: {}, all-msg-acked: {}", consumer.consumerName(), consumer.getUnackedMessages(), messagesCountAcked, AckSetStateUtil.isAllMessagesAcked(position)); consumer.addAndGetUnAckedMsgs(consumer, -messagesCountAcked); } else { if (ackSet != null) { BitSetRecyclable bitSet = BitSetRecyclable.create().resetWords(ackSet); - log.info("===> 2 consumer: {}, unack: {}, ack: {}", consumer.consumerName(), consumer.getUnackedMessages(), bitSet.cardinality() - intIntPair.firstInt()); + log.info("===> 2 consumer: {}, unack: {}, ack: {}, all-msg-acked: {}", consumer.consumerName(), consumer.getUnackedMessages(), bitSet.cardinality() - intIntPair.firstInt(), AckSetStateUtil.isAllMessagesAcked(position)); consumer.addAndGetUnAckedMsgs(consumer, bitSet.cardinality() - intIntPair.firstInt()); } else { - log.info("===> 3 consumer: {}, unack: {}, ack: {}", consumer.consumerName(), consumer.getUnackedMessages(), intIntPair.firstInt()); + log.info("===> 3 consumer: {}, unack: {}, ack: {}, all-msg-acked: {}", consumer.consumerName(), consumer.getUnackedMessages(), intIntPair.firstInt(), AckSetStateUtil.isAllMessagesAcked(position)); consumer.addAndGetUnAckedMsgs(consumer, -intIntPair.firstInt()); } } // TODO removePendingAcks(ackOwnerConsumer, position); break; + } else { + log.info("===> skipped consumer: {}, unack: {}", consumer.consumerName(), consumer.getUnackedMessages()); } } } From e6fdd86cd50670d4638cd39480eed12bff4954e8 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 17 Mar 2025 10:27:07 +0800 Subject: [PATCH 03/18] - --- .../mledger/impl/AckSetPositionImpl.java | 8 +- .../bookkeeper/mledger/impl/AckSetState.java | 11 +- .../mledger/impl/AckSetStateUtil.java | 16 +- .../mledger/impl/ManagedCursorImpl.java | 29 ++- .../service/AbstractBaseDispatcher.java | 2 +- .../pulsar/broker/service/Consumer.java | 78 ++++---- .../pulsar/broker/service/Subscription.java | 9 +- .../NonPersistentSubscription.java | 3 +- .../persistent/PersistentSubscription.java | 152 +++++++++++---- .../PulsarCompactorSubscription.java | 4 +- .../ReplicatedSubscriptionsController.java | 4 +- .../pendingack/impl/PendingAckHandleImpl.java | 6 +- .../BatchMessageWithBatchIndexLevelTest.java | 178 ++++++++++++++++++ .../service/MessageCumulativeAckTest.java | 8 +- .../broker/service/PersistentTopicTest.java | 2 +- .../service/TransactionMarkerDeleteTest.java | 2 +- .../PersistentSubscriptionTest.java | 4 +- .../pulsar/compaction/CompactionTest.java | 2 +- 18 files changed, 380 insertions(+), 138 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetPositionImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetPositionImpl.java index 1e81f5bebcdc4..43500cfdb49cf 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetPositionImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetPositionImpl.java @@ -35,9 +35,9 @@ public class AckSetPositionImpl implements Position, AckSetState { protected volatile long[] ackSet; @Getter @Setter - private int messagesCountAcked; + private int batchMessagesAckedCount; @Getter - private boolean allMessagesAcked; + private boolean positionRemovedFromCursor; public AckSetPositionImpl(long ledgerId, long entryId, long[] ackSet) { this.ledgerId = ledgerId; @@ -70,8 +70,8 @@ public Position getNext() { } } - public void markAllMessagesAcked() { - this.allMessagesAcked = true; + public void markPositionRemovedFromCursor() { + this.positionRemovedFromCursor = true; } @Override diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetState.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetState.java index 08731fb536990..f488de5b93711 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetState.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetState.java @@ -27,7 +27,8 @@ */ public interface AckSetState { - int MESSAGES_COUNT_ACKED_THAT_REQUESTED = -1; + int BATCH_MESSAGE_ACKED_AT_ONCE = -1; + int BATCH_MESSAGE_ACKED_FIRST_PART = -2; /** * Get the ackSet bitset information encoded as a long array. @@ -41,13 +42,13 @@ public interface AckSetState { */ void setAckSet(long[] ackSet); - void setMessagesCountAcked(int messagesCountAcked); + void setBatchMessagesAckedCount(int messagesCountAcked); - int getMessagesCountAcked(); + int getBatchMessagesAckedCount(); - void markAllMessagesAcked(); + void markPositionRemovedFromCursor(); - boolean isAllMessagesAcked(); + boolean isPositionRemovedFromCursor(); /** * Check if the ackSet is set. diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetStateUtil.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetStateUtil.java index 9c4065d89e4c2..70ec40fbe4779 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetStateUtil.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetStateUtil.java @@ -56,32 +56,32 @@ public static long[] getAckSetArrayOrNull(Position position) { return maybeGetAckSetState(position).map(AckSetState::getAckSet).orElse(null); } - public static void setMessagesCountAcked(Position position, int messagesCountAcked) { + public static void setBatchMessagesAckedCount(Position position, int messagesCountAcked) { Optional ackSetState = maybeGetAckSetState(position); if (ackSetState.isPresent()) { - ackSetState.get().setMessagesCountAcked(messagesCountAcked); + ackSetState.get().setBatchMessagesAckedCount(messagesCountAcked); } } - public static int getMessagesCountAcked(Position position) { + public static int getBatchMessagesAckedCount(Position position) { Optional ackSetState = maybeGetAckSetState(position); if (ackSetState.isPresent()) { - return ackSetState.get().getMessagesCountAcked(); + return ackSetState.get().getBatchMessagesAckedCount(); } return 0; } - public static void markAllMessagesAcked(Position position) { + public static void markPositionRemovedFromCursor(Position position) { Optional ackSetState = maybeGetAckSetState(position); if (ackSetState.isPresent()) { - ackSetState.get().markAllMessagesAcked(); + ackSetState.get().markPositionRemovedFromCursor(); } } - public static boolean isAllMessagesAcked(Position position) { + public static boolean isPositionRemovedFromCursor(Position position) { Optional ackSetState = maybeGetAckSetState(position); if (ackSetState.isPresent()) { - return ackSetState.get().isAllMessagesAcked(); + return ackSetState.get().isPositionRemovedFromCursor(); } return true; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 7cb5b3be430cb..ec41057b9b8d2 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -22,6 +22,8 @@ import static java.util.Objects.requireNonNull; import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException; import static org.apache.bookkeeper.mledger.impl.EntryCountEstimator.estimateEntryCountByBytesSize; +import static org.apache.bookkeeper.mledger.impl.AckSetState.BATCH_MESSAGE_ACKED_AT_ONCE; +import static org.apache.bookkeeper.mledger.impl.AckSetState.BATCH_MESSAGE_ACKED_FIRST_PART; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException; @@ -2391,18 +2393,12 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb } long[] ackSet = AckSetStateUtil.getAckSetArrayOrNull(position); if (ackSet == null) { - AckSetStateUtil.markAllMessagesAcked(position); - if (batchDeletedIndexes != null) { - BitSet bitSet = batchDeletedIndexes.remove(position); - if (bitSet != null) { - AckSetStateUtil.setMessagesCountAcked(position, bitSet.cardinality()); - } else { - AckSetStateUtil.setMessagesCountAcked(position, - AckSetState.MESSAGES_COUNT_ACKED_THAT_REQUESTED); - } + AckSetStateUtil.markPositionRemovedFromCursor(position); + BitSet bitSet; + if (batchDeletedIndexes == null || (bitSet = batchDeletedIndexes.remove(position)) == null) { + AckSetStateUtil.setBatchMessagesAckedCount(position, BATCH_MESSAGE_ACKED_AT_ONCE); } else { - AckSetStateUtil.setMessagesCountAcked(position, - AckSetState.MESSAGES_COUNT_ACKED_THAT_REQUESTED); + AckSetStateUtil.setBatchMessagesAckedCount(position, bitSet.cardinality()); } // Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will // make the RangeSet recognize the "continuity" between adjacent Positions. @@ -2426,16 +2422,15 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb final var givenBitSet = BitSet.valueOf(ackSet); final var bitSet = batchDeletedIndexes.computeIfAbsent(position, __ -> givenBitSet); if (givenBitSet != bitSet) { - int messageUnAckedBefore = bitSet.cardinality(); + int unAckedBefore = bitSet.cardinality(); bitSet.and(givenBitSet); - int messageUnAckedAfter = bitSet.cardinality(); - AckSetStateUtil.setMessagesCountAcked(position, messageUnAckedBefore - messageUnAckedAfter); + int unAckedAfter = bitSet.cardinality(); + AckSetStateUtil.setBatchMessagesAckedCount(position, unAckedBefore - unAckedAfter); } else { - AckSetStateUtil.setMessagesCountAcked(position, - AckSetState.MESSAGES_COUNT_ACKED_THAT_REQUESTED); + AckSetStateUtil.setBatchMessagesAckedCount(position, BATCH_MESSAGE_ACKED_FIRST_PART); } if (bitSet.isEmpty()) { - AckSetStateUtil.markAllMessagesAcked(position); + AckSetStateUtil.markPositionRemovedFromCursor(position); Position previousPosition = ledger.getPreviousPosition(position); individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), previousPosition.getEntryId(), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index f074f234b873f..10b4e39b5f275 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -326,7 +326,7 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i private void individualAcknowledgeMessageIfNeeded(List positions, Map properties) { if (!(subscription instanceof PulsarCompactorSubscription)) { - subscription.acknowledgeMessage(positions, AckType.Individual, properties); + subscription.acknowledgeMessage(positions, AckType.Individual, properties, null); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index dd9f48757b353..8d594730d1a3b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -544,7 +544,7 @@ public CompletableFuture messageAcked(CommandAck ack) { .thenApply(unused -> 1L); } else { List positionsAcked = Collections.singletonList(position); - subscription.acknowledgeMessage(positionsAcked, AckType.Cumulative, properties); + subscription.acknowledgeMessage(positionsAcked, AckType.Cumulative, properties, this); future = CompletableFuture.completedFuture(1L); } } else { @@ -557,75 +557,48 @@ public CompletableFuture messageAcked(CommandAck ack) { return future .thenApply(v -> { - this.messageAckRate.recordEvent(v); - this.messageAckCounter.add(v); + // The case that is typed individual ack without transaction will deal metrics after a callback + // that after cursor deleting positions, so we may receive a 0 value here. + if (v > 0) { + this.messageAckRate.recordEvent(v); + this.messageAckCounter.add(v); + } return null; }); } + public void ackMetricRecord(int messageCountInRequest) { + this.messageAckRate.recordEvent(messageCountInRequest); + this.messageAckCounter.add(messageCountInRequest); + } + //this method is for individual ack not carry the transaction private CompletableFuture individualAckNormal(CommandAck ack, Map properties) { - List> positionsAcked = new ArrayList<>(); - long totalAckCount = 0; + List positionsAcked = new ArrayList<>(); for (int i = 0; i < ack.getMessageIdsCount(); i++) { MessageIdData msgId = ack.getMessageIdAt(i); Position position; - ObjectIntPair ackOwnerConsumerAndBatchSize = - getAckOwnerConsumerAndBatchSize(msgId.getLedgerId(), msgId.getEntryId()); - Consumer ackOwnerConsumer = ackOwnerConsumerAndBatchSize.left(); - long ackedCount; - int batchSize = ackOwnerConsumerAndBatchSize.rightInt(); if (msgId.getAckSetsCount() > 0) { long[] ackSets = new long[msgId.getAckSetsCount()]; for (int j = 0; j < msgId.getAckSetsCount(); j++) { ackSets[j] = msgId.getAckSetAt(j); } position = AckSetStateUtil.createPositionWithAckSet(msgId.getLedgerId(), msgId.getEntryId(), ackSets); - ackedCount = getAckedCountForBatchIndexLevelEnabled(position, batchSize, ackSets, ackOwnerConsumer); if (isTransactionEnabled()) { //sync the batch position bit set point, in order to delete the position in pending acks if (Subscription.isIndividualAckMode(subType)) { - ((PersistentSubscription) subscription) - .syncBatchPositionBitSetForPendingAck(position); + ((PersistentSubscription) subscription).syncBatchPositionBitSetForPendingAck(position); } } - // TODO 并发 ack 同一个消息会出现 unack 是负值的情况 -// addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); } else { position = AckSetStateUtil.createPositionWithAckSet(msgId.getLedgerId(), msgId.getEntryId(), null); - ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position, ackOwnerConsumer); - if (checkCanRemovePendingAcksAndHandle(ackOwnerConsumer, position, msgId)) { -// addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); - updateBlockedConsumerOnUnackedMsgs(ackOwnerConsumer); - } } - - positionsAcked.add(Pair.of(ackOwnerConsumer, position)); + positionsAcked.add(position); checkAckValidationError(ack, position); - - totalAckCount += ackedCount; - } - subscription.acknowledgeMessage(positionsAcked.stream() - .map(Pair::getRight) - .collect(Collectors.toList()), AckType.Individual, properties); - CompletableFuture completableFuture = new CompletableFuture<>(); - completableFuture.complete(totalAckCount); - if (isTransactionEnabled() && Subscription.isIndividualAckMode(subType)) { - completableFuture.whenComplete((v, e) -> positionsAcked.forEach(positionPair -> { - Consumer ackOwnerConsumer = positionPair.getLeft(); - Position position = positionPair.getRight(); - //check if the position can remove from the consumer pending acks. - // the bit set is empty in pending ack handle. - if (AckSetStateUtil.hasAckSet(position)) { - if (((PersistentSubscription) subscription) - .checkIsCanDeleteConsumerPendingAck(position)) { - //removePendingAcks(ackOwnerConsumer, position); - } - } - })); } - return completableFuture; + subscription.acknowledgeMessage(positionsAcked, AckType.Individual, properties, this); + return CompletableFuture.completedFuture(0L); } @@ -693,7 +666,7 @@ private CompletableFuture individualAckWithTransaction(CommandAck ack) { if (AckSetStateUtil.hasAckSet(positionLongMutablePair.getLeft())) { if (((PersistentSubscription) subscription) .checkIsCanDeleteConsumerPendingAck(positionLongMutablePair.left)) { - //removePendingAcks(ackOwnerConsumer, positionLongMutablePair.left); + removePendingAcks(ackOwnerConsumer, positionLongMutablePair.left); } } })); @@ -763,7 +736,7 @@ private void checkAckValidationError(CommandAck ack, Position position) { private boolean checkCanRemovePendingAcksAndHandle(Consumer ackOwnedConsumer, Position position, MessageIdData msgId) { if (Subscription.isIndividualAckMode(subType) && msgId.getAckSetsCount() == 0) { - //return removePendingAcks(ackOwnedConsumer, position); + return removePendingAcks(ackOwnedConsumer, position); } return false; } @@ -1191,6 +1164,19 @@ public int addAndGetUnAckedMsgs(Consumer consumer, int ackedMessages) { if (isPersistentTopic && Subscription.isIndividualAckMode(subType)) { subscription.addUnAckedMessages(ackedMessages); unackedMsgs = UNACKED_MESSAGES_UPDATER.addAndGet(consumer, ackedMessages); + if (log.isDebugEnabled()) { + if (ackedMessages > 0) { + log.debug("[{}][{}]{}-{}-{} delivered out {} messages, un-ack-msg: {}", + topicName, consumer.subscription.getName(), + consumer.cnx(), consumer.consumerId(), consumer.consumerName(), + ackedMessages, consumer.getUnackedMessages()); + } else { + log.debug("[{}][{}]{}-{}-{} acknowledged/redelivered {} messages, un-ack-msg: {}", + topicName, consumer.subscription.getName(), + consumer.cnx(), consumer.consumerId(), consumer.consumerName(), + -ackedMessages, consumer.getUnackedMessages()); + } + } } if (unackedMsgs < 0 && System.currentTimeMillis() - negativeUnackedMsgsTimestamp >= 10_000) { negativeUnackedMsgsTimestamp = System.currentTimeMillis(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java index 452c30b45febb..4b0707d76ed1c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import javax.annotation.Nullable; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.intercept.BrokerInterceptor; @@ -48,7 +49,13 @@ default void removeConsumer(Consumer consumer) throws BrokerServiceException { void consumerFlow(Consumer consumer, int additionalNumberOfMessages); - void acknowledgeMessage(List positions, AckType ackType, Map properties); + /** + * @param ackFrom It can be null, and it will always be null if {@param ackType} is {@link AckType#Cumulative}. + * The performance will be improved, if this param is the owner consumer that received the messages + * who are being acked when {@param ackType} is {@link AckType#Individual}. + */ + void acknowledgeMessage(List positions, AckType ackType, Map properties, + @Nullable Consumer ackFrom); String getTopicName(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index d469ce1daa1a8..1ec38c8c3e8a2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -197,7 +197,8 @@ public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) { } @Override - public void acknowledgeMessage(List position, AckType ackType, Map properties) { + public void acknowledgeMessage(List position, AckType ackType, Map properties, + Consumer ackFrom) { // No-op } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 97e7d7d8db702..f57dbf222d5d4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.service.persistent; +import static org.apache.bookkeeper.mledger.impl.AckSetState.BATCH_MESSAGE_ACKED_AT_ONCE; +import static org.apache.bookkeeper.mledger.impl.AckSetState.BATCH_MESSAGE_ACKED_FIRST_PART; import static org.apache.pulsar.broker.service.AbstractBaseDispatcher.checkAndApplyReachedEndOfTopicOrTopicMigration; import static org.apache.pulsar.common.naming.SystemTopicNames.isEventSystemTopic; import com.google.common.annotations.VisibleForTesting; @@ -38,6 +40,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import java.util.stream.Collectors; +import javax.annotation.Nullable; import lombok.Getter; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback; @@ -52,11 +55,10 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentFindCursorPositionException; import org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException; import org.apache.bookkeeper.mledger.Position; -import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.bookkeeper.mledger.ScanOutcome; -import org.apache.bookkeeper.mledger.impl.AckSetState; import org.apache.bookkeeper.mledger.impl.AckSetStateUtil; import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang3.tuple.ImmutableTriple; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.intercept.BrokerInterceptor; @@ -420,7 +422,8 @@ public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) { } @Override - public void acknowledgeMessage(List positions, AckType ackType, Map properties) { + public void acknowledgeMessage(List positions, AckType ackType, Map properties, + @Nullable Consumer ackFrom) { cursor.updateLastActive(); Position previousMarkDeletePosition = cursor.getMarkDeletedPosition(); @@ -442,7 +445,8 @@ public void acknowledgeMessage(List positions, AckType ackType, Map { if ((cursor.isMessageDeleted(position))) { @@ -522,54 +526,122 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { private final DeleteCallback deleteCallback = new DeleteCallback() { @Override public void deleteComplete(Object context) { - List positions = (List) context; + ImmutableTriple, Position> ctx = + (ImmutableTriple, Position>) context; + Consumer ackFrom = ctx.getLeft(); + List positions = ctx.getMiddle(); + Position previousMarkDeletePosition = ctx.getRight(); if (log.isDebugEnabled()) { // The value of the param "context" is a position. log.debug("[{}][{}] Deleted message at {}", topicName, subName, context); } + + // Calculate whether TXN check was enabled. + final boolean transactionCheckNeeded = Subscription.isIndividualAckMode(getType()) + && getTopic().getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled(); + + int attemptAckMsgs = 0; for (Position position : positions) { - for (Consumer consumer : getConsumers()) { - long[] ackSet = AckSetStateUtil.getAckSetArrayOrNull(position); - IntIntPair intIntPair = null; - if (ackSet != null) { - intIntPair = consumer.getPendingAcks() - .get(position.getLedgerId(), position.getEntryId()); - } else { - intIntPair = consumer.getPendingAcks() - .removeAndReturn(position.getLedgerId(), position.getEntryId()); - } - if (intIntPair != null) { - int messagesCountAcked = AckSetStateUtil.getMessagesCountAcked(position); -// if (messagesCountAcked == 0) { -// break; -// } - if (AckSetState.MESSAGES_COUNT_ACKED_THAT_REQUESTED != messagesCountAcked) { - log.info("===> 1 consumer: {}, unack: {}, ack: {}, all-msg-acked: {}", consumer.consumerName(), consumer.getUnackedMessages(), messagesCountAcked, AckSetStateUtil.isAllMessagesAcked(position)); - consumer.addAndGetUnAckedMsgs(consumer, -messagesCountAcked); - } else { - if (ackSet != null) { - BitSetRecyclable bitSet = BitSetRecyclable.create().resetWords(ackSet); - log.info("===> 2 consumer: {}, unack: {}, ack: {}, all-msg-acked: {}", consumer.consumerName(), consumer.getUnackedMessages(), bitSet.cardinality() - intIntPair.firstInt(), AckSetStateUtil.isAllMessagesAcked(position)); - consumer.addAndGetUnAckedMsgs(consumer, - bitSet.cardinality() - intIntPair.firstInt()); - } else { - log.info("===> 3 consumer: {}, unack: {}, ack: {}, all-msg-acked: {}", consumer.consumerName(), consumer.getUnackedMessages(), intIntPair.firstInt(), AckSetStateUtil.isAllMessagesAcked(position)); - consumer.addAndGetUnAckedMsgs(consumer, -intIntPair.firstInt()); - } + final long ledgerId = position.getLedgerId(); + final long entryId = position.getEntryId(); + final int batchMessagesAckedCount = AckSetStateUtil.getBatchMessagesAckedCount(position); + final boolean positionRemovedFromCursor = AckSetStateUtil.isPositionRemovedFromCursor(position); + if (batchMessagesAckedCount == 0) { + // All messages were skipped. + // Since we can not get how many msgs that were attempted to ack, just plus 1 into the + // "attemptAckMsgs". + attemptAckMsgs++; + log.warn("[{}][{}]{}-{}-{} is acknowledging {}:{}, which has been acked before", + topicName, subName, + ackFrom == null ? "null" : ackFrom.cnx(), + ackFrom == null ? "null" : ackFrom.consumerId(), + ackFrom == null ? "null" : ackFrom.consumerName(), + ledgerId, entryId); + continue; + } + // Find the messages' owner and update un-acknowledged messages. + Consumer owner = null; + IntIntPair batchSizeAndHashPair = ackFrom == null ? null + : AckSetStateUtil.isPositionRemovedFromCursor(position) + ? ackFrom.getPendingAcks().removeAndReturn(ledgerId, entryId) + : ackFrom.getPendingAcks().get(ledgerId, entryId); + if (batchSizeAndHashPair != null) { + owner = ackFrom; + } else { + for (Consumer consumer : getConsumers()) { + batchSizeAndHashPair = AckSetStateUtil.isPositionRemovedFromCursor(position) + ? consumer.getPendingAcks().removeAndReturn(ledgerId, entryId) + : consumer.getPendingAcks().get(ledgerId, entryId); + if (batchSizeAndHashPair != null) { + // Continue find the owner + owner = consumer; + break; } - // TODO removePendingAcks(ackOwnerConsumer, position); - break; - } else { - log.info("===> skipped consumer: {}, unack: {}", consumer.consumerName(), consumer.getUnackedMessages()); } } + if (owner == null) { + // Since we can not get how many msgs that were attempted to ack, just plus 1 into the + // "attemptAckMsgs". + attemptAckMsgs++; + log.warn("[{}][{}]{}-{}-{} skipped to reduce un-ack-msgs for {}:{}, because could not find the" + + " message's owner", + topicName, subName, + ackFrom == null ? "null" : ackFrom.cnx(), + ackFrom == null ? "null" : ackFrom.consumerId(), + ackFrom == null ? "null" : ackFrom.consumerName(), + ledgerId, entryId); + continue; + } + // Calculate messages actually acked in batch. + int actualAcked = 0; + if (batchMessagesAckedCount == BATCH_MESSAGE_ACKED_AT_ONCE) { + // All messages in batch were acked at once. + actualAcked = Math.max(batchSizeAndHashPair.firstInt(), 1); + } else if (batchMessagesAckedCount == BATCH_MESSAGE_ACKED_FIRST_PART) { + // First part of batch message acked. + // Regarding this case, only consumer knows how many messages in batch were acked, because + // the cursor do not know how many messages in the batch, only "consumer.pendingAcks" knows. + long[] ackSetWords = AckSetStateUtil.getAckSetArrayOrNull(position); + if (ackSetWords != null) { + BitSetRecyclable ackSet = BitSetRecyclable.create().resetWords(ackSetWords); + actualAcked = Math.max(batchSizeAndHashPair.firstInt() - ackSet.cardinality(), 0); + } + } else { + // Following part of batch message acked. + // Regarding this case, only cursor know how many messages in batch were acked, because + // "consumer.pendingAcks" does not know how many messages were acked count before, only "cursor" + // knows. + actualAcked = batchMessagesAckedCount; + } + attemptAckMsgs += actualAcked; + // Reduce un-acknowledged messages. + owner.addAndGetUnAckedMsgs(owner, -actualAcked); + if (log.isDebugEnabled()) { + log.debug("[{}][{}] {}-{}-{} {}-{}-{} acknowledged {} messages, un-ack-msg: {}, position: {}:{}" + + " batch messages acked: {}, position was deleted: {}", + topicName, subName, owner.cnx(), owner.consumerId(), owner.consumerName(), + ackFrom == null ? "null" : ackFrom.cnx(), + ackFrom == null ? "null" : ackFrom.consumerId(), + ackFrom == null ? "null" : ackFrom.consumerName(), + actualAcked, owner.getUnackedMessages(), ledgerId, entryId, + batchMessagesAckedCount, positionRemovedFromCursor); + } + // Remove pending acknowledge record if needed. + if (positionRemovedFromCursor && transactionCheckNeeded + && checkIsCanDeleteConsumerPendingAck(position)) { + // TODO the method "checkIsCanDeleteConsumerPendingAck" should be called before calling + // "cursor.delete(pos)". We have not changed the behavior this PR, but it should be + // corrected in the future. + owner.getPendingAcks().remove(ledgerId, entryId); + } } - // Signal the dispatchers to give chance to take extra actions + // Consumer metrics. + ackFrom.ackMetricRecord(attemptAckMsgs); + // Signal the dispatcher. if (dispatcher != null) { dispatcher.afterAckMessages(null, context); } - // TODO fix the issue. - notifyTheMarkDeletePositionMoveForwardIfNeeded(PositionFactory.create(-1, -1)); + notifyTheMarkDeletePositionMoveForwardIfNeeded(previousMarkDeletePosition); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java index fe13aeb572e2e..76374c4d1eb59 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java @@ -27,6 +27,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.compaction.CompactedTopic; import org.apache.pulsar.compaction.CompactedTopicContext; @@ -60,7 +61,8 @@ public PulsarCompactorSubscription(PersistentTopic topic, CompactedTopic compact } @Override - public void acknowledgeMessage(List positions, AckType ackType, Map properties) { + public void acknowledgeMessage(List positions, AckType ackType, Map properties, + Consumer ackFrom) { checkArgument(ackType == AckType.Cumulative); checkArgument(positions.size() == 1); checkArgument(properties.containsKey(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java index b21fe7acfdb6f..ac30ffdbbccca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java @@ -204,7 +204,7 @@ private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) { PersistentSubscription sub = topic.getSubscription(update.getSubscriptionName()); if (sub != null) { - sub.acknowledgeMessage(Collections.singletonList(pos), AckType.Cumulative, Collections.emptyMap()); + sub.acknowledgeMessage(Collections.singletonList(pos), AckType.Cumulative, Collections.emptyMap(), null); } else { // Subscription doesn't exist. We need to force the creation of the subscription in this cluster. log.info("[{}][{}] Creating subscription at {}:{} after receiving update from replicated subscription", @@ -213,7 +213,7 @@ private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) { true /* replicateSubscriptionState */, Collections.emptyMap()) .thenAccept(subscriptionCreated -> { subscriptionCreated.acknowledgeMessage(Collections.singletonList(pos), - AckType.Cumulative, Collections.emptyMap()); + AckType.Cumulative, Collections.emptyMap(), null); }); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index 591842927f35b..7430cf85c6e42 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -465,7 +465,7 @@ private void internalCommitTxn(TxnID txnID, Map properties, long l } persistentSubscription.acknowledgeMessage( Collections.singletonList(cumulativeAckOfTransaction.getValue()), - AckType.Cumulative, properties); + AckType.Cumulative, properties, null); cumulativeAckOfTransaction = null; commitFuture.complete(null); }).exceptionally(e -> { @@ -755,7 +755,7 @@ protected void handleCommit(TxnID txnID, AckType ackType, Map prop if (this.cumulativeAckOfTransaction != null) { persistentSubscription.acknowledgeMessage( Collections.singletonList(this.cumulativeAckOfTransaction.getValue()), - AckType.Cumulative, properties); + AckType.Cumulative, properties, null); } this.cumulativeAckOfTransaction = null; } else { @@ -774,7 +774,7 @@ private void individualAckCommitCommon(TxnID txnID, Map properties) { if (currentTxn != null) { persistentSubscription.acknowledgeMessage(new ArrayList<>(currentTxn.values()), - AckType.Individual, properties); + AckType.Individual, properties, null); individualAckOfTransaction.remove(txnID); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java index a9dac5a29add6..3eb8a1b433c57 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.SneakyThrows; @@ -49,11 +50,15 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.policies.data.ConsumerStats; +import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.BitSetRecyclable; import org.awaitility.Awaitility; @@ -722,4 +727,177 @@ private org.apache.pulsar.broker.service.Consumer getTheUniqueServiceConsumer(St (AbstractPersistentDispatcherMultipleConsumers) persistentTopic.getSubscription(sub).getDispatcher(); return dispatcher.getConsumers().iterator().next(); } + + @DataProvider + public Object[][] argsOfTestAcknowledgeConcurrently() { + // enableBatchSend, enableBatchIndexAcknowledgment1, enableBatchIndexAcknowledgment2 + return new Object[][] { + {true, true, true}, + {true, false, false}, + {true, true, false}, + {true, false, true}, + {false, true, true}, + {false, false, false}, + {false, true, false}, + {false, false, true}, + }; + } + + /** + * Test: one message may be acknowledged by two consumers at the same time. + * Verify: the metric "unackedMessages" should be "0" after acknowledged all messages. + * 1. Consumer-1 received messages. + * 2. Unload the topic. + * 3. The message may be sent to consumer-2, but the consumption of consumer-1 is still in-progress now. + * 4. Consumer-1 and consumer-2 acknowledge the message concurrently. + * TODO: + * - 没有解决 transaction case 的问题。 + * - 没有解决 ack receipt 接收回调过早地问题。 + * - 增加 test: broker 关闭 batch ack。 + */ + @Test(timeOut = 60_000, dataProvider = "argsOfTestAcknowledgeConcurrently", invocationCount = 20) + public void testAcknowledgeConcurrently(boolean enableBatchSend, boolean enableBatchIndexAcknowledgment1, + boolean enableBatchIndexAcknowledgment2) + throws Exception { + PulsarClientImpl client1 = + (PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build(); + PulsarClientImpl client2 = + (PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build(); + final int msgCount = 6; + final CountDownLatch acknowledgeSignal1 = new CountDownLatch(1); + final CountDownLatch acknowledgeSignal2 = new CountDownLatch(1); + final CountDownLatch acknowledgeSignal3 = new CountDownLatch(1); + final CountDownLatch acknowledgeFinishedSignal = new CountDownLatch(3); + final String topic = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp"); + final String subscription1 = "s1"; + admin.topics().createNonPartitionedTopic(topic); + admin.topics().createSubscription(topic, subscription1, MessageId.earliest); + Producer producer = client1.newProducer(Schema.STRING).topic(topic) + .enableBatching(enableBatchSend).batchingMaxMessages(3).create(); + for (int i = 0; i < msgCount; i++) { + producer.sendAsync(i + ""); + } + + // Consumer-1 using user threads to consume asynchronously, it will not acknowledge messages one by one. + ConsumerImpl consumer1 = (ConsumerImpl) client1.newConsumer(Schema.STRING) + .topic(topic).subscriptionName(subscription1) + .enableBatchIndexAcknowledgment(enableBatchIndexAcknowledgment1) + .acknowledgmentGroupTime(1, TimeUnit.MILLISECONDS).consumerName("c1") + .subscriptionType(SubscriptionType.Shared).subscribe(); + Awaitility.await().untilAsserted(() -> { + assertEquals(consumer1.numMessagesInQueue(), msgCount); + }); + consumer1.pause(); + List msgReceivedC11 = new ArrayList<>(); + List msgReceivedC12 = new ArrayList<>(); + List msgReceivedC13 = new ArrayList<>(); + for (int i = 0; i < msgCount; i++) { + Message msg = consumer1.receive(); + assertNotNull(msg); + if (i % 3 == 0) { + msgReceivedC11.add(msg.getMessageId()); + } else if (i % 2 == 0) { + msgReceivedC12.add(msg.getMessageId()); + } else { + msgReceivedC13.add(msg.getMessageId()); + } + } + new Thread(() -> { + try { + acknowledgeSignal1.await(); + consumer1.acknowledge(msgReceivedC11); + acknowledgeSignal2.await(); + consumer1.acknowledge(msgReceivedC12); + acknowledgeSignal3.await(); + consumer1.acknowledge(msgReceivedC13); + consumer1.resume(); + acknowledgeFinishedSignal.countDown(); + } catch (Exception e) { + log.error("consumer-1 acknowledge failure", e); + throw new RuntimeException(e); + } + }).start(); + + // After a topic unloading, the messages will be resent to consumer-2. + // Consumer-2 using user threads to consume asynchronously, it will not acknowledge messages one by one. + ConsumerImpl consumer2 = (ConsumerImpl) client2.newConsumer(Schema.STRING) + .topic(topic).subscriptionName(subscription1) + .enableBatchIndexAcknowledgment(enableBatchIndexAcknowledgment2) + .acknowledgmentGroupTime(1, TimeUnit.MILLISECONDS).consumerName("c2") + .subscriptionType(SubscriptionType.Shared).subscribe(); + admin.topics().unload(topic); + Awaitility.await().untilAsserted(() -> { + assertEquals(consumer2.numMessagesInQueue(), msgCount); + }); + List msgReceivedC21 = new ArrayList<>(); + List msgReceivedC22 = new ArrayList<>(); + List msgReceivedC23 = new ArrayList<>(); + for (int i = 0; i < msgCount; i++) { + Message msg = consumer2.receive(); + assertNotNull(msg); + if (i % 3 == 0) { + msgReceivedC21.add(msg.getMessageId()); + } else if (i % 2 == 0) { + msgReceivedC22.add(msg.getMessageId()); + } else { + msgReceivedC23.add(msg.getMessageId()); + } + } + new Thread(() -> { + try { + acknowledgeSignal1.await(); + consumer2.acknowledge(msgReceivedC21); + acknowledgeSignal2.await(); + consumer2.acknowledge(msgReceivedC22); + acknowledgeSignal3.await(); + consumer2.acknowledge(msgReceivedC23); + acknowledgeFinishedSignal.countDown(); + } catch (Exception e) { + log.error("consumer-2 acknowledge failure", e); + throw new RuntimeException(e); + } + }).start(); + // Start another thread to mock a consumption repeatedly. + new Thread(() -> { + try { + acknowledgeSignal1.await(); + consumer2.acknowledge(msgReceivedC21); + acknowledgeSignal2.await(); + consumer2.acknowledge(msgReceivedC22); + acknowledgeSignal3.await(); + consumer2.acknowledge(msgReceivedC23); + acknowledgeFinishedSignal.countDown(); + } catch (Exception e) { + log.error("consumer-2 acknowledge failure", e); + throw new RuntimeException(e); + } + }).start(); + + // Trigger concurrently acknowledge. + acknowledgeSignal1.countDown(); + Thread.sleep(1000); + acknowledgeSignal2.countDown(); + Thread.sleep(1000); + acknowledgeSignal3.countDown(); + + // Verify: the metric "unackedMessages" should be "0" after acknowledged all messages. + acknowledgeFinishedSignal.await(); + Awaitility.await().untilAsserted(() -> { + SubscriptionStats stats = admin.topics().getStats(topic).getSubscriptions().get(subscription1); + log.info("backlog: {}, unack: {}", stats.getMsgBacklog(), stats.getUnackedMessages()); + assertEquals(stats.getMsgBacklog(), 0); + assertEquals(stats.getUnackedMessages(), 0); + for (ConsumerStats consumerStats : stats.getConsumers()) { + assertEquals(consumerStats.getUnackedMessages(), 0); + } + }); + + // cleanup. + consumer1.close(); + consumer2.close(); + producer.close(); + client1.close(); + client2.close(); + admin.topics().delete(topic, false); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java index 31a9b7f95d676..9ad0814bca89b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java @@ -84,7 +84,7 @@ public void setup() throws Exception { doReturn(Codec.encode("sub-1")).when(cursor).getName(); sub = spy(new PersistentSubscription(persistentTopic, "sub-1", cursor, false)); - doNothing().when(sub).acknowledgeMessage(any(), any(), any()); + doNothing().when(sub).acknowledgeMessage(any(), any(), any(), any()); } @AfterMethod(alwaysRun = true) @@ -124,7 +124,7 @@ public void testAckWithIndividualAckMode(CommandSubscribe.SubType subType) throw commandAck.addMessageId().setEntryId(0L).setLedgerId(1L); consumer.messageAcked(commandAck).get(); - verify(sub, never()).acknowledgeMessage(any(), any(), any()); + verify(sub, never()).acknowledgeMessage(any(), any(), any(), any()); } @Test(timeOut = 5000, dataProvider = "notIndividualAckModes") @@ -139,7 +139,7 @@ public void testAckWithNotIndividualAckMode(CommandSubscribe.SubType subType) th commandAck.addMessageId().setEntryId(0L).setLedgerId(1L); consumer.messageAcked(commandAck).get(); - verify(sub, times(1)).acknowledgeMessage(any(), any(), any()); + verify(sub, times(1)).acknowledgeMessage(any(), any(), any(), any()); } @Test(timeOut = 5000) @@ -155,6 +155,6 @@ public void testAckWithMoreThanNoneMessageIds() throws Exception { commandAck.addMessageId().setEntryId(0L).setLedgerId(2L); consumer.messageAcked(commandAck).get(); - verify(sub, never()).acknowledgeMessage(any(), any(), any()); + verify(sub, never()).acknowledgeMessage(any(), any(), any(), any()); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index a22e090451e25..0aebf318589cd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -1775,7 +1775,7 @@ public void testCompactorSubscription() { Position position = PositionFactory.create(1, 1); long ledgerId = 0xc0bfefeL; sub.acknowledgeMessage(Collections.singletonList(position), AckType.Cumulative, - Map.of(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY, ledgerId)); + Map.of(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY, ledgerId), null); verify(compactedTopic, Mockito.times(1)).newCompactedLedger(position, ledgerId); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java index b945f5abcbc4d..2c76f0b22a986 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java @@ -86,7 +86,7 @@ public void testMarkerDeleteTimes() throws Exception { spyWithClassAndConstructorArgs(PersistentSubscription.class, topic, "test", cursor, false); Position position = managedLedger.addEntry("test".getBytes()); persistentSubscription.acknowledgeMessage(Collections.singletonList(position), - AckType.Individual, Collections.emptyMap()); + AckType.Individual, Collections.emptyMap(), null); verify(managedLedger, times(0)).asyncReadEntry(any(), any(), any()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java index 360be2e435ab1..3d0482f9e3712 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java @@ -191,7 +191,7 @@ public void testCanAcknowledgeAndAbortForTransaction() throws Exception { positionList.add(PositionFactory.create(3, 5)); // Acknowledge from normal consumer will succeed ignoring message acked by ongoing transaction. - persistentSubscription.acknowledgeMessage(positionList, AckType.Individual, Collections.emptyMap()); + persistentSubscription.acknowledgeMessage(positionList, AckType.Individual, Collections.emptyMap(), null); //Abort txn. persistentSubscription.endTxn(txnID1.getMostSigBits(), txnID2.getLeastSigBits(), TxnAction.ABORT_VALUE, -1); @@ -223,7 +223,7 @@ public void testAcknowledgeUpdateCursorLastActive() throws Exception { positionList.add(PositionFactory.create(1, 1)); long beforeAcknowledgeTimestamp = System.currentTimeMillis(); Thread.sleep(1); - persistentSubscription.acknowledgeMessage(positionList, AckType.Individual, Collections.emptyMap()); + persistentSubscription.acknowledgeMessage(positionList, AckType.Individual, Collections.emptyMap(), null); // `acknowledgeMessage` should update cursor last active assertTrue(persistentSubscription.cursor.getLastActive() > beforeAcknowledgeTimestamp); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index f43e2a1c672c7..3cfb7677df97e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -2136,7 +2136,7 @@ public void testDeleteCompactedLedgerWithSlowAck() throws Exception { Thread.sleep(500); } }).when(subscription).acknowledgeMessage(Mockito.any(), Mockito.eq( - CommandAck.AckType.Cumulative), Mockito.any()); + CommandAck.AckType.Cumulative), Mockito.any(), null); // trigger compaction admin.topics().triggerCompaction(topicName); From 5c7dbba88524292e8ab9f3e8e90d98bdb5f34958 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 17 Mar 2025 10:29:39 +0800 Subject: [PATCH 04/18] - --- .../main/java/org/apache/pulsar/broker/service/Consumer.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 8d594730d1a3b..3e0232d690179 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -534,7 +534,7 @@ public CompletableFuture messageAcked(CommandAck ack) { } position = AckSetStateUtil.createPositionWithAckSet(msgId.getLedgerId(), msgId.getEntryId(), ackSets); } else { - position = AckSetStateUtil.createPositionWithAckSet(msgId.getLedgerId(), msgId.getEntryId(), null); // TODO 用 null 合理吗? + position = PositionFactory.create(msgId.getLedgerId(), msgId.getEntryId()); } if (ack.hasTxnidMostBits() && ack.hasTxnidLeastBits()) { @@ -1100,7 +1100,6 @@ public void redeliverUnacknowledgedMessages(long consumerEpoch) { }); for (Position p : pendingPositions) { - // TODO 这里没有处理unack messages pendingAcks.remove(p.getLedgerId(), p.getEntryId()); } @@ -1123,7 +1122,6 @@ public void redeliverUnacknowledgedMessages(List messageIds) { IntIntPair pendingAck = pendingAcks.get(position.getLedgerId(), position.getEntryId()); if (pendingAck != null) { int unAckedCount = (int) getUnAckedCountForBatchIndexLevelEnabled(position, pendingAck.leftInt()); - // TODO redeliver 和 ack 并发? pendingAcks.remove(position.getLedgerId(), position.getEntryId()); totalRedeliveryMessages += unAckedCount; pendingPositions.add(position); From 43f20b526f58e27e6d928614a5e651161a4b79b8 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 19 Mar 2025 22:18:19 +0800 Subject: [PATCH 05/18] [fix] [broker] Fix negative subscription/consumer's unack-messages --- .../pulsar/broker/service/Consumer.java | 14 +- ...PersistentDispatcherMultipleConsumers.java | 8 +- .../persistent/PersistentSubscription.java | 227 ++++++++------- .../BatchMessageWithBatchIndexLevelTest.java | 178 ------------ ...ckAndDisableBrokerBatchAckClassicTest.java | 51 ++++ ...nsumerAckAndDisableBrokerBatchAckTest.java | 50 ++++ .../service/ConsumerAckClassicTest.java | 71 +++++ .../broker/service/ConsumerAckTest.java | 260 ++++++++++++++++++ 8 files changed, 565 insertions(+), 294 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckAndDisableBrokerBatchAckClassicTest.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckAndDisableBrokerBatchAckTest.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckClassicTest.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 3e0232d690179..a20be36834d0b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -559,17 +559,17 @@ public CompletableFuture messageAcked(CommandAck ack) { .thenApply(v -> { // The case that is typed individual ack without transaction will deal metrics after a callback // that after cursor deleting positions, so we may receive a 0 value here. - if (v > 0) { - this.messageAckRate.recordEvent(v); - this.messageAckCounter.add(v); - } + + ackMetricRecord(v); return null; }); } - public void ackMetricRecord(int messageCountInRequest) { - this.messageAckRate.recordEvent(messageCountInRequest); - this.messageAckCounter.add(messageCountInRequest); + public void ackMetricRecord(long messageCountInRequest) { + if (messageCountInRequest > 0) { + this.messageAckRate.recordEvent(messageCountInRequest); + this.messageAckCounter.add(messageCountInRequest); + } } //this method is for individual ack not carry the transaction diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index c35d802f43d54..dd5e577642a8b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -362,10 +362,6 @@ public synchronized void readMoreEntries() { Position markDeletePosition = cursor.getMarkDeletedPosition(); if (lastMarkDeletePositionBeforeReadMoreEntries != markDeletePosition) { redeliveryMessages.removeAllUpTo(markDeletePosition.getLedgerId(), markDeletePosition.getEntryId()); - for (Consumer consumer : consumerList) { - consumer.getPendingAcks() - .removeAllUpTo(markDeletePosition.getLedgerId(), markDeletePosition.getEntryId()); - } lastMarkDeletePositionBeforeReadMoreEntries = markDeletePosition; } @@ -779,9 +775,7 @@ protected final synchronized boolean sendMessagesToConsumers(ReadType readType, * if you need to change it. */ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List entries) { - if (needTrimAckedMessages()) { - cursor.trimDeletedEntries(entries); - } + cursor.trimDeletedEntries(entries); lastNumberOfEntriesProcessed = 0; int entriesToDispatch = entries.size(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index f57dbf222d5d4..746cb715e3df4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -533,110 +533,12 @@ public void deleteComplete(Object context) { Position previousMarkDeletePosition = ctx.getRight(); if (log.isDebugEnabled()) { // The value of the param "context" is a position. - log.debug("[{}][{}] Deleted message at {}", topicName, subName, context); + log.debug("[{}][{}] Deleted message at {}", topicName, subName, previousMarkDeletePosition); } - - // Calculate whether TXN check was enabled. - final boolean transactionCheckNeeded = Subscription.isIndividualAckMode(getType()) - && getTopic().getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled(); - - int attemptAckMsgs = 0; - for (Position position : positions) { - final long ledgerId = position.getLedgerId(); - final long entryId = position.getEntryId(); - final int batchMessagesAckedCount = AckSetStateUtil.getBatchMessagesAckedCount(position); - final boolean positionRemovedFromCursor = AckSetStateUtil.isPositionRemovedFromCursor(position); - if (batchMessagesAckedCount == 0) { - // All messages were skipped. - // Since we can not get how many msgs that were attempted to ack, just plus 1 into the - // "attemptAckMsgs". - attemptAckMsgs++; - log.warn("[{}][{}]{}-{}-{} is acknowledging {}:{}, which has been acked before", - topicName, subName, - ackFrom == null ? "null" : ackFrom.cnx(), - ackFrom == null ? "null" : ackFrom.consumerId(), - ackFrom == null ? "null" : ackFrom.consumerName(), - ledgerId, entryId); - continue; - } - // Find the messages' owner and update un-acknowledged messages. - Consumer owner = null; - IntIntPair batchSizeAndHashPair = ackFrom == null ? null - : AckSetStateUtil.isPositionRemovedFromCursor(position) - ? ackFrom.getPendingAcks().removeAndReturn(ledgerId, entryId) - : ackFrom.getPendingAcks().get(ledgerId, entryId); - if (batchSizeAndHashPair != null) { - owner = ackFrom; - } else { - for (Consumer consumer : getConsumers()) { - batchSizeAndHashPair = AckSetStateUtil.isPositionRemovedFromCursor(position) - ? consumer.getPendingAcks().removeAndReturn(ledgerId, entryId) - : consumer.getPendingAcks().get(ledgerId, entryId); - if (batchSizeAndHashPair != null) { - // Continue find the owner - owner = consumer; - break; - } - } - } - if (owner == null) { - // Since we can not get how many msgs that were attempted to ack, just plus 1 into the - // "attemptAckMsgs". - attemptAckMsgs++; - log.warn("[{}][{}]{}-{}-{} skipped to reduce un-ack-msgs for {}:{}, because could not find the" - + " message's owner", - topicName, subName, - ackFrom == null ? "null" : ackFrom.cnx(), - ackFrom == null ? "null" : ackFrom.consumerId(), - ackFrom == null ? "null" : ackFrom.consumerName(), - ledgerId, entryId); - continue; - } - // Calculate messages actually acked in batch. - int actualAcked = 0; - if (batchMessagesAckedCount == BATCH_MESSAGE_ACKED_AT_ONCE) { - // All messages in batch were acked at once. - actualAcked = Math.max(batchSizeAndHashPair.firstInt(), 1); - } else if (batchMessagesAckedCount == BATCH_MESSAGE_ACKED_FIRST_PART) { - // First part of batch message acked. - // Regarding this case, only consumer knows how many messages in batch were acked, because - // the cursor do not know how many messages in the batch, only "consumer.pendingAcks" knows. - long[] ackSetWords = AckSetStateUtil.getAckSetArrayOrNull(position); - if (ackSetWords != null) { - BitSetRecyclable ackSet = BitSetRecyclable.create().resetWords(ackSetWords); - actualAcked = Math.max(batchSizeAndHashPair.firstInt() - ackSet.cardinality(), 0); - } - } else { - // Following part of batch message acked. - // Regarding this case, only cursor know how many messages in batch were acked, because - // "consumer.pendingAcks" does not know how many messages were acked count before, only "cursor" - // knows. - actualAcked = batchMessagesAckedCount; - } - attemptAckMsgs += actualAcked; - // Reduce un-acknowledged messages. - owner.addAndGetUnAckedMsgs(owner, -actualAcked); - if (log.isDebugEnabled()) { - log.debug("[{}][{}] {}-{}-{} {}-{}-{} acknowledged {} messages, un-ack-msg: {}, position: {}:{}" - + " batch messages acked: {}, position was deleted: {}", - topicName, subName, owner.cnx(), owner.consumerId(), owner.consumerName(), - ackFrom == null ? "null" : ackFrom.cnx(), - ackFrom == null ? "null" : ackFrom.consumerId(), - ackFrom == null ? "null" : ackFrom.consumerName(), - actualAcked, owner.getUnackedMessages(), ledgerId, entryId, - batchMessagesAckedCount, positionRemovedFromCursor); - } - // Remove pending acknowledge record if needed. - if (positionRemovedFromCursor && transactionCheckNeeded - && checkIsCanDeleteConsumerPendingAck(position)) { - // TODO the method "checkIsCanDeleteConsumerPendingAck" should be called before calling - // "cursor.delete(pos)". We have not changed the behavior this PR, but it should be - // corrected in the future. - owner.getPendingAcks().remove(ledgerId, entryId); - } + // Update pendingAcks, un-ack-messages, consumer.metrics. + if (Subscription.isIndividualAckMode(getType())) { + PersistentSubscription.this.updatePendingAckMessagesAfterAcknowledged(ackFrom, positions); } - // Consumer metrics. - ackFrom.ackMetricRecord(attemptAckMsgs); // Signal the dispatcher. if (dispatcher != null) { dispatcher.afterAckMessages(null, context); @@ -654,6 +556,127 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) { } }; + private void updatePendingAckMessagesAfterAcknowledged(Consumer ackFrom, List positions) { + Dispatcher dispatcher0 = getDispatcher(); + if (dispatcher0 != null) { + /* + * There is a race condition which leads us to add this "synchronized" block. + * - consumer-1 received msg-A + * - consumption task is in-progress + * - topic was unloaded + * - reset messages to consumer-2 + * At this moment, race-condition occurs: + * - consumer-1 is acknowledging msg-A + * - dispatcher is delivering msg-A to consumer-2 + * Issue: broker received the acknowledging of msg-A, but no consumer has pending acknowledge that relate + * to msg-A so broker can not know how many single messages in the batched message. + * Solve: to get a precise messages number, this "synchronized" block is needed. + */ + synchronized (dispatcher0) { + updatePendingAckMessagesAfterAcknowledged0(ackFrom, positions); + } + } else { + updatePendingAckMessagesAfterAcknowledged0(ackFrom, positions); + } + } + + private void updatePendingAckMessagesAfterAcknowledged0(Consumer ackFrom, List positions) { + int attemptAckMsgs = 0; + for (Position position : positions) { + final long ledgerId = position.getLedgerId(); + final long entryId = position.getEntryId(); + final int batchMessagesAckedCount = AckSetStateUtil.getBatchMessagesAckedCount(position); + final boolean positionRemovedFromCursor = AckSetStateUtil.isPositionRemovedFromCursor(position); + if (batchMessagesAckedCount == 0) { + // All messages were skipped. + // Since we can not get how many msgs that were attempted to ack, just plus 1 into the + // "attemptAckMsgs". + attemptAckMsgs++; + log.warn("[{}][{}]{}-{}-{} is acknowledging {}:{}, which has been acked before. consumer_size: {}", + topicName, subName, + ackFrom == null ? "null" : ackFrom.cnx(), + ackFrom == null ? "null" : ackFrom.consumerId(), + ackFrom == null ? "null" : ackFrom.consumerName(), + ledgerId, entryId, getConsumers().size()); + continue; + } + // Find the messages' owner and update un-acknowledged messages. + Consumer owner = null; + IntIntPair batchSizeAndHashPair = ackFrom == null ? null + : positionRemovedFromCursor + ? ackFrom.getPendingAcks().removeAndReturn(ledgerId, entryId) + : ackFrom.getPendingAcks().get(ledgerId, entryId); + if (batchSizeAndHashPair != null) { + owner = ackFrom; + } else { + for (Consumer consumer : getConsumers()) { + if (consumer == ackFrom) { + continue; + } + batchSizeAndHashPair = positionRemovedFromCursor + ? consumer.getPendingAcks().removeAndReturn(ledgerId, entryId) + : consumer.getPendingAcks().get(ledgerId, entryId); + if (batchSizeAndHashPair != null) { + // Continue find the owner + owner = consumer; + break; + } + } + } + if (owner == null) { + // Since we can not get how many msgs that were attempted to ack, just plus 1 into the + // "attemptAckMsgs". + attemptAckMsgs++; + log.warn("[{}][{}]{}-{}-{} skipped to reduce un-ack-msgs for {}:{}, because could not find the" + + " message's owner. consumer size: {}", + topicName, subName, + ackFrom == null ? "null" : ackFrom.cnx(), + ackFrom == null ? "null" : ackFrom.consumerId(), + ackFrom == null ? "null" : ackFrom.consumerName(), + ledgerId, entryId, getConsumers().size()); + continue; + } + // Calculate messages actually acked in batch. + int actualAcked = 0; + if (batchMessagesAckedCount == BATCH_MESSAGE_ACKED_AT_ONCE) { + // All messages in batch were acked at once. + actualAcked = Math.max(batchSizeAndHashPair.firstInt(), 1); + } else if (batchMessagesAckedCount == BATCH_MESSAGE_ACKED_FIRST_PART) { + // First part of batch message acked. + // Regarding this case, only consumer knows how many messages in batch were acked, because + // the cursor do not know how many messages in the batch, only "consumer.pendingAcks" knows. + long[] ackSetWords = AckSetStateUtil.getAckSetArrayOrNull(position); + if (ackSetWords != null) { + BitSetRecyclable ackSet = BitSetRecyclable.create().resetWords(ackSetWords); + actualAcked = Math.max(batchSizeAndHashPair.firstInt() - ackSet.cardinality(), 0); + ackSet.recycle(); + } + } else { + // Following part of batch message acked. + // Regarding this case, only cursor know how many messages in batch were acked, because + // "consumer.pendingAcks" does not know how many messages were acked count before, only "cursor" + // knows. + actualAcked = batchMessagesAckedCount; + } + attemptAckMsgs += actualAcked; + // Reduce un-acknowledged messages. + owner.addAndGetUnAckedMsgs(owner, -actualAcked); + owner.updateBlockedConsumerOnUnackedMsgs(owner); + if (log.isInfoEnabled()) { + log.info("[{}][{}] {}-{}-{} {}-{}-{} acknowledged {} messages, un-ack-msg: {}, position: {}:{}" + + " batch messages acked: {}, position {}:{} was deleted: {}", + topicName, subName, owner.cnx(), owner.consumerId(), owner.consumerName(), + ackFrom == null ? "null" : ackFrom.cnx(), + ackFrom == null ? "null" : ackFrom.consumerId(), + ackFrom == null ? "null" : ackFrom.consumerName(), + actualAcked, owner.getUnackedMessages(), ledgerId, entryId, + batchMessagesAckedCount, ledgerId, entryId, positionRemovedFromCursor); + } + } + // Consumer metrics. + ackFrom.ackMetricRecord(attemptAckMsgs); + } + private void notifyTheMarkDeletePositionMoveForwardIfNeeded(Position oldPosition) { Position oldMD = oldPosition; Position newMD = cursor.getMarkDeletedPosition(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java index 3eb8a1b433c57..a9dac5a29add6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java @@ -33,7 +33,6 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.SneakyThrows; @@ -50,15 +49,11 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.ConsumerImpl; -import org.apache.pulsar.client.impl.PulsarClientImpl; -import org.apache.pulsar.common.policies.data.ConsumerStats; -import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.BitSetRecyclable; import org.awaitility.Awaitility; @@ -727,177 +722,4 @@ private org.apache.pulsar.broker.service.Consumer getTheUniqueServiceConsumer(St (AbstractPersistentDispatcherMultipleConsumers) persistentTopic.getSubscription(sub).getDispatcher(); return dispatcher.getConsumers().iterator().next(); } - - @DataProvider - public Object[][] argsOfTestAcknowledgeConcurrently() { - // enableBatchSend, enableBatchIndexAcknowledgment1, enableBatchIndexAcknowledgment2 - return new Object[][] { - {true, true, true}, - {true, false, false}, - {true, true, false}, - {true, false, true}, - {false, true, true}, - {false, false, false}, - {false, true, false}, - {false, false, true}, - }; - } - - /** - * Test: one message may be acknowledged by two consumers at the same time. - * Verify: the metric "unackedMessages" should be "0" after acknowledged all messages. - * 1. Consumer-1 received messages. - * 2. Unload the topic. - * 3. The message may be sent to consumer-2, but the consumption of consumer-1 is still in-progress now. - * 4. Consumer-1 and consumer-2 acknowledge the message concurrently. - * TODO: - * - 没有解决 transaction case 的问题。 - * - 没有解决 ack receipt 接收回调过早地问题。 - * - 增加 test: broker 关闭 batch ack。 - */ - @Test(timeOut = 60_000, dataProvider = "argsOfTestAcknowledgeConcurrently", invocationCount = 20) - public void testAcknowledgeConcurrently(boolean enableBatchSend, boolean enableBatchIndexAcknowledgment1, - boolean enableBatchIndexAcknowledgment2) - throws Exception { - PulsarClientImpl client1 = - (PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build(); - PulsarClientImpl client2 = - (PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build(); - final int msgCount = 6; - final CountDownLatch acknowledgeSignal1 = new CountDownLatch(1); - final CountDownLatch acknowledgeSignal2 = new CountDownLatch(1); - final CountDownLatch acknowledgeSignal3 = new CountDownLatch(1); - final CountDownLatch acknowledgeFinishedSignal = new CountDownLatch(3); - final String topic = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp"); - final String subscription1 = "s1"; - admin.topics().createNonPartitionedTopic(topic); - admin.topics().createSubscription(topic, subscription1, MessageId.earliest); - Producer producer = client1.newProducer(Schema.STRING).topic(topic) - .enableBatching(enableBatchSend).batchingMaxMessages(3).create(); - for (int i = 0; i < msgCount; i++) { - producer.sendAsync(i + ""); - } - - // Consumer-1 using user threads to consume asynchronously, it will not acknowledge messages one by one. - ConsumerImpl consumer1 = (ConsumerImpl) client1.newConsumer(Schema.STRING) - .topic(topic).subscriptionName(subscription1) - .enableBatchIndexAcknowledgment(enableBatchIndexAcknowledgment1) - .acknowledgmentGroupTime(1, TimeUnit.MILLISECONDS).consumerName("c1") - .subscriptionType(SubscriptionType.Shared).subscribe(); - Awaitility.await().untilAsserted(() -> { - assertEquals(consumer1.numMessagesInQueue(), msgCount); - }); - consumer1.pause(); - List msgReceivedC11 = new ArrayList<>(); - List msgReceivedC12 = new ArrayList<>(); - List msgReceivedC13 = new ArrayList<>(); - for (int i = 0; i < msgCount; i++) { - Message msg = consumer1.receive(); - assertNotNull(msg); - if (i % 3 == 0) { - msgReceivedC11.add(msg.getMessageId()); - } else if (i % 2 == 0) { - msgReceivedC12.add(msg.getMessageId()); - } else { - msgReceivedC13.add(msg.getMessageId()); - } - } - new Thread(() -> { - try { - acknowledgeSignal1.await(); - consumer1.acknowledge(msgReceivedC11); - acknowledgeSignal2.await(); - consumer1.acknowledge(msgReceivedC12); - acknowledgeSignal3.await(); - consumer1.acknowledge(msgReceivedC13); - consumer1.resume(); - acknowledgeFinishedSignal.countDown(); - } catch (Exception e) { - log.error("consumer-1 acknowledge failure", e); - throw new RuntimeException(e); - } - }).start(); - - // After a topic unloading, the messages will be resent to consumer-2. - // Consumer-2 using user threads to consume asynchronously, it will not acknowledge messages one by one. - ConsumerImpl consumer2 = (ConsumerImpl) client2.newConsumer(Schema.STRING) - .topic(topic).subscriptionName(subscription1) - .enableBatchIndexAcknowledgment(enableBatchIndexAcknowledgment2) - .acknowledgmentGroupTime(1, TimeUnit.MILLISECONDS).consumerName("c2") - .subscriptionType(SubscriptionType.Shared).subscribe(); - admin.topics().unload(topic); - Awaitility.await().untilAsserted(() -> { - assertEquals(consumer2.numMessagesInQueue(), msgCount); - }); - List msgReceivedC21 = new ArrayList<>(); - List msgReceivedC22 = new ArrayList<>(); - List msgReceivedC23 = new ArrayList<>(); - for (int i = 0; i < msgCount; i++) { - Message msg = consumer2.receive(); - assertNotNull(msg); - if (i % 3 == 0) { - msgReceivedC21.add(msg.getMessageId()); - } else if (i % 2 == 0) { - msgReceivedC22.add(msg.getMessageId()); - } else { - msgReceivedC23.add(msg.getMessageId()); - } - } - new Thread(() -> { - try { - acknowledgeSignal1.await(); - consumer2.acknowledge(msgReceivedC21); - acknowledgeSignal2.await(); - consumer2.acknowledge(msgReceivedC22); - acknowledgeSignal3.await(); - consumer2.acknowledge(msgReceivedC23); - acknowledgeFinishedSignal.countDown(); - } catch (Exception e) { - log.error("consumer-2 acknowledge failure", e); - throw new RuntimeException(e); - } - }).start(); - // Start another thread to mock a consumption repeatedly. - new Thread(() -> { - try { - acknowledgeSignal1.await(); - consumer2.acknowledge(msgReceivedC21); - acknowledgeSignal2.await(); - consumer2.acknowledge(msgReceivedC22); - acknowledgeSignal3.await(); - consumer2.acknowledge(msgReceivedC23); - acknowledgeFinishedSignal.countDown(); - } catch (Exception e) { - log.error("consumer-2 acknowledge failure", e); - throw new RuntimeException(e); - } - }).start(); - - // Trigger concurrently acknowledge. - acknowledgeSignal1.countDown(); - Thread.sleep(1000); - acknowledgeSignal2.countDown(); - Thread.sleep(1000); - acknowledgeSignal3.countDown(); - - // Verify: the metric "unackedMessages" should be "0" after acknowledged all messages. - acknowledgeFinishedSignal.await(); - Awaitility.await().untilAsserted(() -> { - SubscriptionStats stats = admin.topics().getStats(topic).getSubscriptions().get(subscription1); - log.info("backlog: {}, unack: {}", stats.getMsgBacklog(), stats.getUnackedMessages()); - assertEquals(stats.getMsgBacklog(), 0); - assertEquals(stats.getUnackedMessages(), 0); - for (ConsumerStats consumerStats : stats.getConsumers()) { - assertEquals(consumerStats.getUnackedMessages(), 0); - } - }); - - // cleanup. - consumer1.close(); - consumer2.close(); - producer.close(); - client1.close(); - client2.close(); - admin.topics().delete(topic, false); - } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckAndDisableBrokerBatchAckClassicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckAndDisableBrokerBatchAckClassicTest.java new file mode 100644 index 0000000000000..108a022f165a1 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckAndDisableBrokerBatchAckClassicTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import lombok.extern.slf4j.Slf4j; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class ConsumerAckAndDisableBrokerBatchAckClassicTest extends ConsumerAckTest { + + @BeforeClass + @Override + protected void setup() throws Exception { + conf.setAcknowledgmentAtBatchIndexLevelEnabled(false); + conf.setSubscriptionSharedUseClassicPersistentImplementation(true); + super.baseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + @Test(timeOut = 60_000, dataProvider = "argsOfTestAcknowledgeConcurrently") + public void testAcknowledgeConcurrently(boolean enableBatchSend, boolean enableBatchIndexAcknowledgment1, + boolean enableBatchIndexAcknowledgment2) + throws Exception { + super.testAcknowledgeConcurrently(enableBatchSend, enableBatchIndexAcknowledgment1, enableBatchIndexAcknowledgment2); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckAndDisableBrokerBatchAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckAndDisableBrokerBatchAckTest.java new file mode 100644 index 0000000000000..f6fdaa750a9be --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckAndDisableBrokerBatchAckTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import lombok.extern.slf4j.Slf4j; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class ConsumerAckAndDisableBrokerBatchAckTest extends ConsumerAckTest { + + @BeforeClass + @Override + protected void setup() throws Exception { + conf.setAcknowledgmentAtBatchIndexLevelEnabled(false); + super.baseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + @Test(timeOut = 60_000, dataProvider = "argsOfTestAcknowledgeConcurrently") + public void testAcknowledgeConcurrently(boolean enableBatchSend, boolean enableBatchIndexAcknowledgment1, + boolean enableBatchIndexAcknowledgment2) + throws Exception { + super.testAcknowledgeConcurrently(enableBatchSend, enableBatchIndexAcknowledgment1, enableBatchIndexAcknowledgment2); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckClassicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckClassicTest.java new file mode 100644 index 0000000000000..b5a7a761e3e2e --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckClassicTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.policies.data.ConsumerStats; +import org.apache.pulsar.common.policies.data.SubscriptionStats; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class ConsumerAckClassicTest extends ConsumerAckTest { + + @BeforeClass + @Override + protected void setup() throws Exception { + conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); + conf.setSubscriptionSharedUseClassicPersistentImplementation(true); + super.baseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + @Test(timeOut = 60_000, dataProvider = "argsOfTestAcknowledgeConcurrently") + public void testAcknowledgeConcurrently(boolean enableBatchSend, boolean enableBatchIndexAcknowledgment1, + boolean enableBatchIndexAcknowledgment2) + throws Exception { + super.testAcknowledgeConcurrently(enableBatchSend, enableBatchIndexAcknowledgment1, + enableBatchIndexAcknowledgment2); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckTest.java new file mode 100644 index 0000000000000..a14139d08d35d --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckTest.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import com.carrotsearch.hppc.ObjectSet; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.impl.AckSetStateUtil; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; +import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.policies.data.ConsumerStats; +import org.apache.pulsar.common.policies.data.SubscriptionStats; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.collections.BitSetRecyclable; +import org.awaitility.Awaitility; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class ConsumerAckTest extends BrokerTestBase { + + @BeforeClass + @Override + protected void setup() throws Exception { + conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); + super.baseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @DataProvider + public Object[][] argsOfTestAcknowledgeConcurrently() { + // enableBatchSend, enableBatchIndexAcknowledgment1, enableBatchIndexAcknowledgment2 + return new Object[][] { + {true, true, true}, + {true, false, false}, + {true, true, false}, + {true, false, true}, + {false, true, true}, + {false, false, false}, + {false, true, false}, + {false, false, true}, + }; + } + + /** + * Test: one message may be acknowledged by two consumers at the same time. + * Verify: the metric "unackedMessages" should be "0" after acknowledged all messages. + * 1. Consumer-1 received messages. + * 2. Unload the topic. + * 3. The message may be sent to consumer-2, but the consumption of consumer-1 is still in-progress now. + * 4. Consumer-1 and consumer-2 acknowledge the message concurrently. + */ + @Test(timeOut = 60_000, dataProvider = "argsOfTestAcknowledgeConcurrently") + public void testAcknowledgeConcurrently(boolean enableBatchSend, boolean enableBatchIndexAcknowledgment1, + boolean enableBatchIndexAcknowledgment2) + throws Exception { + PulsarClientImpl client1 = + (PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build(); + PulsarClientImpl client2 = + (PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build(); + final int msgCount = 6; + final CountDownLatch acknowledgeSignal1 = new CountDownLatch(1); + final CountDownLatch acknowledgeSignal2 = new CountDownLatch(1); + final CountDownLatch acknowledgeSignal3 = new CountDownLatch(1); + final CountDownLatch acknowledgeFinishedSignal = new CountDownLatch(3); + final String topic = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp"); + final String subscription1 = "s1"; + admin.topics().createNonPartitionedTopic(topic); + admin.topics().createSubscription(topic, subscription1, MessageId.earliest); + Producer producer = client1.newProducer(Schema.STRING).topic(topic) + .enableBatching(enableBatchSend).batchingMaxMessages(4).create(); + for (int i = 0; i < msgCount; i++) { + producer.sendAsync(i + ""); + } + + // Consumer-1 using user threads to consume asynchronously, it will not acknowledge messages one by one. + ConsumerImpl consumer1 = (ConsumerImpl) client1.newConsumer(Schema.STRING) + .topic(topic).subscriptionName(subscription1) + .enableBatchIndexAcknowledgment(enableBatchIndexAcknowledgment1) + .acknowledgmentGroupTime(1, TimeUnit.MILLISECONDS).consumerName("c1") + .subscriptionType(SubscriptionType.Shared).subscribe(); + Awaitility.await().untilAsserted(() -> { + assertEquals(consumer1.numMessagesInQueue(), msgCount); + }); + consumer1.pause(); + List msgReceivedC11 = new ArrayList<>(); + List msgReceivedC12 = new ArrayList<>(); + List msgReceivedC13 = new ArrayList<>(); + for (int i = 0; i < msgCount; i++) { + Message msg = consumer1.receive(); + assertNotNull(msg); + if (i % 4 == 0) { + msgReceivedC11.add(msg.getMessageId()); + } else if (i % 3 == 0) { + msgReceivedC12.add(msg.getMessageId()); + } else if (i % 2 == 0) { + msgReceivedC13.add(msg.getMessageId()); + } else { + msgReceivedC13.add(msg.getMessageId()); + } + } + new Thread(() -> { + try { + acknowledgeSignal1.await(); + consumer1.acknowledge(msgReceivedC11); + acknowledgeSignal2.await(); + consumer1.acknowledge(msgReceivedC12); + acknowledgeSignal3.await(); + consumer1.acknowledge(msgReceivedC13); + consumer1.resume(); + acknowledgeFinishedSignal.countDown(); + } catch (Exception e) { + log.error("consumer-1 acknowledge failure", e); + throw new RuntimeException(e); + } + }).start(); + + // After a topic unloading, the messages will be resent to consumer-2. + // Consumer-2 using user threads to consume asynchronously, it will not acknowledge messages one by one. + ConsumerImpl consumer2 = (ConsumerImpl) client2.newConsumer(Schema.STRING) + .topic(topic).subscriptionName(subscription1) + .enableBatchIndexAcknowledgment(enableBatchIndexAcknowledgment2) + .acknowledgmentGroupTime(1, TimeUnit.MILLISECONDS).consumerName("c2") + .subscriptionType(SubscriptionType.Shared).subscribe(); + admin.topics().unload(topic); + Awaitility.await().untilAsserted(() -> { + assertEquals(consumer2.numMessagesInQueue(), msgCount); + }); + List msgReceivedC21 = new ArrayList<>(); + List msgReceivedC22 = new ArrayList<>(); + List msgReceivedC23 = new ArrayList<>(); + for (int i = 0; i < msgCount; i++) { + Message msg = consumer2.receive(); + assertNotNull(msg); + if (i % 4 == 0) { + msgReceivedC21.add(msg.getMessageId()); + } else if (i % 3 == 0) { + msgReceivedC22.add(msg.getMessageId()); + } else if (i % 2 == 0) { + msgReceivedC23.add(msg.getMessageId()); + } else { + msgReceivedC23.add(msg.getMessageId()); + } + } + new Thread(() -> { + try { + acknowledgeSignal1.await(); + consumer2.acknowledge(msgReceivedC21); + acknowledgeSignal2.await(); + consumer2.acknowledge(msgReceivedC22); + acknowledgeSignal3.await(); + consumer2.acknowledge(msgReceivedC23); + acknowledgeFinishedSignal.countDown(); + } catch (Exception e) { + log.error("consumer-2 acknowledge failure", e); + throw new RuntimeException(e); + } + }).start(); + // Start another thread to mock a consumption repeatedly. + new Thread(() -> { + try { + acknowledgeSignal1.await(); + consumer2.acknowledge(msgReceivedC21); + acknowledgeSignal2.await(); + consumer2.acknowledge(msgReceivedC22); + acknowledgeSignal3.await(); + consumer2.acknowledge(msgReceivedC23); + acknowledgeFinishedSignal.countDown(); + } catch (Exception e) { + log.error("consumer-2 acknowledge failure", e); + throw new RuntimeException(e); + } + }).start(); + + // Trigger concurrently acknowledge. + acknowledgeSignal1.countDown(); + Thread.sleep(1000); + acknowledgeSignal2.countDown(); + Thread.sleep(1000); + acknowledgeSignal3.countDown(); + + // Verify: the metric "unackedMessages" should be "0" after acknowledged all messages. + acknowledgeFinishedSignal.await(); + Awaitility.await().untilAsserted(() -> { + SubscriptionStats stats = admin.topics().getStats(topic).getSubscriptions().get(subscription1); + log.info("backlog: {}, unack: {}", stats.getMsgBacklog(), stats.getUnackedMessages()); + assertEquals(stats.getMsgBacklog(), 0); + assertEquals(stats.getUnackedMessages(), 0); + for (ConsumerStats consumerStats : stats.getConsumers()) { + assertEquals(consumerStats.getUnackedMessages(), 0); + } + }); + + // cleanup. + consumer1.close(); + consumer2.close(); + producer.close(); + client1.close(); + client2.close(); + admin.topics().delete(topic, false); + } +} From 852758067ab2f0149f966eb99cdbfcbb37180627 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 21 Mar 2025 12:31:40 +0800 Subject: [PATCH 06/18] adress comment --- .../apache/bookkeeper/mledger/impl/AckSetPositionImpl.java | 4 ++-- .../broker/service/persistent/PersistentSubscription.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetPositionImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetPositionImpl.java index 43500cfdb49cf..786cf5a2ff4a0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetPositionImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetPositionImpl.java @@ -35,9 +35,9 @@ public class AckSetPositionImpl implements Position, AckSetState { protected volatile long[] ackSet; @Getter @Setter - private int batchMessagesAckedCount; + private volatile int batchMessagesAckedCount; @Getter - private boolean positionRemovedFromCursor; + private volatile boolean positionRemovedFromCursor; public AckSetPositionImpl(long ledgerId, long entryId, long[] ackSet) { this.ledgerId = ledgerId; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 746cb715e3df4..51c62981b5572 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -662,8 +662,8 @@ private void updatePendingAckMessagesAfterAcknowledged0(Consumer ackFrom, List

Date: Fri, 21 Mar 2025 12:39:57 +0800 Subject: [PATCH 07/18] checkstyle --- .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index ec41057b9b8d2..3f7990145ebbf 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -21,9 +21,9 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException; -import static org.apache.bookkeeper.mledger.impl.EntryCountEstimator.estimateEntryCountByBytesSize; import static org.apache.bookkeeper.mledger.impl.AckSetState.BATCH_MESSAGE_ACKED_AT_ONCE; import static org.apache.bookkeeper.mledger.impl.AckSetState.BATCH_MESSAGE_ACKED_FIRST_PART; +import static org.apache.bookkeeper.mledger.impl.EntryCountEstimator.estimateEntryCountByBytesSize; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException; From 9092825f161a6800b848443953d85cf006069985 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 24 Mar 2025 17:27:06 +0800 Subject: [PATCH 08/18] address comments --- .../persistent/PersistentSubscription.java | 8 +++--- .../service/ConsumerAckClassicTest.java | 19 -------------- .../broker/service/ConsumerAckTest.java | 26 ------------------- 3 files changed, 5 insertions(+), 48 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 51c62981b5572..90c6c12e1897c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -592,7 +592,8 @@ private void updatePendingAckMessagesAfterAcknowledged0(Consumer ackFrom, List

Date: Tue, 1 Apr 2025 14:26:15 +0800 Subject: [PATCH 09/18] todo comments --- .../persistent/PersistentDispatcherMultipleConsumers.java | 2 +- .../pulsar/broker/service/persistent/PersistentTopic.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index dd5e577642a8b..04f9b4c931a49 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -256,7 +256,7 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE notifyRedeliveryMessageAdded(); } } - } else { + } else { //TODO 这里 consumer/producer 的计数,计算几次? /** * This is not an expected scenario, it will never happen in expected. * Just add a defensive code to avoid the topic can not be unloaded anymore: remove the consumers which diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 1fdad8294a385..3285008398236 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1023,7 +1023,7 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St } else if (e instanceof SubscriptionBusyException) { log.warn("[{}][{}] {}", topic, subscriptionName, e.getMessage()); } - + // TODO 多减掉了一次 decrementUsageCount(); return FutureUtil.failedFuture(e); } @@ -1031,7 +1031,7 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", topic, subscriptionName, consumer.consumerName(), currentUsageCount()); } - + // TODO 多减掉了一次 decrementUsageCount(); return FutureUtil.failedFuture( new BrokerServiceException.ConnectionClosedException( From b2a4e8953d762f83373dd84e617cf0f9331001e2 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 3 Apr 2025 18:12:27 +0800 Subject: [PATCH 10/18] fix NPE --- .../broker/service/persistent/PersistentSubscription.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 90c6c12e1897c..b00cdd4b98cda 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -676,7 +676,9 @@ private void updatePendingAckMessagesAfterAcknowledged0(Consumer ackFrom, List

Date: Tue, 10 Jun 2025 16:56:26 +0800 Subject: [PATCH 11/18] checkstyle --- .../java/org/apache/pulsar/broker/service/Subscription.java | 2 +- .../broker/service/persistent/PersistentSubscription.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java index 4b0707d76ed1c..ba08b2cb67940 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java @@ -22,7 +22,6 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import javax.annotation.Nullable; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.intercept.BrokerInterceptor; @@ -30,6 +29,7 @@ import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot; +import org.jspecify.annotations.Nullable; public interface Subscription extends MessageExpirer { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index b00cdd4b98cda..76ab65554586a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -40,7 +40,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import java.util.stream.Collectors; -import javax.annotation.Nullable; import lombok.Getter; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback; @@ -100,6 +99,7 @@ import org.apache.pulsar.common.stats.PositionInPendingAckStats; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.BitSetRecyclable; +import org.jspecify.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; From 227ebcbfb2d7b99938a28ddd2513ade6d6738b5d Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 11 Jun 2025 13:07:28 +0800 Subject: [PATCH 12/18] fix cursor.delete callback twice --- .../mledger/impl/ManagedCursorImpl.java | 22 ++++++++++++++----- .../persistent/PersistentSubscription.java | 5 ++++- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 3f7990145ebbf..a8e739c2e8ae0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -105,6 +105,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo.Builder; import org.apache.bookkeeper.mledger.proto.MLDataFormats.StringProperty; +import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; @@ -2363,7 +2364,7 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb Position newMarkDeletePosition = null; lock.writeLock().lock(); - + final MutableBoolean cbHasExecuted = new MutableBoolean(false); try { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Deleting individual messages at {}. Current status: {} - md-position: {}", @@ -2490,6 +2491,7 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb lock.writeLock().unlock(); if (empty) { callback.deleteComplete(ctx); + cbHasExecuted.setTrue(); } } @@ -2497,7 +2499,9 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) { isDirty = true; updateLastMarkDeleteEntryToLatest(newMarkDeletePosition, null); - callback.deleteComplete(ctx); + if (!cbHasExecuted.booleanValue()) { + callback.deleteComplete(ctx); + } return; } @@ -2508,12 +2512,18 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb internalAsyncMarkDelete(newMarkDeletePosition, properties, new MarkDeleteCallback() { @Override public void markDeleteComplete(Object ctx) { - callback.deleteComplete(ctx); + if (!cbHasExecuted.booleanValue()) { + callback.deleteComplete(ctx); + cbHasExecuted.setTrue(); + } } @Override public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { - callback.deleteFailed(exception, ctx); + if (!cbHasExecuted.booleanValue()) { + callback.deleteFailed(exception, ctx); + cbHasExecuted.setTrue(); + } } }, ctx); @@ -2524,7 +2534,9 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { log.debug("[{}] Consumer {} cursor asyncDelete error, counters: consumed {} mdPos {} rdPos {}", ledger.getName(), name, messagesConsumedCounter, markDeletePosition, readPosition); } - callback.deleteFailed(new ManagedLedgerException(e), ctx); + if (!cbHasExecuted.booleanValue()) { + callback.deleteFailed(new ManagedLedgerException(e), ctx); + } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 76ab65554586a..eca21ba659275 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -672,7 +672,10 @@ private void updatePendingAckMessagesAfterAcknowledged0(Consumer ackFrom, List

= 0 ? "batch_particularly_ack " + batchMessagesAckedCount + : batchMessagesAckedCount == BATCH_MESSAGE_ACKED_AT_ONCE + ? "ack_all_messages_at_once & batch_size " + batchSizeAndHashPair.firstInt() + : "first_part_ack", ledgerId, entryId, positionRemovedFromCursor); } } // Consumer metrics. From 6c75ff90ee4d4bbb72f777ce17a21012622352ff Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 11 Jun 2025 13:51:28 +0800 Subject: [PATCH 13/18] fix tests --- .../test/java/org/apache/pulsar/compaction/CompactionTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 3cfb7677df97e..b322369f1b013 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -2136,7 +2136,7 @@ public void testDeleteCompactedLedgerWithSlowAck() throws Exception { Thread.sleep(500); } }).when(subscription).acknowledgeMessage(Mockito.any(), Mockito.eq( - CommandAck.AckType.Cumulative), Mockito.any(), null); + CommandAck.AckType.Cumulative), Mockito.any(), any()); // trigger compaction admin.topics().triggerCompaction(topicName); From dc79a85b83f02d6b3e881cabe81fa16dc4ac5f4e Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 11 Jun 2025 17:29:31 +0800 Subject: [PATCH 14/18] fix transaction unack-msgs --- .../pulsar/broker/service/Consumer.java | 41 +++---------------- 1 file changed, 5 insertions(+), 36 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index a20be36834d0b..6e037f6c4e438 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -604,14 +604,11 @@ private CompletableFuture individualAckNormal(CommandAck ack, Map individualAckWithTransaction(CommandAck ack) { - // Individual ack - List>> positionsAcked = new ArrayList<>(); if (!isTransactionEnabled()) { return FutureUtil.failedFuture( new BrokerServiceException.NotAllowedException("Server don't support transaction ack!")); } - - LongAdder totalAckCount = new LongAdder(); + List> positionsAcked = new ArrayList<>(); for (int i = 0; i < ack.getMessageIdsCount(); i++) { MessageIdData msgId = ack.getMessageIdAt(i); Position position = AckSetStateUtil.createPositionWithAckSet(msgId.getLedgerId(), msgId.getEntryId(), null); @@ -622,20 +619,13 @@ private CompletableFuture individualAckWithTransaction(CommandAck ack) { consumerId, position); continue; } - Consumer ackOwnerConsumer = ackOwnerConsumerAndBatchSize.left(); - // acked count at least one - long ackedCount; int batchSize; if (msgId.hasBatchSize()) { - batchSize = msgId.getBatchSize(); - // ack batch messages set ackeCount = batchSize - ackedCount = msgId.getBatchSize(); - positionsAcked.add(Pair.of(ackOwnerConsumer, new MutablePair<>(position, msgId.getBatchSize()))); + positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize())); } else { // ack no batch message set ackedCount = 1 batchSize = 0; - ackedCount = 1; - positionsAcked.add(Pair.of(ackOwnerConsumer, new MutablePair<>(position, (int) batchSize))); + positionsAcked.add(new MutablePair<>(position, batchSize)); } if (msgId.getAckSetsCount() > 0) { @@ -644,34 +634,13 @@ private CompletableFuture individualAckWithTransaction(CommandAck ack) { ackSets[j] = msgId.getAckSetAt(j); } AckSetStateUtil.getAckSetState(position).setAckSet(ackSets); - ackedCount = getAckedCountForTransactionAck(batchSize, ackSets); } - addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); - - checkCanRemovePendingAcksAndHandle(ackOwnerConsumer, position, msgId); - checkAckValidationError(ack, position); - - totalAckCount.add(ackedCount); } - CompletableFuture completableFuture = transactionIndividualAcknowledge(ack.getTxnidMostBits(), - ack.getTxnidLeastBits(), positionsAcked.stream().map(Pair::getRight).collect(Collectors.toList())); - if (Subscription.isIndividualAckMode(subType)) { - completableFuture.whenComplete((v, e) -> - positionsAcked.forEach(positionPair -> { - Consumer ackOwnerConsumer = positionPair.getLeft(); - MutablePair positionLongMutablePair = positionPair.getRight(); - if (AckSetStateUtil.hasAckSet(positionLongMutablePair.getLeft())) { - if (((PersistentSubscription) subscription) - .checkIsCanDeleteConsumerPendingAck(positionLongMutablePair.left)) { - removePendingAcks(ackOwnerConsumer, positionLongMutablePair.left); - } - } - })); - } - return completableFuture.thenApply(__ -> totalAckCount.sum()); + return transactionIndividualAcknowledge(ack.getTxnidMostBits(), ack.getTxnidLeastBits(), positionsAcked) + .thenApply(__ -> 0L); } private long getAckedCountForMsgIdNoAckSets(int batchSize, Position position, Consumer consumer) { From 7b8df249a8d43a05d2f3847dd0ecf19835cd6418 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 11 Jun 2025 17:53:49 +0800 Subject: [PATCH 15/18] checkstyle --- .../src/main/java/org/apache/pulsar/broker/service/Consumer.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 6e037f6c4e438..cb3d352e3c34e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -50,7 +50,6 @@ import org.apache.bookkeeper.mledger.impl.AckSetStateUtil; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.lang3.tuple.MutablePair; -import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; From 3a51a29e57525d610585f8b032333af2d23e74d5 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 11 Jun 2025 23:43:43 +0800 Subject: [PATCH 16/18] fix tests --- .../PendingAckInMemoryDeleteTest.java | 2 +- .../client/impl/TransactionEndToEndTest.java | 103 +++++++++--------- 2 files changed, 53 insertions(+), 52 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java index 4f773e8a124b0..b9e9d532a2945 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java @@ -246,7 +246,7 @@ public void txnAckTestBatchAndSharedSubMemoryDeleteTest() throws Exception { // and it won't clear the last message in cursor batch index ack set consumer.acknowledgeAsync(messageIds[1], commitTwice).get(); assertEquals(batchDeletedIndexes.size(), 1); - assertEquals(testPersistentSubscription.getConsumers().get(0).getPendingAcks().size(), 0); + assertEquals(testPersistentSubscription.getConsumers().get(0).getPendingAcks().size(), 1); // the messages has been produced were all acked, the memory in broker for the messages has been cleared. commitTwice.commit().get(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java index 52f230733eede..583bd4eeaf8e5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java @@ -1415,57 +1415,58 @@ public void testSendTxnMessageTimeout() throws Exception { admin.topics().delete(topic, true); } - @Test - public void testAckWithTransactionReduceUnackCountNotInPendingAcks() throws Exception { - final String topic = "persistent://" + NAMESPACE1 + "/testAckWithTransactionReduceUnackCountNotInPendingAcks"; - final String subName = "test"; - @Cleanup - ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer() - .topic(topic) - .batchingMaxPublishDelay(1, TimeUnit.SECONDS) - .sendTimeout(1, TimeUnit.SECONDS) - .create(); - - @Cleanup - Consumer consumer = pulsarClient.newConsumer() - .topic(topic) - .subscriptionType(SubscriptionType.Shared) - .subscriptionName(subName) - .subscribe(); - - // send 5 messages with one batch - for (int i = 0; i < 5; i++) { - producer.sendAsync((i + "").getBytes(UTF_8)); - } - - List messageIds = new ArrayList<>(); - - // receive the batch messages add to a list - for (int i = 0; i < 5; i++) { - messageIds.add(consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS).getMessageId()); - } - - MessageIdImpl messageId = (MessageIdImpl) messageIds.get(0); - - - // remove the message from the pendingAcks, in fact redeliver will remove the messageId from the pendingAck - getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false) - .get().get().getSubscription(subName).getConsumers().get(0).getPendingAcks() - .remove(messageId.ledgerId, messageId.entryId); - - Transaction txn = getTxn(); - consumer.acknowledgeAsync(messageIds.get(1), txn).get(); - - // ack one message, the unack count is 4 - assertEquals(getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false) - .get().get().getSubscription(subName).getConsumers().get(0).getUnackedMessages(), 4); - - // cleanup. - txn.abort().get(); - consumer.close(); - producer.close(); - admin.topics().delete(topic, true); - } +// This test is not meaningful. +// @Test +// public void testAckWithTransactionReduceUnackCountNotInPendingAcks() throws Exception { +// final String topic = "persistent://" + NAMESPACE1 + "/testAckWithTransactionReduceUnackCountNotInPendingAcks"; +// final String subName = "test"; +// @Cleanup +// ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer() +// .topic(topic) +// .batchingMaxPublishDelay(1, TimeUnit.SECONDS) +// .sendTimeout(1, TimeUnit.SECONDS) +// .create(); +// +// @Cleanup +// Consumer consumer = pulsarClient.newConsumer() +// .topic(topic) +// .subscriptionType(SubscriptionType.Shared) +// .subscriptionName(subName) +// .subscribe(); +// +// // send 5 messages with one batch +// for (int i = 0; i < 5; i++) { +// producer.sendAsync((i + "").getBytes(UTF_8)); +// } +// +// List messageIds = new ArrayList<>(); +// +// // receive the batch messages add to a list +// for (int i = 0; i < 5; i++) { +// messageIds.add(consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS).getMessageId()); +// } +// +// MessageIdImpl messageId = (MessageIdImpl) messageIds.get(0); +// +// +// // remove the message from the pendingAcks, in fact redeliver will remove the messageId from the pendingAck +// getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false) +// .get().get().getSubscription(subName).getConsumers().get(0).getPendingAcks() +// .remove(messageId.ledgerId, messageId.entryId); +// +// Transaction txn = getTxn(); +// consumer.acknowledgeAsync(messageIds.get(1), txn).get(); +// +// // ack one message, the unack count is 4 +// assertEquals(getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false) +// .get().get().getSubscription(subName).getConsumers().get(0).getUnackedMessages(), 4); +// +// // cleanup. +// txn.abort().get(); +// consumer.close(); +// producer.close(); +// admin.topics().delete(topic, true); +// } @Test public void testSendTxnAckMessageToDLQ() throws Exception { From 9bd14512d18e2742e7753b4764a824645b9a589d Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 11 Jun 2025 23:45:43 +0800 Subject: [PATCH 17/18] Revert "todo comments" This reverts commit 300045eee70c3c8e6fb1f02e58b1dbb68164c8a5. --- .../persistent/PersistentDispatcherMultipleConsumers.java | 2 +- .../pulsar/broker/service/persistent/PersistentTopic.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 04f9b4c931a49..dd5e577642a8b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -256,7 +256,7 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE notifyRedeliveryMessageAdded(); } } - } else { //TODO 这里 consumer/producer 的计数,计算几次? + } else { /** * This is not an expected scenario, it will never happen in expected. * Just add a defensive code to avoid the topic can not be unloaded anymore: remove the consumers which diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 3285008398236..1fdad8294a385 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1023,7 +1023,7 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St } else if (e instanceof SubscriptionBusyException) { log.warn("[{}][{}] {}", topic, subscriptionName, e.getMessage()); } - // TODO 多减掉了一次 + decrementUsageCount(); return FutureUtil.failedFuture(e); } @@ -1031,7 +1031,7 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", topic, subscriptionName, consumer.consumerName(), currentUsageCount()); } - // TODO 多减掉了一次 + decrementUsageCount(); return FutureUtil.failedFuture( new BrokerServiceException.ConnectionClosedException( From b9b671cecb8249157c319151e855ebd0dcff6e7f Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 12 Jun 2025 00:50:08 +0800 Subject: [PATCH 18/18] fix tests --- .../client/impl/TransactionEndToEndTest.java | 33 ++++++++++--------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java index 583bd4eeaf8e5..94ee37f75e93a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java @@ -46,6 +46,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.pulsar.broker.TransactionMetadataStoreService; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.Topic; @@ -487,23 +488,25 @@ private void testAckWithTransactionReduceUnAckMessageCount(boolean enableBatch) } txn.commit().get(); - boolean flag = false; - String topic = TopicName.get(topicName).toString(); - for (int i = 0; i < getPulsarServiceList().size(); i++) { - CompletableFuture> topicFuture = getPulsarServiceList().get(i) - .getBrokerService().getTopic(topic, false); - - if (topicFuture != null) { - Optional topicOptional = topicFuture.get(); - if (topicOptional.isPresent()) { - PersistentSubscription persistentSubscription = - (PersistentSubscription) topicOptional.get().getSubscription(subName); - assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), messageCount / 2); - flag = true; + MutableBoolean flag = new MutableBoolean(false); + Awaitility.await().untilAsserted(() -> { + String topic = TopicName.get(topicName).toString(); + for (int i = 0; i < getPulsarServiceList().size(); i++) { + CompletableFuture> topicFuture = getPulsarServiceList().get(i) + .getBrokerService().getTopic(topic, false); + + if (topicFuture != null) { + Optional topicOptional = topicFuture.get(); + if (topicOptional.isPresent()) { + PersistentSubscription persistentSubscription = + (PersistentSubscription) topicOptional.get().getSubscription(subName); + assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), messageCount / 2); + flag.setTrue(); + } } } - } - assertTrue(flag); + }); + assertTrue(flag.booleanValue()); // cleanup. producer.close();