Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.pulsar.broker.BrokerTestUtil;
Expand Down Expand Up @@ -402,7 +403,7 @@ private void testDispatchRate(SubscriptionType subscription,

final String namespace = "my-property/throttling_ns";
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/throttlingAll");
final String subName = "my-subscriber-name-" + subscription;
final String subName = BrokerTestUtil.newUniqueName("my-subscriber-name-" + subscription);

DispatchRate subscriptionDispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(-1)
Expand All @@ -419,7 +420,7 @@ private void testDispatchRate(SubscriptionType subscription,
admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate);

final int numProducedMessages = 30;
final int numProducedMessages = 100;
final CountDownLatch latch = new CountDownLatch(numProducedMessages);
final AtomicInteger totalReceived = new AtomicInteger(0);
// enable throttling for nonBacklog consumers
Expand Down Expand Up @@ -464,19 +465,18 @@ private void testDispatchRate(SubscriptionType subscription,
Assert.assertEquals(admin.namespaces().getDispatchRate(namespace)
.getDispatchThrottlingRateInByte(), topicRate);

long start = System.currentTimeMillis();
long start = System.nanoTime();
// Asynchronously produce messages
for (int i = 0; i < numProducedMessages; i++) {
producer.send(new byte[expectRate / 10]);
producer.sendAsync(new byte[expectRate / 10]);
}
latch.await();
Assert.assertEquals(totalReceived.get(), numProducedMessages, 10);
long end = System.currentTimeMillis();
log.info("-- end - start: {} ", end - start);

long end = System.nanoTime();
log.info("-- end - start: {} seconds", (end - start) / 10E8);
// first 10 messages, which equals receiverQueueSize, will not wait.
Assert.assertTrue((end - start) >= 2500);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can't remove this line because we have to use it to verify the rate limiter.
Could you get the actual number here? Maybe 2000 is better or not?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with you. The longer the time, the more accurate the rate.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you help explain this new assert? before each message size is rate/10 and we send 30 message, so should take at least 3 second to receive all, but I don't quite understand what this new assert is trying to prove?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test case is correct. And I found the failure is due to dataProvider. I update the PR. PTAL.

Assert.assertTrue((end - start) <= 8000);
Assert.assertTrue((end - start) / 10E8 >= 5);
Assert.assertTrue((end - start) / 10E8 <= 12);

consumer.close();
producer.close();
Expand All @@ -497,14 +497,18 @@ private void testDispatchRate(SubscriptionType subscription,
* @throws Exception
*/
@Test(dataProvider = "subscriptions")
public void testMultiLevelDispatch(SubscriptionType subscription) throws Exception {
public void testMultiLevelDispatchType(SubscriptionType subscription) throws Exception {
log.info("-- Starting {} test --", methodName);
long delaySeconds = 5l;

testDispatchRate(subscription, 1000, 5000, 10000, 1000);
TimeUnit.SECONDS.sleep(delaySeconds);

testDispatchRate(subscription, 10000, 1000, 5000, 1000);
TimeUnit.SECONDS.sleep(delaySeconds);

testDispatchRate(subscription, 5000, 10000, 1000, 1000);
TimeUnit.SECONDS.sleep(delaySeconds);

log.info("-- Exiting {} test --", methodName);
}
Expand Down