diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index c14602bfca507..c08d305e81884 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1250,6 +1250,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { final long consumerEpoch = subscribe.hasConsumerEpoch() ? subscribe.getConsumerEpoch() : DEFAULT_CONSUMER_EPOCH; final Optional> subscriptionProperties = SubscriptionOption.getPropertiesMap( subscribe.getSubscriptionPropertiesList()); + final boolean resetIncludeHead = subscribe.isResetIncludeHead(); if (log.isDebugEnabled()) { log.debug("Topic name = {}, subscription name = {}, schema is {}", topicName, subscriptionName, @@ -1374,6 +1375,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { .subscriptionProperties(subscriptionProperties) .consumerEpoch(consumerEpoch) .schemaType(schema == null ? null : schema.getType()) + .resetIncludeHead(resetIncludeHead) .build(); if (schema != null && schema.getType() != SchemaType.AUTO_CONSUME) { return ignoreUnrecoverableBKException diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java index af56d023616b4..1b9dec0b09cb5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java @@ -51,6 +51,7 @@ public class SubscriptionOption { private Optional> subscriptionProperties; private long consumerEpoch; private SchemaType schemaType; + private boolean resetIncludeHead; public static Optional> getPropertiesMap(List list) { if (list == null) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 9c86a99de0f14..92f247ee867aa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -889,7 +889,7 @@ public CompletableFuture subscribe(SubscriptionOption option) { option.getInitialPosition(), option.getStartMessageRollbackDurationSec(), option.isReplicatedSubscriptionStateArg(), option.getKeySharedMeta(), option.getSubscriptionProperties().orElse(Collections.emptyMap()), - option.getConsumerEpoch(), option.getSchemaType()); + option.getConsumerEpoch(), option.getSchemaType(), option.isResetIncludeHead()); } private CompletableFuture internalSubscribe(final TransportCnx cnx, String subscriptionName, @@ -903,7 +903,8 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St KeySharedMeta keySharedMeta, Map subscriptionProperties, long consumerEpoch, - SchemaType schemaType) { + SchemaType schemaType, + boolean resetIncludeHead) { if (readCompacted && !(subType == SubType.Failover || subType == SubType.Exclusive)) { return FutureUtil.failedFuture(new NotAllowedException( "readCompacted only allowed on failover or exclusive subscriptions")); @@ -986,7 +987,7 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St ? getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec, replicatedSubscriptionState, subscriptionProperties) : getNonDurableSubscription(subscriptionName, startMessageId, initialPosition, - startMessageRollbackDurationSec, readCompacted, subscriptionProperties); + startMessageRollbackDurationSec, readCompacted, subscriptionProperties, resetIncludeHead); CompletableFuture future = subscriptionFuture.thenCompose(subscription -> { Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, @@ -1070,7 +1071,7 @@ public CompletableFuture subscribe(final TransportCnx cnx, String subs KeySharedMeta keySharedMeta) { return internalSubscribe(cnx, subscriptionName, consumerId, subType, priorityLevel, consumerName, isDurable, startMessageId, metadata, readCompacted, initialPosition, startMessageRollbackDurationSec, - replicatedSubscriptionStateArg, keySharedMeta, null, DEFAULT_CONSUMER_EPOCH, null); + replicatedSubscriptionStateArg, keySharedMeta, null, DEFAULT_CONSUMER_EPOCH, null, false); } private CompletableFuture getDurableSubscription(String subscriptionName, @@ -1136,7 +1137,7 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) { private CompletableFuture getNonDurableSubscription(String subscriptionName, MessageId startMessageId, InitialPosition initialPosition, long startMessageRollbackDurationSec, - boolean isReadCompacted, Map subscriptionProperties) { + boolean isReadCompacted, Map subscriptionProperties, boolean resetIncludeHead) { log.info("[{}][{}] Creating non-durable subscription at msg id {} - {}", topic, subscriptionName, startMessageId, subscriptionProperties); @@ -1159,10 +1160,13 @@ private CompletableFuture getNonDurableSubscription(Stri long entryId = msgId.getEntryId(); // Ensure that the start message id starts from a valid entry. if (ledgerId >= 0 && entryId >= 0 - && msgId instanceof BatchMessageIdImpl) { + && msgId instanceof BatchMessageIdImpl + && (msgId.getBatchIndex() >= 0 || resetIncludeHead)) { + // When resetIncludeHead is true, we need to take one step back on the previous message, + // to ensure the read position starts with startMessageId + // When the start message is relative to a batch, we need to take one step back on the previous - // message, - // because the "batch" might not have been consumed in its entirety. + // message, because the "batch" might not have been consumed in its entirety. // The client will then be able to discard the first messages if needed. entryId = msgId.getEntryId() - 1; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java index bbac688d9224c..dac960e1d9ab8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java @@ -256,6 +256,50 @@ public void testNonDurableSubscriptionRecovery(SubscriptionType subscriptionType } + @Test + public void testNonDurableSubscriptionBackLogAfterTopicUnload() throws Exception { + String topicName = "persistent://my-property/my-ns/nonDurable-sub-test"; + String subName = "test-sub"; + + admin.topics().createNonPartitionedTopic(topicName); + @Cleanup + Producer producer = pulsarClient.newProducer().topic(topicName).create(); + + @Cleanup + Consumer consumer = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName(subName) + .subscriptionMode(SubscriptionMode.NonDurable).subscribe(); + + // 1. send message + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + + assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getMsgBacklog(), 10); + + // 2. receive the message + for (int i = 0; i < 10; i++) { + Message msg = consumer.receive(); + consumer.acknowledge(msg); + } + + // 3. consumed all messages and the msgBacklog is 0 + Awaitility.await().untilAsserted(() -> + assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getMsgBacklog(), 0)); + + // 4. unload the topic + admin.topics().unload(topicName); + + // 5. wait the consumer reconnect + Awaitility.await().until(() -> admin.topics().getStats(topicName).getSubscriptions() != null); + + // 6. the backlog should still be 0 + Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getStats(topicName). + getSubscriptions().get(subName).getMsgBacklog(), 0)); + } + @Test public void testFlowCountForMultiTopics() throws Exception { String topicName = "persistent://my-property/my-ns/test-flow-count"; @@ -464,7 +508,7 @@ public void testInitReaderAtSpecifiedPosition() throws Exception { // A middle ledger id, and entry id is "-1". log.info("start test s8"); String s8 = "s8"; - MessageIdImpl startMessageId8 = new MessageIdImpl(ledgers.get(2), 0, -1); + MessageIdImpl startMessageId8 = new MessageIdImpl(ledgers.get(2), -1, -1); Reader reader8 = pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s8) .receiverQueueSize(0).startMessageId(startMessageId8).create(); ManagedLedgerInternalStats.CursorStats cursor8 = admin.topics().getInternalStats(topicName).cursors.get(s8); @@ -497,7 +541,7 @@ public void testInitReaderAtSpecifiedPosition() throws Exception { ManagedLedgerInternalStats.CursorStats cursor10 = admin.topics().getInternalStats(topicName).cursors.get(s10); log.info("cursor10 readPosition: {}, markDeletedPosition: {}", cursor10.readPosition, cursor10.markDeletePosition); Position p10 = parseReadPosition(cursor10); - assertEquals(p10.getLedgerId(), ledgers.get(2)); + assertEquals(p10.getLedgerId(), ledgers.get(3)); assertEquals(p10.getEntryId(), 0); reader10.close(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index b7010a1ddc7b4..298acce4d2ae7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -895,7 +895,7 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) { InitialPosition.valueOf(subscriptionInitialPosition.getValue()), startMessageRollbackDuration, si, createTopicIfDoesNotExist, conf.getKeySharedPolicy(), // Use the current epoch to subscribe. - conf.getSubscriptionProperties(), CONSUMER_EPOCH.get(this)); + conf.getSubscriptionProperties(), CONSUMER_EPOCH.get(this), resetIncludeHead); cnx.sendRequestWithId(request, requestId).thenRun(() -> { synchronized (ConsumerImpl.this) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 8635368f00f0b..16a6ab7981168 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -588,7 +588,7 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName, isDurable, startMessageId, metadata, readCompacted, isReplicated, subscriptionInitialPosition, startMessageRollbackDurationInSec, schemaInfo, createTopicIfDoesNotExist, null, - Collections.emptyMap(), DEFAULT_CONSUMER_EPOCH); + Collections.emptyMap(), DEFAULT_CONSUMER_EPOCH, false); } public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, @@ -596,7 +596,7 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu Map metadata, boolean readCompacted, boolean isReplicated, InitialPosition subscriptionInitialPosition, long startMessageRollbackDurationInSec, SchemaInfo schemaInfo, boolean createTopicIfDoesNotExist, KeySharedPolicy keySharedPolicy, - Map subscriptionProperties, long consumerEpoch) { + Map subscriptionProperties, long consumerEpoch, boolean resetIncludeHead) { BaseCommand cmd = localCmd(Type.SUBSCRIBE); CommandSubscribe subscribe = cmd.setSubscribe() .setTopic(topic) @@ -611,7 +611,8 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu .setInitialPosition(subscriptionInitialPosition) .setReplicateSubscriptionState(isReplicated) .setForceTopicCreation(createTopicIfDoesNotExist) - .setConsumerEpoch(consumerEpoch); + .setConsumerEpoch(consumerEpoch) + .setResetIncludeHead(resetIncludeHead); if (subscriptionProperties != null && !subscriptionProperties.isEmpty()) { List keyValues = new ArrayList<>(); diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 19658c5e57ff9..1583ff7bc4ade 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -401,6 +401,8 @@ message CommandSubscribe { // The consumer epoch, when exclusive and failover consumer redeliver unack message will increase the epoch optional uint64 consumer_epoch = 19; + + optional bool reset_include_head = 20 [default = false]; } message CommandPartitionedTopicMetadata {