From 2d2331cd281f9d7d938b6a63ccdadb9a79b0171c Mon Sep 17 00:00:00 2001 From: poorna Date: Fri, 9 Sep 2016 16:44:22 -0700 Subject: [PATCH 1/2] TEPHRA-179 Transaction service high availability changes --- .../org/apache/tephra/TransactionManager.java | 6 + .../apache/tephra/TransactionServiceMain.java | 6 +- .../distributed/TransactionService.java | 79 ++++++++++- .../inmemory/InMemoryTransactionService.java | 132 ------------------ .../DefaultTransactionManagerProvider.java | 71 ++++++++++ .../runtime/TransactionDistributedModule.java | 27 ++-- .../runtime/TransactionInMemoryModule.java | 8 +- .../runtime/TransactionLocalModule.java | 13 +- .../runtime/TransactionServiceModule.java | 37 +++++ .../TransactionStateStorageProvider.java | 18 +-- .../TestTransactionManagerProvider.java | 54 +++++++ .../tephra/ThriftTransactionSystemTest.java | 24 ++-- .../apache/tephra/TransactionAdminTest.java | 6 +- .../ThriftTransactionServerTest.java | 35 ++++- .../tephra/snapshot/SnapshotCodecTest.java | 22 ++- 15 files changed, 343 insertions(+), 195 deletions(-) delete mode 100644 tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTransactionService.java create mode 100644 tephra-core/src/main/java/org/apache/tephra/runtime/DefaultTransactionManagerProvider.java create mode 100644 tephra-core/src/main/java/org/apache/tephra/runtime/TransactionServiceModule.java create mode 100644 tephra-core/src/test/java/org/apache/tephra/TestTransactionManagerProvider.java diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java index cfefc83b..0fa1e40c 100644 --- a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java +++ b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java @@ -18,6 +18,7 @@ package org.apache.tephra; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; @@ -1265,6 +1266,11 @@ public void logStatistics() { ", committed = " + committedChangeSets.size()); } + @VisibleForTesting + public TransactionStateStorage getTransactionStateStorage() { + return persistor; + } + private abstract static class DaemonThreadExecutor extends Thread { private AtomicBoolean stopped = new AtomicBoolean(false); diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionServiceMain.java b/tephra-core/src/main/java/org/apache/tephra/TransactionServiceMain.java index 0a9dd1d5..136bfd01 100644 --- a/tephra-core/src/main/java/org/apache/tephra/TransactionServiceMain.java +++ b/tephra-core/src/main/java/org/apache/tephra/TransactionServiceMain.java @@ -24,8 +24,7 @@ import org.apache.tephra.distributed.TransactionService; import org.apache.tephra.runtime.ConfigModule; import org.apache.tephra.runtime.DiscoveryModules; -import org.apache.tephra.runtime.TransactionClientModule; -import org.apache.tephra.runtime.TransactionModules; +import org.apache.tephra.runtime.TransactionServiceModule; import org.apache.tephra.runtime.ZKModule; import org.apache.tephra.util.ConfigurationFactory; import org.apache.twill.zookeeper.ZKClientService; @@ -104,8 +103,7 @@ public void start() throws Exception { new ConfigModule(conf), new ZKModule(), new DiscoveryModules().getDistributedModules(), - new TransactionModules().getDistributedModules(), - new TransactionClientModule() + new TransactionServiceModule() ); ZKClientService zkClientService = injector.getInstance(ZKClientService.class); diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java index 6fbf926f..b44d185c 100644 --- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java +++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java @@ -19,16 +19,19 @@ package org.apache.tephra.distributed; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.AbstractService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; import com.google.inject.Inject; import com.google.inject.Provider; import org.apache.hadoop.conf.Configuration; import org.apache.tephra.TransactionManager; +import org.apache.tephra.TxConstants; import org.apache.tephra.distributed.thrift.TTransactionServer; -import org.apache.tephra.inmemory.InMemoryTransactionService; import org.apache.tephra.rpc.ThriftRPCServer; import org.apache.twill.api.ElectionHandler; +import org.apache.twill.common.Cancellable; +import org.apache.twill.discovery.Discoverable; import org.apache.twill.discovery.DiscoveryService; import org.apache.twill.internal.ServiceListenerAdapter; import org.apache.twill.internal.zookeeper.LeaderElection; @@ -42,15 +45,29 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import javax.annotation.Nullable; /** * */ -public final class TransactionService extends InMemoryTransactionService { +public class TransactionService extends AbstractService { private static final Logger LOG = LoggerFactory.getLogger(TransactionService.class); private LeaderElection leaderElection; private final ZKClient zkClient; + private final DiscoveryService discoveryService; + private final Provider txManagerProvider; + private final String serviceName; + private Cancellable cancelDiscovery; + + // thrift server config + private final String address; + private final int port; + private final int threads; + private final int ioThreads; + private final int maxReadBufferBytes; + + private TransactionManager txManager; private ThriftRPCServer server; @Inject @@ -58,12 +75,34 @@ public TransactionService(Configuration conf, ZKClient zkClient, DiscoveryService discoveryService, Provider txManagerProvider) { - super(conf, discoveryService, txManagerProvider); this.zkClient = zkClient; + this.discoveryService = discoveryService; + this.txManagerProvider = txManagerProvider; + + serviceName = conf.get(TxConstants.Service.CFG_DATA_TX_DISCOVERY_SERVICE_NAME, + TxConstants.Service.DEFAULT_DATA_TX_DISCOVERY_SERVICE_NAME); + + address = conf.get(TxConstants.Service.CFG_DATA_TX_BIND_ADDRESS, TxConstants.Service.DEFAULT_DATA_TX_BIND_ADDRESS); + port = conf.getInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, TxConstants.Service.DEFAULT_DATA_TX_BIND_PORT); + + // Retrieve the number of threads for the service + threads = conf.getInt(TxConstants.Service.CFG_DATA_TX_SERVER_THREADS, + TxConstants.Service.DEFAULT_DATA_TX_SERVER_THREADS); + ioThreads = conf.getInt(TxConstants.Service.CFG_DATA_TX_SERVER_IO_THREADS, + TxConstants.Service.DEFAULT_DATA_TX_SERVER_IO_THREADS); + + maxReadBufferBytes = conf.getInt(TxConstants.Service.CFG_DATA_TX_THRIFT_MAX_READ_BUFFER, + TxConstants.Service.DEFAULT_DATA_TX_THRIFT_MAX_READ_BUFFER); + + LOG.info("Configuring TransactionService" + + ", address: " + address + + ", port: " + port + + ", threads: " + threads + + ", io threads: " + ioThreads + + ", max read buffer (bytes): " + maxReadBufferBytes); } - @Override - protected InetSocketAddress getAddress() { + private InetSocketAddress getAddress() { if (address.equals("0.0.0.0")) { // resolve hostname try { @@ -112,6 +151,7 @@ public void failed(State from, Throwable failure) { public void follower() { // First stop the transaction server as un-registering from discovery can block sometimes. // That can lead to multiple transaction servers being active at the same time. + txManager = null; if (server != null && server.isRunning()) { server.stopAndWait(); } @@ -140,7 +180,7 @@ protected void abort(Throwable cause) { notifyFailed(cause); } - protected void internalStop() { + private void internalStop() { if (leaderElection != null) { // NOTE: if was a leader this will cause loosing of leadership which in callback above will // de-register service in discovery service and stop the service if needed @@ -153,4 +193,31 @@ protected void internalStop() { } } } + + private void undoRegister() { + if (cancelDiscovery != null) { + cancelDiscovery.cancel(); + } + } + + private void doRegister() { + cancelDiscovery = discoveryService.register(new Discoverable() { + @Override + public String getName() { + return serviceName; + } + + @Override + public InetSocketAddress getSocketAddress() { + return getAddress(); + } + }); + } + + @SuppressWarnings("WeakerAccess") + @VisibleForTesting + @Nullable + public TransactionManager getTransactionManager() { + return txManager; + } } diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTransactionService.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTransactionService.java deleted file mode 100644 index fb453625..00000000 --- a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTransactionService.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tephra.inmemory; - -import com.google.common.util.concurrent.AbstractService; -import com.google.inject.Inject; -import com.google.inject.Provider; -import org.apache.hadoop.conf.Configuration; -import org.apache.tephra.TransactionManager; -import org.apache.tephra.TxConstants; -import org.apache.twill.common.Cancellable; -import org.apache.twill.discovery.Discoverable; -import org.apache.twill.discovery.DiscoveryService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.InetSocketAddress; - -/** - * Transaction server that manages transaction data for the Reactor. - *

- * Transaction server is HA, one can start multiple instances, only one of which is active and will register itself in - * discovery service. - *

- */ -public class InMemoryTransactionService extends AbstractService { - private static final Logger LOG = LoggerFactory.getLogger(InMemoryTransactionService.class); - - private final DiscoveryService discoveryService; - private final String serviceName; - // this is Provider, so that we can have multiple instances of it (use a new instance after leader election) - protected final Provider txManagerProvider; - private Cancellable cancelDiscovery; - protected TransactionManager txManager; - - // thrift server config - protected final String address; - protected final int port; - protected final int threads; - protected final int ioThreads; - protected final int maxReadBufferBytes; - - @Inject - public InMemoryTransactionService(Configuration conf, DiscoveryService discoveryService, - Provider txManagerProvider) { - - this.discoveryService = discoveryService; - this.txManagerProvider = txManagerProvider; - this.serviceName = conf.get(TxConstants.Service.CFG_DATA_TX_DISCOVERY_SERVICE_NAME, - TxConstants.Service.DEFAULT_DATA_TX_DISCOVERY_SERVICE_NAME); - - address = conf.get(TxConstants.Service.CFG_DATA_TX_BIND_ADDRESS, TxConstants.Service.DEFAULT_DATA_TX_BIND_ADDRESS); - port = conf.getInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, TxConstants.Service.DEFAULT_DATA_TX_BIND_PORT); - - // Retrieve the number of threads for the service - threads = conf.getInt(TxConstants.Service.CFG_DATA_TX_SERVER_THREADS, - TxConstants.Service.DEFAULT_DATA_TX_SERVER_THREADS); - ioThreads = conf.getInt(TxConstants.Service.CFG_DATA_TX_SERVER_IO_THREADS, - TxConstants.Service.DEFAULT_DATA_TX_SERVER_IO_THREADS); - - maxReadBufferBytes = conf.getInt(TxConstants.Service.CFG_DATA_TX_THRIFT_MAX_READ_BUFFER, - TxConstants.Service.DEFAULT_DATA_TX_THRIFT_MAX_READ_BUFFER); - - LOG.info("Configuring TransactionService" + - ", address: " + address + - ", port: " + port + - ", threads: " + threads + - ", io threads: " + ioThreads + - ", max read buffer (bytes): " + maxReadBufferBytes); - } - - protected void undoRegister() { - if (cancelDiscovery != null) { - cancelDiscovery.cancel(); - } - } - - protected void doRegister() { - cancelDiscovery = discoveryService.register(new Discoverable() { - @Override - public String getName() { - return serviceName; - } - - @Override - public InetSocketAddress getSocketAddress() { - return getAddress(); - } - }); - } - - protected InetSocketAddress getAddress() { - return new InetSocketAddress(1); - } - - @Override - protected void doStart() { - try { - txManager = txManagerProvider.get(); - txManager.startAndWait(); - doRegister(); - LOG.info("Transaction Thrift service started successfully on " + getAddress()); - notifyStarted(); - } catch (Throwable t) { - LOG.info("Transaction Thrift service didn't start on " + getAddress()); - notifyFailed(t); - } - } - - @Override - protected void doStop() { - undoRegister(); - txManager.stopAndWait(); - notifyStopped(); - } - -} diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/DefaultTransactionManagerProvider.java b/tephra-core/src/main/java/org/apache/tephra/runtime/DefaultTransactionManagerProvider.java new file mode 100644 index 00000000..6d5094cd --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/runtime/DefaultTransactionManagerProvider.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tephra.runtime; + +import com.google.common.annotations.VisibleForTesting; +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.Provider; +import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.TransactionManager; +import org.apache.twill.zookeeper.ZKClient; +import org.apache.twill.zookeeper.ZKClientService; + +/** + * A provider for {@link TransactionManager} that provides a new instance every time. + */ +public class DefaultTransactionManagerProvider implements Provider { + private final Configuration conf; + private final ZKClientService zkClientService; + + @Inject + public DefaultTransactionManagerProvider(Configuration conf, ZKClientService zkClientService) { + this.conf = conf; + this.zkClientService = zkClientService; + } + + @Override + public TransactionManager get() { + // Create a new injector every time since Guice services cannot be restarted TEPHRA-179 + Injector injector = Guice.createInjector( + new ConfigModule(conf), + new AbstractModule() { + @Override + protected void configure() { + // Instead of using ZKClientModule that will create new instance of ZKClient, we create instance + // binding to reuse the same ZKClient used for leader election + bind(ZKClient.class).toInstance(zkClientService); + bind(ZKClientService.class).toInstance(zkClientService); + } + }, + new TransactionClientModule(), + getTransactionModule() + ); + return injector.getInstance(TransactionManager.class); + } + + @VisibleForTesting + protected Module getTransactionModule() { + return new TransactionModules().getDistributedModules(); + } +} diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java index ff796c13..25a729a6 100644 --- a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java +++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java @@ -18,9 +18,8 @@ package org.apache.tephra.runtime; -import com.google.inject.AbstractModule; +import com.google.inject.PrivateModule; import com.google.inject.Scopes; -import com.google.inject.Singleton; import com.google.inject.assistedinject.FactoryModuleBuilder; import com.google.inject.name.Names; import org.apache.tephra.DefaultTransactionExecutor; @@ -38,23 +37,25 @@ /** * Guice bindings for running in distributed mode on a cluster. */ -final class TransactionDistributedModule extends AbstractModule { +final class TransactionDistributedModule extends PrivateModule { @Override protected void configure() { - // some of these classes need to be non-singleton in order to create a new instance during leader() in - // TransactionService - bind(SnapshotCodecProvider.class).in(Singleton.class); - bind(TransactionStateStorage.class).annotatedWith(Names.named("persist")).to(HDFSTransactionStateStorage.class); - bind(TransactionStateStorage.class).toProvider(TransactionStateStorageProvider.class); + bind(SnapshotCodecProvider.class).in(Scopes.SINGLETON); + bind(TransactionStateStorage.class).annotatedWith(Names.named("persist")) + .to(HDFSTransactionStateStorage.class).in(Scopes.SINGLETON); + bind(TransactionStateStorage.class).toProvider(TransactionStateStorageProvider.class).in(Scopes.SINGLETON); - // to catch issues during configure time - bind(TransactionManager.class); + bind(TransactionManager.class).in(Scopes.SINGLETON); bind(TransactionSystemClient.class).to(TransactionServiceClient.class).in(Scopes.SINGLETON); - bind(MetricsCollector.class).to(DefaultMetricsCollector.class); + bind(MetricsCollector.class).to(DefaultMetricsCollector.class).in(Scopes.SINGLETON); install(new FactoryModuleBuilder() - .implement(TransactionExecutor.class, DefaultTransactionExecutor.class) - .build(TransactionExecutorFactory.class)); + .implement(TransactionExecutor.class, DefaultTransactionExecutor.class) + .build(TransactionExecutorFactory.class)); + + expose(TransactionManager.class); + expose(TransactionSystemClient.class); + expose(TransactionExecutorFactory.class); } } diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java index 0c5a3f31..15c65527 100644 --- a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java +++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java @@ -18,7 +18,7 @@ package org.apache.tephra.runtime; -import com.google.inject.AbstractModule; +import com.google.inject.PrivateModule; import com.google.inject.Scopes; import com.google.inject.assistedinject.FactoryModuleBuilder; import org.apache.tephra.DefaultTransactionExecutor; @@ -37,7 +37,7 @@ * Guice bindings for running completely in-memory (no persistence). This should only be used for * test classes, as the transaction state cannot be recovered in the case of a failure. */ -public class TransactionInMemoryModule extends AbstractModule { +public class TransactionInMemoryModule extends PrivateModule { @Override protected void configure() { @@ -51,5 +51,9 @@ protected void configure() { install(new FactoryModuleBuilder() .implement(TransactionExecutor.class, DefaultTransactionExecutor.class) .build(TransactionExecutorFactory.class)); + + expose(TransactionManager.class); + expose(TransactionSystemClient.class); + expose(TransactionExecutorFactory.class); } } diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java index 0dba09ec..d1e5e73c 100644 --- a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java +++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java @@ -18,9 +18,8 @@ package org.apache.tephra.runtime; -import com.google.inject.AbstractModule; +import com.google.inject.PrivateModule; import com.google.inject.Scopes; -import com.google.inject.Singleton; import com.google.inject.assistedinject.FactoryModuleBuilder; import com.google.inject.name.Names; import org.apache.tephra.DefaultTransactionExecutor; @@ -38,21 +37,25 @@ /** * Guice bindings for running in single-node mode (persistence to local disk and in-memory client). */ -final class TransactionLocalModule extends AbstractModule { +final class TransactionLocalModule extends PrivateModule { @Override protected void configure() { - bind(SnapshotCodecProvider.class).in(Singleton.class); + bind(SnapshotCodecProvider.class).in(Scopes.SINGLETON); bind(TransactionStateStorage.class).annotatedWith(Names.named("persist")) .to(LocalFileTransactionStateStorage.class).in(Scopes.SINGLETON); bind(TransactionStateStorage.class).toProvider(TransactionStateStorageProvider.class).in(Scopes.SINGLETON); bind(TransactionManager.class).in(Scopes.SINGLETON); - bind(TransactionSystemClient.class).to(InMemoryTxSystemClient.class).in(Singleton.class); + bind(TransactionSystemClient.class).to(InMemoryTxSystemClient.class).in(Scopes.SINGLETON); bind(MetricsCollector.class).to(DefaultMetricsCollector.class); install(new FactoryModuleBuilder() .implement(TransactionExecutor.class, DefaultTransactionExecutor.class) .build(TransactionExecutorFactory.class)); + + expose(TransactionManager.class); + expose(TransactionSystemClient.class); + expose(TransactionExecutorFactory.class); } } diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionServiceModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionServiceModule.java new file mode 100644 index 00000000..b927620b --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionServiceModule.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tephra.runtime; + +import com.google.inject.PrivateModule; +import com.google.inject.Scopes; +import org.apache.tephra.TransactionManager; +import org.apache.tephra.distributed.TransactionService; + +/** + * Guice bindings for {@link TransactionService}. + */ +public class TransactionServiceModule extends PrivateModule { + @Override + protected void configure() { + bind(TransactionManager.class).toProvider(DefaultTransactionManagerProvider.class); + bind(TransactionService.class).in(Scopes.SINGLETON); + expose(TransactionService.class); + } +} diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionStateStorageProvider.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionStateStorageProvider.java index 54565537..76e85346 100644 --- a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionStateStorageProvider.java +++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionStateStorageProvider.java @@ -18,11 +18,9 @@ package org.apache.tephra.runtime; import com.google.inject.Inject; -import com.google.inject.Injector; -import com.google.inject.Key; import com.google.inject.Provider; import com.google.inject.Singleton; -import com.google.inject.name.Names; +import com.google.inject.name.Named; import org.apache.hadoop.conf.Configuration; import org.apache.tephra.TxConstants; import org.apache.tephra.persist.NoOpTransactionStateStorage; @@ -36,20 +34,24 @@ public final class TransactionStateStorageProvider implements Provider { private final Configuration cConf; - private final Injector injector; + private final Provider storageProvider; + private final Provider noOpTransactionStateStorageProvider; @Inject - TransactionStateStorageProvider(Configuration cConf, Injector injector) { + TransactionStateStorageProvider(Configuration cConf, + Provider noOpTransactionStateStorageProvider, + @Named("persist") Provider storageProvider) { this.cConf = cConf; - this.injector = injector; + this.storageProvider = storageProvider; + this.noOpTransactionStateStorageProvider = noOpTransactionStateStorageProvider; } @Override public TransactionStateStorage get() { if (cConf.getBoolean(TxConstants.Manager.CFG_DO_PERSIST, true)) { - return injector.getInstance(Key.get(TransactionStateStorage.class, Names.named("persist"))); + return storageProvider.get(); } else { - return injector.getInstance(NoOpTransactionStateStorage.class); + return noOpTransactionStateStorageProvider.get(); } } } diff --git a/tephra-core/src/test/java/org/apache/tephra/TestTransactionManagerProvider.java b/tephra-core/src/test/java/org/apache/tephra/TestTransactionManagerProvider.java new file mode 100644 index 00000000..3bdd36b7 --- /dev/null +++ b/tephra-core/src/test/java/org/apache/tephra/TestTransactionManagerProvider.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tephra; + +import com.google.inject.AbstractModule; +import com.google.inject.Inject; +import com.google.inject.Module; +import com.google.inject.Scopes; +import com.google.inject.util.Modules; +import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.persist.InMemoryTransactionStateStorage; +import org.apache.tephra.persist.TransactionStateStorage; +import org.apache.tephra.runtime.DefaultTransactionManagerProvider; +import org.apache.tephra.runtime.TransactionModules; +import org.apache.twill.zookeeper.ZKClientService; + +/** + * + */ +public class TestTransactionManagerProvider extends DefaultTransactionManagerProvider { + @Inject + public TestTransactionManagerProvider(Configuration conf, ZKClientService zkClientService) { + super(conf, zkClientService); + } + + @Override + protected Module getTransactionModule() { + return Modules.override(new TransactionModules().getDistributedModules()) + .with(new AbstractModule() { + @Override + protected void configure() { + bind(TransactionStateStorage.class) + .to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON); + } + }); + } +} diff --git a/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java b/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java index ef9b9c24..9e8d98c0 100644 --- a/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java @@ -18,14 +18,10 @@ package org.apache.tephra; -import com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Injector; -import com.google.inject.Scopes; -import com.google.inject.util.Modules; import org.apache.hadoop.conf.Configuration; import org.apache.tephra.distributed.TransactionService; -import org.apache.tephra.persist.InMemoryTransactionStateStorage; import org.apache.tephra.persist.TransactionStateStorage; import org.apache.tephra.runtime.ConfigModule; import org.apache.tephra.runtime.DiscoveryModules; @@ -33,9 +29,11 @@ import org.apache.tephra.runtime.TransactionModules; import org.apache.tephra.runtime.ZKModule; import org.apache.tephra.util.Tests; +import org.apache.twill.discovery.DiscoveryService; import org.apache.twill.internal.zookeeper.InMemoryZKServer; import org.apache.twill.zookeeper.ZKClientService; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -70,13 +68,7 @@ public static void start() throws Exception { new ConfigModule(conf), new ZKModule(), new DiscoveryModules().getDistributedModules(), - Modules.override(new TransactionModules().getDistributedModules()) - .with(new AbstractModule() { - @Override - protected void configure() { - bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON); - } - }), + new TransactionModules().getDistributedModules(), new TransactionClientModule() ); @@ -84,12 +76,13 @@ protected void configure() { zkClientService.startAndWait(); // start a tx server - txService = injector.getInstance(TransactionService.class); - storage = injector.getInstance(TransactionStateStorage.class); + DiscoveryService discoveryService = injector.getInstance(DiscoveryService.class); + txService = + new TransactionService(conf, zkClientService, discoveryService, + new TestTransactionManagerProvider(conf, zkClientService)); txClient = injector.getInstance(TransactionSystemClient.class); try { LOG.info("Starting transaction service"); - storage.startAndWait(); txService.startAndWait(); } catch (Exception e) { LOG.error("Failed to start service: ", e); @@ -97,6 +90,8 @@ protected void configure() { } Tests.waitForTxReady(txClient); + Assert.assertNotNull(txService.getTransactionManager()); + storage = txService.getTransactionManager().getTransactionStateStorage(); } @Before @@ -107,7 +102,6 @@ public void reset() throws Exception { @AfterClass public static void stop() throws Exception { txService.stopAndWait(); - storage.stopAndWait(); zkClientService.stopAndWait(); zkServer.stopAndWait(); } diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionAdminTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionAdminTest.java index 27fc0715..c35228b0 100644 --- a/tephra-core/src/test/java/org/apache/tephra/TransactionAdminTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/TransactionAdminTest.java @@ -33,6 +33,7 @@ import org.apache.tephra.runtime.TransactionModules; import org.apache.tephra.runtime.ZKModule; import org.apache.tephra.util.Tests; +import org.apache.twill.discovery.DiscoveryService; import org.apache.twill.internal.zookeeper.InMemoryZKServer; import org.apache.twill.zookeeper.ZKClientService; import org.junit.AfterClass; @@ -90,7 +91,10 @@ protected void configure() { zkClientService.startAndWait(); // start a tx server - txService = injector.getInstance(TransactionService.class); + DiscoveryService discoveryService = injector.getInstance(DiscoveryService.class); + txService = + new TransactionService(conf, zkClientService, discoveryService, + new TestTransactionManagerProvider(conf, zkClientService)); txClient = injector.getInstance(TransactionSystemClient.class); try { LOG.info("Starting transaction service"); diff --git a/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java b/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java index 60754525..4dae9987 100644 --- a/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java @@ -22,7 +22,9 @@ import com.google.common.util.concurrent.SettableFuture; import com.google.inject.AbstractModule; import com.google.inject.Guice; +import com.google.inject.Inject; import com.google.inject.Injector; +import com.google.inject.Module; import com.google.inject.Scopes; import com.google.inject.util.Modules; import org.apache.hadoop.conf.Configuration; @@ -35,11 +37,13 @@ import org.apache.tephra.persist.TransactionLog; import org.apache.tephra.persist.TransactionStateStorage; import org.apache.tephra.runtime.ConfigModule; +import org.apache.tephra.runtime.DefaultTransactionManagerProvider; import org.apache.tephra.runtime.DiscoveryModules; import org.apache.tephra.runtime.TransactionClientModule; import org.apache.tephra.runtime.TransactionModules; import org.apache.tephra.runtime.ZKModule; import org.apache.tephra.util.Tests; +import org.apache.twill.discovery.DiscoveryService; import org.apache.twill.internal.zookeeper.InMemoryZKServer; import org.apache.twill.zookeeper.ZKClientService; import org.apache.zookeeper.WatchedEvent; @@ -72,7 +76,6 @@ public class ThriftTransactionServerTest { private static InMemoryZKServer zkServer; private static ZKClientService zkClientService; private static TransactionService txService; - private static TransactionStateStorage storage; private static Injector injector; private static final int NUM_CLIENTS = 17; @@ -107,7 +110,6 @@ public void start() throws Exception { .with(new AbstractModule() { @Override protected void configure() { - bind(TransactionStateStorage.class).to(SlowTransactionStorage.class).in(Scopes.SINGLETON); // overriding this to make it non-singleton bind(TransactionSystemClient.class).to(TransactionServiceClient.class); } @@ -119,8 +121,10 @@ protected void configure() { zkClientService.startAndWait(); // start a tx server - txService = injector.getInstance(TransactionService.class); - storage = injector.getInstance(TransactionStateStorage.class); + DiscoveryService discoveryService = injector.getInstance(DiscoveryService.class); + txService = + new TransactionService(conf, zkClientService, discoveryService, + new SlowTransactionManagerProvider(conf, zkClientService)); try { LOG.info("Starting transaction service"); txService.startAndWait(); @@ -140,7 +144,6 @@ protected void configure() { @After public void stop() throws Exception { txService.stopAndWait(); - storage.stopAndWait(); zkClientService.stopAndWait(); zkServer.stopAndWait(); } @@ -281,4 +284,26 @@ public void append(List edits) throws IOException { super.append(edits); } } + + /** + * + */ + private static final class SlowTransactionManagerProvider extends DefaultTransactionManagerProvider { + @Inject + SlowTransactionManagerProvider(Configuration conf, ZKClientService zkClientService) { + super(conf, zkClientService); + } + + @Override + protected Module getTransactionModule() { + return Modules.override(new TransactionModules().getDistributedModules()) + .with(new AbstractModule() { + @Override + protected void configure() { + bind(TransactionStateStorage.class) + .to(SlowTransactionStorage.class).in(Scopes.SINGLETON); + } + }); + } + } } diff --git a/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java index afdff5c5..4cd19193 100644 --- a/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java @@ -32,6 +32,8 @@ import org.apache.tephra.TransactionNotInProgressException; import org.apache.tephra.TransactionType; import org.apache.tephra.TxConstants; +import org.apache.tephra.metrics.TxMetricsCollector; +import org.apache.tephra.persist.LocalFileTransactionStateStorage; import org.apache.tephra.persist.TransactionSnapshot; import org.apache.tephra.persist.TransactionStateStorage; import org.apache.tephra.persist.TransactionVisibilityState; @@ -209,7 +211,10 @@ public void testDefaultToV3Migration() throws Exception { // shutdown to force a snapshot txManager.stopAndWait(); - TransactionStateStorage txStorage = injector.getInstance(TransactionStateStorage.class); + TransactionStateStorage txStorage = + new LocalFileTransactionStateStorage(conf, + new SnapshotCodecProvider(conf), + new TxMetricsCollector()); txStorage.startAndWait(); // confirm that the in-progress entry is missing a type @@ -245,7 +250,10 @@ public void testDefaultToV3Migration() throws Exception { // save a new snapshot txManager2.stopAndWait(); - TransactionStateStorage txStorage2 = injector2.getInstance(TransactionStateStorage.class); + TransactionStateStorage txStorage2 = + new LocalFileTransactionStateStorage(conf, + new SnapshotCodecProvider(conf2), + new TxMetricsCollector()); txStorage2.startAndWait(); TransactionSnapshot snapshot3 = txStorage2.getLatestSnapshot(); @@ -307,7 +315,10 @@ public void testSnapshotCodecV4() throws IOException, TransactionNotInProgressEx txManager.stopAndWait(); // Validate the snapshot on disk - TransactionStateStorage txStorage = injector.getInstance(TransactionStateStorage.class); + TransactionStateStorage txStorage = + new LocalFileTransactionStateStorage(conf, + new SnapshotCodecProvider(conf), + new TxMetricsCollector()); txStorage.startAndWait(); TransactionSnapshot snapshot = txStorage.getLatestSnapshot(); @@ -349,7 +360,10 @@ public void testSnapshotCodecV4() throws IOException, TransactionNotInProgressEx // save a new snapshot txManager.stopAndWait(); - TransactionStateStorage txStorage2 = injector2.getInstance(TransactionStateStorage.class); + TransactionStateStorage txStorage2 = + new LocalFileTransactionStateStorage(conf, + new SnapshotCodecProvider(conf), + new TxMetricsCollector()); txStorage2.startAndWait(); snapshot = txStorage2.getLatestSnapshot(); From 851bb655d41ed0fa60ca789a481a37485e3ab84e Mon Sep 17 00:00:00 2001 From: poorna Date: Fri, 9 Sep 2016 17:24:26 -0700 Subject: [PATCH 2/2] Add HA test --- .../ThriftTransactionServerTest.java | 30 +------ .../ThriftTransactionSystemHATest.java | 90 +++++++++++++++++++ .../ThriftTransactionSystemTest.java | 22 ++--- .../java/org/apache/tephra/util/Tests.java | 26 ++++++ 4 files changed, 130 insertions(+), 38 deletions(-) create mode 100644 tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionSystemHATest.java rename tephra-core/src/test/java/org/apache/tephra/{ => distributed}/ThriftTransactionSystemTest.java (88%) diff --git a/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java b/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java index 4dae9987..0d04a52c 100644 --- a/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java @@ -19,7 +19,6 @@ package org.apache.tephra.distributed; import com.google.common.util.concurrent.Service; -import com.google.common.util.concurrent.SettableFuture; import com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Inject; @@ -28,7 +27,6 @@ import com.google.inject.Scopes; import com.google.inject.util.Modules; import org.apache.hadoop.conf.Configuration; -import org.apache.tephra.ThriftTransactionSystemTest; import org.apache.tephra.Transaction; import org.apache.tephra.TransactionSystemClient; import org.apache.tephra.TxConstants; @@ -46,9 +44,6 @@ import org.apache.twill.discovery.DiscoveryService; import org.apache.twill.internal.zookeeper.InMemoryZKServer; import org.apache.twill.zookeeper.ZKClientService; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -179,7 +174,7 @@ public void run() { TimeUnit.SECONDS.sleep(1); // Expire zookeeper session, which causes Thrift server to stop. - expireZkSession(zkClientService); + Tests.expireZkSession(zkClientService); waitForThriftStop(); // Stop Zookeeper client so that it does not re-connect to Zookeeper and start Thrift server again. @@ -203,7 +198,7 @@ public void testThriftServerRestart() throws Exception { txClient.commit(tx); // Expire zookeeper session, which causes Thrift server to stop running. - expireZkSession(zkClientService); + Tests.expireZkSession(zkClientService); waitForThriftStop(); // wait for the thrift rpc server to be in running state again @@ -221,27 +216,6 @@ public Boolean call() throws Exception { txClient.commit(tx); } - private void expireZkSession(ZKClientService zkClientService) throws Exception { - ZooKeeper zooKeeper = zkClientService.getZooKeeperSupplier().get(); - final SettableFuture connectFuture = SettableFuture.create(); - Watcher watcher = new Watcher() { - @Override - public void process(WatchedEvent event) { - if (event.getState() == Event.KeeperState.SyncConnected) { - connectFuture.set(null); - } - } - }; - - // Create another Zookeeper session with the same sessionId so that the original one expires. - ZooKeeper dupZookeeper = - new ZooKeeper(zkClientService.getConnectString(), zooKeeper.getSessionTimeout(), watcher, - zooKeeper.getSessionId(), zooKeeper.getSessionPasswd()); - connectFuture.get(30, TimeUnit.SECONDS); - Assert.assertEquals("Failed to re-create current session", dupZookeeper.getState(), ZooKeeper.States.CONNECTED); - dupZookeeper.close(); - } - private void waitForThriftStop() throws Exception { Tests.waitFor("Failed to wait for txService to stop", new Callable() { @Override diff --git a/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionSystemHATest.java b/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionSystemHATest.java new file mode 100644 index 00000000..cf31f601 --- /dev/null +++ b/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionSystemHATest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tephra.distributed; + +import com.google.common.util.concurrent.Service; +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import org.apache.tephra.TransactionSystemClient; +import org.apache.tephra.runtime.ConfigModule; +import org.apache.tephra.runtime.DiscoveryModules; +import org.apache.tephra.runtime.TransactionClientModule; +import org.apache.tephra.runtime.TransactionModules; +import org.apache.tephra.util.Tests; +import org.apache.twill.zookeeper.ZKClient; +import org.apache.twill.zookeeper.ZKClientService; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.util.concurrent.Callable; + +/** + * Test HA behavior of Transaction Service + */ +public class ThriftTransactionSystemHATest extends ThriftTransactionSystemTest { + + @BeforeClass + public static void start() throws Exception { + // Start tx service + ThriftTransactionSystemTest.start(); + + // Expire zk session to make tx service follower + Tests.expireZkSession(zkClientService); + Tests.waitFor("Failed to wait for txService to stop", new Callable() { + @Override + public Boolean call() throws Exception { + return Service.State.RUNNING != txService.thriftRPCServerState(); + } + }); + + // wait for the thrift rpc server to be in running state again + Tests.waitFor("Failed to wait for txService to be running.", new Callable() { + @Override + public Boolean call() throws Exception { + return Service.State.RUNNING == txService.thriftRPCServerState(); + } + }); + + // we need to get a new txClient, because the old one will no longer work after the thrift server restart + Injector injector = Guice.createInjector( + new ConfigModule(conf), + new AbstractModule() { + @Override + protected void configure() { + // Instead of using ZKClientModule that will create new instance of ZKClient, we create instance + // binding to reuse the same ZKClient used for leader election + bind(ZKClient.class).toInstance(zkClientService); + bind(ZKClientService.class).toInstance(zkClientService); + } + }, + new DiscoveryModules().getDistributedModules(), + new TransactionModules().getDistributedModules(), + new TransactionClientModule() + ); + txClient = injector.getInstance(TransactionSystemClient.class); + Tests.waitForTxReady(txClient); + } + + @AfterClass + public static void stop() throws Exception { + ThriftTransactionSystemTest.stop(); + } +} diff --git a/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java b/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionSystemTest.java similarity index 88% rename from tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java rename to tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionSystemTest.java index 9e8d98c0..a3de9ef9 100644 --- a/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionSystemTest.java @@ -16,12 +16,15 @@ * limitations under the License. */ -package org.apache.tephra; +package org.apache.tephra.distributed; import com.google.inject.Guice; import com.google.inject.Injector; import org.apache.hadoop.conf.Configuration; -import org.apache.tephra.distributed.TransactionService; +import org.apache.tephra.TestTransactionManagerProvider; +import org.apache.tephra.TransactionSystemClient; +import org.apache.tephra.TransactionSystemTest; +import org.apache.tephra.TxConstants; import org.apache.tephra.persist.TransactionStateStorage; import org.apache.tephra.runtime.ConfigModule; import org.apache.tephra.runtime.DiscoveryModules; @@ -45,10 +48,10 @@ public class ThriftTransactionSystemTest extends TransactionSystemTest { private static final Logger LOG = LoggerFactory.getLogger(ThriftTransactionSystemTest.class); private static InMemoryZKServer zkServer; - private static ZKClientService zkClientService; - private static TransactionService txService; - private static TransactionStateStorage storage; - private static TransactionSystemClient txClient; + static ZKClientService zkClientService; + static TransactionService txService; + static TransactionSystemClient txClient; + static Configuration conf; @ClassRule public static TemporaryFolder tmpFolder = new TemporaryFolder(); @@ -58,7 +61,7 @@ public static void start() throws Exception { zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build(); zkServer.startAndWait(); - Configuration conf = new Configuration(); + conf = new Configuration(); conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false); conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr()); conf.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times"); @@ -90,8 +93,6 @@ public static void start() throws Exception { } Tests.waitForTxReady(txClient); - Assert.assertNotNull(txService.getTransactionManager()); - storage = txService.getTransactionManager().getTransactionStateStorage(); } @Before @@ -113,6 +114,7 @@ protected TransactionSystemClient getClient() throws Exception { @Override protected TransactionStateStorage getStateStorage() throws Exception { - return storage; + Assert.assertNotNull(txService.getTransactionManager()); + return txService.getTransactionManager().getTransactionStateStorage(); } } diff --git a/tephra-core/src/test/java/org/apache/tephra/util/Tests.java b/tephra-core/src/test/java/org/apache/tephra/util/Tests.java index 243aa5ca..2edc98b1 100644 --- a/tephra-core/src/test/java/org/apache/tephra/util/Tests.java +++ b/tephra-core/src/test/java/org/apache/tephra/util/Tests.java @@ -19,8 +19,13 @@ package org.apache.tephra.util; +import com.google.common.util.concurrent.SettableFuture; import org.apache.tephra.TransactionSystemClient; import org.apache.tephra.TxConstants; +import org.apache.twill.zookeeper.ZKClientService; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; import org.junit.Assert; import java.util.concurrent.Callable; @@ -56,4 +61,25 @@ public Boolean call() throws Exception { } }); } + + public static void expireZkSession(ZKClientService zkClientService) throws Exception { + ZooKeeper zooKeeper = zkClientService.getZooKeeperSupplier().get(); + final SettableFuture connectFuture = SettableFuture.create(); + Watcher watcher = new Watcher() { + @Override + public void process(WatchedEvent event) { + if (event.getState() == Event.KeeperState.SyncConnected) { + connectFuture.set(null); + } + } + }; + + // Create another Zookeeper session with the same sessionId so that the original one expires. + ZooKeeper dupZookeeper = + new ZooKeeper(zkClientService.getConnectString(), zooKeeper.getSessionTimeout(), watcher, + zooKeeper.getSessionId(), zooKeeper.getSessionPasswd()); + connectFuture.get(30, TimeUnit.SECONDS); + Assert.assertEquals("Failed to re-create current session", dupZookeeper.getState(), ZooKeeper.States.CONNECTED); + dupZookeeper.close(); + } }