From 53b1bd5c20a2e2e7eff6b5e83e24c280acfa9d79 Mon Sep 17 00:00:00 2001 From: coderzc Date: Sat, 17 Jan 2026 00:09:31 +0800 Subject: [PATCH 1/3] Implement PIP-452 --- .../pulsar/broker/ServiceConfiguration.java | 10 +- .../admin/impl/PersistentTopicsBase.java | 24 ++- .../broker/admin/v1/PersistentTopics.java | 2 +- .../broker/admin/v2/NonPersistentTopics.java | 3 +- .../broker/admin/v2/PersistentTopics.java | 31 ++- .../broker/intercept/BrokerInterceptor.java | 27 +++ .../BrokerInterceptorWithClassLoader.java | 20 ++ .../broker/intercept/BrokerInterceptors.java | 95 ++++++++- .../pulsar/broker/service/ServerCnx.java | 193 +++++++++++------- .../broker/admin/PersistentTopicsTest.java | 8 +- .../CustomizedTopicListingInterceptor.java | 173 ++++++++++++++++ .../GetTopicsOfNamespaceInterceptorTest.java | 143 +++++++++++++ .../pulsar/broker/service/ServerCnxTest.java | 10 +- .../client/admin/ListTopicsOptions.java | 6 + .../client/admin/internal/TopicsImpl.java | 20 ++ .../apache/pulsar/admin/cli/CmdTopics.java | 5 + .../client/impl/BinaryProtoLookupService.java | 17 +- .../apache/pulsar/client/impl/ClientCnx.java | 2 +- .../pulsar/client/impl/HttpLookupService.java | 3 +- ...ssDeduplicationDecoratorLookupService.java | 20 +- .../pulsar/client/impl/LookupService.java | 9 +- .../impl/PatternMultiTopicsConsumerImpl.java | 3 +- .../pulsar/client/impl/PulsarClientImpl.java | 2 +- .../pulsar/common/protocol/Commands.java | 6 +- pulsar-common/src/main/proto/PulsarApi.proto | 2 + .../proxy/server/LookupProxyHandler.java | 16 +- 26 files changed, 726 insertions(+), 124 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CustomizedTopicListingInterceptor.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/GetTopicsOfNamespaceInterceptorTest.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index e5b7f0e458d23..5e3a25d42db3c 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1257,6 +1257,14 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece ) private boolean enableBrokerSideSubscriptionPatternEvaluation = true; + @FieldContext( + dynamic = false, + category = CATEGORY_POLICIES, + doc = "Enables watching topic add/remove events on broker side for " + + "subscription pattern evaluation." + ) + private boolean enableBrokerTopicListWatcher = true; + @FieldContext( dynamic = false, category = CATEGORY_POLICIES, @@ -1599,7 +1607,7 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece category = CATEGORY_SERVER, doc = "List of broker interceptor to load, which is a list of broker interceptor names" ) - private Set brokerInterceptors = new TreeSet<>(); + private Set brokerInterceptors = new LinkedHashSet<>(); @FieldContext( category = CATEGORY_SERVER, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 865212f48e59c..020886a3b2dd1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -75,6 +75,8 @@ import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authorization.AuthorizationService; +import org.apache.pulsar.broker.intercept.BrokerInterceptor; +import org.apache.pulsar.broker.namespace.TopicListingResult; import org.apache.pulsar.broker.service.AnalyzeBacklogResult; import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException; import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; @@ -102,6 +104,7 @@ import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; +import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.EncryptionKeys; @@ -151,6 +154,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.jspecify.annotations.NonNull; +import org.jspecify.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -163,7 +167,8 @@ public class PersistentTopicsBase extends AdminResource { private static final String DEPRECATED_CLIENT_VERSION_PREFIX = "Pulsar-CPP-v"; private static final Version LEAST_SUPPORTED_CLIENT_VERSION_PREFIX = Version.forIntegers(1, 21); - protected CompletableFuture> internalGetListAsync(Optional bundle) { + protected CompletableFuture> internalGetListAsync(Optional bundle, + @Nullable Map properties) { return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_TOPICS) .thenCompose(__ -> namespaceResources().namespaceExistsAsync(namespaceName)) .thenAccept(exists -> { @@ -171,7 +176,20 @@ protected CompletableFuture> internalGetListAsync(Optional throw new RestException(Status.NOT_FOUND, "Namespace does not exist"); } }) - .thenCompose(__ -> topicResources().listPersistentTopicsAsync(namespaceName)) + .thenCompose(__ -> { + BrokerInterceptor brokerInterceptor = pulsar().getBrokerInterceptor(); + CompletableFuture> interceptorFuture = + brokerInterceptor == null ? CompletableFuture.completedFuture(null) : + brokerInterceptor.interceptGetTopicsOfNamespace(namespaceName, + CommandGetTopicsOfNamespace.Mode.PERSISTENT, Optional.empty(), properties); + return interceptorFuture.thenCompose(topicListingResult -> { + if (topicListingResult != null && topicListingResult.isPresent()) { + return CompletableFuture.completedFuture(topicListingResult.get().topics()); + } else { + return topicResources().listPersistentTopicsAsync(namespaceName); + } + }); + }) .thenApply(topics -> topics.stream() .filter(topic -> { @@ -4413,7 +4431,7 @@ private CompletableFuture topicNotFoundReasonAsync(TopicName topicName) { "Partitioned Topic not found: %s %s", topicName.toString(), topicErrorType)); } }) - .thenCompose(__ -> internalGetListAsync(Optional.empty())) + .thenCompose(__ -> internalGetListAsync(Optional.empty(), null)) .thenApply(topics -> { if (!topics.contains(topicName.toString())) { throw new RestException(Status.NOT_FOUND, "Topic partitions were not yet created"); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index 43224248fdca0..3ebf8f81f6ed1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -76,7 +76,7 @@ public void getList(@Suspended final AsyncResponse asyncResponse, @PathParam("pr @ApiParam(value = "Specify the bundle name", required = false) @QueryParam("bundle") String bundle) { validateNamespaceName(property, cluster, namespace); - internalGetListAsync(Optional.ofNullable(bundle)) + internalGetListAsync(Optional.ofNullable(bundle), null) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { if (!isRedirectException(ex)) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java index edf4303e1adef..9071884185d37 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java @@ -381,7 +381,8 @@ public void getList( @ApiParam(value = "Specify the bundle name", required = false) @QueryParam("bundle") String nsBundle, @ApiParam(value = "Include system topic") - @QueryParam("includeSystemTopic") boolean includeSystemTopic) { + @QueryParam("includeSystemTopic") boolean includeSystemTopic, + @QueryParam("properties") String propertiesStr) { Policies policies = null; try { validateNamespaceName(tenant, namespace); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index edc3963b3695c..bebaeb949d2b9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -25,6 +25,8 @@ import io.swagger.annotations.ApiParam; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -121,9 +123,11 @@ public void getList( @ApiParam(value = "Specify the bundle name", required = false) @QueryParam("bundle") String bundle, @ApiParam(value = "Include system topic") - @QueryParam("includeSystemTopic") boolean includeSystemTopic) { + @QueryParam("includeSystemTopic") boolean includeSystemTopic, + @ApiParam(value = "properties for customized topic listing intercept, format: k1=v1,k2=v2") + @QueryParam("properties") String propertiesStr) { validateNamespaceName(tenant, namespace); - internalGetListAsync(Optional.ofNullable(bundle)) + internalGetListAsync(Optional.ofNullable(bundle), parseProperties(propertiesStr)) .thenAccept(topicList -> asyncResponse.resume(filterSystemTopic(topicList, includeSystemTopic))) .exceptionally(ex -> { if (isNot307And404Exception(ex)) { @@ -5131,5 +5135,28 @@ public void getMessageIDByIndex(@Suspended final AsyncResponse asyncResponse, }); } + private Map parseProperties(String propertiesStr) { + if (propertiesStr == null || propertiesStr.trim().isEmpty()) { + return Collections.emptyMap(); + } + Map map = new HashMap<>(); + String[] pairs = propertiesStr.split(","); + for (String pair : pairs) { + String[] parts = pair.split("=", 2); + if (parts.length == 2) { + try { + String key = Codec.decode(parts[0].trim()); + String value = Codec.decode(parts[1].trim()); + if (!key.isEmpty()) { + map.put(key, value); + } + } catch (Exception e) { + log.warn("Failed to decode property: {}", pair, e); + } + } + } + return map; + } + private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java index 0ade5e0b91bb3..6d2243cab3d16 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java @@ -21,12 +21,15 @@ import io.netty.buffer.ByteBuf; import java.io.IOException; import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; import javax.servlet.FilterChain; import javax.servlet.ServletException; import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; import org.apache.bookkeeper.mledger.Entry; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.namespace.TopicListingResult; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Producer; import org.apache.pulsar.broker.service.ServerCnx; @@ -34,10 +37,12 @@ import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.proto.CommandAck; +import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; import org.apache.pulsar.common.intercept.InterceptException; +import org.apache.pulsar.common.naming.NamespaceName; /** * A plugin interface that allows you to intercept the @@ -224,6 +229,28 @@ default void onFilter(ServletRequest request, ServletResponse response, FilterCh chain.doFilter(request, response); } + /** + * Intercept the GetTopicsOfNamespace request. + *

+ * This method allows plugins to override the default topic discovery logic (ZooKeeper scan). + * It enables fetching topics from external sources (e.g., databases, other metadata stores) + * based on the provided client context properties. + * + * @param namespace The namespace being queried. + * @param mode The query mode (PERSISTENT, NON_PERSISTENT, or ALL). + * @param topicsPattern Optional regex pattern provided by the client. + * @param properties Context properties provided by the client. + * @return A CompletableFuture containing the result: + * If the future completes with {@code Optional.empty()}, proceed to the next interceptor or Broker's default logic. + */ + default CompletableFuture> interceptGetTopicsOfNamespace( + NamespaceName namespace, + CommandGetTopicsOfNamespace.Mode mode, + Optional topicsPattern, + Map properties) { + return CompletableFuture.completedFuture(Optional.empty()); + } + /** * Initialize the broker interceptor. * diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java index 849f7aa39f0ef..c6613c9c610a9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java @@ -22,6 +22,8 @@ import io.netty.buffer.ByteBuf; import java.io.IOException; import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; import javax.servlet.FilterChain; import javax.servlet.ServletException; import javax.servlet.ServletRequest; @@ -31,6 +33,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Entry; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.namespace.TopicListingResult; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Producer; import org.apache.pulsar.broker.service.ServerCnx; @@ -38,8 +41,10 @@ import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.proto.CommandAck; +import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.intercept.InterceptException; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.nar.NarClassLoader; /** @@ -285,6 +290,21 @@ public void onFilter(ServletRequest request, ServletResponse response, FilterCha } } + @Override + public CompletableFuture> interceptGetTopicsOfNamespace( + NamespaceName namespace, + CommandGetTopicsOfNamespace.Mode mode, + Optional topicsPattern, + Map properties) { + final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(narClassLoader); + return this.interceptor.interceptGetTopicsOfNamespace(namespace, mode, topicsPattern, properties); + } finally { + Thread.currentThread().setContextClassLoader(previousContext); + } + } + @Override public void close() { final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java index 30d1874a97299..96cf56f00d838 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java @@ -19,10 +19,14 @@ package org.apache.pulsar.broker.intercept; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; import io.netty.buffer.ByteBuf; import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; import javax.servlet.ServletException; import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; @@ -30,6 +34,7 @@ import org.apache.bookkeeper.mledger.Entry; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.namespace.TopicListingResult; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Producer; import org.apache.pulsar.broker.service.ServerCnx; @@ -37,8 +42,10 @@ import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.proto.CommandAck; +import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.intercept.InterceptException; +import org.apache.pulsar.common.naming.NamespaceName; /** * A collection of broker interceptor. @@ -66,7 +73,8 @@ public static BrokerInterceptor load(ServiceConfiguration conf) throws IOExcepti BrokerInterceptorUtils.searchForInterceptors(conf.getBrokerInterceptorsDirectory(), conf.getNarExtractionDirectory()); - ImmutableMap.Builder builder = ImmutableMap.builder(); + // Use LinkedHashMap as a temporary container to ensure insertion order + Map orderedInterceptorMap = new LinkedHashMap<>(); conf.getBrokerInterceptors().forEach(interceptorName -> { @@ -80,7 +88,7 @@ public static BrokerInterceptor load(ServiceConfiguration conf) throws IOExcepti try { interceptor = BrokerInterceptorUtils.load(definition, conf.getNarExtractionDirectory()); if (interceptor != null) { - builder.put(interceptorName, interceptor); + orderedInterceptorMap.put(interceptorName, interceptor); } log.info("Successfully loaded broker interceptor for name `{}`", interceptorName); } catch (IOException e) { @@ -89,9 +97,8 @@ public static BrokerInterceptor load(ServiceConfiguration conf) throws IOExcepti } }); - Map interceptors = builder.build(); - if (!interceptors.isEmpty()) { - return new BrokerInterceptors(interceptors); + if (!orderedInterceptorMap.isEmpty()) { + return new BrokerInterceptors(Map.copyOf(orderedInterceptorMap)); } else { return null; } @@ -273,6 +280,82 @@ public void initialize(PulsarService pulsarService) throws Exception { } } + @Override + public CompletableFuture> interceptGetTopicsOfNamespace( + NamespaceName namespace, + CommandGetTopicsOfNamespace.Mode mode, + Optional topicsPattern, + Map properties) { + + if (!interceptorsEnabled()) { + // No interceptors configured, return PassThrough immediately + return CompletableFuture.completedFuture(Optional.empty()); + } + + CompletableFuture> resultFuture = new CompletableFuture<>(); + + List interceptorList = new ArrayList<>(interceptors.values()); + + // Start recursive processing + interceptRemainingGetTopicsOfNamespace(resultFuture, interceptorList, namespace, mode, topicsPattern, + properties, 0); + + return resultFuture; + } + + private void interceptRemainingGetTopicsOfNamespace( + CompletableFuture> resultFuture, + List list, + NamespaceName namespace, + CommandGetTopicsOfNamespace.Mode mode, + Optional topicsPattern, + Map properties, + int index) { + // Return PassThrough if no more interceptors to process + if (index >= list.size()) { + resultFuture.complete(Optional.empty()); + return; + } + + BrokerInterceptor interceptor = list.get(index); + + try { + CompletableFuture> future = interceptor + .interceptGetTopicsOfNamespace(namespace, mode, topicsPattern, properties); + + // Prevent the plugin from directly returning null, skip to the next interceptor + if (future == null) { + interceptRemainingGetTopicsOfNamespace(resultFuture, list, namespace, mode, topicsPattern, + properties, index + 1); + return; + } + + future.whenComplete((result, ex) -> { + if (ex != null) { + // Ignore the error and try the next one + log.warn("Interceptor {} failed for namespace {}: {}", interceptor.getClass().getName(), + namespace, ex.getMessage()); + interceptRemainingGetTopicsOfNamespace(resultFuture, list, namespace, mode, topicsPattern, + properties, index + 1); + } else if (result != null && result.isPresent()) { + // Interception successful (Found match) -> Complete Future and end recursion + resultFuture.complete(result); + } else { + // Interceptor selects PassThrough (result == null || result.isPassThrough) -> try the next one + interceptRemainingGetTopicsOfNamespace(resultFuture, list, namespace, mode, topicsPattern, + properties, index + 1); + } + }); + + } catch (Throwable t) { + // Ignore the error and try the next one + log.warn("Interceptor {} failed synchronously for namespace {}", interceptor.getClass().getName(), + namespace, t); + interceptRemainingGetTopicsOfNamespace(resultFuture, list, namespace, mode, topicsPattern, properties, + index + 1); + } + } + @Override public void close() { interceptors.values().forEach(BrokerInterceptorWithClassLoader::close); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 7ee003d7ace1f..4db2d54b864a3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -91,6 +91,7 @@ import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; import org.apache.pulsar.broker.namespace.LookupOptions; import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.broker.namespace.TopicListingResult; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; @@ -210,6 +211,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { private final ConcurrentLongHashMap> producers; private final ConcurrentLongHashMap> consumers; private final boolean enableSubscriptionPatternEvaluation; + private final boolean enableTopicListWatcher; private final int maxSubscriptionPatternLength; private final TopicListService topicListService; private final BrokerInterceptor brokerInterceptor; @@ -376,6 +378,7 @@ public ServerCnx(PulsarService pulsar, String listenerName) { conf.getBrokerMaxConnectionsPerIp()); this.maxTopicListInFlightLimiter = pulsar.getBrokerService().getMaxTopicListInFlightLimiter(); this.enableSubscriptionPatternEvaluation = conf.isEnableBrokerSideSubscriptionPatternEvaluation(); + this.enableTopicListWatcher = conf.isEnableBrokerTopicListWatcher(); this.maxSubscriptionPatternLength = conf.getSubscriptionPatternMaxLength(); this.topicListService = new TopicListService(pulsar, this, enableSubscriptionPatternEvaluation, maxSubscriptionPatternLength); @@ -899,7 +902,7 @@ private void completeConnect(int clientProtoVersion, String clientVersion) { } maybeScheduleAuthenticationCredentialsRefresh(); } - writeAndFlush(Commands.newConnected(clientProtoVersion, maxMessageSize, enableSubscriptionPatternEvaluation)); + writeAndFlush(Commands.newConnected(clientProtoVersion, maxMessageSize, enableTopicListWatcher)); state = State.Connected; service.getPulsarStats().recordConnectionCreateSuccess(); if (log.isDebugEnabled()) { @@ -2556,13 +2559,17 @@ protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGet final Optional topicsHash = Optional.ofNullable(commandGetTopicsOfNamespace.hasTopicsHash() ? commandGetTopicsOfNamespace.getTopicsHash() : null); final NamespaceName namespaceName = NamespaceName.get(namespace); + final Map properties = new HashMap<>(); + for (KeyValue keyValue : commandGetTopicsOfNamespace.getPropertiesList()) { + properties.put(keyValue.getKey(), keyValue.getValue()); + } final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); if (lookupSemaphore.tryAcquire()) { isNamespaceOperationAllowed(namespaceName, NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> { if (isAuthorized) { internalHandleGetTopicsOfNamespace(namespace, namespaceName, requestId, mode, topicsPattern, - topicsHash, lookupSemaphore); + topicsHash, properties, lookupSemaphore); } else { final String msg = "Client is not authorized to GetTopicsOfNamespace"; log.warn("[{}] {} with role {} on namespace {}", remoteAddress, msg, getPrincipal(), namespaceName); @@ -2591,89 +2598,119 @@ protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGet private void internalHandleGetTopicsOfNamespace(String namespace, NamespaceName namespaceName, long requestId, CommandGetTopicsOfNamespace.Mode mode, Optional topicsPattern, Optional topicsHash, + Map properties, Semaphore lookupSemaphore) { BooleanSupplier isPermitRequestCancelled = () -> !ctx().channel().isActive(); - TopicListSizeResultCache.ResultHolder - listSizeHolder = service.getTopicListSizeResultCache().getTopicListSize(namespaceName.toString(), mode); + TopicListSizeResultCache.ResultHolder listSizeHolder = + service.getTopicListSizeResultCache().getTopicListSize(namespaceName.toString(), mode); + listSizeHolder.getSizeAsync().thenAccept(initialSize -> { maxTopicListInFlightLimiter.withAcquiredPermits(initialSize, - AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY, isPermitRequestCancelled, initialPermits -> { - return getBrokerService().pulsar().getNamespaceService() + AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY, isPermitRequestCancelled, initialPermits -> { + BrokerInterceptor brokerInterceptor = getBrokerService().getPulsar().getBrokerInterceptor(); + CompletableFuture> interceptorFuture = + brokerInterceptor == null ? CompletableFuture.completedFuture(Optional.empty()) : + brokerInterceptor.interceptGetTopicsOfNamespace(namespaceName, mode, + topicsPattern, properties); + + if (interceptorFuture == null) { + interceptorFuture = CompletableFuture.completedFuture(null); + } + + return interceptorFuture.thenCompose(interceptorResult -> { + // 2. Decision branch: Use interceptor result OR fall back to default logic + if (interceptorResult != null && interceptorResult.isPresent()) { + // case A: The interceptor has taken over the request + return CompletableFuture.completedFuture(interceptorResult.get()); + } else { + // case B: The interceptor did not handle, so the original query logic is executed. + // Wrap List into TopicListingResult(topics, filtered=false) for unified processing + return getBrokerService().pulsar().getNamespaceService() .getListOfUserTopics(namespaceName, mode) - .thenCompose(topics -> { - long actualSize = TopicListMemoryLimiter.estimateTopicListSize(topics); - listSizeHolder.updateSize(actualSize); - return maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize, - isPermitRequestCancelled, permits -> { - boolean filterTopics = false; - // filter system topic - List filteredTopics = topics; - - if (enableSubscriptionPatternEvaluation && topicsPattern.isPresent()) { - if (topicsPattern.get().length() <= maxSubscriptionPatternLength) { - filterTopics = true; - filteredTopics = TopicList.filterTopics(filteredTopics, - topicsPattern.get(), - topicsPatternImplementation); - } else { - log.info( - "[{}] Subscription pattern provided [{}] was longer " - + "than maximum {}.", remoteAddress, - topicsPattern.get(), - maxSubscriptionPatternLength); - } - } - String hash = TopicList.calculateHash(filteredTopics); - boolean hashUnchanged = - topicsHash.isPresent() && topicsHash.get().equals(hash); - if (hashUnchanged) { - filteredTopics = Collections.emptyList(); - } - if (log.isDebugEnabled()) { - log.debug("[{}] Received CommandGetTopicsOfNamespace for namespace " - + "[//{}] by {}, size:{}", remoteAddress, namespace, - requestId, - topics.size()); - } - return commandSender.sendGetTopicsOfNamespaceResponse(filteredTopics, - hash, - filterTopics, !hashUnchanged, requestId, ex -> { - log.warn("[{}] Failed to acquire direct memory permits for " - + "GetTopicsOfNamespace: {}", remoteAddress, - ex.getMessage()); - commandSender.sendErrorResponse(requestId, - ServerError.TooManyRequests, - "Cannot acquire permits for direct memory"); - return CompletableFuture.completedFuture(null); - }); - }, t -> { - log.warn("[{}] Failed to acquire heap memory permits for " - + "GetTopicsOfNamespace: {}", remoteAddress, t.getMessage()); - writeAndFlush(Commands.newError(requestId, ServerError.TooManyRequests, - "Failed due to heap memory limit exceeded")); - return CompletableFuture.completedFuture(null); - }); - }).whenComplete((__, ___) -> { - lookupSemaphore.release(); - }).exceptionally(ex -> { - log.warn("[{}] Error GetTopicsOfNamespace for namespace [//{}] by {}", - remoteAddress, - namespace, requestId); - listSizeHolder.resetIfInitializing(); - commandSender.sendErrorResponse(requestId, - BrokerServiceException.getClientErrorCode(new ServerMetadataException(ex)), - ex.getMessage()); - return null; - }); - }, t -> { - log.warn("[{}] Failed to acquire initial heap memory permits for GetTopicsOfNamespace: {}", - remoteAddress, t.getMessage()); - listSizeHolder.resetIfInitializing(); - writeAndFlush(Commands.newError(requestId, ServerError.TooManyRequests, - "Failed due to heap memory limit exceeded")); + .thenApply(topics -> new TopicListingResult(topics, false)); + } + }).thenCompose(listingResult -> { + List rawTopics = listingResult.topics(); + boolean isAlreadyFiltered = listingResult.filtered(); + + // Adjust memory permits based on ACTUAL size + long actualSize = TopicListMemoryLimiter.estimateTopicListSize(rawTopics); + listSizeHolder.updateSize(actualSize); + + return maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize, + isPermitRequestCancelled, permits -> { + + List filteredTopics = rawTopics; + boolean hasAppliedFilter = isAlreadyFiltered; + + // Apply regular expression filtering + // (if the interceptor does not filter and regular expression matching is enabled) + if (!isAlreadyFiltered && enableSubscriptionPatternEvaluation + && topicsPattern.isPresent()) { + if (topicsPattern.get().length() <= maxSubscriptionPatternLength) { + hasAppliedFilter = true; + filteredTopics = TopicList.filterTopics(filteredTopics, topicsPattern.get(), + topicsPatternImplementation); + } else { + log.info("[{}] Subscription pattern provided [{}] was longer than maximum {}.", + remoteAddress, topicsPattern.get(), maxSubscriptionPatternLength); + } + } + + String hash = TopicList.calculateHash(filteredTopics); + boolean hashUnchanged = topicsHash.isPresent() && topicsHash.get().equals(hash); + if (hashUnchanged) { + filteredTopics = Collections.emptyList(); + } + + if (log.isDebugEnabled()) { + log.debug( + "[{}] Received CommandGetTopicsOfNamespace for namespace [//{}] by {}, size:{}", + remoteAddress, namespace, requestId, rawTopics.size()); + } + + return commandSender.sendGetTopicsOfNamespaceResponse(filteredTopics, + hash, + hasAppliedFilter, !hashUnchanged, requestId, ex -> { + log.warn("[{}] Failed to acquire direct memory permits for " + + "GetTopicsOfNamespace: {}", remoteAddress, + ex.getMessage()); + commandSender.sendErrorResponse(requestId, + ServerError.TooManyRequests, + "Cannot acquire permits for direct memory"); + return CompletableFuture.completedFuture(null); + }); + }, + // Handle logic for heap memory limit exceeded inside withUpdatedPermits + t -> { + log.warn("[{}] Failed to acquire heap memory permits for GetTopicsOfNamespace: {}", + remoteAddress, t.getMessage()); + writeAndFlush(Commands.newError(requestId, ServerError.TooManyRequests, + "Failed due to heap memory limit exceeded")); + return CompletableFuture.completedFuture(null); + }); + }).whenComplete((__, ___) -> { lookupSemaphore.release(); - return CompletableFuture.completedFuture(null); + }).exceptionally(ex -> { + log.warn("[{}] Error GetTopicsOfNamespace for namespace [//{}] by {}", + remoteAddress, namespace, requestId); + listSizeHolder.resetIfInitializing(); + commandSender.sendErrorResponse(requestId, + BrokerServiceException.getClientErrorCode(new ServerMetadataException(ex)), + ex.getMessage()); + return null; }); + }, + // Handle logic for initial acquire failure + t -> { + log.warn("[{}] Failed to acquire initial heap memory permits for GetTopicsOfNamespace: {}", + remoteAddress, t.getMessage()); + listSizeHolder.resetIfInitializing(); + writeAndFlush(Commands.newError(requestId, ServerError.TooManyRequests, + "Failed due to heap memory limit exceeded")); + lookupSemaphore.release(); + return CompletableFuture.completedFuture(null); + }); }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 281c2ce52c166..090e34deecd44 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -853,14 +853,14 @@ public void testGetList() throws Exception { response = mock(AsyncResponse.class); ArgumentCaptor> listOfStringsCaptor = ArgumentCaptor.forClass(List.class); - persistentTopics.getList(response, testTenant, testNamespace, null, false); + persistentTopics.getList(response, testTenant, testNamespace, null, false, null); verify(response, timeout(5000).times(1)).resume(listOfStringsCaptor.capture()); List topics = listOfStringsCaptor.getValue(); Assert.assertEquals(topics.size(), 1); response = mock(AsyncResponse.class); listOfStringsCaptor = ArgumentCaptor.forClass(List.class); - persistentTopics.getList(response, testTenant, testNamespace, null, true); + persistentTopics.getList(response, testTenant, testNamespace, null, true, null); verify(response, timeout(5000).times(1)).resume(listOfStringsCaptor.capture()); topics = listOfStringsCaptor.getValue(); Assert.assertEquals(topics.size(), 2); @@ -879,14 +879,14 @@ public void testGetList() throws Exception { response = mock(AsyncResponse.class); listOfStringsCaptor = ArgumentCaptor.forClass(List.class); - nonPersistentTopic.getList(response, testTenant, testNamespace, null, false); + nonPersistentTopic.getList(response, testTenant, testNamespace, null, false, null); verify(response, timeout(5000).times(1)).resume(listOfStringsCaptor.capture()); topics = listOfStringsCaptor.getValue(); Assert.assertEquals(topics.size(), 1); response = mock(AsyncResponse.class); listOfStringsCaptor = ArgumentCaptor.forClass(List.class); - nonPersistentTopic.getList(response, testTenant, testNamespace, null, true); + nonPersistentTopic.getList(response, testTenant, testNamespace, null, true, null); verify(response, timeout(5000).times(1)).resume(listOfStringsCaptor.capture()); topics = listOfStringsCaptor.getValue(); Assert.assertEquals(topics.size(), 2); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CustomizedTopicListingInterceptor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CustomizedTopicListingInterceptor.java new file mode 100644 index 0000000000000..fed4135e51ad0 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CustomizedTopicListingInterceptor.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.intercept; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.namespace.TopicListingResult; +import org.apache.pulsar.broker.service.ServerCnx; +import org.apache.pulsar.common.api.proto.BaseCommand; +import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; +import org.apache.pulsar.common.intercept.InterceptException; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.apache.pulsar.common.topics.TopicList; +import org.apache.pulsar.common.topics.TopicsPattern; + +@Slf4j +public class CustomizedTopicListingInterceptor implements BrokerInterceptor { + + private PulsarService pulsar; + + // Map>> + private Map>> customTopicPropertiesMap; + private TopicsPattern.RegexImplementation topicsPatternImplementation; + private boolean enableSubscriptionPatternEvaluation; + private int maxSubscriptionPatternLength; + private boolean enableTopicListWatcher; + + + public void setCustomProperties(TopicName topicName, Map properties) + throws ExecutionException, InterruptedException, TimeoutException { + int timeoutSeconds = pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(); + PartitionedTopicMetadata partitionedTopicMetadata = + pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName) + .get(timeoutSeconds, TimeUnit.SECONDS); + if (partitionedTopicMetadata.partitions == 0) { + setNonPartitionedTopicCustomProperties(topicName, properties); + } else { + for (int i = 0; i < partitionedTopicMetadata.partitions; i++) { + TopicName partitionedTopic = topicName.getPartition(i); + setNonPartitionedTopicCustomProperties(partitionedTopic, properties); + } + } + } + + private void setNonPartitionedTopicCustomProperties(TopicName topicName, Map properties) { + this.customTopicPropertiesMap.compute(topicName.getNamespace(), (ns, topicMap) -> { + if (topicMap == null) { + topicMap = new ConcurrentHashMap<>(); + } + topicMap.put(topicName.toString(), properties); + return topicMap; + }); + } + + public List queryTopicListByProperties(String namespace, Map properties) { + List result = new ArrayList<>(); + Map> topicMap = this.customTopicPropertiesMap.get(namespace); + if (topicMap != null) { + for (Map.Entry> entry : topicMap.entrySet()) { + String topic = entry.getKey(); + Map topicProperties = entry.getValue(); + boolean match = true; + for (Map.Entry propEntry : properties.entrySet()) { + String key = propEntry.getKey(); + String value = propEntry.getValue(); + if (!value.equals(topicProperties.get(key))) { + match = false; + break; + } + } + if (match) { + result.add(topic); + } + } + } + return result; + } + + @Override + public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws InterceptException { + + } + + @Override + public void onConnectionClosed(ServerCnx cnx) { + + } + + @Override + public void onWebserviceRequest(ServletRequest request) throws IOException, ServletException, InterceptException { + + } + + @Override + public void onWebserviceResponse(ServletRequest request, ServletResponse response) + throws IOException, ServletException { + + } + + @Override + public void initialize(PulsarService pulsarService) throws Exception { + this.pulsar = pulsarService; + this.customTopicPropertiesMap = new ConcurrentHashMap<>(); + ServiceConfiguration config = pulsarService.getConfig(); + this.topicsPatternImplementation = config.getTopicsPatternRegexImplementation(); + this.enableSubscriptionPatternEvaluation = config.isEnableBrokerSideSubscriptionPatternEvaluation(); + this.maxSubscriptionPatternLength = config.getSubscriptionPatternMaxLength(); + this.enableTopicListWatcher = config.isEnableBrokerTopicListWatcher(); + } + + @Override + public CompletableFuture> interceptGetTopicsOfNamespace(NamespaceName namespace, + CommandGetTopicsOfNamespace.Mode mode, + Optional topicsPattern, + Map properties) { + if (enableTopicListWatcher) { + return CompletableFuture.failedFuture(new UnsupportedOperationException( + "CustomizedTopicListingInterceptor does not support topic list watcher feature.")); + } + + List list = queryTopicListByProperties(namespace.toString(), properties); + List filteredTopics = TopicList.filterSystemTopic(list); + // Pattern Filtering (if applicable) + if (enableSubscriptionPatternEvaluation && topicsPattern.isPresent()) { + if (topicsPattern.get().length() <= maxSubscriptionPatternLength) { + filteredTopics = TopicList.filterTopics(filteredTopics, topicsPattern.get(), + topicsPatternImplementation); + } else { + log.info("[{}] Subscription pattern provided [{}] was longer than maximum {}.", + namespace, topicsPattern.get(), maxSubscriptionPatternLength); + } + } + log.info("[{}] Topic list intercepted [{}]", namespace, filteredTopics); + return CompletableFuture.completedFuture(Optional.of(new TopicListingResult(filteredTopics, true))); + } + + @Override + public void close() { + + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/GetTopicsOfNamespaceInterceptorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/GetTopicsOfNamespaceInterceptorTest.java new file mode 100644 index 0000000000000..15cbae3780e6b --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/GetTopicsOfNamespaceInterceptorTest.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.intercept; + +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import lombok.Cleanup; +import org.apache.pulsar.broker.testcontext.PulsarTestContext; +import org.apache.pulsar.client.admin.ListTopicsOptions; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.nar.NarClassLoader; +import org.awaitility.Awaitility; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class GetTopicsOfNamespaceInterceptorTest extends ProducerConsumerBase { + private final String interceptorName = "get_topics_of_namespace_interceptor"; + + private CustomizedTopicListingInterceptor topicListingInterceptor; + + @BeforeMethod + public void setup() throws Exception { + isTcpLookup = true; + conf.setSystemTopicEnabled(false); + conf.setTopicLevelPoliciesEnabled(false); + conf.setEnableBrokerSideSubscriptionPatternEvaluation(true); + conf.setEnableBrokerTopicListWatcher(false); + + this.enableBrokerInterceptor = true; + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder pulsarTestContextBuilder) { + Map interceptorMap = new HashMap<>(); + BrokerInterceptor interceptor = new CustomizedTopicListingInterceptor(); + this.topicListingInterceptor = (CustomizedTopicListingInterceptor) interceptor; + NarClassLoader narClassLoader = mock(NarClassLoader.class); + interceptorMap.put(interceptorName, new BrokerInterceptorWithClassLoader(interceptor, narClassLoader)); + pulsarTestContextBuilder.brokerInterceptor(new BrokerInterceptors(interceptorMap)); + } + + @Test + public void test() throws Exception { + String namespace = "public/default"; + String subName = "test-sub"; + + Map expectedProperties = + Map.of("env", "prod", "region", "us-west"); + + String topicName = "persistent://" + namespace + "/test-topic"; + admin.topics().createPartitionedTopic(topicName, 3); + String topicName2 = "persistent://" + namespace + "/test-topic2"; + admin.topics().createPartitionedTopic(topicName2, 3); + String topicName3 = "persistent://" + namespace + "/test-topic3-non-partitioned"; + + String topicName4 = "persistent://" + namespace + "/test-topic4"; + admin.topics().createPartitionedTopic(topicName4, 3); + + BrokerInterceptors listener = (BrokerInterceptors) pulsar.getBrokerInterceptor(); + assertNotNull(listener); + BrokerInterceptorWithClassLoader brokerInterceptor = listener.getInterceptors().get(interceptorName); + assertNotNull(brokerInterceptor); + BrokerInterceptor interceptor = brokerInterceptor.getInterceptor(); + assertTrue(interceptor instanceof CustomizedTopicListingInterceptor); + + @Cleanup + Producer producer = pulsarClient.newProducer().topic(topicName3).create(); + for (int i = 0; i < 10; i++) { + producer.send(("msg-" + i).getBytes(StandardCharsets.UTF_8)); + } + + topicListingInterceptor.setCustomProperties(TopicName.get(topicName), expectedProperties); + topicListingInterceptor.setCustomProperties(TopicName.get(topicName2), expectedProperties); + topicListingInterceptor.setCustomProperties(TopicName.get(topicName3), expectedProperties); + + admin.topics().updateProperties(topicName4, Map.of("env", "test", "region", "us-west")); + + Map propsFromClient = new HashMap<>(expectedProperties); + + Set expectedTopics = new HashSet<>(); + for (int i = 0; i < 3; i++) { + expectedTopics.add(topicName + "-partition-" + i); + } + for (int i = 0; i < 3; i++) { + expectedTopics.add(topicName2 + "-partition-" + i); + } + expectedTopics.add(topicName3); + + Awaitility.await().untilAsserted(() -> { + List list = admin.topics().getList("public/default", TopicDomain.persistent, + ListTopicsOptions.builder().properties(propsFromClient).build()); + Assert.assertEquals(new HashSet<>(list), expectedTopics); + }); + + @Cleanup + MultiTopicsConsumerImpl consumer = (MultiTopicsConsumerImpl) pulsarClient + .newConsumer() + .topicsPattern("persistent://public/default/.*") + .subscriptionName(subName) + .properties(propsFromClient) + .subscribe(); + + Set actualTopics = new HashSet<>(consumer.getPartitions()); + Assert.assertEquals(actualTopics, expectedTopics); + } +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index a19bc01757427..e87cede6b67f0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -3160,7 +3160,7 @@ public void testGetTopicsOfNamespace() throws Exception { resetChannel(); setChannelConnected(); ByteBuf clientCommand = Commands.newGetTopicsOfNamespaceRequest( - "use/ns-abc", 1, CommandGetTopicsOfNamespace.Mode.ALL, null, null); + "use/ns-abc", 1, CommandGetTopicsOfNamespace.Mode.ALL, null, null, null); channel.writeInbound(clientCommand); CommandGetTopicsOfNamespaceResponse response = (CommandGetTopicsOfNamespaceResponse) getResponse(); @@ -3179,7 +3179,7 @@ public void testGetTopicsOfNamespaceDisabledFiltering() throws Exception { setChannelConnected(); ByteBuf clientCommand = Commands.newGetTopicsOfNamespaceRequest( "use/ns-abc", 1, CommandGetTopicsOfNamespace.Mode.ALL, - "use/ns-abc/topic-.*", null); + "use/ns-abc/topic-.*", null, null); channel.writeInbound(clientCommand); CommandGetTopicsOfNamespaceResponse response = (CommandGetTopicsOfNamespaceResponse) getResponse(); @@ -3199,7 +3199,7 @@ public void testGetTopicsOfNamespaceLongPattern() throws Exception { setChannelConnected(); ByteBuf clientCommand = Commands.newGetTopicsOfNamespaceRequest( "use/ns-abc", 1, CommandGetTopicsOfNamespace.Mode.ALL, - "use/ns-abc/(t|o|to|p|i|c)+-?)+!", null); + "use/ns-abc/(t|o|to|p|i|c)+-?)+!", null, null); channel.writeInbound(clientCommand); CommandGetTopicsOfNamespaceResponse response = (CommandGetTopicsOfNamespaceResponse) getResponse(); @@ -3218,7 +3218,7 @@ public void testGetTopicsOfNamespaceFiltering() throws Exception { setChannelConnected(); ByteBuf clientCommand = Commands.newGetTopicsOfNamespaceRequest( "use/ns-abc", 1, CommandGetTopicsOfNamespace.Mode.ALL, - "use/ns-abc/topic-.*", "SOME_HASH"); + "use/ns-abc/topic-.*", "SOME_HASH", null); channel.writeInbound(clientCommand); CommandGetTopicsOfNamespaceResponse response = (CommandGetTopicsOfNamespaceResponse) getResponse(); @@ -3237,7 +3237,7 @@ public void testGetTopicsOfNamespaceNoChange() throws Exception { setChannelConnected(); ByteBuf clientCommand = Commands.newGetTopicsOfNamespaceRequest( "use/ns-abc", 1, CommandGetTopicsOfNamespace.Mode.ALL, - "use/ns-abc/topic-.*", TopicList.calculateHash(matchingTopics)); + "use/ns-abc/topic-.*", TopicList.calculateHash(matchingTopics), null); channel.writeInbound(clientCommand); CommandGetTopicsOfNamespaceResponse response = (CommandGetTopicsOfNamespaceResponse) getResponse(); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ListTopicsOptions.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ListTopicsOptions.java index 9a5e77e1ed997..eceaea7317b0c 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ListTopicsOptions.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ListTopicsOptions.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.admin; +import java.util.Map; import lombok.Builder; import lombok.Data; @@ -37,4 +38,9 @@ public class ListTopicsOptions { */ private final boolean includeSystemTopic; + /** + * Additional properties for listing topics. + */ + private final Map properties; + } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 11dd69a23ce58..5be793c5c52f8 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -44,6 +44,7 @@ import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.GetStatsOptions; import org.apache.pulsar.client.admin.ListTopicsOptions; import org.apache.pulsar.client.admin.LongRunningProcessStatus; @@ -195,9 +196,14 @@ public CompletableFuture> getListAsync(String namespace, TopicDomai NamespaceName ns = NamespaceName.get(namespace); WebTarget persistentPath = namespacePath("persistent", ns); WebTarget nonPersistentPath = namespacePath("non-persistent", ns); + persistentPath = persistentPath .queryParam("bundle", options.getBundle()) .queryParam("includeSystemTopic", options.isIncludeSystemTopic()); + String encodedPropertiesString = toEncodedPropertiesString(options.getProperties()); + if (StringUtils.isNotBlank(encodedPropertiesString)) { + persistentPath = persistentPath.queryParam("properties", encodedPropertiesString); + } nonPersistentPath = nonPersistentPath .queryParam("bundle", options.getBundle()) .queryParam("includeSystemTopic", options.isIncludeSystemTopic()); @@ -2845,5 +2851,19 @@ public void failed(Throwable throwable) { return messageIdCompletableFuture; } + public static String toEncodedPropertiesString(Map properties) { + if (properties == null || properties.isEmpty()) { + return null; + } + + return properties.entrySet().stream() + .map(entry -> { + String encodedKey = Codec.encode(entry.getKey()); + String encodedValue = Codec.encode(entry.getValue()); + return encodedKey + "=" + encodedValue; + }) + .collect(Collectors.joining(",")); + } + private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index e29a4dcd7a859..d9317d622080f 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -296,12 +296,17 @@ private class ListCmd extends CliCommand { "--include-system-topic" }, description = "Include system topic") private boolean includeSystemTopic; + @Option(names = {"--property", "-p"}, description = "key value pair properties(-p a=b -p c=d)", + required = false) + private Map properties; + @Override void run() throws PulsarAdminException { String namespace = validateNamespace(namespaceName); ListTopicsOptions options = ListTopicsOptions.builder() .bundle(bundle) .includeSystemTopic(includeSystemTopic) + .properties(properties) .build(); print(getTopics().getList(namespace, topicDomain, options)); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index 674fb40793ff6..1f66e65179a5a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -49,6 +49,7 @@ import org.apache.pulsar.common.util.Backoff; import org.apache.pulsar.common.util.BackoffBuilder; import org.apache.pulsar.common.util.FutureUtil; +import org.jspecify.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -351,9 +352,10 @@ public InetSocketAddress resolveHost() { @Override public CompletableFuture getTopicsUnderNamespace(NamespaceName namespace, - Mode mode, - String topicsPattern, - String topicsHash) { + Mode mode, + String topicsPattern, + String topicsHash, + @Nullable Map properties) { CompletableFuture topicsFuture = new CompletableFuture<>(); AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs()); Backoff backoff = new BackoffBuilder() @@ -362,7 +364,7 @@ public CompletableFuture getTopicsUnderNamespace(NamespaceName .setMax(1, TimeUnit.MINUTES) .create(); getTopicsUnderNamespace(namespace, backoff, opTimeoutMs, topicsFuture, mode, - topicsPattern, topicsHash); + topicsPattern, topicsHash, properties); return topicsFuture; } @@ -378,13 +380,14 @@ private void getTopicsUnderNamespace( CompletableFuture getTopicsResultFuture, Mode mode, String topicsPattern, - String topicsHash) { + String topicsHash, + @Nullable Map properties) { long startTime = System.nanoTime(); client.getCnxPool().getConnection(serviceNameResolver).thenAcceptAsync(clientCnx -> { long requestId = client.newRequestId(); ByteBuf request = Commands.newGetTopicsOfNamespaceRequest( - namespace.toString(), requestId, mode, topicsPattern, topicsHash); + namespace.toString(), requestId, mode, topicsPattern, topicsHash, properties); clientCnx.newGetTopicsOfNamespace(request, requestId).whenComplete((r, t) -> { if (t != null) { @@ -415,7 +418,7 @@ private void getTopicsUnderNamespace( + " {} ms", namespace, nextDelay); remainingTime.addAndGet(-nextDelay); getTopicsUnderNamespace(namespace, backoff, remainingTime, getTopicsResultFuture, - mode, topicsPattern, topicsHash); + mode, topicsPattern, topicsHash, properties); }, nextDelay, TimeUnit.MILLISECONDS); return null; }); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 184b210e2b0cd..434bf435eea0c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -1218,7 +1218,7 @@ public CompletableFuture newWatchTopicList( if (!supportsTopicWatchers) { return FutureUtil.failedFuture( new PulsarClientException.NotAllowedException( - "Broker does not allow broker side pattern evaluation.")); + "Broker does not support topic list watchers.")); } return sendRequestAndHandleTimeout(Commands.serializeWithSize(commandWatchTopicList), requestId, RequestType.Command, true); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java index 9044088f7249d..c364a79453765 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java @@ -190,7 +190,8 @@ public InetSocketAddress resolveHost() { @Override public CompletableFuture getTopicsUnderNamespace(NamespaceName namespace, Mode mode, - String topicsPattern, String topicsHash) { + String topicsPattern, String topicsHash, + Map properties) { long startTime = System.nanoTime(); CompletableFuture future = new CompletableFuture<>(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/InProgressDeduplicationDecoratorLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/InProgressDeduplicationDecoratorLookupService.java index 8cf95b6263469..2c00d4f7d02f4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/InProgressDeduplicationDecoratorLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/InProgressDeduplicationDecoratorLookupService.java @@ -105,10 +105,11 @@ public CompletableFuture> getSchema(TopicName topicName, by @Override public CompletableFuture getTopicsUnderNamespace(NamespaceName namespace, Mode mode, - String topicPattern, String topicsHash) { + String topicPattern, String topicsHash, + Map properties) { return topicsUnderNamespaceInProgress.getOrComputeIfAbsent( - new TopicsUnderNamespaceKey(namespace, mode, topicPattern, topicsHash), - () -> delegate.getTopicsUnderNamespace(namespace, mode, topicPattern, topicsHash)); + new TopicsUnderNamespaceKey(namespace, mode, topicPattern, topicsHash, properties), + () -> delegate.getTopicsUnderNamespace(namespace, mode, topicPattern, topicsHash, properties)); } @Override @@ -202,12 +203,16 @@ private static final class TopicsUnderNamespaceKey { private final Mode mode; private final String topicsPattern; private final String topicsHash; + private final Map properties; - TopicsUnderNamespaceKey(NamespaceName namespace, Mode mode, String topicsPattern, String topicsHash) { + TopicsUnderNamespaceKey(NamespaceName namespace, Mode mode, String topicsPattern, + String topicsHash, Map properties) { this.namespace = namespace; this.mode = mode; this.topicsPattern = topicsPattern; this.topicsHash = topicsHash; + this.properties = properties != null && !properties.isEmpty() + ? Map.copyOf(properties) : Collections.emptyMap(); } @Override @@ -220,18 +225,19 @@ public boolean equals(Object o) { } TopicsUnderNamespaceKey that = (TopicsUnderNamespaceKey) o; return Objects.equals(namespace, that.namespace) && mode == that.mode && Objects.equals(topicsPattern, - that.topicsPattern) && Objects.equals(topicsHash, that.topicsHash); + that.topicsPattern) && Objects.equals(topicsHash, that.topicsHash) && properties.equals( + that.properties); } @Override public int hashCode() { - return Objects.hash(namespace, mode, topicsPattern, topicsHash); + return Objects.hash(namespace, mode, topicsPattern, topicsHash, properties); } @Override public String toString() { return "TopicsUnderNamespaceKey{" + "namespace=" + namespace + ", mode=" + mode + ", topicsPattern='" - + topicsPattern + '\'' + ", topicsHash='" + topicsHash + '\'' + '}'; + + topicsPattern + '\'' + ", topicsHash='" + topicsHash + '\'' + ", properties=" + properties + '}'; } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java index 67cfe449ec920..256ba2549272e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java @@ -29,6 +29,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.schema.SchemaInfo; +import org.jspecify.annotations.Nullable; /** * Provides lookup service to find broker which serves given topic. It helps to @@ -159,8 +160,14 @@ default CompletableFuture> getSchema(TopicName topicName) { * @param namespace : namespace-name * @return */ + default CompletableFuture getTopicsUnderNamespace(NamespaceName namespace, Mode mode, + String topicPattern, String topicsHash) { + return getTopicsUnderNamespace(namespace, mode, topicPattern, topicsHash, null); + } + CompletableFuture getTopicsUnderNamespace(NamespaceName namespace, Mode mode, - String topicPattern, String topicsHash); + String topicPattern, String topicsHash, + @Nullable Map properties); /** * Returns true if the lookup service is a binary protocol lookup service. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java index 8a3798a670cb5..7623ff5aefc9c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java @@ -138,7 +138,8 @@ public void run(Timeout timeout) throws Exception { CompletableFuture recheckTopicsChange() { String pattern = topicsPattern.inputPattern(); final int epoch = recheckPatternEpoch.incrementAndGet(); - return client.getLookup().getTopicsUnderNamespace(namespaceName, subscriptionMode, pattern, topicsHash) + return client.getLookup().getTopicsUnderNamespace(namespaceName, subscriptionMode, + pattern, topicsHash, conf.getProperties()) .thenCompose(getTopicsResult -> { // If "recheckTopicsChange" has been called more than one times, only make the last one take affects. // Use "synchronized (recheckPatternTaskBackoff)" instead of diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 605757c230c9f..848fa8713c51c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -672,7 +672,7 @@ private CompletableFuture> patternTopicSubscribeAsync(ConsumerCo TopicsPattern pattern = TopicsPatternFactory.create(conf.getTopicsPattern()); CompletableFuture> consumerSubscribedFuture = new CompletableFuture<>(); - lookup.getTopicsUnderNamespace(namespaceName, subscriptionMode, regex, null) + lookup.getTopicsUnderNamespace(namespaceName, subscriptionMode, regex, null, conf.getProperties()) .thenAccept(getTopicsResult -> { if (log.isDebugEnabled()) { log.debug("Pattern consumer [{}] get topics under namespace {}, topics.size: {}," diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index a6796387f9147..2f5f83f6f3317 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -1219,7 +1219,8 @@ public static BaseCommand newConsumerStatsResponseCommand(ServerError serverErro } public static ByteBuf newGetTopicsOfNamespaceRequest(String namespace, long requestId, Mode mode, - String topicsPattern, String topicsHash) { + String topicsPattern, String topicsHash, + Map properties) { BaseCommand cmd = localCmd(Type.GET_TOPICS_OF_NAMESPACE); CommandGetTopicsOfNamespace topics = cmd.setGetTopicsOfNamespace(); topics.setNamespace(namespace); @@ -1231,6 +1232,9 @@ public static ByteBuf newGetTopicsOfNamespaceRequest(String namespace, long requ if (topicsHash != null) { topics.setTopicsHash(topicsHash); } + if (properties != null) { + properties.forEach((key, value) -> topics.addProperty().setKey(key).setValue(value)); + } return serializeWithSize(cmd); } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 1bfcf16beb5c1..c9dd7b0eef97a 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -790,6 +790,8 @@ message CommandGetTopicsOfNamespace { optional Mode mode = 3 [default = PERSISTENT]; optional string topics_pattern = 4; optional string topics_hash = 5; + // Context properties from the client + repeated KeyValue properties = 6; } message CommandGetTopicsOfNamespaceResponse { diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java index b0bd26bbcdfcd..eca9da17c951f 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java @@ -26,6 +26,9 @@ import java.net.SocketAddress; import java.net.URI; import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Semaphore; @@ -42,6 +45,7 @@ import org.apache.pulsar.common.api.proto.CommandLookupTopic; import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType; import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata; +import org.apache.pulsar.common.api.proto.KeyValue; import org.apache.pulsar.common.api.proto.ServerError; import org.apache.pulsar.common.lookup.GetTopicsResult; import org.apache.pulsar.common.naming.TopicName; @@ -316,7 +320,8 @@ private void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTo String topicsHash = commandGetTopicsOfNamespace.hasTopicsHash() ? commandGetTopicsOfNamespace.getTopicsHash() : null; performGetTopicsOfNamespace(clientRequestId, commandGetTopicsOfNamespace.getNamespace(), serviceUrl, - 10, topicsPattern, topicsHash, commandGetTopicsOfNamespace.getMode()); + 10, topicsPattern, topicsHash, commandGetTopicsOfNamespace.getMode(), + commandGetTopicsOfNamespace.getPropertiesList()); } private void performGetTopicsOfNamespace(long clientRequestId, @@ -325,7 +330,8 @@ private void performGetTopicsOfNamespace(long clientRequestId, int numberOfRetries, String topicsPattern, String topicsHash, - CommandGetTopicsOfNamespace.Mode mode) { + CommandGetTopicsOfNamespace.Mode mode, + List properties) { if (numberOfRetries == 0) { writeAndFlush(Commands.newError(clientRequestId, ServerError.ServiceNotReady, "Reached max number of redirections")); @@ -346,8 +352,12 @@ private void performGetTopicsOfNamespace(long clientRequestId, // Connected to backend broker long requestId = proxyConnection.newRequestId(); ByteBuf command; + Map propertiesMap = new HashMap<>(); + for (KeyValue kv : properties) { + propertiesMap.put(kv.getKey(), kv.getValue()); + } command = Commands.newGetTopicsOfNamespaceRequest(namespaceName, requestId, mode, - topicsPattern, topicsHash); + topicsPattern, topicsHash, propertiesMap); internalPerformGetTopicsOfNamespace(clientRequestId, namespaceName, mode, clientCnx, command, requestId); proxyConnection.getConnectionPool().releaseConnection(clientCnx); From cd33c386e3da656038c646a0264eb0bbde074fbe Mon Sep 17 00:00:00 2001 From: coderzc Date: Sun, 18 Jan 2026 20:37:17 +0800 Subject: [PATCH 2/3] Add TopicListingResult class --- .../broker/namespace/TopicListingResult.java | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/TopicListingResult.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/TopicListingResult.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/TopicListingResult.java new file mode 100644 index 0000000000000..ed79c1de5fe36 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/TopicListingResult.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.namespace; + +import java.util.List; +import java.util.Objects; + +public record TopicListingResult (List topics, boolean filtered) { + + public TopicListingResult{ + Objects.requireNonNull(topics, "topics"); + } +} From 06c7471cc975168802fa616d04dd8eb16d570c7c Mon Sep 17 00:00:00 2001 From: coderzc Date: Sat, 24 Jan 2026 23:28:53 +0800 Subject: [PATCH 3/3] fix test --- .../org/apache/pulsar/client/impl/PulsarClientImplTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java index 82b1b7263b79a..72c942eaf0c5b 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java @@ -47,6 +47,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadFactory; import java.util.regex.Pattern; @@ -105,7 +106,8 @@ public void testConsumerIsClosed() throws Exception { any(NamespaceName.class), any(CommandGetTopicsOfNamespace.Mode.class), nullable(String.class), - nullable(String.class))) + nullable(String.class), + nullable(Map.class))) .thenReturn(CompletableFuture.completedFuture( new GetTopicsResult(Collections.emptyList(), null, false, true))); when(lookup.getPartitionedTopicMetadata(any(TopicName.class), anyBoolean(), anyBoolean()))