Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ subprojects {
mavenCentral()
}

sourceCompatibility = 1.8
targetCompatibility = 1.8
java {
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
}

configurations.all {
exclude group: 'com.netflix.rxjava'
Expand Down
4 changes: 2 additions & 2 deletions evcache-client-sample/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ dependencies {
}

task (runSample , dependsOn: 'classes' , type:JavaExec) {
main = "com.netflix.evcache.sample.EVCacheClientSample"
mainClass = "com.netflix.evcache.sample.EVCacheClientSample"
classpath = sourceSets.main.runtimeClasspath
systemProperty 'EVCACHE_APP1.use.simple.node.list.provider' , 'true'
systemProperty 'EVCACHE_APP1-NODES' , 'SERVERGROUP1=localhost:11211;SERVERGROUP2=localhost:11212'
}

task (runZipkinTracingSample , dependsOn: 'classes' , type:JavaExec) {
main = "com.netflix.evcache.sample.EVCacheClientZipkinTracingSample"
mainClass = "com.netflix.evcache.sample.EVCacheClientZipkinTracingSample"
classpath = sourceSets.main.runtimeClasspath
systemProperty 'EVCACHE_APP1.use.simple.node.list.provider' , 'true'
systemProperty 'EVCACHE_APP1-NODES' , 'SERVERGROUP1=localhost:11211'
Expand Down
158 changes: 84 additions & 74 deletions evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -191,44 +191,47 @@ public Integer getMaxHashLength() {
return this.maxHashLength.get();
}

private boolean validateReadQueueSize(MemcachedNode node, EVCache.Call call) {
if (!(node instanceof EVCacheNode)) {
return true;
}
private enum NodeValidationResult {
OK,
INACTIVE_NODE,
READ_QUEUE_FULL
}

// validates that the node is active and there is capacity in its read queue, logs metrics for violation
private NodeValidationResult validateNodeForRead(MemcachedNode node, EVCache.Call call, int maxReadQueueSizeThreshold) {
if (!(node instanceof EVCacheNode))
return NodeValidationResult.OK;

final EVCacheNode evcNode = (EVCacheNode) node;

if (!evcNode.isAvailable(call)) {
return false;
incrementFailure(EVCacheMetricsFactory.INACTIVE_NODE, call);
if (log.isDebugEnabled()) log.debug("Inactive Node " + evcNode + " on " + call + " operation for app : " + appName
+ "; zone : " + zone);
return NodeValidationResult.INACTIVE_NODE;
}

final int size = evcNode.getReadQueueSize();
final boolean canAddToOpQueue = size < (maxReadQueueSize.get() * 2);
final boolean canAddToOpQueue = size < maxReadQueueSizeThreshold;
if (log.isDebugEnabled())
log.debug("Current Read Queue Size - " + size + " for app " + appName + " & zone " + zone + " and node : " + evcNode);

if (!canAddToOpQueue) {
final String hostName;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why we removed the hostName from the metric? At a quick glance, the hostName could be useful for identifying which node was inactive or had a full read queue.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because of the way the metric caching was implemented, it was reporting the wrong host anyway (the first host that reported that metric since client startup was cached forever), so it may not have been useful in the past. It was removed to avoid memory ballooning on the client.

if (evcNode.getSocketAddress() instanceof InetSocketAddress) {
hostName = ((InetSocketAddress) evcNode.getSocketAddress()).getHostName();
} else {
hostName = evcNode.getSocketAddress().toString();
}

incrementFailure(EVCacheMetricsFactory.READ_QUEUE_FULL, call, hostName);
incrementFailure(EVCacheMetricsFactory.READ_QUEUE_FULL, call);
if (log.isDebugEnabled()) {
log.debug("Read Queue Full on Bulk Operation for app : " + appName
+ "; zone : " + zone + "; Current Size : " + size
+ "; Max Size : " + maxReadQueueSize.get() * 2);
log.debug("Read Queue Full on " + call + " operation for app : " + appName
+ "; zone : " + zone + "; node : " + evcNode + "; Current Size : " + size
+ "; Max Size Threshold: " + maxReadQueueSizeThreshold);
}
return NodeValidationResult.READ_QUEUE_FULL;
}

return canAddToOpQueue;
return NodeValidationResult.OK;
}

private void incrementFailure(String metric, EVCache.Call call) {
incrementFailure(metric, call, null);
}

private void incrementFailure(String metric, EVCache.Call call, String host) {
Counter counter = counterMap.get(metric);
final String cacheKey = metric + (call != null ? call.name() : "");
Counter counter = counterMap.get(cacheKey);
if(counter == null) {
final List<Tag> tagList = new ArrayList<Tag>(6);
tagList.addAll(tags);
Expand All @@ -249,15 +252,19 @@ private void incrementFailure(String metric, EVCache.Call call, String host) {
}
}
tagList.add(new BasicTag(EVCacheMetricsFactory.FAILURE_REASON, metric));
if(host != null) tagList.add(new BasicTag(EVCacheMetricsFactory.FAILED_HOST, host));
counter = EVCacheMetricsFactory.getInstance().getCounter(EVCacheMetricsFactory.INTERNAL_FAIL, tagList);
counterMap.put(metric, counter);
counterMap.put(cacheKey, counter);
}
counter.increment();
}

@Deprecated
public void reportWrongKeyReturned(String hostName) {
incrementFailure(EVCacheMetricsFactory.WRONG_KEY_RETURNED, null, hostName);
reportWrongKeyReturned();
}

public void reportWrongKeyReturned() {
incrementFailure(EVCacheMetricsFactory.WRONG_KEY_RETURNED, null);
}

private boolean ensureWriteQueueSize(MemcachedNode node, String key, EVCache.Call call) throws EVCacheException {
Expand All @@ -276,13 +283,7 @@ private boolean ensureWriteQueueSize(MemcachedNode node, String key, EVCache.Cal
}

if(i++ > 3) {
final String hostName;
if(evcNode.getSocketAddress() instanceof InetSocketAddress) {
hostName = ((InetSocketAddress)evcNode.getSocketAddress()).getHostName();
} else {
hostName = evcNode.getSocketAddress().toString();
}
incrementFailure(EVCacheMetricsFactory.INACTIVE_NODE, call, hostName);
incrementFailure(EVCacheMetricsFactory.INACTIVE_NODE, call);
if (log.isDebugEnabled()) log.debug("Node : " + evcNode + " for app : " + appName + "; zone : "
+ zone + " is not active. Will Fail Fast and the write will be dropped for key : " + key);
evcNode.shutdown();
Expand All @@ -295,38 +296,29 @@ private boolean ensureWriteQueueSize(MemcachedNode node, String key, EVCache.Cal

private boolean validateNode(String key, boolean _throwException, EVCache.Call call) throws EVCacheException, EVCacheConnectException {
final MemcachedNode node = evcacheMemcachedClient.getEVCacheNode(key);
// First check if the node is active
if (node instanceof EVCacheNode) {
final EVCacheNode evcNode = (EVCacheNode) node;
final String hostName;
if(evcNode.getSocketAddress() instanceof InetSocketAddress) {
hostName = ((InetSocketAddress)evcNode.getSocketAddress()).getHostName();
} else {
hostName = evcNode.getSocketAddress().toString();
}
if (!evcNode.isAvailable(call)) {
incrementFailure(EVCacheMetricsFactory.INACTIVE_NODE, call, hostName);
if (log.isDebugEnabled()) log.debug("Node : " + node + " for app : " + appName + "; zone : " + zone
+ " is not active. Will Fail Fast so that we can fallback to Other Zone if available.");
if (_throwException) throw new EVCacheConnectException("Connection for Node : " + node + " for app : " + appName
+ "; zone : " + zone + " is not active");
return false;
}

final int size = evcNode.getReadQueueSize();
final boolean canAddToOpQueue = size < maxReadQueueSize.get();
if (log.isDebugEnabled()) log.debug("Current Read Queue Size - " + size + " for app " + appName + " & zone "
+ zone + " and node : " + evcNode);
if (!canAddToOpQueue) {
incrementFailure(EVCacheMetricsFactory.READ_QUEUE_FULL, call, hostName);
if (log.isDebugEnabled()) log.debug("Read Queue Full for Node : " + node + "; app : " + appName
+ "; zone : " + zone + "; Current Size : " + size + "; Max Size : " + maxReadQueueSize.get());
int maxReadQueueSizeThreshold = maxReadQueueSize.get();
NodeValidationResult validationResult = validateNodeForRead(node, call, maxReadQueueSizeThreshold);
if (validationResult == NodeValidationResult.OK)
return true;

switch (validationResult) {
case INACTIVE_NODE:
if (_throwException)
throw new EVCacheConnectException("Connection for Node : " + node + " for app : " + appName + "; zone : "
+ zone + " is not active. Will Fail Fast so that we can fallback to Other Zone if available.");
break;
case READ_QUEUE_FULL:
if (_throwException) throw new EVCacheReadQueueException("Read Queue Full for Node : " + node + "; app : "
+ appName + "; zone : " + zone + "; Current Size : " + size + "; Max Size : " + maxReadQueueSize.get());
return false;
}
+ appName + "; zone : " + zone + "; Max Size Threshold: " + maxReadQueueSizeThreshold);
break;
}
return true;
return false;
}

// chunking node validator cares about node is active but not the read queue size, since failing one chunk would fail the overall request
private BiPredicate<MemcachedNode, String> chunkingNodeValidator(EVCache.Call call) {
return (node, k) -> validateNodeForRead(node, call, Integer.MAX_VALUE) == NodeValidationResult.OK;
}

private <T> ChunkDetails<T> getChunkDetails(String key) {
Expand All @@ -336,7 +328,7 @@ private <T> ChunkDetails<T> getChunkDetails(String key) {
final String firstKey = key + "_00";
firstKeys.add(firstKey);
try {
final Map<String, CachedData> metadataMap = evcacheMemcachedClient.asyncGetBulk(firstKeys, chunkingTranscoder, null)
final Map<String, CachedData> metadataMap = evcacheMemcachedClient.asyncGetBulk(firstKeys, chunkingTranscoder, null, chunkingNodeValidator(Call.GET))
.getSome(readTimeout.get(), TimeUnit.MILLISECONDS, false, false);
if (metadataMap.containsKey(key)) {
return new ChunkDetails(null, null, false, metadataMap.get(key));
Expand Down Expand Up @@ -366,7 +358,7 @@ private <T> Single<ChunkDetails<T>> getChunkDetails(String key, Scheduler schedu
final String firstKey = key + "_00";
firstKeys.add(firstKey);

return evcacheMemcachedClient.asyncGetBulk(firstKeys, chunkingTranscoder, null)
return evcacheMemcachedClient.asyncGetBulk(firstKeys, chunkingTranscoder, null, chunkingNodeValidator(Call.GET))
.getSome(readTimeout.get(), TimeUnit.MILLISECONDS, false, false, scheduler)
.map(metadataMap -> {
if (metadataMap.containsKey(key)) {
Expand Down Expand Up @@ -401,7 +393,7 @@ private <T> T assembleChunks(String key, boolean touch, int ttl, Transcoder<T> t
final List<String> keys = cd.getChunkKeys();
final ChunkInfo ci = cd.getChunkInfo();

final Map<String, CachedData> dataMap = evcacheMemcachedClient.asyncGetBulk(keys, chunkingTranscoder, null)
final Map<String, CachedData> dataMap = evcacheMemcachedClient.asyncGetBulk(keys, chunkingTranscoder, null, chunkingNodeValidator(Call.GET))
.getSome(readTimeout.get(), TimeUnit.MILLISECONDS, false, false);

if (dataMap.size() != ci.getChunks() - 1) {
Expand Down Expand Up @@ -475,7 +467,7 @@ private <T> Single<T> assembleChunks(String key, boolean touch, int ttl, Transco
final List<String> keys = cd.getChunkKeys();
final ChunkInfo ci = cd.getChunkInfo();

return evcacheMemcachedClient.asyncGetBulk(keys, chunkingTranscoder, null)
return evcacheMemcachedClient.asyncGetBulk(keys, chunkingTranscoder, null, chunkingNodeValidator(Call.GET))
.getSome(readTimeout.get(), TimeUnit.MILLISECONDS, false, false, scheduler)
.map(dataMap -> {
if (dataMap.size() != ci.getChunks() - 1) {
Expand Down Expand Up @@ -577,7 +569,7 @@ private <T> Map<String, T> assembleChunks(Collection<String> keyList, Transcoder
firstKeys.add(key + "_00");
}
try {
final Map<String, CachedData> metadataMap = evcacheMemcachedClient.asyncGetBulk(firstKeys, chunkingTranscoder, null)
final Map<String, CachedData> metadataMap = evcacheMemcachedClient.asyncGetBulk(firstKeys, chunkingTranscoder, null, chunkingNodeValidator(Call.BULK))
.getSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS, false, false);
if (metadataMap == null) return null;

Expand Down Expand Up @@ -611,7 +603,7 @@ private <T> Map<String, T> assembleChunks(Collection<String> keyList, Transcoder
}
}

final Map<String, CachedData> dataMap = evcacheMemcachedClient.asyncGetBulk(allKeys, chunkingTranscoder, null)
final Map<String, CachedData> dataMap = evcacheMemcachedClient.asyncGetBulk(allKeys, chunkingTranscoder, null, chunkingNodeValidator(Call.BULK))
.getSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS, false, false);

for (Entry<ChunkInfo, SimpleEntry<List<String>, byte[]>> entry : responseMap.entrySet()) {
Expand Down Expand Up @@ -673,7 +665,7 @@ private <T> Single<Map<String, T>> assembleChunks(Collection<String> keyList, Tr
firstKeys.add(key + "_00");
}

return evcacheMemcachedClient.asyncGetBulk(firstKeys, chunkingTranscoder, null)
return evcacheMemcachedClient.asyncGetBulk(firstKeys, chunkingTranscoder, null, chunkingNodeValidator(Call.BULK))
.getSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS, false, false, scheduler)
.flatMap(metadataMap -> {
if (metadataMap == null) return null;
Expand Down Expand Up @@ -708,7 +700,7 @@ private <T> Single<Map<String, T>> assembleChunks(Collection<String> keyList, Tr
}
}

return evcacheMemcachedClient.asyncGetBulk(allKeys, chunkingTranscoder, null)
return evcacheMemcachedClient.asyncGetBulk(allKeys, chunkingTranscoder, null, chunkingNodeValidator(Call.BULK))
.getSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS, false, false, scheduler)
.map(dataMap -> {
for (Entry<ChunkInfo, SimpleEntry<List<String>, byte[]>> entry : responseMap.entrySet()) {
Expand Down Expand Up @@ -842,7 +834,7 @@ public Map<String, CachedData> getAllChunks(String key) throws EVCacheReadQueueE
} else {
final List<String> keys = cd.getChunkKeys();
if(log.isDebugEnabled()) log.debug("Keys - " + keys);
final Map<String, CachedData> dataMap = evcacheMemcachedClient.asyncGetBulk(keys, chunkingTranscoder, null)
final Map<String, CachedData> dataMap = evcacheMemcachedClient.asyncGetBulk(keys, chunkingTranscoder, null, chunkingNodeValidator(Call.GET))
.getSome(readTimeout.get().intValue(), TimeUnit.MILLISECONDS, false, false);

if(log.isDebugEnabled()) log.debug("Datamap " + dataMap);
Expand Down Expand Up @@ -975,7 +967,13 @@ public <T> Map<String, T> getBulk(Collection<String> canonicalKeys, Transcoder<T
if (enableChunking.get()) {
returnVal = assembleChunks(canonicalKeys, tc, hasZF);
} else {
final BiPredicate<MemcachedNode, String> validator = (node, key) -> validateReadQueueSize(node, Call.BULK);
final BiPredicate<MemcachedNode, String> validator = (node, key) -> {
NodeValidationResult result = validateNodeForRead(node, Call.BULK, 2 * maxReadQueueSize.get());
if (result != NodeValidationResult.OK) {
return false;
}
return true;
};
returnVal = evcacheMemcachedClient.asyncGetBulk(canonicalKeys, tc, null, validator)
.getSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF);
}
Expand All @@ -1000,7 +998,13 @@ public <T> CompletableFuture<Map<String, T>> getAsyncBulk(Collection<String> key
public <T> CompletableFuture<Map<String, T>> getAsyncBulk(Collection<String> plainKeys, Set<String> hashedKeys,
Transcoder<T> tc, EVCacheTranscoder evcacheValueTranscoder,
String appName, boolean shouldLog, BiPredicate<String, String> collisionChecker) {
final BiPredicate<MemcachedNode, String> validator = (node, key) -> validateReadQueueSize(node, Call.COMPLETABLE_FUTURE_GET_BULK);
final BiPredicate<MemcachedNode, String> validator = (node, key) -> {
NodeValidationResult result = validateNodeForRead(node, Call.COMPLETABLE_FUTURE_GET_BULK, 2 * maxReadQueueSize.get());
if (result != NodeValidationResult.OK) {
return false;
}
return true;
};
if (tc == null) tc = (Transcoder<T>) getTranscoder();
return evcacheMemcachedClient
.asyncGetBulk(plainKeys, hashedKeys, tc, evcacheValueTranscoder, validator, appName, shouldLog, collisionChecker)
Expand All @@ -1015,7 +1019,13 @@ public <T> Single<Map<String, T>> getBulk(Collection<String> canonicalKeys, fina
if (enableChunking.get()) {
return assembleChunks(canonicalKeys, tc, hasZF, scheduler);
} else {
final BiPredicate<MemcachedNode, String> validator = (node, key) -> validateReadQueueSize(node, Call.BULK);
final BiPredicate<MemcachedNode, String> validator = (node, key) -> {
NodeValidationResult result = validateNodeForRead(node, Call.BULK, 2 * maxReadQueueSize.get());
if (result != NodeValidationResult.OK) {
return false;
}
return true;
};
return evcacheMemcachedClient.asyncGetBulk(canonicalKeys, tc, null, validator)
.getSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF, scheduler);
}
Expand Down
Loading