From afdd697dd76087c1275785c349ccb2bd93221c4b Mon Sep 17 00:00:00 2001 From: Qiang Huang Date: Thu, 1 Sep 2022 14:45:50 +0800 Subject: [PATCH 1/4] [fx][flaky-test]Fix PersistentSubscriptionMessageDispatchStreamingDispatcherThrottlingTest.testMultiLevelDispatch --- .../client/api/SubscriptionMessageDispatchThrottlingTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java index 7881038cc7aaf..50a216b662f11 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java @@ -475,7 +475,6 @@ private void testDispatchRate(SubscriptionType subscription, log.info("-- end - start: {} ", end - start); // first 10 messages, which equals receiverQueueSize, will not wait. - Assert.assertTrue((end - start) >= 2500); Assert.assertTrue((end - start) <= 8000); consumer.close(); From e52daddea65f6b7d10b1bd5996e4db1fc6d6abd5 Mon Sep 17 00:00:00 2001 From: Qiang Huang Date: Thu, 1 Sep 2022 14:58:39 +0800 Subject: [PATCH 2/4] [fx][flaky-test]Fix PersistentSubscriptionMessageDispatchStreamingDispatcherThrottlingTest.testMultiLevelDispatch --- .../client/api/SubscriptionMessageDispatchThrottlingTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java index 50a216b662f11..671c4ee39b02c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java @@ -467,7 +467,7 @@ private void testDispatchRate(SubscriptionType subscription, long start = System.currentTimeMillis(); // Asynchronously produce messages for (int i = 0; i < numProducedMessages; i++) { - producer.send(new byte[expectRate / 10]); + producer.sendAsync(new byte[expectRate / 10]); } latch.await(); Assert.assertEquals(totalReceived.get(), numProducedMessages, 10); From ca5eb6116fba949afd159ee108a942143b90b5db Mon Sep 17 00:00:00 2001 From: Qiang Huang Date: Fri, 30 Sep 2022 23:02:15 +0800 Subject: [PATCH 3/4] fix test --- ...criptionMessageDispatchThrottlingTest.java | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java index 671c4ee39b02c..9b65412eb3131 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java @@ -22,6 +22,7 @@ import java.time.Duration; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.pulsar.broker.BrokerTestUtil; @@ -419,7 +420,7 @@ private void testDispatchRate(SubscriptionType subscription, admin.namespaces().setDispatchRate(namespace, topicDispatchRate); admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate); - final int numProducedMessages = 30; + final int numProducedMessages = 100; final CountDownLatch latch = new CountDownLatch(numProducedMessages); final AtomicInteger totalReceived = new AtomicInteger(0); // enable throttling for nonBacklog consumers @@ -464,18 +465,18 @@ private void testDispatchRate(SubscriptionType subscription, Assert.assertEquals(admin.namespaces().getDispatchRate(namespace) .getDispatchThrottlingRateInByte(), topicRate); - long start = System.currentTimeMillis(); + long start = System.nanoTime(); // Asynchronously produce messages for (int i = 0; i < numProducedMessages; i++) { producer.sendAsync(new byte[expectRate / 10]); } latch.await(); Assert.assertEquals(totalReceived.get(), numProducedMessages, 10); - long end = System.currentTimeMillis(); - log.info("-- end - start: {} ", end - start); - + long end = System.nanoTime(); + log.info("-- end - start: {} seconds", (end - start) / 10E8); // first 10 messages, which equals receiverQueueSize, will not wait. - Assert.assertTrue((end - start) <= 8000); + Assert.assertTrue((end - start) / 10E8 >= 5); + Assert.assertTrue((end - start) / 10E8 <= 12); consumer.close(); producer.close(); @@ -496,14 +497,18 @@ private void testDispatchRate(SubscriptionType subscription, * @throws Exception */ @Test(dataProvider = "subscriptions") - public void testMultiLevelDispatch(SubscriptionType subscription) throws Exception { + public void testMultiLevelDispatchType(SubscriptionType subscription) throws Exception { log.info("-- Starting {} test --", methodName); + long delaySeconds = 5l; testDispatchRate(subscription, 1000, 5000, 10000, 1000); + TimeUnit.SECONDS.sleep(delaySeconds); testDispatchRate(subscription, 10000, 1000, 5000, 1000); + TimeUnit.SECONDS.sleep(delaySeconds); testDispatchRate(subscription, 5000, 10000, 1000, 1000); + TimeUnit.SECONDS.sleep(delaySeconds); log.info("-- Exiting {} test --", methodName); } From 602aa1a9238fa73bd87d56d9506c3c5b0a9e480c Mon Sep 17 00:00:00 2001 From: Qiang Huang Date: Fri, 30 Sep 2022 23:03:29 +0800 Subject: [PATCH 4/4] fix test --- .../client/api/SubscriptionMessageDispatchThrottlingTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java index 9b65412eb3131..ff2535db49a80 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java @@ -403,7 +403,7 @@ private void testDispatchRate(SubscriptionType subscription, final String namespace = "my-property/throttling_ns"; final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/throttlingAll"); - final String subName = "my-subscriber-name-" + subscription; + final String subName = BrokerTestUtil.newUniqueName("my-subscriber-name-" + subscription); DispatchRate subscriptionDispatchRate = DispatchRate.builder() .dispatchThrottlingRateInMsg(-1)