From 396ebe54ba23315bbfb47c70c91c9423f38304c0 Mon Sep 17 00:00:00 2001 From: Sunjeet Date: Wed, 11 Feb 2026 09:40:33 -0800 Subject: [PATCH] Add observability for inactive nodes for getBulk and fix (existing bug) each unique combination of metric + call should get its own counter --- build.gradle | 6 +- evcache-client-sample/build.gradle | 4 +- .../netflix/evcache/pool/EVCacheClient.java | 158 ++++++++++-------- .../spy/memcached/EVCacheMemcachedClient.java | 10 +- 4 files changed, 95 insertions(+), 83 deletions(-) diff --git a/build.gradle b/build.gradle index e9c81328..5366c584 100644 --- a/build.gradle +++ b/build.gradle @@ -23,8 +23,10 @@ subprojects { mavenCentral() } - sourceCompatibility = 1.8 - targetCompatibility = 1.8 + java { + sourceCompatibility = JavaVersion.VERSION_1_8 + targetCompatibility = JavaVersion.VERSION_1_8 + } configurations.all { exclude group: 'com.netflix.rxjava' diff --git a/evcache-client-sample/build.gradle b/evcache-client-sample/build.gradle index 894842cf..04d6a7fa 100644 --- a/evcache-client-sample/build.gradle +++ b/evcache-client-sample/build.gradle @@ -18,14 +18,14 @@ dependencies { } task (runSample , dependsOn: 'classes' , type:JavaExec) { - main = "com.netflix.evcache.sample.EVCacheClientSample" + mainClass = "com.netflix.evcache.sample.EVCacheClientSample" classpath = sourceSets.main.runtimeClasspath systemProperty 'EVCACHE_APP1.use.simple.node.list.provider' , 'true' systemProperty 'EVCACHE_APP1-NODES' , 'SERVERGROUP1=localhost:11211;SERVERGROUP2=localhost:11212' } task (runZipkinTracingSample , dependsOn: 'classes' , type:JavaExec) { - main = "com.netflix.evcache.sample.EVCacheClientZipkinTracingSample" + mainClass = "com.netflix.evcache.sample.EVCacheClientZipkinTracingSample" classpath = sourceSets.main.runtimeClasspath systemProperty 'EVCACHE_APP1.use.simple.node.list.provider' , 'true' systemProperty 'EVCACHE_APP1-NODES' , 'SERVERGROUP1=localhost:11211' diff --git a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java index 579fee2b..3d54e497 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java +++ b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java @@ -191,44 +191,47 @@ public Integer getMaxHashLength() { return this.maxHashLength.get(); } - private boolean validateReadQueueSize(MemcachedNode node, EVCache.Call call) { - if (!(node instanceof EVCacheNode)) { - return true; - } + private enum NodeValidationResult { + OK, + INACTIVE_NODE, + READ_QUEUE_FULL + } + + // validates that the node is active and there is capacity in its read queue, logs metrics for violation + private NodeValidationResult validateNodeForRead(MemcachedNode node, EVCache.Call call, int maxReadQueueSizeThreshold) { + if (!(node instanceof EVCacheNode)) + return NodeValidationResult.OK; final EVCacheNode evcNode = (EVCacheNode) node; + if (!evcNode.isAvailable(call)) { - return false; + incrementFailure(EVCacheMetricsFactory.INACTIVE_NODE, call); + if (log.isDebugEnabled()) log.debug("Inactive Node " + evcNode + " on " + call + " operation for app : " + appName + + "; zone : " + zone); + return NodeValidationResult.INACTIVE_NODE; } final int size = evcNode.getReadQueueSize(); - final boolean canAddToOpQueue = size < (maxReadQueueSize.get() * 2); + final boolean canAddToOpQueue = size < maxReadQueueSizeThreshold; + if (log.isDebugEnabled()) + log.debug("Current Read Queue Size - " + size + " for app " + appName + " & zone " + zone + " and node : " + evcNode); if (!canAddToOpQueue) { - final String hostName; - if (evcNode.getSocketAddress() instanceof InetSocketAddress) { - hostName = ((InetSocketAddress) evcNode.getSocketAddress()).getHostName(); - } else { - hostName = evcNode.getSocketAddress().toString(); - } - - incrementFailure(EVCacheMetricsFactory.READ_QUEUE_FULL, call, hostName); + incrementFailure(EVCacheMetricsFactory.READ_QUEUE_FULL, call); if (log.isDebugEnabled()) { - log.debug("Read Queue Full on Bulk Operation for app : " + appName - + "; zone : " + zone + "; Current Size : " + size - + "; Max Size : " + maxReadQueueSize.get() * 2); + log.debug("Read Queue Full on " + call + " operation for app : " + appName + + "; zone : " + zone + "; node : " + evcNode + "; Current Size : " + size + + "; Max Size Threshold: " + maxReadQueueSizeThreshold); } + return NodeValidationResult.READ_QUEUE_FULL; } - return canAddToOpQueue; + return NodeValidationResult.OK; } private void incrementFailure(String metric, EVCache.Call call) { - incrementFailure(metric, call, null); - } - - private void incrementFailure(String metric, EVCache.Call call, String host) { - Counter counter = counterMap.get(metric); + final String cacheKey = metric + (call != null ? call.name() : ""); + Counter counter = counterMap.get(cacheKey); if(counter == null) { final List tagList = new ArrayList(6); tagList.addAll(tags); @@ -249,15 +252,19 @@ private void incrementFailure(String metric, EVCache.Call call, String host) { } } tagList.add(new BasicTag(EVCacheMetricsFactory.FAILURE_REASON, metric)); - if(host != null) tagList.add(new BasicTag(EVCacheMetricsFactory.FAILED_HOST, host)); counter = EVCacheMetricsFactory.getInstance().getCounter(EVCacheMetricsFactory.INTERNAL_FAIL, tagList); - counterMap.put(metric, counter); + counterMap.put(cacheKey, counter); } counter.increment(); } + @Deprecated public void reportWrongKeyReturned(String hostName) { - incrementFailure(EVCacheMetricsFactory.WRONG_KEY_RETURNED, null, hostName); + reportWrongKeyReturned(); + } + + public void reportWrongKeyReturned() { + incrementFailure(EVCacheMetricsFactory.WRONG_KEY_RETURNED, null); } private boolean ensureWriteQueueSize(MemcachedNode node, String key, EVCache.Call call) throws EVCacheException { @@ -276,13 +283,7 @@ private boolean ensureWriteQueueSize(MemcachedNode node, String key, EVCache.Cal } if(i++ > 3) { - final String hostName; - if(evcNode.getSocketAddress() instanceof InetSocketAddress) { - hostName = ((InetSocketAddress)evcNode.getSocketAddress()).getHostName(); - } else { - hostName = evcNode.getSocketAddress().toString(); - } - incrementFailure(EVCacheMetricsFactory.INACTIVE_NODE, call, hostName); + incrementFailure(EVCacheMetricsFactory.INACTIVE_NODE, call); if (log.isDebugEnabled()) log.debug("Node : " + evcNode + " for app : " + appName + "; zone : " + zone + " is not active. Will Fail Fast and the write will be dropped for key : " + key); evcNode.shutdown(); @@ -295,38 +296,29 @@ private boolean ensureWriteQueueSize(MemcachedNode node, String key, EVCache.Cal private boolean validateNode(String key, boolean _throwException, EVCache.Call call) throws EVCacheException, EVCacheConnectException { final MemcachedNode node = evcacheMemcachedClient.getEVCacheNode(key); - // First check if the node is active - if (node instanceof EVCacheNode) { - final EVCacheNode evcNode = (EVCacheNode) node; - final String hostName; - if(evcNode.getSocketAddress() instanceof InetSocketAddress) { - hostName = ((InetSocketAddress)evcNode.getSocketAddress()).getHostName(); - } else { - hostName = evcNode.getSocketAddress().toString(); - } - if (!evcNode.isAvailable(call)) { - incrementFailure(EVCacheMetricsFactory.INACTIVE_NODE, call, hostName); - if (log.isDebugEnabled()) log.debug("Node : " + node + " for app : " + appName + "; zone : " + zone - + " is not active. Will Fail Fast so that we can fallback to Other Zone if available."); - if (_throwException) throw new EVCacheConnectException("Connection for Node : " + node + " for app : " + appName - + "; zone : " + zone + " is not active"); - return false; - } - final int size = evcNode.getReadQueueSize(); - final boolean canAddToOpQueue = size < maxReadQueueSize.get(); - if (log.isDebugEnabled()) log.debug("Current Read Queue Size - " + size + " for app " + appName + " & zone " - + zone + " and node : " + evcNode); - if (!canAddToOpQueue) { - incrementFailure(EVCacheMetricsFactory.READ_QUEUE_FULL, call, hostName); - if (log.isDebugEnabled()) log.debug("Read Queue Full for Node : " + node + "; app : " + appName - + "; zone : " + zone + "; Current Size : " + size + "; Max Size : " + maxReadQueueSize.get()); + int maxReadQueueSizeThreshold = maxReadQueueSize.get(); + NodeValidationResult validationResult = validateNodeForRead(node, call, maxReadQueueSizeThreshold); + if (validationResult == NodeValidationResult.OK) + return true; + + switch (validationResult) { + case INACTIVE_NODE: + if (_throwException) + throw new EVCacheConnectException("Connection for Node : " + node + " for app : " + appName + "; zone : " + + zone + " is not active. Will Fail Fast so that we can fallback to Other Zone if available."); + break; + case READ_QUEUE_FULL: if (_throwException) throw new EVCacheReadQueueException("Read Queue Full for Node : " + node + "; app : " - + appName + "; zone : " + zone + "; Current Size : " + size + "; Max Size : " + maxReadQueueSize.get()); - return false; - } + + appName + "; zone : " + zone + "; Max Size Threshold: " + maxReadQueueSizeThreshold); + break; } - return true; + return false; + } + + // chunking node validator cares about node is active but not the read queue size, since failing one chunk would fail the overall request + private BiPredicate chunkingNodeValidator(EVCache.Call call) { + return (node, k) -> validateNodeForRead(node, call, Integer.MAX_VALUE) == NodeValidationResult.OK; } private ChunkDetails getChunkDetails(String key) { @@ -336,7 +328,7 @@ private ChunkDetails getChunkDetails(String key) { final String firstKey = key + "_00"; firstKeys.add(firstKey); try { - final Map metadataMap = evcacheMemcachedClient.asyncGetBulk(firstKeys, chunkingTranscoder, null) + final Map metadataMap = evcacheMemcachedClient.asyncGetBulk(firstKeys, chunkingTranscoder, null, chunkingNodeValidator(Call.GET)) .getSome(readTimeout.get(), TimeUnit.MILLISECONDS, false, false); if (metadataMap.containsKey(key)) { return new ChunkDetails(null, null, false, metadataMap.get(key)); @@ -366,7 +358,7 @@ private Single> getChunkDetails(String key, Scheduler schedu final String firstKey = key + "_00"; firstKeys.add(firstKey); - return evcacheMemcachedClient.asyncGetBulk(firstKeys, chunkingTranscoder, null) + return evcacheMemcachedClient.asyncGetBulk(firstKeys, chunkingTranscoder, null, chunkingNodeValidator(Call.GET)) .getSome(readTimeout.get(), TimeUnit.MILLISECONDS, false, false, scheduler) .map(metadataMap -> { if (metadataMap.containsKey(key)) { @@ -401,7 +393,7 @@ private T assembleChunks(String key, boolean touch, int ttl, Transcoder t final List keys = cd.getChunkKeys(); final ChunkInfo ci = cd.getChunkInfo(); - final Map dataMap = evcacheMemcachedClient.asyncGetBulk(keys, chunkingTranscoder, null) + final Map dataMap = evcacheMemcachedClient.asyncGetBulk(keys, chunkingTranscoder, null, chunkingNodeValidator(Call.GET)) .getSome(readTimeout.get(), TimeUnit.MILLISECONDS, false, false); if (dataMap.size() != ci.getChunks() - 1) { @@ -475,7 +467,7 @@ private Single assembleChunks(String key, boolean touch, int ttl, Transco final List keys = cd.getChunkKeys(); final ChunkInfo ci = cd.getChunkInfo(); - return evcacheMemcachedClient.asyncGetBulk(keys, chunkingTranscoder, null) + return evcacheMemcachedClient.asyncGetBulk(keys, chunkingTranscoder, null, chunkingNodeValidator(Call.GET)) .getSome(readTimeout.get(), TimeUnit.MILLISECONDS, false, false, scheduler) .map(dataMap -> { if (dataMap.size() != ci.getChunks() - 1) { @@ -577,7 +569,7 @@ private Map assembleChunks(Collection keyList, Transcoder firstKeys.add(key + "_00"); } try { - final Map metadataMap = evcacheMemcachedClient.asyncGetBulk(firstKeys, chunkingTranscoder, null) + final Map metadataMap = evcacheMemcachedClient.asyncGetBulk(firstKeys, chunkingTranscoder, null, chunkingNodeValidator(Call.BULK)) .getSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS, false, false); if (metadataMap == null) return null; @@ -611,7 +603,7 @@ private Map assembleChunks(Collection keyList, Transcoder } } - final Map dataMap = evcacheMemcachedClient.asyncGetBulk(allKeys, chunkingTranscoder, null) + final Map dataMap = evcacheMemcachedClient.asyncGetBulk(allKeys, chunkingTranscoder, null, chunkingNodeValidator(Call.BULK)) .getSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS, false, false); for (Entry, byte[]>> entry : responseMap.entrySet()) { @@ -673,7 +665,7 @@ private Single> assembleChunks(Collection keyList, Tr firstKeys.add(key + "_00"); } - return evcacheMemcachedClient.asyncGetBulk(firstKeys, chunkingTranscoder, null) + return evcacheMemcachedClient.asyncGetBulk(firstKeys, chunkingTranscoder, null, chunkingNodeValidator(Call.BULK)) .getSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS, false, false, scheduler) .flatMap(metadataMap -> { if (metadataMap == null) return null; @@ -708,7 +700,7 @@ private Single> assembleChunks(Collection keyList, Tr } } - return evcacheMemcachedClient.asyncGetBulk(allKeys, chunkingTranscoder, null) + return evcacheMemcachedClient.asyncGetBulk(allKeys, chunkingTranscoder, null, chunkingNodeValidator(Call.BULK)) .getSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS, false, false, scheduler) .map(dataMap -> { for (Entry, byte[]>> entry : responseMap.entrySet()) { @@ -842,7 +834,7 @@ public Map getAllChunks(String key) throws EVCacheReadQueueE } else { final List keys = cd.getChunkKeys(); if(log.isDebugEnabled()) log.debug("Keys - " + keys); - final Map dataMap = evcacheMemcachedClient.asyncGetBulk(keys, chunkingTranscoder, null) + final Map dataMap = evcacheMemcachedClient.asyncGetBulk(keys, chunkingTranscoder, null, chunkingNodeValidator(Call.GET)) .getSome(readTimeout.get().intValue(), TimeUnit.MILLISECONDS, false, false); if(log.isDebugEnabled()) log.debug("Datamap " + dataMap); @@ -975,7 +967,13 @@ public Map getBulk(Collection canonicalKeys, Transcoder validator = (node, key) -> validateReadQueueSize(node, Call.BULK); + final BiPredicate validator = (node, key) -> { + NodeValidationResult result = validateNodeForRead(node, Call.BULK, 2 * maxReadQueueSize.get()); + if (result != NodeValidationResult.OK) { + return false; + } + return true; + }; returnVal = evcacheMemcachedClient.asyncGetBulk(canonicalKeys, tc, null, validator) .getSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF); } @@ -1000,7 +998,13 @@ public CompletableFuture> getAsyncBulk(Collection key public CompletableFuture> getAsyncBulk(Collection plainKeys, Set hashedKeys, Transcoder tc, EVCacheTranscoder evcacheValueTranscoder, String appName, boolean shouldLog, BiPredicate collisionChecker) { - final BiPredicate validator = (node, key) -> validateReadQueueSize(node, Call.COMPLETABLE_FUTURE_GET_BULK); + final BiPredicate validator = (node, key) -> { + NodeValidationResult result = validateNodeForRead(node, Call.COMPLETABLE_FUTURE_GET_BULK, 2 * maxReadQueueSize.get()); + if (result != NodeValidationResult.OK) { + return false; + } + return true; + }; if (tc == null) tc = (Transcoder) getTranscoder(); return evcacheMemcachedClient .asyncGetBulk(plainKeys, hashedKeys, tc, evcacheValueTranscoder, validator, appName, shouldLog, collisionChecker) @@ -1015,7 +1019,13 @@ public Single> getBulk(Collection canonicalKeys, fina if (enableChunking.get()) { return assembleChunks(canonicalKeys, tc, hasZF, scheduler); } else { - final BiPredicate validator = (node, key) -> validateReadQueueSize(node, Call.BULK); + final BiPredicate validator = (node, key) -> { + NodeValidationResult result = validateNodeForRead(node, Call.BULK, 2 * maxReadQueueSize.get()); + if (result != NodeValidationResult.OK) { + return false; + } + return true; + }; return evcacheMemcachedClient.asyncGetBulk(canonicalKeys, tc, null, validator) .getSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF, scheduler); } diff --git a/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java b/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java index 42d6489e..8272f0cb 100644 --- a/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java +++ b/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java @@ -141,7 +141,7 @@ private boolean isWrongKeyReturned(String original_key, String returned_key) { final String returned_host = getHostNameByKey(returned_key); log.error("Wrong key returned. Key - " + original_key + " (Host: " + original_host + ") ; Returned Key " + returned_key + " (Host: " + returned_host + ")", new Exception()); - client.reportWrongKeyReturned(original_host); + client.reportWrongKeyReturned(); // If we are configured to dynamically switch log levels to DEBUG on a wrong key error, do so here. if (enableDebugLogsOnWrongKey.get()) { @@ -290,15 +290,15 @@ public int getMaxSize() { /** * Asynchronously retrieves multiple key-value pairs from memcached. * - * @Deprecated This method does NOT support a mix of plain and hashed keys in {@code keys}. All keys are - * decoded exactly using the given transcoder (note that hashed keys require two step decoding). + * @Deprecated This method does NOT support a mix of plain and hashed keys in {@code keys}, nor validation of read + * queue capacity. All keys are decoded exactly using the given transcoder (note that hashed keys require two step decoding). * For supporting a mix of hashed and plain keys in the {@code keys} collection, * use {@link #asyncGetBulk(Collection, Set, Transcoder, EVCacheTranscoder, BiPredicate, String, boolean, BiPredicate)}. */ public EVCacheBulkGetFuture asyncGetBulk(Collection keys, final Transcoder tc, EVCacheGetOperationListener listener) { - return asyncGetBulk(keys, tc, listener, (node, key) -> true); + return asyncGetBulk(keys, tc, listener, (node, k) -> node.isActive()); } /** @@ -374,7 +374,7 @@ public EVCacheBulkGetFuture asyncGetBulk(Collection plainKeys, String key = iter1.hasNext() ? iter1.next() : iter2.next(); EVCacheClientUtil.validateKey(key, opFact instanceof BinaryOperationFactory); final MemcachedNode primaryNode = locator.getPrimary(key); - if (primaryNode.isActive() && nodeValidator.test(primaryNode, key)) { + if (nodeValidator.test(primaryNode, key)) { Collection ks = chunks.computeIfAbsent(primaryNode, k -> new ArrayList<>()); ks.add(key); }