diff --git a/evcache-core/src/main/java/com/netflix/evcache/EVCacheSerializingTranscoder.java b/evcache-core/src/main/java/com/netflix/evcache/EVCacheSerializingTranscoder.java index 8f8bb3fb..98dfba28 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/EVCacheSerializingTranscoder.java +++ b/evcache-core/src/main/java/com/netflix/evcache/EVCacheSerializingTranscoder.java @@ -205,7 +205,6 @@ private void updateTimerWithCompressionRatio(long ratio_percentage) { tagList.add(new BasicTag(EVCacheMetricsFactory.COMPRESSION_TYPE, "gzip")); timer = EVCacheMetricsFactory.getInstance().getPercentileTimer(EVCacheMetricsFactory.COMPRESSION_RATIO, tagList, Duration.ofMillis(100)); }; - timer.record(ratio_percentage, TimeUnit.MILLISECONDS); } diff --git a/evcache-core/src/main/java/com/netflix/evcache/metrics/EVCacheMetricsFactory.java b/evcache-core/src/main/java/com/netflix/evcache/metrics/EVCacheMetricsFactory.java index 47dc890d..41f8aafd 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/metrics/EVCacheMetricsFactory.java +++ b/evcache-core/src/main/java/com/netflix/evcache/metrics/EVCacheMetricsFactory.java @@ -280,11 +280,7 @@ public String getStatusCode(StatusCode sc) { public static final String INTERNAL_EXECUTOR = "internal.evc.client.executor"; public static final String INTERNAL_EXECUTOR_SCHEDULED = "internal.evc.client.scheduledExecutor"; public static final String INTERNAL_POOL_INIT_ERROR = "internal.evc.client.init.error"; - - public static final String INTERNAL_NUM_CHUNK_SIZE = "internal.evc.client.chunking.numOfChunks"; - public static final String INTERNAL_CHUNK_DATA_SIZE = "internal.evc.client.chunking.dataSize"; public static final String INTERNAL_ADD_CALL_FIXUP = "internal.evc.client.addCall.fixUp"; - public static final String INTERNAL_POOL_SG_CONFIG = "internal.evc.client.pool.asg.config"; public static final String INTERNAL_POOL_CONFIG = "internal.evc.client.pool.config"; public static final String INTERNAL_POOL_REFRESH = "internal.evc.client.pool.refresh"; diff --git a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java index dda32499..b28aaf24 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java +++ b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java @@ -8,17 +8,13 @@ import java.net.SocketAddress; import java.net.URLDecoder; import java.nio.charset.StandardCharsets; -import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.*; -import java.util.zip.CRC32; -import java.util.zip.Checksum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +28,6 @@ import com.netflix.evcache.EVCacheReadQueueException; import com.netflix.evcache.EVCacheSerializingTranscoder; import com.netflix.evcache.metrics.EVCacheMetricsFactory; -import com.netflix.evcache.operation.EVCacheFutures; import com.netflix.evcache.operation.EVCacheItem; import com.netflix.evcache.operation.EVCacheItemMetaData; import com.netflix.evcache.operation.EVCacheLatchImpl; @@ -49,21 +44,17 @@ import net.spy.memcached.ConnectionFactory; import net.spy.memcached.EVCacheMemcachedClient; import net.spy.memcached.EVCacheNode; -import net.spy.memcached.MemcachedClient; import net.spy.memcached.MemcachedNode; import net.spy.memcached.NodeLocator; import net.spy.memcached.internal.ListenableFuture; import net.spy.memcached.internal.OperationCompletionListener; -import net.spy.memcached.internal.OperationFuture; import net.spy.memcached.transcoders.Transcoder; import rx.Scheduler; import rx.Single; @SuppressWarnings({"rawtypes", "unchecked"}) -@edu.umd.cs.findbugs.annotations.SuppressFBWarnings({ "REC_CATCH_EXCEPTION", - "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE" }) +@edu.umd.cs.findbugs.annotations.SuppressFBWarnings({ "REC_CATCH_EXCEPTION", "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE" }) public class EVCacheClient { - private static final Logger log = LoggerFactory.getLogger(EVCacheClient.class); private final ConnectionFactory connectionFactory; private final EVCacheMemcachedClient evcacheMemcachedClient; @@ -82,19 +73,14 @@ public class EVCacheClient { private final Property bulkReadTimeout; private final Property maxReadQueueSize; private final Property ignoreInactiveNodes; - private final Property enableChunking; private final Property hashKeyByServerGroup; private final Property shouldEncodeHashKey; private final Property maxDigestBytes; private final Property maxHashLength; - private final Property chunkSize, writeBlock; private final Property encoderBase; - - private final ChunkTranscoder chunkingTranscoder; private final EVCacheSerializingTranscoder decodingTranscoder; - private static final int SPECIAL_BYTEARRAY = (8 << 8); + private final Property writeBlock; private final EVCacheClientPool pool; -// private Counter addCounter = null; private final Property ignoreTouch; private List tags; private final Map counterMap = new ConcurrentHashMap(); @@ -104,9 +90,7 @@ public class EVCacheClient { EVCacheClient(String appName, String zone, int id, EVCacheServerGroupConfig config, List memcachedNodesInZone, int maxQueueSize, Property maxReadQueueSize, - Property readTimeout, Property bulkReadTimeout, - Property opQueueMaxBlockTime, - Property operationTimeout, EVCacheClientPool pool, boolean isDuetClient) throws IOException { + Property readTimeout, Property bulkReadTimeout, EVCacheClientPool pool, boolean isDuetClient) throws IOException { this.memcachedNodesInZone = memcachedNodesInZone; this.id = id; this.appName = appName; @@ -116,7 +100,6 @@ public class EVCacheClient { this.readTimeout = readTimeout; this.bulkReadTimeout = bulkReadTimeout; this.maxReadQueueSize = maxReadQueueSize; -// this.operationTimeout = operationTimeout; this.pool = pool; this.isDuetClient = isDuetClient; @@ -129,12 +112,9 @@ public class EVCacheClient { tagList.add(new BasicTag(EVCacheMetricsFactory.STAT_NAME, EVCacheMetricsFactory.POOL_OPERATIONS)); operationsCounter = EVCacheMetricsFactory.getInstance().getCounter(EVCacheMetricsFactory.INTERNAL_STATS, tagList); - this.enableChunking = EVCacheConfig.getInstance().getPropertyRepository().get(this.serverGroup.getName()+ ".chunk.data", Boolean.class).orElseGet(appName + ".chunk.data").orElse(false); - this.chunkSize = EVCacheConfig.getInstance().getPropertyRepository().get(this.serverGroup.getName() + ".chunk.size", Integer.class).orElseGet(appName + ".chunk.size").orElse(1180); - this.writeBlock = EVCacheConfig.getInstance().getPropertyRepository().get(appName + "." + this.serverGroup.getName() + ".write.block.duration", Integer.class).orElseGet(appName + ".write.block.duration").orElse(25); - this.chunkingTranscoder = new ChunkTranscoder(); this.maxWriteQueueSize = maxQueueSize; this.ignoreTouch = EVCacheConfig.getInstance().getPropertyRepository().get(appName + "." + this.serverGroup.getName() + ".ignore.touch", Boolean.class).orElseGet(appName + ".ignore.touch").orElse(false); + this.writeBlock = EVCacheConfig.getInstance().getPropertyRepository().get(appName + "." + this.serverGroup.getName() + ".write.block.duration", Integer.class).orElseGet(appName + ".write.block.duration").orElse(25); this.connectionFactory = pool.getEVCacheClientPoolManager().getConnectionFactoryProvider().getConnectionFactory(this); this.connectionObserver = new EVCacheConnectionObserver(this); @@ -328,531 +308,6 @@ private boolean validateNode(String key, boolean _throwException, EVCache.Call c return true; } - private ChunkDetails getChunkDetails(String key) { - - final List firstKeys = new ArrayList(2); - firstKeys.add(key); - final String firstKey = key + "_00"; - firstKeys.add(firstKey); - try { - final Map metadataMap = evcacheMemcachedClient.asyncGetBulk(firstKeys, chunkingTranscoder, null) - .getSome(readTimeout.get(), TimeUnit.MILLISECONDS, false, false); - if (metadataMap.containsKey(key)) { - return new ChunkDetails(null, null, false, metadataMap.get(key)); - } else if (metadataMap.containsKey(firstKey)) { - final ChunkInfo ci = getChunkInfo(firstKey, (String) decodingTranscoder.decode(metadataMap.get(firstKey))); - if (ci == null) return null; - - final List keys = new ArrayList<>(); - for (int i = 1; i < ci.getChunks(); i++) { - final String prefix = (i < 10) ? "0" : ""; - keys.add(ci.getKey() + "_" + prefix + i); - } - return new ChunkDetails(keys, ci, true, null); - } else { - return null; - } - } catch (Exception e) { - log.error(e.getMessage(), e); - } - return null; - } - - private Single> getChunkDetails(String key, Scheduler scheduler) { - - final List firstKeys = new ArrayList<>(2); - firstKeys.add(key); - final String firstKey = key + "_00"; - firstKeys.add(firstKey); - - return evcacheMemcachedClient.asyncGetBulk(firstKeys, chunkingTranscoder, null) - .getSome(readTimeout.get(), TimeUnit.MILLISECONDS, false, false, scheduler) - .map(metadataMap -> { - if (metadataMap.containsKey(key)) { - return new ChunkDetails(null, null, false, metadataMap.get(key)); - } else if (metadataMap.containsKey(firstKey)) { - final ChunkInfo ci = getChunkInfo(firstKey, (String) decodingTranscoder.decode(metadataMap.get(firstKey))); - if (ci == null) return null; - - final List keys = new ArrayList<>(); - for (int i = 1; i < ci.getChunks(); i++) { - final String prefix = (i < 10) ? "0" : ""; - keys.add(ci.getKey() + "_" + prefix + i); - } - return new ChunkDetails(keys, ci, true, null); - } else { - return null; - } - }); - } - - private T assembleChunks(String key, boolean touch, int ttl, Transcoder tc, boolean hasZF) { - try { - - final ChunkDetails cd = getChunkDetails(key); - if (cd == null) return null; - if (!cd.isChunked()) { - if (cd.getData() == null) return null; - final Transcoder transcoder = (tc == null ? (Transcoder) evcacheMemcachedClient.getTranscoder() - : tc); - return transcoder.decode((CachedData) cd.getData()); - } else { - final List keys = cd.getChunkKeys(); - final ChunkInfo ci = cd.getChunkInfo(); - - final Map dataMap = evcacheMemcachedClient.asyncGetBulk(keys, chunkingTranscoder, null) - .getSome(readTimeout.get(), TimeUnit.MILLISECONDS, false, false); - - if (dataMap.size() != ci.getChunks() - 1) { - incrementFailure(EVCacheMetricsFactory.INCORRECT_CHUNKS, null); - return null; - } - - final byte[] data = new byte[(ci.getChunks() - 2) * ci.getChunkSize() + (ci.getLastChunk() == 0 ? ci - .getChunkSize() : ci.getLastChunk())]; - int index = 0; - for (int i = 0; i < keys.size(); i++) { - final String _key = keys.get(i); - final CachedData _cd = dataMap.get(_key); - if (log.isDebugEnabled()) log.debug("Chunk Key " + _key + "; Value : " + _cd); - if (_cd == null) continue; - - final byte[] val = _cd.getData(); - - // If we expect a chunk to be present and it is null then return null immediately. - if (val == null) return null; - final int len = (i == keys.size() - 1) ? ((ci.getLastChunk() == 0 || ci.getLastChunk() > ci - .getChunkSize()) ? ci.getChunkSize() : ci.getLastChunk()) - : val.length; - if (len != ci.getChunkSize() && i != keys.size() - 1) { - incrementFailure(EVCacheMetricsFactory.INVALID_CHUNK_SIZE, null); - if (log.isWarnEnabled()) log.warn("CHUNK_SIZE_ERROR : Chunks : " + ci.getChunks() + " ; " - + "length : " + len + "; expectedLength : " + ci.getChunkSize() + " for key : " + _key); - } - if (len > 0) { - try { - System.arraycopy(val, 0, data, index, len); - } catch (Exception e) { - StringBuilder sb = new StringBuilder(); - sb.append("ArrayCopyError - Key : " + _key + "; final data Size : " + data.length - + "; copy array size : " + len + "; val size : " + val.length - + "; key index : " + i + "; copy from : " + index + "; ChunkInfo : " + ci + "\n"); - for (int j = 0; j < keys.size(); j++) { - final String skey = keys.get(j); - final byte[] sval = (byte[]) dataMap.get(skey).getData(); - sb.append(skey + "=" + sval.length + "\n"); - } - if (log.isWarnEnabled()) log.warn(sb.toString(), e); - throw e; - } - - index += val.length; - if (touch) evcacheMemcachedClient.touch(_key, ttl); - } - } - - final boolean checksumPass = checkCRCChecksum(data, ci, hasZF); - if (!checksumPass) return null; - final Transcoder transcoder = (tc == null ? (Transcoder) evcacheMemcachedClient.getTranscoder() - : tc); - return transcoder.decode(new CachedData(ci.getFlags(), data, Integer.MAX_VALUE)); - } - } catch (Exception e) { - log.error(e.getMessage(), e); - } - return null; - } - - private Single assembleChunks(String key, boolean touch, int ttl, Transcoder tc, boolean hasZF, Scheduler scheduler) { - return getChunkDetails(key, scheduler).flatMap(cd -> { - if (cd == null) return Single.just(null); - if (!cd.isChunked()) { - if (cd.getData() == null) return Single.just(null); - final Transcoder transcoder = (tc == null ? (Transcoder) evcacheMemcachedClient.getTranscoder() : tc); - return Single.just(transcoder.decode((CachedData) cd.getData())); - } else { - final List keys = cd.getChunkKeys(); - final ChunkInfo ci = cd.getChunkInfo(); - - return evcacheMemcachedClient.asyncGetBulk(keys, chunkingTranscoder, null) - .getSome(readTimeout.get(), TimeUnit.MILLISECONDS, false, false, scheduler) - .map(dataMap -> { - if (dataMap.size() != ci.getChunks() - 1) { - incrementFailure(EVCacheMetricsFactory.INCORRECT_CHUNKS, null); - return null; - } - - final byte[] data = new byte[(ci.getChunks() - 2) * ci.getChunkSize() + (ci.getLastChunk() == 0 ? ci - .getChunkSize() : ci.getLastChunk())]; - int index = 0; - for (int i = 0; i < keys.size(); i++) { - final String _key = keys.get(i); - final CachedData _cd = dataMap.get(_key); - if (log.isDebugEnabled()) log.debug("Chunk Key " + _key + "; Value : " + _cd); - if (_cd == null) continue; - - final byte[] val = _cd.getData(); - - // If we expect a chunk to be present and it is null then return null immediately. - if (val == null) return null; - final int len = (i == keys.size() - 1) ? ((ci.getLastChunk() == 0 || ci.getLastChunk() > ci - .getChunkSize()) ? ci.getChunkSize() : ci.getLastChunk()) - : val.length; - if (len != ci.getChunkSize() && i != keys.size() - 1) { - incrementFailure(EVCacheMetricsFactory.INVALID_CHUNK_SIZE, null); - if (log.isWarnEnabled()) log.warn("CHUNK_SIZE_ERROR : Chunks : " + ci.getChunks() + " ; " - + "length : " + len + "; expectedLength : " + ci.getChunkSize() + " for key : " + _key); - } - if (len > 0) { - try { - System.arraycopy(val, 0, data, index, len); - } catch (Exception e) { - StringBuilder sb = new StringBuilder(); - sb.append("ArrayCopyError - Key : " + _key + "; final data Size : " + data.length - + "; copy array size : " + len + "; val size : " + val.length - + "; key index : " + i + "; copy from : " + index + "; ChunkInfo : " + ci + "\n"); - for (int j = 0; j < keys.size(); j++) { - final String skey = keys.get(j); - final byte[] sval = (byte[]) dataMap.get(skey).getData(); - sb.append(skey + "=" + sval.length + "\n"); - } - if (log.isWarnEnabled()) log.warn(sb.toString(), e); - throw e; - } - - System.arraycopy(val, 0, data, index, len); - index += val.length; - if (touch) evcacheMemcachedClient.touch(_key, ttl); - } - } - - final boolean checksumPass = checkCRCChecksum(data, ci, hasZF); - if (!checksumPass) return null; - final Transcoder transcoder = (tc == null ? (Transcoder) evcacheMemcachedClient.getTranscoder() - : tc); - return transcoder.decode(new CachedData(ci.getFlags(), data, Integer.MAX_VALUE)); - }); - } - }); - } - - private boolean checkCRCChecksum(byte[] data, final ChunkInfo ci, boolean hasZF) { - if (data == null || data.length == 0) return false; - - final Checksum checksum = new CRC32(); - checksum.update(data, 0, data.length); - final long currentChecksum = checksum.getValue(); - final long expectedChecksum = ci.getChecksum(); - if (log.isDebugEnabled()) log.debug("CurrentChecksum : " + currentChecksum + "; ExpectedChecksum : " - + expectedChecksum + " for key : " + ci.getKey()); - if (currentChecksum != expectedChecksum) { - if (!hasZF) { - if (log.isWarnEnabled()) log.warn("CHECKSUM_ERROR : Chunks : " + ci.getChunks() + " ; " - + "currentChecksum : " + currentChecksum + "; expectedChecksum : " + expectedChecksum - + " for key : " + ci.getKey()); - incrementFailure(EVCacheMetricsFactory.CHECK_SUM_ERROR, null); - } - return false; - } - return true; - } - - private ChunkInfo getChunkInfo(String firstKey, String metadata) { - if (metadata == null) return null; - final String[] metaItems = metadata.split(":"); - if (metaItems.length != 5) return null; - final String key = firstKey.substring(0, firstKey.length() - 3); - - final ChunkInfo ci = new ChunkInfo(Integer.parseInt(metaItems[0]), Integer.parseInt(metaItems[1]), Integer - .parseInt(metaItems[2]), Integer.parseInt(metaItems[3]), key, Long - .parseLong(metaItems[4])); - return ci; - } - - private Map assembleChunks(Collection keyList, Transcoder tc, boolean hasZF) { - final List firstKeys = new ArrayList<>(); - for (String key : keyList) { - firstKeys.add(key); - firstKeys.add(key + "_00"); - } - try { - final Map metadataMap = evcacheMemcachedClient.asyncGetBulk(firstKeys, chunkingTranscoder, null) - .getSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS, false, false); - if (metadataMap == null) return null; - - final Map returnMap = new HashMap<>(keyList.size() * 2); - for (String key : keyList) { - if (metadataMap.containsKey(key)) { - CachedData val = metadataMap.remove(key); - returnMap.put(key, tc.decode(val)); - } - } - - final List allKeys = new ArrayList<>(); - final Map, byte[]>> responseMap = new HashMap<>(); - for (Entry entry : metadataMap.entrySet()) { - final String firstKey = entry.getKey(); - final String metadata = (String) decodingTranscoder.decode(entry.getValue()); - if (metadata == null) continue; - final ChunkInfo ci = getChunkInfo(firstKey, metadata); - if (ci != null) { - final List ciKeys = new ArrayList<>(); - - for (int i = 1; i < ci.getChunks(); i++) { - final String prefix = (i < 10) ? "0" : ""; - final String _key = ci.getKey() + "_" + prefix + i; - allKeys.add(_key); - ciKeys.add(_key); - } - - final byte[] data = new byte[(ci.getChunks() - 2) * ci.getChunkSize() + ci.getLastChunk()]; - responseMap.put(ci, new SimpleEntry<>(ciKeys, data)); - } - } - - final Map dataMap = evcacheMemcachedClient.asyncGetBulk(allKeys, chunkingTranscoder, null) - .getSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS, false, false); - - for (Entry, byte[]>> entry : responseMap.entrySet()) { - final ChunkInfo ci = entry.getKey(); - final SimpleEntry, byte[]> pair = entry.getValue(); - final List ciKeys = pair.getKey(); - byte[] data = pair.getValue(); - int index = 0; - for (int i = 0; i < ciKeys.size(); i++) { - final String _key = ciKeys.get(i); - final CachedData cd = dataMap.get(_key); - if (log.isDebugEnabled()) log.debug("Chunk Key " + _key + "; Value : " + cd); - if (cd == null) continue; - final byte[] val = cd.getData(); - - if (val == null) { - data = null; - break; - } - final int len = (i == ciKeys.size() - 1) ? ((ci.getLastChunk() == 0 || ci.getLastChunk() > ci - .getChunkSize()) ? ci.getChunkSize() : ci.getLastChunk()) - : val.length; - try { - System.arraycopy(val, 0, data, index, len); - } catch (Exception e) { - StringBuilder sb = new StringBuilder(); - sb.append("ArrayCopyError - Key : " + _key + "; final data Size : " + data.length - + "; copy array size : " + len + "; val size : " + val.length - + "; key index : " + i + "; copy from : " + index + "; ChunkInfo : " + ci + "\n"); - for (int j = 0; j < ciKeys.size(); j++) { - final String skey = ciKeys.get(j); - final byte[] sval = dataMap.get(skey).getData(); - sb.append(skey + "=" + sval.length + "\n"); - } - if (log.isWarnEnabled()) log.warn(sb.toString(), e); - throw e; - } - index += val.length; - } - final boolean checksumPass = checkCRCChecksum(data, ci, hasZF); - if (data != null && checksumPass) { - final CachedData cd = new CachedData(ci.getFlags(), data, Integer.MAX_VALUE); - returnMap.put(ci.getKey(), tc.decode(cd)); - } else { - returnMap.put(ci.getKey(), null); - } - } - return returnMap; - } catch (Exception e) { - log.error(e.getMessage(), e); - } - return null; - } - - private Single> assembleChunks(Collection keyList, Transcoder tc, boolean hasZF, Scheduler scheduler) { - final List firstKeys = new ArrayList<>(); - for (String key : keyList) { - firstKeys.add(key); - firstKeys.add(key + "_00"); - } - - return evcacheMemcachedClient.asyncGetBulk(firstKeys, chunkingTranscoder, null) - .getSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS, false, false, scheduler) - .flatMap(metadataMap -> { - if (metadataMap == null) return null; - - final Map returnMap = new HashMap<>(keyList.size() * 2); - for (String key : keyList) { - if (metadataMap.containsKey(key)) { - CachedData val = metadataMap.remove(key); - returnMap.put(key, tc.decode(val)); - } - } - - final List allKeys = new ArrayList<>(); - final Map, byte[]>> responseMap = new HashMap<>(); - for (Entry entry : metadataMap.entrySet()) { - final String firstKey = entry.getKey(); - final String metadata = (String) decodingTranscoder.decode(entry.getValue()); - if (metadata == null) continue; - final ChunkInfo ci = getChunkInfo(firstKey, metadata); - if (ci != null) { - final List ciKeys = new ArrayList<>(); - - for (int i = 1; i < ci.getChunks(); i++) { - final String prefix = (i < 10) ? "0" : ""; - final String _key = ci.getKey() + "_" + prefix + i; - allKeys.add(_key); - ciKeys.add(_key); - } - - final byte[] data = new byte[(ci.getChunks() - 2) * ci.getChunkSize() + ci.getLastChunk()]; - responseMap.put(ci, new SimpleEntry<>(ciKeys, data)); - } - } - - return evcacheMemcachedClient.asyncGetBulk(allKeys, chunkingTranscoder, null) - .getSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS, false, false, scheduler) - .map(dataMap -> { - for (Entry, byte[]>> entry : responseMap.entrySet()) { - final ChunkInfo ci = entry.getKey(); - final SimpleEntry, byte[]> pair = entry.getValue(); - final List ciKeys = pair.getKey(); - byte[] data = pair.getValue(); - int index = 0; - for (int i = 0; i < ciKeys.size(); i++) { - final String _key = ciKeys.get(i); - final CachedData cd = dataMap.get(_key); - if (log.isDebugEnabled()) log.debug("Chunk Key " + _key + "; Value : " + cd); - if (cd == null) continue; - final byte[] val = cd.getData(); - - if (val == null) { - data = null; - break; - } - final int len = (i == ciKeys.size() - 1) ? ((ci.getLastChunk() == 0 || ci.getLastChunk() > ci - .getChunkSize()) ? ci.getChunkSize() : ci.getLastChunk()) - : val.length; - try { - System.arraycopy(val, 0, data, index, len); - } catch (Exception e) { - StringBuilder sb = new StringBuilder(); - sb.append("ArrayCopyError - Key : " + _key + "; final data Size : " + data.length - + "; copy array size : " + len + "; val size : " + val.length - + "; key index : " + i + "; copy from : " + index + "; ChunkInfo : " + ci + "\n"); - for (int j = 0; j < ciKeys.size(); j++) { - final String skey = ciKeys.get(j); - final byte[] sval = dataMap.get(skey).getData(); - sb.append(skey + "=" + sval.length + "\n"); - } - if (log.isWarnEnabled()) log.warn(sb.toString(), e); - throw e; - } - index += val.length; - } - final boolean checksumPass = checkCRCChecksum(data, ci, hasZF); - if (data != null && checksumPass) { - final CachedData cd = new CachedData(ci.getFlags(), data, Integer.MAX_VALUE); - returnMap.put(ci.getKey(), tc.decode(cd)); - } else { - returnMap.put(ci.getKey(), null); - } - } - - return returnMap; - }); - }); - } - - private CachedData[] createChunks(CachedData cd, String key) { - final int cSize = chunkSize.get(); - if ((key.length() + 3) > cSize) throw new IllegalArgumentException("The chunksize " + cSize - + " is smaller than the key size. Will not be able to proceed. key size = " - + key.length()); - final int len = cd.getData().length; - - /* the format of headers in memcached */ - // Key size + 1 + Header( Flags (Characters Number) + Key (Characters Numbers) + 2 bytes ( \r\n ) + 4 bytes (2 spaces and 1 \r)) + Chunk Size + CAS Size - // final int overheadSize = key.length() // Key Size - // + 1 // Space - // + 4 // Flags (Characters Number) - // + 4 // Key (Characters Numbers) - // + 2 // /r/n - // + 4 // 2 spaces and 1 \r - // + 48 // Header Size - // + 8; // CAS - final int overheadSize = key.length() + 71 + 3; - // 3 because we will suffix _00, _01 ... _99; 68 is the size of the memcached header - final int actualChunkSize = cSize - overheadSize; - int lastChunkSize = len % actualChunkSize; - final int numOfChunks = len / actualChunkSize + ((lastChunkSize > 0) ? 1 : 0) + 1; - final CachedData[] chunkData = new CachedData[numOfChunks]; - if (lastChunkSize == 0) lastChunkSize = actualChunkSize; - - final long sTime = System.nanoTime(); - final Checksum checksum = new CRC32(); - checksum.update(cd.getData(), 0, len); - final long checkSumValue = checksum.getValue(); - - int srcPos = 0; - if (log.isDebugEnabled()) log.debug("Ths size of data is " + len + " ; we will create " + (numOfChunks - 1) - + " of " + actualChunkSize + " bytes. Checksum : " - + checkSumValue + "; Checksum Duration : " + (System.nanoTime() - sTime)); - chunkData[0] = decodingTranscoder.encode(numOfChunks + ":" + actualChunkSize + ":" + lastChunkSize + ":" + cd - .getFlags() + ":" + checkSumValue); - for (int i = 1; i < numOfChunks; i++) { - int lengthOfArray = actualChunkSize; - if (srcPos + actualChunkSize > len) { - lengthOfArray = len - srcPos; - } - byte[] dest = new byte[actualChunkSize]; - System.arraycopy(cd.getData(), srcPos, dest, 0, lengthOfArray); - if (actualChunkSize > lengthOfArray) { - for (int j = lengthOfArray; j < actualChunkSize; j++) { - dest[j] = Character.UNASSIGNED;// Adding filler data - } - } - srcPos += lengthOfArray; - //chunkData[i] = decodingTranscoder.encode(dest); - chunkData[i] = new CachedData(SPECIAL_BYTEARRAY, dest, Integer.MAX_VALUE); - } - EVCacheMetricsFactory.getInstance().getDistributionSummary(EVCacheMetricsFactory.INTERNAL_NUM_CHUNK_SIZE, getTagList()).record(numOfChunks); - EVCacheMetricsFactory.getInstance().getDistributionSummary(EVCacheMetricsFactory.INTERNAL_CHUNK_DATA_SIZE, getTagList()).record(len); - - return chunkData; - } - - /** - * Retrieves all the chunks as is. This is mainly used for debugging. - * - * @param key - * @return Returns all the chunks retrieved. - * @throws EVCacheReadQueueException - * @throws EVCacheException - * @throws Exception - */ - public Map getAllChunks(String key) throws EVCacheReadQueueException, EVCacheException, Exception { - try { - final ChunkDetails cd = getChunkDetails(key); - if(log.isDebugEnabled()) log.debug("Chunkdetails " + cd); - if (cd == null) return null; - if (!cd.isChunked()) { - Map rv = new HashMap(); - rv.put(key, (CachedData) cd.getData()); - if(log.isDebugEnabled()) log.debug("Data : " + rv); - return rv; - } else { - final List keys = cd.getChunkKeys(); - if(log.isDebugEnabled()) log.debug("Keys - " + keys); - final Map dataMap = evcacheMemcachedClient.asyncGetBulk(keys, chunkingTranscoder, null) - .getSome(readTimeout.get().intValue(), TimeUnit.MILLISECONDS, false, false); - - if(log.isDebugEnabled()) log.debug("Datamap " + dataMap); - return dataMap; - } - } catch (Exception e) { - log.error(e.getMessage(), e); - } - return null; - } - public long incr(String key, long by, long defaultVal, int timeToLive) throws EVCacheException { return evcacheMemcachedClient.incr(key, by, defaultVal, timeToLive); } @@ -868,34 +323,22 @@ public CompletableFuture getAsync(String key, Transcoder tc) { .getAsync(readTimeout.get(), TimeUnit.MILLISECONDS); } - public T get(String key, Transcoder tc, boolean _throwException, boolean hasZF, boolean chunked) throws Exception { - if (chunked) { - return assembleChunks(key, false, 0, tc, hasZF); - } else { - return evcacheMemcachedClient.asyncGet(key, tc, null).get(readTimeout.get(), - TimeUnit.MILLISECONDS, _throwException, hasZF); - } - } - public T get(String key, Transcoder tc, boolean _throwException, boolean hasZF) throws Exception { if (!validateNode(key, _throwException, Call.GET)) { if(ignoreInactiveNodes.get()) { incrementFailure(EVCacheMetricsFactory.IGNORE_INACTIVE_NODES, Call.GET); - return pool.getEVCacheClientForReadExclude(serverGroup).get(key, tc, _throwException, hasZF, enableChunking.get()); + return pool.getEVCacheClientForReadExclude(serverGroup).get(key, tc, _throwException, hasZF); } else { return null; } } - return get(key, tc, _throwException, hasZF, enableChunking.get()); + return evcacheMemcachedClient.asyncGet(key, tc, null).get(readTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF); } public Single get(String key, Transcoder tc, boolean _throwException, boolean hasZF, boolean chunked, Scheduler scheduler) throws Exception { - if (chunked) { - return assembleChunks(key, _throwException, 0, tc, hasZF, scheduler); - } else { - return evcacheMemcachedClient.asyncGet(key, tc, null) - .get(readTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF, scheduler); - } + return evcacheMemcachedClient + .asyncGet(key, tc, null) + .get(readTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF, scheduler); } public Single get(String key, Transcoder tc, boolean _throwException, boolean hasZF, Scheduler scheduler) { @@ -903,12 +346,12 @@ public Single get(String key, Transcoder tc, boolean _throwException, if (!validateNode(key, _throwException, Call.GET)) { if(ignoreInactiveNodes.get()) { incrementFailure(EVCacheMetricsFactory.IGNORE_INACTIVE_NODES, Call.GET); - return pool.getEVCacheClientForReadExclude(serverGroup).get(key, tc, _throwException, hasZF, enableChunking.get(), scheduler); + return pool.getEVCacheClientForReadExclude(serverGroup).get(key, tc, _throwException, hasZF, scheduler); } else { return Single.just(null); } } - return get(key, tc, _throwException, hasZF, enableChunking.get(), scheduler); + return get(key, tc, _throwException, hasZF, scheduler); } catch (Throwable e) { return Single.error(e); } @@ -927,15 +370,11 @@ public T getAndTouch(String key, Transcoder tc, int timeToLive, boolean _ if (tc == null) tc = (Transcoder) getTranscoder(); final T returnVal; - if (enableChunking.get()) { - return assembleChunks(key, false, 0, tc, hasZF); + if (ignoreTouch.get()) { + returnVal = _client.asyncGet(key, tc, null).get(readTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF); } else { - if(ignoreTouch.get()) { - returnVal = _client.asyncGet(key, tc, null).get(readTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF); - } else { - final CASValue value = _client.asyncGetAndTouch(key, timeToLive, tc).get(readTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF); - returnVal = (value == null) ? null : value.getValue(); - } + final CASValue value = _client.asyncGetAndTouch(key, timeToLive, tc).get(readTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF); + returnVal = (value == null) ? null : value.getValue(); } return returnVal; } @@ -954,13 +393,9 @@ public Single getAndTouch(String key, Transcoder transcoder, int timeT final EVCacheMemcachedClient _client = client; final Transcoder tc = (transcoder == null) ? (Transcoder) getTranscoder(): transcoder; - if (enableChunking.get()) { - return assembleChunks(key, false, 0, tc, hasZF, scheduler); - } else { - return _client.asyncGetAndTouch(key, timeToLive, tc) + return _client.asyncGetAndTouch(key, timeToLive, tc) .get(readTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF, scheduler) .map(value -> (value == null) ? null : value.getValue()); - } } catch (Throwable e) { return Single.error(e); } @@ -972,12 +407,8 @@ public Map getBulk(Collection _canonicalKeys, Transcoder< final Map returnVal; try { if (tc == null) tc = (Transcoder) getTranscoder(); - if (enableChunking.get()) { - returnVal = assembleChunks(_canonicalKeys, tc, hasZF); - } else { - returnVal = evcacheMemcachedClient.asyncGetBulk(canonicalKeys, tc, null) - .getSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF); - } + returnVal = evcacheMemcachedClient.asyncGetBulk(canonicalKeys, tc, null) + .getSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF); } catch (Exception e) { if (_throwException) throw e; return Collections. emptyMap(); @@ -999,20 +430,14 @@ public Single> getBulk(Collection _canonicalKeys, fin try { final Collection canonicalKeys = validateReadQueueSize(_canonicalKeys, Call.BULK); final Transcoder tc = (transcoder == null) ? (Transcoder) getTranscoder() : transcoder; - if (enableChunking.get()) { - return assembleChunks(_canonicalKeys, tc, hasZF, scheduler); - } else { - return evcacheMemcachedClient.asyncGetBulk(canonicalKeys, tc, null) + return evcacheMemcachedClient.asyncGetBulk(canonicalKeys, tc, null) .getSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF, scheduler); - } } catch (Throwable e) { return Single.error(e); } } public Future append(String key, T value) throws Exception { - if (enableChunking.get()) throw new EVCacheException( - "This operation is not supported as chunking is enabled on this EVCacheClient."); final MemcachedNode node = evcacheMemcachedClient.getEVCacheNode(key); if (!ensureWriteQueueSize(node, key, Call.APPEND)) return getDefaultFuture(); return evcacheMemcachedClient.append(key, value); @@ -1053,29 +478,7 @@ private Future _set(String key, CachedData value, int timeToLive, EVCac try { final int dataSize = ((CachedData) value).getData().length; - - if (enableChunking.get()) { - if (dataSize > chunkSize.get()) { - final CachedData[] cd = createChunks(value, key); - final int len = cd.length; - final OperationFuture[] futures = new OperationFuture[len]; - for (int i = 0; i < cd.length; i++) { - final String prefix = (i < 10) ? "0" : ""; - futures[i] = evcacheMemcachedClient.set(key + "_" + prefix + i, timeToLive, cd[i], null, null); - } - // ensure we are deleting the unchunked key if it exists. - // Ignore return value since it may not exist. - evcacheMemcachedClient.delete(key); - return new EVCacheFutures(futures, key, appName, serverGroup, evcacheLatch); - } else { - // delete all the chunks if they exist as the - // data is moving from chunked to unchunked - delete(key); - return evcacheMemcachedClient.set(key, timeToLive, value, null, evcacheLatch); - } - } else { - return evcacheMemcachedClient.set(key, timeToLive, value, null, evcacheLatch); - } + return evcacheMemcachedClient.set(key, timeToLive, value, null, evcacheLatch); } catch (Exception e) { log.error(e.getMessage(), e); throw e; @@ -1139,18 +542,7 @@ private Future _replace(String key, CachedData value, int timeToLive, E try { final int dataSize = ((CachedData) value).getData().length; - if (enableChunking.get() && dataSize > chunkSize.get()) { - final CachedData[] cd = createChunks(value, key); - final int len = cd.length; - final OperationFuture[] futures = new OperationFuture[len]; - for (int i = 0; i < cd.length; i++) { - final String prefix = (i < 10) ? "0" : ""; - futures[i] = evcacheMemcachedClient.replace(key + "_" + prefix + i, timeToLive, cd[i], null, null); - } - return new EVCacheFutures(futures, key, appName, serverGroup, evcacheLatch); - } else { - return evcacheMemcachedClient.replace(key, timeToLive, value, null, evcacheLatch); - } + return evcacheMemcachedClient.replace(key, timeToLive, value, null, evcacheLatch); } catch (Exception e) { log.error(e.getMessage(), e); throw e; @@ -1158,8 +550,6 @@ private Future _replace(String key, CachedData value, int timeToLive, E } private Future _add(String key, int exp, CachedData value, EVCacheLatch latch) throws Exception { - if (enableChunking.get()) throw new EVCacheException("This operation is not supported as chunking is enabled on this EVCacheClient."); - final MemcachedNode node = evcacheMemcachedClient.getEVCacheNode(key); if (!ensureWriteQueueSize(node, key, Call.ADD)) return getDefaultFuture(); return evcacheMemcachedClient.add(key, exp, value, null, latch); @@ -1226,31 +616,11 @@ public Future touch(String key, int timeToLive, EVCacheLatch latch) if (latch != null && latch instanceof EVCacheLatchImpl && !isInWriteOnly()) ((EVCacheLatchImpl) latch).addFuture(defaultFuture); return defaultFuture; } - - if (enableChunking.get()) { - final ChunkDetails cd = getChunkDetails(key); - if (cd.isChunked()) { - final List keys = cd.getChunkKeys(); - OperationFuture[] futures = new OperationFuture[keys.size() + 1]; - futures[0] = evcacheMemcachedClient.touch(key + "_00", timeToLive, latch); - for (int i = 0; i < keys.size(); i++) { - final String prefix = (i < 10) ? "0" : ""; - final String _key = key + "_" + prefix + i; - futures[i + 1] = evcacheMemcachedClient.touch(_key, timeToLive, latch); - } - return new EVCacheFutures(futures, key, appName, serverGroup, latch); - } else { - return evcacheMemcachedClient.touch(key, timeToLive, latch); - } - } else { - return evcacheMemcachedClient.touch(key, timeToLive, latch); - } + return evcacheMemcachedClient.touch(key, timeToLive, latch); } public Future asyncGet(String key, Transcoder tc, boolean _throwException, boolean hasZF) throws Exception { - if (enableChunking.get()) throw new EVCacheException( - "This operation is not supported as chunking is enabled on this EVCacheClient."); if (!validateNode(key, _throwException, Call.ASYNC_GET)) return null; if (tc == null) tc = (Transcoder) getTranscoder(); return evcacheMemcachedClient.asyncGet(key, tc, null); @@ -1267,27 +637,7 @@ public Future delete(String key, EVCacheLatch latch) throws Exception { if (latch != null && latch instanceof EVCacheLatchImpl && !isInWriteOnly()) ((EVCacheLatchImpl) latch).addFuture(defaultFuture); return defaultFuture; } - - if (enableChunking.get()) { - final ChunkDetails cd = getChunkDetails(key); - if (cd == null) { - // Paranoid delete : cases where get fails and we ensure the first key is deleted just in case - return evcacheMemcachedClient.delete(key + "_00", latch); - } - if (!cd.isChunked()) { - return evcacheMemcachedClient.delete(key, latch); - } else { - final List keys = cd.getChunkKeys(); - OperationFuture[] futures = new OperationFuture[keys.size() + 1]; - futures[0] = evcacheMemcachedClient.delete(key + "_00"); - for (int i = 0; i < keys.size(); i++) { - futures[i + 1] = evcacheMemcachedClient.delete(keys.get(i), null); - } - return new EVCacheFutures(futures, key, appName, serverGroup, latch); - } - } else { - return evcacheMemcachedClient.delete(key, latch); - } + return evcacheMemcachedClient.delete(key, latch); } public boolean removeConnectionObserver() { @@ -1486,18 +836,6 @@ public Property getMaxReadQueueSize() { return maxReadQueueSize; } - public Property getEnableChunking() { - return enableChunking; - } - - public Property getChunkSize() { - return chunkSize; - } - - public ChunkTranscoder getChunkingTranscoder() { - return chunkingTranscoder; - } - public EVCacheSerializingTranscoder getDecodingTranscoder() { return decodingTranscoder; } @@ -1510,108 +848,6 @@ public EVCacheServerGroupConfig getEVCacheConfig() { return config; } - static class ChunkDetails { - - final List chunkKeys; - final ChunkInfo chunkInfo; - final boolean chunked; - final T data; - - public ChunkDetails(List chunkKeys, ChunkInfo chunkInfo, boolean chunked, T data) { - super(); - this.chunkKeys = chunkKeys; - this.chunkInfo = chunkInfo; - this.chunked = chunked; - this.data = data; - } - - public List getChunkKeys() { - return chunkKeys; - } - - public ChunkInfo getChunkInfo() { - return chunkInfo; - } - - public boolean isChunked() { - return chunked; - } - - public T getData() { - return data; - } - - @Override - public String toString() { - return "ChunkDetails [chunkKeys=" + chunkKeys + ", chunkInfo=" + chunkInfo + ", chunked=" + chunked - + ", data=" + data + "]"; - } - - } - - static class ChunkInfo { - - final int chunks; - final int chunkSize; - final int lastChunk; - final int flags; - final String key; - final long checksum; - - public ChunkInfo(int chunks, int chunkSize, int lastChunk, int flags, String firstKey, long checksum) { - super(); - this.chunks = chunks; - this.chunkSize = chunkSize; - this.lastChunk = lastChunk; - this.flags = flags; - this.key = firstKey; - this.checksum = checksum; - } - - public int getChunks() { - return chunks; - } - - public int getChunkSize() { - return chunkSize; - } - - public int getLastChunk() { - return lastChunk; - } - - public int getFlags() { - return flags; - } - - public String getKey() { - return key; - } - - public long getChecksum() { - return checksum; - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("{\"chunks\":\""); - builder.append(chunks); - builder.append("\",\"chunkSize\":\""); - builder.append(chunkSize); - builder.append("\",\"lastChunk\":\""); - builder.append(lastChunk); - builder.append("\",\"flags\":\""); - builder.append(flags); - builder.append("\",\"key\":\""); - builder.append(key); - builder.append("\",\"checksum\":\""); - builder.append(checksum); - builder.append("\"}"); - return builder.toString(); - } - } - public int getWriteQueueLength() { final Collection allNodes = evcacheMemcachedClient.getNodeLocator().getAll(); int size = 0; diff --git a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClientPool.java b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClientPool.java index 81cafdae..549f8604 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClientPool.java +++ b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClientPool.java @@ -48,9 +48,7 @@ @edu.umd.cs.findbugs.annotations.SuppressFBWarnings({ "PRMC_POSSIBLY_REDUNDANT_METHOD_CALLS", "REC_CATCH_EXCEPTION", "MDM_THREAD_YIELD" }) public class EVCacheClientPool implements Runnable, EVCacheClientPoolMBean { - private static final Logger log = LoggerFactory.getLogger(EVCacheClientPool.class); - private final String _appName; private final String _zone; private final EVCacheClientPoolManager manager; @@ -113,7 +111,7 @@ public Property get(Object _serverGroup) { .orElseGet(_appName + "." + serverGroup.getZone() + ".EVCacheClientPool.writeOnly").orElse(false); put(serverGroup, isServerGroupInWriteOnlyMode); return isServerGroupInWriteOnlyMode; - }; + } }; private final AtomicLong numberOfModOps = new AtomicLong(0); @@ -1006,7 +1004,7 @@ private void cleanupMemcachedInstances(boolean force) { } } - private synchronized void refresh(boolean force) throws IOException { + private synchronized void refresh(boolean force) { final long start = System.currentTimeMillis(); if (log.isDebugEnabled()) log.debug("refresh APP : " + _appName + "; force : " + force); try { @@ -1079,7 +1077,7 @@ private synchronized void refresh(boolean force) throws IOException { EVCacheClient client; try { client = new EVCacheClient(_appName, zone, i, config, memcachedSAInServerGroup, maxQueueSize, - _maxReadQueueSize, _readTimeout, _bulkReadTimeout, _opQueueMaxBlockTime, _operationTimeout, this, isDuet); + _maxReadQueueSize, _readTimeout, _bulkReadTimeout, this, isDuet); newClients.add(client); final int id = client.getId(); if (log.isDebugEnabled()) log.debug("AppName :" + _appName + "; ServerGroup : " + serverGroup + "; intit : client.getId() : " + id);