Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ Pool.prototype._next = function () {
// get the first task from the queue
var me = this;
var task = this.tasks.shift();

// assign worker to promise so it can be referred to by the handler
task.resolver.promise.worker = worker;

// check if the task is still pending (and not cancelled -> promise rejected)
if (task.resolver.promise.pending) {
Expand Down
19 changes: 19 additions & 0 deletions src/Promise.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ function Promise(handler, parent) {
var _onSuccess = [];
var _onFail = [];

// empty worker
this.worker = null;

// status
this.resolved = false;
this.rejected = false;
Expand Down Expand Up @@ -141,6 +144,22 @@ function Promise(handler, parent) {
return me;
};

/**
* Calls a defined worker method with optional parameters
* @param {string} methodName Name of the method to be called as defined in the workerpool.worker() function
* @param {Array} [methodParameters] Array with optional parameters
*/
this.emit = function (methodName, methodParameters) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you rename this to function (eventName, eventParams) { ?

if (this.worker?.worker && methodName) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now, an emitted event will be silently ignored when a worker is not yet attached (i.e. the task is queued). I think it would be better to throw an exception, or queue the message and execute. Or maybe return a boolean indicating whether the message was sent or not. What do you think?

const payload = {
eventName: methodName,
eventParams: methodParameters,
};
this.worker.worker.postMessage(payload);
}
return this;
};

// attach handler passing the resolve and reject functions
handler(function (result) {
_resolve(result);
Expand Down
2 changes: 1 addition & 1 deletion src/generated/embeddedWorker.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ exports.pool = function pool(script, options) {
* Create a worker and optionally register a set of methods to the worker.
* @param {Object} [methods]
*/
exports.worker = function worker(methods) {
exports.worker = function worker(methods, events) {
var worker = require('./worker');
worker.add(methods);
if (events) worker.addOnEvents(events);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personal preference: I find the name addOnEvents a bit confusing. How about naming it registerEvents or registerEventListeners for example?

};

/**
Expand All @@ -38,4 +39,4 @@ exports.Promise = require('./Promise');

exports.platform = environment.platform;
exports.isMainThread = environment.isMainThread;
exports.cpus = environment.cpus;
exports.cpus = environment.cpus;
16 changes: 15 additions & 1 deletion src/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ worker.on('message', function (request) {

if (method) {
currentRequestId = request.id;

// execute the function
var result = method.apply(method, request.params);

Expand Down Expand Up @@ -196,7 +196,21 @@ worker.emit = function (payload) {
}
};

worker.onEvents = function(events) {
for (const eventKey of Object.keys(events)) {
worker.on('message', function({eventName, eventParams}) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of adding a new listener worker.on('message', ...) for every event, it maybe a a good idea to extend the existing worker.on('message', function (request) { ... } instead, so all the logic for handling incoming messages is in one place, so you can clearly see that it is matching methods and events.

We could restructure the handler to do that explicitly, like

worker.on('message', function (request) { 
  try {
    if (isMethod(request)) {
      onMethod(request.method, request.params)
    } else if (isEvent(request)) {
      onEvent(request.eventName, request.eventParams)
    } else {
      // throw error
    }
  } catch (err) { 
    // ...
  }
}

What do you think?

if (eventName === eventKey) {
if (eventParams && !Array.isArray(eventParams)) {
eventParams = [eventParams];
}
events[eventKey].apply(worker, eventParams);
}
});
}
}

if (typeof exports !== 'undefined') {
exports.add = worker.register;
exports.emit = worker.emit;
exports.addOnEvents = worker.onEvents;
}
42 changes: 42 additions & 0 deletions test/CallMethod.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
var assert = require('assert');
var Pool = require('../src/Pool');

it('creates a worker pool with event callbacks', function (done) {
const pool = new Pool(__dirname + "/workers/waitForMessage.js");

const handler = pool.exec('fibonacci', [15]);

handler
.then(function (result) {
assert.strictEqual(result, 610);
})
.catch(function (err) {
console.error(err);
})
.then(function () {
pool.terminate(); // terminate all workers when done
done();
})
.catch(done);
});

it('creates a worker pool and sends a message', function (done) {
const pool = new Pool(__dirname + "/workers/waitForMessage.js");

const handler = pool.exec('fibonacci', [15]);

handler
.then(function (result) {
assert.strictEqual(result, 610);
})
.catch(function (err) {
assert.strictEqual(err.toString().includes('exitCode: `99`'), true);
})
.then(function () {
pool.terminate(); // terminate all workers when done
done();
})
.catch(done);

handler.emit('killme', 99); // forces the worker to exit with exitcode 99
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have the feeling that the unit test testing the behavior can be simplified a bit, rather than indirectly testing for the worker killing itself as a result of receiving a message. How about changing it to something more direct like:

const handler = pool.exec('getEmittedMessage', [])
  .then(function (result) {
    assert.strictEqual(result, 'hello world')
  })
    
handler.emit('myCustomEvent', 'hello world')

What do you think?

});
18 changes: 18 additions & 0 deletions test/workers/waitForMessage.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// a simple worker that just sits there and receives messages

// load workerpool
var workerpool = require('../../');

function fibonacci(n) {
if (n < 2) return n;
return fibonacci(n - 2) + fibonacci(n - 1);
}

// create a worker and register public functions
workerpool.worker({
fibonacci: fibonacci,
}, {
killme: function (payload) {
process.exit(payload);
}
});