Skip to content
This repository was archived by the owner on Nov 10, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 119 additions & 56 deletions ob1k-cache/src/main/java/com/outbrain/ob1k/cache/LocalAsyncCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ExecutionError;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.outbrain.ob1k.concurrent.ComposableFuture;
import com.outbrain.ob1k.concurrent.ComposableFutures;
import com.outbrain.ob1k.concurrent.UncheckedExecutionException;
import com.outbrain.swinfra.metrics.api.MetricFactory;

import javax.annotation.Nonnull;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -29,11 +32,9 @@
* Time: 6:08 PM
*/
public class LocalAsyncCache<K,V> implements TypedCache<K,V> {

private final LoadingCache<K, ComposableFuture<V>> loadingCache;
private final Cache<K, ComposableFuture<V>> localCache;
private final CacheLoader<K, V> loader;
private final String cacheName;
private final boolean failOnMissingEntries;

public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit, final CacheLoader<K, V> loader,
final MetricFactory metricFactory, final String cacheName) {
Expand All @@ -42,29 +43,25 @@ public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit

public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit, final CacheLoader<K, V> loader,
final MetricFactory metricFactory, final String cacheName, final boolean failOnMissingEntries) {
this.loader = loader;
this.cacheName = cacheName;
this.failOnMissingEntries = failOnMissingEntries;
this(maximumSize, ttl, unit, loader, metricFactory, cacheName, failOnMissingEntries, -1, null);
}

public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit, final CacheLoader<K, V> loader,
final MetricFactory metricFactory, final String cacheName, final boolean failOnMissingEntries,
final long refreshAfterWriteDuration, final TimeUnit refreshAfterWriteUnit) {
final boolean collectStats = metricFactory != null;
final CacheBuilder<Object, Object> builder = CacheBuilder.newBuilder()
.maximumSize(maximumSize)
.expireAfterWrite(ttl, unit);
.maximumSize(maximumSize)
.expireAfterWrite(ttl, unit);

if (refreshAfterWriteDuration != -1) {
builder.refreshAfterWrite(refreshAfterWriteDuration, refreshAfterWriteUnit);
}

if (collectStats)
builder.recordStats();

this.loadingCache = builder
.build(new com.google.common.cache.CacheLoader<K, ComposableFuture<V>>() {
public ComposableFuture<V> load(final K key) throws Exception {
return loadElement(key);
}

@Override
public Map<K, ComposableFuture<V>> loadAll(final Iterable<? extends K> keys) throws Exception {
return loadElements(Lists.newArrayList(keys));
}
});
this.loadingCache = builder.build(new InternalCacheLoader(loader, cacheName, failOnMissingEntries));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add diamond: new InternalCacheLoader<>


if (collectStats) {
GuavaCacheGaugesFactory.createGauges(metricFactory, loadingCache, "LocalAsyncCache-" + cacheName);
Expand All @@ -78,10 +75,7 @@ public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit
}

public LocalAsyncCache(final int maximumSize, final int ttl, final TimeUnit unit, final MetricFactory metricFactory, final String cacheName) {
this.loader = null;
this.loadingCache = null;
this.failOnMissingEntries = true; // fake value, not in use.
this.cacheName = cacheName;

final boolean collectStats = metricFactory != null;
final CacheBuilder<Object, Object> builder = CacheBuilder.newBuilder()
Expand All @@ -107,33 +101,6 @@ public LocalAsyncCache() {
this(1000, 20, TimeUnit.SECONDS);
}

private ComposableFuture<V> loadElement(final K key) {
return loader.load(cacheName, key).materialize();
}

private Function<Map<K, V>, ComposableFuture<V>> extractLoaderResultEntry(final K key) {
return loaderResults -> {
final V res = loaderResults.get(key);
if (res != null) {
return fromValue(res);
} else {
if (failOnMissingEntries) {
return fromError(new RuntimeException(key + " is missing from" + (cacheName == null ? "" : " " + cacheName) + " loader response."));
} else {
return fromNull();
}
}
};
}

private Map<K, ComposableFuture<V>> loadElements(final Iterable<? extends K> keys) {
final ComposableFuture<Map<K, V>> loaded = loader.load(cacheName, keys).materialize();
final Map<K, ComposableFuture<V>> result = new HashMap<>();
for (final K key : keys) {
result.put(key, loaded.flatMap(extractLoaderResultEntry(key)));
}
return result;
}

@Override
public ComposableFuture<V> getAsync(final K key) {
Expand All @@ -154,9 +121,7 @@ public ComposableFuture<V> getAsync(final K key) {
}
return res;
}
} catch (final com.google.common.util.concurrent.UncheckedExecutionException e) {
return fromError(e.getCause());
} catch (final ExecutionException | UncheckedExecutionException | ExecutionError e) {
} catch (final com.google.common.util.concurrent.UncheckedExecutionException | ExecutionException | UncheckedExecutionException | ExecutionError e) {
return fromError(e.getCause());
}
}
Expand Down Expand Up @@ -189,9 +154,7 @@ public ComposableFuture<Map<K, V>> getBulkAsync(final Iterable<? extends K> keys
}

