diff --git a/pip/pip-419.md b/pip/pip-419.md
new file mode 100644
index 0000000000000..c9d4fcde28fa4
--- /dev/null
+++ b/pip/pip-419.md
@@ -0,0 +1,180 @@
+# PIP-419: Support consumer message filter on broker-side for specific subscription
+
+Implementation PR:
+
+# Background knowledge
+
+In my company, we developed a centralized message broker service that starts multiple subscriptions to consume messages from multiple topics and then forwards them to business clients after some processing. Our users need message filtering: messages are written to topics with different tags, multiple subscriptions are started for consumption, and each subscription should consume only the messages with the tags it cares about.
+
+We are replacing all of our RocketMQ usage with Pulsar, but I found that Pulsar consumers cannot filter messages by tags or properties the way [message filtering](https://rocketmq.apache.org/docs/featureBehavior/07messagefilter) works in RocketMQ.
+
+From the user's perspective, a Pulsar consumer has two main ways to consume messages:
+
+1. Pull mode, by calling `consumer.receive()` (or `consumer.receiveAsync()`):
+
+```java
+import org.apache.pulsar.client.api.*;
+public class ConsumerExample {
+    public static void main(String[] args) throws PulsarClientException {
+        PulsarClient pulsarClient = PulsarClient.builder()
+                .serviceUrl("pulsar://localhost:6650")
+                .build();
+        Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64)
+                .topic("persistent://public/default/my-topic")
+                .subscriptionName("my-subscription")
+                .subscribe();
+        while (true) {
+            // Block until a message is available, then acknowledge it.
+            Message<Long> message = consumer.receive();
+            consumer.acknowledge(message);
+        }
+    }
+}
+```
+
+2. Push mode, by registering a `MessageListener` when building the consumer:
+
+```java
+import org.apache.pulsar.client.api.*;
+public class ConsumerExample {
+    public static void main(String[] args) throws PulsarClientException {
+        PulsarClient pulsarClient = PulsarClient.builder()
+                .serviceUrl("pulsar://localhost:6650")
+                .build();
+        Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64)
+                .topic("persistent://public/default/my-topic")
+                .subscriptionName("my-subscription2")
+                .messageListener((c, msg) -> {
+                    // process message (the lambda parameters must not reuse the local name `consumer`)
+                    c.acknowledgeAsync(msg);
+                })
+                .subscribe();
+    }
+}
+```
+
+However, neither mode offers a comparable message filtering configuration, so a subscription receives every message from a topic and must filter them at the business layer if necessary. The unwanted messages can waste a lot of bandwidth. **Therefore, I think it is worthwhile to implement message filtering on the Pulsar broker side.**
+
+# Motivation
+
+As described in [Background knowledge](#background-knowledge), implementing message filtering on the Pulsar broker side would enrich the Pulsar ecosystem, save bandwidth, and help RocketMQ users switch to Pulsar more smoothly. I have also noticed two similar PRs, https://github.com/apache/pulsar/pull/8544 and https://github.com/apache/pulsar/pull/7629, but neither was approved, so I want to try to get this work implemented.
+
+# Goals
+
+1. **Implement message filtering on the Pulsar broker side.**
+
+## In Scope
+
+If this PIP is accepted, users will be able to filter messages easily at consumption time.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+**The general idea is to filter on message properties at the Pulsar broker.**
+
+In the following scenario, the producer adds two properties, `messageType` and `messageSource`, when writing messages.
+For example:
+
+```java
+// message1
+TypedMessageBuilder<Long> messageBuilder1 = producer.newMessage();
+messageBuilder1.property("messageType", "file");
+messageBuilder1.property("messageSource", "QQ");
+
+// message2
+TypedMessageBuilder<Long> messageBuilder2 = producer.newMessage();
+messageBuilder2.property("messageType", "audio");
+messageBuilder2.property("messageSource", "wechat");
+```
+
+If a subscription only wants to consume *audio* messages from *wechat*, how should this be implemented?
+
+The filtering logic can be implemented on the Pulsar broker in [org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer](https://github.com/apache/pulsar/blob/965ef5c14c93ca896ef4c8f34520066285fcf047/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java#L108).
+
+The filtering engine can use either of the following two solutions. **In general, I prefer the SQL-92 solution, which should perform better; Groovy may be slightly slower because of its dynamic nature.**
+
+### Solution 1 SQL-92
+
+Messages can be filtered with the [SQL-92](https://en.wikipedia.org/wiki/SQL-92) syntax mentioned in https://github.com/apache/pulsar/pull/8544#issuecomment-1064619792 by writing an SQL expression such as the one below; the pseudocode is as follows:
+
+```java
+// The filter expression passed by the Pulsar consumer client to the Pulsar broker.
+String sql = "messageSource='wechat' AND messageType='audio'";
+
+// The broker evaluates the client's expression against each message and returns only the matching messages to the client.
+// SQL92Expression is a placeholder for the filter engine, not an existing class.
+List<Message<Long>> filteredMessages = messages.stream()
+        .filter(message -> {
+            return SQL92Expression.evaluate(sql, message);
+        })
+        .collect(Collectors.toList());
+```
+
+### Solution 2 Groovy
+
+> [Groovy](https://groovy-lang.org/) is a JVM-based, object-oriented programming language maintained by the Apache Software Foundation.
+> It can be used for object-oriented programming and also as a pure scripting language, with Groovy scripts called from Java.
+
+We can write the following Groovy expression to filter messages; the pseudocode is as follows:
+
+```java
+// The filter expression passed by the Pulsar consumer client to the Pulsar broker.
+String groovyExpression = "it.properties.messageType.equals('audio') && it.properties.messageSource.equals('wechat')";
+
+// The broker evaluates the client's expression against each message and returns only the matching messages to the client.
+// The script is parsed once and reused; only the `it` variable in the binding changes per message.
+Binding binding = new Binding();
+Script script = new GroovyShell(binding).parse("return " + groovyExpression);
+List<Message<Long>> filteredMessages = messages.stream()
+        .filter(message -> {
+            binding.setVariable("it", message);
+            return (Boolean) script.run();
+        })
+        .collect(Collectors.toList());
+```
+
+### Usage Example
+
+```java
+import org.apache.pulsar.client.api.*;
+public class ConsumerExample {
+    public static void main(String[] args) throws PulsarClientException {
+        PulsarClient pulsarClient = PulsarClient.builder()
+                .serviceUrl("pulsar://localhost:6650")
+                .build();
+        Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64)
+                .topic("persistent://public/default/my-topic")
+                .subscriptionName("my-subscription")
+                .filterExpression(new FilterExpression("messageSource='wechat' AND messageType='audio'", FilterExpressionType.SQL92))
+                //.filterExpression(new FilterExpression("it.properties.messageType.equals('audio') && it.properties.messageSource.equals('wechat')", FilterExpressionType.Groovy))
+                .subscribe();
+
+        while (true) {
+            // With the proposed filterExpression option, only messages that match the filter rules are received.
+            Message<Long> message = consumer.receive();
+            consumer.acknowledge(message);
+        }
+    }
+}
+```
+
+## Public-facing Changes
+
+### Public API
+
+1.
+   Add an optional config `filterExpression` to [ConsumerBuilder](https://github.com/apache/pulsar/blob/965ef5c14c93ca896ef4c8f34520066285fcf047/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java):
+
+```java
+ConsumerBuilder<T> filterExpression(FilterExpression filterExpression);
+```
+
+# Backward & Forward Compatibility
+
+You can upgrade or revert normally; no special steps are needed.
+
+# Links
+
+- Mailing List discussion thread:
+- Mailing List voting thread:
\ No newline at end of file