From 32ede09cb2b7ccc0667e31d1c7fb372125e98ada Mon Sep 17 00:00:00 2001 From: Rami Moshe Date: Mon, 4 Nov 2019 00:16:12 +0200 Subject: [PATCH 1/2] add ackOnMessageError --- README.md | 1 + src/Worker.js | 7 +++-- src/workerFactory.js | 2 +- test/spec/unit-tests/Worker.ut.spec.js | 40 ++++++++++++++++++++------ 4 files changed, 38 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 74a57de..1e97ed5 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,7 @@ Simple Router For Queues ``` js { "queue": { // object, required + "ackOnMessageError": "" // boolean, optional, (send ack when handle funcrion failed, default true) "aws": { // object, required "credentials": { // object, optional "region": "", // string, optional, (default from env AWS_REGION) diff --git a/src/Worker.js b/src/Worker.js index 26322a4..f84299f 100644 --- a/src/Worker.js +++ b/src/Worker.js @@ -12,10 +12,12 @@ const messageStatuses = { class Worker extends EventEmitter { - constructor(consumer, router) { + constructor(consumer, router, options = {}) { super(); this._consumer = consumer; - this.router = router; + this.router = router; + this.options = options; + if (_.isNil(this.options.ackOnMessageError)) this.options.ackOnMessageError = true; } get messageSchema() { @@ -61,6 +63,7 @@ class Worker extends EventEmitter { this.emit('message', { type: jsonMessage.type, status: messageStatuses.proceed }); } catch (error) { this.emit('error', error); + if (!this.options.ackOnMessageError) throw error; } } diff --git a/src/workerFactory.js b/src/workerFactory.js index 32b093f..f549db2 100644 --- a/src/workerFactory.js +++ b/src/workerFactory.js @@ -11,7 +11,7 @@ function create(type, router, options = {}) { case Types.SQS: { const SqsConsumer = require('./consumers/SqsConsumer'); const sqsConsumer = new SqsConsumer(options.queue); - return new Worker(sqsConsumer, router).init(); + return new Worker(sqsConsumer, router, options.queue).init(); } default: { throw new Error('Unsupported type'); diff --git a/test/spec/unit-tests/Worker.ut.spec.js b/test/spec/unit-tests/Worker.ut.spec.js index ee4d5ac..6030473 100644 --- a/test/spec/unit-tests/Worker.ut.spec.js +++ b/test/spec/unit-tests/Worker.ut.spec.js @@ -1,14 +1,13 @@ /* eslint no-unused-vars:off */ 'use strict'; -const Promise = require('bluebird'); const Joi = require('joi'); const Worker = require('../../../src/Worker'); const Router = require('../../../src/Router'); const ConsumerMock = require('./mocks/Consumer'); -test('start - 1 valid message and undefined atttributes with existing controller - should call TEST_CONTROLLER1 handler', async (done) => { +test('start - 1 valid message and undefined attributes with existing controller - should call TEST_CONTROLLER1 handler', async (done) => { const consumerStub = new ConsumerMock(); const expectedMessage = { type : 'TEST_CONTROLLER1', @@ -32,7 +31,6 @@ test('start - 1 valid message and undefined atttributes with existing controller }); const worker = new Worker(consumerStub, router).init(); worker.start(); - await Promise.delay(100); }); test('start - 1 valid message without existing controller - should not call TEST_CONTROLLER1 handler & emit error', async (done) => { @@ -54,7 +52,6 @@ test('start - 1 valid message without existing controller - should not call TEST worker.on('error', () => { done(); }).start(); - await Promise.delay(100); }); test('start - 1 invalid message with existing controller - should not call TEST_CONTROLLER1 handler', async (done) => { @@ -78,10 +75,9 @@ test('start - 1 invalid message with existing controller - should not call TEST_ worker.on('error', () => { done(); }).start(); - await Promise.delay(100); }); -test('start - 1 valid message and atttributes array with existing controller - should call TEST_CONTROLLER1 handler', async (done) => { +test('start - 1 valid message and attributes array with existing controller - should call TEST_CONTROLLER1 handler', async (done) => { const consumerStub = new ConsumerMock(); const expectedMessage = { type : 'TEST_CONTROLLER1', @@ -105,8 +101,6 @@ test('start - 1 valid message and atttributes array with existing controller - s } }; - - consumerStub.injectFakeResponseData([JSON.stringify(expectedMessage)], expectedMessageAttributes); const router = new Router(); @@ -125,5 +119,33 @@ test('start - 1 valid message and atttributes array with existing controller - s worker.on('error', () => { done(); }).start(); - await Promise.delay(100); +}); + +test('start - 1 valid message and ackOnMessageError=true - should call TEST_CONTROLLER1 handler', async (done) => { + const worker = new Worker({}, {}); + worker.on('error', () => { + }); + await worker._handleMessage(JSON.stringify({ + type : 'TEST_CONTROLLER1', + content: { + age: 19 + } + })); + done(); +}); + +test('start - 1 valid message and ackOnMessageError=false - should call TEST_CONTROLLER1 handler', async (done) => { + const worker = new Worker({}, {}, { ackOnMessageError: false }); + worker.on('error', () => { + }); + try { + await worker._handleMessage(JSON.stringify({ + type : 'TEST_CONTROLLER1', + content: { + age: 19 + } + })); + } catch (error) { + done(); + } }); \ No newline at end of file From 693c2bdcf92b16646747536e6e4711a8f51233f1 Mon Sep 17 00:00:00 2001 From: Rami Moshe Date: Mon, 11 Nov 2019 16:04:40 +0200 Subject: [PATCH 2/2] update readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 1e97ed5..251c93c 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ Simple Router For Queues ``` js { "queue": { // object, required - "ackOnMessageError": "" // boolean, optional, (send ack when handle funcrion failed, default true) + "ackOnMessageError": "" // boolean, optional, (send ack when handle function failed, default true) "aws": { // object, required "credentials": { // object, optional "region": "", // string, optional, (default from env AWS_REGION)