From d36b4f464b01e59c00d0796e3d1843316cc5e192 Mon Sep 17 00:00:00 2001 From: Mark Logan Date: Thu, 8 Sep 2016 18:09:12 -0700 Subject: [PATCH] Checksum files as they are being streamed in. Speeds up load times significantly. --- index.js | 79 ++++++++++++++++++++++++++++++++++++++++++++------------ test.js | 75 +++++++++++++++++++++++++++-------------------------- 2 files changed, 100 insertions(+), 54 deletions(-) diff --git a/index.js b/index.js index 1f5f29f..30cea3c 100644 --- a/index.js +++ b/index.js @@ -16,6 +16,9 @@ var urllib = require('url'); var debug = require('debug')('http-disk-cache'); var glob = require('glob'); +var util = require('util'); +var stream = require('stream'); + /////////////// CacheEntry /////////////// function canonicalUrl(url) { @@ -178,6 +181,32 @@ CacheWriter.prototype.pipeFrom = function pipeFrom(readable) { }); }; +function ChecksumStream(expectedChecksum, options) { + if (!(this instanceof ChecksumStream)) { + return new ChecksumStream(expectedChecksum, options); + } + stream.Transform.call(this, options); + this.hash = crypto.createHash('md5'); + this.expectedChecksum = expectedChecksum; +} +util.inherits(ChecksumStream, stream.Transform); + +ChecksumStream.prototype._transform = function (chunk, enc, cb) { + var buffer = Buffer.isBuffer(chunk) ? chunk : new Buffer(chunk, enc); + this.hash.update(buffer); // update hash + this.push(chunk, enc); + cb(); +}; + +ChecksumStream.prototype._flush = function (cb) { + var checksum = this.hash.digest('hex'); + if (checksum != this.expectedChecksum) { + return cb(new Error('invalid checksum')); + } + cb(); +}; + + /////////////// HTTPCache /////////////// // HTTPCache handles HTTP requests, and caches them to disk if allowed by the Cache-Control @@ -247,7 +276,7 @@ function deleteEntry(metaPath, cb) { // 'notcached' - the cache entry is missing, invalid, or expired, but ready to be cached anew. // 'error' - the cache entry is corrupted, and could not be deleted.
This indicates that // we shouldn't try to cache any responses right now. -HTTPCache.prototype._checkCache = function(cacheEntry, callback) { +HTTPCache.prototype._checkCache = function(cacheEntry, skipVerify, callback) { var _this = this; function loadMetadata(cb) { debug('loading metadata from', cacheEntry.metaPath); @@ -339,8 +368,12 @@ HTTPCache.prototype._checkCache = function(cacheEntry, callback) { }); } - // We now have valid metadata for an un-expired cache entry. Next, we checksum the contents. - validateContents(metadata); + if (skipVerify) { + return callback(null, CACHE_STATE_CACHED, metadata); + } else { + // We now have valid metadata for an un-expired cache entry. Next, we checksum the contents. + validateContents(metadata); + } }); }; @@ -404,20 +437,22 @@ HTTPCache.prototype.assertCached = function(url, onProgress, cb) { options = { url: url }; } - options._skipReadStream = true; - var entry = new CacheEntry(url, options.etagFormat); - this._checkCache(entry, function(err, cacheStatus) { - if (cacheStatus === CACHE_STATE_CACHED) { - debug('assert cache hit', url); - return cb(); - } else { - debug('assert cache miss', url); - _this.openReadStream(options, onProgress, function(err, _, path) { - cb(err); - }); + this.openReadStream(options, onProgress, function (err, readStream, path) { + if (err != null) { + return cb(err); } + if (readStream == null) { return cb(new Error('openReadStream did not provide a stream')); } /* report the failure through cb, like the err branch above, instead of throwing a placeholder */ + readStream.on('error', function(err) { + readStream.removeAllListeners(); + cb(err); + }); + readStream.on('end', function() { + readStream.removeAllListeners(); + cb(); + }); + readStream.resume(); }); }; @@ -464,14 +499,21 @@ HTTPCache.prototype.openReadStream = function(url, onProgress, cb) { var cacheWriter = this._createCacheWriter(entry); // Check if the entry is available in the cache.
- this._checkCache(entry, function(err, cacheStatus) { + this._checkCache(entry, true, function(err, cacheStatus, metadata) { debug("cache entry", entry.url, "status=", cacheStatus); if (cacheStatus === CACHE_STATE_CACHED) { // The cache contents are present and valid, so serve the request from cache. cacheWriter.end(); var readStream = options._skipReadStream ? null : _this._createContentReadStream(entry); - return cb(null, readStream, _this._absPath(entry.contentPath)); + var checksumStream = new ChecksumStream(metadata.contentMD5); + checksumStream.on('error', function (err) { + if (err && err.message === 'invalid checksum') { /* ChecksumStream._flush reports a mismatch as an Error object, not a string */ + deleteEntry(_this._absPath(entry.metaPath), function(err) {}); + } + }); + if (readStream != null) { readStream.pipe(checksumStream); } /* readStream is null when options._skipReadStream is set */ + return cb(null, readStream == null ? null : checksumStream, _this._absPath(entry.contentPath)); } else if (cacheStatus == CACHE_STATE_ERROR) { // Some kind of error occurred and we can't access the cache. return cb("Error: There was a problem with the asset cache and we can't write files"); @@ -582,6 +624,8 @@ HTTPCache.prototype.getContents = function(url, cb) { } debug("getContents start", options.url); + options._skipVerify = true; + this.openReadStream(options, function(err, readStream, path) { if (err) { return cb(err); } @@ -754,7 +798,7 @@ HTTPCache.prototype.repair = function(cb) { return; } - _this._checkCache(entry, function (err, status) { + _this._checkCache(entry, false, function (err, status) { if (err != null) { deleteEntry(metaPath, deleteCb); return; @@ -861,3 +905,4 @@ HTTPCache.prototype.clean = function (shouldClean, cb) { }; exports.HTTPCache = HTTPCache; +exports.ChecksumStream = ChecksumStream; diff --git a/test.js b/test.js index 5675553..13e9c61 100644 --- a/test.js +++ b/test.js @@ -13,6 +13,7 @@ var execSync = require('child_process').execSync; var debug = require('debug')('http-disk-cache'); var async = require('artillery-async'); var glob = require('glob'); +var stream = require('stream'); var httpcache = require('./index'); @@ -36,16 +37,19
@@ function newUrlReply(contents, status, headers, defer) { function catStream(stream, cb) { chunks = []; + stream.on('error', function (err) { + cb(err); + }); stream.on('data', function (chunk) { chunks.push(chunk); }); stream.on('end', function () { if (chunks.length === 0) { - cb(null); + cb(null, null); } else if (typeof chunks[0] === 'string') { - cb(chunks.join('')); + cb(null, chunks.join('')); } else { // Buffer - cb(Buffer.concat(chunks)); + cb(null, Buffer.concat(chunks)); } }); } @@ -166,7 +170,7 @@ exports.tests = { var _this = this; this.cache.openReadStream(this.createUrl('/url1'), function(err, stream, path) { test.ok(stream instanceof fs.ReadStream, "stream should be an fs.ReadStream"); - catStream(stream, function (contents) { + catStream(stream, function (err, contents) { test.equal(contents.toString('utf8'), 'url1 contents'); test.done(); }); @@ -182,7 +186,7 @@ exports.tests = { test.equal(_this.requests.length, 1); test.equal(_this.requests[0], '/url5'); test.ok(stream instanceof fs.ReadStream, "stream should be an fs.ReadStream"); - catStream(stream, function (contents) { + catStream(stream, function (err, contents) { test.equal(contents.toString('utf8'), 'url5 contents'); test.done(); }); @@ -192,7 +196,7 @@ exports.tests = { _this.cache.openReadStream({ url: _this.createUrl('/url5'), etagFormat: 'md5' }, function(err, stream, path) { test.equal(_this.requests.length, 1); // request is handled from cache.
test.ok(stream instanceof fs.ReadStream, "stream should be an fs.ReadStream"); - catStream(stream, function (contents) { + catStream(stream, function (err, contents) { test.equal(contents.toString('utf8'), 'url5 contents'); test.done(); }); @@ -213,7 +217,7 @@ exports.tests = { test.equal(_this.requests.length, 1); test.equal(_this.requests[0], '/url7'); test.ok(stream instanceof fs.ReadStream, "stream should be an fs.ReadStream"); - catStream(stream, function (contents) { + catStream(stream, function (err, contents) { test.equal(contents.toString('utf8'), 'url7 contents'); test.done(); }); @@ -223,7 +227,7 @@ exports.tests = { _this.cache.openReadStream({ url: _this.createUrl('/url7'), etagFormat: 'md5' }, function(err, stream, path) { test.equal(_this.requests.length, 1); // request is handled from cache. test.ok(stream instanceof fs.ReadStream, "stream should be an fs.ReadStream"); - catStream(stream, function (contents) { + catStream(stream, function (err, contents) { test.equal(contents.toString('utf8'), 'url7 contents'); test.done(); }); @@ -256,7 +260,7 @@ exports.tests = { _this.cache.openReadStream({ url: _this.createUrl('/url1'), etagFormat: 'md5' }, function(err, stream, path) { test.equal(_this.requests.length, 1); test.equal(_this.requests[0], '/url1'); - catStream(stream, function (contents) { + catStream(stream, function (err, contents) { test.equal(contents.toString('utf8'), 'url1 contents'); cb(); }); @@ -268,7 +272,7 @@ exports.tests = { _this.cache.openReadStream({ url: _this.createUrl('/url1'), etagFormat: 'md5' }, function(err, stream, path) { test.equal(_this.requests.length, 2); test.equal(_this.requests[1], '/url1'); - catStream(stream, function (contents) { + catStream(stream, function (err, contents) { test.equal(contents.toString('utf8'), 'url1 contents'); cb(); }); @@ -358,7 +362,7 @@ exports.tests = { }, testConcurrentRequests: function(test) { - test.expect(4); + test.expect(2); var _this = this; var count = 2; @@ -367,8 +371,7 @@
exports.tests = { if (count === 0) { test.done(); } }; var cb = function(err, stream, path) { - test.ok(stream instanceof fs.ReadStream, "stream should be an fs.ReadStream"); - catStream(stream, function (contents) { + catStream(stream, function (err, contents) { test.equal(contents.toString('utf8'), 'url1 contents'); barrier(); }); @@ -379,34 +382,34 @@ exports.tests = { testBasicCaching: function(test) { - test.expect(8); + test.expect(6); doTest(this, test, '/url1', 'url1 contents', false, true, 0, test.done); }, testExplicitNoCache: function(test) { - test.expect(8); + test.expect(6); doTest(this, test, '/url2', 'url2 contents', false, false, 0, test.done); }, testUnparseableCacheControl: function(test) { - test.expect(8); + test.expect(6); doTest(this, test, '/url4', 'url4 contents', false, false, 0, test.done); }, testNoCache: function(test) { // URLs without a Cache-Control header don't get cached. - test.expect(8); + test.expect(6); doTest(this, test, '/url3', 'url3 contents', false, false, 0, test.done); }, testUnexpiredCache: function(test) { - test.expect(8); + test.expect(6); // 200 is the maximum allowable age.
doTest(this, test, '/url1', 'url1 contents', false, true, 200, test.done); }, testExpiredCache: function(test) { - test.expect(8); + test.expect(6); doTest(this, test, '/url1', 'url1 contents', false, false, 201, test.done); }, @@ -691,29 +694,27 @@ exports.tests = { function doTest(_this, test, url, contents, firstCached, secondCached, deltaT, cb) { var count = 0; if (!deltaT) { deltaT = 0; } - _this.cache.openReadStream(_this.createUrl(url), function(err, stream, path) { + + _this.cache.getContents(_this.createUrl(url), function(err, buffer, path) { if (!firstCached) { count++; } - if (!stream) { - test.ok(err, "if stream is null there had better be an error"); + if (!buffer) { + test.ok(err, "if buffer is null there had better be an error"); return cb(); } - test.ok(stream instanceof fs.ReadStream, "stream should be an fs.ReadStream"); test.ok(fs.existsSync(path)); - catStream(stream, function (contents) { - test.equal(contents.toString('utf8'), contents); + + test.equal(buffer.toString('utf8'), contents); + test.equal(_this.serverUrls[url].fetchCount, count); + _this.nowSeconds += deltaT; + _this.cache.reset(); + + _this.cache.getContents(_this.createUrl(url), function(err, buffer, path) { + if (!secondCached) { count++; } + + test.ok(fs.existsSync(path)); test.equal(_this.serverUrls[url].fetchCount, count); - _this.nowSeconds += deltaT; - _this.cache.reset(); - _this.cache.openReadStream(_this.createUrl(url), function(err, stream, path) { - if (!secondCached) { count++; } - test.ok(stream instanceof fs.ReadStream, "stream should be an fs.ReadStream"); - test.ok(fs.existsSync(path)); - test.equal(_this.serverUrls[url].fetchCount, count); - catStream(stream, function (contents) { - test.equal(contents.toString('utf8'), contents); - cb(); - }); - }); + test.equal(buffer.toString('utf8'), contents); + cb(); }); });