diff --git a/src/engine/engine.js b/src/engine/engine.js index 18e778b..fece1c8 100644 --- a/src/engine/engine.js +++ b/src/engine/engine.js @@ -18,7 +18,7 @@ const { ProcessStatus } = require("./../core/workflow/process_state"); const { validateTimeInterval } = require("../core/utils/ajvValidator"); const { validate: uuidValidate } = require("uuid"); const { readEnvironmentVariableAsBool, readEnvironmentVariableAsNumber } = require("../core/utils/environment"); -const { engineHeartBeat } = require("./heartbeat/base"); +const { engineHeartBeat, getBeatInstances } = require("./heartbeat/base"); function getActivityManagerFromData(activity_manager_data) { const activity_manager = ActivityManager.deserialize(activity_manager_data); @@ -75,6 +75,8 @@ class Engine { Engine.instance = this; this.emitter = emitter; if (heartBeat) { + Engine.instance.beat_instances = getBeatInstances(); + Engine.instance.current_instance = Engine.instance?.beat_instances?.[0]; Engine.beat = engineHeartBeat; if (process.env.TIMER_BATCH && process.env.TIMER_BATCH > 0 && process.env.TIMER_QUEUE) { emitter.emit("ENGINE.CONTRUCTOR", "BOTH BATCH AND QUEUE ACTIVE", { @@ -96,7 +98,7 @@ class Engine { static setNextHeartBeat() { return setTimeout(async () => { try { - await Engine._beat(); + await Engine._beat(Engine.instance); } catch (e) { emitter.emit("ENGINE.HEART.ERROR", `HEART FAILURE @ ENGINE_ID [${ENGINE_ID}]`, { engine_id: ENGINE_ID, diff --git a/src/engine/heartbeat/base.js b/src/engine/heartbeat/base.js index 9099cd8..e62b2d4 100644 --- a/src/engine/heartbeat/base.js +++ b/src/engine/heartbeat/base.js @@ -1,14 +1,42 @@ +require("dotenv").config(); const emitter = require("../../core/utils/emitter"); const { timerHeartBeat } = require("./timer"); const { processHeartBeat } = require("./process"); +const { readEnvironmentVariableAsNumber } = require("../../core/utils/environment"); -const engineHeartBeat = async () => { +const engineHeartBeat = async (engine) => { emitter.emit("ENGINE.HEARTBEAT", `HEARTBEAT @ [${new Date().toISOString()}]`); + const TIMER_BATCH = readEnvironmentVariableAsNumber("TIMER_BATCH", 0); + const PROCESS_BATCH = readEnvironmentVariableAsNumber("PROCESS_BATCH", 0); - await timerHeartBeat(); - await processHeartBeat(); + if (engine?.current_instance === 'TIMER') { + await timerHeartBeat(TIMER_BATCH); + } else if (engine?.current_instance === 'PROCESS') { + await processHeartBeat(PROCESS_BATCH); + } + + if (engine?.beat_instances?.length === 2) { + engine.current_instance = engine?.current_instance === 'PROCESS' ? 'TIMER' : 'PROCESS'; + } +} + +const getBeatInstances = () => { + const TIMER_BATCH = readEnvironmentVariableAsNumber("TIMER_BATCH", 0); + const PROCESS_BATCH = readEnvironmentVariableAsNumber("PROCESS_BATCH", 0); + const beatInstances = []; + + if (TIMER_BATCH > 0) { + beatInstances.push("TIMER"); + } + + if (PROCESS_BATCH > 0) { + beatInstances.push("PROCESS"); + } + + return beatInstances; } module.exports = { engineHeartBeat, + getBeatInstances, } \ No newline at end of file diff --git a/src/engine/heartbeat/process.js b/src/engine/heartbeat/process.js index e3055be..9356870 100644 --- a/src/engine/heartbeat/process.js +++ b/src/engine/heartbeat/process.js @@ -3,8 +3,8 @@ const emitter = require("../../core/utils/emitter"); const { Process } = require("../../core/workflow/process"); const { ProcessState } = require("../../core/workflow/process_state"); -const processHeartBeat = async () => { - const PROCESS_BATCH = process.env.PROCESS_BATCH || 10; +const processHeartBeat = async (PROCESS_BATCH) => { + emitter.emit("ENGINE.HEARTBEAT", `HEARTBEAT INSTANCE PROCESS`); const processes = await Process.getPersist()._db.transaction(async (trx) => { try { emitter.emit("ENGINE.PROCESSES_FETCHING", ` FETCHING PROCESSES ON HEARTBEAT BATCH [${PROCESS_BATCH}]`); diff --git a/src/engine/heartbeat/tests/heartbeat.test.js b/src/engine/heartbeat/tests/heartbeat.test.js new file mode 100644 index 0000000..e4d93dd --- /dev/null +++ b/src/engine/heartbeat/tests/heartbeat.test.js @@ -0,0 +1,110 @@ +const settings = require("../../../../settings/tests/settings"); +const { PersistorProvider } = require("../../../core/persist/provider"); +const { Process } = require("../../../core/workflow/process"); +const { processHeartBeat } = require("../process"); +const { timerHeartBeat } = require("../timer"); +const { engineHeartBeat, getBeatInstances } = require("../base"); + +jest.mock("../process", () => { + const actualModule = jest.requireActual("../process"); + return { + ...actualModule, + processHeartBeat: jest.spyOn(actualModule, "processHeartBeat"), + }; +}); + +jest.mock("../timer", () => { + const actualModule = jest.requireActual("../timer"); + return { + ...actualModule, + timerHeartBeat: jest.spyOn(actualModule, "timerHeartBeat"), + }; +}); + +beforeEach(async () => { + await _clean(); +}); + +afterAll(async () => { + await _clean(); + if (settings.persist_options[0] === "knex") { + await Process.getPersist()._db.destroy(); + } +}); + +test("engineHeartBeat runs only for TIMER when current_instance=TIMER", async () => { + const minimalEngine = { + current_instance: 'TIMER', + } + await engineHeartBeat(minimalEngine); + + expect(timerHeartBeat).toHaveBeenCalled(); + expect(processHeartBeat).not.toHaveBeenCalled(); +}); + +test("engineHeartBeat runs only for PROCESS when current_instance=PROCESS", async () => { + const minimalEngine = { + current_instance: 'PROCESS', + } + await engineHeartBeat(minimalEngine); + + expect(processHeartBeat).toHaveBeenCalled(); + expect(timerHeartBeat).not.toHaveBeenCalled(); +}); + +test("engineHeartBeat runs for both TIMER and PROCESS when beat_instances length is 2", async () => { + const minimalEngine = { + current_instance: 'TIMER', + beat_instances: ['TIMER' , 'PROCESS'], + } + + await engineHeartBeat(minimalEngine); + await engineHeartBeat(minimalEngine); + + expect(processHeartBeat).toHaveBeenCalled(); + expect(timerHeartBeat).toHaveBeenCalled(); +}); + +test("getBeatInstances runs for TIMER_BATCH = 10", async () => { + process.env.TIMER_BATCH = 10; + + const beatInstances = getBeatInstances(); + + expect(beatInstances).toHaveLength(1); + process.env.TIMER_BATCH = 0; +}); + +test("getBeatInstances runs for PROCESS_BATCH = 10", async () => { + process.env.PROCESS_BATCH = 10; + + const beatInstances = getBeatInstances(); + + expect(beatInstances).toHaveLength(1); + process.env.PROCESS_BATCH = 0; +}); + +test("getBeatInstances runs for TIMER_BATCH = 10 and PROCESS_BATCH = 10", async () => { + process.env.TIMER_BATCH = 10; + process.env.PROCESS_BATCH = 10; + + const beatInstances = getBeatInstances(); + + expect(beatInstances).toHaveLength(2); +}); + +const _clean = async () => { + const persistor = PersistorProvider.getPersistor(...settings.persist_options); + const activity_persist = persistor.getPersistInstance("Activity"); + const activity_manager_persist = persistor.getPersistInstance("ActivityManager"); + const process_state_persist = persistor.getPersistInstance("ProcessState"); + const process_persist = persistor.getPersistInstance("Process"); + const workflow_persist = persistor.getPersistInstance("Workflow"); + const timer_persist = persistor.getPersistInstance("Timer"); + + await activity_persist.deleteAll(); + await activity_manager_persist.deleteAll(); + await process_state_persist.deleteAll(); + await process_persist.deleteAll(); + await workflow_persist.deleteAll(); + await timer_persist.deleteAll(); +}; \ No newline at end of file diff --git a/src/engine/heartbeat/timer.js b/src/engine/heartbeat/timer.js index 69f9a31..7f60c51 100644 --- a/src/engine/heartbeat/timer.js +++ b/src/engine/heartbeat/timer.js @@ -1,8 +1,8 @@ const emitter = require("../../core/utils/emitter"); const { Timer } = require("../../core/workflow/timer"); -const timerHeartBeat = async () => { - const TIMER_BATCH = process.env.TIMER_BATCH || 40; +const timerHeartBeat = async (TIMER_BATCH) => { + emitter.emit("ENGINE.HEARTBEAT", `HEARTBEAT INSTANCE TIMER`); const max_connection_pool = Timer.getPersist()._db.context.client.pool.max const connections = Timer.getPersist()._db.context.client.pool.free.length