From 70d97dbb71e26c91d03667eb71af90f013e87a13 Mon Sep 17 00:00:00 2001 From: jlenon7 Date: Fri, 14 Nov 2025 15:46:49 -0300 Subject: [PATCH 1/5] feat(worker): add concurrency option --- bin/test.ts | 1 + src/exceptions/WorkerTimeoutException.ts | 21 +++++ src/kernels/WorkerKernel.ts | 13 +-- src/types/WorkerOptions.ts | 7 ++ src/worker/WorkerTaskBuilder.ts | 112 ++++++++++++++--------- 5 files changed, 103 insertions(+), 51 deletions(-) create mode 100644 src/exceptions/WorkerTimeoutException.ts diff --git a/bin/test.ts b/bin/test.ts index fa89f76..4c168e7 100644 --- a/bin/test.ts +++ b/bin/test.ts @@ -16,4 +16,5 @@ await Runner.setTsEnv() .addPath('tests/unit/**/*.ts') .setCliArgs(process.argv.slice(2)) .setGlobalTimeout(30000) + .setForceExit() .run() diff --git a/src/exceptions/WorkerTimeoutException.ts b/src/exceptions/WorkerTimeoutException.ts new file mode 100644 index 0000000..c439105 --- /dev/null +++ b/src/exceptions/WorkerTimeoutException.ts @@ -0,0 +1,21 @@ +/** + * @athenna/queue + * + * (c) João Lenon + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +import { Exception } from '@athenna/common' + +export class WorkerTimeoutException extends Exception { + public constructor(taskName: string) { + const message = `The worker task ${taskName} has timed out.` + + super({ + message, + code: 'E_WORKER_TIMEOUT_ERROR' + }) + } +} diff --git a/src/kernels/WorkerKernel.ts b/src/kernels/WorkerKernel.ts index f6ff8e2..131e17d 100644 --- a/src/kernels/WorkerKernel.ts +++ b/src/kernels/WorkerKernel.ts @@ -7,15 +7,6 @@ * file that was distributed with this source code. */ -/** - * @athenna/http - * - * (c) João Lenon - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - import 'reflect-metadata' import { debug } from '#src/debug' @@ -132,6 +123,10 @@ export class WorkerKernel { ioc.alias(meta.camelAlias, meta.alias) } + if (meta.concurrency) { + builder.concurrency(meta.concurrency) + } + builder.connection(meta.connection).handler(ctx => { const worker = ioc.use(meta.name) || diff --git a/src/types/WorkerOptions.ts b/src/types/WorkerOptions.ts index 424b285..4afba68 100644 --- a/src/types/WorkerOptions.ts +++ b/src/types/WorkerOptions.ts @@ -15,6 +15,13 @@ export type WorkerOptions = { */ name?: string + /** + * Define how much instances of the same worker could run in parallel. + * + * @default 1 + */ + concurrency?: number + /** * The queue connection that will be used to get the configurations. * diff --git a/src/worker/WorkerTaskBuilder.ts b/src/worker/WorkerTaskBuilder.ts index 9dbcee1..d24fc96 100644 --- a/src/worker/WorkerTaskBuilder.ts +++ b/src/worker/WorkerTaskBuilder.ts @@ -7,12 +7,13 @@ * file that was distributed with this source code. */ -import { Is } from '@athenna/common' import { Log } from '@athenna/logger' import { Queue } from '#src/facades/Queue' +import { Is, Parser } from '@athenna/common' import { WorkerImpl } from '#src/worker/WorkerImpl' import type { Context, ConnectionOptions } from '#src/types' import type { WorkerHandler } from '#src/types/WorkerHandler' +import { WorkerTimeoutException } from '#src/exceptions/WorkerTimeoutException' export class WorkerTaskBuilder { public worker: { @@ -21,6 +22,11 @@ export class WorkerTaskBuilder { */ name?: string + /** + * Define the maximun number of concurrent processes of the same worker. + */ + concurrency?: number + /** * The queue connection of the worker task. */ @@ -52,6 +58,8 @@ export class WorkerTaskBuilder { isRunning?: boolean } = {} + private timers: NodeJS.Timeout[] = [] + public constructor() { this.worker.connection = Config.get('queue.default') } @@ -70,6 +78,20 @@ export class WorkerTaskBuilder { return this } + /** + * Set the max number of concurrent worker tasks. + * + * @example + * ```ts + * new WorkerTaskBuilder().name('my_worker_task').concurrency(5) + * ``` + */ + public concurrency(concurrency: number) { + this.worker.concurrency = concurrency + + return this + } + /** * Set the handler of the worker task. * @@ -208,6 +230,21 @@ export class WorkerTaskBuilder { return } + if (this.timers.length) { + return + } + + const n = this.worker.concurrency ?? 1 + + for (let i = 0; i < n; i++) { + this.spawn() + } + } + + /** + * Use spawn to force a worker instance to run. + */ + private spawn() { const intervalToRun = this.worker.options?.workerInterval || Config.get( @@ -215,18 +252,40 @@ export class WorkerTaskBuilder { 1000 ) + const timeoutMs = + this.worker.options?.workerTimeoutMs ?? + Config.get( + `queue.connections.${this.worker.connection}.workerTimeoutMs`, + Parser.timeToMs('5m') + ) + const initialOffset = this.computeInitialOffset(intervalToRun) - this.worker.interval = setTimeout(async () => { - if (!this.worker.isRunning) { - this.worker.isRunning = true + const loop = async () => { + if (!this.worker.isRegistered) { + return + } - await this.run() - this.worker.isRunning = false + try { + await Promise.race([ + this.run(), + new Promise((resolve, reject) => + setTimeout( + () => reject(new WorkerTimeoutException(this.worker.name)), + timeoutMs + ) + ) + ]) + } catch (err) { + Log.channelOrVanilla('exception').error(err) + } finally { + const delay = intervalToRun + this.computeJitter(intervalToRun) + + this.timers.push(setTimeout(loop, delay)) } + } - this.scheduleNext(intervalToRun) - }, initialOffset) + this.timers.push(setTimeout(loop, initialOffset)) } /** @@ -242,17 +301,11 @@ export class WorkerTaskBuilder { return } - if (!this.worker.interval) { - return - } - this.worker.isRegistered = false - this.worker.isRunning = false - if (this.worker.interval) { - clearTimeout(this.worker.interval) - this.worker.interval = undefined - } + this.timers.forEach(t => clearTimeout(t)) + + this.timers = [] } /** @@ -304,29 +357,4 @@ export class WorkerTaskBuilder { return Math.floor(Math.random() * (max + 1)) } - - /** - * Schedule the next worker task. - */ - private scheduleNext(baseMs: number) { - if (!this.worker.isRegistered) { - return - } - - const delay = baseMs + this.computeJitter(baseMs) - - this.worker.interval = setTimeout(async () => { - if (this.worker.isRunning) { - return this.scheduleNext(baseMs) - } - - this.worker.isRunning = true - - await this.run() - - this.worker.isRunning = false - - this.scheduleNext(baseMs) - }, delay) - } } From 0be603a49e9bfe1378297355fa23fb31b4def70d Mon Sep 17 00:00:00 2001 From: jlenon7 Date: Fri, 14 Nov 2025 16:32:59 -0300 Subject: [PATCH 2/5] test(sqs): improve add test --- tests/unit/drivers/AwsSqsDriverTest.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/unit/drivers/AwsSqsDriverTest.ts b/tests/unit/drivers/AwsSqsDriverTest.ts index 0c2c8b9..646418c 100644 --- a/tests/unit/drivers/AwsSqsDriverTest.ts +++ b/tests/unit/drivers/AwsSqsDriverTest.ts @@ -91,9 +91,11 @@ export class AwsSqsDriverTest extends BaseTest { await queue.add({ hello: 'world' }) - const isEmpty = await queue.isEmpty() + const job = await queue.pop() - assert.isFalse(isEmpty) + assert.containSubset(job, { + data: { hello: 'world' } + }) } @Test() From b3fdcb4708fb60679072f98fd06d9de47607a27d Mon Sep 17 00:00:00 2001 From: jlenon7 Date: Fri, 14 Nov 2025 19:04:56 -0300 Subject: [PATCH 3/5] test(kernel): adjust worker tests --- src/worker/WorkerTaskBuilder.ts | 10 ---------- tests/unit/drivers/AwsSqsDriverTest.ts | 23 ++++++++++++++++++++-- tests/unit/providers/WorkerProviderTest.ts | 9 ++++++++- 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/src/worker/WorkerTaskBuilder.ts b/src/worker/WorkerTaskBuilder.ts index d24fc96..dda7c76 100644 --- a/src/worker/WorkerTaskBuilder.ts +++ b/src/worker/WorkerTaskBuilder.ts @@ -32,11 +32,6 @@ export class WorkerTaskBuilder { */ connection?: string - /** - * The interval instance of the worker task. - */ - interval?: NodeJS.Timeout - /** * Define if the worker task is registered. */ @@ -51,11 +46,6 @@ export class WorkerTaskBuilder { * The handler of the worker task. */ handler?: (ctx: Context) => any | Promise - - /** - * Define if the worker task is running. - */ - isRunning?: boolean } = {} private timers: NodeJS.Timeout[] = [] diff --git a/tests/unit/drivers/AwsSqsDriverTest.ts b/tests/unit/drivers/AwsSqsDriverTest.ts index 646418c..3c7fcdf 100644 --- a/tests/unit/drivers/AwsSqsDriverTest.ts +++ b/tests/unit/drivers/AwsSqsDriverTest.ts @@ -11,8 +11,8 @@ import { Is, Path } from '@athenna/common' import { EnvHelper } from '@athenna/config' import { LoggerProvider } from '@athenna/logger' import { BaseTest } from '#tests/helpers/BaseTest' -import { Queue, QueueProvider, WorkerProvider } from '#src' -import { Test, type Context, BeforeEach, AfterEach, Skip } from '@athenna/test' +import { Queue, WorkerProvider, QueueProvider } from '#src' +import { Test, type Context, BeforeEach, AfterEach, Skip, AfterAll } from '@athenna/test' export class AwsSqsDriverTest extends BaseTest { @BeforeEach() @@ -36,6 +36,25 @@ export class AwsSqsDriverTest extends BaseTest { Config.clear() } + @AfterAll() + public async afterAll() { + await Config.loadAll(Path.fixtures('config')) + + new QueueProvider().register() + new WorkerProvider().register() + new LoggerProvider().register() + + await Queue.connection('aws_sqs').truncate().catch() + + await Queue.closeAll() + + Queue.worker().close() + + ioc.reconstruct() + + Config.clear() + } + @Test() public async shouldBeAbleToConnectToDriver({ assert }: Context) { Queue.connection('aws_sqs') diff --git a/tests/unit/providers/WorkerProviderTest.ts b/tests/unit/providers/WorkerProviderTest.ts index 104aa15..a7ce470 100644 --- a/tests/unit/providers/WorkerProviderTest.ts +++ b/tests/unit/providers/WorkerProviderTest.ts @@ -45,7 +45,14 @@ export class WorkerProviderTest { new QueueProvider().register() new WorkerProvider().register() - assert.lengthOf(Queue.worker().getWorkerTasks(), 0) + new QueueProvider().shutdown() + new WorkerProvider().shutdown() + + assert.isTrue( + Queue.worker() + .getWorkerTasks() + .every(worker => worker.worker.isRegistered === false) + ) } @Test() From 851e6a918140ab11a1523db7666f5e8472424d97 Mon Sep 17 00:00:00 2001 From: jlenon7 Date: Fri, 14 Nov 2025 19:08:27 -0300 Subject: [PATCH 4/5] ci(test): remove windows test --- .github/workflows/ci.yml | 26 -------------------------- 1 file changed, 26 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c81e5f5..f9b4156 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -36,29 +36,3 @@ jobs: - name: Test code compilation run: npm run build - - windows: - runs-on: windows-latest - strategy: - matrix: - node-version: - - 21.x - steps: - - uses: actions/checkout@v2 - - name: Use Node.js ${{ matrix.node-version }} - uses: actions/setup-node@v1 - with: - node-version: ${{ matrix.node-version }} - - - name: Install dependencies - run: npm install - - - name: Run tests - run: npm run test:coverage - env: - AWS_REGION: ${{ secrets.AWS_REGION }} - AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} - AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} - - - name: Test code compilation - run: npm run build From f55138292b7e118bfb50a4985256bc500f5cb51c Mon Sep 17 00:00:00 2001 From: jlenon7 Date: Fri, 14 Nov 2025 19:48:52 -0300 Subject: [PATCH 5/5] test(driver): truncate jobs --- tests/unit/drivers/FakeDriverTest.ts | 2 ++ tests/unit/drivers/MemoryDriverTest.ts | 3 +++ tests/unit/kernels/WorkerKernelTest.ts | 3 +-- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/unit/drivers/FakeDriverTest.ts b/tests/unit/drivers/FakeDriverTest.ts index 5d10e7e..6de82ad 100644 --- a/tests/unit/drivers/FakeDriverTest.ts +++ b/tests/unit/drivers/FakeDriverTest.ts @@ -23,7 +23,9 @@ export class FakeDriverTest { @AfterEach() public async afterEach() { + await Queue.connection('fake').truncate() await Queue.closeAll() + ioc.reconstruct() Config.clear() diff --git a/tests/unit/drivers/MemoryDriverTest.ts b/tests/unit/drivers/MemoryDriverTest.ts index e57c676..6458359 100644 --- a/tests/unit/drivers/MemoryDriverTest.ts +++ b/tests/unit/drivers/MemoryDriverTest.ts @@ -23,7 +23,10 @@ export class MemoryDriverTest { @AfterEach() public async afterEach() { + await Queue.connection('memory').truncate() + await Queue.connection('memoryBackoff').truncate() await Queue.closeAll() + ioc.reconstruct() Config.clear() diff --git a/tests/unit/kernels/WorkerKernelTest.ts b/tests/unit/kernels/WorkerKernelTest.ts index 56a47ab..8b118c2 100644 --- a/tests/unit/kernels/WorkerKernelTest.ts +++ b/tests/unit/kernels/WorkerKernelTest.ts @@ -145,10 +145,9 @@ export class WorkerKernelTest { await Sleep.for(20000).milliseconds().wait() - assert.isTrue(constants.RUN_MAP.helloWorker) + assert.isTrue(constants.RUN_MAP.productWorker) assert.isTrue(constants.RUN_MAP.annotatedWorker) assert.isTrue(constants.RUN_MAP.decoratedWorker) - assert.isTrue(constants.RUN_MAP.productWorker) } @Test()