diff --git a/buffer/pom.xml b/buffer/pom.xml index b17ff8b..fcbc001 100644 --- a/buffer/pom.xml +++ b/buffer/pom.xml @@ -19,7 +19,7 @@ io.atomix.catalyst catalyst-parent - 1.1.2-SNAPSHOT + 2.0-SNAPSHOT bundle diff --git a/common/pom.xml b/common/pom.xml index ea694e6..4754427 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -4,7 +4,7 @@ io.atomix.catalyst catalyst-parent - 1.1.2-SNAPSHOT + 2.0-SNAPSHOT bundle diff --git a/concurrent/pom.xml b/concurrent/pom.xml index 2a968d1..e7ea6b1 100644 --- a/concurrent/pom.xml +++ b/concurrent/pom.xml @@ -4,7 +4,7 @@ io.atomix.catalyst catalyst-parent - 1.1.2-SNAPSHOT + 2.0-SNAPSHOT bundle diff --git a/concurrent/src/main/java/io/atomix/catalyst/concurrent/CatalystScheduledExecutorService.java b/concurrent/src/main/java/io/atomix/catalyst/concurrent/CatalystScheduledExecutorService.java new file mode 100644 index 0000000..e274b17 --- /dev/null +++ b/concurrent/src/main/java/io/atomix/catalyst/concurrent/CatalystScheduledExecutorService.java @@ -0,0 +1,306 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ +package io.atomix.catalyst.concurrent; + +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static io.atomix.catalyst.util.Assert.notNull; +import static java.util.stream.Collectors.toSet; + +/** + * A {@link ScheduledExecutorService} wrapper. + *

+ * A {@link ScheduledExecutorService} wrapper that wraps instances of + * {@link Runnable} and {@link Callable} with code that properly sets up the + * ThreadLocal {@link #CONTEXT_THREAD_LOCAL} before transferring + * control to the wrapped code. The wrapper subsequently tears down the + * ThreadLocal after the trapped code completes execution. + *

+ * Note: All instances of {@link Runnable} and {@link Callable} *MUST* be + * scheduled through instances of this class in order for Catalyst code + * to work correctly. + * + * @author Catalyst Project + */ +class CatalystScheduledExecutorService implements ScheduledExecutorService { + final static ThreadLocal CONTEXT_THREAD_LOCAL = new ThreadLocal<>(); + private final ScheduledExecutorService delegate; + private final ThreadContext threadContext; + + /** + * @param delegate the wrapped instance of {@link ScheduledExecutorService} + * @param threadContext the instance of {@link ThreadContext} to setup for scheduled tasks + */ + CatalystScheduledExecutorService(ScheduledExecutorService delegate, ThreadContext threadContext) { + this.delegate = notNull(delegate, "delegate"); + this.threadContext = notNull(threadContext, "threadContext"); + } + + /** + * {@inheritDoc} + */ + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) + { + return delegate.schedule(new CatalystRunnable(command), delay, unit); + } + + /** + * {@inheritDoc} + */ + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) + { + return delegate.schedule(new CatalystCallable<>(callable), delay, unit); + } + + /** + * {@inheritDoc} + */ + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) + { + return delegate.scheduleAtFixedRate(new CatalystRunnable(command), initialDelay, period, unit); + } + + /** + * {@inheritDoc} + */ + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) + { + return delegate.scheduleWithFixedDelay(new CatalystRunnable(command), initialDelay, delay, unit); + } + + /** + * {@inheritDoc} + */ + @Override + final public void shutdown() { + delegate.shutdown(); + } + + /** + * {@inheritDoc} + */ + @Override + final public List shutdownNow() { + return delegate.shutdownNow(); + } + + /** + * {@inheritDoc} + */ + @Override + final public boolean isShutdown() { + return delegate.isShutdown(); + } + + /** + * {@inheritDoc} + */ + @Override + final public boolean isTerminated() { + return delegate.isTerminated(); + } + + /** + * {@inheritDoc} + */ + @Override + final public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return delegate.awaitTermination(timeout, unit); + } + + /** + * {@inheritDoc} + */ + @Override + public Future submit(Callable task) { + return delegate.submit(new CatalystCallable<>(task)); + } + + /** + * {@inheritDoc} + */ + @Override + public Future submit(Runnable task, T result) { + return delegate.submit(new CatalystRunnable(task), result); + } + + /** + * {@inheritDoc} + */ + @Override + public Future submit(Runnable task) { + return delegate.submit(new CatalystRunnable(task)); + } + + /** + * {@inheritDoc} + */ + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException + { + Set> wrapped = tasks.stream().map(CatalystCallable::new).collect(toSet()); + return delegate.invokeAll(wrapped); + } + + /** + * {@inheritDoc} + */ + @Override + public List> invokeAll(Collection> tasks, + long timeout, TimeUnit unit) throws InterruptedException + { + Set> wrapped = tasks.stream().map(CatalystCallable::new).collect(toSet()); + return delegate.invokeAll(wrapped, timeout, unit); + } + + /** + * {@inheritDoc} + */ + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + Set> wrapped = tasks.stream().map(CatalystCallable::new).collect(toSet()); + return delegate.invokeAny(wrapped); + } + + /** + * {@inheritDoc} + */ + @Override + public T invokeAny(Collection> tasks, + long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, + TimeoutException + { + Set> wrapped = tasks.stream().map(CatalystCallable::new).collect(toSet()); + return delegate.invokeAny(wrapped, timeout, unit); + } + + /** + * {@inheritDoc} + */ + @Override + public void execute(Runnable command) { + delegate.execute(new CatalystRunnable(command)); + } + + /** + * A Catalyst wrapper for instances of {@link Runnable}. + */ + private class CatalystRunnable implements Runnable { + private final Runnable delegate; + + CatalystRunnable(Runnable delegate) { + this.delegate = notNull(delegate, "delegate"); + } + + @Override + public void run() { + CONTEXT_THREAD_LOCAL.set(threadContext); + try { + delegate.run(); + + } catch (RejectedExecutionException ree) { + throw ree; + + } catch (Throwable t) { + threadContext.logger().error("An uncaught exception occurred", t); + throw t; + + } finally { + CONTEXT_THREAD_LOCAL.remove(); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CatalystRunnable that = (CatalystRunnable) o; + return Objects.equals(delegate, that.delegate); + } + + @Override + public int hashCode() { + return delegate.hashCode(); + } + + @Override + public String toString() { + return delegate.toString(); + } + } + + /** + * A Catalyst wrapper for instances of {@link Callable}. + */ + private class CatalystCallable implements Callable { + private final Callable delegate; + + CatalystCallable(Callable delegate) { + this.delegate = delegate; + } + + @Override + public V call() throws Exception { + CONTEXT_THREAD_LOCAL.set(threadContext); + try { + return delegate.call(); + + } catch (RejectedExecutionException ree) { + throw ree; + + } catch (Throwable t) { + threadContext.logger().error("An uncaught exception occurred", t); + throw t; + + } finally { + CONTEXT_THREAD_LOCAL.remove(); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CatalystCallable that = (CatalystCallable) o; + return Objects.equals(delegate, that.delegate); + } + + @Override + public int hashCode() { + return delegate.hashCode(); + } + + @Override + public String toString() { + return delegate.toString(); + } + } +} diff --git a/concurrent/src/main/java/io/atomix/catalyst/concurrent/CatalystThread.java b/concurrent/src/main/java/io/atomix/catalyst/concurrent/CatalystThread.java deleted file mode 100644 index dd5cdbb..0000000 --- a/concurrent/src/main/java/io/atomix/catalyst/concurrent/CatalystThread.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.atomix.catalyst.concurrent; - -import java.lang.ref.WeakReference; - -/** - * Catalyst thread. - *

- * The Catalyst thread primarily serves to store a {@link ThreadContext} for the current thread. - * The context is stored in a {@link java.lang.ref.WeakReference} in order to allow the thread to be garbage collected. - *

- * There is no {@link ThreadContext} associated with the thread when it is first created. - * It is the responsibility of thread creators to {@link #setContext(ThreadContext) set} the thread context when appropriate. - * - * @author Jordan Halterman - */ -public class CatalystThread extends Thread { - private WeakReference context; - - public CatalystThread(Runnable target, String name) { - super(target, name); - } - - /** - * Sets the thread context. - * - * @param context The thread context. - */ - public void setContext(ThreadContext context) { - this.context = new WeakReference<>(context); - } - - /** - * Returns the thread context. - * - * @return The thread {@link ThreadContext} or {@code null} if no context has been configured. - */ - public ThreadContext getContext() { - return context != null ? context.get() : null; - } - -} diff --git a/concurrent/src/main/java/io/atomix/catalyst/concurrent/CatalystThreadFactory.java b/concurrent/src/main/java/io/atomix/catalyst/concurrent/CatalystThreadFactory.java index a7e6118..dc5fa29 100644 --- a/concurrent/src/main/java/io/atomix/catalyst/concurrent/CatalystThreadFactory.java +++ b/concurrent/src/main/java/io/atomix/catalyst/concurrent/CatalystThreadFactory.java @@ -36,8 +36,8 @@ public CatalystThreadFactory(String nameFormat) { } @Override - public Thread newThread(Runnable r) { - return new CatalystThread(r, String.format(nameFormat, threadNumber.getAndIncrement())); + public Thread newThread(Runnable runnable) { + return new Thread(runnable, String.format(nameFormat, threadNumber.getAndIncrement())); } } diff --git a/concurrent/src/main/java/io/atomix/catalyst/concurrent/Runnables.java b/concurrent/src/main/java/io/atomix/catalyst/concurrent/Runnables.java deleted file mode 100644 index c147586..0000000 --- a/concurrent/src/main/java/io/atomix/catalyst/concurrent/Runnables.java +++ /dev/null @@ -1,29 +0,0 @@ -package io.atomix.catalyst.concurrent; - -import org.slf4j.Logger; - -import java.util.concurrent.RejectedExecutionException; - -/** - * Runnable utilities. - */ -final class Runnables { - private Runnables() { - } - - /** - * Returns a wrapped runnable that logs and rethrows uncaught exceptions. - */ - static Runnable logFailure(final Runnable runnable, Logger logger) { - return () -> { - try { - runnable.run(); - } catch (Throwable t) { - if (!(t instanceof RejectedExecutionException)) { - logger.error("An uncaught exception occurred", t); - } - throw t; - } - }; - } -} diff --git a/concurrent/src/main/java/io/atomix/catalyst/concurrent/SingleThreadContext.java b/concurrent/src/main/java/io/atomix/catalyst/concurrent/SingleThreadContext.java index d147dd2..10e2a27 100644 --- a/concurrent/src/main/java/io/atomix/catalyst/concurrent/SingleThreadContext.java +++ b/concurrent/src/main/java/io/atomix/catalyst/concurrent/SingleThreadContext.java @@ -26,7 +26,7 @@ public class SingleThreadContext implements ThreadContext { @Override public void execute(Runnable command) { try { - executor.execute(Runnables.logFailure(command, LOGGER)); + executor.execute(command); } catch (RejectedExecutionException e) { } } @@ -62,29 +62,8 @@ public SingleThreadContext(CatalystThreadFactory factory, Serializer serializer) * @param serializer The context serializer. */ public SingleThreadContext(ScheduledExecutorService executor, Serializer serializer) { - this(getThread(executor), executor, serializer); - } - - public SingleThreadContext(Thread thread, ScheduledExecutorService executor, Serializer serializer) { - this.executor = executor; + this.executor = new CatalystScheduledExecutorService(executor, this); this.serializer = serializer; - Assert.state(thread instanceof CatalystThread, "not a Catalyst thread"); - ((CatalystThread) thread).setContext(this); - } - - /** - * Gets the thread from a single threaded executor service. - */ - protected static CatalystThread getThread(ExecutorService executor) { - final AtomicReference thread = new AtomicReference<>(); - try { - executor.submit(() -> { - thread.set((CatalystThread) Thread.currentThread()); - }).get(); - } catch (InterruptedException | ExecutionException e) { - throw new IllegalStateException("failed to initialize thread state", e); - } - return thread.get(); } @Override @@ -119,13 +98,13 @@ public Executor executor() { @Override public Scheduled schedule(Duration delay, Runnable runnable) { - ScheduledFuture future = executor.schedule(Runnables.logFailure(runnable, LOGGER), delay.toMillis(), TimeUnit.MILLISECONDS); + ScheduledFuture future = executor.schedule(runnable, delay.toMillis(), TimeUnit.MILLISECONDS); return () -> future.cancel(false); } @Override public Scheduled schedule(Duration delay, Duration interval, Runnable runnable) { - ScheduledFuture future = executor.scheduleAtFixedRate(Runnables.logFailure(runnable, LOGGER), delay.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS); + ScheduledFuture future = executor.scheduleAtFixedRate(runnable, delay.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS); return () -> future.cancel(false); } diff --git a/concurrent/src/main/java/io/atomix/catalyst/concurrent/ThreadContext.java b/concurrent/src/main/java/io/atomix/catalyst/concurrent/ThreadContext.java index 193b990..ac82902 100644 --- a/concurrent/src/main/java/io/atomix/catalyst/concurrent/ThreadContext.java +++ b/concurrent/src/main/java/io/atomix/catalyst/concurrent/ThreadContext.java @@ -15,31 +15,30 @@ */ package io.atomix.catalyst.concurrent; -import io.atomix.catalyst.serializer.Serializer; -import io.atomix.catalyst.util.Assert; -import org.slf4j.Logger; - import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.function.Supplier; +import io.atomix.catalyst.serializer.Serializer; +import org.slf4j.Logger; +import static io.atomix.catalyst.concurrent.CatalystScheduledExecutorService.CONTEXT_THREAD_LOCAL; +import static io.atomix.catalyst.util.Assert.state; + /** * Thread context. *

- * The thread context is used by Catalyst to determine the correct thread on which to execute asynchronous callbacks. - * All threads created within Catalyst must be instances of {@link CatalystThread}. Once - * a thread has been created, the context is stored in the thread object via - * {@link CatalystThread#setContext(ThreadContext)}. This means there is a one-to-one relationship - * between a context and a thread. That is, a context is representative of a thread and provides an interface for firing - * events on that thread. - *

- * In addition to serving as an {@link java.util.concurrent.Executor}, the context also provides thread-local storage - * for {@link Serializer} serializer instances. All serialization that takes place within a - * {@link CatalystThread} should use the context {@link #serializer()}. + * The thread context is used by Catalyst to determine the correct thread on + * which to execute asynchronous callbacks. All tasks must be + * scheduled/submitted to instances of {@link CatalystScheduledExecutorService}. + * This means there is a one-to-one relationship between a context and a + * thread. That is, a context is representative of a thread and provides an + * interface for firing events on that thread. *

- * Components of the framework that provide custom threads should use {@link CatalystThreadFactory} - * to allocate new threads and provide a custom {@link ThreadContext} implementation. + * In addition to serving as an {@link java.util.concurrent.Executor}, the + * context also provides thread-local storage for {@link Serializer} serializer + * instances. All serialization that takes place within a thread should use the + * context {@link #serializer()}. * * @author Jordan Halterman */ @@ -51,35 +50,18 @@ public interface ThreadContext extends AutoCloseable { * @return The current thread context or {@code null} if no context exists. */ static ThreadContext currentContext() { - Thread thread = Thread.currentThread(); - return thread instanceof CatalystThread ? ((CatalystThread) thread).getContext() : null; + return CONTEXT_THREAD_LOCAL.get(); } - + /** * @throws IllegalStateException if the current thread is not a catalyst thread */ static ThreadContext currentContextOrThrow() { ThreadContext context = currentContext(); - Assert.state(context != null, "not on a Catalyst thread"); + state(context != null, "Not on a Catalyst thread"); return context; } - /** - * Returns a boolean indicating whether the current thread is in this context. - * - * @return Indicates whether the current thread is in this context. - */ - default boolean isCurrentContext() { - return currentContext() == this; - } - - /** - * Checks that the current thread is the correct context thread. - */ - default void checkThread() { - Assert.state(currentContext() == this, "not on a Catalyst thread"); - } - /** * Returns the context logger. * @@ -141,7 +123,7 @@ default CompletableFuture execute(Runnable callback) { * Executes a callback on the context. * * @param callback The callback to execute. - * @param The callback result type. + * @param The callback result type. * @return A completable future to be completed with the callback result. */ default CompletableFuture execute(Supplier callback) { @@ -160,7 +142,7 @@ default CompletableFuture execute(Supplier callback) { * Schedules a runnable on the context. * * @param callback The callback to schedule. - * @param delay The delay at which to schedule the runnable. + * @param delay The delay at which to schedule the runnable. */ Scheduled schedule(Duration delay, Runnable callback); diff --git a/concurrent/src/main/java/io/atomix/catalyst/concurrent/ThreadPoolContext.java b/concurrent/src/main/java/io/atomix/catalyst/concurrent/ThreadPoolContext.java index 47747f7..3835359 100644 --- a/concurrent/src/main/java/io/atomix/catalyst/concurrent/ThreadPoolContext.java +++ b/concurrent/src/main/java/io/atomix/catalyst/concurrent/ThreadPoolContext.java @@ -21,7 +21,6 @@ import org.slf4j.LoggerFactory; import java.time.Duration; -import java.util.LinkedList; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -38,57 +37,19 @@ */ public class ThreadPoolContext implements ThreadContext { private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolContext.class); - private final ScheduledExecutorService parent; + private final ScheduledExecutorService executor; private final Serializer serializer; - private final Runnable runner; - private final LinkedList tasks = new LinkedList<>(); private volatile boolean blocked; - private boolean running; - private final Executor executor = new Executor() { - @Override - public void execute(Runnable command) { - synchronized (tasks) { - tasks.add(command); - if (!running) { - running = true; - parent.execute(runner); - } - } - } - }; /** * Creates a new thread pool context. * - * @param parent The thread pool on which to execute events. + * @param executor The thread pool on which to execute events. * @param serializer The context serializer. */ - public ThreadPoolContext(ScheduledExecutorService parent, Serializer serializer) { - this.parent = Assert.notNull(parent, "parent"); + public ThreadPoolContext(ScheduledExecutorService executor, Serializer serializer) { + this.executor = new CatalystScheduledExecutorService(Assert.notNull(executor, "executor"), this); this.serializer = Assert.notNull(serializer, "serializer"); - - // This code was shamelessly stolededed from Vert.x: - // https://github.com/eclipse/vert.x/blob/master/src/main/java/io/vertx/core/impl/OrderedExecutorFactory.java - runner = () -> { - ((CatalystThread) Thread.currentThread()).setContext(this); - for (;;) { - final Runnable task; - synchronized (tasks) { - task = tasks.poll(); - if (task == null) { - running = false; - return; - } - } - - try { - task.run(); - } catch (Throwable t) { - LOGGER.error("An uncaught exception occurred", t); - throw t; - } - } - }; } @Override @@ -123,13 +84,13 @@ public Executor executor() { @Override public Scheduled schedule(Duration delay, Runnable runnable) { - ScheduledFuture future = parent.schedule(() -> executor.execute(Runnables.logFailure(runnable, LOGGER)), delay.toMillis(), TimeUnit.MILLISECONDS); + ScheduledFuture future = executor.schedule(() -> executor.execute(runnable), delay.toMillis(), TimeUnit.MILLISECONDS); return () -> future.cancel(false); } @Override public Scheduled schedule(Duration delay, Duration interval, Runnable runnable) { - ScheduledFuture future = parent.scheduleAtFixedRate(() -> executor.execute(Runnables.logFailure(runnable, LOGGER)), delay.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS); + ScheduledFuture future = executor.scheduleAtFixedRate(() -> executor.execute(runnable), delay.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS); return () -> future.cancel(false); } diff --git a/jackson/pom.xml b/jackson/pom.xml index 15d2742..8824ce1 100644 --- a/jackson/pom.xml +++ b/jackson/pom.xml @@ -19,7 +19,7 @@ io.atomix.catalyst catalyst-parent - 1.1.2-SNAPSHOT + 2.0-SNAPSHOT bundle diff --git a/kryo/pom.xml b/kryo/pom.xml index aa97aee..a841e70 100644 --- a/kryo/pom.xml +++ b/kryo/pom.xml @@ -19,7 +19,7 @@ io.atomix.catalyst catalyst-parent - 1.1.2-SNAPSHOT + 2.0-SNAPSHOT bundle diff --git a/local/pom.xml b/local/pom.xml index 6a893f3..1829d35 100644 --- a/local/pom.xml +++ b/local/pom.xml @@ -19,7 +19,7 @@ io.atomix.catalyst catalyst-parent - 1.1.2-SNAPSHOT + 2.0-SNAPSHOT bundle diff --git a/netty/pom.xml b/netty/pom.xml index 80990f3..e82c6a2 100644 --- a/netty/pom.xml +++ b/netty/pom.xml @@ -19,7 +19,7 @@ io.atomix.catalyst catalyst-parent - 1.1.2-SNAPSHOT + 2.0-SNAPSHOT bundle diff --git a/netty/src/main/java/io/atomix/catalyst/transport/netty/NettyHandler.java b/netty/src/main/java/io/atomix/catalyst/transport/netty/NettyHandler.java index c65a8c5..fba3c50 100644 --- a/netty/src/main/java/io/atomix/catalyst/transport/netty/NettyHandler.java +++ b/netty/src/main/java/io/atomix/catalyst/transport/netty/NettyHandler.java @@ -85,7 +85,7 @@ private ThreadContext getOrCreateContext(Channel channel) { if (context != null) { return context; } - return new SingleThreadContext(Thread.currentThread(), channel.eventLoop(), this.context.serializer().clone()); + return new SingleThreadContext(channel.eventLoop(), this.context.serializer().clone()); } @Override diff --git a/pom.xml b/pom.xml index e825222..3ae6686 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ io.atomix.catalyst catalyst-parent - 1.1.2-SNAPSHOT + 2.0-SNAPSHOT pom Catalyst Parent Pom Fast I/O and binary serialization framework. diff --git a/serializer/pom.xml b/serializer/pom.xml index a9c4862..59f6d1b 100644 --- a/serializer/pom.xml +++ b/serializer/pom.xml @@ -19,7 +19,7 @@ io.atomix.catalyst catalyst-parent - 1.1.2-SNAPSHOT + 2.0-SNAPSHOT bundle diff --git a/transport/pom.xml b/transport/pom.xml index acf9bbf..a1cf513 100644 --- a/transport/pom.xml +++ b/transport/pom.xml @@ -19,7 +19,7 @@ io.atomix.catalyst catalyst-parent - 1.1.2-SNAPSHOT + 2.0-SNAPSHOT bundle diff --git a/transport/src/main/java/io/atomix/catalyst/transport/Connection.java b/transport/src/main/java/io/atomix/catalyst/transport/Connection.java index 34ce449..b0b4e25 100644 --- a/transport/src/main/java/io/atomix/catalyst/transport/Connection.java +++ b/transport/src/main/java/io/atomix/catalyst/transport/Connection.java @@ -15,7 +15,6 @@ */ package io.atomix.catalyst.transport; -import io.atomix.catalyst.concurrent.CatalystThread; import io.atomix.catalyst.concurrent.Listener; import io.atomix.catalyst.serializer.CatalystSerializable; import io.atomix.catalyst.serializer.Serializer; @@ -52,7 +51,6 @@ public interface Connection { *

* {@link Connection} implementations must guarantee that all reply * {@link java.util.concurrent.CompletableFuture futures} will be completed in the same - * {@link CatalystThread Catalyst thread}. * * @param message The message to send. * @param The message type.