From e60aad6113d0c6b66a3b756969b84606d7c0a595 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Sat, 28 Dec 2024 01:24:05 +0400 Subject: [PATCH 1/5] Implement lock-free probabilistic cache reloading Signed-off-by: Paolo Di Tommaso --- .../store/cache/AbstractTieredCache.groovy | 104 +++++++++++------- .../tower/client/cache/ClientCache.groovy | 2 +- .../cache/AbstractTieredCacheTest.groovy | 47 +++++++- 3 files changed, 112 insertions(+), 41 deletions(-) diff --git a/src/main/groovy/io/seqera/wave/store/cache/AbstractTieredCache.groovy b/src/main/groovy/io/seqera/wave/store/cache/AbstractTieredCache.groovy index 432b754b2d..47996cb61f 100644 --- a/src/main/groovy/io/seqera/wave/store/cache/AbstractTieredCache.groovy +++ b/src/main/groovy/io/seqera/wave/store/cache/AbstractTieredCache.groovy @@ -49,6 +49,8 @@ import org.jetbrains.annotations.Nullable @CompileStatic abstract class AbstractTieredCache implements TieredCache { + static final private double DEFAULT_REVAL_STEEPNESS = 1/300 + @Canonical @ToString(includePackage = false, includeNames = true) static class Entry implements MoshiExchange { @@ -94,12 +96,20 @@ abstract class AbstractTieredCache implements TieredCac } } - abstract int getMaxSize() + abstract protected int getMaxSize() abstract protected getName() abstract protected String getPrefix() + protected Duration getCacheRevalidationInterval() { + return null + } + + protected double getRevalidationSteepness() { + return DEFAULT_REVAL_STEEPNESS + } + private RemovalListener removalListener0() { new RemovalListener() { @Override @@ -158,42 +168,51 @@ abstract class AbstractTieredCache implements TieredCac private V getOrCompute0(String key, Function> loader) { assert key!=null, "Argument key cannot be null" - log.trace "Cache '${name}' checking key=$key" + final now = Instant.now() // Try L1 cache first - V value = l1Get(key) - if (value != null) { - log.trace "Cache '${name}' L1 hit (a) - key=$key => value=$value" - return value + Entry entry = l1Get(key) + Boolean needsRevalidation = entry ? shouldRevalidate(entry.expiresAt, now) : null + if( entry && !needsRevalidation ) { + log.trace "Cache '${name}' L1 hit (a) - key=$key => entry=$entry" + return (V) entry.value } final sync = locks.computeIfAbsent(key, (k)-> new ReentrantLock()) sync.lock() try { - value = l1Get(key) - if (value != null) { - log.trace "Cache '${name}' L1 hit (b) - key=$key => value=$value" - return value + // check again L1 cache once in the sync block + if( !entry ) { + entry = l1Get(key) + needsRevalidation = entry ? shouldRevalidate(entry.expiresAt, now) : null + } + if( entry && !needsRevalidation ) { + log.trace "Cache '${name}' L1 hit (b) - key=$key => entry=$entry" + return (V)entry.value } // Fallback to L2 cache - final entry = l2GetEntry(key) - if (entry != null) { - log.trace "Cache '${name}' L2 hit - key=$key => entry=$entry" + if( !entry ) { + entry = l2Get(key) + needsRevalidation = entry ? shouldRevalidate(entry.expiresAt, now) : null + } + if (entry && !needsRevalidation) { + log.trace "Cache '${name}' L2 hit (c) - key=$key => entry=$entry" // Rehydrate L1 cache l1.put(key, entry) return (V) entry.value } // still not value found, use loader function to fetch the value - if( value==null && loader!=null ) { + V value = null + if( loader!=null ) { log.trace "Cache '${name}' invoking loader - key=$key" final ret = loader.apply(key) value = ret?.v1 Duration ttl = ret?.v2 if( value!=null && ttl!=null ) { final exp = Instant.now().plus(ttl).toEpochMilli() - final newEntry = new Entry(value,exp) + final newEntry = new Entry(value, exp) l1Put(key, newEntry) l2Put(key, newEntry, ttl) } @@ -221,27 +240,15 @@ abstract class AbstractTieredCache implements TieredCac protected String key0(String k) { return getPrefix() + ':' + k } - protected V l1Get(String key) { - return (V) l1GetEntry(key)?.value - } - - protected Entry l1GetEntry(String key) { - final entry = l1.getIfPresent(key) - if( entry == null ) - return null - - if( System.currentTimeMillis() > entry.expiresAt ) { - log.trace "Cache '${name}' L1 expired - key=$key => entry=$entry" - return null - } - return entry + protected Entry l1Get(String key) { + return l1.getIfPresent(key) } protected void l1Put(String key, Entry entry) { l1.put(key, entry) } - protected Entry l2GetEntry(String key) { + protected Entry l2Get(String key) { if( l2 == null ) return null @@ -249,17 +256,9 @@ abstract class AbstractTieredCache implements TieredCac if( raw == null ) return null - final Entry entry = encoder.decode(raw) - if( System.currentTimeMillis() > entry.expiresAt ) { - log.trace "Cache '${name}' L2 expired - key=$key => value=${entry}" - return null - } - return entry + return encoder.decode(raw) } - protected V l2Get(String key) { - return (V) l2GetEntry(key)?.value - } protected void l2Put(String key, Entry entry, Duration ttl) { if( l2 != null ) { @@ -272,4 +271,31 @@ abstract class AbstractTieredCache implements TieredCac l1.invalidateAll() } + protected boolean shouldRevalidate(long expiration, Instant time=Instant.now()) { + + // when 'remainingCacheTime' is less than or equals to zero, it means + // the current time is beyond the expiration time, therefore a cache validation is needed + final remainingCacheTime = expiration - time.toEpochMilli() + if (remainingCacheTime <= 0) { + return true + } + + // otherwise, when remaining is greater than the cache revalidation interval + // no revalidation is needed + final cacheRevalidationMills = cacheRevalidationInterval?.toMillis() ?: 0 + if( remainingCacheTime > cacheRevalidationMills ) { + return false + } + + // finally the remaining time is shorter the validation interval + // i.e. it's approaching the cache expiration, in this cache the needed + // for cache revalidation is determined in a probabilistic manner + // see https://blog.cloudflare.com/sometimes-i-cache/ + return randomRevalidate((cacheRevalidationMills-remainingCacheTime) /1000 as long) + } + + protected boolean randomRevalidate(long remainingTimeSecs) { + Math.random() < Math.exp(-revalidationSteepness * remainingTimeSecs) + } + } diff --git a/src/main/groovy/io/seqera/wave/tower/client/cache/ClientCache.groovy b/src/main/groovy/io/seqera/wave/tower/client/cache/ClientCache.groovy index 5ce76e13e3..5729a45b3c 100644 --- a/src/main/groovy/io/seqera/wave/tower/client/cache/ClientCache.groovy +++ b/src/main/groovy/io/seqera/wave/tower/client/cache/ClientCache.groovy @@ -66,7 +66,7 @@ class ClientCache extends AbstractTieredCache { } @Override - int getMaxSize() { + protected int getMaxSize() { return maxSize } diff --git a/src/test/groovy/io/seqera/wave/store/cache/AbstractTieredCacheTest.groovy b/src/test/groovy/io/seqera/wave/store/cache/AbstractTieredCacheTest.groovy index 81b302a5bd..962b1ece95 100644 --- a/src/test/groovy/io/seqera/wave/store/cache/AbstractTieredCacheTest.groovy +++ b/src/test/groovy/io/seqera/wave/store/cache/AbstractTieredCacheTest.groovy @@ -19,6 +19,7 @@ package io.seqera.wave.store.cache import java.time.Duration +import java.time.Instant import com.squareup.moshi.JsonAdapter import com.squareup.moshi.adapters.PolymorphicJsonAdapterFactory @@ -102,7 +103,7 @@ class AbstractTieredCacheTest extends Specification implements RedisTestContaine cache1.put(k, value, TTL) then: - def entry1 = cache1.l1GetEntry(k) + def entry1 = cache1.l1Get(k) and: entry1.expiresAt > begin then: @@ -223,4 +224,48 @@ class AbstractTieredCacheTest extends Specification implements RedisTestContaine cache.get(k2) == null } + def 'should validate revalidation logic' () { + given: + def REVALIDATION_INTERVAL_SECS = 10 + def now = Instant.now() + def cache = Spy(MyCache) + cache.getCacheRevalidationInterval() >> Duration.ofSeconds(REVALIDATION_INTERVAL_SECS) + + when: + // when expiration is past, then 'revalidate' should be true + def expiration = now.minusSeconds(1) + def revalidate = cache.shouldRevalidate(expiration.toEpochMilli(), now) + then: + 0 * cache.randomRevalidate(_) >> null + and: + revalidate + + when: + // when expiration is longer than the revalidation internal, then 'revalidate' is false + expiration = now.plusSeconds(REVALIDATION_INTERVAL_SECS +1) + revalidate = cache.shouldRevalidate(expiration.toEpochMilli(), now) + then: + 0 * cache.randomRevalidate(_) >> null + and: + !revalidate + + when: + // when expiration is less than or equal the revalidation internal, then 'revalidate' is computed randomly + expiration = now.plusSeconds(REVALIDATION_INTERVAL_SECS) + revalidate = cache.shouldRevalidate(expiration.toEpochMilli(), now) + then: + 1 * cache.randomRevalidate(_) >> true + and: + revalidate + + when: + // when expiration is less than or equal the revalidation internal, then 'revalidate' is computed randomly + expiration = now.plusSeconds(REVALIDATION_INTERVAL_SECS -1) + revalidate = cache.shouldRevalidate(expiration.toEpochMilli(), now) + then: + 1 * cache.randomRevalidate(_) >> false + and: + !revalidate + } + } From a4a26864796df9b00a1f3cc24c37147596e2d566 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Mon, 30 Dec 2024 01:08:56 +0700 Subject: [PATCH 2/5] wip Signed-off-by: Paolo Di Tommaso --- .../store/cache/AbstractTieredCache.groovy | 19 +++++++++---------- .../cache/AbstractTieredCacheTest.groovy | 9 +++++++++ 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/src/main/groovy/io/seqera/wave/store/cache/AbstractTieredCache.groovy b/src/main/groovy/io/seqera/wave/store/cache/AbstractTieredCache.groovy index 47996cb61f..cd1b667dfb 100644 --- a/src/main/groovy/io/seqera/wave/store/cache/AbstractTieredCache.groovy +++ b/src/main/groovy/io/seqera/wave/store/cache/AbstractTieredCache.groovy @@ -49,8 +49,6 @@ import org.jetbrains.annotations.Nullable @CompileStatic abstract class AbstractTieredCache implements TieredCache { - static final private double DEFAULT_REVAL_STEEPNESS = 1/300 - @Canonical @ToString(includePackage = false, includeNames = true) static class Entry implements MoshiExchange { @@ -103,11 +101,11 @@ abstract class AbstractTieredCache implements TieredCac abstract protected String getPrefix() protected Duration getCacheRevalidationInterval() { - return null + return Duration.ZERO } protected double getRevalidationSteepness() { - return DEFAULT_REVAL_STEEPNESS + return 1 / getCacheRevalidationInterval().toMillis() } private RemovalListener removalListener0() { @@ -196,14 +194,15 @@ abstract class AbstractTieredCache implements TieredCac entry = l2Get(key) needsRevalidation = entry ? shouldRevalidate(entry.expiresAt, now) : null } - if (entry && !needsRevalidation) { + if( entry && !needsRevalidation ) { log.trace "Cache '${name}' L2 hit (c) - key=$key => entry=$entry" // Rehydrate L1 cache l1.put(key, entry) return (V) entry.value } - // still not value found, use loader function to fetch the value + // still not entry found or cache revalidation needed + // use the loader function to fetch the value V value = null if( loader!=null ) { log.trace "Cache '${name}' invoking loader - key=$key" @@ -282,7 +281,7 @@ abstract class AbstractTieredCache implements TieredCac // otherwise, when remaining is greater than the cache revalidation interval // no revalidation is needed - final cacheRevalidationMills = cacheRevalidationInterval?.toMillis() ?: 0 + final cacheRevalidationMills = cacheRevalidationInterval.toMillis() if( remainingCacheTime > cacheRevalidationMills ) { return false } @@ -291,11 +290,11 @@ abstract class AbstractTieredCache implements TieredCac // i.e. it's approaching the cache expiration, in this cache the needed // for cache revalidation is determined in a probabilistic manner // see https://blog.cloudflare.com/sometimes-i-cache/ - return randomRevalidate((cacheRevalidationMills-remainingCacheTime) /1000 as long) + return randomRevalidate(cacheRevalidationMills-remainingCacheTime) } - protected boolean randomRevalidate(long remainingTimeSecs) { - Math.random() < Math.exp(-revalidationSteepness * remainingTimeSecs) + protected boolean randomRevalidate(long remainingTime) { + Math.random() < Math.exp(-revalidationSteepness * remainingTime) } } diff --git a/src/test/groovy/io/seqera/wave/store/cache/AbstractTieredCacheTest.groovy b/src/test/groovy/io/seqera/wave/store/cache/AbstractTieredCacheTest.groovy index 962b1ece95..53943b11f2 100644 --- a/src/test/groovy/io/seqera/wave/store/cache/AbstractTieredCacheTest.groovy +++ b/src/test/groovy/io/seqera/wave/store/cache/AbstractTieredCacheTest.groovy @@ -268,4 +268,13 @@ class AbstractTieredCacheTest extends Specification implements RedisTestContaine !revalidate } + def 'should validate random function' () { + given: + def now = Instant.now() + def cache = Spy(MyCache) + cache.getCacheRevalidationInterval() >> Duration.ofSeconds(10) + expect: + cache.randomRevalidate(0) + } + } From 8b4a180c10c64a692aa6316e043914f2ca88f1aa Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Mon, 20 Jan 2025 18:29:22 +0100 Subject: [PATCH 3/5] Improve logging Signed-off-by: Paolo Di Tommaso --- .../io/seqera/wave/store/cache/AbstractTieredCache.groovy | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/groovy/io/seqera/wave/store/cache/AbstractTieredCache.groovy b/src/main/groovy/io/seqera/wave/store/cache/AbstractTieredCache.groovy index 1591b62fb1..f4b594412e 100644 --- a/src/main/groovy/io/seqera/wave/store/cache/AbstractTieredCache.groovy +++ b/src/main/groovy/io/seqera/wave/store/cache/AbstractTieredCache.groovy @@ -221,7 +221,9 @@ abstract class AbstractTieredCache implements TieredCac // use the loader function to fetch the value V value = null if( loader!=null ) { - if( log.isTraceEnabled() ) + if( entry && needsRevalidation ) + log.debug "Cache '${name}' invoking loader - entry=$entry needs refresh" + else if( log.isTraceEnabled() ) log.trace "Cache '${name}' invoking loader - key=$key" final ret = loader.apply(key) value = ret?.v1 @@ -290,7 +292,6 @@ abstract class AbstractTieredCache implements TieredCac } protected boolean shouldRevalidate(long expiration, Instant time=Instant.now()) { - // when 'remainingCacheTime' is less than or equals to zero, it means // the current time is beyond the expiration time, therefore a cache validation is needed final remainingCacheTime = expiration - time.toEpochMilli() From 2fd472199e66b57a26e5fbb94011965d27232ce7 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Wed, 22 Jan 2025 12:55:45 +0100 Subject: [PATCH 4/5] wip Signed-off-by: Paolo Di Tommaso --- .../store/cache/AbstractTieredCache.groovy | 10 ++--- .../cache/AbstractTieredCacheTest.groovy | 39 +++++++++++++++++++ 2 files changed, 44 insertions(+), 5 deletions(-) diff --git a/src/main/groovy/io/seqera/wave/store/cache/AbstractTieredCache.groovy b/src/main/groovy/io/seqera/wave/store/cache/AbstractTieredCache.groovy index f4b594412e..4b6217bc39 100644 --- a/src/main/groovy/io/seqera/wave/store/cache/AbstractTieredCache.groovy +++ b/src/main/groovy/io/seqera/wave/store/cache/AbstractTieredCache.groovy @@ -180,10 +180,10 @@ abstract class AbstractTieredCache implements TieredCac assert key!=null, "Argument key cannot be null" if( log.isTraceEnabled() ) log.trace "Cache '${name}' checking key=$key" - final now = Instant.now() + final ts = Instant.now() // Try L1 cache first Entry entry = l1Get(key) - Boolean needsRevalidation = entry ? shouldRevalidate(entry.expiresAt, now) : null + Boolean needsRevalidation = entry ? shouldRevalidate(entry.expiresAt, ts) : null if( entry && !needsRevalidation ) { if( log.isTraceEnabled() ) log.trace "Cache '${name}' L1 hit (a) - key=$key => entry=$entry" @@ -196,7 +196,7 @@ abstract class AbstractTieredCache implements TieredCac // check again L1 cache once in the sync block if( !entry ) { entry = l1Get(key) - needsRevalidation = entry ? shouldRevalidate(entry.expiresAt, now) : null + needsRevalidation = entry ? shouldRevalidate(entry.expiresAt, ts) : null } if( entry && !needsRevalidation ) { if( log.isTraceEnabled() ) @@ -207,7 +207,7 @@ abstract class AbstractTieredCache implements TieredCac // Fallback to L2 cache if( !entry ) { entry = l2Get(key) - needsRevalidation = entry ? shouldRevalidate(entry.expiresAt, now) : null + needsRevalidation = entry ? shouldRevalidate(entry.expiresAt, ts) : null } if( entry && !needsRevalidation ) { if( log.isTraceEnabled() ) @@ -314,7 +314,7 @@ abstract class AbstractTieredCache implements TieredCac } protected boolean randomRevalidate(long remainingTime) { - Math.random() < Math.exp(-revalidationSteepness * remainingTime) + return Math.random() < Math.exp(-revalidationSteepness * remainingTime) } } diff --git a/src/test/groovy/io/seqera/wave/store/cache/AbstractTieredCacheTest.groovy b/src/test/groovy/io/seqera/wave/store/cache/AbstractTieredCacheTest.groovy index 53943b11f2..a76c0444ea 100644 --- a/src/test/groovy/io/seqera/wave/store/cache/AbstractTieredCacheTest.groovy +++ b/src/test/groovy/io/seqera/wave/store/cache/AbstractTieredCacheTest.groovy @@ -18,6 +18,8 @@ package io.seqera.wave.store.cache +import spock.lang.Retry + import java.time.Duration import java.time.Instant @@ -277,4 +279,41 @@ class AbstractTieredCacheTest extends Specification implements RedisTestContaine cache.randomRevalidate(0) } + @Retry(count = 5) + def 'should validate random revalidate with interval 10s' () { + given: + def now = Instant.now() + def cache = Spy(MyCache) + cache.getCacheRevalidationInterval() >> Duration.ofSeconds(10) + expect: + // when remaining time is approaching 0 + // the function should return true + cache.randomRevalidate(10) // 10 millis + cache.randomRevalidate(100) // 100 millis + and: + // when it's mostly the same as the revalidation interval + // it should return false + !cache.randomRevalidate(9_000) + !cache.randomRevalidate(10_000) + } + + @Retry(count = 5) + def 'should validate random revalidate with interval 300s' () { + given: + def now = Instant.now() + def cache = Spy(MyCache) + cache.getCacheRevalidationInterval() >> Duration.ofSeconds(300) + expect: + // when remaining time is approaching 0 + // the function should return true + cache.randomRevalidate(10) // 10 millis + cache.randomRevalidate(100) // 100 millis + cache.randomRevalidate(500) // 100 millis + and: + // when it's mostly the same as the revalidation interval + // it should return false + !cache.randomRevalidate(250_000) + !cache.randomRevalidate(290_000) + !cache.randomRevalidate(300_000) + } } From 58cf2ea0774df7ffef73c5bb399ed863fd247cf3 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Tue, 28 Jan 2025 11:37:33 +0100 Subject: [PATCH 5/5] Minor change Signed-off-by: Paolo Di Tommaso --- .../store/cache/AbstractTieredCache.groovy | 23 ++++++++++++++++++- .../cache/AbstractTieredCacheTest.groovy | 11 --------- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/src/main/groovy/io/seqera/wave/store/cache/AbstractTieredCache.groovy b/src/main/groovy/io/seqera/wave/store/cache/AbstractTieredCache.groovy index 4b6217bc39..e0513cbfb9 100644 --- a/src/main/groovy/io/seqera/wave/store/cache/AbstractTieredCache.groovy +++ b/src/main/groovy/io/seqera/wave/store/cache/AbstractTieredCache.groovy @@ -33,6 +33,7 @@ import com.github.benmanes.caffeine.cache.RemovalCause import com.github.benmanes.caffeine.cache.RemovalListener import groovy.transform.Canonical import groovy.transform.CompileStatic +import groovy.transform.Memoized import groovy.transform.ToString import groovy.util.logging.Slf4j import io.seqera.wave.encoder.EncodingStrategy @@ -110,10 +111,30 @@ abstract class AbstractTieredCache implements TieredCac abstract protected String getPrefix() + /** + * The cache probabilistic revalidation internal. + * + * See https://blog.cloudflare.com/sometimes-i-cache/ + * + * @return + * The cache cache revalidation internal as a {@link Duration} value. + * When {@link Duration#ZERO} probabilistic revalidation is disabled. + */ protected Duration getCacheRevalidationInterval() { return Duration.ZERO } + /** + * The cache probabilistic revalidation steepness value. + * + * By default is implemented as 1 / {@link #getCacheRevalidationInterval()} (as millis). + * Subclasses can override this method to provide a different value. + * + * See https://blog.cloudflare.com/sometimes-i-cache/ + * + * @return Returns the revalidation steepness value. + */ + @Memoized protected double getRevalidationSteepness() { return 1 / getCacheRevalidationInterval().toMillis() } @@ -302,7 +323,7 @@ abstract class AbstractTieredCache implements TieredCac // otherwise, when remaining is greater than the cache revalidation interval // no revalidation is needed final cacheRevalidationMills = cacheRevalidationInterval.toMillis() - if( remainingCacheTime > cacheRevalidationMills ) { + if( cacheRevalidationMills < remainingCacheTime ) { return false } diff --git a/src/test/groovy/io/seqera/wave/store/cache/AbstractTieredCacheTest.groovy b/src/test/groovy/io/seqera/wave/store/cache/AbstractTieredCacheTest.groovy index a76c0444ea..c2edc8b5b2 100644 --- a/src/test/groovy/io/seqera/wave/store/cache/AbstractTieredCacheTest.groovy +++ b/src/test/groovy/io/seqera/wave/store/cache/AbstractTieredCacheTest.groovy @@ -290,11 +290,6 @@ class AbstractTieredCacheTest extends Specification implements RedisTestContaine // the function should return true cache.randomRevalidate(10) // 10 millis cache.randomRevalidate(100) // 100 millis - and: - // when it's mostly the same as the revalidation interval - // it should return false - !cache.randomRevalidate(9_000) - !cache.randomRevalidate(10_000) } @Retry(count = 5) @@ -309,11 +304,5 @@ class AbstractTieredCacheTest extends Specification implements RedisTestContaine cache.randomRevalidate(10) // 10 millis cache.randomRevalidate(100) // 100 millis cache.randomRevalidate(500) // 100 millis - and: - // when it's mostly the same as the revalidation interval - // it should return false - !cache.randomRevalidate(250_000) - !cache.randomRevalidate(290_000) - !cache.randomRevalidate(300_000) } }