diff --git a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java index 07d60bd8..3484a53e 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java +++ b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java @@ -120,13 +120,31 @@ public Map getSome(long to, TimeUnit unit, boolean throwException, bo } boolean hadTimedoutOp = false; + Operation[] opsArray = ops.toArray(new Operation[0]); for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); - if (!state.completed && !allCompleted) { - MemcachedConnection.opTimedOut(state.op); - hadTimedoutOp = true; + Operation op = opsArray[i]; + + if (state == null) { + // Operation never signaled completion, fall back to direct checking + if (op.getState() != OperationState.COMPLETE) { + if (!allCompleted) { + MemcachedConnection.opTimedOut(op); + hadTimedoutOp = true; + } else { + MemcachedConnection.opSucceeded(op); + } + } else { + MemcachedConnection.opSucceeded(op); + } } else { - MemcachedConnection.opSucceeded(state.op); + // Use pre-collected state for performance + if (!state.completed && !allCompleted) { + MemcachedConnection.opTimedOut(state.op); + hadTimedoutOp = true; + } else { + MemcachedConnection.opSucceeded(state.op); + } } } @@ -134,11 +152,23 @@ public Map getSome(long to, TimeUnit unit, boolean throwException, bo for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); - if (state.cancelled) { - if (hasZF) statusString = EVCacheMetricsFactory.CANCELLED; - if (throwException) throw new ExecutionException(new CancellationException("Cancelled")); + Operation op = opsArray[i]; + + if (state == null) { + // Fall back to direct operation checking + if (op.isCancelled()) { + if (hasZF) statusString = EVCacheMetricsFactory.CANCELLED; + if (throwException) throw new ExecutionException(new CancellationException("Cancelled")); + } + if (op.hasErrored() && throwException) throw new ExecutionException(op.getException()); + } else { + // Use pre-collected state + if (state.cancelled) { + if (hasZF) statusString = EVCacheMetricsFactory.CANCELLED; + if (throwException) throw new ExecutionException(new CancellationException("Cancelled")); + } + if (state.errored && throwException) throw new ExecutionException(state.op.getException()); } - if (state.errored && throwException) throw new ExecutionException(state.op.getException()); } Map m = new HashMap(); @@ -219,20 +249,41 @@ public CompletableFuture> getAsyncSome(long timeout, TimeUnit uni public void handleBulkException() { ExecutionException t = null; + Operation[] opsArray = ops.toArray(new Operation[0]); for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); - if (!state.completed) { - if (state.cancelled) { - throw new RuntimeException(new ExecutionException(new CancellationException("Cancelled"))); - } else if (state.errored) { - throw new RuntimeException(new ExecutionException(state.op.getException())); + Operation op = opsArray[i]; + + if (state == null) { + // Fall back to direct operation checking + if (op.getState() != OperationState.COMPLETE) { + if (op.isCancelled()) { + throw new RuntimeException(new ExecutionException(new CancellationException("Cancelled"))); + } else if (op.hasErrored()) { + throw new RuntimeException(new ExecutionException(op.getException())); + } else { + op.timeOut(); + MemcachedConnection.opTimedOut(op); + t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", op)); + } } else { - state.op.timeOut(); - MemcachedConnection.opTimedOut(state.op); - t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", state.op)); + MemcachedConnection.opSucceeded(op); } } else { - MemcachedConnection.opSucceeded(state.op); + // Use pre-collected state + if (!state.completed) { + if (state.cancelled) { + throw new RuntimeException(new ExecutionException(new CancellationException("Cancelled"))); + } else if (state.errored) { + throw new RuntimeException(new ExecutionException(state.op.getException())); + } else { + state.op.timeOut(); + MemcachedConnection.opTimedOut(state.op); + t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", state.op)); + } + } else { + MemcachedConnection.opSucceeded(state.op); + } } } @@ -257,19 +308,41 @@ public void doAsyncGetSome(CompletableFuture> promise) { public Single> getSome(long to, TimeUnit units, boolean throwException, boolean hasZF, Scheduler scheduler) { return observe().timeout(to, units, Single.create(subscriber -> { try { + Operation[] opsArray = ops.toArray(new Operation[0]); for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); - if (!state.completed) { - MemcachedConnection.opTimedOut(state.op); + Operation op = opsArray[i]; + + if (state == null) { + // Fall back to direct operation checking + if (op.getState() != OperationState.COMPLETE) { + MemcachedConnection.opTimedOut(op); + } else { + MemcachedConnection.opSucceeded(op); + } } else { - MemcachedConnection.opSucceeded(state.op); + // Use pre-collected state + if (!state.completed) { + MemcachedConnection.opTimedOut(state.op); + } else { + MemcachedConnection.opSucceeded(state.op); + } } } for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); - if (state.cancelled && throwException) throw new ExecutionException(new CancellationException("Cancelled")); - if (state.errored && throwException) throw new ExecutionException(state.op.getException()); + Operation op = opsArray[i]; + + if (state == null) { + // Fall back to direct operation checking + if (op.isCancelled() && throwException) throw new ExecutionException(new CancellationException("Cancelled")); + if (op.hasErrored() && throwException) throw new ExecutionException(op.getException()); + } else { + // Use pre-collected state + if (state.cancelled && throwException) throw new ExecutionException(new CancellationException("Cancelled")); + if (state.errored && throwException) throw new ExecutionException(state.op.getException()); + } } Map m = new HashMap();