Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1257,6 +1257,14 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
)
private boolean enableBrokerSideSubscriptionPatternEvaluation = true;

@FieldContext(
dynamic = false,
category = CATEGORY_POLICIES,
doc = "Enables watching topic add/remove events on broker side for "
+ "subscription pattern evaluation."
)
private boolean enableBrokerTopicListWatcher = true;

@FieldContext(
dynamic = false,
category = CATEGORY_POLICIES,
Expand Down Expand Up @@ -1599,7 +1607,7 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
category = CATEGORY_SERVER,
doc = "List of broker interceptor to load, which is a list of broker interceptor names"
)
private Set<String> brokerInterceptors = new TreeSet<>();
private Set<String> brokerInterceptors = new LinkedHashSet<>();

@FieldContext(
category = CATEGORY_SERVER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.namespace.TopicListingResult;
import org.apache.pulsar.broker.service.AnalyzeBacklogResult;
import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
Expand Down Expand Up @@ -102,6 +104,7 @@
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.EncryptionKeys;
Expand Down Expand Up @@ -151,6 +154,7 @@
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.jspecify.annotations.NonNull;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -163,15 +167,29 @@ public class PersistentTopicsBase extends AdminResource {
private static final String DEPRECATED_CLIENT_VERSION_PREFIX = "Pulsar-CPP-v";
private static final Version LEAST_SUPPORTED_CLIENT_VERSION_PREFIX = Version.forIntegers(1, 21);

protected CompletableFuture<List<String>> internalGetListAsync(Optional<String> bundle) {
protected CompletableFuture<List<String>> internalGetListAsync(Optional<String> bundle,
@Nullable Map<String, String> properties) {
return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_TOPICS)
.thenCompose(__ -> namespaceResources().namespaceExistsAsync(namespaceName))
.thenAccept(exists -> {
if (!exists) {
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
}
})
.thenCompose(__ -> topicResources().listPersistentTopicsAsync(namespaceName))
.thenCompose(__ -> {
BrokerInterceptor brokerInterceptor = pulsar().getBrokerInterceptor();
CompletableFuture<Optional<TopicListingResult>> interceptorFuture =
brokerInterceptor == null ? CompletableFuture.completedFuture(null) :
brokerInterceptor.interceptGetTopicsOfNamespace(namespaceName,
CommandGetTopicsOfNamespace.Mode.PERSISTENT, Optional.empty(), properties);
return interceptorFuture.thenCompose(topicListingResult -> {
if (topicListingResult != null && topicListingResult.isPresent()) {
return CompletableFuture.completedFuture(topicListingResult.get().topics());
} else {
return topicResources().listPersistentTopicsAsync(namespaceName);
}
});
})
.thenApply(topics ->
topics.stream()
.filter(topic -> {
Expand Down Expand Up @@ -4413,7 +4431,7 @@ private CompletableFuture<Topic> topicNotFoundReasonAsync(TopicName topicName) {
"Partitioned Topic not found: %s %s", topicName.toString(), topicErrorType));
}
})
.thenCompose(__ -> internalGetListAsync(Optional.empty()))
.thenCompose(__ -> internalGetListAsync(Optional.empty(), null))
.thenApply(topics -> {
if (!topics.contains(topicName.toString())) {
throw new RestException(Status.NOT_FOUND, "Topic partitions were not yet created");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void getList(@Suspended final AsyncResponse asyncResponse, @PathParam("pr
@ApiParam(value = "Specify the bundle name", required = false)
@QueryParam("bundle") String bundle) {
validateNamespaceName(property, cluster, namespace);
internalGetListAsync(Optional.ofNullable(bundle))
internalGetListAsync(Optional.ofNullable(bundle), null)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,8 @@ public void getList(
@ApiParam(value = "Specify the bundle name", required = false)
@QueryParam("bundle") String nsBundle,
@ApiParam(value = "Include system topic")
@QueryParam("includeSystemTopic") boolean includeSystemTopic) {
@QueryParam("includeSystemTopic") boolean includeSystemTopic,
@QueryParam("properties") String propertiesStr) {
Policies policies = null;
try {
validateNamespaceName(tenant, namespace);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -121,9 +123,11 @@ public void getList(
@ApiParam(value = "Specify the bundle name", required = false)
@QueryParam("bundle") String bundle,
@ApiParam(value = "Include system topic")
@QueryParam("includeSystemTopic") boolean includeSystemTopic) {
@QueryParam("includeSystemTopic") boolean includeSystemTopic,
@ApiParam(value = "properties for customized topic listing intercept, format: k1=v1,k2=v2")
@QueryParam("properties") String propertiesStr) {
validateNamespaceName(tenant, namespace);
internalGetListAsync(Optional.ofNullable(bundle))
internalGetListAsync(Optional.ofNullable(bundle), parseProperties(propertiesStr))
.thenAccept(topicList -> asyncResponse.resume(filterSystemTopic(topicList, includeSystemTopic)))
.exceptionally(ex -> {
if (isNot307And404Exception(ex)) {
Expand Down Expand Up @@ -5131,5 +5135,28 @@ public void getMessageIDByIndex(@Suspended final AsyncResponse asyncResponse,
});
}

private Map<String, String> parseProperties(String propertiesStr) {
if (propertiesStr == null || propertiesStr.trim().isEmpty()) {
return Collections.emptyMap();
}
Map<String, String> map = new HashMap<>();
String[] pairs = propertiesStr.split(",");
for (String pair : pairs) {
String[] parts = pair.split("=", 2);
if (parts.length == 2) {
try {
String key = Codec.decode(parts[0].trim());
String value = Codec.decode(parts[1].trim());
if (!key.isEmpty()) {
map.put(key, value);
}
} catch (Exception e) {
log.warn("Failed to decode property: {}", pair, e);
}
}
}
return map;
}

private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,28 @@
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.TopicListingResult;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.naming.NamespaceName;

/**
* A plugin interface that allows you to intercept the
Expand Down Expand Up @@ -224,6 +229,28 @@ default void onFilter(ServletRequest request, ServletResponse response, FilterCh
chain.doFilter(request, response);
}

/**
* Intercept the GetTopicsOfNamespace request.
* <p>
* This method allows plugins to override the default topic discovery logic (ZooKeeper scan).
* It enables fetching topics from external sources (e.g., databases, other metadata stores)
* based on the provided client context properties.
*
* @param namespace The namespace being queried.
* @param mode The query mode (PERSISTENT, NON_PERSISTENT, or ALL).
* @param topicsPattern Optional regex pattern provided by the client.
* @param properties Context properties provided by the client.
* @return A CompletableFuture containing the result:
* If the future completes with {@code Optional.empty()}, proceed to the next interceptor or Broker's default logic.
*/
default CompletableFuture<Optional<TopicListingResult>> interceptGetTopicsOfNamespace(
NamespaceName namespace,
CommandGetTopicsOfNamespace.Mode mode,
Optional<String> topicsPattern,
Map<String, String> properties) {
return CompletableFuture.completedFuture(Optional.empty());
}

/**
* Initialize the broker interceptor.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
Expand All @@ -31,15 +33,18 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.TopicListingResult;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.nar.NarClassLoader;

/**
Expand Down Expand Up @@ -285,6 +290,21 @@ public void onFilter(ServletRequest request, ServletResponse response, FilterCha
}
}

@Override
public CompletableFuture<Optional<TopicListingResult>> interceptGetTopicsOfNamespace(
NamespaceName namespace,
CommandGetTopicsOfNamespace.Mode mode,
Optional<String> topicsPattern,
Map<String, String> properties) {
final ClassLoader previousContext = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(narClassLoader);
return this.interceptor.interceptGetTopicsOfNamespace(namespace, mode, topicsPattern, properties);
} finally {
Thread.currentThread().setContextClassLoader(previousContext);
}
}

@Override
public void close() {
final ClassLoader previousContext = Thread.currentThread().getContextClassLoader();
Expand Down
Loading
Loading