diff --git a/evcache-client/test/com/netflix/evcache/test/EVCacheTestDI.java b/evcache-client/test/com/netflix/evcache/test/EVCacheTestDI.java index ef44f2ff..e69280e3 100644 --- a/evcache-client/test/com/netflix/evcache/test/EVCacheTestDI.java +++ b/evcache-client/test/com/netflix/evcache/test/EVCacheTestDI.java @@ -1,29 +1,35 @@ package com.netflix.evcache.test; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; +import com.netflix.evcache.EVCache; +import com.netflix.evcache.EVCacheException; +import com.netflix.evcache.EVCacheGetOperationListener; +import com.netflix.evcache.EVCacheLatch; +import com.netflix.evcache.operation.EVCacheOperationFuture; +import com.netflix.evcache.pool.EVCacheClient; +import com.netflix.evcache.pool.ServerGroup; +import com.netflix.evcache.test.transcoder.Movie; +import com.netflix.evcache.test.transcoder.MovieTranscoder; +import com.netflix.evcache.util.KeyHasher; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; - -import java.util.*; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; - -import com.netflix.evcache.*; -import com.netflix.evcache.pool.EVCacheClient; -import com.netflix.evcache.pool.ServerGroup; -import com.netflix.evcache.util.KeyHasher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.Test; - -import com.netflix.evcache.operation.EVCacheOperationFuture; import rx.schedulers.Schedulers; - -import static org.testng.Assert.*; - public class EVCacheTestDI extends DIBase implements EVCacheGetOperationListener { private static final Logger log = LoggerFactory.getLogger(EVCacheTestDI.class); private int loops = 1; @@ 
-280,6 +286,8 @@ public void functionalTestsWithAppLevelAndASGLevelHashingScenarios() throws Exce assertTrue(manager.getEVCacheConfig().getPropertyRepository().get(appName + ".auto.hash.keys", Boolean.class).orElse(false).get()); assertFalse(manager.getEVCacheConfig().getPropertyRepository().get(appName + ".hash.key", Boolean.class).orElse(false).get()); testWithLargeKey(); + testWithMixedKeys(); + testWithMixedKeysAndCustomTranscoder(); // negative scenario propertiesToSet.remove(appName + ".auto.hash.keys"); refreshEVCache(); @@ -314,8 +322,13 @@ public void functionalTestsWithAppLevelAndASGLevelHashingScenarios() throws Exce } doFunctionalTests(true); for (ServerGroup serverGroup : clientsByServerGroup.keySet()) { - propertiesToSet.remove(serverGroup.getName()); + propertiesToSet.remove(serverGroup.getName() + ".hash.key"); + propertiesToSet.remove(serverGroup.getName() + ".hash.algo"); } + + propertiesToSet.remove(appName + ".hash.key", "true"); + + refreshEVCache(); } private void testWithLargeKey() throws Exception { @@ -330,8 +343,135 @@ private void testWithLargeKey() throws Exception { EVCacheLatch latch = evCache.set(key, value, EVCacheLatch.Policy.ALL); latch.await(1000, TimeUnit.MILLISECONDS); + String val = evCache.get(key); + // get + assertEquals(val, value); + + // delete + Future[] futures = evCache.delete(key); + for (Future future : futures) { + future.get(); + } + } + + private void testWithMixedKeys() throws Exception { + + EVCache[] evcacheInstance = new EVCache[2]; + evcacheInstance[0] = getNewBuilder().setAppName(appName).setCachePrefix("cid").enableRetry().build(); + evcacheInstance[1] = this.evCache; + + Map kv = new HashMap<>(6); + String oneLargeKey = null; + String oneSmallKey = null; + for (int k = 0; k < 3; k ++) { + StringBuilder sb = new StringBuilder(); + sb.append("testWithSmallAndLargeKeysMixed"); + for (int i= 0; i < 100; i++) { + sb.append(System.nanoTime()); + } + oneLargeKey = sb.toString(); + kv.put(oneLargeKey, 
UUID.randomUUID().toString()); + } + for (int k = 3; k < 6; k ++) { + oneSmallKey = "testWithSmallAndLargeKeysMixed" + System.nanoTime(); + kv.put(oneSmallKey, UUID.randomUUID().toString()); + } + + for (Map.Entry entry : kv.entrySet()) { + EVCacheLatch latch = evCache.set(entry.getKey(), entry.getValue(), EVCacheLatch.Policy.ALL); + latch.await(10000, TimeUnit.MILLISECONDS); + } + // get - assertEquals(evCache.get(key), value); + String val = evCache.get(oneLargeKey); + assertEquals(val, kv.get(oneLargeKey)); + val = evCache.get(oneSmallKey); + assertEquals(val, kv.get(oneSmallKey)); + + // async bulk get + for (int op : new int[]{0, 1}) { + Map results; + if (op == 0) { + CompletableFuture> future = evCache.getAsyncBulk(kv.keySet().toArray(new String[0])); + results = future.get(10000, TimeUnit.MILLISECONDS); + } else { + results = evCache.getBulk(kv.keySet().toArray(new String[0])); + } + assertEquals(results.size(), kv.size()); + for (Map.Entry result : results.entrySet()) { + assertEquals(result.getValue(), kv.get(result.getKey())); + } + } + + // delete + for (Map.Entry entry : kv.entrySet()) { + Future[] deleteFutures = evCache.delete(entry.getKey()); + for (Future deleteFuture : deleteFutures) { + deleteFuture.get(); + } + } + + } + + private void testWithMixedKeysAndCustomTranscoder() throws Exception { + + com.netflix.evcache.EVCache evCache = getNewBuilder().setAppName(appName).setCachePrefix("cid").enableRetry() + .setTranscoder(new MovieTranscoder()) + .build(); + + Map kv = new HashMap<>(6); + String oneLargeKey = null; + String oneSmallKey = null; + for (int k = 0; k < 3; k ++) { + StringBuilder sb = new StringBuilder(); + sb.append("testWithSmallAndLargeKeysMixed"); + for (int i= 0; i < 100; i++) { + sb.append(System.nanoTime()); + } + oneLargeKey = sb.toString(); + kv.put(oneLargeKey, new Movie(k, String.valueOf(k))); + } + for (int k = 3; k < 6; k ++) { + oneSmallKey = "testWithSmallAndLargeKeysMixed" + System.nanoTime(); + kv.put(oneSmallKey, new 
Movie(k, String.valueOf(k))); + } + + for (Map.Entry entry : kv.entrySet()) { + EVCacheLatch latch = evCache.set(entry.getKey(), entry.getValue(), EVCacheLatch.Policy.ALL); + latch.await(10000, TimeUnit.MILLISECONDS); + } + + // get + Movie val = evCache.get(oneLargeKey); + assertEquals(val, kv.get(oneLargeKey)); + val = evCache.get(oneSmallKey); + assertEquals(val, kv.get(oneSmallKey)); + + // async bulk get + for (int op : new int[]{0, 1}) { + Map results = new HashMap<>(); + if (op == 0) { + CompletableFuture> future = evCache.getAsyncBulk(kv.keySet().toArray(new String[0])); + results = future.get(10000, TimeUnit.MILLISECONDS); + // } else { + // TODO: getBulk api is known to be broken for un-hashed keys not decoding correctly when request contains both hashed and unhashed keys + // results = evCache.getBulk(kv.keySet().toArray(new String[0])); + } + + for (Map.Entry result : results.entrySet()) { + assertEquals(results.size(), kv.size()); + assertEquals(result.getValue(), kv.get(result.getKey()), "Did not get the written value back with op " + (op == 0 ? 
"getAsyncBulk" : "getBulk")); + } + } + + // delete + for (Map.Entry entry : kv.entrySet()) { + Future[] deleteFutures = evCache.delete(entry.getKey()); + for (Future deleteFuture : deleteFutures) { + deleteFuture.get(); + } + } + } private void doFunctionalTests(boolean isHashingEnabled) throws Exception { @@ -458,6 +598,7 @@ public void testAll() { testGetAndTouchObservable(); waitForCallbacks(); testAppendOrAdd(); + // functionalTestsWithAppLevelAndASGLevelHashingScenarios(); // slow, requires EVCache refresh testTouch(); testDelete(); testInsert(); diff --git a/evcache-client/test/com/netflix/evcache/test/transcoder/Movie.java b/evcache-client/test/com/netflix/evcache/test/transcoder/Movie.java new file mode 100644 index 00000000..2d1668be --- /dev/null +++ b/evcache-client/test/com/netflix/evcache/test/transcoder/Movie.java @@ -0,0 +1,44 @@ +package com.netflix.evcache.test.transcoder; + +import java.util.Objects; + +public class Movie { + long id; + String name; + + public Movie() { // required for decode + } + + public Movie(long id, String name) { + this.id = id; + this.name = name; + } + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof Movie)) return false; + Movie movie = (Movie) o; + return id == movie.id && Objects.equals(name, movie.name); + } + + @Override + public int hashCode() { + return Objects.hash(id, name); + } +} diff --git a/evcache-client/test/com/netflix/evcache/test/transcoder/MovieTranscoder.java b/evcache-client/test/com/netflix/evcache/test/transcoder/MovieTranscoder.java new file mode 100644 index 00000000..e20a0b0a --- /dev/null +++ b/evcache-client/test/com/netflix/evcache/test/transcoder/MovieTranscoder.java @@ -0,0 +1,44 @@ +package com.netflix.evcache.test.transcoder; + +import 
com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import net.spy.memcached.CachedData; +import net.spy.memcached.transcoders.Transcoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MovieTranscoder implements Transcoder { + private static final Logger LOGGER = LoggerFactory.getLogger(MovieTranscoder.class); + ObjectMapper mapper = new ObjectMapper(); + + @Override + public boolean asyncDecode(CachedData d) { + return false; + } + + @Override + public CachedData encode(Movie movie) { + byte[] bytes; + try { + bytes = mapper.writeValueAsBytes(movie); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + return new CachedData(0, bytes, bytes.length); + } + + @Override + public Movie decode(CachedData d) { + try { + return mapper.readValue(d.getData(), Movie.class); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public int getMaxSize() { + return CachedData.MAX_SIZE; + } +} diff --git a/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java b/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java index 24c09e39..865a7975 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java +++ b/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java @@ -43,12 +43,14 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.BiPredicate; import java.util.function.Function; import java.util.stream.Collectors; import javax.management.MBeanServer; @@ -1899,29 +1901,34 @@ private CompletableFuture> getAsyncBulkData(EVCacheClient List 
evcacheKeys, Transcoder tc) { KeyMapDto keyMapDto = buildKeyMap(client, evcacheKeys); - final Map keyMap = keyMapDto.getKeyMap(); - boolean hasHashedKey = keyMapDto.isKeyHashed(); - if (hasHashedKey) { - if (log.isDebugEnabled() && shouldLog()) { - log.debug("fetching bulk data with hashedKey {} ",evcacheKeys); - } - return client.getAsyncBulk(keyMap.keySet(), evcacheValueTranscoder) - .thenApply(data -> buildHashedKeyValueResult(data, tc, client, keyMap)) - .exceptionally(t -> handleBulkException(t, evcacheKeys)); - } else { - final Transcoder tcCopy; - if (tc == null && _transcoder != null) { - tcCopy = (Transcoder) _transcoder; + final Set plainKeys = keyMapDto.getPlainKeysMap().keySet(); + final Set hashedKeys = keyMapDto.getHashedKeysMap().keySet(); + + BiPredicate collisionChecker = (hashedKey, decodedKey) -> { + final EVCacheKey evcKey = keyMapDto.getHashedKeysMap().get(hashedKey); + if (evcKey.getCanonicalKey(client.isDuetClient()).equals(decodedKey)) { + if (log.isDebugEnabled() && shouldLog()) + log.debug("APP " + _appName + ", key [" + hashedKey + "] EVCacheKey " + evcKey); } else { - tcCopy = tc; - } - if (log.isDebugEnabled() && shouldLog()) { - log.debug("fetching bulk data with non hashedKey {} ",keyMap.keySet()); + if (log.isDebugEnabled() && shouldLog()) + log.debug("CACHE COLLISION : APP " + _appName + ", key [" + hashedKey + "] EVCacheKey " + evcKey + " with decodedKey [" + decodedKey + "]"); + incrementFailure(EVCacheMetricsFactory.KEY_HASH_COLLISION, Call.COMPLETABLE_FUTURE_GET_BULK.name(), EVCacheMetricsFactory.READ); + return true; } - return client.getAsyncBulk(keyMap.keySet(), tcCopy ) - .thenApply(data -> buildNonHashedKeyValueResult(data, keyMap)) - .exceptionally(t -> handleBulkException(t, evcacheKeys)); + return false; + }; + + final Transcoder valueTranscoder; + if (keyMapDto.getHashedKeysMap() != null && !keyMapDto.getHashedKeysMap().isEmpty()) { + if (log.isDebugEnabled() && shouldLog()) log.debug("fetching bulk data with set of 
keys containing hashed key(s) {} ", evcacheKeys); + valueTranscoder = tc != null ? tc : (Transcoder) client.getTranscoder(); + } else { + valueTranscoder = tc != null ? tc : (Transcoder) _transcoder; + if (log.isDebugEnabled() && shouldLog()) log.debug("fetching bulk data with no hashed key(s) {} ", plainKeys); } + return client.getAsyncBulk(plainKeys, hashedKeys, valueTranscoder, evcacheValueTranscoder, _appName, shouldLog(), collisionChecker) + .thenApply(data -> buildKeyValueResult(data, keyMapDto)) + .exceptionally(t -> handleBulkException(t, evcacheKeys)); } private Map handleBulkException(Throwable t, Collection evCacheKeys) { @@ -1931,68 +1938,31 @@ private Map handleBulkException(Throwable t, Collection evcacheKeys) { - boolean hasHashedKey = false; - final Map keyMap = new HashMap(evcacheKeys.size() * 2); + final Map plainKeysMap = new HashMap<>(evcacheKeys.size()); + final Map hashedKeysMap = new HashMap<>(evcacheKeys.size()); for (EVCacheKey evcKey : evcacheKeys) { String key = evcKey.getCanonicalKey(client.isDuetClient()); String hashKey = evcKey.getHashKey(client.isDuetClient(), client.getHashingAlgorithm(), client.shouldEncodeHashKey(), client.getMaxDigestBytes(), client.getMaxHashLength(), client.getBaseEncoder()); if (hashKey != null) { if (log.isDebugEnabled() && shouldLog()) - log.debug("APP " + _appName + ", key [" + key + "], has been hashed [" + hashKey + "]"); - key = hashKey; - hasHashedKey = true; + log.debug("APP " + _appName + ", key [" + key + "], has been hashed to [" + hashKey + "]"); + hashedKeysMap.put(hashKey, evcKey); + } else { + plainKeysMap.put(key, evcKey); } - keyMap.put(key, evcKey); } - return new KeyMapDto(keyMap, hasHashedKey); + return new KeyMapDto(plainKeysMap, hashedKeysMap); } - private Map buildNonHashedKeyValueResult(Map objMap, - Map keyMap) { + private Map buildKeyValueResult(Map objMap, + KeyMapDto keyMapDto) { final Map retMap = new HashMap<>((int) (objMap.size() / 0.75) + 1); for (Map.Entry i : 
objMap.entrySet()) { - final EVCacheKey evcKey = keyMap.get(i.getKey()); + final T val = i.getValue(); + final EVCacheKey evcKey = keyMapDto.getPlainKeysMap().getOrDefault(i.getKey(), keyMapDto.getHashedKeysMap().get(i.getKey())); if (log.isDebugEnabled() && shouldLog()) - log.debug("APP " + _appName + ", key [" + i.getKey() + "] EVCacheKey " + evcKey); - retMap.put(evcKey, i.getValue()); - } - return retMap; - } - - private Map buildHashedKeyValueResult(Map objMap, - Transcoder tc, - EVCacheClient client, - Map keyMap) { - final Map retMap = new HashMap<>((int) (objMap.size() / 0.75) + 1); - for (Map.Entry i : objMap.entrySet()) { - final Object obj = i.getValue(); - if (obj instanceof EVCacheValue) { - if (log.isDebugEnabled() && shouldLog()) - log.debug("APP " + _appName + ", The value for key [" + i.getKey() + "] is EVCache Value"); - final EVCacheValue val = (EVCacheValue) obj; - final CachedData cd = new CachedData(val.getFlags(), val.getValue(), CachedData.MAX_SIZE); - final T tVal; - if (tc == null) { - tVal = (T) client.getTranscoder().decode(cd); - } else { - tVal = tc.decode(cd); - } - final EVCacheKey evcKey = keyMap.get(i.getKey()); - if (evcKey.getCanonicalKey(client.isDuetClient()).equals(val.getKey())) { - if (log.isDebugEnabled() && shouldLog()) - log.debug("APP " + _appName + ", key [" + i.getKey() + "] EVCacheKey " + evcKey); - retMap.put(evcKey, tVal); - } else { - if (log.isDebugEnabled() && shouldLog()) - log.debug("CACHE COLLISION : APP " + _appName + ", key [" + i.getKey() + "] EVCacheKey " + evcKey); - incrementFailure(EVCacheMetricsFactory.KEY_HASH_COLLISION, Call.COMPLETABLE_FUTURE_GET_BULK.name(), EVCacheMetricsFactory.READ); - } - } else { - final EVCacheKey evcKey = keyMap.get(i.getKey()); - if (log.isDebugEnabled() && shouldLog()) - log.debug("APP " + _appName + ", key [" + i.getKey() + "] EVCacheKey " + evcKey); - retMap.put(evcKey, (T) obj); - } + log.debug("APP " + _appName + ", key [" + i.getKey() + "] EVCacheKey " + evcKey + ", 
val [" + val + "]"); + retMap.put(evcKey, val); } return retMap; } diff --git a/evcache-core/src/main/java/com/netflix/evcache/dto/KeyMapDto.java b/evcache-core/src/main/java/com/netflix/evcache/dto/KeyMapDto.java index 64098e27..3ffad189 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/dto/KeyMapDto.java +++ b/evcache-core/src/main/java/com/netflix/evcache/dto/KeyMapDto.java @@ -1,23 +1,22 @@ package com.netflix.evcache.dto; import com.netflix.evcache.EVCacheKey; - import java.util.Map; public class KeyMapDto { - Map keyMap; - boolean isKeyHashed; + Map plainKeysMap; + Map hashedKeysMap; - public KeyMapDto(Map keyMap, boolean isKeyHashed) { - this.keyMap = keyMap; - this.isKeyHashed = isKeyHashed; + public KeyMapDto(Map plainKeysMap, Map hashedKeysMap) { + this.plainKeysMap = plainKeysMap; + this.hashedKeysMap = hashedKeysMap; } - public Map getKeyMap() { - return keyMap; + public Map getPlainKeysMap() { + return plainKeysMap; } - public boolean isKeyHashed() { - return isKeyHashed; + public Map getHashedKeysMap() { + return hashedKeysMap; } } diff --git a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java index bef04d88..579fee2b 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java +++ b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java @@ -1,29 +1,5 @@ package com.netflix.evcache.pool; -import java.io.BufferedInputStream; -import java.io.IOException; -import java.io.PrintWriter; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketAddress; -import java.net.URLDecoder; -import java.nio.charset.StandardCharsets; -import java.util.AbstractMap.SimpleEntry; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.*; 
-import java.util.function.BiPredicate; -import java.util.zip.CRC32; -import java.util.zip.Checksum; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.netflix.archaius.api.Property; import com.netflix.evcache.EVCache; import com.netflix.evcache.EVCache.Call; @@ -32,6 +8,7 @@ import com.netflix.evcache.EVCacheLatch; import com.netflix.evcache.EVCacheReadQueueException; import com.netflix.evcache.EVCacheSerializingTranscoder; +import com.netflix.evcache.EVCacheTranscoder; import com.netflix.evcache.metrics.EVCacheMetricsFactory; import com.netflix.evcache.operation.EVCacheFutures; import com.netflix.evcache.operation.EVCacheItem; @@ -44,19 +21,45 @@ import com.netflix.spectator.api.BasicTag; import com.netflix.spectator.api.Counter; import com.netflix.spectator.api.Tag; - +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.PrintWriter; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.URLDecoder; +import java.nio.charset.StandardCharsets; +import java.util.AbstractMap.SimpleEntry; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.BiPredicate; +import java.util.zip.CRC32; +import java.util.zip.Checksum; import net.spy.memcached.CASValue; import net.spy.memcached.CachedData; import net.spy.memcached.ConnectionFactory; import net.spy.memcached.EVCacheMemcachedClient; import net.spy.memcached.EVCacheNode; -import net.spy.memcached.MemcachedClient; import net.spy.memcached.MemcachedNode; import 
net.spy.memcached.NodeLocator; import net.spy.memcached.internal.ListenableFuture; import net.spy.memcached.internal.OperationCompletionListener; import net.spy.memcached.internal.OperationFuture; import net.spy.memcached.transcoders.Transcoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import rx.Scheduler; import rx.Single; @@ -983,11 +986,24 @@ public Map getBulk(Collection canonicalKeys, Transcoder CompletableFuture> getAsyncBulk(Collection canonicalKeys, Transcoder tc) { + /** + * @Deprecated This method does NOT support a mix of plain and hashed keys in {@code keys}. All keys are + * decoded exactly using the given transcoder (note that hashed keys require two step decoding). + * For supporting a mix of hashed and plain keys in the {@code keys} collection, + * use {@link #getAsyncBulk(Collection, Set, Transcoder, EVCacheTranscoder, String, boolean, BiPredicate)}. + */ + public CompletableFuture> getAsyncBulk(Collection keys, Transcoder tc) { + return getAsyncBulk(keys, null, tc, null, null, false, null); + + } + + public CompletableFuture> getAsyncBulk(Collection plainKeys, Set hashedKeys, + Transcoder tc, EVCacheTranscoder evcacheValueTranscoder, + String appName, boolean shouldLog, BiPredicate collisionChecker) { final BiPredicate validator = (node, key) -> validateReadQueueSize(node, Call.COMPLETABLE_FUTURE_GET_BULK); if (tc == null) tc = (Transcoder) getTranscoder(); return evcacheMemcachedClient - .asyncGetBulk(canonicalKeys, tc, null, validator) + .asyncGetBulk(plainKeys, hashedKeys, tc, evcacheValueTranscoder, validator, appName, shouldLog, collisionChecker) .getAsyncSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS); } diff --git a/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java b/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java index 9fc6f875..f7ee415d 100644 --- a/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java +++ 
b/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java @@ -1,5 +1,26 @@ package net.spy.memcached; +import com.netflix.archaius.api.Property; +import com.netflix.archaius.api.Property.Subscription; +import com.netflix.archaius.api.PropertyRepository; +import com.netflix.evcache.EVCacheGetOperationListener; +import com.netflix.evcache.EVCacheLatch; +import com.netflix.evcache.EVCacheTranscoder; +import com.netflix.evcache.metrics.EVCacheMetricsFactory; +import com.netflix.evcache.operation.EVCacheAsciiOperationFactory; +import com.netflix.evcache.operation.EVCacheBulkGetFuture; +import com.netflix.evcache.operation.EVCacheItem; +import com.netflix.evcache.operation.EVCacheItemMetaData; +import com.netflix.evcache.operation.EVCacheLatchImpl; +import com.netflix.evcache.operation.EVCacheOperationFuture; +import com.netflix.evcache.pool.EVCacheClient; +import com.netflix.evcache.pool.EVCacheClientUtil; +import com.netflix.evcache.pool.EVCacheValue; +import com.netflix.evcache.util.EVCacheConfig; +import com.netflix.spectator.api.BasicTag; +import com.netflix.spectator.api.DistributionSummary; +import com.netflix.spectator.api.Tag; +import com.netflix.spectator.api.Timer; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -11,9 +32,9 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -21,31 +42,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiPredicate; import java.util.function.Consumer; - -import com.netflix.archaius.api.PropertyRepository; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.netflix.archaius.api.Property; -import 
com.netflix.archaius.api.Property.Subscription; -import com.netflix.evcache.EVCacheGetOperationListener; -import com.netflix.evcache.EVCacheLatch; -import com.netflix.evcache.metrics.EVCacheMetricsFactory; -import com.netflix.evcache.operation.EVCacheAsciiOperationFactory; -import com.netflix.evcache.operation.EVCacheBulkGetFuture; -import com.netflix.evcache.operation.EVCacheItem; -import com.netflix.evcache.operation.EVCacheItemMetaData; -import com.netflix.evcache.operation.EVCacheLatchImpl; -import com.netflix.evcache.operation.EVCacheOperationFuture; -import com.netflix.evcache.pool.EVCacheClient; -import com.netflix.evcache.pool.EVCacheClientUtil; -import com.netflix.evcache.util.EVCacheConfig; -import com.netflix.spectator.api.BasicTag; -import com.netflix.spectator.api.DistributionSummary; -import com.netflix.spectator.api.Tag; -import com.netflix.spectator.api.Timer; -import com.netflix.spectator.ipc.IpcStatus; - import net.spy.memcached.internal.GetFuture; import net.spy.memcached.internal.OperationFuture; import net.spy.memcached.ops.ConcatenationType; @@ -56,16 +52,16 @@ import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationCallback; import net.spy.memcached.ops.OperationStatus; -import net.spy.memcached.ops.StatsOperation; import net.spy.memcached.ops.StatusCode; import net.spy.memcached.ops.StoreOperation; import net.spy.memcached.ops.StoreType; -import net.spy.memcached.protocol.binary.BinaryOperationFactory; -import net.spy.memcached.transcoders.Transcoder; -import net.spy.memcached.util.StringUtils; import net.spy.memcached.protocol.ascii.ExecCmdOperation; import net.spy.memcached.protocol.ascii.MetaDebugOperation; import net.spy.memcached.protocol.ascii.MetaGetOperation; +import net.spy.memcached.protocol.binary.BinaryOperationFactory; +import net.spy.memcached.transcoders.Transcoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @edu.umd.cs.findbugs.annotations.SuppressFBWarnings({ 
"PRMC_POSSIBLY_REDUNDANT_METHOD_CALLS", "SIC_INNER_SHOULD_BE_STATIC_ANON" }) @@ -291,24 +287,91 @@ public int getMaxSize() { }; } + /** + * Asynchronously retrieves multiple key-value pairs from memcached. + * + * @Deprecated This method does NOT support a mix of plain and hashed keys in {@code keys}. All keys are + * decoded exactly using the given transcoder (note that hashed keys require two step decoding). + * For supporting a mix of hashed and plain keys in the {@code keys} collection, + * use {@link #asyncGetBulk(Collection, Set, Transcoder, EVCacheTranscoder, BiPredicate, String, boolean, BiPredicate)}. + */ public EVCacheBulkGetFuture asyncGetBulk(Collection keys, final Transcoder tc, EVCacheGetOperationListener listener) { return asyncGetBulk(keys, tc, listener, (node, key) -> true); } + /** + * Asynchronously retrieves multiple key-value pairs from memcached with node validation. + * + * @Deprecated This method does NOT support a mix of plain and hashed keys in {@code keys}. All keys are + * decoded exactly using the given transcoder (note that hashed keys require two step decoding). + * For supporting a mix of hashed and plain keys in the {@code keys} collection, + * use {@link #asyncGetBulk(Collection, Set, Transcoder, EVCacheTranscoder, BiPredicate, String, boolean, BiPredicate)}. + */ + public EVCacheBulkGetFuture asyncGetBulk(Collection keys, final Transcoder tc, EVCacheGetOperationListener listener, BiPredicate nodeValidator) { - final Map> m = new ConcurrentHashMap>(); + return asyncGetBulk(keys, null, tc, null, nodeValidator, null, false, null); + } + + /** + * Asynchronously retrieves multiple key-value pairs from memcached with support for hashed keys and collision detection. + * + *

<p>This method performs bulk retrieval of cached values and supports a two-step decoding process for hashed keys:
 + * <ol>
 + *   <li>Unhashed keys are decoded directly using the valueTranscoder</li>
 + *   <li>Hashed keys undergo two-step decoding:
 + *     <ul>
 + *       <li>First decoded with envelopeTranscoder to unwrap the {@link EVCacheValue} envelope</li>
 + *       <li>Then decoded with valueTranscoder to extract the actual data payload</li>
 + *       <li>Hash collision detection is performed during unwrapping</li>
 + *     </ul>
 + *   </li>
 + * </ol>
 + *
 + * <p>
The method validates node availability and read queue sizes before dispatching operations. + * Results are returned asynchronously via {@link EVCacheBulkGetFuture}. + * + * @param the type of values to be retrieved + * @param plainKeys collection of plain keys to get - this is a Collection for backwards compatibility + * @param hashedKeys set of hashed keys, separately from plain keys because they require two-step decoding; may be null if no hashed keys - this is a Set for O(1) lookups + * @param valueTranscoder transcoder for decoding the actual data payload + * @param envelopeTranscoder transcoder for unwrapping the EVCacheValue envelope (required for hashed keys); may be null if no hashed keys + * @param nodeValidator predicate to validate if a node should be queried for a given key + * @param appName application name for logging and metrics + * @param shouldLog flag indicating whether debug logging should be performed + * @param collisionChecker predicate that checks for hash collisions by comparing hashed key with decoded canonical key; + * returns true if collision detected, false otherwise; required for hashed keys + * @return a future that completes with a map of successfully retrieved key-value pairs + * @throws IllegalStateException if envelopeTranscoder is null when processing hashed keys, or if envelope decoding + * does not yield an EVCacheValue instance + * @throws RuntimeException if decoding with envelopeTranscoder fails + */ + public EVCacheBulkGetFuture asyncGetBulk(Collection plainKeys, + Set hashedKeys, + final Transcoder valueTranscoder, + final EVCacheTranscoder envelopeTranscoder, + BiPredicate nodeValidator, + String appName, + boolean shouldLog, + BiPredicate collisionChecker) { + final Map> m = new ConcurrentHashMap<>(); // Break the gets down into groups by key - final Map> chunks = new HashMap>(); + final Map> chunks = new HashMap<>(); final NodeLocator locator = mconn.getLocator(); - //Populate Node and key Map - for (String key : keys) 
{ + //Populate Node and key Map (from both plain and hashed key collections) + Iterator iter1 = plainKeys != null ? plainKeys.iterator() : Collections.emptyIterator(); + Iterator iter2 = hashedKeys != null ? hashedKeys.iterator() : Collections.emptyIterator(); + int plainKeysSize = plainKeys != null ? plainKeys.size() : 0; + int hashedKeysSize = hashedKeys != null ? hashedKeys.size() : 0; + + while (iter1.hasNext() || iter2.hasNext()) { + String key = iter1.hasNext() ? iter1.next() : iter2.next(); EVCacheClientUtil.validateKey(key, opFact instanceof BinaryOperationFactory); final MemcachedNode primaryNode = locator.getPrimary(key); if (primaryNode.isActive() && nodeValidator.test(primaryNode, key)) { @@ -345,7 +408,8 @@ void bindOp(GetOperation op) { @Override public void receivedStatus(OperationStatus status) { - if (log.isDebugEnabled()) log.debug("GetBulk Keys : " + keys + "; Status : " + status.getStatusCode().name() + "; Message : " + status.getMessage() + "; Elapsed Time - " + (System.currentTimeMillis() - rv.getStartTime())); + if (log.isDebugEnabled()) log.debug("GetBulk Keys : plain[" + plainKeys + "], hashed [" + hashedKeys + + "]; Status : " + status.getStatusCode().name() + "; Message : " + status.getMessage() + "; Elapsed Time - " + (System.currentTimeMillis() - rv.getStartTime())); rv.setStatus(status); } @@ -354,7 +418,37 @@ public void gotData(String k, int flags, byte[] data) { if (data != null) { dataSizeDS.record(data.length); } - m.put(k, tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize()))); + + if (hashedKeys != null && hashedKeys.contains(k)) { + // hashed keys require 2 step decoding, first using envelopeTranscoder then using valueTranscoder + if (envelopeTranscoder == null) throw new IllegalStateException("Both transcoders required for 2-step decode, failed on key " + k + + " of bulk get for plain keys [" + plainKeys + "] and hashed keys [" + hashedKeys + "]"); + + Object obj; + try { + obj = envelopeTranscoder.decode(new 
CachedData(flags, data, envelopeTranscoder.getMaxSize())); + } catch (Exception e) { + throw new RuntimeException("Failed to decode key " + k + " using envelopeTranscoder " + envelopeTranscoder.getClass().getName(), e); + } + if (obj instanceof EVCacheValue) { + if (log.isDebugEnabled() && shouldLog) + log.debug("APP " + appName + ", The value for key [" + k + "] is EVCache Value"); + final EVCacheValue val = (EVCacheValue) obj; + boolean collision = collisionChecker.test(k, val.getKey()); + if (!collision) { + m.put(k, tcService.decode(valueTranscoder, new CachedData(val.getFlags(), val.getValue(), valueTranscoder.getMaxSize()))); + } + } else { + // This can occur when a hashed key xyz and a raw key also xyz are in the bulk request. envelopeTranscoder would attempt to + // decode, only to find that an instance of EVCacheValue was not returned + if (log.isDebugEnabled() && shouldLog) + log.debug("Applying envelopeTranscoder to hashed key {} did not yield an instance of EVCacheValue, this could be due to collision", k); + + } + } else { + // un hashed keys require this single step decode + m.put(k, tcService.decode(valueTranscoder, new CachedData(flags, data, valueTranscoder.getMaxSize()))); + } } @Override @@ -364,7 +458,7 @@ public void complete() { rv.signalSingleOpComplete(thisOpId, op); if (pendingChunks.decrementAndGet() <= 0) { latch.countDown(); - getTimer(EVCacheMetricsFactory.BULK_OPERATION, EVCacheMetricsFactory.READ, rv.getStatus(), (m.size() == keys.size() ? EVCacheMetricsFactory.YES : EVCacheMetricsFactory.NO), null, getReadMetricMaxValue()).record((System.currentTimeMillis() - rv.getStartTime()), TimeUnit.MILLISECONDS); + getTimer(EVCacheMetricsFactory.BULK_OPERATION, EVCacheMetricsFactory.READ, rv.getStatus(), (m.size() == (plainKeysSize + hashedKeysSize) ? EVCacheMetricsFactory.YES : EVCacheMetricsFactory.NO), null, getReadMetricMaxValue()).record((System.currentTimeMillis() - rv.getStartTime()), TimeUnit.MILLISECONDS); rv.signalComplete(); } }