diff --git a/.travis.yml b/.travis.yml index 29271e05..6ec7aba2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,4 +11,4 @@ before_script: - ./consul --version script: - - ../consul agent -dev -advertise=127.0.0.1 & \ No newline at end of file + - ../consul agent -dev -advertise=127.0.0.1 & diff --git a/ob1k-cache/pom.xml b/ob1k-cache/pom.xml index 8d1e2282..d4886e12 100644 --- a/ob1k-cache/pom.xml +++ b/ob1k-cache/pom.xml @@ -55,6 +55,11 @@ test + + com.github.ben-manes.caffeine + caffeine + + org.slf4j slf4j-log4j12 @@ -82,5 +87,11 @@ net.jpountz.lz4 lz4 + + + org.jetbrains.kotlin + kotlin-stdlib-jdk8 + + diff --git a/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/CacheConfiguration.kt b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/CacheConfiguration.kt new file mode 100644 index 00000000..35b4cbbb --- /dev/null +++ b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/CacheConfiguration.kt @@ -0,0 +1,61 @@ +package com.outbrain.ob1k.cache + + +import com.outbrain.swinfra.metrics.api.MetricFactory +import java.util.concurrent.TimeUnit + +data class CacheConfiguration(val cacheName: String, + var ttl: Int = 360_000, + var ttlTimeUnit: TimeUnit = TimeUnit.MILLISECONDS, + var loader: CacheLoader? = null, + var failOnMissingEntries: Boolean = false, + var maxSize: Int = 1_000_000, + var loadTimeout: Long = 500, + var loadTimeUnit: TimeUnit = TimeUnit.MILLISECONDS, + var metricFactory: MetricFactory? = null) { + + constructor(cacheName1: String) : this(cacheName = cacheName1) + + @JvmOverloads + fun withTtl(ttl: Int, ttlTimeUnit: TimeUnit = TimeUnit.MILLISECONDS): CacheConfiguration { + this.ttl = ttl + this.ttlTimeUnit = ttlTimeUnit + return this + } + + fun withLoader(loader: CacheLoader): CacheConfiguration { + this.loader = loader + return this + } + + @JvmOverloads + fun failOnMissingEntries(fail: Boolean = true): CacheConfiguration { + this.failOnMissingEntries = fail + return this + } + + fun withMaxSize(maxSize: Int): CacheConfiguration { + this.maxSize = maxSize + return this + } + + @JvmOverloads + fun withLoadTimeout(loadTimeout: Long, loadTimeUnit: TimeUnit = TimeUnit.MILLISECONDS): CacheConfiguration { + this.loadTimeout = loadTimeout + this.loadTimeUnit = loadTimeUnit + return this + } + + /** + * for testing purposes + */ + fun withMetricFactory(metricFactory: MetricFactory): CacheConfiguration { + this.metricFactory = metricFactory + return this + } + + fun buildLocalAsyncCache(): LocalAsyncCache = LocalAsyncCache(this) + + fun buildLoadingCacheDelegate(cache: TypedCache) = LoadingCacheDelegate(cache, this) + +} diff --git a/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/GuavaCacheGaugesFactory.java b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/CaffeineCacheGaugesFactory.java similarity index 77% rename from ob1k-cache/src/main/java/com/outbrain/ob1k/cache/GuavaCacheGaugesFactory.java rename to ob1k-cache/src/main/java/com/outbrain/ob1k/cache/CaffeineCacheGaugesFactory.java index a04a064c..74e36436 100644 --- a/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/GuavaCacheGaugesFactory.java +++ b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/CaffeineCacheGaugesFactory.java @@ -1,6 +1,6 @@ package com.outbrain.ob1k.cache; -import com.google.common.cache.Cache; +import com.github.benmanes.caffeine.cache.Cache; import com.outbrain.swinfra.metrics.api.MetricFactory; @@ -9,14 +9,14 @@ * creates a set of gauges that monitors Guava cache. * */ -public class GuavaCacheGaugesFactory { +public class CaffeineCacheGaugesFactory { public static void createGauges(final MetricFactory metricFactory, final Cache cache, final String cacheName) { if (metricFactory != null) { metricFactory.registerGauge(cacheName, "averageLoadPenalty", () -> cache.stats().averageLoadPenalty()); metricFactory.registerGauge(cacheName, "hitRate", () -> cache.stats().hitRate()); - metricFactory.registerGauge(cacheName, "loadExceptionRate", () -> cache.stats().loadExceptionRate()); + metricFactory.registerGauge(cacheName, "loadFailureRate", () -> cache.stats().loadFailureRate()); metricFactory.registerGauge(cacheName, "missRate", () -> cache.stats().missRate()); @@ -26,7 +26,7 @@ public static void createGauges(final MetricFactory metricFactory, final Cache c metricFactory.registerGauge(cacheName, "loadCount", () -> cache.stats().loadCount()); - metricFactory.registerGauge(cacheName, "loadExceptionCount", () -> cache.stats().loadExceptionCount()); + metricFactory.registerGauge(cacheName, "loadFailureCount", () -> cache.stats().loadFailureCount()); metricFactory.registerGauge(cacheName, "loadSuccessCount", () -> cache.stats().loadSuccessCount()); @@ -36,7 +36,7 @@ public static void createGauges(final MetricFactory metricFactory, final Cache c metricFactory.registerGauge(cacheName, "totalLoadTime", () -> cache.stats().totalLoadTime()); - metricFactory.registerGauge(cacheName, "size", cache::size); + metricFactory.registerGauge(cacheName, "size", cache::estimatedSize); } diff --git a/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/LoadingCacheDelegate.java b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/LoadingCacheDelegate.java index 191731f5..f8c27fa0 100644 --- a/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/LoadingCacheDelegate.java +++ b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/LoadingCacheDelegate.java @@ -64,6 +64,7 @@ public LoadingCacheDelegate(final TypedCache cache, final CacheLoader cache, final CacheLoader loader, final String cacheName, final MetricFactory metricFactory, final long duration, final TimeUnit timeUnit, final boolean failOnError) { @@ -96,6 +97,38 @@ public LoadingCacheDelegate(final TypedCache cache, final CacheLoader cache, CacheConfiguration cacheConfig) { + this.cache = cache; + this.loader = cacheConfig.getLoader(); + this.cacheName = cacheConfig.getCacheName(); + this.futureValues = new ConcurrentHashMap<>(); + this.failOnError = cacheConfig.getFailOnMissingEntries(); + + this.duration = cacheConfig.getLoadTimeout(); + this.timeUnit = cacheConfig.getLoadTimeUnit(); + + final MetricFactory metricFactory = cacheConfig.getMetricFactory(); + if (metricFactory != null) { + metricFactory.registerGauge("LoadingCacheDelegate." + cacheName, "mapSize", futureValues::size); + + cacheHits = metricFactory.createCounter("LoadingCacheDelegate." + cacheName, "hits"); + cacheMiss = metricFactory.createCounter("LoadingCacheDelegate." + cacheName, "miss"); + cacheErrors = metricFactory.createCounter("LoadingCacheDelegate." + cacheName, "cacheErrors"); + loaderErrors = metricFactory.createCounter("LoadingCacheDelegate." + cacheName, "loaderErrors"); + cacheTimeouts = metricFactory.createCounter("LoadingCacheDelegate." + cacheName, "cacheTimeouts"); + loaderTimeouts = metricFactory.createCounter("LoadingCacheDelegate." + cacheName, "loaderTimeouts"); + + } else { + cacheHits = null; + cacheMiss = null; + cacheErrors = null; + loaderErrors = null; + cacheTimeouts = null; + loaderTimeouts = null; + } + } + + @Override public ComposableFuture getAsync(final K key) { return ComposableFutures.build(consumer -> { @@ -148,7 +181,12 @@ private void fetchFromLoader(final K key, final ComposablePromise promise) { loadedResult.consume(loadedRes -> { if (loadedRes.isSuccess()) { promise.set(loadedRes.getValue()); - cacheLoadedValue(key, loadedRes); + // we are not adding null value into a cache + if (loadedRes.getValue() != null) { + cacheLoadedValue(key, loadedRes); + } else { + futureValues.remove(key); + } } else { final Throwable error = loadedRes.getError(); if (loaderErrors != null) { diff --git a/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/LocalAsyncCache.java b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/LocalAsyncCache.java index fe6afe38..7618706a 100644 --- a/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/LocalAsyncCache.java +++ b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/LocalAsyncCache.java @@ -1,247 +1,146 @@ package com.outbrain.ob1k.cache; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.LoadingCache; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ExecutionError; +import com.github.benmanes.caffeine.cache.AsyncCache; +import com.github.benmanes.caffeine.cache.AsyncCacheLoader; +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.collect.Maps; import com.outbrain.ob1k.concurrent.ComposableFuture; -import com.outbrain.ob1k.concurrent.UncheckedExecutionException; +import com.outbrain.ob1k.concurrent.ComposableFutures; +import com.outbrain.ob1k.concurrent.eager.EagerComposableFuture; import com.outbrain.swinfra.metrics.api.MetricFactory; -import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; - -import static com.outbrain.ob1k.concurrent.ComposableFutures.all; -import static com.outbrain.ob1k.concurrent.ComposableFutures.fromError; -import static com.outbrain.ob1k.concurrent.ComposableFutures.fromNull; -import static com.outbrain.ob1k.concurrent.ComposableFutures.fromValue; +import java.util.concurrent.*; +import static com.outbrain.ob1k.concurrent.ComposableFutures.*; +import static java.util.concurrent.CompletableFuture.completedFuture; /** * User: aronen * Date: 6/30/13 * Time: 6:08 PM */ -public class LocalAsyncCache implements TypedCache { - private final LoadingCache> loadingCache; - private final Cache> localCache; - private final CacheLoader loader; - private final String cacheName; - private final boolean failOnMissingEntries; - - public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit, final CacheLoader loader, - final MetricFactory metricFactory, final String cacheName) { - this(maximumSize, ttl, unit, loader, metricFactory, cacheName, false); - } - - public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit, final CacheLoader loader, - final MetricFactory metricFactory, final String cacheName, final boolean failOnMissingEntries) { - this.loader = loader; - this.cacheName = cacheName; - this.failOnMissingEntries = failOnMissingEntries; - - final boolean collectStats = metricFactory != null; - final CacheBuilder builder = CacheBuilder.newBuilder() - .maximumSize(maximumSize) - .expireAfterWrite(ttl, unit); - - if (collectStats) - builder.recordStats(); - - this.loadingCache = builder - .build(new com.google.common.cache.CacheLoader>() { - public ComposableFuture load(final K key) throws Exception { - return loadElement(key); - } +public class LocalAsyncCache implements TypedCache { - @Override - public Map> loadAll(final Iterable keys) throws Exception { - return loadElements(Lists.newArrayList(keys)); - } - }); - - if (collectStats) { - GuavaCacheGaugesFactory.createGauges(metricFactory, loadingCache, "LocalAsyncCache-" + cacheName); - } - - this.localCache = null; - } + private final AsyncLoadingCache loadingCache; + private final AsyncCache localCache; + private final String cacheName; - public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit, final CacheLoader loader) { - this(maximumSize, ttl, unit, loader, null, null); - } + public LocalAsyncCache(final com.outbrain.ob1k.cache.CacheConfiguration cacheConfig) { + final Caffeine builder = Caffeine.newBuilder() + .maximumSize(cacheConfig.getMaxSize()) + .expireAfterWrite(cacheConfig.getTtl(), cacheConfig.getTtlTimeUnit()) + .recordStats(); - public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit, final MetricFactory metricFactory, final String cacheName) { - this.loader = null; - this.loadingCache = null; - this.failOnMissingEntries = true; // fake value, not in use. - this.cacheName = cacheName; - - final boolean collectStats = metricFactory != null; - final CacheBuilder builder = CacheBuilder.newBuilder() - .maximumSize(maximumSize) - .expireAfterWrite(ttl, unit); - - if (collectStats) { - builder.recordStats(); + cacheName = cacheConfig.getCacheName(); + if (cacheConfig.getLoader() != null) { + this.localCache = null; + this.loadingCache = builder.buildAsync(new InternalCacheLoader<>(cacheConfig.getLoader(), cacheName, cacheConfig.getFailOnMissingEntries())); + } else { + this.loadingCache = null; + this.localCache = builder.buildAsync(); } - this.localCache = builder.build(); - if (collectStats) { - GuavaCacheGaugesFactory.createGauges(metricFactory, localCache, "LocalAsyncCache-" + cacheName); - } + CaffeineCacheGaugesFactory.createGauges(cacheConfig.getMetricFactory(), cache().synchronous(), cacheName); } - public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit) { - this(maximumSize, ttl, unit, null, null); + /** + * @deprecated Replaced by {@link #LocalAsyncCache(com.outbrain.ob1k.cache.CacheConfiguration)} + */ + @Deprecated + public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit, final CacheLoader loader, + final MetricFactory metricFactory, final String cacheName, final boolean failOnMissingEntries) { + this(new com.outbrain.ob1k.cache.CacheConfiguration(cacheName) + .withMetricFactory(metricFactory) + .withMaxSize(maximumSize) + .withTtl(ttl, unit) + .withLoader(loader) + .failOnMissingEntries(failOnMissingEntries)); } - // for testing purposes only! - public LocalAsyncCache() { - this(1000, 20, TimeUnit.SECONDS); - } - private ComposableFuture loadElement(final K key) { - return loader.load(cacheName, key).materialize(); - } - private Function, ComposableFuture> extractLoaderResultEntry(final K key) { - return loaderResults -> { - final V res = loaderResults.get(key); - if (res != null) { - return fromValue(res); - } else { - if (failOnMissingEntries) { - return fromError(new RuntimeException(key + " is missing from" + (cacheName == null ? "" : " " + cacheName) + " loader response.")); - } else { - return fromNull(); - } - } - }; - } - - private Map> loadElements(final Iterable keys) { - final ComposableFuture> loaded = loader.load(cacheName, keys).materialize(); - final Map> result = new HashMap<>(); - for (final K key : keys) { - result.put(key, loaded.flatMap(extractLoaderResultEntry(key))); - } - return result; + /** + * @deprecated Replaced by {link: #LocalAsyncCache(CacheConfiguration)} + */ + @Deprecated + public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit, final MetricFactory metricFactory, final String cacheName) { + this(new com.outbrain.ob1k.cache.CacheConfiguration(cacheName) + .withMetricFactory(metricFactory) + .withMaxSize(maximumSize) + .withTtl(ttl, unit)); } @Override public ComposableFuture getAsync(final K key) { - try { - if (loadingCache != null) { - final ComposableFuture res = loadingCache.get(key); - if (res == null) { - return fromNull(); - } - return res.recoverWith(error -> { - loadingCache.asMap().remove(key, res); - return fromError(error); - }); - } else { - final ComposableFuture res = localCache.getIfPresent(key); - if (res == null) { - return fromNull(); - } - return res; - } - } catch (final com.google.common.util.concurrent.UncheckedExecutionException e) { - return fromError(e.getCause()); - } catch (final ExecutionException | UncheckedExecutionException | ExecutionError e) { - return fromError(e.getCause()); - } - } - private void unloadErrorsFromCache(final ImmutableMap> innerMap, final Iterable keys) { - for (final K key : keys) { - innerMap.get(key).consume(result -> { - if (!result.isSuccess() || result.getValue() == null) { - loadingCache.asMap().remove(key); + try { + if (loadingCache != null) { + return EagerComposableFuture. + fromCompletableFuture(loadingCache.get(key)). + recoverWith(CompletionException.class, e -> ComposableFutures.fromError(e.getCause())); + } else { + final CompletableFuture res = localCache.getIfPresent(key); + if (res == null) { + return fromNull(); + } + return EagerComposableFuture.fromCompletableFuture(res); } - }); - } + } catch (final Exception e) { + return fromError(e); + } } @Override public ComposableFuture> getBulkAsync(final Iterable keys) { - try { - final ImmutableMap> innerMap; - if (loadingCache != null) { - innerMap = loadingCache.getAll(keys); - unloadErrorsFromCache(innerMap, keys); - } else { - innerMap = localCache.getAllPresent(keys); - } - - final Map> result = new HashMap<>(); - for (final K key : innerMap.keySet()) { - final ComposableFuture value = innerMap.get(key); - result.put(key, value); - } + return getAll(keys); + } - return all(true, result); - } catch (final com.google.common.util.concurrent.UncheckedExecutionException e) { - return fromError(e.getCause()); - } catch (final ExecutionException | UncheckedExecutionException | ExecutionError e) { - return fromError(e.getCause()); - } + private ComposableFuture> getAll(Iterable keys) { + return loadingCache == null ? + fromValue(localCache.synchronous().getAllPresent(keys)) : + EagerComposableFuture.fromCompletableFuture(loadingCache.getAll(keys)). + recoverWith(CompletionException.class, e -> ComposableFutures.fromError(e.getCause())); } @Override public ComposableFuture deleteAsync(final K key) { - if (loadingCache != null) { - loadingCache.invalidate(key); - } else { - localCache.invalidate(key); - } + cache().synchronous().invalidate(key); return fromValue(true); } @Override public ComposableFuture setAsync(final K key, final V value) { - if (loadingCache != null) { - loadingCache.put(key, fromValue(value)); - } else { - localCache.put(key, fromValue(value)); - } + cache().put(key, completedFuture(value)); return fromValue(true); } @Override public ComposableFuture setIfAbsentAsync(final K key, final V value) { - final Cache> cache = loadingCache == null ? localCache : loadingCache; - return fromValue(cache.asMap().putIfAbsent(key, fromValue(value)) == null); + return fromValue(cache().asMap().putIfAbsent(key, completedFuture(value)) == null); } @Override public ComposableFuture setAsync(final K key, final EntryMapper mapper, final int maxIterations) { - final ConcurrentMap> map = loadingCache != null ? loadingCache.asMap() : localCache.asMap(); + final ConcurrentMap> map = cache().asMap(); try { if (maxIterations == 0) { return fromValue(false); } - final ComposableFuture currentFuture = map.get(key); + final CompletableFuture currentFuture = map.get(key); if (currentFuture != null) { - return currentFuture.flatMap(currentValue -> { + return EagerComposableFuture.fromCompletableFuture(currentFuture).flatMap(currentValue -> { try { final V newValue = mapper.map(key, currentValue); if (newValue == null) { return fromValue(false); } - final boolean success = map.replace(key, currentFuture, fromValue(newValue)); + final boolean success = map.replace(key, currentFuture, completedFuture(newValue)); if (success) { return fromValue(true); } else { @@ -255,7 +154,7 @@ public ComposableFuture setAsync(final K key, final EntryMapper m } else { final V newValue = mapper.map(key, null); if (newValue != null) { - final ComposableFuture prev = map.putIfAbsent(key, fromValue(newValue)); + final CompletableFuture prev = map.putIfAbsent(key, completedFuture(newValue)); if (prev == null) { return fromValue(true); } else { @@ -271,14 +170,47 @@ public ComposableFuture setAsync(final K key, final EntryMapper m } } + private AsyncCache cache() { + return loadingCache == null ? localCache : loadingCache; + } + @Override public ComposableFuture> setBulkAsync(final Map entries) { - final Map> result = new HashMap<>(); - for (final K key : entries.keySet()) { - result.put(key, setAsync(key, entries.get(key))); - } + final Map> result = Maps.newHashMapWithExpectedSize(entries.size()); + entries.forEach((k, v) -> result.put(k, setAsync(k, v))); return all(false, result); } + private static class InternalCacheLoader implements AsyncCacheLoader { + + private final CacheLoader loader; + private final String cacheName; + private final boolean failOnMissingEntries; + + private InternalCacheLoader(final CacheLoader loader, final String cacheName, final boolean failOnMissingEntries) { + this.loader = loader; + this.cacheName = cacheName; + this.failOnMissingEntries = failOnMissingEntries; + } + + @Override + public CompletableFuture asyncLoad(final K key, final Executor executor) { + return loader.load(cacheName, key).toCompletableFuture(); + } + + @Override + public CompletableFuture> asyncLoadAll(final Iterable keys, final Executor executor) { + if (failOnMissingEntries) { + return loader.load(cacheName, keys).peek(map -> + keys.forEach(key -> { + if (!map.containsKey(key)) { + throw new RuntimeException(key + " is missing from " + cacheName + " loader response."); + } + })).toCompletableFuture(); + } else { + return loader.load(cacheName, keys).toCompletableFuture(); + } + } + } } diff --git a/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/filters/CachingFilter.java b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/filters/CachingFilter.java index b153d946..d6733869 100644 --- a/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/filters/CachingFilter.java +++ b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/filters/CachingFilter.java @@ -1,6 +1,7 @@ package com.outbrain.ob1k.cache.filters; import com.outbrain.ob1k.AsyncRequestContext; +import com.outbrain.ob1k.cache.CacheConfiguration; import com.outbrain.ob1k.cache.LocalAsyncCache; import com.outbrain.ob1k.cache.TypedCache; import com.outbrain.ob1k.common.filters.AsyncFilter; @@ -18,8 +19,16 @@ public class CachingFilter implements AsyncFilter private final TypedCache cache; private final CacheKeyGenerator generator; + /** + * @deprecated Replaced by {link: #CachingFilter(CacheKeyGenerator, CacheConfiguration)} + */ public CachingFilter(final CacheKeyGenerator generator, int cacheSize, int ttl, TimeUnit timeUnit) { - this.cache = new LocalAsyncCache<>(cacheSize, ttl, timeUnit); + this.cache = new LocalAsyncCache<>(cacheSize, ttl, timeUnit, null, "default"); + this.generator = generator; + } + + public CachingFilter(final CacheKeyGenerator generator, final CacheConfiguration cacheConfiguration) { + this.cache = new LocalAsyncCache<>(cacheConfiguration); this.generator = generator; } diff --git a/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/CacheLoaderForTesting.java b/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/CacheLoaderForTesting.java new file mode 100644 index 00000000..511c8a82 --- /dev/null +++ b/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/CacheLoaderForTesting.java @@ -0,0 +1,60 @@ +package com.outbrain.ob1k.cache; + +import com.google.common.collect.Iterables; +import com.outbrain.ob1k.concurrent.ComposableFuture; +import com.outbrain.ob1k.concurrent.ComposableFutures; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +public class CacheLoaderForTesting implements CacheLoader { + public static final String VALUE_FOR = "ValueFor-"; + public static final String VALUE_FOR_BULK = "ValueFor-Bulk-"; + public static final String MISSING_KEY = "missing-key"; + public static final String NULL_KEY = "null-key"; + public static final String ERROR_MESSAGE = "missing key"; + public static final String TIMEOUT_KEY = "timeOutKey"; + public static final String TIMEOUT_MESSAGE = "timeout occurred"; + public static final String TEMPORARY_ERROR_MESSAGE = "Load failed temporarily"; + + private final AtomicBoolean generateLoaderErrors = new AtomicBoolean(false); + + public void setGenerateLoaderErrors(final boolean val) { + generateLoaderErrors.set(val); + } + + @Override + public ComposableFuture load(final String cacheName, final String key) { + if (generateLoaderErrors.get()) { + return ComposableFutures.fromError(new RuntimeException(TEMPORARY_ERROR_MESSAGE)); + } + if (key.equals(MISSING_KEY)) { + return ComposableFutures.fromError(new RuntimeException(ERROR_MESSAGE)); + } + if (key.equals(NULL_KEY)) { + return ComposableFutures.fromNull(); + } + return ComposableFutures.fromValue(VALUE_FOR + key); + } + + @Override + public ComposableFuture> load(final String cacheName, final Iterable keys) { + if (generateLoaderErrors.get()) { + return ComposableFutures.fromError(new RuntimeException(TEMPORARY_ERROR_MESSAGE)); + } + if (Iterables.contains(keys, TIMEOUT_KEY)) { + return ComposableFutures.fromError(new RuntimeException(TIMEOUT_MESSAGE)); + } + final HashMap res = new HashMap<>(); + for (String key : keys) { + if (key.equals(NULL_KEY)) { + res.put(key, null); + } + else if (!key.equals(MISSING_KEY)) { + res.put(key, VALUE_FOR_BULK + key); + } + } + return ComposableFutures.fromValue(res); + } +} diff --git a/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/LocalAsyncCacheTest.java b/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/LocalAsyncCacheTest.java index df495b37..6b6ebe6f 100644 --- a/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/LocalAsyncCacheTest.java +++ b/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/LocalAsyncCacheTest.java @@ -1,15 +1,15 @@ package com.outbrain.ob1k.cache; -import com.google.common.collect.Iterables; import com.outbrain.ob1k.concurrent.ComposableFuture; import com.outbrain.ob1k.concurrent.ComposableFutures; +import org.jetbrains.annotations.NotNull; import org.junit.Test; + import java.util.Arrays; -import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; + +import static com.outbrain.ob1k.cache.CacheLoaderForTesting.*; import static org.junit.Assert.*; /** @@ -17,107 +17,72 @@ *

* test local cache behavior */ +@SuppressWarnings("unchecked") public class LocalAsyncCacheTest { - private static final String VALUE_FOR = "ValueFor-"; - private static final String VALUE_FOR_BULK = "ValueFor-Bulk-"; - private static final String MISSING_KEY = "missing-key"; - private static final String NULL_KEY = "null-key"; - public static final String ERROR_MESSAGE = "missing key"; - public static final String TIMEOUT_KEY = "timeOutKey"; - public static final String TIMEOUT_MESSAGE = "timeout occurred"; - public static final String TEMPORARY_ERROR_MESSAGE = "Load failed temporarily"; - - private static class ExceptionalCacheLoader implements CacheLoader { - private final AtomicBoolean generateLoaderErrors = new AtomicBoolean(false); - - public void setGenerateLoaderErrors(final boolean val) { - generateLoaderErrors.set(val); - } - - @Override - public ComposableFuture load(final String cacheName, final String key) { - if (generateLoaderErrors.get()) { - return ComposableFutures.fromError(new RuntimeException(TEMPORARY_ERROR_MESSAGE)); - } - if (key.equals(MISSING_KEY)) { - return ComposableFutures.fromError(new RuntimeException(ERROR_MESSAGE)); - } - if (key.equals(NULL_KEY)) { - return ComposableFutures.fromNull(); - } - return ComposableFutures.fromValue(VALUE_FOR+key); - } - - @Override - public ComposableFuture> load(final String cacheName, final Iterable keys) { - if (generateLoaderErrors.get()) { - return ComposableFutures.fromError(new RuntimeException(TEMPORARY_ERROR_MESSAGE)); - } - if (Iterables.contains(keys, TIMEOUT_KEY)) { - return ComposableFutures.fromError(new RuntimeException(TIMEOUT_MESSAGE)); - } - final HashMap res = new HashMap<>(); - for (String key:keys) { - if (key.equals(NULL_KEY)) { - res.put(key, null); - } if (!key.equals(MISSING_KEY)) { - res.put(key, VALUE_FOR_BULK + key); - } - } - return ComposableFutures.fromValue(res); - } + private LocalAsyncCache createCache() { + return new LocalAsyncCache<>(createCacheConfiguration()); } - private static LocalAsyncCache createCache() { - return new LocalAsyncCache<>(3, 100, TimeUnit.MILLISECONDS, new ExceptionalCacheLoader()); + @NotNull + private CacheConfiguration createCacheConfiguration() { + return new CacheConfiguration("local").withMaxSize(3).withTtl(100).withLoader(new CacheLoaderForTesting()); } @Test public void testLocalCache() { final LocalAsyncCache cache = createCache(); - try { - final String value1 = cache.getAsync("key1").get(); - assertEquals(value1, VALUE_FOR + "key1"); - final Map values = cache.getBulkAsync(Arrays.asList("key3", "key4")).get(); - assertEquals(values.get("key3"), VALUE_FOR_BULK + "key3"); - assertEquals(values.get("key4"), VALUE_FOR_BULK + "key4"); - assertEquals(values.get("key1"), null); - - } catch (final Exception e) { - fail(e.getMessage()); - } + final String value1 = cache.getAsync("key1").getUnchecked(); + assertEquals(value1, VALUE_FOR + "key1"); + final Map values = cache.getBulkAsync(Arrays.asList("key3", "key4")).getUnchecked(); + assertEquals(values.get("key3"), VALUE_FOR_BULK + "key3"); + assertEquals(values.get("key4"), VALUE_FOR_BULK + "key4"); + assertNull(values.get("key1")); + } + + @Test + public void testLocalCacheWithoutLoader() { + final LocalAsyncCache cache = new LocalAsyncCache<>(new CacheConfiguration("local").withMaxSize(3).withTtl(100)); + cache.setAsync("key1", "value1"); + final String value1 = cache.getAsync("key1").getUnchecked(); + assertEquals(value1, "value1"); + final String value2 = cache.getAsync("key2").getUnchecked(); + assertNull(value2); + final Map values = cache.getBulkAsync(Arrays.asList("key1", "key2")).getUnchecked(); + assertEquals(values.get("key1"), "value1"); + assertNull(values.get("key2")); } @Test public void testMissingKey() { final LocalAsyncCache cache = createCache(); final ComposableFuture missingValue = cache.getAsync(MISSING_KEY); - assertComposableFutureError(ERROR_MESSAGE,RuntimeException.class,missingValue); + assertComposableFutureError(ERROR_MESSAGE, RuntimeException.class, missingValue); } @Test public void testMissingKeys() { final LocalAsyncCache cache = createCache(); + final Map values = cache.getBulkAsync(Arrays.asList("key1", MISSING_KEY)).getUnchecked(); + assertEquals(values.size(), 1); + assertEquals(values.get("key1"), VALUE_FOR_BULK + "key1"); + } - try { - final Map values = cache.getBulkAsync(Arrays.asList("key1", MISSING_KEY)).get(); - assertEquals(values.size(), 1); - assertEquals(values.get("key1"), VALUE_FOR_BULK + "key1"); - } catch (InterruptedException | ExecutionException e) { - fail(e.getMessage()); - } + @Test + public void testFailOnMissingKeys() { + final LocalAsyncCache cache = new LocalAsyncCache<>(createCacheConfiguration().failOnMissingEntries(true)); + assertComposableFutureError(MISSING_KEY + " is missing from local loader response.", RuntimeException.class, cache.getBulkAsync(Arrays.asList("key1", MISSING_KEY))); } @Test public void testLoaderError() { final LocalAsyncCache cache = createCache(); - assertComposableFutureError(TIMEOUT_MESSAGE,RuntimeException.class,cache.getBulkAsync(Arrays.asList("newKey", TIMEOUT_KEY))); + assertComposableFutureError(TIMEOUT_MESSAGE, RuntimeException.class, cache.getBulkAsync(Arrays.asList("newKey", TIMEOUT_KEY))); } @Test public void testBadLoader() { - final LocalAsyncCache cache = new LocalAsyncCache<>(3, 100, TimeUnit.MILLISECONDS, new CacheLoader() { + final CacheLoader cacheLoader = new CacheLoader() { @Override public ComposableFuture load(final String cacheName, final String key) { if (key.equals("error")) { @@ -133,21 +98,26 @@ public ComposableFuture load(final String cacheName, final String key) { public ComposableFuture> load(final String cacheName, final Iterable keys) { throw new RuntimeException("ooops"); } - }); + }; - assertComposableFutureError("ooops",RuntimeException.class, cache.getAsync("error")); - assertComposableFutureError(null,NullPointerException.class, cache.getAsync("null")); + final LocalAsyncCache cache = new LocalAsyncCache<>(new CacheConfiguration("local").withLoader(cacheLoader).withMaxSize(3).withTtl(100)); + + assertComposableFutureError("ooops", RuntimeException.class, cache.getAsync("error")); + assertComposableFutureError(null, NullPointerException.class, cache.getAsync("null")); } public static class Box { + public final int number; public Box(final int number) { this.number = number; } + } public static class BoxUpdater extends Thread { + private final TypedCache cache; private final String key; private final int iterations; @@ -163,14 +133,9 @@ public BoxUpdater(final TypedCache cache, final String key, final i public void run() { int success = 0; int failure = 0; - for (int i =0; i < iterations; i++) { + for (int i = 0; i < iterations; i++) { try { - final boolean res = cache.setAsync(key, new EntryMapper() { - @Override - public Box map(final String key, final Box value) { - return new Box(value.number + 1); - } - }, 100).get(); + final boolean res = cache.setAsync(key, (key, value) -> new Box(value.number + 1), 100).get(); if (res) { success++; @@ -178,8 +143,8 @@ public Box map(final String key, final Box value) { failure++; } - } catch (final Exception e) { - e.printStackTrace(); + } catch (final Exception ignore) { + } } @@ -189,18 +154,19 @@ public Box map(final String key, final Box value) { public int getSuccessCounter() { return successCounter; } + } @Test - public void testCasWithMapper() throws Exception { - final TypedCache cache = new LocalAsyncCache<>(300, 100, TimeUnit.SECONDS); + public void testCacheWithMapper() throws Exception { + final TypedCache cache = new LocalAsyncCache<>(new CacheConfiguration("cacheWithMapper").withMaxSize(300).withTtl(100, TimeUnit.SECONDS)); final String cacheKey = "box"; - cache.setAsync(cacheKey, new Box(0)); + cache.setAsync(cacheKey, new Box(0)).get(); - final BoxUpdater updater1 = new BoxUpdater(cache, cacheKey, 1000000); - final BoxUpdater updater2 = new BoxUpdater(cache, cacheKey, 1000000); - final BoxUpdater updater3 = new BoxUpdater(cache, cacheKey, 1000000); - final BoxUpdater updater4 = new BoxUpdater(cache, cacheKey, 1000000); + final BoxUpdater updater1 = new BoxUpdater(cache, cacheKey, 100); + final BoxUpdater updater2 = new BoxUpdater(cache, cacheKey, 100); + final BoxUpdater updater3 = new BoxUpdater(cache, cacheKey, 100); + final BoxUpdater updater4 = new BoxUpdater(cache, cacheKey, 100); updater1.start(); updater2.start(); @@ -214,62 +180,59 @@ public void testCasWithMapper() throws Exception { final int finalRes = cache.getAsync(cacheKey).get().number; final int successfulUpdates = updater1.getSuccessCounter() + - updater2.getSuccessCounter() + - updater3.getSuccessCounter() + - updater4.getSuccessCounter(); + updater2.getSuccessCounter() + + updater3.getSuccessCounter() + + updater4.getSuccessCounter(); assertEquals(finalRes, successfulUpdates); } - - @Test public void testCacheExceptionRemoval() throws Exception { - final ExceptionalCacheLoader loader = new ExceptionalCacheLoader() ; + final CacheLoaderForTesting loader = new CacheLoaderForTesting(); loader.setGenerateLoaderErrors(true); - final LocalAsyncCache myCache = new LocalAsyncCache(10, 10, TimeUnit.MINUTES, loader, null, "MyCache"); - assertComposableFutureError(TEMPORARY_ERROR_MESSAGE,RuntimeException.class,myCache.getAsync("a")); - assertComposableFutureError(TEMPORARY_ERROR_MESSAGE,RuntimeException.class,myCache.getAsync("b")); + final LocalAsyncCache myCache = new LocalAsyncCache<>(new CacheConfiguration("MyCache").withLoader(loader).withMaxSize(10).withTtl(10, TimeUnit.MINUTES)); + assertComposableFutureError(TEMPORARY_ERROR_MESSAGE, RuntimeException.class, myCache.getAsync("a")); + assertComposableFutureError(TEMPORARY_ERROR_MESSAGE, RuntimeException.class, myCache.getAsync("b")); loader.setGenerateLoaderErrors(false); - assertEquals(VALUE_FOR+"a", myCache.getAsync("a").get()); - assertEquals(VALUE_FOR+"b", myCache.getAsync("b").get()); + assertEquals(VALUE_FOR + "a", myCache.getAsync("a").get()); + assertEquals(VALUE_FOR + "b", myCache.getAsync("b").get()); loader.setGenerateLoaderErrors(true); // already populated - assertEquals(VALUE_FOR+"a", myCache.getAsync("a").get()); - assertEquals(VALUE_FOR+"b", myCache.getAsync("b").get()); + assertEquals(VALUE_FOR + "a", myCache.getAsync("a").get()); + assertEquals(VALUE_FOR + "b", myCache.getAsync("b").get()); } @Test public void testCacheExceptionRemovalBulk() throws Exception { - final ExceptionalCacheLoader loader = new ExceptionalCacheLoader() ; - final LocalAsyncCache myCache = new LocalAsyncCache(10, 10, TimeUnit.MINUTES, loader, null, "MyCache"); + final CacheLoaderForTesting loader = new CacheLoaderForTesting(); + final LocalAsyncCache myCache = new LocalAsyncCache<>(new CacheConfiguration("MyCache").withLoader(loader).withMaxSize(10).withTtl(10, TimeUnit.MINUTES)); loader.setGenerateLoaderErrors(true); - final Iterable keys = Arrays.asList("a","b","null"); - assertComposableFutureError(TEMPORARY_ERROR_MESSAGE,RuntimeException.class,myCache.getBulkAsync(keys)); + final Iterable keys = Arrays.asList("a", "b", "null"); + assertComposableFutureError(TEMPORARY_ERROR_MESSAGE, RuntimeException.class, myCache.getBulkAsync(keys)); loader.setGenerateLoaderErrors(false); - assertEquals(VALUE_FOR+"a", myCache.getAsync("a").get()); - final Map bulk = (Map)myCache.getBulkAsync(keys).get(); - assertEquals(VALUE_FOR+"a",bulk.get("a")); - assertEquals(VALUE_FOR_BULK+"b",bulk.get("b")); + assertEquals(VALUE_FOR + "a", myCache.getAsync("a").get()); + final Map bulk = (Map) myCache.getBulkAsync(keys).get(); + assertEquals(VALUE_FOR + "a", bulk.get("a")); + assertEquals(VALUE_FOR_BULK + "b", bulk.get("b")); assertNull(bulk.get(NULL_KEY)); loader.setGenerateLoaderErrors(true); - assertEquals(VALUE_FOR+"a", myCache.getAsync("a").get()); // already populated - assertEquals(VALUE_FOR_BULK+"b", myCache.getAsync("b").get()); // already populated - assertComposableFutureError(TEMPORARY_ERROR_MESSAGE,RuntimeException.class,myCache.getAsync(NULL_KEY)); // nulls are not cached on bulk + assertEquals(VALUE_FOR + "a", myCache.getAsync("a").get()); // already populated + assertEquals(VALUE_FOR_BULK + "b", myCache.getAsync("b").get()); // already populated + assertComposableFutureError(TEMPORARY_ERROR_MESSAGE, RuntimeException.class, myCache.getAsync(NULL_KEY)); // nulls are not cached on bulk } + private static void assertComposableFutureError(String expectedMsg, Class exceptionType, ComposableFuture future) { try { - future.get(); + future.getUnchecked(); fail("Expected to get exception"); - } catch (final InterruptedException e) { - fail("Interrupted ?"+ e.getMessage()); - } catch (ExecutionException e) { - if (exceptionType != null) assertEquals(exceptionType,e.getCause().getClass()); - if (expectedMsg != null) assertEquals(expectedMsg,e.getCause().getMessage()); + } catch (final RuntimeException e) { + if (exceptionType != null) assertEquals(exceptionType, e.getClass()); + if (expectedMsg != null) assertEquals(expectedMsg, e.getMessage()); } } } diff --git a/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/TestLoadingCacheDelegate.java b/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/TestLoadingCacheDelegate.java index f8fbef9c..f657d0cc 100644 --- a/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/TestLoadingCacheDelegate.java +++ b/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/TestLoadingCacheDelegate.java @@ -17,6 +17,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + /** * emulate a loader that takes a while and test to se that values are loaded only once. * @@ -40,7 +44,21 @@ public ComposableFuture> load(final String cacheName, final } }; - private static final TypedCache SLOW_CACHE = new LocalAsyncCache() { + private static final CacheLoader CACHE_LOADER_RETURNS_NULL = new CacheLoader() { + + @Override + public ComposableFuture load(final String cacheName, final String key) { + return ComposableFutures.fromNull(); + } + + @Override + public ComposableFuture> load(final String cacheName, final Iterable keys) { + return ComposableFutures.fromError(new RuntimeException("Not related")); + } + }; + + + private final TypedCache SLOW_CACHE = new LocalAsyncCache(new CacheConfiguration("slow").withMetricFactory(metricFactory)) { @Override public ComposableFuture getAsync(final String key) { return slowResponse(100); @@ -53,7 +71,8 @@ private static ComposableFuture slowResponse(final int responseDuration) @Test public void testCacheTimeoutHandling() throws InterruptedException { - final TypedCache loadingCache = new LoadingCacheDelegate<>(SLOW_CACHE, SLOW_CACHE_LOADER, "meh", metricFactory, 1, TimeUnit.MILLISECONDS); + final TypedCache loadingCache = + new LoadingCacheDelegate<>(SLOW_CACHE, new CacheConfiguration("meh").withLoader(SLOW_CACHE_LOADER).withMetricFactory(metricFactory).withLoadTimeout( 1, TimeUnit.MILLISECONDS)); try { loadingCache.getAsync("key").get(); @@ -62,10 +81,25 @@ public void testCacheTimeoutHandling() throws InterruptedException { } } + + @Test + public void loaderReturnsNullNotAddingNullIntoCache() throws InterruptedException, ExecutionException { + final TypedCache cache = Mockito.mock(TypedCache.class); + + when(cache.getAsync("key")).thenReturn(ComposableFutures.fromNull()); + final TypedCache loadingCache = + new LoadingCacheDelegate<>(cache, new CacheConfiguration("local").withLoader(CACHE_LOADER_RETURNS_NULL)); + + final String val = loadingCache.getAsync("key").get(); + Assert.assertNull("returned value should be null" ,val); + verify(cache, never()).setAsync("key", null); + } + @Test public void testLoaderTimeoutHandling() throws InterruptedException { - final TypedCache cache = new LocalAsyncCache<>(); - final TypedCache loadingCache = new LoadingCacheDelegate<>(cache, SLOW_CACHE_LOADER, "meh", metricFactory, 1, TimeUnit.MILLISECONDS); + final TypedCache cache = new LocalAsyncCache<>(new CacheConfiguration("local").withMetricFactory(metricFactory)); + final TypedCache loadingCache = new LoadingCacheDelegate<>(cache, + new CacheConfiguration("meh").withLoader(SLOW_CACHE_LOADER).withMetricFactory(metricFactory).withLoadTimeout(1, TimeUnit.MILLISECONDS)); try { loadingCache.getAsync("key").get(); @@ -81,23 +115,24 @@ public void testCacheLoadResultsFailure_shouldCountErrors() throws ExecutionExce final String value = "value"; final String cacheName = "meh"; final TypedCache mockCache = Mockito.mock(TypedCache.class); - Mockito.when(mockCache.getAsync(key)).thenReturn(ComposableFutures.fromNull()); - Mockito.when(mockCache.setAsync(key, value)).thenReturn(ComposableFutures.fromError(new RuntimeException("MOCK failure"))); + when(mockCache.getAsync(key)).thenReturn(ComposableFutures.fromNull()); + when(mockCache.setAsync(key, value)).thenReturn(ComposableFutures.fromError(new RuntimeException("MOCK failure"))); final CacheLoader mockLoader = Mockito.mock(CacheLoader.class); - Mockito.when(mockLoader.load(cacheName, key)).thenReturn(ComposableFutures.fromValue(value)); + when(mockLoader.load(cacheName, key)).thenReturn(ComposableFutures.fromValue(value)); - final TypedCache loadingCache = new LoadingCacheDelegate<>(mockCache, mockLoader, cacheName, metricFactory, 1, TimeUnit.HOURS); + final TypedCache loadingCache = + new LoadingCacheDelegate<>(mockCache, new CacheConfiguration(cacheName).withLoader(mockLoader).withMetricFactory(metricFactory).withLoadTimeout(1, TimeUnit.HOURS)); loadingCache.getAsync(key).get(); - Assert.assertEquals("expected errros", 1, registry.getCounters().get("LoadingCacheDelegate.meh.cacheErrors").getCount()); + Assert.assertEquals("expected errors", 1, registry.getCounters().get("LoadingCacheDelegate.meh.cacheErrors").getCount()); } @Test public void testPartialLoading() throws ExecutionException, InterruptedException { final AtomicInteger loaderCounter = new AtomicInteger(); - final TypedCache cache = new LocalAsyncCache<>(); - final TypedCache loadingCache = new LoadingCacheDelegate<>(cache, new CacheLoader() { + final TypedCache cache = new LocalAsyncCache<>(new CacheConfiguration("local").withMetricFactory(metricFactory)); + final TypedCache loadingCache = new LoadingCacheDelegate<>(cache, new CacheConfiguration("default").withMetricFactory(metricFactory).withLoader(new CacheLoader() { ThreadLocalRandom random = ThreadLocalRandom.current(); @Override @@ -116,7 +151,7 @@ public ComposableFuture> load(final String cacheName, final return ComposableFutures.all(true, res); } - }, "default"); + })); for (int i=0;i < 100; i++) { final ComposableFuture res1 = loadingCache.getAsync("1"); @@ -135,8 +170,6 @@ public ComposableFuture> load(final String cacheName, final loadingCache.deleteAsync("3").get(); loadingCache.deleteAsync("4").get(); loadingCache.deleteAsync("5").get(); - -// Thread.sleep(200L); loaderCounter.set(0); } diff --git a/ob1k-concurrent/src/main/java/com/outbrain/ob1k/concurrent/ComposableFuture.java b/ob1k-concurrent/src/main/java/com/outbrain/ob1k/concurrent/ComposableFuture.java index ae8a7496..0b3406db 100644 --- a/ob1k-concurrent/src/main/java/com/outbrain/ob1k/concurrent/ComposableFuture.java +++ b/ob1k-concurrent/src/main/java/com/outbrain/ob1k/concurrent/ComposableFuture.java @@ -1,24 +1,15 @@ package com.outbrain.ob1k.concurrent; -import com.outbrain.ob1k.concurrent.handlers.ErrorHandler; -import com.outbrain.ob1k.concurrent.handlers.FutureErrorHandler; -import com.outbrain.ob1k.concurrent.handlers.FutureResultHandler; -import com.outbrain.ob1k.concurrent.handlers.FutureSuccessHandler; -import com.outbrain.ob1k.concurrent.handlers.ResultHandler; -import com.outbrain.ob1k.concurrent.handlers.SuccessHandler; +import com.google.common.base.Throwables; +import com.outbrain.ob1k.concurrent.handlers.*; import java.util.NoSuchElementException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Predicate; -import static com.outbrain.ob1k.concurrent.ComposableFutures.fromError; -import static com.outbrain.ob1k.concurrent.ComposableFutures.fromValue; -import static com.outbrain.ob1k.concurrent.ComposableFutures.schedule; +import static com.outbrain.ob1k.concurrent.ComposableFutures.*; /** *

A base interface for all future implementation in the system.

@@ -196,6 +187,19 @@ default ComposableFuture ensure(final Predicate predicate) { }); } + default CompletableFuture toCompletableFuture() { + final CompletableFuture future = new CompletableFuture<>(); + consume(aTry -> { + if (aTry.isSuccess()) { + future.complete(aTry.getValue()); + } else { + future.completeExceptionally(aTry.getError()); + } + }); + + return future; + } + /** * Consumes the value(or error) of the future into a consumer. * if the future is lazy the value will be reproduced on each consumption. @@ -275,6 +279,28 @@ default T get(final long timeout, final TimeUnit unit) throws InterruptedExcepti throw new TimeoutException("Timeout occurred while waiting for a value"); } + /** + * Blocks until a value is available for consumption and then return it. + * checked exceptions are wrapped in an UncheckedExecutionException and thrown. + *

+ * DO NOT use in non-blocking environment. + * + * @return the future value if successful + * @throws UncheckedExecutionException if the thread has been interrupted or the future threw a checked exception + */ + default T getUnchecked() { + try { + return get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new UncheckedExecutionException(e); + } catch (ExecutionException e) { + Throwables.propagateIfPossible(e.getCause()); + throw new UncheckedExecutionException(e.getCause()); + } + } + + /** * Turns the current future into an eager one. * diff --git a/ob1k-concurrent/src/main/java/com/outbrain/ob1k/concurrent/eager/EagerComposableFuture.java b/ob1k-concurrent/src/main/java/com/outbrain/ob1k/concurrent/eager/EagerComposableFuture.java index 465d9c0d..dd916198 100644 --- a/ob1k-concurrent/src/main/java/com/outbrain/ob1k/concurrent/eager/EagerComposableFuture.java +++ b/ob1k-concurrent/src/main/java/com/outbrain/ob1k/concurrent/eager/EagerComposableFuture.java @@ -1,29 +1,12 @@ package com.outbrain.ob1k.concurrent.eager; -import com.outbrain.ob1k.concurrent.CancellationToken; -import com.outbrain.ob1k.concurrent.ComposableFuture; -import com.outbrain.ob1k.concurrent.ComposableFutures; -import com.outbrain.ob1k.concurrent.Consumer; -import com.outbrain.ob1k.concurrent.Producer; -import com.outbrain.ob1k.concurrent.Scheduler; -import com.outbrain.ob1k.concurrent.Try; -import com.outbrain.ob1k.concurrent.UncheckedExecutionException; -import com.outbrain.ob1k.concurrent.handlers.ErrorHandler; -import com.outbrain.ob1k.concurrent.handlers.FutureAction; -import com.outbrain.ob1k.concurrent.handlers.FutureErrorHandler; -import com.outbrain.ob1k.concurrent.handlers.FutureResultHandler; -import com.outbrain.ob1k.concurrent.handlers.FutureSuccessHandler; -import com.outbrain.ob1k.concurrent.handlers.ResultHandler; -import com.outbrain.ob1k.concurrent.handlers.SuccessHandler; +import com.outbrain.ob1k.concurrent.*; +import com.outbrain.ob1k.concurrent.handlers.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -317,6 +300,20 @@ public ComposableFuture andThen(final Consumer resultConsumer) { return future; } + public static ComposableFuture fromCompletableFuture(final CompletableFuture source) { + final EagerComposableFuture future = new EagerComposableFuture<>(); + source.whenComplete((value, throwable) -> { + if (throwable == null) { + future.set(value); + } else { + future.setException(throwable); + } + }); + + return future; + } + + @Override public void consume(final Consumer consumer) { handlers.addHandler(new ConsumerAction<>(consumer, this), threadPool); diff --git a/pom.xml b/pom.xml index dc2435ea..b34f6558 100644 --- a/pom.xml +++ b/pom.xml @@ -102,6 +102,7 @@ 1.5.9 true 4.0.56.Final + 2.8.0 @@ -489,6 +490,11 @@ test + + com.github.ben-manes.caffeine + caffeine + ${caffeine.version} +