180 changes: 180 additions & 0 deletions pip/pip-419.md
@@ -0,0 +1,180 @@
# PIP-419: Support consumer message filter on broker-side for specific subscription

Implementation PR:

# Background knowledge

At my company, we developed a centralized message broker service that starts multiple subscriptions to consume messages from multiple topics and forwards them to business clients after some processing. Our users need message filtering: producers write messages with different tags to a topic, several subscriptions consume from it, and each subscription should receive only the messages whose tags it cares about.

We are replacing all of our RocketMQ usage with Pulsar, but I found that Pulsar consumers cannot filter messages by tags or properties the way [message filtering](https://rocketmq.apache.org/docs/featureBehavior/07messagefilter) works in RocketMQ.

From the user's perspective, when using a Pulsar Consumer, we have two main options to consume messages:

1. Pull mode, by calling `consumer.receive()` (or `consumer.receiveAsync()`).

```java
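import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

public class ConsumerExample {
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64)
                .topic("persistent://public/default/my-topic")
                .subscriptionName("my-subscription")
                .subscribe();
        // Pull mode: block until the next message arrives, then acknowledge it.
        do {
            Message<Long> message = consumer.receive();
            consumer.acknowledge(message);
        } while (true);
    }
}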
```

2. Push mode, by registering a `MessageListener` when building the consumer.

```java
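import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

public class ConsumerExample {
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64)
                .topic("persistent://public/default/my-topic")
                .subscriptionName("my-subscription2")
                // The lambda parameter is named `c` because a lambda parameter
                // may not reuse the name of the enclosing local variable `consumer`.
                .messageListener((c, message) -> {
                    // process message
                    c.acknowledgeAsync(message);
                })
                .subscribe();
    }
}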
```

However, neither mode offers a message filtering option, so a subscription receives every message from a topic and must filter at the business layer if necessary. Delivering messages that are then discarded can waste a lot of bandwidth. **Therefore, I think it is worthwhile to implement message filtering on the Pulsar broker side.**
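
The business-layer filtering described above can be sketched as follows. This is only an illustration: `ReceivedMessage` and `ClientSideFilter` are hypothetical stand-ins written with the JDK alone, not Pulsar classes, and the payload sizes are assumed to be known. The point is that every discarded message has already crossed the network by the time the filter runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a received message: only properties and payload size matter here.
final class ReceivedMessage {
    final Map<String, String> properties;
    final int payloadBytes;

    ReceivedMessage(Map<String, String> properties, int payloadBytes) {
        this.properties = properties;
        this.payloadBytes = payloadBytes;
    }
}

final class ClientSideFilter {
    // Keeps only audio messages from wechat; the rest were delivered for nothing.
    static List<ReceivedMessage> keepWechatAudio(List<ReceivedMessage> delivered) {
        List<ReceivedMessage> wanted = new ArrayList<>();
        for (ReceivedMessage m : delivered) {
            if ("audio".equals(m.properties.get("messageType"))
                    && "wechat".equals(m.properties.get("messageSource"))) {
                wanted.add(m);
            }
        }
        return wanted;
    }

    // Payload bytes that were sent to the consumer and then dropped by the business layer.
    static long wastedBytes(List<ReceivedMessage> delivered, List<ReceivedMessage> wanted) {
        long total = 0;
        for (ReceivedMessage m : delivered) {
            total += m.payloadBytes;
        }
        long useful = 0;
        for (ReceivedMessage m : wanted) {
            useful += m.payloadBytes;
        }
        return total - useful;
    }
}
```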

# Motivation

As described in [Background knowledge](#background-knowledge), implementing message filtering on the Pulsar broker side would enrich the Pulsar ecosystem, save bandwidth, and help RocketMQ users switch to Pulsar more smoothly. I have also noticed two similar PRs, https://github.com/apache/pulsar/pull/8544 and https://github.com/apache/pulsar/pull/7629, but neither was approved, so I want to try to get this work implemented.

# Goals

1. **Implement message filtering on the Pulsar broker side.**

## In Scope

If this PIP is accepted, users will be able to filter messages easily at consumption time.

# Detailed Design

## Design & Implementation Details

**The general idea is to filter on the Pulsar broker, based on the properties of each message.**

In the following scenario, the producer adds two properties, `messageType` and `messageSource`, when writing messages. For example:

```java
// message1
TypedMessageBuilder<byte[]> messageBuilder1 = producer.newMessage();
messageBuilder1.property("messageType", "file");
messageBuilder1.property("messageSource", "QQ");

// message2
TypedMessageBuilder<byte[]> messageBuilder2 = producer.newMessage();
messageBuilder2.property("messageType", "audio");
messageBuilder2.property("messageSource", "wechat");
```

If a subscription only wants to consume *audio* messages from *wechat*, how should this be implemented?

We can implement the message filtering logic on the Pulsar broker in [org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer](https://github.com/apache/pulsar/blob/965ef5c14c93ca896ef4c8f34520066285fcf047/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java#L108).

The filtering engine could use either of the following two solutions. **In general, I prefer the SQL-92 solution, which should perform better. Groovy may be somewhat slower because it is a dynamic language.**

### Solution 1 SQL-92

This solution filters messages using [SQL-92](https://en.wikipedia.org/wiki/SQL-92) syntax, as mentioned in https://github.com/apache/pulsar/pull/8544#issuecomment-1064619792.

We can write the following SQL expression to filter messages. The pseudocode is as follows:
Comment on lines +96 to +100
Member


The project has forked the filtering logic from ActiveMQ in this module: https://github.com/datastax/pulsar-jms/tree/master/activemq-filters


```java
// The filter expression passed by the Pulsar consumer client to the Pulsar broker
String sql = "messageSource='wechat' AND messageType='audio'";

// Pseudocode: SQL92Expression is a hypothetical evaluator, not an existing Pulsar class.
// The broker evaluates the client's expression against each message and
// returns only the messages that meet the conditions.
List<Message<byte[]>> filteredMessages = messages.stream()
        .filter(message -> SQL92Expression.evaluate(sql, message))
        .collect(Collectors.toList());
```
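
To make the idea concrete, here is a minimal sketch that supports only a tiny subset of such expressions: `property='value'` clauses joined by `AND`. It is not a SQL-92 implementation and not part of Pulsar. `PropertyEqualsFilter` is a hypothetical name, and the sketch rejects anything outside that subset (including quoted values that contain ` AND `). It does show one design point that applies to either solution: the expression is parsed once and then evaluated against each message's property map.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical evaluator for expressions of the form: key='value' AND key='value' ...
final class PropertyEqualsFilter {
    private static final Pattern CLAUSE =
            Pattern.compile("\\s*([A-Za-z_][A-Za-z0-9_]*)\\s*=\\s*'([^']*)'\\s*");
    private static final Pattern AND =
            Pattern.compile("\\s+AND\\s+", Pattern.CASE_INSENSITIVE);

    // Each entry is one required (property name, expected value) pair.
    private final List<Map.Entry<String, String>> required = new ArrayList<>();

    private PropertyEqualsFilter() {
    }

    // Parses the expression once; throws if a clause is outside the supported subset.
    static PropertyEqualsFilter parse(String expression) {
        PropertyEqualsFilter filter = new PropertyEqualsFilter();
        for (String clause : AND.split(expression)) {
            Matcher m = CLAUSE.matcher(clause);
            if (!m.matches()) {
                throw new IllegalArgumentException("Unsupported clause: " + clause);
            }
            filter.required.add(Map.entry(m.group(1), m.group(2)));
        }
        return filter;
    }

    // True only if every clause matches; a missing property never matches.
    boolean matches(Map<String, String> properties) {
        for (Map.Entry<String, String> clause : required) {
            if (!clause.getValue().equals(properties.get(clause.getKey()))) {
                return false;
            }
        }
        return true;
    }
}
```

With the two example messages from above, a filter parsed from `messageSource='wechat' AND messageType='audio'` matches message2 and rejects message1.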

### Solution 2 Groovy

> [Groovy](https://groovy-lang.org/) is a JVM-based object-oriented programming language under Apache. It can be used for object-oriented programming and can also be used as a pure scripting language to call Groovy scripts in Java.
>

We can write the following Groovy expression to filter messages. The pseudocode is as follows:

```java
// The filter expression passed by the Pulsar consumer client to the Pulsar broker
String groovyExpression = "it.properties.messageType.equals('audio') && it.properties.messageSource.equals('wechat')";

// The broker parses the client's expression once, then evaluates it against
// each message and returns only the messages that meet the conditions.
GroovyShell shell = new GroovyShell();
Script script = shell.parse("return " + groovyExpression);
Binding binding = new Binding();
script.setBinding(binding);
List<Message<byte[]>> filteredMessages = messages.stream()
        .filter(message -> {
            binding.setVariable("it", message);
            return (Boolean) script.run();
        })
        .collect(Collectors.toList());
```

Comment on lines +114 to +134
Member

This would be terrible from a security perspective. Something like Google CEL or Spring Expression Language (SpEL) would be a better choice.

### Usage Example

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

public class ConsumerExample {
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64)
                .topic("persistent://public/default/my-topic")
                .subscriptionName("my-subscription")
                // filterExpression, FilterExpression, and FilterExpressionType are the API proposed by this PIP.
                .filterExpression(new FilterExpression("messageSource='wechat' AND messageType='audio'", FilterExpressionType.SQL92))
                //.filterExpression(new FilterExpression("it.properties.messageType.equals('audio') && it.properties.messageSource.equals('wechat')", FilterExpressionType.Groovy))
                .subscribe();

        do {
            Message<Long> message = consumer.receive(); // receives only messages that meet the filter rules
            consumer.acknowledge(message);
        } while (true);
    }
}
```

Comment on the `.filterExpression(...)` line

Contributor @liangyepianzhou, May 15, 2025

Excellent work! I think this feature is indeed useful. However, I suggest that the filter expressions should be configured, modified, and deleted through the management/control plane (Pulsar Admin/CLI), rather than being set during consumer initialization. My reasoning is as follows:

1. Practical Operational Considerations: When maintaining a messaging service, we typically provide users with a management platform to configure subscription settings. If filter conditions are set during consumer initialization, any configuration change would require propagating updates to user clients and forcing client reinitialization to take effect. This prevents direct updates via admin operations.

2. Service Stability: Broker-side configuration changes that require client reinitialization could make the consumption service unstable.

3. Subscription Consistency: A single subscription may have multiple consumer instances. Setting filter conditions at consumer initialization risks inconsistent filtering rules across consumers under the same subscription, potentially causing rule overwrites (e.g., Consumer A sets Filter X while Consumer B sets Filter Y, leading to conflicts).

Regarding point 3: This mirrors real-world issues observed in RocketMQ's message filtering. I've encountered multiple cases where tag-based filtering failed because different consumers in the same consumer group subscribed to conflicting tags, resulting in rule overrides.

Recommendation: Store filter rules as subscription metadata (managed via Admin API/CLI) rather than per-consumer settings. This would:

- Ensure consistent filtering across all consumers in a subscription
- Allow dynamic rule updates without client restarts
- Prevent rule conflicts between consumers

Contributor Author

It is a good idea to configure the filtering rules through the Admin API. This can avoid the problems you mentioned, which RocketMQ currently has as well. I will refactor this after receiving more comments.

Member

It's already possible to handle filtering rules through the Admin API. The pulsar-admin CLI tool has a plugin model that also allows adding new commands. This approach is used in Pulsar JMS: https://github.com/datastax/pulsar-jms/blob/master/pulsar-jms-admin-ext/src/main/java/com/datastax/oss/pulsar/jms/cli/SubscriptionBaseCommand.java (more context in my other comment, #24305 (comment))

Member

In Pulsar JMS, you could also provide the filter at the consumer level with consumer properties. If we were to add message filtering as a top-level feature, it would be great to have full support in the clients too. I commented on this aspect in my other comment.

One possible compromise could be to make it an optional feature. In the Pulsar Java client API, we'd need a way to handle optional features in the builder interface itself, so that the main builder interface for consumers doesn't get cluttered with options that are only supported in certain configurations. One way to do this is with an "extensions" pattern, where the builder would have a method `extension(Class consumerExtensionBuilderClass)` which continues configuring the extension in a "nested" builder.
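
The review suggestion to keep filter rules as subscription-level state, rather than per-consumer settings, can be sketched as follows. This is a minimal in-memory illustration under stated assumptions: `SubscriptionFilterRegistry` and its methods are hypothetical names, it is not an existing Pulsar or Pulsar Admin API, and a real broker would persist the rule with the subscription's metadata instead of holding it in a map.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical broker-side store: at most one filter expression per (topic, subscription).
final class SubscriptionFilterRegistry {
    private final Map<String, String> expressions = new ConcurrentHashMap<>();

    // Illustrative composite key; a real implementation would use its own subscription identity.
    private static String key(String topic, String subscription) {
        return topic + "|" + subscription;
    }

    // Intended to be called from an admin operation; replaces any previous rule.
    void setFilter(String topic, String subscription, String expression) {
        expressions.put(key(topic, subscription), expression);
    }

    // Removes the rule, so the subscription receives all messages again.
    void removeFilter(String topic, String subscription) {
        expressions.remove(key(topic, subscription));
    }

    // Every consumer attached to the subscription resolves the same rule.
    Optional<String> filterFor(String topic, String subscription) {
        return Optional.ofNullable(expressions.get(key(topic, subscription)));
    }
}
```

Because the rule is looked up by subscription, two consumers of the same subscription cannot register conflicting filters, and an update takes effect without restarting any client.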

## Public-facing Changes

### Public API

1. Add an optional config `filterExpression` in [ConsumerBuilder](https://github.com/apache/pulsar/blob/965ef5c14c93ca896ef4c8f34520066285fcf047/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java)

```java
ConsumerBuilder<T> filterExpression(FilterExpression filterExpression);

```
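
The PIP does not define `FilterExpression` or `FilterExpressionType` beyond their use in the usage example. One possible shape, inferred only from the constructor call and the two constants that appear there (`SQL92` and `Groovy`), is sketched below. The accessor names and the validation are assumptions, not part of the proposal.

```java
import java.util.Objects;

// Assumed enum; the two constants are the ones used in the usage example.
enum FilterExpressionType {
    SQL92,
    Groovy
}

// Assumed immutable value type carrying the expression text and its language.
final class FilterExpression {
    private final String expression;
    private final FilterExpressionType type;

    FilterExpression(String expression, FilterExpressionType type) {
        this.expression = Objects.requireNonNull(expression, "expression");
        this.type = Objects.requireNonNull(type, "type");
        if (expression.trim().isEmpty()) {
            throw new IllegalArgumentException("expression must not be blank");
        }
    }

    String getExpression() {
        return expression;
    }

    FilterExpressionType getType() {
        return type;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof FilterExpression)) {
            return false;
        }
        FilterExpression other = (FilterExpression) o;
        return expression.equals(other.expression) && type == other.type;
    }

    @Override
    public int hashCode() {
        return Objects.hash(expression, type);
    }
}
```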

# Backward & Forward Compatibility

You can upgrade or revert normally; no special steps are needed.

# Links

- Mailing List discussion thread:
- Mailing List voting thread: