diff --git a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java index 07d60bd8..b8379136 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java +++ b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java @@ -70,6 +70,8 @@ public Map getSome(long to, TimeUnit unit, boolean throwException, bo throws InterruptedException, ExecutionException { assert operationStates != null; + // Note: The latch here is counterintuitive. Based on the implementation in EVCacheMemcachedClient.asyncGetBulk(), + // the latch count is set to 1 no matter the chunk size (when > 0) and only decrement when pendingChunks counts down to 0. boolean allCompleted = latch.await(to, unit); if(log.isDebugEnabled()) log.debug("Took " + (System.currentTimeMillis() - start)+ " to fetch " + rvMap.size() + " keys from " + client); long pauseDuration = -1; @@ -120,13 +122,24 @@ public Map getSome(long to, TimeUnit unit, boolean throwException, bo } boolean hadTimedoutOp = false; + Operation[] opsArray = ops.toArray(new Operation[0]); for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); - if (!state.completed && !allCompleted) { - MemcachedConnection.opTimedOut(state.op); + + if (state == null) { + // Operation not yet signaled completion (cancel should still trigger completion) ==> latch timed out + // This also indicates allCompleted == false because the latch count wouldn't have drop to 0. + Operation op = opsArray[i]; + op.timeOut(); + MemcachedConnection.opTimedOut(op); hadTimedoutOp = true; } else { - MemcachedConnection.opSucceeded(state.op); + if (!state.completed && !allCompleted) { + MemcachedConnection.opTimedOut(state.op); + hadTimedoutOp = true; + } else { + MemcachedConnection.opSucceeded(state.op); + } } } @@ -134,11 +147,15 @@ public Map getSome(long to, TimeUnit unit, boolean throwException, bo for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); - if (state.cancelled) { - if (hasZF) statusString = EVCacheMetricsFactory.CANCELLED; - if (throwException) throw new ExecutionException(new CancellationException("Cancelled")); + + // state == null always means timed out and was handled. + if (state != null) { + if (state.cancelled) { + if (hasZF) statusString = EVCacheMetricsFactory.CANCELLED; + if (throwException) throw new ExecutionException(new CancellationException("Cancelled")); + } + if (state.errored && throwException) throw new ExecutionException(state.op.getException()); } - if (state.errored && throwException) throw new ExecutionException(state.op.getException()); } Map m = new HashMap(); @@ -219,20 +236,30 @@ public CompletableFuture> getAsyncSome(long timeout, TimeUnit uni public void handleBulkException() { ExecutionException t = null; + Operation[] opsArray = ops.toArray(new Operation[0]); for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); - if (!state.completed) { - if (state.cancelled) { - throw new RuntimeException(new ExecutionException(new CancellationException("Cancelled"))); - } else if (state.errored) { - throw new RuntimeException(new ExecutionException(state.op.getException())); + + if (state == null) { + Operation op = opsArray[i]; + op.timeOut(); + MemcachedConnection.opTimedOut(op); + t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", op)); + } else { + if (!state.completed) { + // Use pre-collected state + if (state.cancelled) { + throw new RuntimeException(new ExecutionException(new CancellationException("Cancelled"))); + } else if (state.errored) { + throw new RuntimeException(new ExecutionException(state.op.getException())); + } else { + state.op.timeOut(); + MemcachedConnection.opTimedOut(state.op); + t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", state.op)); + } } else { - state.op.timeOut(); - MemcachedConnection.opTimedOut(state.op); - t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", state.op)); + MemcachedConnection.opSucceeded(state.op); } - } else { - MemcachedConnection.opSucceeded(state.op); } } @@ -257,19 +284,31 @@ public void doAsyncGetSome(CompletableFuture> promise) { public Single> getSome(long to, TimeUnit units, boolean throwException, boolean hasZF, Scheduler scheduler) { return observe().timeout(to, units, Single.create(subscriber -> { try { + Operation[] opsArray = ops.toArray(new Operation[0]); for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); - if (!state.completed) { - MemcachedConnection.opTimedOut(state.op); + + if (state == null) { + Operation op = opsArray[i]; + op.timeOut(); + MemcachedConnection.opTimedOut(op); } else { - MemcachedConnection.opSucceeded(state.op); + if (!state.completed) { + MemcachedConnection.opTimedOut(state.op); + } else { + MemcachedConnection.opSucceeded(state.op); + } } } for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); - if (state.cancelled && throwException) throw new ExecutionException(new CancellationException("Cancelled")); - if (state.errored && throwException) throw new ExecutionException(state.op.getException()); + + // state == null always means timed out and was handled. + if (state != null) { + if (state.cancelled && throwException) throw new ExecutionException(new CancellationException("Cancelled")); + if (state.errored && throwException) throw new ExecutionException(state.op.getException()); + } } Map m = new HashMap();