From 132bfc020a0a935f45ee7967f56bcd5f41b211d0 Mon Sep 17 00:00:00 2001 From: Clint Zirker Date: Tue, 18 Feb 2025 16:31:58 -0700 Subject: [PATCH 1/2] Fixing issue with streams stalling for large files using autoConfigure for read --- lib/stream/helper/leo-stream-helper.ts | 2 +- lib/stream/leo-stream.js | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/lib/stream/helper/leo-stream-helper.ts b/lib/stream/helper/leo-stream-helper.ts index 9af522d..1899194 100644 --- a/lib/stream/helper/leo-stream-helper.ts +++ b/lib/stream/helper/leo-stream-helper.ts @@ -320,7 +320,7 @@ export function createFastS3ReadHooks(settings: ReadHooksParams): ReadOptionHook let data = Array.isArray(result.data) ? result.data : [result.data]; for (let i = 0; i < data.length; i++) { - if (!stream.push(data[i])) { + if (!stream.write(data[i])) { logger.debug(result.id, "parsePool Backpressure - start"); await new Promise(resolve => stream.once("drain", () => { logger.debug(result.id, "parsePool Backpressure - done"); diff --git a/lib/stream/leo-stream.js b/lib/stream/leo-stream.js index f2d18d2..d06e02d 100644 --- a/lib/stream/leo-stream.js +++ b/lib/stream/leo-stream.js @@ -361,7 +361,7 @@ module.exports = function(configure) { timestamp: opts.timestamp || moment.now(), source_timestamp: opts.source_timestamp || opts.event_source_timestamp || moment.now() }, function(err, data) { - logger.error(err, data); + logger.error("logSourceRead.checkpoint", err, data); if (callback) callback(); }); }, @@ -881,7 +881,7 @@ module.exports = function(configure) { }, function(err, data) { logger.debug(process.memoryUsage()); if (err) { - logger.error(err); + logger.error("toLeo.firehose.putRecordBatch", err); retry.backoff(); } else if (data.FailedPutCount && data.FailedPutCount > 0) { var left = []; @@ -928,7 +928,7 @@ module.exports = function(configure) { StreamName: configure.stream }, function(err, data) { if (err) { - logger.error(err); + logger.error("toLeo.kinesis.putRecords", err); retry.backoff(); } else if (data.FailedRecordCount && data.FailedRecordCount > 0) { var left = []; @@ -1677,7 +1677,7 @@ module.exports = function(configure) { dynamodb.docClient.query(params, function(err, data) { logger.debug("Consumed Capacity", data && data.ConsumedCapacity); if (err) { - logger.error(err); + logger.error("fromLeo.getEventsQuery.dynamodb.query", err); callback(err); return; } @@ -1720,7 +1720,7 @@ module.exports = function(configure) { leo.getEvents(ID, q, Object.assign({}, opts, { start: start }), (err, events, checkpoint) => { - err && logger.error(err); + err && logger.error("fromLeo.getEventsOld", err); callback(err, events); }); }; @@ -2140,7 +2140,7 @@ module.exports = function(configure) { err.code != "RequestAbortedError" && !(isPassDestroyed && err.code == "Z_BUF_ERROR") && !(isPassDestroyed && err.message == "premature close")) { - logger.error(err); + logger.error("fromLeo.stream.pipe.error", err); done(err); } else { done(); @@ -2165,7 +2165,7 @@ module.exports = function(configure) { }); }, (err) => { logger.debug("Calling Pass.end"); - if (err) logger.error(err); + if (err) logger.error("fromLeo.stream.pipe.end", err); hasEnded = true; hooks.onEnd().finally(() => { if (err) { @@ -2331,7 +2331,7 @@ module.exports = function(configure) { }, function(err, data) { if (err) { logger.info(`All ${myRecords.length} records failed! Retryable: ${err.retryable}`, err); - logger.error(myRecords); + logger.error("toDynamoDB.batchWrite", myRecords); if (err.retryable) { retry.backoff(err); } else { From 2be8becb208e0d89a68dab387e7d8e77f58f66f4 Mon Sep 17 00:00:00 2001 From: Clint Zirker Date: Tue, 18 Feb 2025 16:38:18 -0700 Subject: [PATCH 2/2] fixing deprecated github action version --- .github/workflows/node.js.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/node.js.yml b/.github/workflows/node.js.yml index 3675d6e..f789d87 100644 --- a/.github/workflows/node.js.yml +++ b/.github/workflows/node.js.yml @@ -31,7 +31,7 @@ jobs: - run: npm i - run: npm run compile --if-present - run: npm run coverage-all --if-present - - uses: actions/upload-artifact@v3 + - uses: actions/upload-artifact@v4 with: name: code-coverage-${{ matrix.node-version }} path: coverage/