diff --git a/.github/workflows/integration-test.yaml b/.github/workflows/integration-test.yaml index 2afbca8..62a64b9 100644 --- a/.github/workflows/integration-test.yaml +++ b/.github/workflows/integration-test.yaml @@ -71,11 +71,11 @@ jobs: distribution: 'adopt' java-version: 17 - - name: install org.apache.pulsar.tests:integration:jar:tests:4.0.1 + - name: install org.apache.pulsar.tests:integration:jar:tests:4.0.2 if: ${{ steps.check_changes.outputs.docs_only != 'true' }} run: | cd ~ - git clone --depth 50 --single-branch --branch v4.0.1 https://github.com/apache/pulsar + git clone --depth 50 --single-branch --branch v4.0.2 https://github.com/apache/pulsar cd pulsar mvn -B -ntp -f tests/pom.xml -pl org.apache.pulsar.tests:tests-parent,org.apache.pulsar.tests:integration install @@ -83,10 +83,10 @@ jobs: if: ${{ steps.check_changes.outputs.docs_only != 'true' }} run: | cd ~/pulsar - docker pull apachepulsar/pulsar-all:4.0.1 - docker pull apachepulsar/pulsar:4.0.1 - docker tag apachepulsar/pulsar-all:4.0.1 apachepulsar/pulsar-all:4.0.1-$(git rev-parse --short=7 HEAD) - docker tag apachepulsar/pulsar:4.0.1 apachepulsar/pulsar:4.0.1-$(git rev-parse --short=7 HEAD) + docker pull apachepulsar/pulsar-all:4.0.2 + docker pull apachepulsar/pulsar:4.0.2 + docker tag apachepulsar/pulsar-all:4.0.2 apachepulsar/pulsar-all:4.0.2-$(git rev-parse --short=7 HEAD) + docker tag apachepulsar/pulsar:4.0.2 apachepulsar/pulsar:4.0.2-$(git rev-parse --short=7 HEAD) mvn -B -ntp -f tests/docker-images/pom.xml install -pl org.apache.pulsar.tests:latest-version-image -am -Pdocker,-main -DskipTests - name: run integration tests diff --git a/README.md b/README.md index 29ceada..938f726 100644 --- a/README.md +++ b/README.md @@ -34,11 +34,11 @@ mvn install In order to build this repository the linked Pulsar release must be released to Maven Central other wise you have to build it locally. -For instance if this code depends on Pulsar 4.0.1 you have to build Pulsar 4.0.1 locally +For instance if this code depends on Pulsar 4.0.2 you have to build Pulsar 4.0.2 locally ``` git clone https://github.com/apache/pulsar -git checkout v4.0.1 +git checkout v4.0.2 mvn clean install -DskipTests ``` diff --git a/pom.xml b/pom.xml index 29a118e..6de62a3 100644 --- a/pom.xml +++ b/pom.xml @@ -76,7 +76,7 @@ - 4.0.1 + 4.0.2 2.7.2 2.0.0 0.8.1.1 diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java index 797f340..db97451 100644 --- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java @@ -159,6 +159,21 @@ public void ack(Object msgId) { } } + public void negativeAck(Object msg) { + if (msg instanceof Message) { + Message pulsarMsg = (Message) msg; + if (LOG.isDebugEnabled()) { + LOG.debug("[{}] Received negative ack for message {}", spoutId, pulsarMsg.getMessageId()); + } + consumer.negativeAcknowledge(pulsarMsg); + pendingMessageRetries.remove(pulsarMsg.getMessageId()); + // we should also remove message from failedMessages but it will be + // eventually removed while emitting next + // tuple + --pendingAcks; + } + } + @Override public void fail(Object msgId) { if (msgId instanceof Message) { @@ -183,8 +198,13 @@ public void fail(Object msgId) { --pendingAcks; messagesFailed++; } else { - LOG.warn("[{}] Number of retries limit reached, dropping the message {}", spoutId, id); - ack(msg); + if(pulsarSpoutConf.shouldNegativeAckFailedMessages()){ + LOG.warn("[{}] Number of retries limit reached, negative acking the message {}", spoutId, id); + negativeAck(msg); + } else { + LOG.warn("[{}] Number of retries limit reached, dropping the message {}", spoutId, id); + ack(msg); + } } } @@ -447,6 +467,11 @@ public void acknowledgeAsync(Message msg) { consumer.acknowledgeAsync(msg); } + @Override + public void negativeAcknowledge(Message msg) { + consumer.negativeAcknowledge(msg); + } + @Override public void close() throws PulsarClientException { consumer.close(); @@ -477,6 +502,11 @@ public void acknowledgeAsync(Message msg) { // No-op } + @Override + public void negativeAcknowledge(Message msg) { + // No-op + } + @Override public void close() throws PulsarClientException { try { @@ -491,4 +521,4 @@ public void unsubscribe() throws PulsarClientException { // No-op } } -} \ No newline at end of file +} diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java index db797ee..e8e883a 100644 --- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java @@ -49,9 +49,11 @@ public class PulsarSpoutConfiguration extends PulsarStormConfiguration { private boolean autoUnsubscribe = false; private boolean durableSubscription = true; // read position if non-durable subscription is enabled : default oldest message available in topic - private MessageId nonDurableSubscriptionReadPosition = MessageId.earliest; + private MessageId nonDurableSubscriptionReadPosition = MessageId.earliest; + private boolean negativeAckFailedMessages = false; + + - /** * @return the subscription name for the consumer in the spout */ @@ -192,4 +194,21 @@ public MessageId getNonDurableSubscriptionReadPosition() { public void setNonDurableSubscriptionReadPosition(MessageId nonDurableSubscriptionReadPosition) { this.nonDurableSubscriptionReadPosition = nonDurableSubscriptionReadPosition; } + + /** + * + * @return whether the consumer will negative ack the failed messages + */ + public boolean shouldNegativeAckFailedMessages(){ + return this.negativeAckFailedMessages; + } + + /** + * Sets whether the consumer will negative ack the failed messages. (default: false) + * + * @param negativeAckFailedMessages + */ + public void setNegativeAckFailedMessages(boolean negativeAckFailedMessages){ + this.negativeAckFailedMessages = negativeAckFailedMessages; + } } diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java index 5502a62..6e69e3f 100644 --- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java @@ -42,6 +42,13 @@ public interface PulsarSpoutConsumer { */ void acknowledgeAsync(Message msg); + /** + * Negative ack the message. + * + * @param msg + */ + void negativeAcknowledge(Message msg); + /** * unsubscribe the consumer * @throws PulsarClientException diff --git a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java index e6cbc51..b88e054 100644 --- a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java +++ b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java @@ -18,16 +18,7 @@ */ package org.apache.pulsar.storm; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyInt; -import static org.mockito.Mockito.atLeast; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -38,13 +29,11 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.pulsar.client.api.ClientBuilder; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.storm.PulsarSpout.SpoutConsumer; import org.apache.storm.spout.SpoutOutputCollector; @@ -98,6 +87,71 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { verify(consumer, atLeast(1)).receive(anyInt(), any()); } + @Test + public void testFailedMessageNegativeAck() throws Exception { + testFailedMessageRetryExhausted(true); + } + + @Test + public void testFailedMessageAckAndDrop() throws Exception { + testFailedMessageRetryExhausted(false); + } + + + public void testFailedMessageRetryExhausted(boolean negativeAckFailedMessages) throws Exception { + + PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration(); + conf.setServiceUrl("http://localhost:8080"); + conf.setSubscriptionName("sub1"); + conf.setTopic("persistent://prop/ns1/topic1"); + conf.setSubscriptionType(SubscriptionType.Exclusive); + conf.setMaxFailedRetries(1); + conf.setNegativeAckFailedMessages(negativeAckFailedMessages); + conf.setMessageToValuesMapper(new MessageToValuesMapper() { + @Override + public Values toValues(Message msg) { + return null; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + } + + }); + + DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.builder() + .maxRedeliverCount(1) + .deadLetterTopic("persistent://prop/ns1/dl-topic-1") + .build(); + ConsumerConfigurationData consumerConfig = new ConsumerConfigurationData<>(); + consumerConfig.setDeadLetterPolicy(deadLetterPolicy); + + ClientConfigurationData clientConfigurationData = spy(new ClientBuilderImpl()).getClientConfigurationData().clone(); + PulsarSpout spout = spy(new PulsarSpout(conf, clientConfigurationData, consumerConfig)); + + Message msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), + new byte[0], Schema.BYTES, new MessageMetadata()); + Consumer consumer = mock(Consumer.class); + SpoutConsumer spoutConsumer = new SpoutConsumer(consumer); + doNothing().when(consumer).negativeAcknowledge(msg); + + Field consField = PulsarSpout.class.getDeclaredField("consumer"); + consField.setAccessible(true); + consField.set(spout, spoutConsumer); + + spout.fail(msg); + spout.fail(msg); + + if(negativeAckFailedMessages){ + verify(consumer, atLeast(1)).negativeAcknowledge(msg); + verify(consumer, never()).acknowledgeAsync(msg); + } else { + verify(consumer, never()).negativeAcknowledge(msg); + verify(consumer, atLeast(1)).acknowledgeAsync(msg); + } + + } + @Test public void testPulsarTuple() throws Exception { testPulsarSpout(true);