From cdb549d85a381f1405fcb4de3576c5c31ffae235 Mon Sep 17 00:00:00 2001 From: Prudhviraj Karumanchi Date: Fri, 15 Aug 2025 10:08:30 -0700 Subject: [PATCH] fix: handle null states in EVCacheBulkGetFuture to prevent NPE Fixes NullPointerException in bulk get operations when some operations never signal completion through callbacks. The issue was introduced in commit e4a2a9bd which optimized operation state checking by collecting state during callbacks, but didn't handle cases where callbacks don't fire. This fix adds a fallback mechanism that preserves the original behavior: - Use pre-collected state when available (maintains performance optimization) - Fall back to direct operation state checking when state is null - Ensures all operations are processed, matching pre-e4a2a9bd behavior The root cause was that operationStates[i] could remain null if signalSingleOpComplete() was never called for operations that timeout, fail, or are cancelled before their completion callbacks fire. --- .../operation/EVCacheBulkGetFuture.java | 117 ++++++++++++++---- 1 file changed, 95 insertions(+), 22 deletions(-) diff --git a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java index 07d60bd8..3484a53e 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java +++ b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java @@ -120,13 +120,31 @@ public Map getSome(long to, TimeUnit unit, boolean throwException, bo } boolean hadTimedoutOp = false; + Operation[] opsArray = ops.toArray(new Operation[0]); for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); - if (!state.completed && !allCompleted) { - MemcachedConnection.opTimedOut(state.op); - hadTimedoutOp = true; + Operation op = opsArray[i]; + + if (state == null) { + // Operation never signaled completion, fall back to direct checking + if (op.getState() != OperationState.COMPLETE) { + if (!allCompleted) { + MemcachedConnection.opTimedOut(op); + hadTimedoutOp = true; + } else { + MemcachedConnection.opSucceeded(op); + } + } else { + MemcachedConnection.opSucceeded(op); + } } else { - MemcachedConnection.opSucceeded(state.op); + // Use pre-collected state for performance + if (!state.completed && !allCompleted) { + MemcachedConnection.opTimedOut(state.op); + hadTimedoutOp = true; + } else { + MemcachedConnection.opSucceeded(state.op); + } } } @@ -134,11 +152,23 @@ public Map getSome(long to, TimeUnit unit, boolean throwException, bo for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); - if (state.cancelled) { - if (hasZF) statusString = EVCacheMetricsFactory.CANCELLED; - if (throwException) throw new ExecutionException(new CancellationException("Cancelled")); + Operation op = opsArray[i]; + + if (state == null) { + // Fall back to direct operation checking + if (op.isCancelled()) { + if (hasZF) statusString = EVCacheMetricsFactory.CANCELLED; + if (throwException) throw new ExecutionException(new CancellationException("Cancelled")); + } + if (op.hasErrored() && throwException) throw new ExecutionException(op.getException()); + } else { + // Use pre-collected state + if (state.cancelled) { + if (hasZF) statusString = EVCacheMetricsFactory.CANCELLED; + if (throwException) throw new ExecutionException(new CancellationException("Cancelled")); + } + if (state.errored && throwException) throw new ExecutionException(state.op.getException()); } - if (state.errored && throwException) throw new ExecutionException(state.op.getException()); } Map m = new HashMap(); @@ -219,20 +249,41 @@ public CompletableFuture> getAsyncSome(long timeout, TimeUnit uni public void handleBulkException() { ExecutionException t = null; + Operation[] opsArray = ops.toArray(new Operation[0]); for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); - if (!state.completed) { - if (state.cancelled) { - throw new RuntimeException(new ExecutionException(new CancellationException("Cancelled"))); - } else if (state.errored) { - throw new RuntimeException(new ExecutionException(state.op.getException())); + Operation op = opsArray[i]; + + if (state == null) { + // Fall back to direct operation checking + if (op.getState() != OperationState.COMPLETE) { + if (op.isCancelled()) { + throw new RuntimeException(new ExecutionException(new CancellationException("Cancelled"))); + } else if (op.hasErrored()) { + throw new RuntimeException(new ExecutionException(op.getException())); + } else { + op.timeOut(); + MemcachedConnection.opTimedOut(op); + t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", op)); + } } else { - state.op.timeOut(); - MemcachedConnection.opTimedOut(state.op); - t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", state.op)); + MemcachedConnection.opSucceeded(op); } } else { - MemcachedConnection.opSucceeded(state.op); + // Use pre-collected state + if (!state.completed) { + if (state.cancelled) { + throw new RuntimeException(new ExecutionException(new CancellationException("Cancelled"))); + } else if (state.errored) { + throw new RuntimeException(new ExecutionException(state.op.getException())); + } else { + state.op.timeOut(); + MemcachedConnection.opTimedOut(state.op); + t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", state.op)); + } + } else { + MemcachedConnection.opSucceeded(state.op); + } } } @@ -257,19 +308,41 @@ public void doAsyncGetSome(CompletableFuture> promise) { public Single> getSome(long to, TimeUnit units, boolean throwException, boolean hasZF, Scheduler scheduler) { return observe().timeout(to, units, Single.create(subscriber -> { try { + Operation[] opsArray = ops.toArray(new Operation[0]); for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); - if (!state.completed) { - MemcachedConnection.opTimedOut(state.op); + Operation op = opsArray[i]; + + if (state == null) { + // Fall back to direct operation checking + if (op.getState() != OperationState.COMPLETE) { + MemcachedConnection.opTimedOut(op); + } else { + MemcachedConnection.opSucceeded(op); + } } else { - MemcachedConnection.opSucceeded(state.op); + // Use pre-collected state + if (!state.completed) { + MemcachedConnection.opTimedOut(state.op); + } else { + MemcachedConnection.opSucceeded(state.op); + } } } for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); - if (state.cancelled && throwException) throw new ExecutionException(new CancellationException("Cancelled")); - if (state.errored && throwException) throw new ExecutionException(state.op.getException()); + Operation op = opsArray[i]; + + if (state == null) { + // Fall back to direct operation checking + if (op.isCancelled() && throwException) throw new ExecutionException(new CancellationException("Cancelled")); + if (op.hasErrored() && throwException) throw new ExecutionException(op.getException()); + } else { + // Use pre-collected state + if (state.cancelled && throwException) throw new ExecutionException(new CancellationException("Cancelled")); + if (state.errored && throwException) throw new ExecutionException(state.op.getException()); + } } Map m = new HashMap();