future = new CompletableFuture<>();
+ consume(aTry -> {
+ if (aTry.isSuccess()) {
+ future.complete(aTry.getValue());
+ } else {
+ future.completeExceptionally(aTry.getError());
+ }
+ });
+
+ return future;
+ }
+
/**
* Consumes the value(or error) of the future into a consumer.
* if the future is lazy the value will be reproduced on each consumption.
@@ -275,6 +279,28 @@ default T get(final long timeout, final TimeUnit unit) throws InterruptedExcepti
throw new TimeoutException("Timeout occurred while waiting for a value");
}
+ /**
+ * Blocks until a value is available for consumption and then return it.
+ * checked exceptions are wrapped in an UncheckedExecutionException and thrown.
+ *
+ * DO NOT use in non-blocking environment.
+ *
+ * @return the future value if successful
+ * @throws UncheckedExecutionException if the thread has been interrupted or the future threw a checked exception
+ */
+ default T getUnchecked() {
+ try {
+ return get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new UncheckedExecutionException(e);
+ } catch (ExecutionException e) {
+ Throwables.propagateIfPossible(e.getCause());
+ throw new UncheckedExecutionException(e.getCause());
+ }
+ }
+
+
/**
* Turns the current future into an eager one.
*
diff --git a/ob1k-concurrent/src/main/java/com/outbrain/ob1k/concurrent/eager/EagerComposableFuture.java b/ob1k-concurrent/src/main/java/com/outbrain/ob1k/concurrent/eager/EagerComposableFuture.java
index 465d9c0d..dd916198 100644
--- a/ob1k-concurrent/src/main/java/com/outbrain/ob1k/concurrent/eager/EagerComposableFuture.java
+++ b/ob1k-concurrent/src/main/java/com/outbrain/ob1k/concurrent/eager/EagerComposableFuture.java
@@ -1,29 +1,12 @@
package com.outbrain.ob1k.concurrent.eager;
-import com.outbrain.ob1k.concurrent.CancellationToken;
-import com.outbrain.ob1k.concurrent.ComposableFuture;
-import com.outbrain.ob1k.concurrent.ComposableFutures;
-import com.outbrain.ob1k.concurrent.Consumer;
-import com.outbrain.ob1k.concurrent.Producer;
-import com.outbrain.ob1k.concurrent.Scheduler;
-import com.outbrain.ob1k.concurrent.Try;
-import com.outbrain.ob1k.concurrent.UncheckedExecutionException;
-import com.outbrain.ob1k.concurrent.handlers.ErrorHandler;
-import com.outbrain.ob1k.concurrent.handlers.FutureAction;
-import com.outbrain.ob1k.concurrent.handlers.FutureErrorHandler;
-import com.outbrain.ob1k.concurrent.handlers.FutureResultHandler;
-import com.outbrain.ob1k.concurrent.handlers.FutureSuccessHandler;
-import com.outbrain.ob1k.concurrent.handlers.ResultHandler;
-import com.outbrain.ob1k.concurrent.handlers.SuccessHandler;
+import com.outbrain.ob1k.concurrent.*;
+import com.outbrain.ob1k.concurrent.handlers.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@@ -317,6 +300,20 @@ public ComposableFuture andThen(final Consumer super T> resultConsumer) {
return future;
}
+ public static ComposableFuture fromCompletableFuture(final CompletableFuture source) {
+ final EagerComposableFuture future = new EagerComposableFuture<>();
+ source.whenComplete((value, throwable) -> {
+ if (throwable == null) {
+ future.set(value);
+ } else {
+ future.setException(throwable);
+ }
+ });
+
+ return future;
+ }
+
+
@Override
public void consume(final Consumer super T> consumer) {
handlers.addHandler(new ConsumerAction<>(consumer, this), threadPool);
diff --git a/pom.xml b/pom.xml
index dc2435ea..b34f6558 100644
--- a/pom.xml
+++ b/pom.xml
@@ -102,6 +102,7 @@
1.5.9
true
4.0.56.Final
+ 2.8.0
@@ -489,6 +490,11 @@
test
+
+ com.github.ben-manes.caffeine
+ caffeine
+ ${caffeine.version}
+