diff --git a/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/LocalAsyncCache.java b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/LocalAsyncCache.java index fe6afe38..d3385187 100644 --- a/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/LocalAsyncCache.java +++ b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/LocalAsyncCache.java @@ -4,12 +4,15 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.ExecutionError; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import com.outbrain.ob1k.concurrent.ComposableFuture; +import com.outbrain.ob1k.concurrent.ComposableFutures; import com.outbrain.ob1k.concurrent.UncheckedExecutionException; import com.outbrain.swinfra.metrics.api.MetricFactory; +import javax.annotation.Nonnull; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentMap; @@ -29,11 +32,9 @@ * Time: 6:08 PM */ public class LocalAsyncCache implements TypedCache { + private final LoadingCache> loadingCache; private final Cache> localCache; - private final CacheLoader loader; - private final String cacheName; - private final boolean failOnMissingEntries; public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit, final CacheLoader loader, final MetricFactory metricFactory, final String cacheName) { @@ -42,29 +43,25 @@ public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit, final CacheLoader loader, final MetricFactory metricFactory, final String cacheName, final boolean failOnMissingEntries) { - this.loader = loader; - this.cacheName = cacheName; - this.failOnMissingEntries = failOnMissingEntries; + this(maximumSize, ttl, unit, loader, metricFactory, cacheName, failOnMissingEntries, -1, null); + } + public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit, final CacheLoader loader, + final MetricFactory metricFactory, final String cacheName, final boolean failOnMissingEntries, + final long refreshAfterWriteDuration, final TimeUnit refreshAfterWriteUnit) { final boolean collectStats = metricFactory != null; final CacheBuilder builder = CacheBuilder.newBuilder() - .maximumSize(maximumSize) - .expireAfterWrite(ttl, unit); + .maximumSize(maximumSize) + .expireAfterWrite(ttl, unit); + + if (refreshAfterWriteDuration != -1) { + builder.refreshAfterWrite(refreshAfterWriteDuration, refreshAfterWriteUnit); + } if (collectStats) builder.recordStats(); - this.loadingCache = builder - .build(new com.google.common.cache.CacheLoader>() { - public ComposableFuture load(final K key) throws Exception { - return loadElement(key); - } - - @Override - public Map> loadAll(final Iterable keys) throws Exception { - return loadElements(Lists.newArrayList(keys)); - } - }); + this.loadingCache = builder.build(new InternalCacheLoader(loader, cacheName, failOnMissingEntries)); if (collectStats) { GuavaCacheGaugesFactory.createGauges(metricFactory, loadingCache, "LocalAsyncCache-" + cacheName); @@ -78,10 +75,7 @@ public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit } public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit, final MetricFactory metricFactory, final String cacheName) { - this.loader = null; this.loadingCache = null; - this.failOnMissingEntries = true; // fake value, not in use. - this.cacheName = cacheName; final boolean collectStats = metricFactory != null; final CacheBuilder builder = CacheBuilder.newBuilder() @@ -107,33 +101,6 @@ public LocalAsyncCache() { this(1000, 20, TimeUnit.SECONDS); } - private ComposableFuture loadElement(final K key) { - return loader.load(cacheName, key).materialize(); - } - - private Function, ComposableFuture> extractLoaderResultEntry(final K key) { - return loaderResults -> { - final V res = loaderResults.get(key); - if (res != null) { - return fromValue(res); - } else { - if (failOnMissingEntries) { - return fromError(new RuntimeException(key + " is missing from" + (cacheName == null ? "" : " " + cacheName) + " loader response.")); - } else { - return fromNull(); - } - } - }; - } - - private Map> loadElements(final Iterable keys) { - final ComposableFuture> loaded = loader.load(cacheName, keys).materialize(); - final Map> result = new HashMap<>(); - for (final K key : keys) { - result.put(key, loaded.flatMap(extractLoaderResultEntry(key))); - } - return result; - } @Override public ComposableFuture getAsync(final K key) { @@ -154,9 +121,7 @@ public ComposableFuture getAsync(final K key) { } return res; } - } catch (final com.google.common.util.concurrent.UncheckedExecutionException e) { - return fromError(e.getCause()); - } catch (final ExecutionException | UncheckedExecutionException | ExecutionError e) { + } catch (final com.google.common.util.concurrent.UncheckedExecutionException | ExecutionException | UncheckedExecutionException | ExecutionError e) { return fromError(e.getCause()); } } @@ -189,9 +154,7 @@ public ComposableFuture> getBulkAsync(final Iterable keys } return all(true, result); - } catch (final com.google.common.util.concurrent.UncheckedExecutionException e) { - return fromError(e.getCause()); - } catch (final ExecutionException | UncheckedExecutionException | ExecutionError e) { + } catch (final com.google.common.util.concurrent.UncheckedExecutionException | ExecutionException | UncheckedExecutionException | ExecutionError e) { return fromError(e.getCause()); } } @@ -281,4 +244,104 @@ public ComposableFuture> setBulkAsync(final Map { + private final int maximumSize; + private final int ttl; + private final TimeUnit unit; + private final MetricFactory metricFactory; + private final String cacheName; + private CacheLoader loader; + private boolean failOnMissingEntries = false; + private long refreshAfterWriteDuration; + private TimeUnit refreshAfterWriteUnit; + + public Builder(final int maximumSize, final int ttl, final TimeUnit unit, final MetricFactory metricFactory, final String cacheName) { + this.maximumSize = maximumSize; + this.ttl = ttl; + this.unit = unit; + this.metricFactory = metricFactory; + this.cacheName = cacheName; + } + + public Builder withLoader(final CacheLoader loader) { + this.loader = loader; + return this; + } + + public Builder failOnMissingEntries() { + this.failOnMissingEntries = true; + return this; + } + + public Builder refreshAfterWrite(long duration, TimeUnit unit) { + this.refreshAfterWriteDuration = duration; + this.refreshAfterWriteUnit = unit; + return this; + } + + public LocalAsyncCache build() { + if (loader == null) { + return new LocalAsyncCache<>(maximumSize, ttl, unit, metricFactory, cacheName); + } + return new LocalAsyncCache<>(maximumSize, ttl, unit, loader, metricFactory, cacheName, failOnMissingEntries, + refreshAfterWriteDuration, refreshAfterWriteUnit); + } + } + + private static class InternalCacheLoader extends com.google.common.cache.CacheLoader> { + + private final CacheLoader loader; + private final String cacheName; + private final boolean failOnMissingEntries; + + InternalCacheLoader(final CacheLoader loader, final String cacheName, final boolean failOnMissingEntries) { + this.loader = loader; + this.cacheName = cacheName; + this.failOnMissingEntries = failOnMissingEntries; + } + + @Override + public ComposableFuture load(@Nonnull final K key) { + return loader.load(cacheName, key).materialize(); + } + + @Override + public Map> loadAll(final Iterable keys) { + final ComposableFuture> loaded = loader.load(cacheName, keys).materialize(); + final Map> result = new HashMap<>(); + for (final K key : keys) { + result.put(key, loaded.flatMap(extractLoaderResultEntry(key))); + } + return result; + } + + @Override + public ListenableFuture> reload(final K key, final ComposableFuture oldValue) { + final SettableFuture> future = SettableFuture.create(); + loader.load(cacheName, key).materialize().consume(t -> { + if (t.isSuccess()) { + future.set(ComposableFutures.fromValue(t.getValue())); + } else { + future.setException(t.getError()); + } + }); + + return future; + } + + private Function, ComposableFuture> extractLoaderResultEntry(final K key) { + return loaderResults -> { + final V res = loaderResults.get(key); + if (res != null) { + return fromValue(res); + } else { + if (failOnMissingEntries) { + return fromError(new RuntimeException(key + " is missing from" + (cacheName == null ? "" : " " + cacheName) + " loader response.")); + } else { + return fromNull(); + } + } + }; + } + } } diff --git a/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegate.java b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegate.java new file mode 100644 index 00000000..52ee2008 --- /dev/null +++ b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegate.java @@ -0,0 +1,284 @@ +package com.outbrain.ob1k.cache; + +import com.google.common.collect.Iterables; +import com.outbrain.ob1k.concurrent.ComposableFuture; +import com.outbrain.swinfra.metrics.api.Counter; +import com.outbrain.swinfra.metrics.api.MetricFactory; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * A wrapper of LoadingCacheDelegate that adds the ability to refresh cache values in the configured interval on access + */ +public class RefreshLoadingCacheDelegate implements TypedCache { + + private final LoadingCacheDelegate> cache; + private final InternalCacheLoader internalCacheLoader; + private final String cacheName; + private final long refreshAfterWriteDuration; + private final Supplier timeSupplier; + + private final ConcurrentMap refreshingKeys; + + private final Counter refreshes; + private final Counter refreshErrors; + private final Counter refreshTimeouts; + + RefreshLoadingCacheDelegate(final TypedCache> cache, final CacheLoader loader, final String cacheName, final MetricFactory metricFactory, + final long duration, final TimeUnit timeUnit, final boolean failOnError, + final long refreshAfterWriteDuration, final TimeUnit refreshAfterWriteUnit, + final Supplier timeSupplier) { + this.cacheName = cacheName; + this.refreshAfterWriteDuration = refreshAfterWriteUnit.toMillis(refreshAfterWriteDuration); + + refreshingKeys = new ConcurrentHashMap<>(); + + if (timeSupplier == null) { + this.timeSupplier = new SystemTimeSupplier(); + } else { + this.timeSupplier = timeSupplier; + } + + this.internalCacheLoader = new InternalCacheLoader<>(loader, this.timeSupplier); + this.cache = new LoadingCacheDelegate<>(cache, internalCacheLoader, cacheName, metricFactory, duration, timeUnit, failOnError); + + if (metricFactory != null) { + metricFactory.registerGauge("RefreshLoadingCacheDelegate." + cacheName, "refreshMapSize", refreshingKeys::size); + + refreshes = metricFactory.createCounter("RefreshLoadingCacheDelegate." + cacheName, "refreshes"); + refreshErrors = metricFactory.createCounter("RefreshLoadingCacheDelegate." + cacheName, "refreshErrors"); + refreshTimeouts = metricFactory.createCounter("RefreshLoadingCacheDelegate." + cacheName, "refreshTimeouts"); + } else { + refreshes = null; + refreshErrors = null; + refreshTimeouts = null; + } + } + + @Override + public ComposableFuture getAsync(final K key) { + ComposableFuture> futureValue = cache.getAsync(key); + futureValue.consume(value -> { + if (value.isSuccess() && shouldRefresh(value.getValue(), timeSupplier.get())) { + this.refresh(key); + } + }); + return futureValue.map(ValueWithWriteTime::getValue); + } + + private boolean shouldRefresh(final ValueWithWriteTime value, long currentTimeMillis) { + return value != null && currentTimeMillis - value.getWriteTime() >= refreshAfterWriteDuration; + } + + @Override + public ComposableFuture> getBulkAsync(final Iterable keys) { + ComposableFuture>> resultMap = cache.getBulkAsync(keys); + resultMap.consume(result -> { + if (result.isSuccess()) { + final List keysToRefresh = collectKeysToRefresh(result.getValue(), timeSupplier.get()); + this.refresh(keysToRefresh); + } + }); + return extractValues(resultMap); + } + + private List collectKeysToRefresh(final Map> result, final long currentTimeMillis) { + return result.entrySet().stream() + .filter(entry -> shouldRefresh(entry.getValue(), currentTimeMillis)) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + } + + private ComposableFuture> extractValues(final ComposableFuture>> valuesMap) { + return valuesMap.map(map -> map.entrySet().stream() + .filter(entry -> entry.getValue() != null && entry.getValue().getValue() != null) + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getValue()))); + } + + @Override + public ComposableFuture setAsync(final K key, final V value) { + return cache.setAsync(key, new ValueWithWriteTime<>(value, timeSupplier.get())); + } + + @Override + public ComposableFuture setAsync(final K key, final EntryMapper mapper, final int maxIterations) { + return cache.setAsync(key, new InternalEntityMapper<>(mapper, timeSupplier), maxIterations); + } + + @Override + public ComposableFuture> setBulkAsync(final Map entries) { + Map> newEntries = entries.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new ValueWithWriteTime<>(e.getValue(), timeSupplier.get()))); + return cache.setBulkAsync(newEntries); + } + + @Override + public ComposableFuture setIfAbsentAsync(final K key, final V value) { + return cache.setIfAbsentAsync(key, new ValueWithWriteTime<>(value, timeSupplier.get())); + } + + @Override + public ComposableFuture deleteAsync(final K key) { + return cache.deleteAsync(key); + } + + private static class InternalCacheLoader implements CacheLoader> { + + private final CacheLoader loader; + private Supplier timeSupplier; + + private InternalCacheLoader(final CacheLoader loader, final Supplier timeSupplier) { + this.loader = loader; + this.timeSupplier = timeSupplier; + } + + @Override + public ComposableFuture> load(final String cacheName, final K key) { + return loader.load(cacheName, key).map(v -> new ValueWithWriteTime<>(v, timeSupplier.get())); + } + + @Override + public ComposableFuture>> load(final String cacheName, final Iterable keys) { + return loader.load(cacheName, keys).map(entries -> entries.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new ValueWithWriteTime<>(e.getValue(), timeSupplier.get())))); + } + } + + private static class InternalEntityMapper implements EntryMapper> { + private final EntryMapper mapper; + private Supplier timeSupplier; + + InternalEntityMapper(final EntryMapper mapper, final Supplier timeSupplier) { + this.mapper = mapper; + this.timeSupplier = timeSupplier; + } + + @Override + public ValueWithWriteTime map(final K key, final ValueWithWriteTime value) { + V extractedValue = value == null ? null : value.getValue(); + return new ValueWithWriteTime<>(mapper.map(key, extractedValue), timeSupplier.get()); + } + } + + /** + * loads a fresh value from loader and sets into cache + * @param key to refresh value from loader + */ + private void refresh(final K key) { + Boolean alreadyRefreshing = refreshingKeys.putIfAbsent(key, true); + if (alreadyRefreshing == null) { + incRefreshCount(1); + internalCacheLoader.load(cacheName, key) + .consume(res -> { + refreshingKeys.remove(key); + if (res.isSuccess()) { + cache.setAsync(key, res.getValue()); + } else { + collectRefreshErrorMetrics(res.getError()); + } + }); + } + } + + /** + * loads fresh values from loader and sets into cache + * @param keys to refresh value from loader + */ + private void refresh(final Collection keys) { + if (keys == null || Iterables.isEmpty(keys)) { + return; + } + + final List keysToRefresh = keys.stream() + .filter(key -> null == refreshingKeys.putIfAbsent(key, true)) + .collect(Collectors.toList()); + + if (!keysToRefresh.isEmpty()) { + incRefreshCount(keys.size()); + internalCacheLoader.load(cacheName, keysToRefresh) + .consume(res -> { + refreshingKeys.keySet().removeAll(keysToRefresh); + if (res.isSuccess()) { + cache.setBulkAsync(res.getValue()); + } else { + collectRefreshErrorMetrics(res.getError()); + } + }); + } + } + + private void incRefreshCount(int amount) { + if (refreshes != null) { + refreshes.inc(amount); + } + } + + private void collectRefreshErrorMetrics(final Throwable error) { + if (refreshErrors != null) { + refreshErrors.inc(); + if (error instanceof TimeoutException) { + refreshTimeouts.inc(); + } + } + } + + public static class Builder { + private static final long DEFAULT_LOAD_TIMEOUT_MS = 500; + + private final TypedCache> cache; + private final CacheLoader loader; + private final String cacheName; + private final MetricFactory metricFactory; + + private boolean failOnError = false; + private long loadTimeout = DEFAULT_LOAD_TIMEOUT_MS; + private TimeUnit loadTimeUnit = TimeUnit.MILLISECONDS; + private long refreshAfterWriteDuration = -1; + private TimeUnit refreshAfterWriteTimeUnit; + + public Builder(final TypedCache> cache, final CacheLoader loader, final String cacheName, final MetricFactory metricFactory) { + this.cache = cache; + this.loader = loader; + this.cacheName = cacheName; + this.metricFactory = metricFactory; + } + + public Builder failOnError() { + this.failOnError = true; + return this; + } + + public Builder withLoadTimeout(final long timeout, final TimeUnit timeUnit) { + this.loadTimeout = timeout; + this.loadTimeUnit = timeUnit; + return this; + } + + public Builder refreshAfterWrite(final long duration, final TimeUnit timeUnit) { + this.refreshAfterWriteDuration = duration; + this.refreshAfterWriteTimeUnit = timeUnit; + return this; + } + + public RefreshLoadingCacheDelegate build() { + if (refreshAfterWriteDuration == -1) { + throw new IllegalArgumentException("missing refreshAfterWrite config"); + } + return new RefreshLoadingCacheDelegate<>(cache, loader, cacheName, metricFactory, loadTimeout, loadTimeUnit, failOnError, refreshAfterWriteDuration, refreshAfterWriteTimeUnit, null); + } + } + + private static class SystemTimeSupplier implements Supplier { + + @Override + public Long get() { + return System.currentTimeMillis(); + } + } +} diff --git a/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/ValueWithWriteTime.java b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/ValueWithWriteTime.java new file mode 100644 index 00000000..032250cc --- /dev/null +++ b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/ValueWithWriteTime.java @@ -0,0 +1,40 @@ +package com.outbrain.ob1k.cache; + +/** + * wrapper for cache value that holds the write time of the value in the cache + */ +public class ValueWithWriteTime { + + private V value; + private long writeTime; + + /** + * empty constructor for message pack + */ + public ValueWithWriteTime() { } + + /** + * creates an object with the given value and the current timestamp + * @param value + */ + public ValueWithWriteTime(final V value, final long writeTime) { + this.value = value; + this.writeTime = writeTime; + } + + /** + * + * @return the actual value + */ + public V getValue() { + return value; + } + + /** + * + * @return the write timestamp in milliseconds + */ + public long getWriteTime() { + return writeTime; + } +} diff --git a/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/CacheLoaderForTesting.java b/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/CacheLoaderForTesting.java new file mode 100644 index 00000000..b0ed8309 --- /dev/null +++ b/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/CacheLoaderForTesting.java @@ -0,0 +1,60 @@ +package com.outbrain.ob1k.cache; + +import com.google.common.collect.Iterables; +import com.outbrain.ob1k.concurrent.ComposableFuture; +import com.outbrain.ob1k.concurrent.ComposableFutures; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +public class CacheLoaderForTesting implements CacheLoader{ + static final String VALUE_FOR = "ValueFor-"; + static final String VALUE_FOR_BULK = "ValueFor-Bulk-"; + static final String MISSING_KEY = "missing-key"; + static final String NULL_KEY = "null-key"; + static final String ERROR_MESSAGE = "missing key"; + static final String TIMEOUT_KEY = "timeOutKey"; + static final String TIMEOUT_MESSAGE = "timeout occurred"; + static final String TEMPORARY_ERROR_MESSAGE = "Load failed temporarily"; + + private final AtomicBoolean generateLoaderErrors = new AtomicBoolean(false); + + void setGenerateLoaderErrors(final boolean val) { + generateLoaderErrors.set(val); + } + + @Override + public ComposableFuture load(final String cacheName, final String key) { + if (generateLoaderErrors.get()) { + return ComposableFutures.fromError(new RuntimeException(TEMPORARY_ERROR_MESSAGE)); + } + if (key.equals(MISSING_KEY)) { + return ComposableFutures.fromError(new RuntimeException(ERROR_MESSAGE)); + } + if (key.equals(NULL_KEY)) { + return ComposableFutures.fromNull(); + } + return ComposableFutures.fromValue(VALUE_FOR + key); + } + + @Override + public ComposableFuture> load(final String cacheName, final Iterable keys) { + if (generateLoaderErrors.get()) { + return ComposableFutures.fromError(new RuntimeException(TEMPORARY_ERROR_MESSAGE)); + } + if (Iterables.contains(keys, TIMEOUT_KEY)) { + return ComposableFutures.fromError(new RuntimeException(TIMEOUT_MESSAGE)); + } + final HashMap res = new HashMap<>(); + for (String key : keys) { + if (key.equals(NULL_KEY)) { + res.put(key, null); + } + else if (!key.equals(MISSING_KEY)) { + res.put(key, VALUE_FOR_BULK + key); + } + } + return ComposableFutures.fromValue(res); + } +} diff --git a/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/LocalAsyncCacheTest.java b/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/LocalAsyncCacheTest.java index df495b37..8180817d 100644 --- a/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/LocalAsyncCacheTest.java +++ b/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/LocalAsyncCacheTest.java @@ -1,76 +1,41 @@ package com.outbrain.ob1k.cache; -import com.google.common.collect.Iterables; import com.outbrain.ob1k.concurrent.ComposableFuture; import com.outbrain.ob1k.concurrent.ComposableFutures; import org.junit.Test; + import java.util.Arrays; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import static org.junit.Assert.*; + +import static com.outbrain.ob1k.cache.CacheLoaderForTesting.ERROR_MESSAGE; +import static com.outbrain.ob1k.cache.CacheLoaderForTesting.MISSING_KEY; +import static com.outbrain.ob1k.cache.CacheLoaderForTesting.NULL_KEY; +import static com.outbrain.ob1k.cache.CacheLoaderForTesting.TEMPORARY_ERROR_MESSAGE; +import static com.outbrain.ob1k.cache.CacheLoaderForTesting.TIMEOUT_KEY; +import static com.outbrain.ob1k.cache.CacheLoaderForTesting.TIMEOUT_MESSAGE; +import static com.outbrain.ob1k.cache.CacheLoaderForTesting.VALUE_FOR; +import static com.outbrain.ob1k.cache.CacheLoaderForTesting.VALUE_FOR_BULK; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Created by aronen on 4/22/14. *

* test local cache behavior */ +@SuppressWarnings("unchecked") public class LocalAsyncCacheTest { - private static final String VALUE_FOR = "ValueFor-"; - private static final String VALUE_FOR_BULK = "ValueFor-Bulk-"; - private static final String MISSING_KEY = "missing-key"; - private static final String NULL_KEY = "null-key"; - public static final String ERROR_MESSAGE = "missing key"; - public static final String TIMEOUT_KEY = "timeOutKey"; - public static final String TIMEOUT_MESSAGE = "timeout occurred"; - public static final String TEMPORARY_ERROR_MESSAGE = "Load failed temporarily"; - - private static class ExceptionalCacheLoader implements CacheLoader { - private final AtomicBoolean generateLoaderErrors = new AtomicBoolean(false); - - public void setGenerateLoaderErrors(final boolean val) { - generateLoaderErrors.set(val); - } - - @Override - public ComposableFuture load(final String cacheName, final String key) { - if (generateLoaderErrors.get()) { - return ComposableFutures.fromError(new RuntimeException(TEMPORARY_ERROR_MESSAGE)); - } - if (key.equals(MISSING_KEY)) { - return ComposableFutures.fromError(new RuntimeException(ERROR_MESSAGE)); - } - if (key.equals(NULL_KEY)) { - return ComposableFutures.fromNull(); - } - return ComposableFutures.fromValue(VALUE_FOR+key); - } - - @Override - public ComposableFuture> load(final String cacheName, final Iterable keys) { - if (generateLoaderErrors.get()) { - return ComposableFutures.fromError(new RuntimeException(TEMPORARY_ERROR_MESSAGE)); - } - if (Iterables.contains(keys, TIMEOUT_KEY)) { - return ComposableFutures.fromError(new RuntimeException(TIMEOUT_MESSAGE)); - } - final HashMap res = new HashMap<>(); - for (String key:keys) { - if (key.equals(NULL_KEY)) { - res.put(key, null); - } if (!key.equals(MISSING_KEY)) { - res.put(key, VALUE_FOR_BULK + key); - } - } - return ComposableFutures.fromValue(res); - } - } - private static LocalAsyncCache createCache() { - return new LocalAsyncCache<>(3, 100, TimeUnit.MILLISECONDS, new ExceptionalCacheLoader()); + return new LocalAsyncCache<>(3, 100, TimeUnit.MILLISECONDS, new CacheLoaderForTesting()); } @Test @@ -82,7 +47,25 @@ public void testLocalCache() { final Map values = cache.getBulkAsync(Arrays.asList("key3", "key4")).get(); assertEquals(values.get("key3"), VALUE_FOR_BULK + "key3"); assertEquals(values.get("key4"), VALUE_FOR_BULK + "key4"); - assertEquals(values.get("key1"), null); + assertNull(values.get("key1")); + + } catch (final Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testLocalCacheWithoutLoader() { + final LocalAsyncCache cache = new LocalAsyncCache<>(3, 100, TimeUnit.MILLISECONDS); + try { + cache.setAsync("key1", "value1"); + final String value1 = cache.getAsync("key1").get(); + assertEquals(value1, "value1"); + final String value2 = cache.getAsync("key2").get(); + assertNull(value2); + final Map values = cache.getBulkAsync(Arrays.asList("key1", "key2")).get(); + assertEquals(values.get("key1"), "value1"); + assertNull(values.get("key2")); } catch (final Exception e) { fail(e.getMessage()); @@ -93,7 +76,7 @@ public void testLocalCache() { public void testMissingKey() { final LocalAsyncCache cache = createCache(); final ComposableFuture missingValue = cache.getAsync(MISSING_KEY); - assertComposableFutureError(ERROR_MESSAGE,RuntimeException.class,missingValue); + assertComposableFutureError(ERROR_MESSAGE, RuntimeException.class, missingValue); } @Test @@ -140,19 +123,19 @@ public ComposableFuture> load(final String cacheName, final } public static class Box { - public final int number; + public final int number; public Box(final int number) { this.number = number; } - } + } public static class BoxUpdater extends Thread { + private final TypedCache cache; private final String key; private final int iterations; private volatile int successCounter; - public BoxUpdater(final TypedCache cache, final String key, final int iterations) { this.cache = cache; this.key = key; @@ -189,10 +172,10 @@ public Box map(final String key, final Box value) { public int getSuccessCounter() { return successCounter; } - } + } @Test - public void testCasWithMapper() throws Exception { + public void testCacheWithMapper() throws Exception { final TypedCache cache = new LocalAsyncCache<>(300, 100, TimeUnit.SECONDS); final String cacheKey = "box"; cache.setAsync(cacheKey, new Box(0)); @@ -225,7 +208,7 @@ public void testCasWithMapper() throws Exception { @Test public void testCacheExceptionRemoval() throws Exception { - final ExceptionalCacheLoader loader = new ExceptionalCacheLoader() ; + final CacheLoaderForTesting loader = new CacheLoaderForTesting() ; loader.setGenerateLoaderErrors(true); final LocalAsyncCache myCache = new LocalAsyncCache(10, 10, TimeUnit.MINUTES, loader, null, "MyCache"); assertComposableFutureError(TEMPORARY_ERROR_MESSAGE,RuntimeException.class,myCache.getAsync("a")); @@ -241,24 +224,128 @@ public void testCacheExceptionRemoval() throws Exception { @Test public void testCacheExceptionRemovalBulk() throws Exception { - final ExceptionalCacheLoader loader = new ExceptionalCacheLoader() ; + final CacheLoaderForTesting loader = new CacheLoaderForTesting() ; final LocalAsyncCache myCache = new LocalAsyncCache(10, 10, TimeUnit.MINUTES, loader, null, "MyCache"); loader.setGenerateLoaderErrors(true); final Iterable keys = Arrays.asList("a","b","null"); - assertComposableFutureError(TEMPORARY_ERROR_MESSAGE,RuntimeException.class,myCache.getBulkAsync(keys)); + assertComposableFutureError(TEMPORARY_ERROR_MESSAGE, RuntimeException.class, myCache.getBulkAsync(keys)); loader.setGenerateLoaderErrors(false); assertEquals(VALUE_FOR+"a", myCache.getAsync("a").get()); - final Map bulk = (Map)myCache.getBulkAsync(keys).get(); - assertEquals(VALUE_FOR+"a",bulk.get("a")); - assertEquals(VALUE_FOR_BULK+"b",bulk.get("b")); + final Map bulk = (Map) myCache.getBulkAsync(keys).get(); + assertEquals(VALUE_FOR+"a", bulk.get("a")); + assertEquals(VALUE_FOR_BULK+"b", bulk.get("b")); assertNull(bulk.get(NULL_KEY)); loader.setGenerateLoaderErrors(true); assertEquals(VALUE_FOR+"a", myCache.getAsync("a").get()); // already populated assertEquals(VALUE_FOR_BULK+"b", myCache.getAsync("b").get()); // already populated - assertComposableFutureError(TEMPORARY_ERROR_MESSAGE,RuntimeException.class,myCache.getAsync(NULL_KEY)); // nulls are not cached on bulk + assertComposableFutureError(TEMPORARY_ERROR_MESSAGE, RuntimeException.class, myCache.getAsync(NULL_KEY)); // nulls are not cached on bulk + } + + @Test + public void testRefreshAfterWrite() throws ExecutionException, InterruptedException { + final CacheLoader loader = mock(CacheLoader.class); + when(loader.load(anyString(), anyString())).thenAnswer(invocation -> ComposableFutures.submit(() -> { + Thread.sleep(100); + String key = (String) invocation.getArguments()[1]; + return VALUE_FOR + key; + })); + + final LocalAsyncCache myCache = + new LocalAsyncCache.Builder(10, 200000, TimeUnit.MILLISECONDS, null, "myCache") + .refreshAfterWrite(500, TimeUnit.MILLISECONDS) + .withLoader(loader) + .build(); + myCache.setAsync("key1", "value1").get(); + // get value before refresh + assertEquals("value1", myCache.getAsync("key1").get()); + Thread.sleep(500); + // get value and trigger refresh + assertEquals("value1", myCache.getAsync("key1").get()); + Thread.sleep(200); + //get value after refresh + assertEquals(VALUE_FOR + "key1", myCache.getAsync("key1").get()); + verify(loader, times(1)).load("myCache", "key1"); + } + + @Test + public void testGetAsyncAndRefreshNull() throws ExecutionException, InterruptedException { + final CacheLoader loader = mock(CacheLoader.class); + when(loader.load(anyString(), anyString())).thenAnswer(invocation -> ComposableFutures.submit(() -> { + Thread.sleep(100); + return null; + })); + final LocalAsyncCache myCache = + new LocalAsyncCache.Builder(10, 200000, TimeUnit.MILLISECONDS, null, "myCache") + .refreshAfterWrite(500, TimeUnit.MILLISECONDS) + .withLoader(loader) + .build(); + myCache.setAsync("key1", "value1").get(); + // get value before refresh + assertEquals("value1", myCache.getAsync("key1").get()); + Thread.sleep(500); + // get value and trigger refresh + assertEquals("value1", myCache.getAsync("key1").get()); + Thread.sleep(200); + //get value after refresh + assertNull(myCache.getAsync("key1").get()); + verify(loader, times(1)).load("myCache", "key1"); + } + + @Test + public void testGetAsyncAndRefreshException() throws ExecutionException, InterruptedException { + final CacheLoader loader = mock(CacheLoader.class); + when(loader.load(anyString(), anyString())).thenAnswer(invocation -> ComposableFutures.submit(() -> { + Thread.sleep(100); + throw new RuntimeException(); + })); + final LocalAsyncCache myCache = + new LocalAsyncCache.Builder(10, 200000, TimeUnit.MILLISECONDS, null, "myCache") + .refreshAfterWrite(1000, TimeUnit.MILLISECONDS) + .withLoader(loader) + .build(); + myCache.setAsync("key1", "value1").get(); + // get value before refresh + assertEquals("value1", myCache.getAsync("key1").get()); + Thread.sleep(1000); + // get value and trigger refresh + assertEquals("value1", myCache.getAsync("key1").get()); + Thread.sleep(200); + //get value after refresh + assertEquals("value1", myCache.getAsync("key1").get()); + verify(loader, times(2)).load("myCache", "key1"); + } + + @Test + public void getBulkAsyncAndRefresh() throws ExecutionException, InterruptedException { + CacheLoader loader = mock(CacheLoader.class); + final LocalAsyncCache myCache = + new LocalAsyncCache.Builder(10, 200000, TimeUnit.MILLISECONDS, null, "myCache") + .refreshAfterWrite(500, TimeUnit.MILLISECONDS) + .withLoader(loader) + .build(); + when(loader.load(anyString(), anyString())).thenAnswer(invocation -> ComposableFutures.submit(() -> { + Thread.sleep(100); + String key = (String) invocation.getArguments()[1]; + return VALUE_FOR + key; + })); + myCache.setAsync("key1", "value1").get(); + // get first value before refresh + assertEquals("value1", myCache.getBulkAsync(Arrays.asList("key1")).get().get("key1")); + Thread.sleep(500); + // get first value and trigger refresh, second value should not trigger refresh + myCache.setAsync("key2", "value2").get(); + Map res = myCache.getBulkAsync(Arrays.asList("key1", "key2")).get(); + assertEquals("value1", res.get("key1")); + assertEquals("value2", res.get("key2")); + Thread.sleep(200); + //get value after refresh + Map res2 = myCache.getBulkAsync(Arrays.asList("key1", "key2")).get(); + assertEquals(VALUE_FOR + "key1", res2.get("key1")); + assertEquals("value2", res2.get("key2")); + verify(loader, times(1)).load("myCache", "key1"); } private static void assertComposableFutureError(String expectedMsg, Class exceptionType, ComposableFuture future) { diff --git a/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegateTest.java b/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegateTest.java new file mode 100644 index 00000000..9925f97d --- /dev/null +++ b/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegateTest.java @@ -0,0 +1,242 @@ +package com.outbrain.ob1k.cache; + +import com.outbrain.ob1k.concurrent.ComposableFuture; +import com.outbrain.ob1k.concurrent.ComposableFutures; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import static com.outbrain.ob1k.cache.CacheLoaderForTesting.MISSING_KEY; +import static com.outbrain.ob1k.cache.CacheLoaderForTesting.NULL_KEY; +import static com.outbrain.ob1k.cache.CacheLoaderForTesting.VALUE_FOR; +import static com.outbrain.ob1k.cache.CacheLoaderForTesting.VALUE_FOR_BULK; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class RefreshLoadingCacheDelegateTest { + + private static final long REFRESH_AFTER_WRITE_DURATION = 500L; + private static final long LOAD_DURATION = 100L; + private static final long LOAD_GRACE_DURATION = 200L; + private TypedCache> cacheMock = new TypedCacheMock(); + + private CacheLoader cacheLoaderStub = new CacheLoaderForTesting(); + + private RefreshLoadingCacheDelegate refreshingCache; + + @Before + public void setup() { + refreshingCache = new RefreshLoadingCacheDelegate<>(cacheMock, cacheLoaderStub, "myCache", null, 10, TimeUnit.SECONDS, false, 50, TimeUnit.MILLISECONDS, null); + } + + @Test + public void testGetAsyncFromLoader() throws ExecutionException, InterruptedException { + assertEquals(VALUE_FOR + "key1", refreshingCache.getAsync("key1").get()); + } + + @Test + public void testGetAsyncFromLoaderNull() throws ExecutionException, InterruptedException { + assertNull(refreshingCache.getAsync(NULL_KEY).get()); + } + + @Test + public void testGetAsyncFromLoaderMissing() throws InterruptedException { + try { + assertNull(refreshingCache.getAsync(MISSING_KEY).get()); + fail(); + } catch (ExecutionException e) { + // nothing + } + } + + @Test + @SuppressWarnings("unchecked") + public void testGetAsyncAndRefresh() throws ExecutionException, InterruptedException { + CacheLoader cacheLoaderMock = mock(CacheLoader.class); + TimeSupplierMock timeSupplierMock = new TimeSupplierMock(); + TypedCache myCache = new RefreshLoadingCacheDelegate<>(cacheMock, cacheLoaderMock, "myCache", null, 10, TimeUnit.SECONDS, false, REFRESH_AFTER_WRITE_DURATION, TimeUnit.MILLISECONDS, timeSupplierMock); + when(cacheLoaderMock.load(anyString(), anyString())).thenAnswer(invocation -> { + timeSupplierMock.inc(LOAD_DURATION); + String key = (String) invocation.getArguments()[1]; + return ComposableFutures.fromValue(VALUE_FOR + key); + }); + myCache.setAsync("key1", "value1").get(); + // get value before refresh + assertEquals("value1", myCache.getAsync("key1").get()); + timeSupplierMock.inc(REFRESH_AFTER_WRITE_DURATION); + // get value and trigger refresh + assertEquals("value1", myCache.getAsync("key1").get()); + timeSupplierMock.inc(LOAD_GRACE_DURATION); + //get value after refresh + assertEquals(VALUE_FOR + "key1", myCache.getAsync("key1").get()); + verify(cacheLoaderMock, times(1)).load("myCache", "key1"); + } + + @Test + @SuppressWarnings("unchecked") + public void testGetAsyncAndRefreshNull() throws ExecutionException, InterruptedException { + CacheLoader cacheLoaderMock = mock(CacheLoader.class); + TimeSupplierMock timeSupplierMock = new TimeSupplierMock(); + TypedCache myCache = new RefreshLoadingCacheDelegate<>(cacheMock, cacheLoaderMock, "myCache", null, 10, TimeUnit.SECONDS, false, REFRESH_AFTER_WRITE_DURATION, TimeUnit.MILLISECONDS, timeSupplierMock); + when(cacheLoaderMock.load(anyString(), anyString())).thenAnswer(invocation -> { + timeSupplierMock.inc(LOAD_DURATION); + return ComposableFutures.fromNull(); + }); + myCache.setAsync("key1", "value1").get(); + // get value before refresh + assertEquals("value1", myCache.getAsync("key1").get()); + timeSupplierMock.inc(REFRESH_AFTER_WRITE_DURATION); + // get value and trigger refresh + assertEquals("value1", myCache.getAsync("key1").get()); + timeSupplierMock.inc(LOAD_GRACE_DURATION); + //get value after refresh + assertNull(myCache.getAsync("key1").get()); + verify(cacheLoaderMock, times(1)).load("myCache", "key1"); + } + + @Test + @SuppressWarnings("unchecked") + public void testGetAsyncAndRefreshException() throws ExecutionException, InterruptedException { + CacheLoader cacheLoaderMock = mock(CacheLoader.class); + TimeSupplierMock timeSupplierMock = new TimeSupplierMock(); + TypedCache myCache = new RefreshLoadingCacheDelegate<>(cacheMock, cacheLoaderMock, "myCache", null, 10, TimeUnit.SECONDS, false, REFRESH_AFTER_WRITE_DURATION, TimeUnit.MILLISECONDS, timeSupplierMock); + when(cacheLoaderMock.load("myCache", "key1")).thenAnswer(invocation -> { + timeSupplierMock.inc(LOAD_DURATION); + return ComposableFutures.fromError(new RuntimeException()); + }); + myCache.setAsync("key1", "value1").get(); + // get value before refresh + assertEquals("value1", myCache.getAsync("key1").get()); + timeSupplierMock.inc(REFRESH_AFTER_WRITE_DURATION); + // get value and trigger refresh + assertEquals("value1", myCache.getAsync("key1").get()); + timeSupplierMock.inc(LOAD_GRACE_DURATION); + //get value after refresh + assertEquals("value1", myCache.getAsync("key1").get()); + verify(cacheLoaderMock, times(2)).load("myCache", "key1"); + } + + @Test + @SuppressWarnings("unchecked") + public void testGetBulkAsyncFromLoader() throws ExecutionException, InterruptedException { + final Map values = refreshingCache.getBulkAsync(Arrays.asList("key1", NULL_KEY, MISSING_KEY)).get(); + assertEquals(VALUE_FOR_BULK + "key1", values.get("key1")); + assertNull(values.get(NULL_KEY)); + assertNull(values.get(MISSING_KEY)); + } + + @Test + @SuppressWarnings("unchecked") + public void getBulkAsyncAndRefresh() throws ExecutionException, InterruptedException { + CacheLoader cacheLoaderMock = mock(CacheLoader.class); + TimeSupplierMock timeSupplierMock = new TimeSupplierMock(); + TypedCache myCache = new RefreshLoadingCacheDelegate<>(cacheMock, cacheLoaderMock, "myCache", null, 10, TimeUnit.SECONDS, false, REFRESH_AFTER_WRITE_DURATION, TimeUnit.MILLISECONDS, timeSupplierMock); + when(cacheLoaderMock.load(anyString(), any(Iterable.class))).thenAnswer(invocation -> { + timeSupplierMock.inc(LOAD_DURATION); + Iterable keys = (Iterable) invocation.getArguments()[1]; + Map res = new HashMap<>(); + for (String key : keys) { + res.put(key, VALUE_FOR_BULK + key); + } + return ComposableFutures.fromValue(res); + }); + myCache.setAsync("key1", "value1").get(); + // get first value before refresh + assertEquals("value1", myCache.getBulkAsync(Collections.singletonList("key1")).get().get("key1")); + timeSupplierMock.inc(REFRESH_AFTER_WRITE_DURATION); + // get first value and trigger refresh, second value should not trigger refresh + myCache.setAsync("key2", "value2").get(); + Map res = myCache.getBulkAsync(Arrays.asList("key1", "key2")).get(); + assertEquals("value1", res.get("key1")); + assertEquals("value2", res.get("key2")); + timeSupplierMock.inc(LOAD_GRACE_DURATION); + //get value after refresh + Map res2 = myCache.getBulkAsync(Arrays.asList("key1", "key2")).get(); + assertEquals(VALUE_FOR_BULK + "key1", res2.get("key1")); + assertEquals("value2", res2.get("key2")); + verify(cacheLoaderMock, times(1)).load(eq("myCache"), eq(Collections.singletonList("key1"))); + } + + + private static class TypedCacheMock implements TypedCache> { + + private Map> map = new HashMap<>(); + + @Override + public ComposableFuture> getAsync(final String key) { + return ComposableFutures.fromValue(map.get(key)); + } + + @Override + public ComposableFuture>> getBulkAsync(final Iterable keys) { + Map> res = new HashMap<>(); + for (String key : keys) { + ValueWithWriteTime value = map.get(key); + if (value != null) { + res.put(key, value); + } + } + return ComposableFutures.fromValue(res); + } + + @Override + public ComposableFuture setAsync(final String key, final ValueWithWriteTime value) { + map.put(key, value); + return ComposableFutures.fromValue(true); + } + + @Override + public ComposableFuture setAsync(final String key, final EntryMapper> mapper, final int maxIterations) { + throw new UnsupportedOperationException(); + } + + @Override + public ComposableFuture> setBulkAsync(final Map> entries) { + Map res = new HashMap<>(); + entries.forEach((key, value) -> { + map.put(key, value); + res.put(key, true); + }); + return ComposableFutures.fromValue(res); + } + + @Override + public ComposableFuture setIfAbsentAsync(final String key, final ValueWithWriteTime value) { + return ComposableFutures.fromValue(map.putIfAbsent(key, value) == null); + } + + @Override + public ComposableFuture deleteAsync(final String key) { + return ComposableFutures.fromValue(map.remove(key) != null); + } + } + + private static class TimeSupplierMock implements Supplier { + + private Long currentTime = 0L; + + @Override + public Long get() { + return currentTime; + } + + void inc(final Long time) { + this.currentTime += time; + } + } +}