From a1de62b26ebc3c26bb5e4a2f57d2f9a44a0515ab Mon Sep 17 00:00:00 2001 From: Prudhviraj Karumanchi Date: Fri, 15 Aug 2025 10:08:30 -0700 Subject: [PATCH 1/5] fix: handle null states in EVCacheBulkGetFuture to prevent NPE Fixes NullPointerException in bulk get operations when some operations never signal completion through callbacks. The issue was introduced in commit e4a2a9bd which optimized operation state checking by collecting state during callbacks, but didn't handle cases where callbacks don't fire. This fix adds a fallback mechanism that preserves the original behavior: - Use pre-collected state when available (maintains performance optimization) - Fall back to direct operation state checking when state is null - Ensures all operations are processed, matching pre-e4a2a9bd behavior The root cause was that operationStates[i] could remain null if signalSingleOpComplete() was never called for operations that timeout, fail, or are cancelled before their completion callbacks fire. --- .../operation/EVCacheBulkGetFuture.java | 117 ++++++++++++++---- 1 file changed, 95 insertions(+), 22 deletions(-) diff --git a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java index 07d60bd8..3484a53e 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java +++ b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java @@ -120,13 +120,31 @@ public Map getSome(long to, TimeUnit unit, boolean throwException, bo } boolean hadTimedoutOp = false; + Operation[] opsArray = ops.toArray(new Operation[0]); for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); - if (!state.completed && !allCompleted) { - MemcachedConnection.opTimedOut(state.op); - hadTimedoutOp = true; + Operation op = opsArray[i]; + + if (state == null) { + // Operation never signaled completion, fall back to direct checking + if (op.getState() != OperationState.COMPLETE) { + if (!allCompleted) { + MemcachedConnection.opTimedOut(op); + hadTimedoutOp = true; + } else { + MemcachedConnection.opSucceeded(op); + } + } else { + MemcachedConnection.opSucceeded(op); + } } else { - MemcachedConnection.opSucceeded(state.op); + // Use pre-collected state for performance + if (!state.completed && !allCompleted) { + MemcachedConnection.opTimedOut(state.op); + hadTimedoutOp = true; + } else { + MemcachedConnection.opSucceeded(state.op); + } } } @@ -134,11 +152,23 @@ public Map getSome(long to, TimeUnit unit, boolean throwException, bo for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); - if (state.cancelled) { - if (hasZF) statusString = EVCacheMetricsFactory.CANCELLED; - if (throwException) throw new ExecutionException(new CancellationException("Cancelled")); + Operation op = opsArray[i]; + + if (state == null) { + // Fall back to direct operation checking + if (op.isCancelled()) { + if (hasZF) statusString = EVCacheMetricsFactory.CANCELLED; + if (throwException) throw new ExecutionException(new CancellationException("Cancelled")); + } + if (op.hasErrored() && throwException) throw new ExecutionException(op.getException()); + } else { + // Use pre-collected state + if (state.cancelled) { + if (hasZF) statusString = EVCacheMetricsFactory.CANCELLED; + if (throwException) throw new ExecutionException(new CancellationException("Cancelled")); + } + if (state.errored && throwException) throw new ExecutionException(state.op.getException()); } - if (state.errored && throwException) throw new ExecutionException(state.op.getException()); } Map m = new HashMap(); @@ -219,20 +249,41 @@ public CompletableFuture> getAsyncSome(long timeout, TimeUnit uni public void handleBulkException() { ExecutionException t = null; + Operation[] opsArray = ops.toArray(new Operation[0]); for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); - if (!state.completed) { - if (state.cancelled) { - throw new RuntimeException(new ExecutionException(new CancellationException("Cancelled"))); - } else if (state.errored) { - throw new RuntimeException(new ExecutionException(state.op.getException())); + Operation op = opsArray[i]; + + if (state == null) { + // Fall back to direct operation checking + if (op.getState() != OperationState.COMPLETE) { + if (op.isCancelled()) { + throw new RuntimeException(new ExecutionException(new CancellationException("Cancelled"))); + } else if (op.hasErrored()) { + throw new RuntimeException(new ExecutionException(op.getException())); + } else { + op.timeOut(); + MemcachedConnection.opTimedOut(op); + t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", op)); + } } else { - state.op.timeOut(); - MemcachedConnection.opTimedOut(state.op); - t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", state.op)); + MemcachedConnection.opSucceeded(op); } } else { - MemcachedConnection.opSucceeded(state.op); + // Use pre-collected state + if (!state.completed) { + if (state.cancelled) { + throw new RuntimeException(new ExecutionException(new CancellationException("Cancelled"))); + } else if (state.errored) { + throw new RuntimeException(new ExecutionException(state.op.getException())); + } else { + state.op.timeOut(); + MemcachedConnection.opTimedOut(state.op); + t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", state.op)); + } + } else { + MemcachedConnection.opSucceeded(state.op); + } } } @@ -257,19 +308,41 @@ public void doAsyncGetSome(CompletableFuture> promise) { public Single> getSome(long to, TimeUnit units, boolean throwException, boolean hasZF, Scheduler scheduler) { return observe().timeout(to, units, Single.create(subscriber -> { try { + Operation[] opsArray = ops.toArray(new Operation[0]); for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); - if (!state.completed) { - MemcachedConnection.opTimedOut(state.op); + Operation op = opsArray[i]; + + if (state == null) { + // Fall back to direct operation checking + if (op.getState() != OperationState.COMPLETE) { + MemcachedConnection.opTimedOut(op); + } else { + MemcachedConnection.opSucceeded(op); + } } else { - MemcachedConnection.opSucceeded(state.op); + // Use pre-collected state + if (!state.completed) { + MemcachedConnection.opTimedOut(state.op); + } else { + MemcachedConnection.opSucceeded(state.op); + } } } for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); - if (state.cancelled && throwException) throw new ExecutionException(new CancellationException("Cancelled")); - if (state.errored && throwException) throw new ExecutionException(state.op.getException()); + Operation op = opsArray[i]; + + if (state == null) { + // Fall back to direct operation checking + if (op.isCancelled() && throwException) throw new ExecutionException(new CancellationException("Cancelled")); + if (op.hasErrored() && throwException) throw new ExecutionException(op.getException()); + } else { + // Use pre-collected state + if (state.cancelled && throwException) throw new ExecutionException(new CancellationException("Cancelled")); + if (state.errored && throwException) throw new ExecutionException(state.op.getException()); + } } Map m = new HashMap(); From 00ff3064673088795c097777b50e4a9b53affedd Mon Sep 17 00:00:00 2001 From: Shih-Hao Yeh Date: Fri, 23 Jan 2026 09:14:58 -0800 Subject: [PATCH 2/5] fix EVCacheBulkGetFuture --- .../operation/EVCacheBulkGetFuture.java | 115 +++++++----------- 1 file changed, 42 insertions(+), 73 deletions(-) diff --git a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java index 3484a53e..55b0c858 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java +++ b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java @@ -51,6 +51,7 @@ public class EVCacheBulkGetFuture extends BulkGetFuture { private static final Logger log = LoggerFactory.getLogger(EVCacheBulkGetFuture.class); private final Map> rvMap; private final Collection ops; + private final Operation[] opsArray; private final CountDownLatch latch; private final long start; private final EVCacheClient client; @@ -60,6 +61,7 @@ public EVCacheBulkGetFuture(Map> m, Collection getO super(m, getOps, l, service); rvMap = m; ops = getOps; + opsArray = ops.toArray(new Operation[0]); latch = l; this.start = System.currentTimeMillis(); this.client = client; @@ -70,6 +72,8 @@ public Map getSome(long to, TimeUnit unit, boolean throwException, bo throws InterruptedException, ExecutionException { assert operationStates != null; + // Note: The latch here is counterintuitive. Based on the implementation in EVCacheMemcachedClient.asyncGetBulk(), + // the latch count is set to 1 no matter the chunk size and only decrement when pendingChunks counts down to 0. boolean allCompleted = latch.await(to, unit); if(log.isDebugEnabled()) log.debug("Took " + (System.currentTimeMillis() - start)+ " to fetch " + rvMap.size() + " keys from " + client); long pauseDuration = -1; @@ -120,26 +124,18 @@ public Map getSome(long to, TimeUnit unit, boolean throwException, bo } boolean hadTimedoutOp = false; - Operation[] opsArray = ops.toArray(new Operation[0]); for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); - Operation op = opsArray[i]; - + if (state == null) { - // Operation never signaled completion, fall back to direct checking - if (op.getState() != OperationState.COMPLETE) { - if (!allCompleted) { - MemcachedConnection.opTimedOut(op); - hadTimedoutOp = true; - } else { - MemcachedConnection.opSucceeded(op); - } - } else { - MemcachedConnection.opSucceeded(op); - } + // Operation not yet signaled completion (cancel should still trigger completion) ==> latch timed out + // This also indicates allCompleted == false because the latch count wouldn't have drop to 0. + Operation op = opsArray[i]; + op.timeOut(); + MemcachedConnection.opTimedOut(op); + hadTimedoutOp = true; } else { - // Use pre-collected state for performance - if (!state.completed && !allCompleted) { + if (state.timedOut) { MemcachedConnection.opTimedOut(state.op); hadTimedoutOp = true; } else { @@ -148,26 +144,22 @@ public Map getSome(long to, TimeUnit unit, boolean throwException, bo } } - if (!allCompleted && !hasZF && hadTimedoutOp) statusString = EVCacheMetricsFactory.TIMEOUT; + if (hadTimedoutOp && !hasZF) statusString = EVCacheMetricsFactory.TIMEOUT; + // Should we throw when timeout? for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); - Operation op = opsArray[i]; - - if (state == null) { - // Fall back to direct operation checking - if (op.isCancelled()) { - if (hasZF) statusString = EVCacheMetricsFactory.CANCELLED; - if (throwException) throw new ExecutionException(new CancellationException("Cancelled")); - } - if (op.hasErrored() && throwException) throw new ExecutionException(op.getException()); - } else { - // Use pre-collected state + + // state == null always means timed out and was handled. + if (state != null) { if (state.cancelled) { if (hasZF) statusString = EVCacheMetricsFactory.CANCELLED; if (throwException) throw new ExecutionException(new CancellationException("Cancelled")); } - if (state.errored && throwException) throw new ExecutionException(state.op.getException()); + if (state.errored) { + if (hasZF) statusString = EVCacheMetricsFactory.ERROR; + if (throwException) throw new ExecutionException(state.op.getException()); + } } } @@ -249,38 +241,23 @@ public CompletableFuture> getAsyncSome(long timeout, TimeUnit uni public void handleBulkException() { ExecutionException t = null; - Operation[] opsArray = ops.toArray(new Operation[0]); for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); - Operation op = opsArray[i]; - + if (state == null) { - // Fall back to direct operation checking - if (op.getState() != OperationState.COMPLETE) { - if (op.isCancelled()) { - throw new RuntimeException(new ExecutionException(new CancellationException("Cancelled"))); - } else if (op.hasErrored()) { - throw new RuntimeException(new ExecutionException(op.getException())); - } else { - op.timeOut(); - MemcachedConnection.opTimedOut(op); - t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", op)); - } - } else { - MemcachedConnection.opSucceeded(op); - } + Operation op = opsArray[i]; + op.timeOut(); + MemcachedConnection.opTimedOut(op); + t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", op)); } else { // Use pre-collected state - if (!state.completed) { - if (state.cancelled) { - throw new RuntimeException(new ExecutionException(new CancellationException("Cancelled"))); - } else if (state.errored) { - throw new RuntimeException(new ExecutionException(state.op.getException())); - } else { - state.op.timeOut(); - MemcachedConnection.opTimedOut(state.op); - t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", state.op)); - } + if (state.cancelled) { + throw new RuntimeException(new ExecutionException(new CancellationException("Cancelled"))); + } else if (state.errored) { + throw new RuntimeException(new ExecutionException(state.op.getException())); + } else if (state.timedOut) { + MemcachedConnection.opTimedOut(state.op); + t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", state.op)); } else { MemcachedConnection.opSucceeded(state.op); } @@ -308,22 +285,18 @@ public void doAsyncGetSome(CompletableFuture> promise) { public Single> getSome(long to, TimeUnit units, boolean throwException, boolean hasZF, Scheduler scheduler) { return observe().timeout(to, units, Single.create(subscriber -> { try { - Operation[] opsArray = ops.toArray(new Operation[0]); for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); Operation op = opsArray[i]; - + if (state == null) { - // Fall back to direct operation checking - if (op.getState() != OperationState.COMPLETE) { - MemcachedConnection.opTimedOut(op); - } else { - MemcachedConnection.opSucceeded(op); - } + op.timeOut(); + MemcachedConnection.opTimedOut(op); + // Should we throw when timeout? } else { - // Use pre-collected state - if (!state.completed) { + if (state.timedOut) { MemcachedConnection.opTimedOut(state.op); + // Should we throw when timeout? } else { MemcachedConnection.opSucceeded(state.op); } @@ -333,13 +306,9 @@ public Single> getSome(long to, TimeUnit units, boolean throwExce for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); Operation op = opsArray[i]; - - if (state == null) { - // Fall back to direct operation checking - if (op.isCancelled() && throwException) throw new ExecutionException(new CancellationException("Cancelled")); - if (op.hasErrored() && throwException) throw new ExecutionException(op.getException()); - } else { - // Use pre-collected state + + // state == null always means timed out and was handled. + if (state != null) { if (state.cancelled && throwException) throw new ExecutionException(new CancellationException("Cancelled")); if (state.errored && throwException) throw new ExecutionException(state.op.getException()); } From e93ef48c3c7965036f0fd5cc7de380728712593a Mon Sep 17 00:00:00 2001 From: Shih-Hao Yeh Date: Fri, 23 Jan 2026 10:39:31 -0800 Subject: [PATCH 3/5] fix opsArray should be initialized later --- .../evcache/operation/EVCacheBulkGetFuture.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java index 55b0c858..9021839c 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java +++ b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java @@ -51,7 +51,6 @@ public class EVCacheBulkGetFuture extends BulkGetFuture { private static final Logger log = LoggerFactory.getLogger(EVCacheBulkGetFuture.class); private final Map> rvMap; private final Collection ops; - private final Operation[] opsArray; private final CountDownLatch latch; private final long start; private final EVCacheClient client; @@ -61,7 +60,6 @@ public EVCacheBulkGetFuture(Map> m, Collection getO super(m, getOps, l, service); rvMap = m; ops = getOps; - opsArray = ops.toArray(new Operation[0]); latch = l; this.start = System.currentTimeMillis(); this.client = client; @@ -73,7 +71,7 @@ public Map getSome(long to, TimeUnit unit, boolean throwException, bo assert operationStates != null; // Note: The latch here is counterintuitive. Based on the implementation in EVCacheMemcachedClient.asyncGetBulk(), - // the latch count is set to 1 no matter the chunk size and only decrement when pendingChunks counts down to 0. + // the latch count is set to 1 no matter the chunk size (when > 0) and only decrement when pendingChunks counts down to 0. boolean allCompleted = latch.await(to, unit); if(log.isDebugEnabled()) log.debug("Took " + (System.currentTimeMillis() - start)+ " to fetch " + rvMap.size() + " keys from " + client); long pauseDuration = -1; @@ -124,6 +122,7 @@ public Map getSome(long to, TimeUnit unit, boolean throwException, bo } boolean hadTimedoutOp = false; + Operation[] opsArray = ops.toArray(new Operation[0]); for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); @@ -241,6 +240,7 @@ public CompletableFuture> getAsyncSome(long timeout, TimeUnit uni public void handleBulkException() { ExecutionException t = null; + Operation[] opsArray = ops.toArray(new Operation[0]); for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); @@ -285,11 +285,12 @@ public void doAsyncGetSome(CompletableFuture> promise) { public Single> getSome(long to, TimeUnit units, boolean throwException, boolean hasZF, Scheduler scheduler) { return observe().timeout(to, units, Single.create(subscriber -> { try { + Operation[] opsArray = ops.toArray(new Operation[0]); for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); - Operation op = opsArray[i]; if (state == null) { + Operation op = opsArray[i]; op.timeOut(); MemcachedConnection.opTimedOut(op); // Should we throw when timeout? @@ -305,7 +306,6 @@ public Single> getSome(long to, TimeUnit units, boolean throwExce for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); - Operation op = opsArray[i]; // state == null always means timed out and was handled. if (state != null) { From 7eb75e81db505d5da8bcef07d4c2d5bc546aaab5 Mon Sep 17 00:00:00 2001 From: Shih-Hao Yeh Date: Tue, 3 Feb 2026 12:18:13 -0800 Subject: [PATCH 4/5] let's make less refactoring at this point.. --- .../operation/EVCacheBulkGetFuture.java | 38 +++++++++---------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java index 9021839c..906fe4b6 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java +++ b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java @@ -134,7 +134,7 @@ public Map getSome(long to, TimeUnit unit, boolean throwException, bo MemcachedConnection.opTimedOut(op); hadTimedoutOp = true; } else { - if (state.timedOut) { + if (!state.completed && !allCompleted) { MemcachedConnection.opTimedOut(state.op); hadTimedoutOp = true; } else { @@ -143,8 +143,7 @@ public Map getSome(long to, TimeUnit unit, boolean throwException, bo } } - if (hadTimedoutOp && !hasZF) statusString = EVCacheMetricsFactory.TIMEOUT; - // Should we throw when timeout? + if (!allCompleted && !hasZF && hadTimedoutOp) statusString = EVCacheMetricsFactory.TIMEOUT; for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); @@ -155,10 +154,7 @@ public Map getSome(long to, TimeUnit unit, boolean throwException, bo if (hasZF) statusString = EVCacheMetricsFactory.CANCELLED; if (throwException) throw new ExecutionException(new CancellationException("Cancelled")); } - if (state.errored) { - if (hasZF) statusString = EVCacheMetricsFactory.ERROR; - if (throwException) throw new ExecutionException(state.op.getException()); - } + if (state.errored && throwException) throw new ExecutionException(state.op.getException()); } } @@ -246,18 +242,20 @@ public void handleBulkException() { if (state == null) { Operation op = opsArray[i]; - op.timeOut(); - MemcachedConnection.opTimedOut(op); - t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", op)); + op.cancel(); + MemcachedConnection.opSucceeded(op); } else { - // Use pre-collected state - if (state.cancelled) { - throw new RuntimeException(new ExecutionException(new CancellationException("Cancelled"))); - } else if (state.errored) { - throw new RuntimeException(new ExecutionException(state.op.getException())); - } else if (state.timedOut) { - MemcachedConnection.opTimedOut(state.op); - t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", state.op)); + if (!state.completed) { + // Use pre-collected state + if (state.cancelled) { + throw new RuntimeException(new ExecutionException(new CancellationException("Cancelled"))); + } else if (state.errored) { + throw new RuntimeException(new ExecutionException(state.op.getException())); + } else { + state.op.timeOut(); + MemcachedConnection.opTimedOut(state.op); + t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", state.op)); + } } else { MemcachedConnection.opSucceeded(state.op); } @@ -293,11 +291,9 @@ public Single> getSome(long to, TimeUnit units, boolean throwExce Operation op = opsArray[i]; op.timeOut(); MemcachedConnection.opTimedOut(op); - // Should we throw when timeout? } else { - if (state.timedOut) { + if (!state.completed) { MemcachedConnection.opTimedOut(state.op); - // Should we throw when timeout? } else { MemcachedConnection.opSucceeded(state.op); } From f14eaa1ade16da7087430edfd379ae5df98f4cb9 Mon Sep 17 00:00:00 2001 From: Shih-Hao Yeh Date: Wed, 4 Feb 2026 15:48:59 -0800 Subject: [PATCH 5/5] fix handleBulkException behavior --- .../com/netflix/evcache/operation/EVCacheBulkGetFuture.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java index 906fe4b6..b8379136 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java +++ b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java @@ -242,8 +242,9 @@ public void handleBulkException() { if (state == null) { Operation op = opsArray[i]; - op.cancel(); - MemcachedConnection.opSucceeded(op); + op.timeOut(); + MemcachedConnection.opTimedOut(op); + t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", op)); } else { if (!state.completed) { // Use pre-collected state