Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 0 additions & 26 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,29 +36,3 @@ jobs:

- name: Test code compilation
run: npm run build

windows:
runs-on: windows-latest
strategy:
matrix:
node-version:
- 21.x
steps:
- uses: actions/checkout@v2
- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v1
with:
node-version: ${{ matrix.node-version }}

- name: Install dependencies
run: npm install

- name: Run tests
run: npm run test:coverage
env:
AWS_REGION: ${{ secrets.AWS_REGION }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

- name: Test code compilation
run: npm run build
1 change: 1 addition & 0 deletions bin/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ await Runner.setTsEnv()
.addPath('tests/unit/**/*.ts')
.setCliArgs(process.argv.slice(2))
.setGlobalTimeout(30000)
.setForceExit()
.run()
21 changes: 21 additions & 0 deletions src/exceptions/WorkerTimeoutException.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/**
* @athenna/queue
*
* (c) João Lenon <lenon@athenna.io>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

import { Exception } from '@athenna/common'

export class WorkerTimeoutException extends Exception {
public constructor(taskName: string) {
const message = `The worker task ${taskName} has timed out.`

super({
message,
code: 'E_WORKER_TIMEOUT_ERROR'
})
}
}
13 changes: 4 additions & 9 deletions src/kernels/WorkerKernel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,6 @@
* file that was distributed with this source code.
*/

/**
* @athenna/http
*
* (c) João Lenon <lenon@athenna.io>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

import 'reflect-metadata'

import { debug } from '#src/debug'
Expand Down Expand Up @@ -132,6 +123,10 @@ export class WorkerKernel {
ioc.alias(meta.camelAlias, meta.alias)
}

if (meta.concurrency) {
builder.concurrency(meta.concurrency)
}

builder.connection(meta.connection).handler(ctx => {
const worker =
ioc.use(meta.name) ||
Expand Down
7 changes: 7 additions & 0 deletions src/types/WorkerOptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ export type WorkerOptions = {
*/
name?: string

/**
* Define how much instances of the same worker could run in parallel.
*
* @default 1
*/
concurrency?: number

/**
* The queue connection that will be used to get the configurations.
*
Expand Down
120 changes: 69 additions & 51 deletions src/worker/WorkerTaskBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@
* file that was distributed with this source code.
*/

import { Is } from '@athenna/common'
import { Log } from '@athenna/logger'
import { Queue } from '#src/facades/Queue'
import { Is, Parser } from '@athenna/common'
import { WorkerImpl } from '#src/worker/WorkerImpl'
import type { Context, ConnectionOptions } from '#src/types'
import type { WorkerHandler } from '#src/types/WorkerHandler'
import { WorkerTimeoutException } from '#src/exceptions/WorkerTimeoutException'

export class WorkerTaskBuilder {
public worker: {
Expand All @@ -22,14 +23,14 @@ export class WorkerTaskBuilder {
name?: string

/**
* The queue connection of the worker task.
* Define the maximun number of concurrent processes of the same worker.
*/
connection?: string
concurrency?: number

/**
* The interval instance of the worker task.
* The queue connection of the worker task.
*/
interval?: NodeJS.Timeout
connection?: string

/**
* Define if the worker task is registered.
Expand All @@ -45,13 +46,10 @@ export class WorkerTaskBuilder {
* The handler of the worker task.
*/
handler?: (ctx: Context) => any | Promise<any>

/**
* Define if the worker task is running.
*/
isRunning?: boolean
} = {}

private timers: NodeJS.Timeout[] = []

public constructor() {
this.worker.connection = Config.get('queue.default')
}
Expand All @@ -70,6 +68,20 @@ export class WorkerTaskBuilder {
return this
}

/**
* Set the max number of concurrent worker tasks.
*
* @example
* ```ts
* new WorkerTaskBuilder().name('my_worker_task').concurrency(5)
* ```
*/
public concurrency(concurrency: number) {
this.worker.concurrency = concurrency

return this
}

/**
* Set the handler of the worker task.
*
Expand Down Expand Up @@ -208,25 +220,62 @@ export class WorkerTaskBuilder {
return
}

if (this.timers.length) {
return
}

const n = this.worker.concurrency ?? 1

for (let i = 0; i < n; i++) {
this.spawn()
}
}

/**
* Use spawn to force a worker instance to run.
*/
private spawn() {
const intervalToRun =
this.worker.options?.workerInterval ||
Config.get(
`queue.connections.${this.worker.connection}.workerInterval`,
1000
)

const timeoutMs =
this.worker.options?.workerTimeoutMs ??
Config.get(
`queue.connections.${this.worker.connection}.workerTimeoutMs`,
Parser.timeToMs('5m')
)

const initialOffset = this.computeInitialOffset(intervalToRun)

this.worker.interval = setTimeout(async () => {
if (!this.worker.isRunning) {
this.worker.isRunning = true
const loop = async () => {
if (!this.worker.isRegistered) {
return
}

await this.run()
this.worker.isRunning = false
try {
await Promise.race([
this.run(),
new Promise((resolve, reject) =>
setTimeout(
() => reject(new WorkerTimeoutException(this.worker.name)),
timeoutMs
)
)
])
} catch (err) {
Log.channelOrVanilla('exception').error(err)
} finally {
const delay = intervalToRun + this.computeJitter(intervalToRun)

this.timers.push(setTimeout(loop, delay))
}
}

this.scheduleNext(intervalToRun)
}, initialOffset)
this.timers.push(setTimeout(loop, initialOffset))
}

/**
Expand All @@ -242,17 +291,11 @@ export class WorkerTaskBuilder {
return
}

if (!this.worker.interval) {
return
}

this.worker.isRegistered = false
this.worker.isRunning = false

if (this.worker.interval) {
clearTimeout(this.worker.interval)
this.worker.interval = undefined
}
this.timers.forEach(t => clearTimeout(t))

this.timers = []
}

/**
Expand Down Expand Up @@ -304,29 +347,4 @@ export class WorkerTaskBuilder {

return Math.floor(Math.random() * (max + 1))
}

/**
* Schedule the next worker task.
*/
private scheduleNext(baseMs: number) {
if (!this.worker.isRegistered) {
return
}

const delay = baseMs + this.computeJitter(baseMs)

this.worker.interval = setTimeout(async () => {
if (this.worker.isRunning) {
return this.scheduleNext(baseMs)
}

this.worker.isRunning = true

await this.run()

this.worker.isRunning = false

this.scheduleNext(baseMs)
}, delay)
}
}
29 changes: 25 additions & 4 deletions tests/unit/drivers/AwsSqsDriverTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import { Is, Path } from '@athenna/common'
import { EnvHelper } from '@athenna/config'
import { LoggerProvider } from '@athenna/logger'
import { BaseTest } from '#tests/helpers/BaseTest'
import { Queue, QueueProvider, WorkerProvider } from '#src'
import { Test, type Context, BeforeEach, AfterEach, Skip } from '@athenna/test'
import { Queue, WorkerProvider, QueueProvider } from '#src'
import { Test, type Context, BeforeEach, AfterEach, Skip, AfterAll } from '@athenna/test'

export class AwsSqsDriverTest extends BaseTest {
@BeforeEach()
Expand All @@ -36,6 +36,25 @@ export class AwsSqsDriverTest extends BaseTest {
Config.clear()
}

@AfterAll()
public async afterAll() {
await Config.loadAll(Path.fixtures('config'))

new QueueProvider().register()
new WorkerProvider().register()
new LoggerProvider().register()

await Queue.connection('aws_sqs').truncate().catch()

await Queue.closeAll()

Queue.worker().close()

ioc.reconstruct()

Config.clear()
}

@Test()
public async shouldBeAbleToConnectToDriver({ assert }: Context) {
Queue.connection('aws_sqs')
Expand Down Expand Up @@ -91,9 +110,11 @@ export class AwsSqsDriverTest extends BaseTest {

await queue.add({ hello: 'world' })

const isEmpty = await queue.isEmpty()
const job = await queue.pop()

assert.isFalse(isEmpty)
assert.containSubset(job, {
data: { hello: 'world' }
})
}

@Test()
Expand Down
2 changes: 2 additions & 0 deletions tests/unit/drivers/FakeDriverTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ export class FakeDriverTest {

@AfterEach()
public async afterEach() {
await Queue.connection('fake').truncate()
await Queue.closeAll()

ioc.reconstruct()

Config.clear()
Expand Down
3 changes: 3 additions & 0 deletions tests/unit/drivers/MemoryDriverTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ export class MemoryDriverTest {

@AfterEach()
public async afterEach() {
await Queue.connection('memory').truncate()
await Queue.connection('memoryBackoff').truncate()
await Queue.closeAll()

ioc.reconstruct()

Config.clear()
Expand Down
3 changes: 1 addition & 2 deletions tests/unit/kernels/WorkerKernelTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,9 @@ export class WorkerKernelTest {

await Sleep.for(20000).milliseconds().wait()

assert.isTrue(constants.RUN_MAP.helloWorker)
assert.isTrue(constants.RUN_MAP.productWorker)
assert.isTrue(constants.RUN_MAP.annotatedWorker)
assert.isTrue(constants.RUN_MAP.decoratedWorker)
assert.isTrue(constants.RUN_MAP.productWorker)
}

@Test()
Expand Down
9 changes: 8 additions & 1 deletion tests/unit/providers/WorkerProviderTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,14 @@ export class WorkerProviderTest {
new QueueProvider().register()
new WorkerProvider().register()

assert.lengthOf(Queue.worker().getWorkerTasks(), 0)
new QueueProvider().shutdown()
new WorkerProvider().shutdown()

assert.isTrue(
Queue.worker()
.getWorkerTasks()
.every(worker => worker.worker.isRegistered === false)
)
}

@Test()
Expand Down