Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ Simple Router For Queues
``` js
{
"queue": { // object, required
"ackOnMessageError": "" // boolean, optional, (send ack when handle function failed, default true)
"aws": { // object, required
"credentials": { // object, optional
"region": "", // string, optional, (default from env AWS_REGION)
Expand Down
7 changes: 5 additions & 2 deletions src/Worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ const messageStatuses = {


class Worker extends EventEmitter {
constructor(consumer, router) {
constructor(consumer, router, options = {}) {
super();
this._consumer = consumer;
this.router = router;
this.router = router;
this.options = options;
if (_.isNil(this.options.ackOnMessageError)) this.options.ackOnMessageError = true;
}

get messageSchema() {
Expand Down Expand Up @@ -61,6 +63,7 @@ class Worker extends EventEmitter {
this.emit('message', { type: jsonMessage.type, status: messageStatuses.proceed });
} catch (error) {
this.emit('error', error);
if (!this.options.ackOnMessageError) throw error;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think its not clear from the name of the variable that you are going to throw an error

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the error is for cancelling the handling and send the message to sqs again, so queue-router will handle this message again next time

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in this way i am "sending nack"

}
}

Expand Down
2 changes: 1 addition & 1 deletion src/workerFactory.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ function create(type, router, options = {}) {
case Types.SQS: {
const SqsConsumer = require('./consumers/SqsConsumer');
const sqsConsumer = new SqsConsumer(options.queue);
return new Worker(sqsConsumer, router).init();
return new Worker(sqsConsumer, router, options.queue).init();
}
default: {
throw new Error('Unsupported type');
Expand Down
40 changes: 31 additions & 9 deletions test/spec/unit-tests/Worker.ut.spec.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
/* eslint no-unused-vars:off */
'use strict';

const Promise = require('bluebird');
const Joi = require('joi');
const Worker = require('../../../src/Worker');
const Router = require('../../../src/Router');
const ConsumerMock = require('./mocks/Consumer');


test('start - 1 valid message and undefined atttributes with existing controller - should call TEST_CONTROLLER1 handler', async (done) => {
test('start - 1 valid message and undefined attributes with existing controller - should call TEST_CONTROLLER1 handler', async (done) => {
const consumerStub = new ConsumerMock();
const expectedMessage = {
type : 'TEST_CONTROLLER1',
Expand All @@ -32,7 +31,6 @@ test('start - 1 valid message and undefined atttributes with existing controller
});
const worker = new Worker(consumerStub, router).init();
worker.start();
await Promise.delay(100);
});

test('start - 1 valid message without existing controller - should not call TEST_CONTROLLER1 handler & emit error', async (done) => {
Expand All @@ -54,7 +52,6 @@ test('start - 1 valid message without existing controller - should not call TEST
worker.on('error', () => {
done();
}).start();
await Promise.delay(100);
});

test('start - 1 invalid message with existing controller - should not call TEST_CONTROLLER1 handler', async (done) => {
Expand All @@ -78,10 +75,9 @@ test('start - 1 invalid message with existing controller - should not call TEST_
worker.on('error', () => {
done();
}).start();
await Promise.delay(100);
});

test('start - 1 valid message and atttributes array with existing controller - should call TEST_CONTROLLER1 handler', async (done) => {
test('start - 1 valid message and attributes array with existing controller - should call TEST_CONTROLLER1 handler', async (done) => {
const consumerStub = new ConsumerMock();
const expectedMessage = {
type : 'TEST_CONTROLLER1',
Expand All @@ -105,8 +101,6 @@ test('start - 1 valid message and atttributes array with existing controller - s
}
};



consumerStub.injectFakeResponseData([JSON.stringify(expectedMessage)], expectedMessageAttributes);

const router = new Router();
Expand All @@ -125,5 +119,33 @@ test('start - 1 valid message and atttributes array with existing controller - s
worker.on('error', () => {
done();
}).start();
await Promise.delay(100);
});

test('start - 1 valid message and ackOnMessageError=true - should call TEST_CONTROLLER1 handler', async (done) => {
const worker = new Worker({}, {});
worker.on('error', () => {
});
await worker._handleMessage(JSON.stringify({
type : 'TEST_CONTROLLER1',
content: {
age: 19
}
}));
done();
});

test('start - 1 valid message and ackOnMessageError=false - should call TEST_CONTROLLER1 handler', async (done) => {
const worker = new Worker({}, {}, { ackOnMessageError: false });
worker.on('error', () => {
});
try {
await worker._handleMessage(JSON.stringify({
type : 'TEST_CONTROLLER1',
content: {
age: 19
}
}));
} catch (error) {
done();
}
});