From 3622a511d1a65ab1b8642a62f1bf0aa92b992c48 Mon Sep 17 00:00:00 2001 From: jlenon7 Date: Sat, 15 Nov 2025 10:19:59 -0300 Subject: [PATCH] feat(driver): lock jobs inside drivers --- package-lock.json | 135 +++++++++++++++++----------------- package.json | 2 +- src/drivers/AwsSqsDriver.ts | 32 ++++++++ src/drivers/DatabaseDriver.ts | 43 +++++++++-- src/drivers/Driver.ts | 15 +++- src/drivers/MemoryDriver.ts | 10 ++- 6 files changed, 159 insertions(+), 78 deletions(-) diff --git a/package-lock.json b/package-lock.json index 3dd740b..c4fa96c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@athenna/queue", - "version": "5.17.0", + "version": "5.18.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@athenna/queue", - "version": "5.17.0", + "version": "5.18.0", "license": "MIT", "dependencies": { "@aws-sdk/client-sqs": "^3.859.0" @@ -966,14 +966,14 @@ } }, "node_modules/@commitlint/config-validator": { - "version": "19.8.1", - "resolved": "https://registry.npmjs.org/@commitlint/config-validator/-/config-validator-19.8.1.tgz", - "integrity": "sha512-0jvJ4u+eqGPBIzzSdqKNX1rvdbSU1lPNYlfQQRIFnBgLy26BtC0cFnr7c/AyuzExMxWsMOte6MkTi9I3SQ3iGQ==", + "version": "20.0.0", + "resolved": "https://registry.npmjs.org/@commitlint/config-validator/-/config-validator-20.0.0.tgz", + "integrity": "sha512-BeyLMaRIJDdroJuYM2EGhDMGwVBMZna9UiIqV9hxj+J551Ctc6yoGuGSmghOy/qPhBSuhA6oMtbEiTmxECafsg==", "dev": true, "license": "MIT", "optional": true, "dependencies": { - "@commitlint/types": "^19.8.1", + "@commitlint/types": "^20.0.0", "ajv": "^8.11.0" }, "engines": { @@ -981,9 +981,9 @@ } }, "node_modules/@commitlint/execute-rule": { - "version": "19.8.1", - "resolved": "https://registry.npmjs.org/@commitlint/execute-rule/-/execute-rule-19.8.1.tgz", - "integrity": "sha512-YfJyIqIKWI64Mgvn/sE7FXvVMQER/Cd+s3hZke6cI1xgNT/f6ZAz5heND0QtffH+KbcqAwXDEE1/5niYayYaQA==", + "version": "20.0.0", + "resolved": "https://registry.npmjs.org/@commitlint/execute-rule/-/execute-rule-20.0.0.tgz", + "integrity": "sha512-xyCoOShoPuPL44gVa+5EdZsBVao/pNzpQhkzq3RdtlFdKZtjWcLlUFQHSWBuhk5utKYykeJPSz2i8ABHQA+ZZw==", "dev": true, "license": "MIT", "optional": true, @@ -992,17 +992,17 @@ } }, "node_modules/@commitlint/load": { - "version": "19.8.1", - "resolved": "https://registry.npmjs.org/@commitlint/load/-/load-19.8.1.tgz", - "integrity": "sha512-9V99EKG3u7z+FEoe4ikgq7YGRCSukAcvmKQuTtUyiYPnOd9a2/H9Ak1J9nJA1HChRQp9OA/sIKPugGS+FK/k1A==", + "version": "20.1.0", + "resolved": "https://registry.npmjs.org/@commitlint/load/-/load-20.1.0.tgz", + "integrity": "sha512-qo9ER0XiAimATQR5QhvvzePfeDfApi/AFlC1G+YN+ZAY8/Ua6IRrDrxRvQAr+YXUKAxUsTDSp9KXeXLBPsNRWg==", "dev": true, "license": "MIT", "optional": true, "dependencies": { - "@commitlint/config-validator": "^19.8.1", - "@commitlint/execute-rule": "^19.8.1", - "@commitlint/resolve-extends": "^19.8.1", - "@commitlint/types": "^19.8.1", + "@commitlint/config-validator": "^20.0.0", + "@commitlint/execute-rule": "^20.0.0", + "@commitlint/resolve-extends": "^20.1.0", + "@commitlint/types": "^20.0.0", "chalk": "^5.3.0", "cosmiconfig": "^9.0.0", "cosmiconfig-typescript-loader": "^6.1.0", @@ -1015,15 +1015,15 @@ } }, "node_modules/@commitlint/resolve-extends": { - "version": "19.8.1", - "resolved": "https://registry.npmjs.org/@commitlint/resolve-extends/-/resolve-extends-19.8.1.tgz", - "integrity": "sha512-GM0mAhFk49I+T/5UCYns5ayGStkTt4XFFrjjf0L4S26xoMTSkdCf9ZRO8en1kuopC4isDFuEm7ZOm/WRVeElVg==", + "version": "20.1.0", + "resolved": "https://registry.npmjs.org/@commitlint/resolve-extends/-/resolve-extends-20.1.0.tgz", + "integrity": "sha512-cxKXQrqHjZT3o+XPdqDCwOWVFQiae++uwd9dUBC7f2MdV58ons3uUvASdW7m55eat5sRiQ6xUHyMWMRm6atZWw==", "dev": true, "license": "MIT", "optional": true, "dependencies": { - "@commitlint/config-validator": "^19.8.1", - "@commitlint/types": "^19.8.1", + "@commitlint/config-validator": "^20.0.0", + "@commitlint/types": "^20.0.0", "global-directory": "^4.0.1", "import-meta-resolve": "^4.0.0", "lodash.mergewith": "^4.6.2", @@ -1034,9 +1034,9 @@ } }, "node_modules/@commitlint/types": { - "version": "19.8.1", - "resolved": "https://registry.npmjs.org/@commitlint/types/-/types-19.8.1.tgz", - "integrity": "sha512-/yCrWGCoA1SVKOks25EGadP9Pnj0oAIHGpl2wH2M2Y46dPM2ueb8wyCVOD7O3WCTkaJ0IkKvzhl1JY7+uCT2Dw==", + "version": "20.0.0", + "resolved": "https://registry.npmjs.org/@commitlint/types/-/types-20.0.0.tgz", + "integrity": "sha512-bVUNBqG6aznYcYjTjnc3+Cat/iBgbgpflxbIBTnsHTX0YVpnmINPEkSRWymT2Q8aSH3Y7aKnEbunilkYe8TybA==", "dev": true, "license": "MIT", "optional": true, @@ -1700,6 +1700,13 @@ "node": ">= 8" } }, + "node_modules/@pinojs/redact": { + "version": "0.4.0", + "resolved": "https://registry.npmjs.org/@pinojs/redact/-/redact-0.4.0.tgz", + "integrity": "sha512-k2ENnmBugE/rzQfEcdWHcCY+/FM3VLzH9cYEsbdsoqrvzAKRhUZeRNhAZvB8OitQJ1TBed3yqWtdjzS6wJKBwg==", + "dev": true, + "license": "MIT" + }, "node_modules/@pkgjs/parseargs": { "version": "0.11.0", "resolved": "https://registry.npmjs.org/@pkgjs/parseargs/-/parseargs-0.11.0.tgz", @@ -2771,9 +2778,9 @@ } }, "node_modules/@types/conventional-commits-parser": { - "version": "5.0.1", - "resolved": "https://registry.npmjs.org/@types/conventional-commits-parser/-/conventional-commits-parser-5.0.1.tgz", - "integrity": "sha512-7uz5EHdzz2TqoMfV7ee61Egf5y6NkcO4FB/1iCCQnbeiI1F3xzv3vK5dBCXUCLQgGYS+mUeigK1iKQzvED+QnQ==", + "version": "5.0.2", + "resolved": "https://registry.npmjs.org/@types/conventional-commits-parser/-/conventional-commits-parser-5.0.2.tgz", + "integrity": "sha512-BgT2szDXnVypgpNxOK8aL5SGjUdaQbC++WZNjF1Qge3Og2+zhHj+RWhmehLhYyvQwqAmvezruVfOf8+3m74W+g==", "dev": true, "license": "MIT", "optional": true, @@ -5055,14 +5062,14 @@ } }, "node_modules/cosmiconfig-typescript-loader": { - "version": "6.1.0", - "resolved": "https://registry.npmjs.org/cosmiconfig-typescript-loader/-/cosmiconfig-typescript-loader-6.1.0.tgz", - "integrity": "sha512-tJ1w35ZRUiM5FeTzT7DtYWAFFv37ZLqSRkGi2oeCK1gPhvaWjkAtfXvLmvE1pRfxxp9aQo6ba/Pvg1dKj05D4g==", + "version": "6.2.0", + "resolved": "https://registry.npmjs.org/cosmiconfig-typescript-loader/-/cosmiconfig-typescript-loader-6.2.0.tgz", + "integrity": "sha512-GEN39v7TgdxgIoNcdkRE3uiAzQt3UXLyHbRHD6YoL048XAeOomyxaP+Hh/+2C6C2wYjxJ2onhJcsQp+L4YEkVQ==", "dev": true, "license": "MIT", "optional": true, "dependencies": { - "jiti": "^2.4.1" + "jiti": "^2.6.1" }, "engines": { "node": ">=v18" @@ -5703,9 +5710,9 @@ } }, "node_modules/error-ex": { - "version": "1.3.2", - "resolved": "https://registry.npmjs.org/error-ex/-/error-ex-1.3.2.tgz", - "integrity": "sha512-7dFHNmqeFSEt2ZBsCriorKnn3Z2pj+fd9kmI6QoWw4//DL+icEBfc0U7qJCisqrTsKTjw4fNFy2pW9OqStD84g==", + "version": "1.3.4", + "resolved": "https://registry.npmjs.org/error-ex/-/error-ex-1.3.4.tgz", + "integrity": "sha512-sqQamAnR14VgCr1A618A3sGrygcpK+HEbenA/HiEAkkUwcZIIB/tgWqHFxWgOyDh4nB4JCRimh79dR5Ywc9MDQ==", "dev": true, "license": "MIT", "dependencies": { @@ -6682,16 +6689,6 @@ "fast-decode-uri-component": "^1.0.1" } }, - "node_modules/fast-redact": { - "version": "3.5.0", - "resolved": "https://registry.npmjs.org/fast-redact/-/fast-redact-3.5.0.tgz", - "integrity": "sha512-dwsoQlS7h9hMeYUq1W++23NDcBLV4KqONnITDV9DjfS3q1SgDGVrBdvvTLUotWtPSD7asWDV9/CmsZPy8Hf70A==", - "dev": true, - "license": "MIT", - "engines": { - "node": ">=6" - } - }, "node_modules/fast-uri": { "version": "3.0.6", "resolved": "https://registry.npmjs.org/fast-uri/-/fast-uri-3.0.6.tgz", @@ -7871,9 +7868,9 @@ } }, "node_modules/husky/node_modules/js-yaml": { - "version": "3.14.1", - "resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-3.14.1.tgz", - "integrity": "sha512-okMH7OXXJ7YrN9Ok3/SXrnu4iX9yOk+25nqX4imS2npuvTYDmo/QEZoqwZkYaIDk3jVvBOTOIEgEhaLOynBS9g==", + "version": "3.14.2", + "resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-3.14.2.tgz", + "integrity": "sha512-PMSmkqxr106Xa156c2M265Z+FTrPl+oxd/rgOQy2tijQeK5TxQ43psO1ZCwhVOSdnn+RzkzlRz/eY4BgJBYVpg==", "dev": true, "license": "MIT", "dependencies": { @@ -8157,9 +8154,9 @@ } }, "node_modules/import-meta-resolve": { - "version": "4.1.0", - "resolved": "https://registry.npmjs.org/import-meta-resolve/-/import-meta-resolve-4.1.0.tgz", - "integrity": "sha512-I6fiaX09Xivtk+THaMfAwnA3MVA5Big1WHF1Dfx9hFuvNIWpXnorlkzhcQf6ehrqQiiZECRt1poOAkPmer3ruw==", + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/import-meta-resolve/-/import-meta-resolve-4.2.0.tgz", + "integrity": "sha512-Iqv2fzaTQN28s/FwZAoFq0ZSs/7hMAHJVX+w8PZl3cY19Pxk6jFFalxQoIfW2826i/fDLXv8IiEZRIT0lDuWcg==", "dev": true, "license": "MIT", "optional": true, @@ -9212,9 +9209,9 @@ } }, "node_modules/jiti": { - "version": "2.5.1", - "resolved": "https://registry.npmjs.org/jiti/-/jiti-2.5.1.tgz", - "integrity": "sha512-twQoecYPiVA5K/h6SxtORw/Bs3ar+mLUtoPSc7iMXzQzK8d7eJ/R09wmTwAjiamETn1cXYPGfNnu7DMoHgu12w==", + "version": "2.6.1", + "resolved": "https://registry.npmjs.org/jiti/-/jiti-2.6.1.tgz", + "integrity": "sha512-ekilCSN1jwRvIbgeg/57YFh8qQDNbwDb9xT/qu2DAHbFFZUicIl4ygVaAvzveMhMVr3LnpSKTNnwt8PoOfmKhQ==", "dev": true, "license": "MIT", "optional": true, @@ -9237,9 +9234,9 @@ "license": "MIT" }, "node_modules/js-yaml": { - "version": "4.1.0", - "resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-4.1.0.tgz", - "integrity": "sha512-wpxZs9NoxZaJESJGIZTyDEaYpl0FKSA+FB9aJiyemKhMwkxQg63h4T1KJgUGHpTqPDNRcmmYLugrRjJlBtWvRA==", + "version": "4.1.1", + "resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-4.1.1.tgz", + "integrity": "sha512-qQKT4zQxXl8lLwBtHMWwaTcGfFOZviOJet3Oy/xmGk2gZH677CJM9EvtfdSkgWcATZhj/55JZ0rmy3myCT5lsA==", "dev": true, "license": "MIT", "dependencies": { @@ -9332,9 +9329,9 @@ } }, "node_modules/jsonfile": { - "version": "6.1.0", - "resolved": "https://registry.npmjs.org/jsonfile/-/jsonfile-6.1.0.tgz", - "integrity": "sha512-5dgndWOriYSm5cnYaJNhalLNDKOqFwyDB/rr1E9ZsGciGvKPs8R2xYGCacuf3z6K1YKDz182fd+fY3cn3pMqXQ==", + "version": "6.2.0", + "resolved": "https://registry.npmjs.org/jsonfile/-/jsonfile-6.2.0.tgz", + "integrity": "sha512-FGuPw30AdOIUTRMC2OMRtQV+jkVj2cfPqSeWXv1NEAJ1qZ5zb1X6z1mFhbfOB/iy3ssJCD+3KuZ8r8C3uVFlAg==", "dev": true, "license": "MIT", "dependencies": { @@ -11198,14 +11195,14 @@ } }, "node_modules/pino": { - "version": "9.7.0", - "resolved": "https://registry.npmjs.org/pino/-/pino-9.7.0.tgz", - "integrity": "sha512-vnMCM6xZTb1WDmLvtG2lE/2p+t9hDEIvTWJsu6FejkE62vB7gDhvzrpFR4Cw2to+9JNQxVnkAKVPA1KPB98vWg==", + "version": "9.14.0", + "resolved": "https://registry.npmjs.org/pino/-/pino-9.14.0.tgz", + "integrity": "sha512-8OEwKp5juEvb/MjpIc4hjqfgCNysrS94RIOMXYvpYCdm/jglrKEiAYmiumbmGhCvs+IcInsphYDFwqrjr7398w==", "dev": true, "license": "MIT", "dependencies": { + "@pinojs/redact": "^0.4.0", "atomic-sleep": "^1.0.0", - "fast-redact": "^3.1.1", "on-exit-leak-free": "^2.1.0", "pino-abstract-transport": "^2.0.0", "pino-std-serializers": "^7.0.0", @@ -12637,9 +12634,9 @@ } }, "node_modules/spdx-license-ids": { - "version": "3.0.21", - "resolved": "https://registry.npmjs.org/spdx-license-ids/-/spdx-license-ids-3.0.21.tgz", - "integrity": "sha512-Bvg/8F5XephndSK3JffaRqdT+gyhfqIPwDHpX80tJrF8QQRYMo8sNMeaZ2Dp5+jhwKnUmIOyFFQfHRkjJm5nXg==", + "version": "3.0.22", + "resolved": "https://registry.npmjs.org/spdx-license-ids/-/spdx-license-ids-3.0.22.tgz", + "integrity": "sha512-4PRT4nh1EImPbt2jASOKHX7PB7I+e4IWNLvkKFDxNhJlfjbYlleYQh285Z/3mPTHSAK/AvdMmw5BNNuYH8ShgQ==", "dev": true, "license": "CC0-1.0" }, @@ -12995,9 +12992,9 @@ } }, "node_modules/tar-fs": { - "version": "2.1.3", - "resolved": "https://registry.npmjs.org/tar-fs/-/tar-fs-2.1.3.tgz", - "integrity": "sha512-090nwYJDmlhwFwEW3QQl+vaNnxsO2yVsd45eTKRBzSzu+hlb1w2K9inVq5b0ngXuLVqQ4ApvsUHHnu/zQNkWAg==", + "version": "2.1.4", + "resolved": "https://registry.npmjs.org/tar-fs/-/tar-fs-2.1.4.tgz", + "integrity": "sha512-mDAjwmZdh7LTT6pNleZ05Yt65HC3E+NiQzl672vQG38jIrehtJk/J3mNwIg+vShQPcLF/LV7CMnDW6vjj6sfYQ==", "dev": true, "license": "MIT", "dependencies": { diff --git a/package.json b/package.json index 7c610c6..e81f9f5 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@athenna/queue", - "version": "5.17.0", + "version": "5.18.0", "description": "The Athenna queue handler.", "license": "MIT", "author": "João Lenon ", diff --git a/src/drivers/AwsSqsDriver.ts b/src/drivers/AwsSqsDriver.ts index 436420a..9ec0ed7 100644 --- a/src/drivers/AwsSqsDriver.ts +++ b/src/drivers/AwsSqsDriver.ts @@ -387,13 +387,43 @@ export class AwsSqsDriver extends Driver { AwsSqsDriver.ackedIds.delete(job.id) + const heartbeatDelay = this.calculateHeartbeatDelay() + + let heartbeatTimeout: NodeJS.Timeout + + const startHeartbeat = () => { + if (heartbeatDelay <= 0) { + return + } + + heartbeatTimeout = setInterval(() => { + this.changeJobVisibility( + job.id, + this.msToS(this.visibilityTimeout) + ).catch(() => {}) + }, heartbeatDelay) + } + + const stopHeartbeat = () => { + if (!heartbeatTimeout) { + return + } + + clearInterval(heartbeatTimeout) + heartbeatTimeout = undefined + } + try { + startHeartbeat() + await processor({ id: job.id, attempts: job.attempts, data: job.data }) + stopHeartbeat() + if (!AwsSqsDriver.ackedIds.has(job.id)) { await this.changeJobVisibility( job.id, @@ -401,6 +431,8 @@ export class AwsSqsDriver extends Driver { ) } } catch (err) { + stopHeartbeat() + const receiveCount = Number( job.metadata.Attributes?.ApproximateReceiveCount ?? '1' ) diff --git a/src/drivers/DatabaseDriver.ts b/src/drivers/DatabaseDriver.ts index 30a6d26..87e6df1 100644 --- a/src/drivers/DatabaseDriver.ts +++ b/src/drivers/DatabaseDriver.ts @@ -283,22 +283,53 @@ export class DatabaseDriver extends Driver { * ``` */ public async process(processor: (data: unknown) => any | Promise) { - const job = await this.peek() + await this.releaseExpiredLeases() + + const now = Date.now() const requeueJitterMs = Math.floor(Math.random() * this.workerInterval) + const job = await this.client + .table(this.table) + .where('queue', this.queueName) + .where('availableAt', '<=', now) + .where((qb: any) => + qb.whereNull('reservedUntil').orWhere('reservedUntil', '<=', now) + ) + .orderBy('availableAt', 'asc') + .orderBy('createdAt', 'asc') + .find() + if (!job) { return } - DatabaseDriver.ackedIds.delete(job.id) + if (Is.Json(job.data)) { + job.data = JSON.parse(job.data) + } job.attempts-- job.reservedUntil = Date.now() + this.visibilityTimeout - await this.client.table(this.table).where('id', job.id).update({ - attempts: job.attempts, - reservedUntil: job.reservedUntil - }) + /** + * Trying to claim the job for this worker. + */ + const updatedJob = await this.client + .table(this.table) + .where('id', job.id) + .where('queue', this.queueName) + .where((qb: any) => + qb.whereNull('reservedUntil').orWhere('reservedUntil', '<=', now) + ) + .update({ attempts: job.attempts, reservedUntil: job.reservedUntil }) + + /** + * If job is not updated, it means another worker has claimed the job. + */ + if (!updatedJob) { + return + } + + DatabaseDriver.ackedIds.delete(job.id) try { await processor({ diff --git a/src/drivers/Driver.ts b/src/drivers/Driver.ts index e66d2f9..9fe8b95 100644 --- a/src/drivers/Driver.ts +++ b/src/drivers/Driver.ts @@ -129,7 +129,20 @@ export abstract class Driver { } /** - * Calculate the backoff delay. + * Calculate the heartbeat delay. Used to define if job is still + * running. + */ + public calculateHeartbeatDelay() { + if (!this.visibilityTimeout) { + return 0 + } + + return Math.max(5_000, Math.floor(this.visibilityTimeout * 0.8)) + } + + /** + * Calculate the backoff delay. Used to define the retry delays when + * a job fails processing. */ public calculateBackoffDelay(attempts: number) { if (!this.backoff) { diff --git a/src/drivers/MemoryDriver.ts b/src/drivers/MemoryDriver.ts index 74b2687..7903962 100644 --- a/src/drivers/MemoryDriver.ts +++ b/src/drivers/MemoryDriver.ts @@ -244,9 +244,17 @@ export class MemoryDriver extends Driver { * ``` */ public async process(processor: (data: unknown) => any | Promise) { - const job = await this.peek() + this.defineQueue() + + await this.releaseExpiredLeases() + + const now = Date.now() const requeueJitterMs = Math.floor(Math.random() * this.workerInterval) + const job = this.client.queues[this.queueName].find(j => { + return j.availableAt <= now && !j.reservedUntil + }) + if (!job) { return }