From 1e4eced06254aeb157eae328cd68b0044c9ef74b Mon Sep 17 00:00:00 2001 From: Jiawen Wang <2876645134@qq.com> Date: Fri, 13 Sep 2024 18:35:13 +0800 Subject: [PATCH 1/3] [fix][broker] Fix NonDurable Subscription msgBackLog incorrect after topic unload --- .../pulsar/broker/service/ServerCnx.java | 2 + .../broker/service/SubscriptionOption.java | 1 + .../service/persistent/PersistentTopic.java | 14 +++-- .../api/NonDurableSubscriptionTest.java | 55 ++++++++++++++++++- .../pulsar/client/impl/ConsumerImpl.java | 2 +- .../pulsar/common/protocol/Commands.java | 7 ++- pulsar-common/src/main/proto/PulsarApi.proto | 2 + 7 files changed, 71 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index c14602bfca507..c08d305e81884 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1250,6 +1250,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { final long consumerEpoch = subscribe.hasConsumerEpoch() ? subscribe.getConsumerEpoch() : DEFAULT_CONSUMER_EPOCH; final Optional> subscriptionProperties = SubscriptionOption.getPropertiesMap( subscribe.getSubscriptionPropertiesList()); + final boolean resetIncludeHead = subscribe.isResetIncludeHead(); if (log.isDebugEnabled()) { log.debug("Topic name = {}, subscription name = {}, schema is {}", topicName, subscriptionName, @@ -1374,6 +1375,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { .subscriptionProperties(subscriptionProperties) .consumerEpoch(consumerEpoch) .schemaType(schema == null ? null : schema.getType()) + .resetIncludeHead(resetIncludeHead) .build(); if (schema != null && schema.getType() != SchemaType.AUTO_CONSUME) { return ignoreUnrecoverableBKException diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java index af56d023616b4..1b9dec0b09cb5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java @@ -51,6 +51,7 @@ public class SubscriptionOption { private Optional> subscriptionProperties; private long consumerEpoch; private SchemaType schemaType; + private boolean resetIncludeHead; public static Optional> getPropertiesMap(List list) { if (list == null) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 9c86a99de0f14..ca66314988cb3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -889,7 +889,7 @@ public CompletableFuture subscribe(SubscriptionOption option) { option.getInitialPosition(), option.getStartMessageRollbackDurationSec(), option.isReplicatedSubscriptionStateArg(), option.getKeySharedMeta(), option.getSubscriptionProperties().orElse(Collections.emptyMap()), - option.getConsumerEpoch(), option.getSchemaType()); + option.getConsumerEpoch(), option.getSchemaType(), option.isResetIncludeHead()); } private CompletableFuture internalSubscribe(final TransportCnx cnx, String subscriptionName, @@ -903,7 +903,8 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St KeySharedMeta keySharedMeta, Map subscriptionProperties, long consumerEpoch, - SchemaType schemaType) { + SchemaType schemaType, + boolean resetIncludeHead) { if (readCompacted && !(subType == SubType.Failover || subType == SubType.Exclusive)) { return FutureUtil.failedFuture(new NotAllowedException( "readCompacted only allowed on failover or exclusive subscriptions")); @@ -986,7 +987,7 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St ? getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec, replicatedSubscriptionState, subscriptionProperties) : getNonDurableSubscription(subscriptionName, startMessageId, initialPosition, - startMessageRollbackDurationSec, readCompacted, subscriptionProperties); + startMessageRollbackDurationSec, readCompacted, subscriptionProperties, resetIncludeHead); CompletableFuture future = subscriptionFuture.thenCompose(subscription -> { Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, @@ -1070,7 +1071,7 @@ public CompletableFuture subscribe(final TransportCnx cnx, String subs KeySharedMeta keySharedMeta) { return internalSubscribe(cnx, subscriptionName, consumerId, subType, priorityLevel, consumerName, isDurable, startMessageId, metadata, readCompacted, initialPosition, startMessageRollbackDurationSec, - replicatedSubscriptionStateArg, keySharedMeta, null, DEFAULT_CONSUMER_EPOCH, null); + replicatedSubscriptionStateArg, keySharedMeta, null, DEFAULT_CONSUMER_EPOCH, null, false); } private CompletableFuture getDurableSubscription(String subscriptionName, @@ -1136,7 +1137,7 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) { private CompletableFuture getNonDurableSubscription(String subscriptionName, MessageId startMessageId, InitialPosition initialPosition, long startMessageRollbackDurationSec, - boolean isReadCompacted, Map subscriptionProperties) { + boolean isReadCompacted, Map subscriptionProperties, boolean resetIncludeHead) { log.info("[{}][{}] Creating non-durable subscription at msg id {} - {}", topic, subscriptionName, startMessageId, subscriptionProperties); @@ -1159,7 +1160,8 @@ private CompletableFuture getNonDurableSubscription(Stri long entryId = msgId.getEntryId(); // Ensure that the start message id starts from a valid entry. if (ledgerId >= 0 && entryId >= 0 - && msgId instanceof BatchMessageIdImpl) { + && msgId instanceof BatchMessageIdImpl + && (msgId.getBatchIndex() >= 0 || resetIncludeHead)) { // When the start message is relative to a batch, we need to take one step back on the previous // message, // because the "batch" might not have been consumed in its entirety. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java index bbac688d9224c..c5da85e2a82f3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java @@ -256,6 +256,57 @@ public void testNonDurableSubscriptionRecovery(SubscriptionType subscriptionType } + @Test + public void testNonDurableSubscriptionBackLogAfterTopicUnload() throws Exception { + String topicName = "persistent://my-property/my-ns/nonDurable-sub-test"; + String subName = "test-sub"; + + admin.topics().createNonPartitionedTopic(topicName); + Producer producer = pulsarClient.newProducer().topic(topicName).create(); + + Consumer consumer = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName(subName) + .subscriptionMode(SubscriptionMode.NonDurable).subscribe(); + + // 1. send message + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + producer.close(); + + assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getMsgBacklog(), 10); + + // 2. receive the message + Thread t = new Thread(() -> { + while (true) { + Message msg; + try { + msg = consumer.receive(); + consumer.acknowledge(msg); + } catch (PulsarClientException e) { + throw new RuntimeException(e); + } + } + }); + t.start(); + + // 3. consumed all messages and the msgBacklog is 0 + Awaitility.await().untilAsserted(() -> + assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getMsgBacklog(), 0)); + + // 4. unload the topic + admin.topics().unload(topicName); + + // 5. wait the consumer reconnect + Awaitility.await().until(() -> admin.topics().getStats(topicName).getSubscriptions() != null); + + // 6. the backlog should still be 0 + Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getStats(topicName). + getSubscriptions().get(subName).getMsgBacklog(), 0)); + } + @Test public void testFlowCountForMultiTopics() throws Exception { String topicName = "persistent://my-property/my-ns/test-flow-count"; @@ -464,7 +515,7 @@ public void testInitReaderAtSpecifiedPosition() throws Exception { // A middle ledger id, and entry id is "-1". log.info("start test s8"); String s8 = "s8"; - MessageIdImpl startMessageId8 = new MessageIdImpl(ledgers.get(2), 0, -1); + MessageIdImpl startMessageId8 = new MessageIdImpl(ledgers.get(2), -1, -1); Reader reader8 = pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s8) .receiverQueueSize(0).startMessageId(startMessageId8).create(); ManagedLedgerInternalStats.CursorStats cursor8 = admin.topics().getInternalStats(topicName).cursors.get(s8); @@ -497,7 +548,7 @@ public void testInitReaderAtSpecifiedPosition() throws Exception { ManagedLedgerInternalStats.CursorStats cursor10 = admin.topics().getInternalStats(topicName).cursors.get(s10); log.info("cursor10 readPosition: {}, markDeletedPosition: {}", cursor10.readPosition, cursor10.markDeletePosition); Position p10 = parseReadPosition(cursor10); - assertEquals(p10.getLedgerId(), ledgers.get(2)); + assertEquals(p10.getLedgerId(), ledgers.get(3)); assertEquals(p10.getEntryId(), 0); reader10.close(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index b7010a1ddc7b4..298acce4d2ae7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -895,7 +895,7 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) { InitialPosition.valueOf(subscriptionInitialPosition.getValue()), startMessageRollbackDuration, si, createTopicIfDoesNotExist, conf.getKeySharedPolicy(), // Use the current epoch to subscribe. - conf.getSubscriptionProperties(), CONSUMER_EPOCH.get(this)); + conf.getSubscriptionProperties(), CONSUMER_EPOCH.get(this), resetIncludeHead); cnx.sendRequestWithId(request, requestId).thenRun(() -> { synchronized (ConsumerImpl.this) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 8635368f00f0b..16a6ab7981168 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -588,7 +588,7 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName, isDurable, startMessageId, metadata, readCompacted, isReplicated, subscriptionInitialPosition, startMessageRollbackDurationInSec, schemaInfo, createTopicIfDoesNotExist, null, - Collections.emptyMap(), DEFAULT_CONSUMER_EPOCH); + Collections.emptyMap(), DEFAULT_CONSUMER_EPOCH, false); } public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, @@ -596,7 +596,7 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu Map metadata, boolean readCompacted, boolean isReplicated, InitialPosition subscriptionInitialPosition, long startMessageRollbackDurationInSec, SchemaInfo schemaInfo, boolean createTopicIfDoesNotExist, KeySharedPolicy keySharedPolicy, - Map subscriptionProperties, long consumerEpoch) { + Map subscriptionProperties, long consumerEpoch, boolean resetIncludeHead) { BaseCommand cmd = localCmd(Type.SUBSCRIBE); CommandSubscribe subscribe = cmd.setSubscribe() .setTopic(topic) @@ -611,7 +611,8 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu .setInitialPosition(subscriptionInitialPosition) .setReplicateSubscriptionState(isReplicated) .setForceTopicCreation(createTopicIfDoesNotExist) - .setConsumerEpoch(consumerEpoch); + .setConsumerEpoch(consumerEpoch) + .setResetIncludeHead(resetIncludeHead); if (subscriptionProperties != null && !subscriptionProperties.isEmpty()) { List keyValues = new ArrayList<>(); diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 19658c5e57ff9..1583ff7bc4ade 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -401,6 +401,8 @@ message CommandSubscribe { // The consumer epoch, when exclusive and failover consumer redeliver unack message will increase the epoch optional uint64 consumer_epoch = 19; + + optional bool reset_include_head = 20 [default = false]; } message CommandPartitionedTopicMetadata { From 20e038407eadd29a178188f5a194bce03c26e039 Mon Sep 17 00:00:00 2001 From: Jiawen Wang <2876645134@qq.com> Date: Thu, 19 Sep 2024 14:39:55 +0800 Subject: [PATCH 2/3] fix --- .../api/NonDurableSubscriptionTest.java | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java index c5da85e2a82f3..dac960e1d9ab8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java @@ -262,8 +262,10 @@ public void testNonDurableSubscriptionBackLogAfterTopicUnload() throws Exception String subName = "test-sub"; admin.topics().createNonPartitionedTopic(topicName); + @Cleanup Producer producer = pulsarClient.newProducer().topic(topicName).create(); + @Cleanup Consumer consumer = pulsarClient.newConsumer() .topic(topicName) .subscriptionName(subName) @@ -274,23 +276,14 @@ public void testNonDurableSubscriptionBackLogAfterTopicUnload() throws Exception String message = "my-message-" + i; producer.send(message.getBytes()); } - producer.close(); assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getMsgBacklog(), 10); // 2. receive the message - Thread t = new Thread(() -> { - while (true) { - Message msg; - try { - msg = consumer.receive(); - consumer.acknowledge(msg); - } catch (PulsarClientException e) { - throw new RuntimeException(e); - } - } - }); - t.start(); + for (int i = 0; i < 10; i++) { + Message msg = consumer.receive(); + consumer.acknowledge(msg); + } // 3. consumed all messages and the msgBacklog is 0 Awaitility.await().untilAsserted(() -> From e3bfce6df8c4b46718ac9633703fe0a9c98e092c Mon Sep 17 00:00:00 2001 From: Jiawen Wang <2876645134@qq.com> Date: Thu, 17 Oct 2024 12:20:39 +0800 Subject: [PATCH 3/3] add annotation --- .../pulsar/broker/service/persistent/PersistentTopic.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index ca66314988cb3..92f247ee867aa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1162,9 +1162,11 @@ private CompletableFuture getNonDurableSubscription(Stri if (ledgerId >= 0 && entryId >= 0 && msgId instanceof BatchMessageIdImpl && (msgId.getBatchIndex() >= 0 || resetIncludeHead)) { + // When resetIncludeHead is true, we need to take one step back on the previous message, + // to ensure the read position starts with startMessageId + // When the start message is relative to a batch, we need to take one step back on the previous - // message, - // because the "batch" might not have been consumed in its entirety. + // message, because the "batch" might not have been consumed in its entirety. // The client will then be able to discard the first messages if needed. entryId = msgId.getEntryId() - 1; }