Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/engine/engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const { ProcessStatus } = require("./../core/workflow/process_state");
const { validateTimeInterval } = require("../core/utils/ajvValidator");
const { validate: uuidValidate } = require("uuid");
const { readEnvironmentVariableAsBool, readEnvironmentVariableAsNumber } = require("../core/utils/environment");
const { engineHeartBeat } = require("./heartbeat/base");
const { engineHeartBeat, getBeatInstances } = require("./heartbeat/base");

function getActivityManagerFromData(activity_manager_data) {
const activity_manager = ActivityManager.deserialize(activity_manager_data);
Expand Down Expand Up @@ -75,6 +75,8 @@ class Engine {
Engine.instance = this;
this.emitter = emitter;
if (heartBeat) {
Engine.instance.beat_instances = getBeatInstances();
Engine.instance.current_instance = Engine.instance?.beat_instances?.[0];
Engine.beat = engineHeartBeat;
if (process.env.TIMER_BATCH && process.env.TIMER_BATCH > 0 && process.env.TIMER_QUEUE) {
emitter.emit("ENGINE.CONTRUCTOR", "BOTH BATCH AND QUEUE ACTIVE", {
Expand All @@ -96,7 +98,7 @@ class Engine {
static setNextHeartBeat() {
return setTimeout(async () => {
try {
await Engine._beat();
await Engine._beat(Engine.instance);
} catch (e) {
emitter.emit("ENGINE.HEART.ERROR", `HEART FAILURE @ ENGINE_ID [${ENGINE_ID}]`, {
engine_id: ENGINE_ID,
Expand Down
34 changes: 31 additions & 3 deletions src/engine/heartbeat/base.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,42 @@
require("dotenv").config();
const emitter = require("../../core/utils/emitter");
const { timerHeartBeat } = require("./timer");
const { processHeartBeat } = require("./process");
const { readEnvironmentVariableAsNumber } = require("../../core/utils/environment");

const engineHeartBeat = async () => {
const engineHeartBeat = async (engine) => {
emitter.emit("ENGINE.HEARTBEAT", `HEARTBEAT @ [${new Date().toISOString()}]`);
const TIMER_BATCH = readEnvironmentVariableAsNumber("TIMER_BATCH", 0);
const PROCESS_BATCH = readEnvironmentVariableAsNumber("PROCESS_BATCH", 0);

await timerHeartBeat();
await processHeartBeat();
if (engine?.current_instance === 'TIMER') {
await timerHeartBeat(TIMER_BATCH);
} else if (engine?.current_instance === 'PROCESS') {
await processHeartBeat(PROCESS_BATCH);
}

if (engine?.beat_instances?.length === 2) {
engine.current_instance = engine?.current_instance === 'PROCESS' ? 'TIMER' : 'PROCESS';
}
}

const getBeatInstances = () => {
const TIMER_BATCH = readEnvironmentVariableAsNumber("TIMER_BATCH", 0);
const PROCESS_BATCH = readEnvironmentVariableAsNumber("PROCESS_BATCH", 0);
const beatInstances = [];

if (TIMER_BATCH > 0) {
beatInstances.push("TIMER");
}

if (PROCESS_BATCH > 0) {
beatInstances.push("PROCESS");
}

return beatInstances;
}

module.exports = {
engineHeartBeat,
getBeatInstances,
}
4 changes: 2 additions & 2 deletions src/engine/heartbeat/process.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ const emitter = require("../../core/utils/emitter");
const { Process } = require("../../core/workflow/process");
const { ProcessState } = require("../../core/workflow/process_state");

const processHeartBeat = async () => {
const PROCESS_BATCH = process.env.PROCESS_BATCH || 10;
const processHeartBeat = async (PROCESS_BATCH) => {
emitter.emit("ENGINE.HEARTBEAT", `HEARTBEAT INSTANCE PROCESS`);
const processes = await Process.getPersist()._db.transaction(async (trx) => {
try {
emitter.emit("ENGINE.PROCESSES_FETCHING", ` FETCHING PROCESSES ON HEARTBEAT BATCH [${PROCESS_BATCH}]`);
Expand Down
110 changes: 110 additions & 0 deletions src/engine/heartbeat/tests/heartbeat.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
const settings = require("../../../../settings/tests/settings");
const { PersistorProvider } = require("../../../core/persist/provider");
const { Process } = require("../../../core/workflow/process");
const { processHeartBeat } = require("../process");
const { timerHeartBeat } = require("../timer");
const { engineHeartBeat, getBeatInstances } = require("../base");

jest.mock("../process", () => {
const actualModule = jest.requireActual("../process");
return {
...actualModule,
processHeartBeat: jest.spyOn(actualModule, "processHeartBeat"),
};
});

jest.mock("../timer", () => {
const actualModule = jest.requireActual("../timer");
return {
...actualModule,
timerHeartBeat: jest.spyOn(actualModule, "timerHeartBeat"),
};
});

beforeEach(async () => {
await _clean();
});

afterAll(async () => {
await _clean();
if (settings.persist_options[0] === "knex") {
await Process.getPersist()._db.destroy();
}
});

test("engineHeartBeat runs only for TIMER when current_instance=TIMER", async () => {
const minimalEngine = {
current_instance: 'TIMER',
}
await engineHeartBeat(minimalEngine);

expect(timerHeartBeat).toHaveBeenCalled();
expect(processHeartBeat).not.toHaveBeenCalled();
});

test("engineHeartBeat runs only for PROCESS when current_instance=PROCESS", async () => {
const minimalEngine = {
current_instance: 'PROCESS',
}
await engineHeartBeat(minimalEngine);

expect(processHeartBeat).toHaveBeenCalled();
expect(timerHeartBeat).not.toHaveBeenCalled();
});

test("engineHeartBeat runs for both TIMER and PROCESS when beat_instances length is 2", async () => {
const minimalEngine = {
current_instance: 'TIMER',
beat_instances: ['TIMER' , 'PROCESS'],
}

await engineHeartBeat(minimalEngine);
await engineHeartBeat(minimalEngine);

expect(processHeartBeat).toHaveBeenCalled();
expect(timerHeartBeat).toHaveBeenCalled();
});

test("getBeatInstances runs for TIMER_BATCH = 10", async () => {
process.env.TIMER_BATCH = 10;

const beatInstances = getBeatInstances();

expect(beatInstances).toHaveLength(1);
process.env.TIMER_BATCH = 0;
});

test("getBeatInstances runs for PROCESS_BATCH = 10", async () => {
process.env.PROCESS_BATCH = 10;

const beatInstances = getBeatInstances();

expect(beatInstances).toHaveLength(1);
process.env.PROCESS_BATCH = 0;
});

test("getBeatInstances runs for TIMER_BATCH = 10 and PROCESS_BATCH = 10", async () => {
process.env.TIMER_BATCH = 10;
process.env.PROCESS_BATCH = 10;

const beatInstances = getBeatInstances();

expect(beatInstances).toHaveLength(2);
});

const _clean = async () => {
const persistor = PersistorProvider.getPersistor(...settings.persist_options);
const activity_persist = persistor.getPersistInstance("Activity");
const activity_manager_persist = persistor.getPersistInstance("ActivityManager");
const process_state_persist = persistor.getPersistInstance("ProcessState");
const process_persist = persistor.getPersistInstance("Process");
const workflow_persist = persistor.getPersistInstance("Workflow");
const timer_persist = persistor.getPersistInstance("Timer");

await activity_persist.deleteAll();
await activity_manager_persist.deleteAll();
await process_state_persist.deleteAll();
await process_persist.deleteAll();
await workflow_persist.deleteAll();
await timer_persist.deleteAll();
};
4 changes: 2 additions & 2 deletions src/engine/heartbeat/timer.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
const emitter = require("../../core/utils/emitter");
const { Timer } = require("../../core/workflow/timer");

const timerHeartBeat = async () => {
const TIMER_BATCH = process.env.TIMER_BATCH || 40;
const timerHeartBeat = async (TIMER_BATCH) => {
emitter.emit("ENGINE.HEARTBEAT", `HEARTBEAT INSTANCE TIMER`);

const max_connection_pool = Timer.getPersist()._db.context.client.pool.max
const connections = Timer.getPersist()._db.context.client.pool.free.length
Expand Down