From 881064827a28acc484a87088a81861187d36be94 Mon Sep 17 00:00:00 2001
From: wujie
Date: Thu, 15 May 2025 11:28:33 +0800
Subject: [PATCH 1/5] [feat][pip]:PIP-413: Support consumer message filter on broker-side for specific subscription

---
 pip/pip-419.md | 184 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 184 insertions(+)
 create mode 100644 pip/pip-419.md

diff --git a/pip/pip-419.md b/pip/pip-419.md
new file mode 100644
index 0000000000000..34f6214d411a4
--- /dev/null
+++ b/pip/pip-419.md
@@ -0,0 +1,184 @@
+# 消息过滤
+
+# PIP-419: Support consumer message filter on broker-side for specific subscription
+
+Implementation PR:
+
+# Background knowledge
+
+At my company, we developed a centralized message broker service that starts multiple subscriptions to consume messages from multiple topics and forwards them to business clients after some processing. Our users need message filtering: messages are written to a topic with different tags, several subscriptions consume from that topic, and each subscription should receive only the messages whose tags it cares about.
+
+We are replacing all of our RocketMQ deployments with Pulsar, but I found that Pulsar consumers cannot filter messages by tags or properties the way [message filtering](https://rocketmq.apache.org/docs/featureBehavior/07messagefilter) works in RocketMQ.
+
+From the user's perspective, when using a Pulsar Consumer, we have two main options to consume messages:
+
+1. Pull mode, by calling `consumer.receive()` (or `consumer.receiveAsync()`)
+
+```java
+public class ConsumerExample {
+    public static void main(String[] args) throws PulsarClientException {
+        PulsarClient pulsarClient = PulsarClient.builder()
+                .serviceUrl("pulsar://localhost:6650")
+                .build();
+        Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64)
+                .topic("persistent://public/default/my-topic")
+                .subscriptionName("my-subscription")
+                .subscribe();
+        do {
+            Message<Long> message = consumer.receive();
+            consumer.acknowledge(message);
+        } while (true);
+
+    }
+}
+
+```
+
+2. Push mode, by registering a `MessageListener` when building the consumer.
+
+```java
+public class ConsumerExample {
+    public static void main(String[] args) throws PulsarClientException {
+        PulsarClient pulsarClient = PulsarClient.builder()
+                .serviceUrl("pulsar://localhost:6650")
+                .build();
+        Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64)
+                .topic("persistent://public/default/my-topic")
+                .subscriptionName("my-subscription2")
+                .messageListener((c, msg) -> {
+                    // process message
+                    c.acknowledgeAsync(msg);
+                })
+                .subscribe();
+    }
+}
+
+```
+
+However, I found that they do not have similar message filtering configurations, which means that a subscription receives every message from a topic and must filter at the business layer if necessary. Delivering messages that are then discarded can waste a lot of bandwidth. **Therefore, I think it is worthwhile to implement message filtering on the Pulsar broker side.**
+
+# Motivation
+
+As [Background knowledge](https://www.notion.so/1f38504c2b898043982ce46d7d903c64?pvs=21) mentioned, implement message filtering on the pulsar broker side, enrich the pulsar ecosystem, save bandwidth, and help RocketMQ users switch to Pulsar more smoothly. And, I've noticed two similar PRshttps://github.com/apache/pulsar/pull/8544 and https://github.com/apache/pulsar/pull/7629, but neither was approved, so I want to try to get this work implemented.
+
+# Goals
+
+1. **Implement message filtering on the pulsar broker side.**
+
+## In Scope
+
+If this PIP is accepted, it will help users to easily filter messages when consuming.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+**The general idea is to filter based on the properties of the message at pulsar broker.**
+
+In the following scenario, the producer adds two properties, messageType and messageSource, when writing messages. For example:
+
+```java
+    // message1
+    TypedMessageBuilder messageBuilder1 = producer.newMessage();
+    messageBuilder1.property("messageType", "file");
+    messageBuilder1.property("messageSource", "QQ");
+
+    // message2
+    TypedMessageBuilder messageBuilder2 = producer.newMessage();
+    messageBuilder2.property("messageType", "audio");
+    messageBuilder2.property("messageSource", "wechat");
+```
+
+If a subscription only wants to consume *audio* messages from w*echat*, how should it be implemented?
+
+We can implement message filtering logic on pulsar broker at `org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer`
+
+![image.png](%E6%B6%88%E6%81%AF%E8%BF%87%E6%BB%A4%201f38504c2b898043982ce46d7d903c64/image.png)
+
+The filtering engine can choose the following two solutions. **In general, I prefer the SQL-92 solution, which should have better performance. Groovy may have slightly worse performance due to its dynamic characteristics.**
+
+### Solution 1 SQL-92
+
+Message filtering using [SQL-92](https://en.wikipedia.org/wiki/SQL-92) syntax, as mentioned in https://github.com/apache/pulsar/pull/8544#issuecomment-1064619792
+
+We can write the following SQL expression to filter messages. The pseudocode is as follows:
+
+```java
+// The filter expression passed by the pulsar consumer client to the pulsar broker
+String sql = "messageSource='wechat' AND messageType='audio'";
+
+// The pulsar broker uses the expression passed by the client to filter the messages and returns the messages that meet the conditions to the client.
+List filteredMessages = messages.stream()
+        .filter(message -> {
+            return SQL92Expression.evaluate(message);
+        })
+        .collect(Collectors.toList());
+```
+
+### Solution 2 Groovy
+
+> [Groovy](https://groovy-lang.org/) is a JVM-based object-oriented programming language under Apache. It can be used for object-oriented programming and can also be used as a pure scripting language to call Groovy scripts in Java.
+>
+
+We can write the following Groovy expression to filter messages. The pseudocode is as follows:
+
+```java
+// The filter expression passed by the pulsar consumer client to the pulsar broker
+String groovyExpression = "it.properties.messageType.equals('audio') && it.properties.messageSource.equals('wechat')";
+
+// The pulsar broker uses the expression passed by the client to filter the messages and returns the messages that meet the conditions to the client.
+GroovyShell shell = new GroovyShell();
+Script script = shell.parse("return " + groovyExpression);
+List filteredMessages = messages.stream()
+        .filter(message -> {
+            // Expose the current message to the script as "it".
+            script.setBinding(new Binding(Map.of("it", message)));
+            return (Boolean) script.run();
+        })
+        .collect(Collectors.toList());
+```
+
+### Usage Example
+
+```java
+public class ConsumerExample {
+    public static void main(String[] args) throws PulsarClientException {
+        PulsarClient pulsarClient = PulsarClient.builder()
+                .serviceUrl("pulsar://localhost:6650")
+                .build();
+        Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64)
+                .topic("persistent://public/default/my-topic")
+                .subscriptionName("my-subscription")
+                .filterExpression(new FilterExpression("messageSource='wechat' AND messageType='audio'", FilterExpressionType.SQL92))
+                //.filterExpression(new FilterExpression("it.properties.messageType.equals('audio') && it.properties.messageSource.equals('wechat')", FilterExpressionType.Groovy))
+                .subscribe();
+
+        do {
+            Message<Long> message = consumer.receive(); // only receives messages that match the filter expression
+            consumer.acknowledge(message);
+        } while (true);
+
+    }
+}
+
+```
+
+## Public-facing Changes
+
+### Public API
+
+1. Add an optional config `filterExpression` in `ConsumerBuilder`
+
+```java
+ConsumerBuilder filterExpression(FilterExpression filterExpression);
+
+```
+
+# Backward & Forward Compatibility
+
+You can upgrade or revert normally; no special steps are needed.
+
+# Links
+
+- Mailing List discussion thread:
+- Mailing List voting thread:
\ No newline at end of file

From fcccd5053707035defd8d0d61d7a0cb54954142e Mon Sep 17 00:00:00 2001
From: wujie
Date: Thu, 15 May 2025 12:10:59 +0800
Subject: [PATCH 2/5] fix

---
 pip/pip-419.md | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/pip/pip-419.md b/pip/pip-419.md
index 34f6214d411a4..2c04f265aba54 100644
--- a/pip/pip-419.md
+++ b/pip/pip-419.md
@@ -1,5 +1,3 @@
-# 消息过滤
-
 # PIP-419: Support consumer message filter on broker-side for specific subscription
 
 Implementation PR:
@@ -59,7 +57,7 @@ However, I found that they do not have similar message filtering configurations,
 
 # Motivation
 
-As [Background knowledge](https://www.notion.so/1f38504c2b898043982ce46d7d903c64?pvs=21) mentioned, implement message filtering on the pulsar broker side, enrich the pulsar ecosystem, save bandwidth, and help RocketMQ users switch to Pulsar more smoothly. And, I've noticed two similar PRshttps://github.com/apache/pulsar/pull/8544 and https://github.com/apache/pulsar/pull/7629, but neither was approved, so I want to try to get this work implemented.
+As [Background knowledge](#background-knowledge) mentioned, implement message filtering on the pulsar broker side, enrich the pulsar ecosystem, save bandwidth, and help RocketMQ users switch to Pulsar more smoothly. And, I've noticed two similar PRs https://github.com/apache/pulsar/pull/8544 and https://github.com/apache/pulsar/pull/7629, but neither was approved, so I want to try to get this work implemented.
 
 # Goals
 
@@ -93,7 +91,7 @@ If a subscription only wants to consume *audio* messages from w*echat*, how shou
 
 We can implement message filtering logic on pulsar broker at `org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer`
 
-![image.png](%E6%B6%88%E6%81%AF%E8%BF%87%E6%BB%A4%201f38504c2b898043982ce46d7d903c64/image.png)
+![image](https://github.com/user-attachments/assets/10eea512-1181-42f1-af9c-83dea5bf565e)
 
 The filtering engine can choose the following two solutions. **In general, I prefer the SQL-92 solution, which should have better performance. Groovy may have slightly worse performance due to its dynamic characteristics.**
 

From 4de4b15cdab36863cfa91a4c44ee09cd5ad68053 Mon Sep 17 00:00:00 2001
From: wujie
Date: Thu, 15 May 2025 13:34:36 +0800
Subject: [PATCH 3/5] add code refer link

---
 pip/pip-419.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pip/pip-419.md b/pip/pip-419.md
index 2c04f265aba54..f401624a3511e 100644
--- a/pip/pip-419.md
+++ b/pip/pip-419.md
@@ -89,7 +89,7 @@ In the following scenario, the producer adds two properties, messageType and mes
 
 If a subscription only wants to consume *audio* messages from w*echat*, how should it be implemented?
 
-We can implement message filtering logic on pulsar broker at `org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer`
+We can implement message filtering logic on pulsar broker at [org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer](https://github.com/apache/pulsar/blob/965ef5c14c93ca896ef4c8f34520066285fcf047/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java#L108)
 
 ![image](https://github.com/user-attachments/assets/10eea512-1181-42f1-af9c-83dea5bf565e)
 

From a7eb83f3f5eed1942363fa1fa7462c44b27efba8 Mon Sep 17 00:00:00 2001
From: wujie
Date: Thu, 15 May 2025 13:36:26 +0800
Subject: [PATCH 4/5] fix

---
 pip/pip-419.md | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/pip/pip-419.md b/pip/pip-419.md
index f401624a3511e..d711f37779f0d 100644
--- a/pip/pip-419.md
+++ b/pip/pip-419.md
@@ -73,7 +73,7 @@ If this PIP is accepted, it will help users to easily filter messages when consu
 
 **The general idea is to filter based on the properties of the message at pulsar broker.**
 
-In the following scenario, the producer adds two properties, messageType and messageSource, when writing messages. For example:
+In the following scenario, the producer adds two properties, `messageType` and `messageSource`, when writing messages. For example:
 
 ```java
     // message1
@@ -91,8 +91,6 @@ If a subscription only wants to consume *audio* messages from w*echat*, how shou
 
 We can implement message filtering logic on pulsar broker at [org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer](https://github.com/apache/pulsar/blob/965ef5c14c93ca896ef4c8f34520066285fcf047/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java#L108)
 
-![image](https://github.com/user-attachments/assets/10eea512-1181-42f1-af9c-83dea5bf565e)
-
 The filtering engine can choose the following two solutions. **In general, I prefer the SQL-92 solution, which should have better performance. Groovy may have slightly worse performance due to its dynamic characteristics.**
 
 ### Solution 1 SQL-92

From 9400cc9cef3115c84fb861f2835db8e9a82cd431 Mon Sep 17 00:00:00 2001
From: wujie
Date: Thu, 15 May 2025 13:40:01 +0800
Subject: [PATCH 5/5] add code refer link

---
 pip/pip-419.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pip/pip-419.md b/pip/pip-419.md
index d711f37779f0d..c9d4fcde28fa4 100644
--- a/pip/pip-419.md
+++ b/pip/pip-419.md
@@ -163,7 +163,7 @@ public class ConsumerExample {
 
 ### Public API
 
-1. Add an optional config `filterExpression` in `ConsumerBuilder`
+1. Add an optional config `filterExpression` in [ConsumerBuilder](https://github.com/apache/pulsar/blob/965ef5c14c93ca896ef4c8f34520066285fcf047/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java)
 
 ```java
 ConsumerBuilder filterExpression(FilterExpression filterExpression);