diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 45455f16d4dc1..c83628a411feb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -486,6 +486,7 @@ protected CompletableFuture getPartitionedTopicMetadat return validateTopicOperationAsync(topicName, TopicOperation.LOOKUP) .thenCompose(__ -> validateClusterOwnershipAsync(topicName.getCluster())) .thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(topicName.getNamespaceObject())) + .thenCompose(__ -> validateNamespaceExists(topicName.getNamespaceObject())) .thenCompose(__ -> { if (checkAllowAutoCreation) { return pulsar().getBrokerService() @@ -506,6 +507,18 @@ protected void validateClusterExists(String cluster) { } } + protected CompletableFuture validateNamespaceExists(NamespaceName namespace) { + try { + if (namespaceResources().namespaceExists(namespace)) { + return CompletableFuture.completedFuture(null); + } else { + return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND, "Namespace does not exist")); + } + } catch (Exception e) { + return FutureUtil.failedFuture(e); + } + } + protected Policies getNamespacePolicies(String tenant, String cluster, String namespace) { NamespaceName ns = NamespaceName.get(tenant, cluster, namespace); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java index 512a5cfcab661..0032a050564a5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java @@ -74,6 +74,7 @@ import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Slf4j @@ -245,7 +246,7 @@ public void testConsumerStatsOutput() throws Exception { "connectedSince", "clientVersion"); - final String topicName = "persistent://prop/use/ns-abc/testConsumerStatsOutput"; + final String topicName = "persistent://my-property/my-ns/testConsumerStatsOutput"; final String subName = "my-subscription"; Consumer consumer = pulsarClient.newConsumer() @@ -269,6 +270,30 @@ public void testConsumerStatsOutput() throws Exception { consumer.close(); } + @DataProvider(name = "invalidTopicName") + public static Object[][] invalidTopicName() { + // some topic names in non-exist namespace + return new Object[][]{ + {"persistent://prop/use/ns-abc/testNonExistNamespace"}, + {"persistent://prop/ns-abc/testNonExistNamespace"} + }; + } + + @Test(dataProvider = "invalidTopicName") + public void testNonExistNamespace(String invalidTopicName) throws Exception { + final String subName = "my-subscription"; + try { + Consumer consumer = pulsarClient.newConsumer() + .topic(invalidTopicName) + .subscriptionType(SubscriptionType.Shared) + .subscriptionName(subName) + .subscribe(); + Assert.fail("should have failed"); + } catch (Exception e) { + // ok + } + } + @Test public void testPersistentTopicMessageAckRateMetricTopicLevel() throws Exception {