From 22f227b9e0d5dcc74aded829e472313561ca9b97 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sun, 11 May 2025 16:28:29 +0800 Subject: [PATCH 1/7] Remove BookKeeper client from ManagedLedgerStorage --- .../broker/ManagedLedgerClientFactory.java | 15 ++++++++++++--- .../org/apache/pulsar/broker/PulsarService.java | 11 +++++++---- .../broker/storage/ManagedLedgerStorage.java | 16 ++++++---------- .../broker/testcontext/PulsarTestContext.java | 2 -- .../client/impl/SequenceIdWithErrorTest.java | 6 +++--- 5 files changed, 28 insertions(+), 22 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java index b060475a43f31..bc8babcafc2bc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java @@ -61,9 +61,18 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage { @Override public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadataStore, - BookKeeperClientFactory bookkeeperProvider, - EventLoopGroup eventLoopGroup, - OpenTelemetry openTelemetry) throws Exception { + OpenTelemetry openTelemetry) { + throw new IllegalStateException("The initialize method should not be called for the built-in storage"); + } + + @VisibleForTesting + public ManagedLedgerClientFactory() { + } + + public ManagedLedgerClientFactory(ServiceConfiguration conf, MetadataStoreExtended metadataStore, + BookKeeperClientFactory bookkeeperProvider, + EventLoopGroup eventLoopGroup, + OpenTelemetry openTelemetry) throws Exception { ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig(); managedLedgerFactoryConfig.setMaxCacheSize(conf.getManagedLedgerCacheSizeMB() * 1024L * 1024L); managedLedgerFactoryConfig.setCacheEvictionWatermark(conf.getManagedLedgerCacheEvictionWatermark()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 1a55b95852a4c..b361b371db7f6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1105,10 +1105,13 @@ protected OrderedExecutor newOrderedExecutor() { @VisibleForTesting protected ManagedLedgerStorage newManagedLedgerStorage() throws Exception { - return ManagedLedgerStorage.create( - config, localMetadataStore, - bkClientFactory, ioEventLoopGroup, openTelemetry.getOpenTelemetryService().getOpenTelemetry() - ); + final var openTelemetry = this.openTelemetry.getOpenTelemetryService().getOpenTelemetry(); + if (config.getManagedLedgerStorageClassName().equals(ManagedLedgerClientFactory.class.getName())) { + return new ManagedLedgerClientFactory(config, localMetadataStore, bkClientFactory, ioEventLoopGroup, + openTelemetry); + } else { + return ManagedLedgerStorage.create(config, localMetadataStore, openTelemetry); + } } @VisibleForTesting diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java index 720798123e7b4..56ba927b4b36e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java @@ -18,12 +18,10 @@ */ package org.apache.pulsar.broker.storage; -import io.netty.channel.EventLoopGroup; import io.opentelemetry.api.OpenTelemetry; import java.io.IOException; import java.util.Collection; import java.util.Optional; -import org.apache.pulsar.broker.BookKeeperClientFactory; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.common.classification.InterfaceAudience.Private; import org.apache.pulsar.common.classification.InterfaceStability.Unstable; @@ -47,13 +45,12 @@ public interface ManagedLedgerStorage extends AutoCloseable { * Initialize the managed ledger storage. * * @param conf service config - * @param bookkeeperProvider bookkeeper provider + * @param metadataStore the metadata store used in Pulsar + * @param openTelemetry the OpenTelemetry instance used in Pulsar * @throws Exception */ void initialize(ServiceConfiguration conf, MetadataStoreExtended metadataStore, - BookKeeperClientFactory bookkeeperProvider, - EventLoopGroup eventLoopGroup, OpenTelemetry openTelemetry) throws Exception; /** @@ -85,21 +82,20 @@ default ManagedLedgerStorageClass getDefaultStorageClass() { void close() throws IOException; /** - * Initialize the {@link ManagedLedgerStorage} from the provided resources. + * Initialize the {@link ManagedLedgerStorage} from the provided resources if it's not the built-in storage. * * @param conf service config - * @param bkProvider bookkeeper client provider + * @param metadataStore the metadata store used in Pulsar + * @param openTelemetry the OpenTelemetry instance used in Pulsar * @return the initialized managed ledger storage. */ static ManagedLedgerStorage create(ServiceConfiguration conf, MetadataStoreExtended metadataStore, - BookKeeperClientFactory bkProvider, - EventLoopGroup eventLoopGroup, OpenTelemetry openTelemetry) throws Exception { ManagedLedgerStorage storage = Reflections.createInstance(conf.getManagedLedgerStorageClassName(), ManagedLedgerStorage.class, Thread.currentThread().getContextClassLoader()); - storage.initialize(conf, metadataStore, bkProvider, eventLoopGroup, openTelemetry); + storage.initialize(conf, metadataStore, openTelemetry); return storage; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java index 035941a93deaf..82c8f385fa4fa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java @@ -19,7 +19,6 @@ package org.apache.pulsar.broker.testcontext; -import io.netty.channel.EventLoopGroup; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder; import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; @@ -1026,7 +1025,6 @@ public BookKeeper getBookKeeperClient() { return new ManagedLedgerStorage() { @Override public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadataStore, - BookKeeperClientFactory bookkeeperProvider, EventLoopGroup eventLoopGroup, OpenTelemetry openTelemetry) { } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java index 2b1b409b71ce8..b2c677d412796 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java @@ -59,9 +59,9 @@ public void testCheckSequenceId() throws Exception { // Fence the topic by opening the ManagedLedger for the topic outside the Pulsar broker. This will cause the // broker to fail subsequent send operation and it will trigger a recover EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1); - ManagedLedgerClientFactory clientFactory = new ManagedLedgerClientFactory(); - clientFactory.initialize(pulsar.getConfiguration(), pulsar.getLocalMetadataStore(), - pulsar.getBookKeeperClientFactory(), eventLoopGroup, OpenTelemetry.noop()); + ManagedLedgerClientFactory clientFactory = new ManagedLedgerClientFactory(pulsar.getConfiguration(), + pulsar.getLocalMetadataStore(), pulsar.getBookKeeperClientFactory(), eventLoopGroup, + OpenTelemetry.noop()); ManagedLedgerFactory mlFactory = clientFactory.getDefaultStorageClass().getManagedLedgerFactory(); ManagedLedger ml = mlFactory.open(TopicName.get(topicName).getPersistenceNamingEncoding()); ml.close(); From 24cf7731b62244bf8d3caf10f61a1b4f16d95a98 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sun, 11 May 2025 17:52:30 +0800 Subject: [PATCH 2/7] Fall back to a disabled compaction service is BK is not available --- .../pulsar/broker/PulsarServerException.java | 8 ++ .../apache/pulsar/broker/PulsarService.java | 38 +++--- .../pulsar/broker/admin/AdminResource.java | 5 - .../pulsar/broker/admin/v2/Bookies.java | 5 +- .../service/persistent/PersistentTopic.java | 8 +- .../DisabledTopicCompactionService.java | 125 ++++++++++++++++++ .../PulsarCompactionServiceFactory.java | 9 +- 7 files changed, 172 insertions(+), 26 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/compaction/DisabledTopicCompactionService.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java index d7c0d0adb3afc..05291a59bf7bb 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java @@ -46,6 +46,14 @@ public NotFoundException(Throwable t) { } } + public static class BookKeeperNotSupportedException extends PulsarServerException { + + public BookKeeperNotSupportedException() { + super("ManagedLedgerStorage#getDefaultStorageClass does not return a BookkeeperManagedLedgerStorageClass " + + "instance"); + } + } + public static PulsarServerException from(Throwable throwable) { if (throwable instanceof CompletionException) { return from(throwable.getCause()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index b361b371db7f6..22f0b456387f4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -125,7 +125,6 @@ import org.apache.pulsar.broker.stats.prometheus.PulsarPrometheusMetricsServlet; import org.apache.pulsar.broker.storage.BookkeeperManagedLedgerStorageClass; import org.apache.pulsar.broker.storage.ManagedLedgerStorage; -import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass; import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl; import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider; @@ -166,6 +165,7 @@ import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.apache.pulsar.compaction.CompactionServiceFactory; import org.apache.pulsar.compaction.Compactor; +import org.apache.pulsar.compaction.DisabledTopicCompactionService; import org.apache.pulsar.compaction.PulsarCompactionServiceFactory; import org.apache.pulsar.compaction.StrategicTwoPhaseCompactor; import org.apache.pulsar.compaction.TopicCompactionService; @@ -1519,16 +1519,19 @@ public WorkerService getWorkerService() throws UnsupportedOperationException { + "is not enabled, probably functionsWorkerEnabled is set to false")); } - public BookKeeper getBookKeeperClient() { - ManagedLedgerStorageClass defaultStorageClass = getManagedLedgerStorage().getDefaultStorageClass(); - if (defaultStorageClass instanceof BookkeeperManagedLedgerStorageClass bkStorageClass) { - return bkStorageClass.getBookKeeperClient(); + public Optional getOptionalBookKeeperClient() { + final var defaultStorage = managedLedgerStorage.getDefaultStorageClass(); + if (defaultStorage instanceof BookkeeperManagedLedgerStorageClass bkStorage) { + return Optional.of(bkStorage.getBookKeeperClient()); } else { - // TODO: Refactor code to support other than default bookkeeper based storage class - throw new UnsupportedOperationException("BookKeeper client is not available"); + return Optional.empty(); } } + public BookKeeper getBookKeeperClient() throws PulsarServerException { + return getOptionalBookKeeperClient().orElseThrow(PulsarServerException.BookKeeperNotSupportedException::new); + } + public ManagedLedgerFactory getDefaultManagedLedgerFactory() { return getManagedLedgerStorage().getDefaultStorageClass().getManagedLedgerFactory(); } @@ -1640,17 +1643,15 @@ public Compactor getNullableCompactor() { return null; } - public StrategicTwoPhaseCompactor newStrategicCompactor() throws PulsarServerException { - return new StrategicTwoPhaseCompactor(this.getConfiguration(), - getClient(), getBookKeeperClient(), - getCompactorExecutor()); - } - - public synchronized StrategicTwoPhaseCompactor getStrategicCompactor() throws PulsarServerException { + public synchronized Optional getStrategicCompactor() throws PulsarServerException { + if (getOptionalBookKeeperClient().isEmpty()) { + return Optional.empty(); + } if (this.strategicCompactor == null) { - this.strategicCompactor = newStrategicCompactor(); + this.strategicCompactor = new StrategicTwoPhaseCompactor(this.getConfiguration(), getClient(), + getOptionalBookKeeperClient().get(), getCompactorExecutor()); } - return this.strategicCompactor; + return Optional.of(this.strategicCompactor); } protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImpl offloadPolicies) { @@ -2079,6 +2080,11 @@ private CompactionServiceFactory loadCompactionServiceFactory() { var compactionServiceFactory = Reflections.createInstance(compactionServiceFactoryClassName, CompactionServiceFactory.class, Thread.currentThread().getContextClassLoader()); + if (getOptionalBookKeeperClient().isEmpty() + && PulsarCompactionServiceFactory.class.isAssignableFrom(compactionServiceFactory.getClass())) { + LOG.warn("BookKeeper client is not available, fallback to a disabled compaction service"); + return new DisabledTopicCompactionService(); + } compactionServiceFactory.initialize(this).join(); return compactionServiceFactory; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index fb7679ff2696a..ba4ca8f6887f9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -39,7 +39,6 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.ServiceConfiguration; @@ -90,10 +89,6 @@ public abstract class AdminResource extends PulsarWebResource { protected NamespaceName namespaceName; protected TopicName topicName; - protected BookKeeper bookKeeper() { - return pulsar().getBookKeeperClient(); - } - /** * Get the domain of the topic (whether it's persistent or non-persistent). */ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java index c7b09ca9b0aa1..aedae800c37e8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java @@ -84,7 +84,10 @@ public void getBookiesRackInfo(@Suspended final AsyncResponse asyncResponse) { public BookiesClusterInfo getAllBookies() throws Exception { validateSuperUserAccess(); - BookKeeper bookKeeper = bookKeeper(); + BookKeeper bookKeeper = pulsar().getOptionalBookKeeperClient().orElse(null); + if (bookKeeper == null) { + return BookiesClusterInfo.builder().bookies(List.of()).build(); + } MetadataClientDriver metadataClientDriver = bookKeeper.getMetadataClientDriver(); RegistrationClient registrationClient = metadataClientDriver.getRegistrationClient(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 89b9dfe0e3ed6..1af343ce5aee8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2904,8 +2904,9 @@ public CompletableFuture getInternalStats(boolean getLedgerMetadataFutures.add(completableFuture); CompletableFuture metadataFuture = null; try { - metadataFuture = brokerService.getPulsar().getBookKeeperClient() - .getLedgerMetadata(ledgerId); + metadataFuture = brokerService.getPulsar().getOptionalBookKeeperClient() + .map(bkClient -> bkClient.getLedgerMetadata(ledgerId)) + .orElse(null); } catch (NullPointerException e) { // related to bookkeeper issue https://github.com/apache/bookkeeper/issues/2741 if (log.isDebugEnabled()) { @@ -4025,7 +4026,8 @@ public synchronized void triggerCompaction() if (strategicCompactionMap.containsKey(topic)) { currentCompaction = brokerService.pulsar().getStrategicCompactor() - .compact(topic, strategicCompactionMap.get(topic)); + .map(__ -> __.compact(topic, strategicCompactionMap.get(topic))) + .orElse(CompletableFuture.completedFuture(COMPACTION_NEVER_RUN)); } else { currentCompaction = topicCompactionService.compact().thenApply(x -> null); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/DisabledTopicCompactionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/DisabledTopicCompactionService.java new file mode 100644 index 0000000000000..79cd33ef75ef8 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/DisabledTopicCompactionService.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.pulsar.broker.PulsarService; +import org.jspecify.annotations.NonNull; + +public class DisabledTopicCompactionService implements CompactionServiceFactory { + + private static final Entry DUMMY_ENTRY = new Entry() { + @Override + public byte[] getData() { + return new byte[0]; + } + + @Override + public byte[] getDataAndRelease() { + return new byte[0]; + } + + @Override + public int getLength() { + return 0; + } + + @Override + public ByteBuf getDataBuffer() { + return Unpooled.wrappedBuffer(new byte[0]); + } + + @Override + public Position getPosition() { + return PositionFactory.EARLIEST; + } + + @Override + public long getLedgerId() { + return -1; + } + + @Override + public long getEntryId() { + return -1; + } + + @Override + public boolean release() { + return false; + } + }; + private static final TopicCompactionService DUMMY_TOPIC_COMPACTION_SERVICE = new TopicCompactionService() { + + @Override + public void close() { + } + + @Override + public CompletableFuture compact() { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture> readCompactedEntries(@NonNull Position startPosition, + int numberOfEntriesToRead) { + return CompletableFuture.completedFuture(List.of()); + } + + @Override + public CompletableFuture readLastCompactedEntry() { + return CompletableFuture.completedFuture(DUMMY_ENTRY); + } + + @Override + public CompletableFuture getLastCompactedPosition() { + return CompletableFuture.completedFuture(DUMMY_ENTRY.getPosition()); + } + + @Override + public CompletableFuture findEntryByPublishTime(long publishTime) { + return CompletableFuture.completedFuture(DUMMY_ENTRY); + } + + @Override + public CompletableFuture findEntryByEntryIndex(long entryIndex) { + return CompletableFuture.completedFuture(DUMMY_ENTRY); + } + }; + + @Override + public CompletableFuture initialize(@NonNull PulsarService pulsarService) { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture newTopicCompactionService(@NonNull String topic) { + return CompletableFuture.completedFuture(DUMMY_TOPIC_COMPACTION_SERVICE); + } + + @Override + public void close() throws Exception { + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java index ff0fa0bcd1207..08a814735ee58 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java @@ -24,6 +24,7 @@ import java.util.concurrent.CompletionException; import lombok.AccessLevel; import lombok.Getter; +import org.apache.bookkeeper.client.BookKeeper; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.jspecify.annotations.NonNull; @@ -69,8 +70,14 @@ public CompletableFuture initialize(@NonNull PulsarService pulsarService) @Override public CompletableFuture newTopicCompactionService(@NonNull String topic) { Objects.requireNonNull(topic); + final BookKeeper bookKeeper; + try { + bookKeeper = pulsarService.getBookKeeperClient(); + } catch (PulsarServerException e) { + return CompletableFuture.failedFuture(e); + } PulsarTopicCompactionService pulsarTopicCompactionService = - new PulsarTopicCompactionService(topic, pulsarService.getBookKeeperClient(), () -> { + new PulsarTopicCompactionService(topic, bookKeeper, () -> { try { return this.getCompactor(); } catch (Throwable e) { From e0738a44a0600e5a1f37c0126581c497edcf036b Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sun, 11 May 2025 17:58:39 +0800 Subject: [PATCH 3/7] Fix test failure --- .../extensions/channel/ServiceUnitStateChannelTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index 7db0baabf6552..91c45d578f7e8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -1002,7 +1002,7 @@ public void conflictAndCompactionTest() throws Exception { return; } - var compactor = spy(pulsar1.getStrategicCompactor()); + var compactor = spy(pulsar1.getStrategicCompactor().orElseThrow()); Field strategicCompactorField = FieldUtils.getDeclaredField(PulsarService.class, "strategicCompactor", true); FieldUtils.writeField(strategicCompactorField, pulsar1, compactor, true); FieldUtils.writeField(strategicCompactorField, pulsar2, compactor, true); From ffce51ae9a9d78b4d66feff885bde4f1374be9ec Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 12 May 2025 21:22:19 +0800 Subject: [PATCH 4/7] Fix getLastCompactedPosition --- .../pulsar/compaction/DisabledTopicCompactionService.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/DisabledTopicCompactionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/DisabledTopicCompactionService.java index 79cd33ef75ef8..9d3104d7b88fc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/DisabledTopicCompactionService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/DisabledTopicCompactionService.java @@ -95,7 +95,9 @@ public CompletableFuture readLastCompactedEntry() { @Override public CompletableFuture getLastCompactedPosition() { - return CompletableFuture.completedFuture(DUMMY_ENTRY.getPosition()); + // Return a null position so that when handling the GetLastMessageId request, no `asyncReadEntry` call will + // be performed. Instead, the mark delete position will be returned. + return CompletableFuture.completedFuture(null); } @Override From dcd357459dee7df83ded2c62a99523ad73196aa2 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 12 May 2025 23:23:48 +0800 Subject: [PATCH 5/7] Fail fast if some components rely on the BookKeeper --- .../apache/pulsar/broker/PulsarService.java | 10 +- .../delayed/DelayedDeliveryTrackerLoader.java | 6 +- .../pulsar/BookKeeperNotSupportedTest.java | 127 ++++++++++++++++++ 3 files changed, 137 insertions(+), 6 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/BookKeeperNotSupportedTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 22f0b456387f4..e38e353569763 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -62,6 +62,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.function.Supplier; +import javax.annotation.Nullable; import javax.servlet.ServletException; import javax.websocket.DeploymentException; import lombok.AccessLevel; @@ -218,6 +219,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { private WebService webService = null; private WebSocketService webSocketService = null; private TopicPoliciesService topicPoliciesService = TopicPoliciesService.DISABLED; + @Nullable private BookKeeperClientFactory bkClientFactory; protected CompactionServiceFactory compactionServiceFactory; private StrategicTwoPhaseCompactor strategicCompactor; @@ -879,8 +881,6 @@ public void start() throws PulsarServerException { protocolHandlers.initialize(config); // Now we are ready to start services - this.bkClientFactory = newBookKeeperClientFactory(); - managedLedgerStorage = newManagedLedgerStorage(); this.brokerService = newBrokerService(this); @@ -1107,6 +1107,7 @@ protected OrderedExecutor newOrderedExecutor() { protected ManagedLedgerStorage newManagedLedgerStorage() throws Exception { final var openTelemetry = this.openTelemetry.getOpenTelemetryService().getOpenTelemetry(); if (config.getManagedLedgerStorageClassName().equals(ManagedLedgerClientFactory.class.getName())) { + this.bkClientFactory = newBookKeeperClientFactory(); return new ManagedLedgerClientFactory(config, localMetadataStore, bkClientFactory, ioEventLoopGroup, openTelemetry); } else { @@ -1619,7 +1620,10 @@ public BookKeeperClientFactory newBookKeeperClientFactory() { return new BookKeeperClientFactoryImpl(); } - public BookKeeperClientFactory getBookKeeperClientFactory() { + public BookKeeperClientFactory getBookKeeperClientFactory() throws PulsarServerException { + if (bkClientFactory == null) { + throw new PulsarServerException.BookKeeperNotSupportedException(); + } return bkClientFactory; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerLoader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerLoader.java index 6e32de1998ada..bb951a5bbc124 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerLoader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerLoader.java @@ -18,8 +18,8 @@ */ package org.apache.pulsar.broker.delayed; -import java.io.IOException; import lombok.experimental.UtilityClass; +import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.common.util.Reflections; @@ -27,7 +27,7 @@ @UtilityClass public class DelayedDeliveryTrackerLoader { public static DelayedDeliveryTrackerFactory loadDelayedDeliveryTrackerFactory(PulsarService pulsarService) - throws IOException { + throws PulsarServerException { try { ServiceConfiguration conf = pulsarService.getConfiguration(); DelayedDeliveryTrackerFactory factory = @@ -36,7 +36,7 @@ public static DelayedDeliveryTrackerFactory loadDelayedDeliveryTrackerFactory(Pu factory.initialize(pulsarService); return factory; } catch (Exception e) { - throw new IOException(e); + throw PulsarServerException.from(e); } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/BookKeeperNotSupportedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/BookKeeperNotSupportedTest.java new file mode 100644 index 0000000000000..308c07e81fd98 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/BookKeeperNotSupportedTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar; + +import io.opentelemetry.api.OpenTelemetry; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory; +import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage; +import org.apache.pulsar.broker.service.schema.SchemaStorageFactory; +import org.apache.pulsar.broker.storage.ManagedLedgerStorage; +import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass; +import org.apache.pulsar.common.protocol.schema.SchemaStorage; +import org.apache.pulsar.compaction.DisabledTopicCompactionService; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +import org.jspecify.annotations.NonNull; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + +/** + * Test the start will fail fast if managed ledger does not use BookKeeper as the default storage layer while some + * plugins depend on BookKeeper. + */ +@Test +public class BookKeeperNotSupportedTest { + + @Test + public void testSchemaStorage() throws PulsarServerException { + final var config = newConfig(); + try (final var pulsar = new PulsarService(config)) { + Assert.assertThrows(PulsarServerException.BookKeeperNotSupportedException.class, pulsar::start); + } + } + + @Test + public void testDelayedDeliveryTracker() throws PulsarServerException { + final var config = newConfig(); + config.setSchemaRegistryStorageClassName(DummySchemaFactory.class.getName()); + config.setDelayedDeliveryTrackerFactoryClassName(BucketDelayedDeliveryTrackerFactory.class.getName()); + try (final var pulsar = new PulsarService(config)) { + Assert.assertThrows(PulsarServerException.BookKeeperNotSupportedException.class, pulsar::start); + } + } + + @Test + public void testStartSucceed() throws PulsarServerException { + final var config = newConfig(); + config.setSchemaRegistryStorageClassName(DummySchemaFactory.class.getName()); + try (final var ignored = new PulsarService(config)) { + } + } + + private static ServiceConfiguration newConfig() { + final var config = new ServiceConfiguration(); + config.setClusterName("test"); + config.setMetadataStoreUrl("memory:local"); + config.setManagedLedgerStorageClassName(NoBkStorage.class.getName()); + config.setCompactionServiceFactoryClassName(DisabledTopicCompactionService.class.getName()); + config.setAdvertisedAddress("127.0.0.1"); // avoid being affected by the local proxy + return config; + } + + private static class NoBkStorage implements ManagedLedgerStorage { + + final ManagedLedgerStorageClass storage = new ManagedLedgerStorageClass() { + @Override + public String getName() { + return "other-storage"; + } + + @Override + public ManagedLedgerFactory getManagedLedgerFactory() { + return Mockito.mock(ManagedLedgerFactoryImpl.class); + } + }; + + @Override + public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadataStore, + OpenTelemetry openTelemetry) { + } + + @Override + public Collection getStorageClasses() { + return List.of(storage); + } + + @Override + public Optional getManagedLedgerStorageClass(String name) { + return Optional.of(storage); + } + + @Override + public void close() { + } + } + + static class DummySchemaFactory implements SchemaStorageFactory { + + @Override + public @NonNull SchemaStorage create(PulsarService pulsar) throws Exception { + return Mockito.mock(BookkeeperSchemaStorage.class); + } + } +} From 322231ddcfd698539738f3ab55303d0dbbf3e339 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 13 May 2025 18:23:56 +0800 Subject: [PATCH 6/7] Move bkClientFactory into ManagedLedgerClientFactory --- .../broker/ManagedLedgerClientFactory.java | 9 +++++- .../apache/pulsar/broker/PulsarService.java | 20 +++---------- .../BookkeeperManagedLedgerStorageClass.java | 6 ++++ .../broker/testcontext/PulsarTestContext.java | 28 ++++++++----------- 4 files changed, 29 insertions(+), 34 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java index bc8babcafc2bc..2d2b34f759544 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java @@ -53,6 +53,7 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage { private static final Logger log = LoggerFactory.getLogger(ManagedLedgerClientFactory.class); private static final String DEFAULT_STORAGE_CLASS_NAME = "bookkeeper"; private BookkeeperManagedLedgerStorageClass defaultStorageClass; + private BookKeeperClientFactory bookkeeperProvider; private ManagedLedgerFactory managedLedgerFactory; private BookKeeper defaultBkClient; private final AsyncCache @@ -70,9 +71,9 @@ public ManagedLedgerClientFactory() { } public ManagedLedgerClientFactory(ServiceConfiguration conf, MetadataStoreExtended metadataStore, - BookKeeperClientFactory bookkeeperProvider, EventLoopGroup eventLoopGroup, OpenTelemetry openTelemetry) throws Exception { + this.bookkeeperProvider = new BookKeeperClientFactoryImpl(); ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig(); managedLedgerFactoryConfig.setMaxCacheSize(conf.getManagedLedgerCacheSizeMB() * 1024L * 1024L); managedLedgerFactoryConfig.setCacheEvictionWatermark(conf.getManagedLedgerCacheEvictionWatermark()); @@ -162,6 +163,11 @@ public StatsProvider getStatsProvider() { return statsProvider; } + @Override + public BookKeeperClientFactory getBookKeeperClientFactory() { + return bookkeeperProvider; + } + @Override public BookKeeper getBookKeeperClient() { return defaultBkClient; @@ -238,6 +244,7 @@ public void close() throws IOException { log.warn("Failed to close bookkeeper-client for policy {}", policy, e); } }); + bookkeeperProvider.close(); log.info("Closed BookKeeper client"); } catch (Exception e) { log.warn(e.getMessage(), e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index e38e353569763..6961e75cc7a68 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -219,8 +219,6 @@ public class PulsarService implements AutoCloseable, ShutdownService { private WebService webService = null; private WebSocketService webSocketService = null; private TopicPoliciesService topicPoliciesService = TopicPoliciesService.DISABLED; - @Nullable - private BookKeeperClientFactory bkClientFactory; protected CompactionServiceFactory compactionServiceFactory; private StrategicTwoPhaseCompactor strategicCompactor; private ResourceUsageTransportManager resourceUsageTransportManager; @@ -592,11 +590,6 @@ public CompletableFuture closeAsync() { this.managedLedgerStorage = null; } - if (bkClientFactory != null) { - this.bkClientFactory.close(); - this.bkClientFactory = null; - } - closeLeaderElectionService(); if (adminClient != null) { @@ -1107,9 +1100,7 @@ protected OrderedExecutor newOrderedExecutor() { protected ManagedLedgerStorage newManagedLedgerStorage() throws Exception { final var openTelemetry = this.openTelemetry.getOpenTelemetryService().getOpenTelemetry(); if (config.getManagedLedgerStorageClassName().equals(ManagedLedgerClientFactory.class.getName())) { - this.bkClientFactory = newBookKeeperClientFactory(); - return new ManagedLedgerClientFactory(config, localMetadataStore, bkClientFactory, ioEventLoopGroup, - openTelemetry); + return new ManagedLedgerClientFactory(config, localMetadataStore, ioEventLoopGroup, openTelemetry); } else { return ManagedLedgerStorage.create(config, localMetadataStore, openTelemetry); } @@ -1616,15 +1607,12 @@ private SchemaStorage createAndStartSchemaStorage() throws Exception { return schemaStorage; } - public BookKeeperClientFactory newBookKeeperClientFactory() { - return new BookKeeperClientFactoryImpl(); - } - public BookKeeperClientFactory getBookKeeperClientFactory() throws PulsarServerException { - if (bkClientFactory == null) { + if (managedLedgerStorage.getDefaultStorageClass() instanceof BookkeeperManagedLedgerStorageClass bkStorage) { + return bkStorage.getBookKeeperClientFactory(); + } else { throw new PulsarServerException.BookKeeperNotSupportedException(); } - return bkClientFactory; } public synchronized ScheduledExecutorService getCompactorExecutor() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/BookkeeperManagedLedgerStorageClass.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/BookkeeperManagedLedgerStorageClass.java index 1f05cde72a5b5..c1f7ccf30cf90 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/BookkeeperManagedLedgerStorageClass.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/BookkeeperManagedLedgerStorageClass.java @@ -20,6 +20,7 @@ import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.stats.StatsProvider; +import org.apache.pulsar.broker.BookKeeperClientFactory; /** * ManagedLedgerStorageClass represents a configured instance of ManagedLedgerFactory for managed ledgers. @@ -39,4 +40,9 @@ public interface BookkeeperManagedLedgerStorageClass extends ManagedLedgerStorag * @return the stats provider. */ StatsProvider getStatsProvider(); + + /** + * Return the bookkeeper client factory instance that can be used to create a bookkeeper client instance. + */ + BookKeeperClientFactory getBookKeeperClientFactory(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java index 82c8f385fa4fa..439caa72366f9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java @@ -45,6 +45,7 @@ import org.apache.bookkeeper.stats.StatsProvider; import org.apache.bookkeeper.util.ZkUtils; import org.apache.pulsar.broker.BookKeeperClientFactory; +import org.apache.pulsar.broker.BookKeeperClientFactoryImpl; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -554,21 +555,6 @@ public Builder useTestPulsarResources(MetadataStore metadataStore) { return this; } - /** - * Applicable only when PulsarTestContext is not startable. This will configure the {@link BookKeeper} - * and {@link ManagedLedgerFactory} instances to use for creating a {@link ManagedLedgerStorage} instance - * for PulsarService. - * - * @param bookKeeperClient the bookkeeper client to use (mock bookkeeper) - * @param managedLedgerFactory the managed ledger factory to use (could be a mock) - * @return the builder - */ - public Builder managedLedgerClients(BookKeeper bookKeeperClient, - ManagedLedgerFactory managedLedgerFactory) { - return managedLedgerStorage( - PulsarTestContext.createManagedLedgerStorage(bookKeeperClient, managedLedgerFactory)); - } - /** * Configures a function to use for customizing the {@link BrokerService} instance when it gets created. * @return the builder @@ -950,7 +936,8 @@ protected void initializePulsarServices(SpyConfig spyConfig, Builder builder) { ManagedLedgerFactory mlFactoryMock = Mockito.mock(ManagedLedgerFactory.class); managedLedgerStorage( spyConfig.getManagedLedgerStorage() - .spy(PulsarTestContext.createManagedLedgerStorage(builder.bookKeeperClient, + .spy(PulsarTestContext.createManagedLedgerStorage(new BookKeeperClientFactoryImpl(), + builder.bookKeeperClient, mlFactoryMock))); } if (builder.pulsarResources == null) { @@ -998,10 +985,12 @@ protected void initializePulsarServices(SpyConfig spyConfig, Builder builder) { } @NonNull - private static ManagedLedgerStorage createManagedLedgerStorage(BookKeeper bookKeeperClient, + private static ManagedLedgerStorage createManagedLedgerStorage(BookKeeperClientFactory bookKeeperClientFactory, + BookKeeper bookKeeperClient, ManagedLedgerFactory managedLedgerFactory) { BookkeeperManagedLedgerStorageClass managedLedgerStorageClass = new BookkeeperManagedLedgerStorageClass() { + @Override public String getName() { return "bookkeeper"; @@ -1017,6 +1006,11 @@ public StatsProvider getStatsProvider() { return new NullStatsProvider(); } + @Override + public BookKeeperClientFactory getBookKeeperClientFactory() { + return bookKeeperClientFactory; + } + @Override public BookKeeper getBookKeeperClient() { return bookKeeperClient; From e12fb2cdc05c043b05e44121e23cbc057ddab844 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 13 May 2025 18:25:33 +0800 Subject: [PATCH 7/7] Fix build failure --- .../src/main/java/org/apache/pulsar/broker/PulsarService.java | 1 - .../org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 6961e75cc7a68..da4b88e167ae9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -62,7 +62,6 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.function.Supplier; -import javax.annotation.Nullable; import javax.servlet.ServletException; import javax.websocket.DeploymentException; import lombok.AccessLevel; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java index b2c677d412796..edeada594f1e8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java @@ -60,8 +60,7 @@ public void testCheckSequenceId() throws Exception { // broker to fail subsequent send operation and it will trigger a recover EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1); ManagedLedgerClientFactory clientFactory = new ManagedLedgerClientFactory(pulsar.getConfiguration(), - pulsar.getLocalMetadataStore(), pulsar.getBookKeeperClientFactory(), eventLoopGroup, - OpenTelemetry.noop()); + pulsar.getLocalMetadataStore(), eventLoopGroup, OpenTelemetry.noop()); ManagedLedgerFactory mlFactory = clientFactory.getDefaultStorageClass().getManagedLedgerFactory(); ManagedLedger ml = mlFactory.open(TopicName.get(topicName).getPersistenceNamingEncoding()); ml.close();