From cba7ef75e4b41320f1e3e80360a0b21125669221 Mon Sep 17 00:00:00 2001 From: AnuragReddy2000 Date: Thu, 30 Jan 2025 16:33:42 +0530 Subject: [PATCH 1/5] Adds DLQ support in PulsarSpout of the Storm adaptor --- .../org/apache/pulsar/storm/PulsarSpout.java | 42 +++++++++++++++++-- .../storm/PulsarSpoutConfiguration.java | 23 +++++++++- .../pulsar/storm/PulsarSpoutConsumer.java | 7 ++++ 3 files changed, 67 insertions(+), 5 deletions(-) diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java index 797f340..76be542 100644 --- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java @@ -159,6 +159,21 @@ public void ack(Object msgId) { } } + public void negativeAck(Object msgId) { + if (msgId instanceof Message) { + Message msg = (Message) msgId; + if (LOG.isDebugEnabled()) { + LOG.debug("[{}] Received negative ack for message {}", spoutId, msg.getMessageId()); + } + consumer.negativeAcknowledge(msg); + pendingMessageRetries.remove(msg.getMessageId()); + // we should also remove message from failedMessages but it will be + // eventually removed while emitting next + // tuple + --pendingAcks; + } + } + @Override public void fail(Object msgId) { if (msgId instanceof Message) { @@ -183,8 +198,13 @@ public void fail(Object msgId) { --pendingAcks; messagesFailed++; } else { - LOG.warn("[{}] Number of retries limit reached, dropping the message {}", spoutId, id); - ack(msg); + if(pulsarSpoutConf.shouldNegativeAckFailedMessages()){ + LOG.warn("[{}] Number of retries limit reached, negative acking the message {}", spoutId, id); + negativeAck(msg); + } else { + LOG.warn("[{}] Number of retries limit reached, dropping the message {}", spoutId, id); + ack(msg); + } } } @@ -325,7 +345,13 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { private boolean mapToValueAndEmit(Message msg) { if (msg != null) { - Values values = pulsarSpoutConf.getMessageToValuesMapper().toValues(msg); + Values values; + try{ + values = pulsarSpoutConf.getMessageToValuesMapper().toValues(msg); + } catch (Exception e){ + LOG.error("[{}] Error mapping message to values", msg.getMessageId(), e); + return false; + } ++pendingAcks; if (values == null) { // since the mapper returned null, we can drop the message and @@ -447,6 +473,11 @@ public void acknowledgeAsync(Message msg) { consumer.acknowledgeAsync(msg); } + @Override + public void negativeAcknowledge(Message msg) { + consumer.negativeAcknowledge(msg); + } + @Override public void close() throws PulsarClientException { consumer.close(); @@ -477,6 +508,11 @@ public void acknowledgeAsync(Message msg) { // No-op } + @Override + public void negativeAcknowledge(Message msg) { + // No-op + } + @Override public void close() throws PulsarClientException { try { diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java index db797ee..e8e883a 100644 --- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java @@ -49,9 +49,11 @@ public class PulsarSpoutConfiguration extends PulsarStormConfiguration { private boolean autoUnsubscribe = false; private boolean durableSubscription = true; // read position if non-durable subscription is enabled : default oldest message available in topic - private MessageId nonDurableSubscriptionReadPosition = MessageId.earliest; + private MessageId nonDurableSubscriptionReadPosition = MessageId.earliest; + private boolean negativeAckFailedMessages = false; + + - /** * @return the subscription name for the consumer in the spout */ @@ -192,4 +194,21 @@ public MessageId getNonDurableSubscriptionReadPosition() { public void setNonDurableSubscriptionReadPosition(MessageId nonDurableSubscriptionReadPosition) { this.nonDurableSubscriptionReadPosition = nonDurableSubscriptionReadPosition; } + + /** + * + * @return whether the consumer will negative ack the failed messages + */ + public boolean shouldNegativeAckFailedMessages(){ + return this.negativeAckFailedMessages; + } + + /** + * Sets whether the consumer will negative ack the failed messages. (default: false) + * + * @param negativeAckFailedMessages + */ + public void setNegativeAckFailedMessages(boolean negativeAckFailedMessages){ + this.negativeAckFailedMessages = negativeAckFailedMessages; + } } diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java index 5502a62..6e69e3f 100644 --- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java @@ -42,6 +42,13 @@ public interface PulsarSpoutConsumer { */ void acknowledgeAsync(Message msg); + /** + * Negative ack the message. + * + * @param msg + */ + void negativeAcknowledge(Message msg); + /** * unsubscribe the consumer * @throws PulsarClientException From c27bf740ac2ab033db0e29b200fd5752346cf8d0 Mon Sep 17 00:00:00 2001 From: AnuragReddy2000 Date: Thu, 30 Jan 2025 16:55:00 +0530 Subject: [PATCH 2/5] Updated pulsar version to 4.0.2 --- .github/workflows/integration-test.yaml | 12 ++++++------ README.md | 4 ++-- pom.xml | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/.github/workflows/integration-test.yaml b/.github/workflows/integration-test.yaml index 2afbca8..62a64b9 100644 --- a/.github/workflows/integration-test.yaml +++ b/.github/workflows/integration-test.yaml @@ -71,11 +71,11 @@ jobs: distribution: 'adopt' java-version: 17 - - name: install org.apache.pulsar.tests:integration:jar:tests:4.0.1 + - name: install org.apache.pulsar.tests:integration:jar:tests:4.0.2 if: ${{ steps.check_changes.outputs.docs_only != 'true' }} run: | cd ~ - git clone --depth 50 --single-branch --branch v4.0.1 https://github.com/apache/pulsar + git clone --depth 50 --single-branch --branch v4.0.2 https://github.com/apache/pulsar cd pulsar mvn -B -ntp -f tests/pom.xml -pl org.apache.pulsar.tests:tests-parent,org.apache.pulsar.tests:integration install @@ -83,10 +83,10 @@ jobs: if: ${{ steps.check_changes.outputs.docs_only != 'true' }} run: | cd ~/pulsar - docker pull apachepulsar/pulsar-all:4.0.1 - docker pull apachepulsar/pulsar:4.0.1 - docker tag apachepulsar/pulsar-all:4.0.1 apachepulsar/pulsar-all:4.0.1-$(git rev-parse --short=7 HEAD) - docker tag apachepulsar/pulsar:4.0.1 apachepulsar/pulsar:4.0.1-$(git rev-parse --short=7 HEAD) + docker pull apachepulsar/pulsar-all:4.0.2 + docker pull apachepulsar/pulsar:4.0.2 + docker tag apachepulsar/pulsar-all:4.0.2 apachepulsar/pulsar-all:4.0.2-$(git rev-parse --short=7 HEAD) + docker tag apachepulsar/pulsar:4.0.2 apachepulsar/pulsar:4.0.2-$(git rev-parse --short=7 HEAD) mvn -B -ntp -f tests/docker-images/pom.xml install -pl org.apache.pulsar.tests:latest-version-image -am -Pdocker,-main -DskipTests - name: run integration tests diff --git a/README.md b/README.md index 29ceada..938f726 100644 --- a/README.md +++ b/README.md @@ -34,11 +34,11 @@ mvn install In order to build this repository the linked Pulsar release must be released to Maven Central other wise you have to build it locally. -For instance if this code depends on Pulsar 4.0.1 you have to build Pulsar 4.0.1 locally +For instance if this code depends on Pulsar 4.0.2 you have to build Pulsar 4.0.2 locally ``` git clone https://github.com/apache/pulsar -git checkout v4.0.1 +git checkout v4.0.2 mvn clean install -DskipTests ``` diff --git a/pom.xml b/pom.xml index 29a118e..6de62a3 100644 --- a/pom.xml +++ b/pom.xml @@ -76,7 +76,7 @@ - 4.0.1 + 4.0.2 2.7.2 2.0.0 0.8.1.1 From f887d4730fe49268207a45dbbcf2cb732cbb82ef Mon Sep 17 00:00:00 2001 From: AnuragReddy2000 Date: Thu, 6 Feb 2025 11:58:52 +0530 Subject: [PATCH 3/5] Updated tests for the DLQ feature --- .../apache/pulsar/storm/PulsarSpoutTest.java | 84 +++++++++++++++---- 1 file changed, 69 insertions(+), 15 deletions(-) diff --git a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java index e6cbc51..b88e054 100644 --- a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java +++ b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java @@ -18,16 +18,7 @@ */ package org.apache.pulsar.storm; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyInt; -import static org.mockito.Mockito.atLeast; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -38,13 +29,11 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.pulsar.client.api.ClientBuilder; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.storm.PulsarSpout.SpoutConsumer; import org.apache.storm.spout.SpoutOutputCollector; @@ -98,6 +87,71 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { verify(consumer, atLeast(1)).receive(anyInt(), any()); } + @Test + public void testFailedMessageNegativeAck() throws Exception { + testFailedMessageRetryExhausted(true); + } + + @Test + public void testFailedMessageAckAndDrop() throws Exception { + testFailedMessageRetryExhausted(false); + } + + + public void testFailedMessageRetryExhausted(boolean negativeAckFailedMessages) throws Exception { + + PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration(); + conf.setServiceUrl("http://localhost:8080"); + conf.setSubscriptionName("sub1"); + conf.setTopic("persistent://prop/ns1/topic1"); + conf.setSubscriptionType(SubscriptionType.Exclusive); + conf.setMaxFailedRetries(1); + conf.setNegativeAckFailedMessages(negativeAckFailedMessages); + conf.setMessageToValuesMapper(new MessageToValuesMapper() { + @Override + public Values toValues(Message msg) { + return null; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + } + + }); + + DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.builder() + .maxRedeliverCount(1) + .deadLetterTopic("persistent://prop/ns1/dl-topic-1") + .build(); + ConsumerConfigurationData consumerConfig = new ConsumerConfigurationData<>(); + consumerConfig.setDeadLetterPolicy(deadLetterPolicy); + + ClientConfigurationData clientConfigurationData = spy(new ClientBuilderImpl()).getClientConfigurationData().clone(); + PulsarSpout spout = spy(new PulsarSpout(conf, clientConfigurationData, consumerConfig)); + + Message msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), + new byte[0], Schema.BYTES, new MessageMetadata()); + Consumer consumer = mock(Consumer.class); + SpoutConsumer spoutConsumer = new SpoutConsumer(consumer); + doNothing().when(consumer).negativeAcknowledge(msg); + + Field consField = PulsarSpout.class.getDeclaredField("consumer"); + consField.setAccessible(true); + consField.set(spout, spoutConsumer); + + spout.fail(msg); + spout.fail(msg); + + if(negativeAckFailedMessages){ + verify(consumer, atLeast(1)).negativeAcknowledge(msg); + verify(consumer, never()).acknowledgeAsync(msg); + } else { + verify(consumer, never()).negativeAcknowledge(msg); + verify(consumer, atLeast(1)).acknowledgeAsync(msg); + } + + } + @Test public void testPulsarTuple() throws Exception { testPulsarSpout(true); From 9a4c355fec64988d29061bd4a4c6106bb92abbbc Mon Sep 17 00:00:00 2001 From: AnuragReddy2000 Date: Wed, 19 Feb 2025 10:37:09 +0530 Subject: [PATCH 4/5] Removed unrelated change --- .../main/java/org/apache/pulsar/storm/PulsarSpout.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java index 76be542..783db4c 100644 --- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java @@ -345,13 +345,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { private boolean mapToValueAndEmit(Message msg) { if (msg != null) { - Values values; - try{ - values = pulsarSpoutConf.getMessageToValuesMapper().toValues(msg); - } catch (Exception e){ - LOG.error("[{}] Error mapping message to values", msg.getMessageId(), e); - return false; - } + Values values = pulsarSpoutConf.getMessageToValuesMapper().toValues(msg); ++pendingAcks; if (values == null) { // since the mapper returned null, we can drop the message and From 3ccacbd594a460a7ae109544c7bc6522f6796006 Mon Sep 17 00:00:00 2001 From: Anurag Reddy Karri <62467911+AnuragReddy2000@users.noreply.github.com> Date: Wed, 12 Mar 2025 18:33:53 +0530 Subject: [PATCH 5/5] Update PulsarSpout.java Addressed review comments --- .../java/org/apache/pulsar/storm/PulsarSpout.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java index 783db4c..db97451 100644 --- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java @@ -159,14 +159,14 @@ public void ack(Object msgId) { } } - public void negativeAck(Object msgId) { - if (msgId instanceof Message) { - Message msg = (Message) msgId; + public void negativeAck(Object msg) { + if (msg instanceof Message) { + Message pulsarMsg = (Message) msg; if (LOG.isDebugEnabled()) { - LOG.debug("[{}] Received negative ack for message {}", spoutId, msg.getMessageId()); + LOG.debug("[{}] Received negative ack for message {}", spoutId, pulsarMsg.getMessageId()); } - consumer.negativeAcknowledge(msg); - pendingMessageRetries.remove(msg.getMessageId()); + consumer.negativeAcknowledge(pulsarMsg); + pendingMessageRetries.remove(pulsarMsg.getMessageId()); // we should also remove message from failedMessages but it will be // eventually removed while emitting next // tuple @@ -521,4 +521,4 @@ public void unsubscribe() throws PulsarClientException { // No-op } } -} \ No newline at end of file +}