return all(true, result);
} catch (final com.google.common.util.concurrent.UncheckedExecutionException e) {
return fromError(e.getCause());
} catch (final ExecutionException | UncheckedExecutionException | ExecutionError e) {
} catch (final com.google.common.util.concurrent.UncheckedExecutionException | ExecutionException | UncheckedExecutionException | ExecutionError e) {
return fromError(e.getCause());
}
}
Expand Down Expand Up @@ -281,4 +244,104 @@ public ComposableFuture<Map<K, Boolean>> setBulkAsync(final Map<? extends K, ? e
return all(false, result);
}

public static class Builder<K, V> {
private final int maximumSize;
private final int ttl;
private final TimeUnit unit;
private final MetricFactory metricFactory;
private final String cacheName;
private CacheLoader<K, V> loader;
private boolean failOnMissingEntries = false;
private long refreshAfterWriteDuration;
private TimeUnit refreshAfterWriteUnit;

public Builder(final int maximumSize, final int ttl, final TimeUnit unit, final MetricFactory metricFactory, final String cacheName) {
this.maximumSize = maximumSize;
this.ttl = ttl;
this.unit = unit;
this.metricFactory = metricFactory;
this.cacheName = cacheName;
}

public Builder<K, V> withLoader(final CacheLoader<K, V> loader) {
this.loader = loader;
return this;
}

public Builder<K, V> failOnMissingEntries() {
this.failOnMissingEntries = true;
return this;
}

public Builder<K, V> refreshAfterWrite(long duration, TimeUnit unit) {
this.refreshAfterWriteDuration = duration;
this.refreshAfterWriteUnit = unit;
return this;
}

public LocalAsyncCache<K, V> build() {
if (loader == null) {
return new LocalAsyncCache<>(maximumSize, ttl, unit, metricFactory, cacheName);
}
return new LocalAsyncCache<>(maximumSize, ttl, unit, loader, metricFactory, cacheName, failOnMissingEntries,
refreshAfterWriteDuration, refreshAfterWriteUnit);
}
}

private static class InternalCacheLoader<K, V> extends com.google.common.cache.CacheLoader<K, ComposableFuture<V>> {

private final CacheLoader<K, V> loader;
private final String cacheName;
private final boolean failOnMissingEntries;

InternalCacheLoader(final CacheLoader<K, V> loader, final String cacheName, final boolean failOnMissingEntries) {
this.loader = loader;
this.cacheName = cacheName;
this.failOnMissingEntries = failOnMissingEntries;
}

@Override
public ComposableFuture<V> load(@Nonnull final K key) {
return loader.load(cacheName, key).materialize();
}

@Override
public Map<K, ComposableFuture<V>> loadAll(final Iterable<? extends K> keys) {
final ComposableFuture<Map<K, V>> loaded = loader.load(cacheName, keys).materialize();
final Map<K, ComposableFuture<V>> result = new HashMap<>();
for (final K key : keys) {
result.put(key, loaded.flatMap(extractLoaderResultEntry(key)));
}
return result;
}

@Override
public ListenableFuture<ComposableFuture<V>> reload(final K key, final ComposableFuture<V> oldValue) {
final SettableFuture<ComposableFuture<V>> future = SettableFuture.create();
loader.load(cacheName, key).materialize().consume(t -> {
if (t.isSuccess()) {
future.set(ComposableFutures.fromValue(t.getValue()));
} else {
future.setException(t.getError());
}
});

return future;
}

private Function<Map<K, V>, ComposableFuture<V>> extractLoaderResultEntry(final K key) {
return loaderResults -> {
final V res = loaderResults.get(key);
if (res != null) {
return fromValue(res);
} else {
if (failOnMissingEntries) {
return fromError(new RuntimeException(key + " is missing from" + (cacheName == null ? "" : " " + cacheName) + " loader response."));
} else {
return fromNull();
}
}
};
}
}
}
Loading