diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java index 7881038cc7aaf..ff2535db49a80 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java @@ -22,6 +22,7 @@ import java.time.Duration; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.pulsar.broker.BrokerTestUtil; @@ -402,7 +403,7 @@ private void testDispatchRate(SubscriptionType subscription, final String namespace = "my-property/throttling_ns"; final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/throttlingAll"); - final String subName = "my-subscriber-name-" + subscription; + final String subName = BrokerTestUtil.newUniqueName("my-subscriber-name-" + subscription); DispatchRate subscriptionDispatchRate = DispatchRate.builder() .dispatchThrottlingRateInMsg(-1) @@ -419,7 +420,7 @@ private void testDispatchRate(SubscriptionType subscription, admin.namespaces().setDispatchRate(namespace, topicDispatchRate); admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate); - final int numProducedMessages = 30; + final int numProducedMessages = 100; final CountDownLatch latch = new CountDownLatch(numProducedMessages); final AtomicInteger totalReceived = new AtomicInteger(0); // enable throttling for nonBacklog consumers @@ -464,19 +465,18 @@ private void testDispatchRate(SubscriptionType subscription, Assert.assertEquals(admin.namespaces().getDispatchRate(namespace) .getDispatchThrottlingRateInByte(), topicRate); - long start = System.currentTimeMillis(); + long start = System.nanoTime(); // Asynchronously produce messages for (int i = 0; i < numProducedMessages; i++) { - producer.send(new byte[expectRate / 10]); + producer.sendAsync(new byte[expectRate / 10]); } latch.await(); Assert.assertEquals(totalReceived.get(), numProducedMessages, 10); - long end = System.currentTimeMillis(); - log.info("-- end - start: {} ", end - start); - + long end = System.nanoTime(); + log.info("-- end - start: {} seconds", (end - start) / 10E8); // first 10 messages, which equals receiverQueueSize, will not wait. - Assert.assertTrue((end - start) >= 2500); - Assert.assertTrue((end - start) <= 8000); + Assert.assertTrue((end - start) / 10E8 >= 5); + Assert.assertTrue((end - start) / 10E8 <= 12); consumer.close(); producer.close(); @@ -497,14 +497,18 @@ private void testDispatchRate(SubscriptionType subscription, * @throws Exception */ @Test(dataProvider = "subscriptions") - public void testMultiLevelDispatch(SubscriptionType subscription) throws Exception { + public void testMultiLevelDispatchType(SubscriptionType subscription) throws Exception { log.info("-- Starting {} test --", methodName); + long delaySeconds = 5l; testDispatchRate(subscription, 1000, 5000, 10000, 1000); + TimeUnit.SECONDS.sleep(delaySeconds); testDispatchRate(subscription, 10000, 1000, 5000, 1000); + TimeUnit.SECONDS.sleep(delaySeconds); testDispatchRate(subscription, 5000, 10000, 1000, 1000); + TimeUnit.SECONDS.sleep(delaySeconds); log.info("-- Exiting {} test --", methodName); }