diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java index d1d44709a9c52..f9661f195a997 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java @@ -151,6 +151,10 @@ default boolean checkAndUnblockIfStuck() { */ default void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion){} + default boolean isAllWaitingReadMessagesAreFixedDelayed() { + return false; + } + /** * Trigger a new "readMoreEntries" if the dispatching has been paused before. This method is only implemented in * {@link org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers} right now, other diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 73ad2cf0a3dee..fb44498f4da21 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -1334,6 +1334,11 @@ protected synchronized boolean shouldPauseDeliveryForDelayTracker() { return delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().shouldPauseAllDeliveries(); } + @Override + public boolean isAllWaitingReadMessagesAreFixedDelayed() { + return shouldPauseDeliveryForDelayTracker(); + } + @Override public long getNumberOfDelayedMessages() { return delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 9a0545e6f0ab2..eb4a5d78e22d4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -1269,16 +1269,27 @@ public CompletableFuture getStatsAsync(GetStatsOptions ge ((PersistentDispatcherMultipleConsumers) dispatcher).getBucketDelayedIndexStats(); } + subStats.msgBacklog = getNumberOfEntriesInBacklog(getStatsOptions.isGetPreciseBacklog()); if (Subscription.isIndividualAckMode(subType)) { if (dispatcher instanceof PersistentDispatcherMultipleConsumers) { PersistentDispatcherMultipleConsumers d = (PersistentDispatcherMultipleConsumers) dispatcher; subStats.unackedMessages = d.getTotalUnackedMessages(); subStats.blockedSubscriptionOnUnackedMsgs = d.isBlockedDispatcherOnUnackedMsgs(); - subStats.msgDelayed = d.getNumberOfDelayedMessages(); subStats.msgInReplay = d.getNumberOfMessagesInReplay(); + if (d.isAllWaitingReadMessagesAreFixedDelayed()) { + long msgDeliveredOut = 0; + for (Consumer c : dispatcher.getConsumers()){ + msgDeliveredOut += c.getUnackedMessages(); + } + subStats.msgDelayed = subStats.msgBacklog - msgDeliveredOut - subStats.msgInReplay; + subStats.msgDelayedInMemory = d.getNumberOfDelayedMessages(); + } else { + subStats.msgDelayed = d.getNumberOfDelayedMessages(); + subStats.msgDelayedInMemory = subStats.msgDelayed; + } } } - subStats.msgBacklog = getNumberOfEntriesInBacklog(getStatsOptions.isGetPreciseBacklog()); + if (getStatsOptions.isSubscriptionBacklogSize()) { subStats.backlogSize = topic.getManagedLedger() .getEstimatedBacklogSize(cursor.getMarkDeletedPosition()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java index 3ca966d210886..3ae55358992ef 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java @@ -25,6 +25,7 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import io.opentelemetry.api.common.Attributes; +import java.time.Duration; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -36,6 +37,7 @@ import lombok.Cleanup; import org.apache.bookkeeper.client.BKException; import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory; import org.apache.pulsar.broker.service.Dispatcher; import org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil; import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats; @@ -43,14 +45,18 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; +import org.apache.pulsar.common.policies.data.SubscriptionStats; +import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; import org.awaitility.Awaitility; +import org.awaitility.reflect.WhiteboxImpl; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -684,4 +690,92 @@ public void testDelayedDeliveryExceedsMaxDelay() throws Exception { + maxDeliveryDelayInMillis + " milliseconds"); } } + + @Test + public void testMsgBacklogNoDelayedWhenFixedDelay() throws Exception { + final long originalDelayedDeliveryFixedDelayDetectionLookahead = + pulsar.getConfig().getDelayedDeliveryFixedDelayDetectionLookahead();; + final int delayedDeliveryFixedDelayDetectionLookahead = 10; + final int msgNoDelayed = 15; + final int receiverQueueSize = 10; + final int msgDelayed = delayedDeliveryFixedDelayDetectionLookahead * 10; + final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String sName = "delayed_s1"; + DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory = + WhiteboxImpl.getInternalState(pulsar.getBrokerService(), "delayedDeliveryTrackerFactory"); + if (delayedDeliveryTrackerFactory != null) { + pulsar.getConfig() + .setDelayedDeliveryFixedDelayDetectionLookahead(delayedDeliveryFixedDelayDetectionLookahead); + delayedDeliveryTrackerFactory.initialize(pulsar); + } + admin.topics().createNonPartitionedTopic(topic); + admin.topics().createSubscription(topic, sName, MessageId.earliest); + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get(); + + // Send some message no-delayed. + // Send many messages delayed. + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); + for (int i = 0; i < msgNoDelayed; i++) { + producer.newMessage().send(); + } + for (int i = 0; i < msgDelayed; i++) { + producer.newMessage().deliverAfter(1, TimeUnit.HOURS).value(i + "").send(); + } + Consumer consumer1 = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName(sName) + .receiverQueueSize(receiverQueueSize) + .subscriptionType(SubscriptionType.Shared).subscribe(); + Consumer consumer2 = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName(sName) + .receiverQueueSize(receiverQueueSize) + .subscriptionType(SubscriptionType.Shared).subscribe(); + + // Wait for the checker that named "shouldPauseDeliveryForDelayTracker". + PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) persistentTopic + .getSubscription(sName).getDispatcher(); + Awaitility.await().untilAsserted(() -> { + assertTrue(dispatcher.shouldPauseDeliveryForDelayTracker()); + assertTrue(dispatcher.getNumberOfDelayedMessages() >= delayedDeliveryFixedDelayDetectionLookahead); + }); + // Verify: backlog stats + SubscriptionStats subscriptionStats1 = persistentTopic.getStats(true, false, false) + .getSubscriptions().get(sName); + assertEquals(subscriptionStats1.getMsgBacklog(), msgDelayed + msgNoDelayed); + assertEquals(subscriptionStats1.getMsgDelayed(), msgDelayed); + assertEquals(subscriptionStats1.getMsgBacklogNoDelayed(), msgNoDelayed); + assertEquals(subscriptionStats1.getMsgInReplay(), 0); + assertTrue(subscriptionStats1.getMsgDelayedInMemory() < msgDelayed); + + // Let some messages being pushed into replay queue. + consumer2.close(); + // Wait for the checker that named "shouldPauseDeliveryForDelayTracker". + Awaitility.await().untilAsserted(() -> { + assertTrue(dispatcher.shouldPauseDeliveryForDelayTracker()); + assertTrue(dispatcher.getNumberOfDelayedMessages() >= delayedDeliveryFixedDelayDetectionLookahead); + }); + // And verify: backlog stats + Awaitility.await().atMost(Duration.ofSeconds(3600)).untilAsserted(() -> { + SubscriptionStats subscriptionStats2 = persistentTopic.getStats(true, false, false) + .getSubscriptions().get(sName); + assertEquals(subscriptionStats2.getMsgBacklog(), msgDelayed + msgNoDelayed); + assertEquals(subscriptionStats2.getMsgDelayed(), msgDelayed); + assertEquals(subscriptionStats2.getMsgBacklogNoDelayed(), msgNoDelayed); + GrowableArrayBlockingQueue incomingMessages = + WhiteboxImpl.getInternalState(consumer1, "incomingMessages"); + assertEquals(subscriptionStats2.getMsgInReplay(), msgNoDelayed - incomingMessages.size()); + assertTrue(subscriptionStats2.getMsgDelayedInMemory() < msgDelayed); + }); + + // cleanup. + DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory2 = + WhiteboxImpl.getInternalState(pulsar.getBrokerService(), "delayedDeliveryTrackerFactory"); + if (delayedDeliveryTrackerFactory2 != null) { + pulsar.getConfig().setDelayedDeliveryFixedDelayDetectionLookahead( + originalDelayedDeliveryFixedDelayDetectionLookahead); + delayedDeliveryTrackerFactory2.initialize(pulsar); + } + consumer1.close(); + consumer2.close(); + producer.close(); + admin.topics().delete(topic, false); + } } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java index e307e41862e74..52d53a61b9d8c 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java @@ -63,9 +63,12 @@ public interface SubscriptionStats { /** Flag to verify if subscription is blocked due to reaching threshold of unacked messages. */ boolean isBlockedSubscriptionOnUnackedMsgs(); - /** Number of delayed messages currently being tracked. */ + /** Number of delayed messages. */ long getMsgDelayed(); + /** Number of delayed messages currently being tracked. */ + long getMsgDelayedInMemory(); + /** Number of messages registered for replay. */ long getMsgInReplay(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java index 977ed28e86814..bbd843e720445 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java @@ -71,9 +71,12 @@ public class SubscriptionStatsImpl implements SubscriptionStats { /** Flag to verify if subscription is blocked due to reaching threshold of unacked messages. */ public boolean blockedSubscriptionOnUnackedMsgs; - /** Number of delayed messages currently being tracked. */ + /** Number of delayed messages. */ public long msgDelayed; + /** Number of delayed messages currently being tracked. */ + public long msgDelayedInMemory; + /** Number of messages registered for replay. */ public long msgInReplay;