From 7617093df280050b8766e8a030e2a614f771d115 Mon Sep 17 00:00:00 2001 From: gglevy Date: Thu, 23 Aug 2018 12:35:05 +0300 Subject: [PATCH 1/6] LocalAsyncCache - Support refresh of a cache value --- .../outbrain/ob1k/cache/LocalAsyncCache.java | 210 +++++++++++----- .../ob1k/cache/CacheLoaderForTesting.java | 60 +++++ .../ob1k/cache/LocalAsyncCacheTest.java | 229 ++++++++++++------ 3 files changed, 374 insertions(+), 125 deletions(-) create mode 100644 ob1k-cache/src/test/java/com/outbrain/ob1k/cache/CacheLoaderForTesting.java diff --git a/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/LocalAsyncCache.java b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/LocalAsyncCache.java index fe6afe38..9e8ee2c4 100644 --- a/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/LocalAsyncCache.java +++ b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/LocalAsyncCache.java @@ -4,16 +4,21 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.ExecutionError; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListenableFutureTask; import com.outbrain.ob1k.concurrent.ComposableFuture; +import com.outbrain.ob1k.concurrent.ComposableFutures; import com.outbrain.ob1k.concurrent.UncheckedExecutionException; import com.outbrain.swinfra.metrics.api.MetricFactory; +import javax.annotation.Nonnull; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -29,11 +34,14 @@ * Time: 6:08 PM */ public class LocalAsyncCache implements TypedCache { + private static final int DEFAULT_LOAD_TIMEOUT = 1000; + private static final TimeUnit DEFAULT_LOAD_TIMEOUT_UNIT = TimeUnit.MILLISECONDS; + private static final int DEFAULT_REFRESH_THREAD_COUNT = 10; + private final LoadingCache> loadingCache; private final Cache> localCache; - private final CacheLoader loader; - private final String cacheName; - private final boolean failOnMissingEntries; + + private final ExecutorService executor; public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit, final CacheLoader loader, final MetricFactory metricFactory, final String cacheName) { @@ -42,29 +50,35 @@ public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit, final CacheLoader loader, final MetricFactory metricFactory, final String cacheName, final boolean failOnMissingEntries) { - this.loader = loader; - this.cacheName = cacheName; - this.failOnMissingEntries = failOnMissingEntries; + this(maximumSize, ttl, unit, loader, metricFactory, cacheName, failOnMissingEntries, DEFAULT_LOAD_TIMEOUT, DEFAULT_LOAD_TIMEOUT_UNIT, -1, null, DEFAULT_REFRESH_THREAD_COUNT); + } + public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit, final CacheLoader loader, + final MetricFactory metricFactory, final String cacheName, final boolean failOnMissingEntries, + final long loadTimeout, final TimeUnit loadTimeoutUnit, + final long refreshAfterWriteDuration, final TimeUnit refreshAfterWriteUnit, final int refreshThreadCount) { final boolean collectStats = metricFactory != null; final CacheBuilder builder = CacheBuilder.newBuilder() - .maximumSize(maximumSize) - .expireAfterWrite(ttl, unit); + .maximumSize(maximumSize) + .expireAfterWrite(ttl, unit); + + if (refreshAfterWriteDuration != -1) { + builder.refreshAfterWrite(refreshAfterWriteDuration, refreshAfterWriteUnit); + + if (refreshThreadCount < 1) { + this.executor = Executors.newFixedThreadPool(DEFAULT_REFRESH_THREAD_COUNT); + } else { + this.executor = Executors.newFixedThreadPool(refreshThreadCount); + } + + } else { + this.executor = null; + } if (collectStats) builder.recordStats(); - this.loadingCache = builder - .build(new com.google.common.cache.CacheLoader>() { - public ComposableFuture load(final K key) throws Exception { - return loadElement(key); - } - - @Override - public Map> loadAll(final Iterable keys) throws Exception { - return loadElements(Lists.newArrayList(keys)); - } - }); + this.loadingCache = builder.build(new InternalCacheLoader(loader, cacheName, failOnMissingEntries, loadTimeout, loadTimeoutUnit)); if (collectStats) { GuavaCacheGaugesFactory.createGauges(metricFactory, loadingCache, "LocalAsyncCache-" + cacheName); @@ -78,10 +92,8 @@ public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit } public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit, final MetricFactory metricFactory, final String cacheName) { - this.loader = null; this.loadingCache = null; - this.failOnMissingEntries = true; // fake value, not in use. - this.cacheName = cacheName; + this.executor = null; final boolean collectStats = metricFactory != null; final CacheBuilder builder = CacheBuilder.newBuilder() @@ -107,33 +119,6 @@ public LocalAsyncCache() { this(1000, 20, TimeUnit.SECONDS); } - private ComposableFuture loadElement(final K key) { - return loader.load(cacheName, key).materialize(); - } - - private Function, ComposableFuture> extractLoaderResultEntry(final K key) { - return loaderResults -> { - final V res = loaderResults.get(key); - if (res != null) { - return fromValue(res); - } else { - if (failOnMissingEntries) { - return fromError(new RuntimeException(key + " is missing from" + (cacheName == null ? "" : " " + cacheName) + " loader response.")); - } else { - return fromNull(); - } - } - }; - } - - private Map> loadElements(final Iterable keys) { - final ComposableFuture> loaded = loader.load(cacheName, keys).materialize(); - final Map> result = new HashMap<>(); - for (final K key : keys) { - result.put(key, loaded.flatMap(extractLoaderResultEntry(key))); - } - return result; - } @Override public ComposableFuture getAsync(final K key) { @@ -154,9 +139,7 @@ public ComposableFuture getAsync(final K key) { } return res; } - } catch (final com.google.common.util.concurrent.UncheckedExecutionException e) { - return fromError(e.getCause()); - } catch (final ExecutionException | UncheckedExecutionException | ExecutionError e) { + } catch (final com.google.common.util.concurrent.UncheckedExecutionException | ExecutionException | UncheckedExecutionException | ExecutionError e) { return fromError(e.getCause()); } } @@ -189,9 +172,7 @@ public ComposableFuture> getBulkAsync(final Iterable keys } return all(true, result); - } catch (final com.google.common.util.concurrent.UncheckedExecutionException e) { - return fromError(e.getCause()); - } catch (final ExecutionException | UncheckedExecutionException | ExecutionError e) { + } catch (final com.google.common.util.concurrent.UncheckedExecutionException | ExecutionException | UncheckedExecutionException | ExecutionError e) { return fromError(e.getCause()); } } @@ -281,4 +262,121 @@ public ComposableFuture> setBulkAsync(final Map { + private final int maximumSize; + private final int ttl; + private final TimeUnit unit; + private final MetricFactory metricFactory; + private final String cacheName; + private CacheLoader loader; + private boolean failOnMissingEntries = false; + private long refreshAfterWriteDuration; + private TimeUnit refreshAfterWriteUnit; + private long loadTimeout = DEFAULT_LOAD_TIMEOUT; + private TimeUnit loadTimeoutUnit = DEFAULT_LOAD_TIMEOUT_UNIT; + private int refreshThreadCount = DEFAULT_REFRESH_THREAD_COUNT; + + public Builder(final int maximumSize, final int ttl, final TimeUnit unit, final MetricFactory metricFactory, final String cacheName) { + this.maximumSize = maximumSize; + this.ttl = ttl; + this.unit = unit; + this.metricFactory = metricFactory; + this.cacheName = cacheName; + } + + public Builder withLoader(final CacheLoader loader) { + this.loader = loader; + return this; + } + + public Builder withLoadTimeout(final long loadTimeout, final TimeUnit loadTimeoutUnit) { + this.loadTimeout = loadTimeout; + this.loadTimeoutUnit = loadTimeoutUnit; + return this; + } + + public Builder failOnMissingEntries() { + this.failOnMissingEntries = true; + return this; + } + + public Builder refreshAfterWrite(long duration, TimeUnit unit) { + this.refreshAfterWriteDuration = duration; + this.refreshAfterWriteUnit = unit; + return this; + } + + public Builder withRefreshThreadCount(int refreshThreadCount) { + this.refreshThreadCount = refreshThreadCount; + return this; + } + + public LocalAsyncCache build() { + if (loader == null) { + return new LocalAsyncCache<>(maximumSize, ttl, unit, metricFactory, cacheName); + } + return new LocalAsyncCache<>(maximumSize, ttl, unit, loader, metricFactory, cacheName, failOnMissingEntries, + loadTimeout, loadTimeoutUnit, refreshAfterWriteDuration, refreshAfterWriteUnit, refreshThreadCount); + } + } + + private class InternalCacheLoader extends com.google.common.cache.CacheLoader> { + + private final CacheLoader loader; + private final String cacheName; + private final boolean failOnMissingEntries; + private final long loadTimeout; + private final TimeUnit loadTimeoutUnit; + + InternalCacheLoader(final CacheLoader loader, final String cacheName, final boolean failOnMissingEntries, + final long loadTimeout, final TimeUnit loadTimeoutUnit) { + this.loader = loader; + this.cacheName = cacheName; + this.failOnMissingEntries = failOnMissingEntries; + this.loadTimeout = loadTimeout; + this.loadTimeoutUnit = loadTimeoutUnit; + } + + @Override + public ComposableFuture load(@Nonnull final K key) { + return loader.load(cacheName, key).withTimeout(loadTimeout, loadTimeoutUnit).materialize(); + } + + @Override + public Map> loadAll(final Iterable keys) { + final ComposableFuture> loaded = loader.load(cacheName, keys).withTimeout(loadTimeout, loadTimeoutUnit).materialize(); + final Map> result = new HashMap<>(); + for (final K key : keys) { + result.put(key, loaded.flatMap(extractLoaderResultEntry(key))); + } + return result; + } + + @Override + public ListenableFuture> reload(final K key, final ComposableFuture oldValue) { + ListenableFutureTask> task = ListenableFutureTask.create(() -> { + ComposableFuture loadFuture = loader.load(cacheName, key).recoverWith(e -> oldValue); + V value = loadFuture.get(loadTimeout, loadTimeoutUnit); + return ComposableFutures.fromValue(value); + }); + + executor.execute(task); + return task; + } + + private Function, ComposableFuture> extractLoaderResultEntry(final K key) { + return loaderResults -> { + final V res = loaderResults.get(key); + if (res != null) { + return fromValue(res); + } else { + if (failOnMissingEntries) { + return fromError(new RuntimeException(key + " is missing from" + (cacheName == null ? "" : " " + cacheName) + " loader response.")); + } else { + return fromNull(); + } + } + }; + } + } } diff --git a/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/CacheLoaderForTesting.java b/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/CacheLoaderForTesting.java new file mode 100644 index 00000000..b0ed8309 --- /dev/null +++ b/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/CacheLoaderForTesting.java @@ -0,0 +1,60 @@ +package com.outbrain.ob1k.cache; + +import com.google.common.collect.Iterables; +import com.outbrain.ob1k.concurrent.ComposableFuture; +import com.outbrain.ob1k.concurrent.ComposableFutures; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +public class CacheLoaderForTesting implements CacheLoader{ + static final String VALUE_FOR = "ValueFor-"; + static final String VALUE_FOR_BULK = "ValueFor-Bulk-"; + static final String MISSING_KEY = "missing-key"; + static final String NULL_KEY = "null-key"; + static final String ERROR_MESSAGE = "missing key"; + static final String TIMEOUT_KEY = "timeOutKey"; + static final String TIMEOUT_MESSAGE = "timeout occurred"; + static final String TEMPORARY_ERROR_MESSAGE = "Load failed temporarily"; + + private final AtomicBoolean generateLoaderErrors = new AtomicBoolean(false); + + void setGenerateLoaderErrors(final boolean val) { + generateLoaderErrors.set(val); + } + + @Override + public ComposableFuture load(final String cacheName, final String key) { + if (generateLoaderErrors.get()) { + return ComposableFutures.fromError(new RuntimeException(TEMPORARY_ERROR_MESSAGE)); + } + if (key.equals(MISSING_KEY)) { + return ComposableFutures.fromError(new RuntimeException(ERROR_MESSAGE)); + } + if (key.equals(NULL_KEY)) { + return ComposableFutures.fromNull(); + } + return ComposableFutures.fromValue(VALUE_FOR + key); + } + + @Override + public ComposableFuture> load(final String cacheName, final Iterable keys) { + if (generateLoaderErrors.get()) { + return ComposableFutures.fromError(new RuntimeException(TEMPORARY_ERROR_MESSAGE)); + } + if (Iterables.contains(keys, TIMEOUT_KEY)) { + return ComposableFutures.fromError(new RuntimeException(TIMEOUT_MESSAGE)); + } + final HashMap res = new HashMap<>(); + for (String key : keys) { + if (key.equals(NULL_KEY)) { + res.put(key, null); + } + else if (!key.equals(MISSING_KEY)) { + res.put(key, VALUE_FOR_BULK + key); + } + } + return ComposableFutures.fromValue(res); + } +} diff --git a/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/LocalAsyncCacheTest.java b/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/LocalAsyncCacheTest.java index df495b37..adf57320 100644 --- a/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/LocalAsyncCacheTest.java +++ b/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/LocalAsyncCacheTest.java @@ -1,76 +1,41 @@ package com.outbrain.ob1k.cache; -import com.google.common.collect.Iterables; import com.outbrain.ob1k.concurrent.ComposableFuture; import com.outbrain.ob1k.concurrent.ComposableFutures; import org.junit.Test; + import java.util.Arrays; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import static org.junit.Assert.*; + +import static com.outbrain.ob1k.cache.CacheLoaderForTesting.ERROR_MESSAGE; +import static com.outbrain.ob1k.cache.CacheLoaderForTesting.MISSING_KEY; +import static com.outbrain.ob1k.cache.CacheLoaderForTesting.NULL_KEY; +import static com.outbrain.ob1k.cache.CacheLoaderForTesting.TEMPORARY_ERROR_MESSAGE; +import static com.outbrain.ob1k.cache.CacheLoaderForTesting.TIMEOUT_KEY; +import static com.outbrain.ob1k.cache.CacheLoaderForTesting.TIMEOUT_MESSAGE; +import static com.outbrain.ob1k.cache.CacheLoaderForTesting.VALUE_FOR; +import static com.outbrain.ob1k.cache.CacheLoaderForTesting.VALUE_FOR_BULK; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Created by aronen on 4/22/14. *

* test local cache behavior */ +@SuppressWarnings("unchecked") public class LocalAsyncCacheTest { - private static final String VALUE_FOR = "ValueFor-"; - private static final String VALUE_FOR_BULK = "ValueFor-Bulk-"; - private static final String MISSING_KEY = "missing-key"; - private static final String NULL_KEY = "null-key"; - public static final String ERROR_MESSAGE = "missing key"; - public static final String TIMEOUT_KEY = "timeOutKey"; - public static final String TIMEOUT_MESSAGE = "timeout occurred"; - public static final String TEMPORARY_ERROR_MESSAGE = "Load failed temporarily"; - - private static class ExceptionalCacheLoader implements CacheLoader { - private final AtomicBoolean generateLoaderErrors = new AtomicBoolean(false); - - public void setGenerateLoaderErrors(final boolean val) { - generateLoaderErrors.set(val); - } - - @Override - public ComposableFuture load(final String cacheName, final String key) { - if (generateLoaderErrors.get()) { - return ComposableFutures.fromError(new RuntimeException(TEMPORARY_ERROR_MESSAGE)); - } - if (key.equals(MISSING_KEY)) { - return ComposableFutures.fromError(new RuntimeException(ERROR_MESSAGE)); - } - if (key.equals(NULL_KEY)) { - return ComposableFutures.fromNull(); - } - return ComposableFutures.fromValue(VALUE_FOR+key); - } - - @Override - public ComposableFuture> load(final String cacheName, final Iterable keys) { - if (generateLoaderErrors.get()) { - return ComposableFutures.fromError(new RuntimeException(TEMPORARY_ERROR_MESSAGE)); - } - if (Iterables.contains(keys, TIMEOUT_KEY)) { - return ComposableFutures.fromError(new RuntimeException(TIMEOUT_MESSAGE)); - } - final HashMap res = new HashMap<>(); - for (String key:keys) { - if (key.equals(NULL_KEY)) { - res.put(key, null); - } if (!key.equals(MISSING_KEY)) { - res.put(key, VALUE_FOR_BULK + key); - } - } - return ComposableFutures.fromValue(res); - } - } - private static LocalAsyncCache createCache() { - return new LocalAsyncCache<>(3, 100, TimeUnit.MILLISECONDS, new ExceptionalCacheLoader()); + return new LocalAsyncCache<>(3, 100, TimeUnit.MILLISECONDS, new CacheLoaderForTesting()); } @Test @@ -82,7 +47,25 @@ public void testLocalCache() { final Map values = cache.getBulkAsync(Arrays.asList("key3", "key4")).get(); assertEquals(values.get("key3"), VALUE_FOR_BULK + "key3"); assertEquals(values.get("key4"), VALUE_FOR_BULK + "key4"); - assertEquals(values.get("key1"), null); + assertNull(values.get("key1")); + + } catch (final Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testLocalCacheWithoutLoader() { + final LocalAsyncCache cache = new LocalAsyncCache<>(3, 100, TimeUnit.MILLISECONDS); + try { + cache.setAsync("key1", "value1"); + final String value1 = cache.getAsync("key1").get(); + assertEquals(value1, "value1"); + final String value2 = cache.getAsync("key2").get(); + assertNull(value2); + final Map values = cache.getBulkAsync(Arrays.asList("key1", "key2")).get(); + assertEquals(values.get("key1"), "value1"); + assertNull(values.get("key2")); } catch (final Exception e) { fail(e.getMessage()); @@ -93,7 +76,7 @@ public void testLocalCache() { public void testMissingKey() { final LocalAsyncCache cache = createCache(); final ComposableFuture missingValue = cache.getAsync(MISSING_KEY); - assertComposableFutureError(ERROR_MESSAGE,RuntimeException.class,missingValue); + assertComposableFutureError(ERROR_MESSAGE, RuntimeException.class, missingValue); } @Test @@ -140,19 +123,19 @@ public ComposableFuture> load(final String cacheName, final } public static class Box { - public final int number; + public final int number; public Box(final int number) { this.number = number; } - } + } public static class BoxUpdater extends Thread { + private final TypedCache cache; private final String key; private final int iterations; private volatile int successCounter; - public BoxUpdater(final TypedCache cache, final String key, final int iterations) { this.cache = cache; this.key = key; @@ -189,10 +172,10 @@ public Box map(final String key, final Box value) { public int getSuccessCounter() { return successCounter; } - } + } @Test - public void testCasWithMapper() throws Exception { + public void testCacheWithMapper() throws Exception { final TypedCache cache = new LocalAsyncCache<>(300, 100, TimeUnit.SECONDS); final String cacheKey = "box"; cache.setAsync(cacheKey, new Box(0)); @@ -225,7 +208,7 @@ public void testCasWithMapper() throws Exception { @Test public void testCacheExceptionRemoval() throws Exception { - final ExceptionalCacheLoader loader = new ExceptionalCacheLoader() ; + final CacheLoaderForTesting loader = new CacheLoaderForTesting() ; loader.setGenerateLoaderErrors(true); final LocalAsyncCache myCache = new LocalAsyncCache(10, 10, TimeUnit.MINUTES, loader, null, "MyCache"); assertComposableFutureError(TEMPORARY_ERROR_MESSAGE,RuntimeException.class,myCache.getAsync("a")); @@ -241,24 +224,132 @@ public void testCacheExceptionRemoval() throws Exception { @Test public void testCacheExceptionRemovalBulk() throws Exception { - final ExceptionalCacheLoader loader = new ExceptionalCacheLoader() ; + final CacheLoaderForTesting loader = new CacheLoaderForTesting() ; final LocalAsyncCache myCache = new LocalAsyncCache(10, 10, TimeUnit.MINUTES, loader, null, "MyCache"); loader.setGenerateLoaderErrors(true); final Iterable keys = Arrays.asList("a","b","null"); - assertComposableFutureError(TEMPORARY_ERROR_MESSAGE,RuntimeException.class,myCache.getBulkAsync(keys)); + assertComposableFutureError(TEMPORARY_ERROR_MESSAGE, RuntimeException.class, myCache.getBulkAsync(keys)); loader.setGenerateLoaderErrors(false); assertEquals(VALUE_FOR+"a", myCache.getAsync("a").get()); - final Map bulk = (Map)myCache.getBulkAsync(keys).get(); - assertEquals(VALUE_FOR+"a",bulk.get("a")); - assertEquals(VALUE_FOR_BULK+"b",bulk.get("b")); + final Map bulk = (Map) myCache.getBulkAsync(keys).get(); + assertEquals(VALUE_FOR+"a", bulk.get("a")); + assertEquals(VALUE_FOR_BULK+"b", bulk.get("b")); assertNull(bulk.get(NULL_KEY)); loader.setGenerateLoaderErrors(true); assertEquals(VALUE_FOR+"a", myCache.getAsync("a").get()); // already populated assertEquals(VALUE_FOR_BULK+"b", myCache.getAsync("b").get()); // already populated - assertComposableFutureError(TEMPORARY_ERROR_MESSAGE,RuntimeException.class,myCache.getAsync(NULL_KEY)); // nulls are not cached on bulk + assertComposableFutureError(TEMPORARY_ERROR_MESSAGE, RuntimeException.class, myCache.getAsync(NULL_KEY)); // nulls are not cached on bulk + } + + @Test + public void testRefreshAfterWrite() throws ExecutionException, InterruptedException { + final CacheLoader loader = mock(CacheLoader.class); + when(loader.load(anyString(), anyString())).thenAnswer(invocation -> { + Thread.sleep(100); + String key = (String) invocation.getArguments()[1]; + return ComposableFutures.fromValue(VALUE_FOR + key); + }); + + final LocalAsyncCache myCache = + new LocalAsyncCache.Builder(10, 200000, TimeUnit.MILLISECONDS, null, "myCache") + .refreshAfterWrite(500, TimeUnit.MILLISECONDS) + .withLoader(loader) + .withLoadTimeout(5, TimeUnit.MILLISECONDS) + .build(); + myCache.setAsync("key1", "value1").get(); + // get value before refresh + assertEquals("value1", myCache.getAsync("key1").get()); + Thread.sleep(500); + // get value and trigger refresh + assertEquals("value1", myCache.getAsync("key1").get()); + Thread.sleep(200); + //get value after refresh + assertEquals(VALUE_FOR + "key1", myCache.getAsync("key1").get()); + verify(loader, times(1)).load("myCache", "key1"); + } + + @Test + public void testGetAsyncAndRefreshNull() throws ExecutionException, InterruptedException { + final CacheLoader loader = mock(CacheLoader.class); + when(loader.load(anyString(), anyString())).thenAnswer(invocation -> { + Thread.sleep(100); + return ComposableFutures.fromNull(); + }); + final LocalAsyncCache myCache = + new LocalAsyncCache.Builder(10, 200000, TimeUnit.MILLISECONDS, null, "myCache") + .refreshAfterWrite(500, TimeUnit.MILLISECONDS) + .withLoader(loader) + .withLoadTimeout(5, TimeUnit.MILLISECONDS) + .build(); + myCache.setAsync("key1", "value1").get(); + // get value before refresh + assertEquals("value1", myCache.getAsync("key1").get()); + Thread.sleep(500); + // get value and trigger refresh + assertEquals("value1", myCache.getAsync("key1").get()); + Thread.sleep(200); + //get value after refresh + assertNull(myCache.getAsync("key1").get()); + verify(loader, times(1)).load("myCache", "key1"); + } + + @Test + public void testGetAsyncAndRefreshException() throws ExecutionException, InterruptedException { + final CacheLoader loader = mock(CacheLoader.class); + when(loader.load(anyString(), anyString())).thenAnswer(invocation -> { + Thread.sleep(100); + return ComposableFutures.fromError(new RuntimeException()); + }); + final LocalAsyncCache myCache = + new LocalAsyncCache.Builder(10, 200000, TimeUnit.MILLISECONDS, null, "myCache") + .refreshAfterWrite(500, TimeUnit.MILLISECONDS) + .withLoader(loader) + .withLoadTimeout(5, TimeUnit.MILLISECONDS) + .build(); + myCache.setAsync("key1", "value1").get(); + // get value before refresh + assertEquals("value1", myCache.getAsync("key1").get()); + Thread.sleep(500); + // get value and trigger refresh + assertEquals("value1", myCache.getAsync("key1").get()); + Thread.sleep(200); + //get value after refresh + assertEquals("value1", myCache.getAsync("key1").get()); + verify(loader, times(1)).load("myCache", "key1"); + } + + @Test + public void getBulkAsyncAndRefresh() throws ExecutionException, InterruptedException { + CacheLoader loader = mock(CacheLoader.class); + final LocalAsyncCache myCache = + new LocalAsyncCache.Builder(10, 200000, TimeUnit.MILLISECONDS, null, "myCache") + .refreshAfterWrite(50, TimeUnit.MILLISECONDS) + .withLoader(loader) + .withLoadTimeout(5, TimeUnit.MILLISECONDS) + .build(); + when(loader.load(anyString(), anyString())).thenAnswer(invocation -> { + Thread.sleep(100); + String key = (String) invocation.getArguments()[1]; + return ComposableFutures.fromValue(VALUE_FOR + key); + }); + myCache.setAsync("key1", "value1").get(); + // get first value before refresh + assertEquals("value1", myCache.getBulkAsync(Arrays.asList("key1")).get().get("key1")); + Thread.sleep(500); + // get first value and trigger refresh, second value should not trigger refresh + myCache.setAsync("key2", "value2").get(); + Map res = myCache.getBulkAsync(Arrays.asList("key1", "key2")).get(); + assertEquals("value1", res.get("key1")); + assertEquals("value2", res.get("key2")); + Thread.sleep(200); + //get value after refresh + Map res2 = myCache.getBulkAsync(Arrays.asList("key1", "key2")).get(); + assertEquals(VALUE_FOR + "key1", res2.get("key1")); + assertEquals("value2", res2.get("key2")); + verify(loader, times(1)).load("myCache", "key1"); } private static void assertComposableFutureError(String expectedMsg, Class exceptionType, ComposableFuture future) { From a70358c10ca6484ce34cc41f93fc56c2055872af Mon Sep 17 00:00:00 2001 From: gglevy Date: Thu, 23 Aug 2018 12:37:01 +0300 Subject: [PATCH 2/6] Introduce new RefreshLoadingCacheDelegate - a wrapper of LoadingCacheDelegate that adds the ability to refresh cache values in a configured time --- .../cache/RefreshLoadingCacheDelegate.java | 245 ++++++++++++++++++ .../ob1k/cache/ValueWithWriteTime.java | 40 +++ .../RefreshLoadingCacheDelegateTest.java | 220 ++++++++++++++++ 3 files changed, 505 insertions(+) create mode 100644 ob1k-cache/src/main/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegate.java create mode 100644 ob1k-cache/src/main/java/com/outbrain/ob1k/cache/ValueWithWriteTime.java create mode 100644 ob1k-cache/src/test/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegateTest.java diff --git a/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegate.java b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegate.java new file mode 100644 index 00000000..15ddbe00 --- /dev/null +++ b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegate.java @@ -0,0 +1,245 @@ +package com.outbrain.ob1k.cache; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.Iterables; +import com.outbrain.ob1k.concurrent.ComposableFuture; +import com.outbrain.swinfra.metrics.api.Counter; +import com.outbrain.swinfra.metrics.api.MetricFactory; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +/** + * A wrapper of LoadingCacheDelegate that adds the ability to refresh cache values in the configured interval on access + */ +public class RefreshLoadingCacheDelegate implements TypedCache { + + private final LoadingCacheDelegate> cache; + private final InternalCacheLoader internalCacheLoader; + private final String cacheName; + private final long loadDuration; + private final TimeUnit loadTimeUnit; + private final long refreshAfterWriteDuration; + + private final ConcurrentMap refreshingKeys; + private final Cache failedReloads; + + private final Counter refreshes; + private final Counter refreshErrors; + private final Counter refreshTimeouts; + + public RefreshLoadingCacheDelegate(final TypedCache> cache, final CacheLoader loader, final String cacheName, final MetricFactory metricFactory, + final long duration, final TimeUnit timeUnit, final boolean failOnError, + final long refreshAfterWriteDuration, final TimeUnit refreshAfterWriteUnit, + final long refreshRetryInterval, final TimeUnit refreshRetryTimeUnit) { + this.cacheName = cacheName; + this.internalCacheLoader = new InternalCacheLoader(loader); + this.cache = new LoadingCacheDelegate<>(cache, internalCacheLoader, cacheName, metricFactory, duration, timeUnit, failOnError); + this.loadDuration = duration; + this.loadTimeUnit = timeUnit; + this.refreshAfterWriteDuration = refreshAfterWriteUnit.toMillis(refreshAfterWriteDuration); + + refreshingKeys = new ConcurrentHashMap<>(); + long failedReloadsExpiration = refreshRetryInterval > 0 ? refreshRetryInterval : refreshAfterWriteDuration / 10; + TimeUnit failedReloadsTimeUnit = refreshRetryInterval > 0 ? refreshRetryTimeUnit : refreshRetryTimeUnit; + failedReloads = CacheBuilder.newBuilder() + .maximumSize(10000) + .expireAfterWrite(failedReloadsExpiration, failedReloadsTimeUnit) + .build(); + + if (metricFactory != null) { + metricFactory.registerGauge("RefreshLoadingCacheDelegate." + cacheName, "refreshMapSize", refreshingKeys::size); + metricFactory.registerGauge("RefreshLoadingCacheDelegate." + cacheName, "failedRefreshesSize", failedReloads::size); + + refreshes = metricFactory.createCounter("RefreshLoadingCacheDelegate." + cacheName, "refreshes"); + refreshErrors = metricFactory.createCounter("RefreshLoadingCacheDelegate." + cacheName, "refreshErrors"); + refreshTimeouts = metricFactory.createCounter("RefreshLoadingCacheDelegate." + cacheName, "refreshTimeouts"); + } else { + refreshes = null; + refreshErrors = null; + refreshTimeouts = null; + } + } + + @Override + public ComposableFuture getAsync(final K key) { + ComposableFuture> futureValue = cache.getAsync(key); + futureValue.consume(value -> { + if (value.isSuccess() && shouldRefresh(value.getValue(), System.currentTimeMillis())) { + this.refresh(key); + } + }); + return futureValue.map(ValueWithWriteTime::getValue); + } + + private boolean shouldRefresh(final ValueWithWriteTime value, long currentTimeMillis) { + return value != null && currentTimeMillis - value.getWriteTime() >= refreshAfterWriteDuration; + } + + @Override + public ComposableFuture> getBulkAsync(final Iterable keys) { + ComposableFuture>> resultMap = cache.getBulkAsync(keys); + resultMap.consume(result -> { + if (result.isSuccess()) { + final List keysToRefresh = collectKeysToRefresh(result.getValue(), System.currentTimeMillis()); + this.refresh(keysToRefresh); + } + }); + return extractValues(resultMap); + } + + private List collectKeysToRefresh(final Map> result, final long currentTimeMillis) { + return result.entrySet().stream() + .filter(entry -> shouldRefresh(entry.getValue(), currentTimeMillis)) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + } + + private ComposableFuture> extractValues(final ComposableFuture>> valuesMap) { + return valuesMap.map(map -> map.entrySet().stream() + .filter(entry -> entry.getValue() != null && entry.getValue().getValue() != null) + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getValue()))); + } + + @Override + public ComposableFuture setAsync(final K key, final V value) { + return cache.setAsync(key, new ValueWithWriteTime<>(value)); + } + + @Override + public ComposableFuture setAsync(final K key, final EntryMapper mapper, final int maxIterations) { + return cache.setAsync(key, new InternalEntityMapper(mapper), maxIterations); + } + + @Override + public ComposableFuture> setBulkAsync(final Map entries) { + Map> newEntries = entries.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new ValueWithWriteTime<>(e.getValue()))); + return cache.setBulkAsync(newEntries); + } + + @Override + public ComposableFuture setIfAbsentAsync(final K key, final V value) { + return cache.setIfAbsentAsync(key, new ValueWithWriteTime<>(value)); + } + + @Override + public ComposableFuture deleteAsync(final K key) { + return cache.deleteAsync(key); + } + + private class InternalCacheLoader implements CacheLoader> { + + private final CacheLoader loader; + + private InternalCacheLoader(final CacheLoader loader) { + this.loader = loader; + } + + @Override + public ComposableFuture> load(final String cacheName, final K key) { + return loader.load(cacheName, key).map(ValueWithWriteTime::new); + } + + @Override + public ComposableFuture>> load(final String cacheName, final Iterable keys) { + return loader.load(cacheName, keys).map(entries -> entries.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new ValueWithWriteTime<>(e.getValue())))); + } + } + + private class InternalEntityMapper implements EntryMapper> { + private final EntryMapper mapper; + + InternalEntityMapper(final EntryMapper mapper) { + this.mapper = mapper; + } + + @Override + public ValueWithWriteTime map(final K key, final ValueWithWriteTime value) { + V extractedValue = value == null ? null : value.getValue(); + return new ValueWithWriteTime<>(mapper.map(key, extractedValue)); + } + } + + /** + * loads a fresh value from loader and sets into cache + * @param key to refresh value from loader + */ + private void refresh(final K key) { + if (failedLoadRecently(key)) { + return; + } + + Boolean alreadyRefreshing = refreshingKeys.putIfAbsent(key, true); + if (alreadyRefreshing == null) { + incRefreshCount(); + internalCacheLoader.load(cacheName, key) + .withTimeout(loadDuration, loadTimeUnit, "RefreshLoadingCacheDelegate fetch from loader; cache name: " + cacheName) + .consume(res -> { + refreshingKeys.remove(key); + if (res.isSuccess()) { + cache.setAsync(key, res.getValue()); + } else { + failedReloads.put(key, true); + + collectRefreshErrorMetrics(res.getError()); + } + }); + } + } + + /** + * loads fresh values from loader and sets into cache + * @param keys to refresh value from loader + */ + private void refresh(final Collection keys) { + if (keys == null || Iterables.isEmpty(keys)) { + return; + } + + final List keysToRefresh = keys.stream() + .filter(key -> !failedLoadRecently(key) && null == refreshingKeys.putIfAbsent(key, true)) + .collect(Collectors.toList()); + + if (!keysToRefresh.isEmpty()) { + incRefreshCount(); + internalCacheLoader.load(cacheName, keysToRefresh) + .withTimeout(loadDuration, loadTimeUnit, "RefreshLoadingCacheDelegate fetch bulk from loader; cache name: " + cacheName) + .consume(res -> { + keysToRefresh.forEach(refreshingKeys::remove); + if (res.isSuccess()) { + cache.setBulkAsync(res.getValue()); + } else { + keysToRefresh.forEach(k -> failedReloads.put(k, true)); + + collectRefreshErrorMetrics(res.getError()); + } + }); + } + } + + private boolean failedLoadRecently(K key) { + return failedReloads.getIfPresent(key) != null; + } + + private void incRefreshCount() { + if (refreshes != null) { + refreshes.inc(); + } + } + + private void collectRefreshErrorMetrics(final Throwable error) { + if (refreshErrors != null) { + refreshErrors.inc(); + if (error instanceof TimeoutException) { + refreshTimeouts.inc(); + } + } + } +} diff --git a/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/ValueWithWriteTime.java b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/ValueWithWriteTime.java new file mode 100644 index 00000000..78053e04 --- /dev/null +++ b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/ValueWithWriteTime.java @@ -0,0 +1,40 @@ +package com.outbrain.ob1k.cache; + +/** + * wrapper for cache value that holds the write time of the value in the cache + */ +public class ValueWithWriteTime { + + private V value; + private long writeTime; + + /** + * empty constructor for message pack + */ + public ValueWithWriteTime() { } + + /** + * creates an object with the given value and the current timestamp + * @param value + */ + public ValueWithWriteTime(final V value) { + this.value = value; + writeTime = System.currentTimeMillis(); + } + + /** + * + * @return the actual value + */ + public V getValue() { + return value; + } + + /** + * + * @return the write timestamp in milliseconds + */ + public long getWriteTime() { + return writeTime; + } +} diff --git a/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegateTest.java b/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegateTest.java new file mode 100644 index 00000000..c7df9c45 --- /dev/null +++ b/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegateTest.java @@ -0,0 +1,220 @@ +package com.outbrain.ob1k.cache; + +import com.outbrain.ob1k.concurrent.ComposableFuture; +import com.outbrain.ob1k.concurrent.ComposableFutures; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static com.outbrain.ob1k.cache.CacheLoaderForTesting.MISSING_KEY; +import static com.outbrain.ob1k.cache.CacheLoaderForTesting.NULL_KEY; +import static com.outbrain.ob1k.cache.CacheLoaderForTesting.VALUE_FOR; +import static com.outbrain.ob1k.cache.CacheLoaderForTesting.VALUE_FOR_BULK; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class RefreshLoadingCacheDelegateTest { + + private TypedCache> cacheMock = new TypedCacheMock(); + + private CacheLoader cacheLoaderStub = new CacheLoaderForTesting(); + + private RefreshLoadingCacheDelegate refreshingCache; + + @Before + public void setup() { + refreshingCache = new RefreshLoadingCacheDelegate<>(cacheMock, cacheLoaderStub, "myCache", null, 10, TimeUnit.SECONDS, false, 50, TimeUnit.MILLISECONDS, -1, null); + } + + @Test + public void testGetAsyncFromLoader() throws ExecutionException, InterruptedException { + assertEquals(VALUE_FOR + "key1", refreshingCache.getAsync("key1").get()); + } + + @Test + public void testGetAsyncFromLoaderNull() throws ExecutionException, InterruptedException { + assertNull(refreshingCache.getAsync(NULL_KEY).get()); + } + + @Test + public void testGetAsyncFromLoaderMissing() throws InterruptedException { + try { + assertNull(refreshingCache.getAsync(MISSING_KEY).get()); + fail(); + } catch (ExecutionException e) { + // nothing + } + } + + @Test + @SuppressWarnings("unchecked") + public void testGetAsyncAndRefresh() throws ExecutionException, InterruptedException { + CacheLoader cacheLoaderMock = mock(CacheLoader.class); + TypedCache myCache = new RefreshLoadingCacheDelegate<>(cacheMock, cacheLoaderMock, "myCache", null, 10, TimeUnit.SECONDS, false, 500, TimeUnit.MILLISECONDS, -1, null); + when(cacheLoaderMock.load(anyString(), anyString())).thenAnswer(invocation -> { + Thread.sleep(100); + String key = (String) invocation.getArguments()[1]; + return ComposableFutures.fromValue(VALUE_FOR + key); + }); + myCache.setAsync("key1", "value1").get(); + // get value before refresh + assertEquals("value1", myCache.getAsync("key1").get()); + Thread.sleep(500); + // get value and trigger refresh + assertEquals("value1", myCache.getAsync("key1").get()); + Thread.sleep(200); + //get value after refresh + assertEquals(VALUE_FOR + "key1", myCache.getAsync("key1").get()); + verify(cacheLoaderMock, times(1)).load("myCache", "key1"); + } + + @Test + @SuppressWarnings("unchecked") + public void testGetAsyncAndRefreshNull() throws ExecutionException, InterruptedException { + CacheLoader cacheLoaderMock = mock(CacheLoader.class); + TypedCache myCache = new RefreshLoadingCacheDelegate<>(cacheMock, cacheLoaderMock, "myCache", null, 10, TimeUnit.SECONDS, false, 500, TimeUnit.MILLISECONDS, -1, null); + when(cacheLoaderMock.load(anyString(), anyString())).thenAnswer(invocation -> { + Thread.sleep(100); + return ComposableFutures.fromNull(); + }); + myCache.setAsync("key1", "value1").get(); + // get value before refresh + assertEquals("value1", myCache.getAsync("key1").get()); + Thread.sleep(500); + // get value and trigger refresh + assertEquals("value1", myCache.getAsync("key1").get()); + Thread.sleep(200); + //get value after refresh + assertNull(myCache.getAsync("key1").get()); + verify(cacheLoaderMock, times(1)).load("myCache", "key1"); + } + + @Test + @SuppressWarnings("unchecked") + public void testGetAsyncAndRefreshException() throws ExecutionException, InterruptedException { + CacheLoader cacheLoaderMock = mock(CacheLoader.class); + TypedCache myCache = new RefreshLoadingCacheDelegate<>(cacheMock, cacheLoaderMock, "myCache", null, 10, TimeUnit.SECONDS, false, 5000, TimeUnit.MILLISECONDS, -1, null); + when(cacheLoaderMock.load("myCache", "key1")).thenAnswer(invocation -> { + Thread.sleep(100); + return ComposableFutures.fromError(new RuntimeException()); + }); + myCache.setAsync("key1", "value1").get(); + // get value before refresh + assertEquals("value1", myCache.getAsync("key1").get()); + Thread.sleep(5000); + // get value and trigger refresh + assertEquals("value1", myCache.getAsync("key1").get()); + Thread.sleep(200); + //get value after refresh + verify(cacheLoaderMock, times(1)).load("myCache", "key1"); + assertEquals("value1", myCache.getAsync("key1").get()); + } + + @Test + @SuppressWarnings("unchecked") + public void testGetBulkAsyncFromLoader() throws ExecutionException, InterruptedException { + final Map values = refreshingCache.getBulkAsync(Arrays.asList("key1", NULL_KEY, MISSING_KEY)).get(); + assertEquals(VALUE_FOR_BULK + "key1", values.get("key1")); + assertNull(values.get(NULL_KEY)); + assertNull(values.get(MISSING_KEY)); + } + + @Test + @SuppressWarnings("unchecked") + public void getBulkAsyncAndRefresh() throws ExecutionException, InterruptedException { + CacheLoader cacheLoaderMock = mock(CacheLoader.class); + TypedCache myCache = new RefreshLoadingCacheDelegate<>(cacheMock, cacheLoaderMock, "myCache", null, 10, TimeUnit.SECONDS, false, 500, TimeUnit.MILLISECONDS, -1, null); + when(cacheLoaderMock.load(anyString(), any(Iterable.class))).thenAnswer(invocation -> { + Thread.sleep(100); + Iterable keys = (Iterable) invocation.getArguments()[1]; + Map res = new HashMap<>(); + for (String key : keys) { + res.put(key, VALUE_FOR_BULK + key); + } + return ComposableFutures.fromValue(res); + }); + myCache.setAsync("key1", "value1").get(); + // get first value before refresh + assertEquals("value1", myCache.getBulkAsync(Collections.singletonList("key1")).get().get("key1")); + Thread.sleep(500); + // get first value and trigger refresh, second value should not trigger refresh + myCache.setAsync("key2", "value2").get(); + Map res = myCache.getBulkAsync(Arrays.asList("key1", "key2")).get(); + assertEquals("value1", res.get("key1")); + assertEquals("value2", res.get("key2")); + Thread.sleep(200); + //get value after refresh + Map res2 = myCache.getBulkAsync(Arrays.asList("key1", "key2")).get(); + assertEquals(VALUE_FOR_BULK + "key1", res2.get("key1")); + assertEquals("value2", res2.get("key2")); + verify(cacheLoaderMock, times(1)).load(eq("myCache"), eq(Collections.singletonList("key1"))); + } + + + private static class TypedCacheMock implements TypedCache> { + + private Map> map = new HashMap<>(); + + @Override + public ComposableFuture> getAsync(final String key) { + return ComposableFutures.fromValue(map.get(key)); + } + + @Override + public ComposableFuture>> getBulkAsync(final Iterable keys) { + Map> res = new HashMap<>(); + for (String key : keys) { + ValueWithWriteTime value = map.get(key); + if (value != null) { + res.put(key, value); + } + } + return ComposableFutures.fromValue(res); + } + + @Override + public ComposableFuture setAsync(final String key, final ValueWithWriteTime value) { + map.put(key, value); + return ComposableFutures.fromValue(true); + } + + @Override + public ComposableFuture setAsync(final String key, final EntryMapper> mapper, final int maxIterations) { + throw new UnsupportedOperationException(); + } + + @Override + public ComposableFuture> setBulkAsync(final Map> entries) { + Map res = new HashMap<>(); + entries.forEach((key, value) -> { + map.put(key, value); + res.put(key, true); + }); + return ComposableFutures.fromValue(res); + } + + @Override + public ComposableFuture setIfAbsentAsync(final String key, final ValueWithWriteTime value) { + return ComposableFutures.fromValue(map.putIfAbsent(key, value) == null); + } + + @Override + public ComposableFuture deleteAsync(final String key) { + return ComposableFutures.fromValue(map.remove(key) != null); + } + } +} From 220680cbe94dd5f64bfda18c8ce18270fc99ac2d Mon Sep 17 00:00:00 2001 From: gglevy Date: Thu, 23 Aug 2018 12:38:09 +0300 Subject: [PATCH 3/6] add builder to RefreshLoadingCacheDelegate --- .../cache/RefreshLoadingCacheDelegate.java | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegate.java b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegate.java index 15ddbe00..36fc02ed 100644 --- a/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegate.java +++ b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegate.java @@ -242,4 +242,62 @@ private void collectRefreshErrorMetrics(final Throwable error) { } } } + + public static class Builder { + private static final long DEFAULT_LOAD_TIMEOUT_MS = 500; + + private final TypedCache> cache; + private final CacheLoader loader; + private final String cacheName; + private final MetricFactory metricFactory; + + private boolean failOnError = false; + private long loadTimeout = DEFAULT_LOAD_TIMEOUT_MS; + private TimeUnit loadTimeUnit = TimeUnit.MILLISECONDS; + private long refreshAfterWriteDuration = -1; + private TimeUnit refreshAfterWriteTimeUnit; + private long refreshRetryInterval = -1; + private TimeUnit refreshRetryTimeUnit; + + public Builder(final TypedCache> cache, final CacheLoader loader, final String cacheName, final MetricFactory metricFactory) { + this.cache = cache; + this.loader = loader; + this.cacheName = cacheName; + this.metricFactory = metricFactory; + } + + public Builder failOnError() { + this.failOnError = true; + return this; + } + + public Builder withLoadTimeout(final long timeout, final TimeUnit timeUnit) { + this.loadTimeout = timeout; + this.loadTimeUnit = timeUnit; + return this; + } + + public Builder refreshAfterWrite(final long duration, final TimeUnit timeUnit) { + this.refreshAfterWriteDuration = duration; + this.refreshAfterWriteTimeUnit = timeUnit; + return this; + } + + public Builder withRefreshRetryInterval(final long duration, final TimeUnit timeUnit) { + this.refreshRetryInterval = duration; + this.refreshRetryTimeUnit = timeUnit; + return this; + } + + public RefreshLoadingCacheDelegate build() { + if (refreshAfterWriteDuration == -1) { + throw new IllegalArgumentException("missing refreshAfterWrite config"); + } + if (refreshRetryInterval == -1) { + refreshRetryInterval = refreshAfterWriteDuration / 10; + refreshRetryTimeUnit = refreshAfterWriteTimeUnit; + } + return new RefreshLoadingCacheDelegate<>(cache, loader, cacheName, metricFactory, loadTimeout, loadTimeUnit, failOnError, refreshAfterWriteDuration, refreshRetryTimeUnit, refreshRetryInterval, refreshRetryTimeUnit); + } + } } From 49a16ab5541b3c39b686f921be6b8af480810380 Mon Sep 17 00:00:00 2001 From: gglevy Date: Mon, 27 Aug 2018 15:21:05 +0300 Subject: [PATCH 4/6] LocalAsyncCache - fix reload to be non blocking. remove load timeout option. --- .../outbrain/ob1k/cache/LocalAsyncCache.java | 69 +++++-------------- .../ob1k/cache/LocalAsyncCacheTest.java | 36 +++++----- 2 files changed, 33 insertions(+), 72 deletions(-) diff --git a/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/LocalAsyncCache.java b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/LocalAsyncCache.java index 9e8ee2c4..d3385187 100644 --- a/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/LocalAsyncCache.java +++ b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/LocalAsyncCache.java @@ -6,7 +6,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ExecutionError; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListenableFutureTask; +import com.google.common.util.concurrent.SettableFuture; import com.outbrain.ob1k.concurrent.ComposableFuture; import com.outbrain.ob1k.concurrent.ComposableFutures; import com.outbrain.ob1k.concurrent.UncheckedExecutionException; @@ -17,8 +17,6 @@ import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -34,15 +32,10 @@ * Time: 6:08 PM */ public class LocalAsyncCache implements TypedCache { - private static final int DEFAULT_LOAD_TIMEOUT = 1000; - private static final TimeUnit DEFAULT_LOAD_TIMEOUT_UNIT = TimeUnit.MILLISECONDS; - private static final int DEFAULT_REFRESH_THREAD_COUNT = 10; private final LoadingCache> loadingCache; private final Cache> localCache; - private final ExecutorService executor; - public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit, final CacheLoader loader, final MetricFactory metricFactory, final String cacheName) { this(maximumSize, ttl, unit, loader, metricFactory, cacheName, false); @@ -50,13 +43,12 @@ public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit, final CacheLoader loader, final MetricFactory metricFactory, final String cacheName, final boolean failOnMissingEntries) { - this(maximumSize, ttl, unit, loader, metricFactory, cacheName, failOnMissingEntries, DEFAULT_LOAD_TIMEOUT, DEFAULT_LOAD_TIMEOUT_UNIT, -1, null, DEFAULT_REFRESH_THREAD_COUNT); + this(maximumSize, ttl, unit, loader, metricFactory, cacheName, failOnMissingEntries, -1, null); } public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit, final CacheLoader loader, final MetricFactory metricFactory, final String cacheName, final boolean failOnMissingEntries, - final long loadTimeout, final TimeUnit loadTimeoutUnit, - final long refreshAfterWriteDuration, final TimeUnit refreshAfterWriteUnit, final int refreshThreadCount) { + final long refreshAfterWriteDuration, final TimeUnit refreshAfterWriteUnit) { final boolean collectStats = metricFactory != null; final CacheBuilder builder = CacheBuilder.newBuilder() .maximumSize(maximumSize) @@ -64,21 +56,12 @@ public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit if (refreshAfterWriteDuration != -1) { builder.refreshAfterWrite(refreshAfterWriteDuration, refreshAfterWriteUnit); - - if (refreshThreadCount < 1) { - this.executor = Executors.newFixedThreadPool(DEFAULT_REFRESH_THREAD_COUNT); - } else { - this.executor = Executors.newFixedThreadPool(refreshThreadCount); - } - - } else { - this.executor = null; } if (collectStats) builder.recordStats(); - this.loadingCache = builder.build(new InternalCacheLoader(loader, cacheName, failOnMissingEntries, loadTimeout, loadTimeoutUnit)); + this.loadingCache = builder.build(new InternalCacheLoader(loader, cacheName, failOnMissingEntries)); if (collectStats) { GuavaCacheGaugesFactory.createGauges(metricFactory, loadingCache, "LocalAsyncCache-" + cacheName); @@ -93,7 +76,6 @@ public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit, final MetricFactory metricFactory, final String cacheName) { this.loadingCache = null; - this.executor = null; final boolean collectStats = metricFactory != null; final CacheBuilder builder = CacheBuilder.newBuilder() @@ -272,9 +254,6 @@ public static class Builder { private boolean failOnMissingEntries = false; private long refreshAfterWriteDuration; private TimeUnit refreshAfterWriteUnit; - private long loadTimeout = DEFAULT_LOAD_TIMEOUT; - private TimeUnit loadTimeoutUnit = DEFAULT_LOAD_TIMEOUT_UNIT; - private int refreshThreadCount = DEFAULT_REFRESH_THREAD_COUNT; public Builder(final int maximumSize, final int ttl, final TimeUnit unit, final MetricFactory metricFactory, final String cacheName) { this.maximumSize = maximumSize; @@ -289,12 +268,6 @@ public Builder withLoader(final CacheLoader loader) { return this; } - public Builder withLoadTimeout(final long loadTimeout, final TimeUnit loadTimeoutUnit) { - this.loadTimeout = loadTimeout; - this.loadTimeoutUnit = loadTimeoutUnit; - return this; - } - public Builder failOnMissingEntries() { this.failOnMissingEntries = true; return this; @@ -306,45 +279,35 @@ public Builder refreshAfterWrite(long duration, TimeUnit unit) { return this; } - public Builder withRefreshThreadCount(int refreshThreadCount) { - this.refreshThreadCount = refreshThreadCount; - return this; - } - public LocalAsyncCache build() { if (loader == null) { return new LocalAsyncCache<>(maximumSize, ttl, unit, metricFactory, cacheName); } return new LocalAsyncCache<>(maximumSize, ttl, unit, loader, metricFactory, cacheName, failOnMissingEntries, - loadTimeout, loadTimeoutUnit, refreshAfterWriteDuration, refreshAfterWriteUnit, refreshThreadCount); + refreshAfterWriteDuration, refreshAfterWriteUnit); } } - private class InternalCacheLoader extends com.google.common.cache.CacheLoader> { + private static class InternalCacheLoader extends com.google.common.cache.CacheLoader> { private final CacheLoader loader; private final String cacheName; private final boolean failOnMissingEntries; - private final long loadTimeout; - private final TimeUnit loadTimeoutUnit; - InternalCacheLoader(final CacheLoader loader, final String cacheName, final boolean failOnMissingEntries, - final long loadTimeout, final TimeUnit loadTimeoutUnit) { + InternalCacheLoader(final CacheLoader loader, final String cacheName, final boolean failOnMissingEntries) { this.loader = loader; this.cacheName = cacheName; this.failOnMissingEntries = failOnMissingEntries; - this.loadTimeout = loadTimeout; - this.loadTimeoutUnit = loadTimeoutUnit; } @Override public ComposableFuture load(@Nonnull final K key) { - return loader.load(cacheName, key).withTimeout(loadTimeout, loadTimeoutUnit).materialize(); + return loader.load(cacheName, key).materialize(); } @Override public Map> loadAll(final Iterable keys) { - final ComposableFuture> loaded = loader.load(cacheName, keys).withTimeout(loadTimeout, loadTimeoutUnit).materialize(); + final ComposableFuture> loaded = loader.load(cacheName, keys).materialize(); final Map> result = new HashMap<>(); for (final K key : keys) { result.put(key, loaded.flatMap(extractLoaderResultEntry(key))); @@ -354,14 +317,16 @@ public Map> loadAll(final Iterable keys) { @Override public ListenableFuture> reload(final K key, final ComposableFuture oldValue) { - ListenableFutureTask> task = ListenableFutureTask.create(() -> { - ComposableFuture loadFuture = loader.load(cacheName, key).recoverWith(e -> oldValue); - V value = loadFuture.get(loadTimeout, loadTimeoutUnit); - return ComposableFutures.fromValue(value); + final SettableFuture> future = SettableFuture.create(); + loader.load(cacheName, key).materialize().consume(t -> { + if (t.isSuccess()) { + future.set(ComposableFutures.fromValue(t.getValue())); + } else { + future.setException(t.getError()); + } }); - executor.execute(task); - return task; + return future; } private Function, ComposableFuture> extractLoaderResultEntry(final K key) { diff --git a/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/LocalAsyncCacheTest.java b/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/LocalAsyncCacheTest.java index adf57320..8180817d 100644 --- a/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/LocalAsyncCacheTest.java +++ b/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/LocalAsyncCacheTest.java @@ -247,17 +247,16 @@ public void testCacheExceptionRemovalBulk() throws Exception { @Test public void testRefreshAfterWrite() throws ExecutionException, InterruptedException { final CacheLoader loader = mock(CacheLoader.class); - when(loader.load(anyString(), anyString())).thenAnswer(invocation -> { + when(loader.load(anyString(), anyString())).thenAnswer(invocation -> ComposableFutures.submit(() -> { Thread.sleep(100); String key = (String) invocation.getArguments()[1]; - return ComposableFutures.fromValue(VALUE_FOR + key); - }); + return VALUE_FOR + key; + })); final LocalAsyncCache myCache = new LocalAsyncCache.Builder(10, 200000, TimeUnit.MILLISECONDS, null, "myCache") .refreshAfterWrite(500, TimeUnit.MILLISECONDS) .withLoader(loader) - .withLoadTimeout(5, TimeUnit.MILLISECONDS) .build(); myCache.setAsync("key1", "value1").get(); // get value before refresh @@ -274,15 +273,14 @@ public void testRefreshAfterWrite() throws ExecutionException, InterruptedExcept @Test public void testGetAsyncAndRefreshNull() throws ExecutionException, InterruptedException { final CacheLoader loader = mock(CacheLoader.class); - when(loader.load(anyString(), anyString())).thenAnswer(invocation -> { + when(loader.load(anyString(), anyString())).thenAnswer(invocation -> ComposableFutures.submit(() -> { Thread.sleep(100); - return ComposableFutures.fromNull(); - }); + return null; + })); final LocalAsyncCache myCache = new LocalAsyncCache.Builder(10, 200000, TimeUnit.MILLISECONDS, null, "myCache") .refreshAfterWrite(500, TimeUnit.MILLISECONDS) .withLoader(loader) - .withLoadTimeout(5, TimeUnit.MILLISECONDS) .build(); myCache.setAsync("key1", "value1").get(); // get value before refresh @@ -299,26 +297,25 @@ public void testGetAsyncAndRefreshNull() throws ExecutionException, InterruptedE @Test public void testGetAsyncAndRefreshException() throws ExecutionException, InterruptedException { final CacheLoader loader = mock(CacheLoader.class); - when(loader.load(anyString(), anyString())).thenAnswer(invocation -> { + when(loader.load(anyString(), anyString())).thenAnswer(invocation -> ComposableFutures.submit(() -> { Thread.sleep(100); - return ComposableFutures.fromError(new RuntimeException()); - }); + throw new RuntimeException(); + })); final LocalAsyncCache myCache = new LocalAsyncCache.Builder(10, 200000, TimeUnit.MILLISECONDS, null, "myCache") - .refreshAfterWrite(500, TimeUnit.MILLISECONDS) + .refreshAfterWrite(1000, TimeUnit.MILLISECONDS) .withLoader(loader) - .withLoadTimeout(5, TimeUnit.MILLISECONDS) .build(); myCache.setAsync("key1", "value1").get(); // get value before refresh assertEquals("value1", myCache.getAsync("key1").get()); - Thread.sleep(500); + Thread.sleep(1000); // get value and trigger refresh assertEquals("value1", myCache.getAsync("key1").get()); Thread.sleep(200); //get value after refresh assertEquals("value1", myCache.getAsync("key1").get()); - verify(loader, times(1)).load("myCache", "key1"); + verify(loader, times(2)).load("myCache", "key1"); } @Test @@ -326,15 +323,14 @@ public void getBulkAsyncAndRefresh() throws ExecutionException, InterruptedExcep CacheLoader loader = mock(CacheLoader.class); final LocalAsyncCache myCache = new LocalAsyncCache.Builder(10, 200000, TimeUnit.MILLISECONDS, null, "myCache") - .refreshAfterWrite(50, TimeUnit.MILLISECONDS) + .refreshAfterWrite(500, TimeUnit.MILLISECONDS) .withLoader(loader) - .withLoadTimeout(5, TimeUnit.MILLISECONDS) .build(); - when(loader.load(anyString(), anyString())).thenAnswer(invocation -> { + when(loader.load(anyString(), anyString())).thenAnswer(invocation -> ComposableFutures.submit(() -> { Thread.sleep(100); String key = (String) invocation.getArguments()[1]; - return ComposableFutures.fromValue(VALUE_FOR + key); - }); + return VALUE_FOR + key; + })); myCache.setAsync("key1", "value1").get(); // get first value before refresh assertEquals("value1", myCache.getBulkAsync(Arrays.asList("key1")).get().get("key1")); From 6a50830c833df7cd0b17b4f6dd5d84a0ca95b09e Mon Sep 17 00:00:00 2001 From: gglevy Date: Mon, 27 Aug 2018 15:26:47 +0300 Subject: [PATCH 5/6] RefreshLoadingCacheDelegate - remove failedReloadsCache --- .../cache/RefreshLoadingCacheDelegate.java | 41 ++----------------- .../RefreshLoadingCacheDelegateTest.java | 12 +++--- 2 files changed, 9 insertions(+), 44 deletions(-) diff --git a/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegate.java b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegate.java index 36fc02ed..6dbb4a22 100644 --- a/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegate.java +++ b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegate.java @@ -1,7 +1,5 @@ package com.outbrain.ob1k.cache; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; import com.google.common.collect.Iterables; import com.outbrain.ob1k.concurrent.ComposableFuture; import com.outbrain.swinfra.metrics.api.Counter; @@ -29,7 +27,6 @@ public class RefreshLoadingCacheDelegate implements TypedCache { private final long refreshAfterWriteDuration; private final ConcurrentMap refreshingKeys; - private final Cache failedReloads; private final Counter refreshes; private final Counter refreshErrors; @@ -37,8 +34,7 @@ public class RefreshLoadingCacheDelegate implements TypedCache { public RefreshLoadingCacheDelegate(final TypedCache> cache, final CacheLoader loader, final String cacheName, final MetricFactory metricFactory, final long duration, final TimeUnit timeUnit, final boolean failOnError, - final long refreshAfterWriteDuration, final TimeUnit refreshAfterWriteUnit, - final long refreshRetryInterval, final TimeUnit refreshRetryTimeUnit) { + final long refreshAfterWriteDuration, final TimeUnit refreshAfterWriteUnit) { this.cacheName = cacheName; this.internalCacheLoader = new InternalCacheLoader(loader); this.cache = new LoadingCacheDelegate<>(cache, internalCacheLoader, cacheName, metricFactory, duration, timeUnit, failOnError); @@ -47,16 +43,9 @@ public RefreshLoadingCacheDelegate(final TypedCache> ca this.refreshAfterWriteDuration = refreshAfterWriteUnit.toMillis(refreshAfterWriteDuration); refreshingKeys = new ConcurrentHashMap<>(); - long failedReloadsExpiration = refreshRetryInterval > 0 ? refreshRetryInterval : refreshAfterWriteDuration / 10; - TimeUnit failedReloadsTimeUnit = refreshRetryInterval > 0 ? refreshRetryTimeUnit : refreshRetryTimeUnit; - failedReloads = CacheBuilder.newBuilder() - .maximumSize(10000) - .expireAfterWrite(failedReloadsExpiration, failedReloadsTimeUnit) - .build(); if (metricFactory != null) { metricFactory.registerGauge("RefreshLoadingCacheDelegate." + cacheName, "refreshMapSize", refreshingKeys::size); - metricFactory.registerGauge("RefreshLoadingCacheDelegate." + cacheName, "failedRefreshesSize", failedReloads::size); refreshes = metricFactory.createCounter("RefreshLoadingCacheDelegate." + cacheName, "refreshes"); refreshErrors = metricFactory.createCounter("RefreshLoadingCacheDelegate." + cacheName, "refreshErrors"); @@ -172,10 +161,6 @@ public ValueWithWriteTime map(final K key, final ValueWithWriteTime value) * @param key to refresh value from loader */ private void refresh(final K key) { - if (failedLoadRecently(key)) { - return; - } - Boolean alreadyRefreshing = refreshingKeys.putIfAbsent(key, true); if (alreadyRefreshing == null) { incRefreshCount(); @@ -186,8 +171,6 @@ private void refresh(final K key) { if (res.isSuccess()) { cache.setAsync(key, res.getValue()); } else { - failedReloads.put(key, true); - collectRefreshErrorMetrics(res.getError()); } }); @@ -204,7 +187,7 @@ private void refresh(final Collection keys) { } final List keysToRefresh = keys.stream() - .filter(key -> !failedLoadRecently(key) && null == refreshingKeys.putIfAbsent(key, true)) + .filter(key -> null == refreshingKeys.putIfAbsent(key, true)) .collect(Collectors.toList()); if (!keysToRefresh.isEmpty()) { @@ -216,18 +199,12 @@ private void refresh(final Collection keys) { if (res.isSuccess()) { cache.setBulkAsync(res.getValue()); } else { - keysToRefresh.forEach(k -> failedReloads.put(k, true)); - collectRefreshErrorMetrics(res.getError()); } }); } } - private boolean failedLoadRecently(K key) { - return failedReloads.getIfPresent(key) != null; - } - private void incRefreshCount() { if (refreshes != null) { refreshes.inc(); @@ -256,8 +233,6 @@ public static class Builder { private TimeUnit loadTimeUnit = TimeUnit.MILLISECONDS; private long refreshAfterWriteDuration = -1; private TimeUnit refreshAfterWriteTimeUnit; - private long refreshRetryInterval = -1; - private TimeUnit refreshRetryTimeUnit; public Builder(final TypedCache> cache, final CacheLoader loader, final String cacheName, final MetricFactory metricFactory) { this.cache = cache; @@ -283,21 +258,11 @@ public Builder refreshAfterWrite(final long duration, final TimeUnit timeU return this; } - public Builder withRefreshRetryInterval(final long duration, final TimeUnit timeUnit) { - this.refreshRetryInterval = duration; - this.refreshRetryTimeUnit = timeUnit; - return this; - } - public RefreshLoadingCacheDelegate build() { if (refreshAfterWriteDuration == -1) { throw new IllegalArgumentException("missing refreshAfterWrite config"); } - if (refreshRetryInterval == -1) { - refreshRetryInterval = refreshAfterWriteDuration / 10; - refreshRetryTimeUnit = refreshAfterWriteTimeUnit; - } - return new RefreshLoadingCacheDelegate<>(cache, loader, cacheName, metricFactory, loadTimeout, loadTimeUnit, failOnError, refreshAfterWriteDuration, refreshRetryTimeUnit, refreshRetryInterval, refreshRetryTimeUnit); + return new RefreshLoadingCacheDelegate<>(cache, loader, cacheName, metricFactory, loadTimeout, loadTimeUnit, failOnError, refreshAfterWriteDuration, refreshAfterWriteTimeUnit); } } } diff --git a/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegateTest.java b/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegateTest.java index c7df9c45..65713472 100644 --- a/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegateTest.java +++ b/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegateTest.java @@ -37,7 +37,7 @@ public class RefreshLoadingCacheDelegateTest { @Before public void setup() { - refreshingCache = new RefreshLoadingCacheDelegate<>(cacheMock, cacheLoaderStub, "myCache", null, 10, TimeUnit.SECONDS, false, 50, TimeUnit.MILLISECONDS, -1, null); + refreshingCache = new RefreshLoadingCacheDelegate<>(cacheMock, cacheLoaderStub, "myCache", null, 10, TimeUnit.SECONDS, false, 50, TimeUnit.MILLISECONDS); } @Test @@ -64,7 +64,7 @@ public void testGetAsyncFromLoaderMissing() throws InterruptedException { @SuppressWarnings("unchecked") public void testGetAsyncAndRefresh() throws ExecutionException, InterruptedException { CacheLoader cacheLoaderMock = mock(CacheLoader.class); - TypedCache myCache = new RefreshLoadingCacheDelegate<>(cacheMock, cacheLoaderMock, "myCache", null, 10, TimeUnit.SECONDS, false, 500, TimeUnit.MILLISECONDS, -1, null); + TypedCache myCache = new RefreshLoadingCacheDelegate<>(cacheMock, cacheLoaderMock, "myCache", null, 10, TimeUnit.SECONDS, false, 500, TimeUnit.MILLISECONDS); when(cacheLoaderMock.load(anyString(), anyString())).thenAnswer(invocation -> { Thread.sleep(100); String key = (String) invocation.getArguments()[1]; @@ -86,7 +86,7 @@ public void testGetAsyncAndRefresh() throws ExecutionException, InterruptedExcep @SuppressWarnings("unchecked") public void testGetAsyncAndRefreshNull() throws ExecutionException, InterruptedException { CacheLoader cacheLoaderMock = mock(CacheLoader.class); - TypedCache myCache = new RefreshLoadingCacheDelegate<>(cacheMock, cacheLoaderMock, "myCache", null, 10, TimeUnit.SECONDS, false, 500, TimeUnit.MILLISECONDS, -1, null); + TypedCache myCache = new RefreshLoadingCacheDelegate<>(cacheMock, cacheLoaderMock, "myCache", null, 10, TimeUnit.SECONDS, false, 500, TimeUnit.MILLISECONDS); when(cacheLoaderMock.load(anyString(), anyString())).thenAnswer(invocation -> { Thread.sleep(100); return ComposableFutures.fromNull(); @@ -107,7 +107,7 @@ public void testGetAsyncAndRefreshNull() throws ExecutionException, InterruptedE @SuppressWarnings("unchecked") public void testGetAsyncAndRefreshException() throws ExecutionException, InterruptedException { CacheLoader cacheLoaderMock = mock(CacheLoader.class); - TypedCache myCache = new RefreshLoadingCacheDelegate<>(cacheMock, cacheLoaderMock, "myCache", null, 10, TimeUnit.SECONDS, false, 5000, TimeUnit.MILLISECONDS, -1, null); + TypedCache myCache = new RefreshLoadingCacheDelegate<>(cacheMock, cacheLoaderMock, "myCache", null, 10, TimeUnit.SECONDS, false, 5000, TimeUnit.MILLISECONDS); when(cacheLoaderMock.load("myCache", "key1")).thenAnswer(invocation -> { Thread.sleep(100); return ComposableFutures.fromError(new RuntimeException()); @@ -120,8 +120,8 @@ public void testGetAsyncAndRefreshException() throws ExecutionException, Interru assertEquals("value1", myCache.getAsync("key1").get()); Thread.sleep(200); //get value after refresh - verify(cacheLoaderMock, times(1)).load("myCache", "key1"); assertEquals("value1", myCache.getAsync("key1").get()); + verify(cacheLoaderMock, times(2)).load("myCache", "key1"); } @Test @@ -137,7 +137,7 @@ public void testGetBulkAsyncFromLoader() throws ExecutionException, InterruptedE @SuppressWarnings("unchecked") public void getBulkAsyncAndRefresh() throws ExecutionException, InterruptedException { CacheLoader cacheLoaderMock = mock(CacheLoader.class); - TypedCache myCache = new RefreshLoadingCacheDelegate<>(cacheMock, cacheLoaderMock, "myCache", null, 10, TimeUnit.SECONDS, false, 500, TimeUnit.MILLISECONDS, -1, null); + TypedCache myCache = new RefreshLoadingCacheDelegate<>(cacheMock, cacheLoaderMock, "myCache", null, 10, TimeUnit.SECONDS, false, 500, TimeUnit.MILLISECONDS); when(cacheLoaderMock.load(anyString(), any(Iterable.class))).thenAnswer(invocation -> { Thread.sleep(100); Iterable keys = (Iterable) invocation.getArguments()[1]; From 075946886437c28b5d58f0498215294d759d40aa Mon Sep 17 00:00:00 2001 From: gglevy Date: Thu, 6 Sep 2018 10:00:28 +0300 Subject: [PATCH 6/6] RefreshLoadingCacheDelegate - use timeSupplier to avoid sleep in tests --- .../cache/RefreshLoadingCacheDelegate.java | 78 +++++++++++-------- .../ob1k/cache/ValueWithWriteTime.java | 4 +- .../RefreshLoadingCacheDelegateTest.java | 56 +++++++++---- 3 files changed, 88 insertions(+), 50 deletions(-) diff --git a/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegate.java b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegate.java index 6dbb4a22..52ee2008 100644 --- a/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegate.java +++ b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegate.java @@ -12,6 +12,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; import java.util.stream.Collectors; /** @@ -20,11 +21,10 @@ public class RefreshLoadingCacheDelegate implements TypedCache { private final LoadingCacheDelegate> cache; - private final InternalCacheLoader internalCacheLoader; + private final InternalCacheLoader internalCacheLoader; private final String cacheName; - private final long loadDuration; - private final TimeUnit loadTimeUnit; private final long refreshAfterWriteDuration; + private final Supplier timeSupplier; private final ConcurrentMap refreshingKeys; @@ -32,18 +32,24 @@ public class RefreshLoadingCacheDelegate implements TypedCache { private final Counter refreshErrors; private final Counter refreshTimeouts; - public RefreshLoadingCacheDelegate(final TypedCache> cache, final CacheLoader loader, final String cacheName, final MetricFactory metricFactory, - final long duration, final TimeUnit timeUnit, final boolean failOnError, - final long refreshAfterWriteDuration, final TimeUnit refreshAfterWriteUnit) { + RefreshLoadingCacheDelegate(final TypedCache> cache, final CacheLoader loader, final String cacheName, final MetricFactory metricFactory, + final long duration, final TimeUnit timeUnit, final boolean failOnError, + final long refreshAfterWriteDuration, final TimeUnit refreshAfterWriteUnit, + final Supplier timeSupplier) { this.cacheName = cacheName; - this.internalCacheLoader = new InternalCacheLoader(loader); - this.cache = new LoadingCacheDelegate<>(cache, internalCacheLoader, cacheName, metricFactory, duration, timeUnit, failOnError); - this.loadDuration = duration; - this.loadTimeUnit = timeUnit; this.refreshAfterWriteDuration = refreshAfterWriteUnit.toMillis(refreshAfterWriteDuration); refreshingKeys = new ConcurrentHashMap<>(); + if (timeSupplier == null) { + this.timeSupplier = new SystemTimeSupplier(); + } else { + this.timeSupplier = timeSupplier; + } + + this.internalCacheLoader = new InternalCacheLoader<>(loader, this.timeSupplier); + this.cache = new LoadingCacheDelegate<>(cache, internalCacheLoader, cacheName, metricFactory, duration, timeUnit, failOnError); + if (metricFactory != null) { metricFactory.registerGauge("RefreshLoadingCacheDelegate." + cacheName, "refreshMapSize", refreshingKeys::size); @@ -61,7 +67,7 @@ public RefreshLoadingCacheDelegate(final TypedCache> ca public ComposableFuture getAsync(final K key) { ComposableFuture> futureValue = cache.getAsync(key); futureValue.consume(value -> { - if (value.isSuccess() && shouldRefresh(value.getValue(), System.currentTimeMillis())) { + if (value.isSuccess() && shouldRefresh(value.getValue(), timeSupplier.get())) { this.refresh(key); } }); @@ -77,7 +83,7 @@ public ComposableFuture> getBulkAsync(final Iterable keys ComposableFuture>> resultMap = cache.getBulkAsync(keys); resultMap.consume(result -> { if (result.isSuccess()) { - final List keysToRefresh = collectKeysToRefresh(result.getValue(), System.currentTimeMillis()); + final List keysToRefresh = collectKeysToRefresh(result.getValue(), timeSupplier.get()); this.refresh(keysToRefresh); } }); @@ -99,23 +105,23 @@ private ComposableFuture> extractValues(final ComposableFuture setAsync(final K key, final V value) { - return cache.setAsync(key, new ValueWithWriteTime<>(value)); + return cache.setAsync(key, new ValueWithWriteTime<>(value, timeSupplier.get())); } @Override public ComposableFuture setAsync(final K key, final EntryMapper mapper, final int maxIterations) { - return cache.setAsync(key, new InternalEntityMapper(mapper), maxIterations); + return cache.setAsync(key, new InternalEntityMapper<>(mapper, timeSupplier), maxIterations); } @Override public ComposableFuture> setBulkAsync(final Map entries) { - Map> newEntries = entries.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new ValueWithWriteTime<>(e.getValue()))); + Map> newEntries = entries.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new ValueWithWriteTime<>(e.getValue(), timeSupplier.get()))); return cache.setBulkAsync(newEntries); } @Override public ComposableFuture setIfAbsentAsync(final K key, final V value) { - return cache.setIfAbsentAsync(key, new ValueWithWriteTime<>(value)); + return cache.setIfAbsentAsync(key, new ValueWithWriteTime<>(value, timeSupplier.get())); } @Override @@ -123,36 +129,40 @@ public ComposableFuture deleteAsync(final K key) { return cache.deleteAsync(key); } - private class InternalCacheLoader implements CacheLoader> { + private static class InternalCacheLoader implements CacheLoader> { private final CacheLoader loader; + private Supplier timeSupplier; - private InternalCacheLoader(final CacheLoader loader) { + private InternalCacheLoader(final CacheLoader loader, final Supplier timeSupplier) { this.loader = loader; + this.timeSupplier = timeSupplier; } @Override public ComposableFuture> load(final String cacheName, final K key) { - return loader.load(cacheName, key).map(ValueWithWriteTime::new); + return loader.load(cacheName, key).map(v -> new ValueWithWriteTime<>(v, timeSupplier.get())); } @Override public ComposableFuture>> load(final String cacheName, final Iterable keys) { - return loader.load(cacheName, keys).map(entries -> entries.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new ValueWithWriteTime<>(e.getValue())))); + return loader.load(cacheName, keys).map(entries -> entries.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new ValueWithWriteTime<>(e.getValue(), timeSupplier.get())))); } } - private class InternalEntityMapper implements EntryMapper> { + private static class InternalEntityMapper implements EntryMapper> { private final EntryMapper mapper; + private Supplier timeSupplier; - InternalEntityMapper(final EntryMapper mapper) { + InternalEntityMapper(final EntryMapper mapper, final Supplier timeSupplier) { this.mapper = mapper; + this.timeSupplier = timeSupplier; } @Override public ValueWithWriteTime map(final K key, final ValueWithWriteTime value) { V extractedValue = value == null ? null : value.getValue(); - return new ValueWithWriteTime<>(mapper.map(key, extractedValue)); + return new ValueWithWriteTime<>(mapper.map(key, extractedValue), timeSupplier.get()); } } @@ -163,9 +173,8 @@ public ValueWithWriteTime map(final K key, final ValueWithWriteTime value) private void refresh(final K key) { Boolean alreadyRefreshing = refreshingKeys.putIfAbsent(key, true); if (alreadyRefreshing == null) { - incRefreshCount(); + incRefreshCount(1); internalCacheLoader.load(cacheName, key) - .withTimeout(loadDuration, loadTimeUnit, "RefreshLoadingCacheDelegate fetch from loader; cache name: " + cacheName) .consume(res -> { refreshingKeys.remove(key); if (res.isSuccess()) { @@ -191,11 +200,10 @@ private void refresh(final Collection keys) { .collect(Collectors.toList()); if (!keysToRefresh.isEmpty()) { - incRefreshCount(); + incRefreshCount(keys.size()); internalCacheLoader.load(cacheName, keysToRefresh) - .withTimeout(loadDuration, loadTimeUnit, "RefreshLoadingCacheDelegate fetch bulk from loader; cache name: " + cacheName) .consume(res -> { - keysToRefresh.forEach(refreshingKeys::remove); + refreshingKeys.keySet().removeAll(keysToRefresh); if (res.isSuccess()) { cache.setBulkAsync(res.getValue()); } else { @@ -205,9 +213,9 @@ private void refresh(final Collection keys) { } } - private void incRefreshCount() { + private void incRefreshCount(int amount) { if (refreshes != null) { - refreshes.inc(); + refreshes.inc(amount); } } @@ -262,7 +270,15 @@ public RefreshLoadingCacheDelegate build() { if (refreshAfterWriteDuration == -1) { throw new IllegalArgumentException("missing refreshAfterWrite config"); } - return new RefreshLoadingCacheDelegate<>(cache, loader, cacheName, metricFactory, loadTimeout, loadTimeUnit, failOnError, refreshAfterWriteDuration, refreshAfterWriteTimeUnit); + return new RefreshLoadingCacheDelegate<>(cache, loader, cacheName, metricFactory, loadTimeout, loadTimeUnit, failOnError, refreshAfterWriteDuration, refreshAfterWriteTimeUnit, null); + } + } + + private static class SystemTimeSupplier implements Supplier { + + @Override + public Long get() { + return System.currentTimeMillis(); } } } diff --git a/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/ValueWithWriteTime.java b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/ValueWithWriteTime.java index 78053e04..032250cc 100644 --- a/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/ValueWithWriteTime.java +++ b/ob1k-cache/src/main/java/com/outbrain/ob1k/cache/ValueWithWriteTime.java @@ -17,9 +17,9 @@ public ValueWithWriteTime() { } * creates an object with the given value and the current timestamp * @param value */ - public ValueWithWriteTime(final V value) { + public ValueWithWriteTime(final V value, final long writeTime) { this.value = value; - writeTime = System.currentTimeMillis(); + this.writeTime = writeTime; } /** diff --git a/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegateTest.java b/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegateTest.java index 65713472..9925f97d 100644 --- a/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegateTest.java +++ b/ob1k-cache/src/test/java/com/outbrain/ob1k/cache/RefreshLoadingCacheDelegateTest.java @@ -11,6 +11,7 @@ import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import static com.outbrain.ob1k.cache.CacheLoaderForTesting.MISSING_KEY; import static com.outbrain.ob1k.cache.CacheLoaderForTesting.NULL_KEY; @@ -29,6 +30,9 @@ public class RefreshLoadingCacheDelegateTest { + private static final long REFRESH_AFTER_WRITE_DURATION = 500L; + private static final long LOAD_DURATION = 100L; + private static final long LOAD_GRACE_DURATION = 200L; private TypedCache> cacheMock = new TypedCacheMock(); private CacheLoader cacheLoaderStub = new CacheLoaderForTesting(); @@ -37,7 +41,7 @@ public class RefreshLoadingCacheDelegateTest { @Before public void setup() { - refreshingCache = new RefreshLoadingCacheDelegate<>(cacheMock, cacheLoaderStub, "myCache", null, 10, TimeUnit.SECONDS, false, 50, TimeUnit.MILLISECONDS); + refreshingCache = new RefreshLoadingCacheDelegate<>(cacheMock, cacheLoaderStub, "myCache", null, 10, TimeUnit.SECONDS, false, 50, TimeUnit.MILLISECONDS, null); } @Test @@ -64,19 +68,20 @@ public void testGetAsyncFromLoaderMissing() throws InterruptedException { @SuppressWarnings("unchecked") public void testGetAsyncAndRefresh() throws ExecutionException, InterruptedException { CacheLoader cacheLoaderMock = mock(CacheLoader.class); - TypedCache myCache = new RefreshLoadingCacheDelegate<>(cacheMock, cacheLoaderMock, "myCache", null, 10, TimeUnit.SECONDS, false, 500, TimeUnit.MILLISECONDS); + TimeSupplierMock timeSupplierMock = new TimeSupplierMock(); + TypedCache myCache = new RefreshLoadingCacheDelegate<>(cacheMock, cacheLoaderMock, "myCache", null, 10, TimeUnit.SECONDS, false, REFRESH_AFTER_WRITE_DURATION, TimeUnit.MILLISECONDS, timeSupplierMock); when(cacheLoaderMock.load(anyString(), anyString())).thenAnswer(invocation -> { - Thread.sleep(100); + timeSupplierMock.inc(LOAD_DURATION); String key = (String) invocation.getArguments()[1]; return ComposableFutures.fromValue(VALUE_FOR + key); }); myCache.setAsync("key1", "value1").get(); // get value before refresh assertEquals("value1", myCache.getAsync("key1").get()); - Thread.sleep(500); + timeSupplierMock.inc(REFRESH_AFTER_WRITE_DURATION); // get value and trigger refresh assertEquals("value1", myCache.getAsync("key1").get()); - Thread.sleep(200); + timeSupplierMock.inc(LOAD_GRACE_DURATION); //get value after refresh assertEquals(VALUE_FOR + "key1", myCache.getAsync("key1").get()); verify(cacheLoaderMock, times(1)).load("myCache", "key1"); @@ -86,18 +91,19 @@ public void testGetAsyncAndRefresh() throws ExecutionException, InterruptedExcep @SuppressWarnings("unchecked") public void testGetAsyncAndRefreshNull() throws ExecutionException, InterruptedException { CacheLoader cacheLoaderMock = mock(CacheLoader.class); - TypedCache myCache = new RefreshLoadingCacheDelegate<>(cacheMock, cacheLoaderMock, "myCache", null, 10, TimeUnit.SECONDS, false, 500, TimeUnit.MILLISECONDS); + TimeSupplierMock timeSupplierMock = new TimeSupplierMock(); + TypedCache myCache = new RefreshLoadingCacheDelegate<>(cacheMock, cacheLoaderMock, "myCache", null, 10, TimeUnit.SECONDS, false, REFRESH_AFTER_WRITE_DURATION, TimeUnit.MILLISECONDS, timeSupplierMock); when(cacheLoaderMock.load(anyString(), anyString())).thenAnswer(invocation -> { - Thread.sleep(100); + timeSupplierMock.inc(LOAD_DURATION); return ComposableFutures.fromNull(); }); myCache.setAsync("key1", "value1").get(); // get value before refresh assertEquals("value1", myCache.getAsync("key1").get()); - Thread.sleep(500); + timeSupplierMock.inc(REFRESH_AFTER_WRITE_DURATION); // get value and trigger refresh assertEquals("value1", myCache.getAsync("key1").get()); - Thread.sleep(200); + timeSupplierMock.inc(LOAD_GRACE_DURATION); //get value after refresh assertNull(myCache.getAsync("key1").get()); verify(cacheLoaderMock, times(1)).load("myCache", "key1"); @@ -107,18 +113,19 @@ public void testGetAsyncAndRefreshNull() throws ExecutionException, InterruptedE @SuppressWarnings("unchecked") public void testGetAsyncAndRefreshException() throws ExecutionException, InterruptedException { CacheLoader cacheLoaderMock = mock(CacheLoader.class); - TypedCache myCache = new RefreshLoadingCacheDelegate<>(cacheMock, cacheLoaderMock, "myCache", null, 10, TimeUnit.SECONDS, false, 5000, TimeUnit.MILLISECONDS); + TimeSupplierMock timeSupplierMock = new TimeSupplierMock(); + TypedCache myCache = new RefreshLoadingCacheDelegate<>(cacheMock, cacheLoaderMock, "myCache", null, 10, TimeUnit.SECONDS, false, REFRESH_AFTER_WRITE_DURATION, TimeUnit.MILLISECONDS, timeSupplierMock); when(cacheLoaderMock.load("myCache", "key1")).thenAnswer(invocation -> { - Thread.sleep(100); + timeSupplierMock.inc(LOAD_DURATION); return ComposableFutures.fromError(new RuntimeException()); }); myCache.setAsync("key1", "value1").get(); // get value before refresh assertEquals("value1", myCache.getAsync("key1").get()); - Thread.sleep(5000); + timeSupplierMock.inc(REFRESH_AFTER_WRITE_DURATION); // get value and trigger refresh assertEquals("value1", myCache.getAsync("key1").get()); - Thread.sleep(200); + timeSupplierMock.inc(LOAD_GRACE_DURATION); //get value after refresh assertEquals("value1", myCache.getAsync("key1").get()); verify(cacheLoaderMock, times(2)).load("myCache", "key1"); @@ -137,9 +144,10 @@ public void testGetBulkAsyncFromLoader() throws ExecutionException, InterruptedE @SuppressWarnings("unchecked") public void getBulkAsyncAndRefresh() throws ExecutionException, InterruptedException { CacheLoader cacheLoaderMock = mock(CacheLoader.class); - TypedCache myCache = new RefreshLoadingCacheDelegate<>(cacheMock, cacheLoaderMock, "myCache", null, 10, TimeUnit.SECONDS, false, 500, TimeUnit.MILLISECONDS); + TimeSupplierMock timeSupplierMock = new TimeSupplierMock(); + TypedCache myCache = new RefreshLoadingCacheDelegate<>(cacheMock, cacheLoaderMock, "myCache", null, 10, TimeUnit.SECONDS, false, REFRESH_AFTER_WRITE_DURATION, TimeUnit.MILLISECONDS, timeSupplierMock); when(cacheLoaderMock.load(anyString(), any(Iterable.class))).thenAnswer(invocation -> { - Thread.sleep(100); + timeSupplierMock.inc(LOAD_DURATION); Iterable keys = (Iterable) invocation.getArguments()[1]; Map res = new HashMap<>(); for (String key : keys) { @@ -150,13 +158,13 @@ public void getBulkAsyncAndRefresh() throws ExecutionException, InterruptedExcep myCache.setAsync("key1", "value1").get(); // get first value before refresh assertEquals("value1", myCache.getBulkAsync(Collections.singletonList("key1")).get().get("key1")); - Thread.sleep(500); + timeSupplierMock.inc(REFRESH_AFTER_WRITE_DURATION); // get first value and trigger refresh, second value should not trigger refresh myCache.setAsync("key2", "value2").get(); Map res = myCache.getBulkAsync(Arrays.asList("key1", "key2")).get(); assertEquals("value1", res.get("key1")); assertEquals("value2", res.get("key2")); - Thread.sleep(200); + timeSupplierMock.inc(LOAD_GRACE_DURATION); //get value after refresh Map res2 = myCache.getBulkAsync(Arrays.asList("key1", "key2")).get(); assertEquals(VALUE_FOR_BULK + "key1", res2.get("key1")); @@ -217,4 +225,18 @@ public ComposableFuture deleteAsync(final String key) { return ComposableFutures.fromValue(map.remove(key) != null); } } + + private static class TimeSupplierMock implements Supplier { + + private Long currentTime = 0L; + + @Override + public Long get() { + return currentTime; + } + + void inc(final Long time) { + this.currentTime += time; + } + } }