-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[feat][pip] PIP-419: Support consumer message filter on broker-side for specific subscription #24305
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
[feat][pip] PIP-419: Support consumer message filter on broker-side for specific subscription #24305
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,180 @@ | ||
| # PIP-419: Support consumer message filter on broker-side for specific subscription | ||
|
|
||
| Implementation PR: | ||
|
|
||
| # Background knowledge | ||
|
|
||
| In my company, we developed a centralized message broker service, which starts multiple subscriptions to consume messages from multiple topics, and then forwards them to business clients after certain processing. Our users have message filtering requirements, that is, when messages are written to topics with different tags, multiple different subscriptions will be started when consuming, and each subscription will only consume messages with the tags it cares about. | ||
|
|
||
| What we are doing is replacing all RocketMQ with Pulsar, but I found that consumers cannot filter messages based on certain message tags or properties, just like [message filtering](https://rocketmq.apache.org/docs/featureBehavior/07messagefilter) in RocketMQ. | ||
|
|
||
| From the user's perspective, when using a Pulsar Consumer, we have two main options to consume messages: | ||
|
|
||
| 1. Pull mode, by calling `consumer.receive()`(or `consumer.receiveAsync()`) | ||
|
|
||
| ```java | ||
| public class ConsumerExample { | ||
| public static void main(String[] args) throws PulsarClientException { | ||
| PulsarClient pulsarClient = PulsarClient.builder() | ||
| .serviceUrl("pulsar://localhost:6650") | ||
| .build(); | ||
| Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64) | ||
| .topic("persistent://public/default/my-topic") | ||
| .subscriptionName("my-subscription") | ||
| .subscribe(); | ||
| do { | ||
| Message<Long> message = consumer.receive(); | ||
| consumer.acknowledge(message); | ||
| } while (true); | ||
|
|
||
| } | ||
| } | ||
|
|
||
| ``` | ||
|
|
||
| 1. Push mode, by registering a `MessageListener` interface, when building the Consumer. | ||
|
|
||
| ```java | ||
| public class ConsumerExample { | ||
| public static void main(String[] args) throws PulsarClientException { | ||
| PulsarClient pulsarClient = PulsarClient.builder() | ||
| .serviceUrl("pulsar://localhost:6650") | ||
| .build(); | ||
| Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64) | ||
| .topic("persistent://public/default/my-topic") | ||
| .subscriptionName("my-subscription2") | ||
| .messageListener((consumer, message) -> { | ||
| // process message | ||
| consumer.acknowledgeAsync(message); | ||
| }) | ||
| .subscribe(); | ||
| } | ||
| } | ||
|
|
||
| ``` | ||
|
|
||
| However, I found that they do not have similar message filtering configurations, which means that a subscription will receive all messages from a topic and then filter them at the business layer if necessary. These meaningless messages may cause a lot of bandwidth waste. **Therefore, I think it is very meaningful to implement message filtering on the pulsar broker side.** | ||
|
|
||
| # Motivation | ||
|
|
||
| As [Background knowledge](#background-knowledge) mentioned, implement message filtering on the pulsar broker side, enrich the pulsar ecosystem, save bandwidth, and help RocketMQ users switch to Pulsar more smoothly. And, I've noticed two similar PRs https://github.com/apache/pulsar/pull/8544 and https://github.com/apache/pulsar/pull/7629, but neither was approved, so I want to try to get this work implemented. | ||
|
|
||
| # Goals | ||
|
|
||
| 1. **Implement message filtering on the pulsar broker side.** | ||
|
|
||
| ## In Scope | ||
|
|
||
| If this PIP is accepted, it will help users to easily filter messages when consumption. | ||
|
|
||
| # Detailed Design | ||
|
|
||
| ## Design & Implementation Details | ||
|
|
||
| **The general idea is to filter based on the properties of the message at pulsar broker.** | ||
|
|
||
| In the following scenario, the producer adds two properties, `messageType` and `messageSource`, when writing messages. For example: | ||
|
|
||
| ```java | ||
| // message1 | ||
| TypedMessageBuilder<byte[]> messageBuilder1 = producer.newMessage(); | ||
| messageBuilder1.property("messageType", "file"); | ||
| messageBuilder1.property("messageSource", "QQ"); | ||
|
|
||
| // message2 | ||
| TypedMessageBuilder<byte[]> messageBuilder2 = producer.newMessage(); | ||
| messageBuilder2.property("messageType", "audio"); | ||
| messageBuilder2.property("messageSource", "wechat"); | ||
| ``` | ||
|
|
||
| If a subscription only wants to consume *audio* messages from w*echat*, how should it be implemented? | ||
|
|
||
| We can implement message filtering logic on pulsar broker at [org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer](https://github.com/apache/pulsar/blob/965ef5c14c93ca896ef4c8f34520066285fcf047/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java#L108) | ||
|
|
||
| The filtering engine can choose the following two solutions. **In general, I prefer the SQL-92 solution, which should have better performance. Groovy may have slightly worse performance due to its dynamic characteristics.** | ||
|
|
||
| ### Solution 1 SQL-92 | ||
|
|
||
| Message filtering using [SQL-92](https://en.wikipedia.org/wiki/SQL-92) syntax mentioned in https://github.com/apache/pulsar/pull/8544#issuecomment-1064619792 | ||
|
|
||
| We can write the following SQL expression to filter messages, the pseudo code is as follow: | ||
|
|
||
| ```java | ||
| // The filter expression passed by the pulsar consumer client to the pulsar broker | ||
| String sql = "messageSource='wechat' AND messageType='audio'" | ||
|
|
||
| // The pulsar broker uses the expression passed by the client to filter the messages and returns the messages that meet the conditions to the client. | ||
| List filtedMessages = messages.stream() | ||
| .filter(message -> { | ||
| return SQL92Expression.evaluate(message) | ||
| }) | ||
| .collect(Collectors.toList()); | ||
| ``` | ||
|
|
||
| ### Solution 2 Groovy | ||
|
|
||
| > [Groovy](https://groovy-lang.org/) is a JVM-based object-oriented programming language under Apache. It can be used for object-oriented programming and can also be used as a pure scripting language to call Groovy scripts in Java. | ||
| > | ||
|
|
||
| We can write the following Groovy expression to filter messages, the pseudo code is as follows: | ||
|
|
||
| ```java | ||
| // The filter expression passed by the pulsar consumer client to the pulsar broker | ||
| String groovyExpression = "it.properties.messageType.equals('audio') && it.properties.messageSource.equals('wechat')" | ||
|
|
||
| // The pulsar broker uses the expression passed by the client to filter the messages and returns the messages that meet the conditions to the client. | ||
| GroovyShell shell = new GroovyShell(); | ||
| Script script = shell.parse("return " + groovyExpression); | ||
| List filtedMessages = messages.stream() | ||
| .filter(message -> { | ||
| binding.setVariable("it", message); | ||
| script.setBinding(binding); | ||
| return (Boolean) script.run(); | ||
| }) | ||
| .collect(Collectors.toList()); | ||
|
Comment on lines
+114
to
+134
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This would be terrible from security perspective. Something like Google CEL or Spring Expression Language (SpEL) would be a better choice. |
||
| ``` | ||
|
|
||
| ### Usage Example | ||
|
|
||
| ```java | ||
| public class ConsumerExample { | ||
| public static void main(String[] args) throws PulsarClientException { | ||
| PulsarClient pulsarClient = PulsarClient.builder() | ||
| .serviceUrl("pulsar://localhost:6650") | ||
| .build(); | ||
| Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64) | ||
| .topic("persistent://public/default/my-topic") | ||
| .subscriptionName("my-subscription") | ||
| .filterExpression(new FilterExpression("messageSource='wechat' AND messageType='audio'", FilterExpressionType.SQL92)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Excellent work! I think this feature is indeed useful. However, I suggest that the filter expressions should be configured, modified, and deleted through the management/control plane (Pulsar Admin/CLI), rather than being set during consumer initialization. My reasoning is as follows:
Regarding point 3: This mirrors real-world issues observed in RocketMQ's message filtering. I've encountered multiple cases where tag-based filtering failed because different consumers in the same consumer group subscribed to conflicting tags, resulting in rule overrides. Recommendation: Store filter rules as subscription metadata (managed via Admin API/CLI) rather than per-consumer settings. This would:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is a good idea to configure the filtering rules through admin API. This can avoid the problems you mentioned, which is also the current problem of RocketMQ. I will refactor it together after receiving more comments.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's already possible to handle filtering rules through the Admin API. The pulsar-admin CLI tool has a plugin model that allows adding also new commands. This approach is used in Pulsar JMS: https://github.com/datastax/pulsar-jms/blob/master/pulsar-jms-admin-ext/src/main/java/com/datastax/oss/pulsar/jms/cli/SubscriptionBaseCommand.java (more context in my other comment, #24305 (comment))
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In Pulsar JMS, you could also provide the filter at a consumer level with consumer properties. If we were to add message filtering as a top level feature, it would be great to have full support in the clients too. In my other comment, I commented about this aspect
|
||
| //.filterExpression(new FilterExpression("it.properties.messageType.equals('audio') && it.properties.messageSource.equals('wechat')", FilterExpressionType.Groovy)) | ||
| .subscribe(); | ||
|
|
||
| do { | ||
| Message<Long> message = consumer.receive(); // just receive messages that meet the filter rules. | ||
| consumer.acknowledge(message); | ||
| } while (true); | ||
|
|
||
| } | ||
| } | ||
|
|
||
| ``` | ||
|
|
||
| ## Public-facing Changes | ||
|
|
||
| ### Public API | ||
|
|
||
| 1. Add an optional config `filterExpression` in [ConsumerBuilder](https://github.com/apache/pulsar/blob/965ef5c14c93ca896ef4c8f34520066285fcf047/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java) | ||
|
|
||
| ```java | ||
| ConsumerBuilder<T> filterExpression(FilterExpression filterExpression); | ||
|
|
||
| ``` | ||
|
|
||
| # Backward & Forward Compatibility | ||
|
|
||
| You can do upgrading or reverting normally, no specified steps are needed to do. | ||
|
|
||
| # Links | ||
|
|
||
| - Mailing List discussion thread: | ||
| - Mailing List voting thread: | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pulsar JMS reuses ActiveMQ's SQL-92 parser for filtering messages
https://github.com/datastax/pulsar-jms/blob/master/pulsar-jms-filters-common/src/main/java/com/datastax/oss/pulsar/jms/selectors/SelectorSupport.java
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The project has forked the filtering logic from ActiveMQ in this module: https://github.com/datastax/pulsar-jms/tree/master/activemq-filters