diff --git a/buffer/pom.xml b/buffer/pom.xml
index b17ff8b..fcbc001 100644
--- a/buffer/pom.xml
+++ b/buffer/pom.xml
@@ -19,7 +19,7 @@
io.atomix.catalyst
catalyst-parent
- 1.1.2-SNAPSHOT
+ 2.0-SNAPSHOT
bundle
diff --git a/common/pom.xml b/common/pom.xml
index ea694e6..4754427 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -4,7 +4,7 @@
io.atomix.catalyst
catalyst-parent
- 1.1.2-SNAPSHOT
+ 2.0-SNAPSHOT
bundle
diff --git a/concurrent/pom.xml b/concurrent/pom.xml
index 2a968d1..e7ea6b1 100644
--- a/concurrent/pom.xml
+++ b/concurrent/pom.xml
@@ -4,7 +4,7 @@
io.atomix.catalyst
catalyst-parent
- 1.1.2-SNAPSHOT
+ 2.0-SNAPSHOT
bundle
diff --git a/concurrent/src/main/java/io/atomix/catalyst/concurrent/CatalystScheduledExecutorService.java b/concurrent/src/main/java/io/atomix/catalyst/concurrent/CatalystScheduledExecutorService.java
new file mode 100644
index 0000000..e274b17
--- /dev/null
+++ b/concurrent/src/main/java/io/atomix/catalyst/concurrent/CatalystScheduledExecutorService.java
@@ -0,0 +1,306 @@
+/*
+ * Copyright 2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+package io.atomix.catalyst.concurrent;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static io.atomix.catalyst.util.Assert.notNull;
+import static java.util.stream.Collectors.toSet;
+
+/**
+ * A {@link ScheduledExecutorService} wrapper.
+ *
+ * A {@link ScheduledExecutorService} wrapper that wraps instances of
+ * {@link Runnable} and {@link Callable} with code that properly sets up the
+ * ThreadLocal {@link #CONTEXT_THREAD_LOCAL} before transferring
+ * control to the wrapped code. The wrapper subsequently tears down the
+ * ThreadLocal after the trapped code completes execution.
+ *
+ * Note: All instances of {@link Runnable} and {@link Callable} *MUST* be
+ * scheduled through instances of this class in order for Catalyst code
+ * to work correctly.
+ *
+ * @author Catalyst Project
+ */
+class CatalystScheduledExecutorService implements ScheduledExecutorService {
+ final static ThreadLocal CONTEXT_THREAD_LOCAL = new ThreadLocal<>();
+ private final ScheduledExecutorService delegate;
+ private final ThreadContext threadContext;
+
+ /**
+ * @param delegate the wrapped instance of {@link ScheduledExecutorService}
+ * @param threadContext the instance of {@link ThreadContext} to setup for scheduled tasks
+ */
+ CatalystScheduledExecutorService(ScheduledExecutorService delegate, ThreadContext threadContext) {
+ this.delegate = notNull(delegate, "delegate");
+ this.threadContext = notNull(threadContext, "threadContext");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit)
+ {
+ return delegate.schedule(new CatalystRunnable(command), delay, unit);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit)
+ {
+ return delegate.schedule(new CatalystCallable<>(callable), delay, unit);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ScheduledFuture> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
+ {
+ return delegate.scheduleAtFixedRate(new CatalystRunnable(command), initialDelay, period, unit);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ScheduledFuture> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
+ {
+ return delegate.scheduleWithFixedDelay(new CatalystRunnable(command), initialDelay, delay, unit);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ final public void shutdown() {
+ delegate.shutdown();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ final public List shutdownNow() {
+ return delegate.shutdownNow();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ final public boolean isShutdown() {
+ return delegate.isShutdown();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ final public boolean isTerminated() {
+ return delegate.isTerminated();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ final public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return delegate.awaitTermination(timeout, unit);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Future submit(Callable task) {
+ return delegate.submit(new CatalystCallable<>(task));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Future submit(Runnable task, T result) {
+ return delegate.submit(new CatalystRunnable(task), result);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Future> submit(Runnable task) {
+ return delegate.submit(new CatalystRunnable(task));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List> invokeAll(Collection extends Callable> tasks) throws InterruptedException
+ {
+ Set> wrapped = tasks.stream().map(CatalystCallable::new).collect(toSet());
+ return delegate.invokeAll(wrapped);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List> invokeAll(Collection extends Callable> tasks,
+ long timeout, TimeUnit unit) throws InterruptedException
+ {
+ Set> wrapped = tasks.stream().map(CatalystCallable::new).collect(toSet());
+ return delegate.invokeAll(wrapped, timeout, unit);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public T invokeAny(Collection extends Callable> tasks) throws InterruptedException, ExecutionException {
+ Set> wrapped = tasks.stream().map(CatalystCallable::new).collect(toSet());
+ return delegate.invokeAny(wrapped);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public T invokeAny(Collection extends Callable> tasks,
+ long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
+ TimeoutException
+ {
+ Set> wrapped = tasks.stream().map(CatalystCallable::new).collect(toSet());
+ return delegate.invokeAny(wrapped, timeout, unit);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void execute(Runnable command) {
+ delegate.execute(new CatalystRunnable(command));
+ }
+
+ /**
+ * A Catalyst wrapper for instances of {@link Runnable}.
+ */
+ private class CatalystRunnable implements Runnable {
+ private final Runnable delegate;
+
+ CatalystRunnable(Runnable delegate) {
+ this.delegate = notNull(delegate, "delegate");
+ }
+
+ @Override
+ public void run() {
+ CONTEXT_THREAD_LOCAL.set(threadContext);
+ try {
+ delegate.run();
+
+ } catch (RejectedExecutionException ree) {
+ throw ree;
+
+ } catch (Throwable t) {
+ threadContext.logger().error("An uncaught exception occurred", t);
+ throw t;
+
+ } finally {
+ CONTEXT_THREAD_LOCAL.remove();
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ CatalystRunnable that = (CatalystRunnable) o;
+ return Objects.equals(delegate, that.delegate);
+ }
+
+ @Override
+ public int hashCode() {
+ return delegate.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return delegate.toString();
+ }
+ }
+
+ /**
+ * A Catalyst wrapper for instances of {@link Callable}.
+ */
+ private class CatalystCallable implements Callable {
+ private final Callable delegate;
+
+ CatalystCallable(Callable delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public V call() throws Exception {
+ CONTEXT_THREAD_LOCAL.set(threadContext);
+ try {
+ return delegate.call();
+
+ } catch (RejectedExecutionException ree) {
+ throw ree;
+
+ } catch (Throwable t) {
+ threadContext.logger().error("An uncaught exception occurred", t);
+ throw t;
+
+ } finally {
+ CONTEXT_THREAD_LOCAL.remove();
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ CatalystCallable that = (CatalystCallable) o;
+ return Objects.equals(delegate, that.delegate);
+ }
+
+ @Override
+ public int hashCode() {
+ return delegate.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return delegate.toString();
+ }
+ }
+}
diff --git a/concurrent/src/main/java/io/atomix/catalyst/concurrent/CatalystThread.java b/concurrent/src/main/java/io/atomix/catalyst/concurrent/CatalystThread.java
deleted file mode 100644
index dd5cdbb..0000000
--- a/concurrent/src/main/java/io/atomix/catalyst/concurrent/CatalystThread.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright 2015 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.atomix.catalyst.concurrent;
-
-import java.lang.ref.WeakReference;
-
-/**
- * Catalyst thread.
- *
- * The Catalyst thread primarily serves to store a {@link ThreadContext} for the current thread.
- * The context is stored in a {@link java.lang.ref.WeakReference} in order to allow the thread to be garbage collected.
- *
- * There is no {@link ThreadContext} associated with the thread when it is first created.
- * It is the responsibility of thread creators to {@link #setContext(ThreadContext) set} the thread context when appropriate.
- *
- * @author Jordan Halterman
- */
-public class CatalystThread extends Thread {
- private WeakReference context;
-
- public CatalystThread(Runnable target, String name) {
- super(target, name);
- }
-
- /**
- * Sets the thread context.
- *
- * @param context The thread context.
- */
- public void setContext(ThreadContext context) {
- this.context = new WeakReference<>(context);
- }
-
- /**
- * Returns the thread context.
- *
- * @return The thread {@link ThreadContext} or {@code null} if no context has been configured.
- */
- public ThreadContext getContext() {
- return context != null ? context.get() : null;
- }
-
-}
diff --git a/concurrent/src/main/java/io/atomix/catalyst/concurrent/CatalystThreadFactory.java b/concurrent/src/main/java/io/atomix/catalyst/concurrent/CatalystThreadFactory.java
index a7e6118..dc5fa29 100644
--- a/concurrent/src/main/java/io/atomix/catalyst/concurrent/CatalystThreadFactory.java
+++ b/concurrent/src/main/java/io/atomix/catalyst/concurrent/CatalystThreadFactory.java
@@ -36,8 +36,8 @@ public CatalystThreadFactory(String nameFormat) {
}
@Override
- public Thread newThread(Runnable r) {
- return new CatalystThread(r, String.format(nameFormat, threadNumber.getAndIncrement()));
+ public Thread newThread(Runnable runnable) {
+ return new Thread(runnable, String.format(nameFormat, threadNumber.getAndIncrement()));
}
}
diff --git a/concurrent/src/main/java/io/atomix/catalyst/concurrent/Runnables.java b/concurrent/src/main/java/io/atomix/catalyst/concurrent/Runnables.java
deleted file mode 100644
index c147586..0000000
--- a/concurrent/src/main/java/io/atomix/catalyst/concurrent/Runnables.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package io.atomix.catalyst.concurrent;
-
-import org.slf4j.Logger;
-
-import java.util.concurrent.RejectedExecutionException;
-
-/**
- * Runnable utilities.
- */
-final class Runnables {
- private Runnables() {
- }
-
- /**
- * Returns a wrapped runnable that logs and rethrows uncaught exceptions.
- */
- static Runnable logFailure(final Runnable runnable, Logger logger) {
- return () -> {
- try {
- runnable.run();
- } catch (Throwable t) {
- if (!(t instanceof RejectedExecutionException)) {
- logger.error("An uncaught exception occurred", t);
- }
- throw t;
- }
- };
- }
-}
diff --git a/concurrent/src/main/java/io/atomix/catalyst/concurrent/SingleThreadContext.java b/concurrent/src/main/java/io/atomix/catalyst/concurrent/SingleThreadContext.java
index d147dd2..10e2a27 100644
--- a/concurrent/src/main/java/io/atomix/catalyst/concurrent/SingleThreadContext.java
+++ b/concurrent/src/main/java/io/atomix/catalyst/concurrent/SingleThreadContext.java
@@ -26,7 +26,7 @@ public class SingleThreadContext implements ThreadContext {
@Override
public void execute(Runnable command) {
try {
- executor.execute(Runnables.logFailure(command, LOGGER));
+ executor.execute(command);
} catch (RejectedExecutionException e) {
}
}
@@ -62,29 +62,8 @@ public SingleThreadContext(CatalystThreadFactory factory, Serializer serializer)
* @param serializer The context serializer.
*/
public SingleThreadContext(ScheduledExecutorService executor, Serializer serializer) {
- this(getThread(executor), executor, serializer);
- }
-
- public SingleThreadContext(Thread thread, ScheduledExecutorService executor, Serializer serializer) {
- this.executor = executor;
+ this.executor = new CatalystScheduledExecutorService(executor, this);
this.serializer = serializer;
- Assert.state(thread instanceof CatalystThread, "not a Catalyst thread");
- ((CatalystThread) thread).setContext(this);
- }
-
- /**
- * Gets the thread from a single threaded executor service.
- */
- protected static CatalystThread getThread(ExecutorService executor) {
- final AtomicReference thread = new AtomicReference<>();
- try {
- executor.submit(() -> {
- thread.set((CatalystThread) Thread.currentThread());
- }).get();
- } catch (InterruptedException | ExecutionException e) {
- throw new IllegalStateException("failed to initialize thread state", e);
- }
- return thread.get();
}
@Override
@@ -119,13 +98,13 @@ public Executor executor() {
@Override
public Scheduled schedule(Duration delay, Runnable runnable) {
- ScheduledFuture> future = executor.schedule(Runnables.logFailure(runnable, LOGGER), delay.toMillis(), TimeUnit.MILLISECONDS);
+ ScheduledFuture> future = executor.schedule(runnable, delay.toMillis(), TimeUnit.MILLISECONDS);
return () -> future.cancel(false);
}
@Override
public Scheduled schedule(Duration delay, Duration interval, Runnable runnable) {
- ScheduledFuture> future = executor.scheduleAtFixedRate(Runnables.logFailure(runnable, LOGGER), delay.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS);
+ ScheduledFuture> future = executor.scheduleAtFixedRate(runnable, delay.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS);
return () -> future.cancel(false);
}
diff --git a/concurrent/src/main/java/io/atomix/catalyst/concurrent/ThreadContext.java b/concurrent/src/main/java/io/atomix/catalyst/concurrent/ThreadContext.java
index 193b990..ac82902 100644
--- a/concurrent/src/main/java/io/atomix/catalyst/concurrent/ThreadContext.java
+++ b/concurrent/src/main/java/io/atomix/catalyst/concurrent/ThreadContext.java
@@ -15,31 +15,30 @@
*/
package io.atomix.catalyst.concurrent;
-import io.atomix.catalyst.serializer.Serializer;
-import io.atomix.catalyst.util.Assert;
-import org.slf4j.Logger;
-
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
+import io.atomix.catalyst.serializer.Serializer;
+import org.slf4j.Logger;
+import static io.atomix.catalyst.concurrent.CatalystScheduledExecutorService.CONTEXT_THREAD_LOCAL;
+import static io.atomix.catalyst.util.Assert.state;
+
/**
* Thread context.
*
- * The thread context is used by Catalyst to determine the correct thread on which to execute asynchronous callbacks.
- * All threads created within Catalyst must be instances of {@link CatalystThread}. Once
- * a thread has been created, the context is stored in the thread object via
- * {@link CatalystThread#setContext(ThreadContext)}. This means there is a one-to-one relationship
- * between a context and a thread. That is, a context is representative of a thread and provides an interface for firing
- * events on that thread.
- *
- * In addition to serving as an {@link java.util.concurrent.Executor}, the context also provides thread-local storage
- * for {@link Serializer} serializer instances. All serialization that takes place within a
- * {@link CatalystThread} should use the context {@link #serializer()}.
+ * The thread context is used by Catalyst to determine the correct thread on
+ * which to execute asynchronous callbacks. All tasks must be
+ * scheduled/submitted to instances of {@link CatalystScheduledExecutorService}.
+ * This means there is a one-to-one relationship between a context and a
+ * thread. That is, a context is representative of a thread and provides an
+ * interface for firing events on that thread.
*
- * Components of the framework that provide custom threads should use {@link CatalystThreadFactory}
- * to allocate new threads and provide a custom {@link ThreadContext} implementation.
+ * In addition to serving as an {@link java.util.concurrent.Executor}, the
+ * context also provides thread-local storage for {@link Serializer} serializer
+ * instances. All serialization that takes place within a thread should use the
+ * context {@link #serializer()}.
*
* @author Jordan Halterman
*/
@@ -51,35 +50,18 @@ public interface ThreadContext extends AutoCloseable {
* @return The current thread context or {@code null} if no context exists.
*/
static ThreadContext currentContext() {
- Thread thread = Thread.currentThread();
- return thread instanceof CatalystThread ? ((CatalystThread) thread).getContext() : null;
+ return CONTEXT_THREAD_LOCAL.get();
}
-
+
/**
* @throws IllegalStateException if the current thread is not a catalyst thread
*/
static ThreadContext currentContextOrThrow() {
ThreadContext context = currentContext();
- Assert.state(context != null, "not on a Catalyst thread");
+ state(context != null, "Not on a Catalyst thread");
return context;
}
- /**
- * Returns a boolean indicating whether the current thread is in this context.
- *
- * @return Indicates whether the current thread is in this context.
- */
- default boolean isCurrentContext() {
- return currentContext() == this;
- }
-
- /**
- * Checks that the current thread is the correct context thread.
- */
- default void checkThread() {
- Assert.state(currentContext() == this, "not on a Catalyst thread");
- }
-
/**
* Returns the context logger.
*
@@ -141,7 +123,7 @@ default CompletableFuture execute(Runnable callback) {
* Executes a callback on the context.
*
* @param callback The callback to execute.
- * @param The callback result type.
+ * @param The callback result type.
* @return A completable future to be completed with the callback result.
*/
default CompletableFuture execute(Supplier callback) {
@@ -160,7 +142,7 @@ default CompletableFuture execute(Supplier callback) {
* Schedules a runnable on the context.
*
* @param callback The callback to schedule.
- * @param delay The delay at which to schedule the runnable.
+ * @param delay The delay at which to schedule the runnable.
*/
Scheduled schedule(Duration delay, Runnable callback);
diff --git a/concurrent/src/main/java/io/atomix/catalyst/concurrent/ThreadPoolContext.java b/concurrent/src/main/java/io/atomix/catalyst/concurrent/ThreadPoolContext.java
index 47747f7..3835359 100644
--- a/concurrent/src/main/java/io/atomix/catalyst/concurrent/ThreadPoolContext.java
+++ b/concurrent/src/main/java/io/atomix/catalyst/concurrent/ThreadPoolContext.java
@@ -21,7 +21,6 @@
import org.slf4j.LoggerFactory;
import java.time.Duration;
-import java.util.LinkedList;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -38,57 +37,19 @@
*/
public class ThreadPoolContext implements ThreadContext {
private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolContext.class);
- private final ScheduledExecutorService parent;
+ private final ScheduledExecutorService executor;
private final Serializer serializer;
- private final Runnable runner;
- private final LinkedList tasks = new LinkedList<>();
private volatile boolean blocked;
- private boolean running;
- private final Executor executor = new Executor() {
- @Override
- public void execute(Runnable command) {
- synchronized (tasks) {
- tasks.add(command);
- if (!running) {
- running = true;
- parent.execute(runner);
- }
- }
- }
- };
/**
* Creates a new thread pool context.
*
- * @param parent The thread pool on which to execute events.
+ * @param executor The thread pool on which to execute events.
* @param serializer The context serializer.
*/
- public ThreadPoolContext(ScheduledExecutorService parent, Serializer serializer) {
- this.parent = Assert.notNull(parent, "parent");
+ public ThreadPoolContext(ScheduledExecutorService executor, Serializer serializer) {
+ this.executor = new CatalystScheduledExecutorService(Assert.notNull(executor, "executor"), this);
this.serializer = Assert.notNull(serializer, "serializer");
-
- // This code was shamelessly stolededed from Vert.x:
- // https://github.com/eclipse/vert.x/blob/master/src/main/java/io/vertx/core/impl/OrderedExecutorFactory.java
- runner = () -> {
- ((CatalystThread) Thread.currentThread()).setContext(this);
- for (;;) {
- final Runnable task;
- synchronized (tasks) {
- task = tasks.poll();
- if (task == null) {
- running = false;
- return;
- }
- }
-
- try {
- task.run();
- } catch (Throwable t) {
- LOGGER.error("An uncaught exception occurred", t);
- throw t;
- }
- }
- };
}
@Override
@@ -123,13 +84,13 @@ public Executor executor() {
@Override
public Scheduled schedule(Duration delay, Runnable runnable) {
- ScheduledFuture> future = parent.schedule(() -> executor.execute(Runnables.logFailure(runnable, LOGGER)), delay.toMillis(), TimeUnit.MILLISECONDS);
+ ScheduledFuture> future = executor.schedule(() -> executor.execute(runnable), delay.toMillis(), TimeUnit.MILLISECONDS);
return () -> future.cancel(false);
}
@Override
public Scheduled schedule(Duration delay, Duration interval, Runnable runnable) {
- ScheduledFuture> future = parent.scheduleAtFixedRate(() -> executor.execute(Runnables.logFailure(runnable, LOGGER)), delay.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS);
+ ScheduledFuture> future = executor.scheduleAtFixedRate(() -> executor.execute(runnable), delay.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS);
return () -> future.cancel(false);
}
diff --git a/jackson/pom.xml b/jackson/pom.xml
index 15d2742..8824ce1 100644
--- a/jackson/pom.xml
+++ b/jackson/pom.xml
@@ -19,7 +19,7 @@
io.atomix.catalyst
catalyst-parent
- 1.1.2-SNAPSHOT
+ 2.0-SNAPSHOT
bundle
diff --git a/kryo/pom.xml b/kryo/pom.xml
index aa97aee..a841e70 100644
--- a/kryo/pom.xml
+++ b/kryo/pom.xml
@@ -19,7 +19,7 @@
io.atomix.catalyst
catalyst-parent
- 1.1.2-SNAPSHOT
+ 2.0-SNAPSHOT
bundle
diff --git a/local/pom.xml b/local/pom.xml
index 6a893f3..1829d35 100644
--- a/local/pom.xml
+++ b/local/pom.xml
@@ -19,7 +19,7 @@
io.atomix.catalyst
catalyst-parent
- 1.1.2-SNAPSHOT
+ 2.0-SNAPSHOT
bundle
diff --git a/netty/pom.xml b/netty/pom.xml
index 80990f3..e82c6a2 100644
--- a/netty/pom.xml
+++ b/netty/pom.xml
@@ -19,7 +19,7 @@
io.atomix.catalyst
catalyst-parent
- 1.1.2-SNAPSHOT
+ 2.0-SNAPSHOT
bundle
diff --git a/netty/src/main/java/io/atomix/catalyst/transport/netty/NettyHandler.java b/netty/src/main/java/io/atomix/catalyst/transport/netty/NettyHandler.java
index c65a8c5..fba3c50 100644
--- a/netty/src/main/java/io/atomix/catalyst/transport/netty/NettyHandler.java
+++ b/netty/src/main/java/io/atomix/catalyst/transport/netty/NettyHandler.java
@@ -85,7 +85,7 @@ private ThreadContext getOrCreateContext(Channel channel) {
if (context != null) {
return context;
}
- return new SingleThreadContext(Thread.currentThread(), channel.eventLoop(), this.context.serializer().clone());
+ return new SingleThreadContext(channel.eventLoop(), this.context.serializer().clone());
}
@Override
diff --git a/pom.xml b/pom.xml
index e825222..3ae6686 100644
--- a/pom.xml
+++ b/pom.xml
@@ -9,7 +9,7 @@
io.atomix.catalyst
catalyst-parent
- 1.1.2-SNAPSHOT
+ 2.0-SNAPSHOT
pom
Catalyst Parent Pom
Fast I/O and binary serialization framework.
diff --git a/serializer/pom.xml b/serializer/pom.xml
index a9c4862..59f6d1b 100644
--- a/serializer/pom.xml
+++ b/serializer/pom.xml
@@ -19,7 +19,7 @@
io.atomix.catalyst
catalyst-parent
- 1.1.2-SNAPSHOT
+ 2.0-SNAPSHOT
bundle
diff --git a/transport/pom.xml b/transport/pom.xml
index acf9bbf..a1cf513 100644
--- a/transport/pom.xml
+++ b/transport/pom.xml
@@ -19,7 +19,7 @@
io.atomix.catalyst
catalyst-parent
- 1.1.2-SNAPSHOT
+ 2.0-SNAPSHOT
bundle
diff --git a/transport/src/main/java/io/atomix/catalyst/transport/Connection.java b/transport/src/main/java/io/atomix/catalyst/transport/Connection.java
index 34ce449..b0b4e25 100644
--- a/transport/src/main/java/io/atomix/catalyst/transport/Connection.java
+++ b/transport/src/main/java/io/atomix/catalyst/transport/Connection.java
@@ -15,7 +15,6 @@
*/
package io.atomix.catalyst.transport;
-import io.atomix.catalyst.concurrent.CatalystThread;
import io.atomix.catalyst.concurrent.Listener;
import io.atomix.catalyst.serializer.CatalystSerializable;
import io.atomix.catalyst.serializer.Serializer;
@@ -52,7 +51,6 @@ public interface Connection {
*
* {@link Connection} implementations must guarantee that all reply
* {@link java.util.concurrent.CompletableFuture futures} will be completed in the same
- * {@link CatalystThread Catalyst thread}.
*
* @param message The message to send.
* @param The message type.