From 5dd77e85e793bc62d1cc759d41ee2f5aaad6cff6 Mon Sep 17 00:00:00 2001 From: Pedro Pereira Assis Date: Tue, 26 Sep 2023 20:01:43 -0300 Subject: [PATCH 1/3] feat: separate and alternate heartbeat instances --- src/engine/engine.js | 4 +++- src/engine/heartbeat/base.js | 16 +++++++++++++--- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/src/engine/engine.js b/src/engine/engine.js index 18e778b..50b8f23 100644 --- a/src/engine/engine.js +++ b/src/engine/engine.js @@ -75,6 +75,8 @@ class Engine { Engine.instance = this; this.emitter = emitter; if (heartBeat) { + Engine.instance.beat_instances = process.env.HEARTBEAT_INSTANCES || 'TIMER,PROCESS'; + Engine.instance.beat_instance = Engine.instance.beat_instances?.split(',')?.[0]; Engine.beat = engineHeartBeat; if (process.env.TIMER_BATCH && process.env.TIMER_BATCH > 0 && process.env.TIMER_QUEUE) { emitter.emit("ENGINE.CONTRUCTOR", "BOTH BATCH AND QUEUE ACTIVE", { @@ -96,7 +98,7 @@ class Engine { static setNextHeartBeat() { return setTimeout(async () => { try { - await Engine._beat(); + await Engine._beat(Engine.instance); } catch (e) { emitter.emit("ENGINE.HEART.ERROR", `HEART FAILURE @ ENGINE_ID [${ENGINE_ID}]`, { engine_id: ENGINE_ID, diff --git a/src/engine/heartbeat/base.js b/src/engine/heartbeat/base.js index 9099cd8..6505072 100644 --- a/src/engine/heartbeat/base.js +++ b/src/engine/heartbeat/base.js @@ -1,12 +1,22 @@ +require('dotenv').config(); const emitter = require("../../core/utils/emitter"); const { timerHeartBeat } = require("./timer"); const { processHeartBeat } = require("./process"); -const engineHeartBeat = async () => { + +const engineHeartBeat = async (engine) => { emitter.emit("ENGINE.HEARTBEAT", `HEARTBEAT @ [${new Date().toISOString()}]`); + emitter.emit("ENGINE.HEARTBEAT", `HEARTBEAT INSTANCE [${engine?.beat_instance}]`); + + if (engine?.beat_instance === 'TIMER') { + await timerHeartBeat(); + } else if (engine?.beat_instance === 'PROCESS') { + await processHeartBeat(); + } - await timerHeartBeat(); - await processHeartBeat(); + if (engine?.beat_instances?.includes('TIMER') && engine?.beat_instances?.includes('PROCESS')) { + engine.beat_instance = engine?.beat_instance === 'TIMER' ? 'PROCESS' : 'TIMER'; + } } module.exports = { From 12489c959a25622aa5437c13f126eb2c3a0bbde8 Mon Sep 17 00:00:00 2001 From: Pedro Pereira Assis Date: Wed, 27 Sep 2023 19:32:57 -0300 Subject: [PATCH 2/3] test: add unitary tests for engineHeartBeat method --- src/engine/heartbeat/base.js | 2 +- src/engine/heartbeat/tests/heartbeat.test.js | 87 ++++++++++++++++++++ 2 files changed, 88 insertions(+), 1 deletion(-) create mode 100644 src/engine/heartbeat/tests/heartbeat.test.js diff --git a/src/engine/heartbeat/base.js b/src/engine/heartbeat/base.js index 6505072..8133baa 100644 --- a/src/engine/heartbeat/base.js +++ b/src/engine/heartbeat/base.js @@ -1,4 +1,4 @@ -require('dotenv').config(); +require("dotenv").config(); const emitter = require("../../core/utils/emitter"); const { timerHeartBeat } = require("./timer"); const { processHeartBeat } = require("./process"); diff --git a/src/engine/heartbeat/tests/heartbeat.test.js b/src/engine/heartbeat/tests/heartbeat.test.js new file mode 100644 index 0000000..493f241 --- /dev/null +++ b/src/engine/heartbeat/tests/heartbeat.test.js @@ -0,0 +1,87 @@ +const settings = require("../../../../settings/tests/settings"); +const { Engine } = require("../../engine"); +const { PersistorProvider } = require("../../../core/persist/provider"); +const { Process } = require("../../../core/workflow/process"); +const { processHeartBeat } = require("../process"); +const { timerHeartBeat } = require("../timer"); +const { engineHeartBeat } = require("../base"); + +jest.mock("../process", () => { + const actualModule = jest.requireActual("../process"); + return { + ...actualModule, + processHeartBeat: jest.spyOn(actualModule, "processHeartBeat"), + }; +}); + +jest.mock("../timer", () => { + const actualModule = jest.requireActual("../timer"); + return { + ...actualModule, + timerHeartBeat: jest.spyOn(actualModule, "timerHeartBeat"), + }; +}); + +beforeEach(async () => { + await _clean(); +}); + +afterAll(async () => { + Engine.kill(); + await _clean(); + if (settings.persist_options[0] === "knex") { + await Process.getPersist()._db.destroy(); + } +}); + +test("engineHeartBeat runs only for TIMER when beat_instance=TIMER", async () => { + const minimalEngine = { + beat_instance: 'TIMER', + beat_instances: 'TIMER', + } + await engineHeartBeat(minimalEngine); + + expect(timerHeartBeat).toHaveBeenCalled(); + expect(processHeartBeat).not.toHaveBeenCalled(); +}); + +test("engineHeartBeat runs only for PROCESS when beat_instance=PROCESS", async () => { + const minimalEngine = { + beat_instance: 'PROCESS', + beat_instances: 'PROCESS', + } + await engineHeartBeat(minimalEngine); + + expect(processHeartBeat).toHaveBeenCalled(); + expect(timerHeartBeat).not.toHaveBeenCalled(); +}); + +test("engineHeartBeat runs for both TIMER and PROCESS when beat_instance=TIMER,PROCESS", async () => { + const minimalEngine = { + beat_instance: 'TIMER', + beat_instances: 'TIMER,PROCESS', + } + + await engineHeartBeat(minimalEngine); + await engineHeartBeat(minimalEngine); + + expect(processHeartBeat).toHaveBeenCalled(); + expect(timerHeartBeat).toHaveBeenCalled(); +}); + +const _clean = async () => { + const persistor = PersistorProvider.getPersistor(...settings.persist_options); + const activity_persist = persistor.getPersistInstance("Activity"); + const activity_manager_persist = persistor.getPersistInstance("ActivityManager"); + const process_state_persist = persistor.getPersistInstance("ProcessState"); + const process_persist = persistor.getPersistInstance("Process"); + const workflow_persist = persistor.getPersistInstance("Workflow"); + const timer_persist = persistor.getPersistInstance("Timer"); + + await activity_persist.deleteAll(); + await activity_manager_persist.deleteAll(); + await process_state_persist.deleteAll(); + await process_persist.deleteAll(); + await workflow_persist.deleteAll(); + await timer_persist.deleteAll(); +}; \ No newline at end of file From 6ef8d0b79b8406ea022297b02e1fd9e72fd7325d Mon Sep 17 00:00:00 2001 From: Pedro Pereira Assis Date: Mon, 2 Oct 2023 19:04:20 -0300 Subject: [PATCH 3/3] refactor: alter engineHeartBeat logics --- src/engine/engine.js | 6 +-- src/engine/heartbeat/base.js | 34 ++++++++++---- src/engine/heartbeat/process.js | 4 +- src/engine/heartbeat/tests/heartbeat.test.js | 47 +++++++++++++++----- src/engine/heartbeat/timer.js | 4 +- 5 files changed, 68 insertions(+), 27 deletions(-) diff --git a/src/engine/engine.js b/src/engine/engine.js index 50b8f23..fece1c8 100644 --- a/src/engine/engine.js +++ b/src/engine/engine.js @@ -18,7 +18,7 @@ const { ProcessStatus } = require("./../core/workflow/process_state"); const { validateTimeInterval } = require("../core/utils/ajvValidator"); const { validate: uuidValidate } = require("uuid"); const { readEnvironmentVariableAsBool, readEnvironmentVariableAsNumber } = require("../core/utils/environment"); -const { engineHeartBeat } = require("./heartbeat/base"); +const { engineHeartBeat, getBeatInstances } = require("./heartbeat/base"); function getActivityManagerFromData(activity_manager_data) { const activity_manager = ActivityManager.deserialize(activity_manager_data); @@ -75,8 +75,8 @@ class Engine { Engine.instance = this; this.emitter = emitter; if (heartBeat) { - Engine.instance.beat_instances = process.env.HEARTBEAT_INSTANCES || 'TIMER,PROCESS'; - Engine.instance.beat_instance = Engine.instance.beat_instances?.split(',')?.[0]; + Engine.instance.beat_instances = getBeatInstances(); + Engine.instance.current_instance = Engine.instance?.beat_instances?.[0]; Engine.beat = engineHeartBeat; if (process.env.TIMER_BATCH && process.env.TIMER_BATCH > 0 && process.env.TIMER_QUEUE) { emitter.emit("ENGINE.CONTRUCTOR", "BOTH BATCH AND QUEUE ACTIVE", { diff --git a/src/engine/heartbeat/base.js b/src/engine/heartbeat/base.js index 8133baa..e62b2d4 100644 --- a/src/engine/heartbeat/base.js +++ b/src/engine/heartbeat/base.js @@ -2,23 +2,41 @@ require("dotenv").config(); const emitter = require("../../core/utils/emitter"); const { timerHeartBeat } = require("./timer"); const { processHeartBeat } = require("./process"); - +const { readEnvironmentVariableAsNumber } = require("../../core/utils/environment"); const engineHeartBeat = async (engine) => { emitter.emit("ENGINE.HEARTBEAT", `HEARTBEAT @ [${new Date().toISOString()}]`); - emitter.emit("ENGINE.HEARTBEAT", `HEARTBEAT INSTANCE [${engine?.beat_instance}]`); + const TIMER_BATCH = readEnvironmentVariableAsNumber("TIMER_BATCH", 0); + const PROCESS_BATCH = readEnvironmentVariableAsNumber("PROCESS_BATCH", 0); + + if (engine?.current_instance === 'TIMER') { + await timerHeartBeat(TIMER_BATCH); + } else if (engine?.current_instance === 'PROCESS') { + await processHeartBeat(PROCESS_BATCH); + } + + if (engine?.beat_instances?.length === 2) { + engine.current_instance = engine?.current_instance === 'PROCESS' ? 'TIMER' : 'PROCESS'; + } +} - if (engine?.beat_instance === 'TIMER') { - await timerHeartBeat(); - } else if (engine?.beat_instance === 'PROCESS') { - await processHeartBeat(); +const getBeatInstances = () => { + const TIMER_BATCH = readEnvironmentVariableAsNumber("TIMER_BATCH", 0); + const PROCESS_BATCH = readEnvironmentVariableAsNumber("PROCESS_BATCH", 0); + const beatInstances = []; + + if (TIMER_BATCH > 0) { + beatInstances.push("TIMER"); } - if (engine?.beat_instances?.includes('TIMER') && engine?.beat_instances?.includes('PROCESS')) { - engine.beat_instance = engine?.beat_instance === 'TIMER' ? 'PROCESS' : 'TIMER'; + if (PROCESS_BATCH > 0) { + beatInstances.push("PROCESS"); } + + return beatInstances; } module.exports = { engineHeartBeat, + getBeatInstances, } \ No newline at end of file diff --git a/src/engine/heartbeat/process.js b/src/engine/heartbeat/process.js index e3055be..9356870 100644 --- a/src/engine/heartbeat/process.js +++ b/src/engine/heartbeat/process.js @@ -3,8 +3,8 @@ const emitter = require("../../core/utils/emitter"); const { Process } = require("../../core/workflow/process"); const { ProcessState } = require("../../core/workflow/process_state"); -const processHeartBeat = async () => { - const PROCESS_BATCH = process.env.PROCESS_BATCH || 10; +const processHeartBeat = async (PROCESS_BATCH) => { + emitter.emit("ENGINE.HEARTBEAT", `HEARTBEAT INSTANCE PROCESS`); const processes = await Process.getPersist()._db.transaction(async (trx) => { try { emitter.emit("ENGINE.PROCESSES_FETCHING", ` FETCHING PROCESSES ON HEARTBEAT BATCH [${PROCESS_BATCH}]`); diff --git a/src/engine/heartbeat/tests/heartbeat.test.js b/src/engine/heartbeat/tests/heartbeat.test.js index 493f241..e4d93dd 100644 --- a/src/engine/heartbeat/tests/heartbeat.test.js +++ b/src/engine/heartbeat/tests/heartbeat.test.js @@ -1,10 +1,9 @@ const settings = require("../../../../settings/tests/settings"); -const { Engine } = require("../../engine"); const { PersistorProvider } = require("../../../core/persist/provider"); const { Process } = require("../../../core/workflow/process"); const { processHeartBeat } = require("../process"); const { timerHeartBeat } = require("../timer"); -const { engineHeartBeat } = require("../base"); +const { engineHeartBeat, getBeatInstances } = require("../base"); jest.mock("../process", () => { const actualModule = jest.requireActual("../process"); @@ -27,17 +26,15 @@ beforeEach(async () => { }); afterAll(async () => { - Engine.kill(); await _clean(); if (settings.persist_options[0] === "knex") { await Process.getPersist()._db.destroy(); } }); -test("engineHeartBeat runs only for TIMER when beat_instance=TIMER", async () => { +test("engineHeartBeat runs only for TIMER when current_instance=TIMER", async () => { const minimalEngine = { - beat_instance: 'TIMER', - beat_instances: 'TIMER', + current_instance: 'TIMER', } await engineHeartBeat(minimalEngine); @@ -45,10 +42,9 @@ test("engineHeartBeat runs only for TIMER when beat_instance=TIMER", async () => expect(processHeartBeat).not.toHaveBeenCalled(); }); -test("engineHeartBeat runs only for PROCESS when beat_instance=PROCESS", async () => { +test("engineHeartBeat runs only for PROCESS when current_instance=PROCESS", async () => { const minimalEngine = { - beat_instance: 'PROCESS', - beat_instances: 'PROCESS', + current_instance: 'PROCESS', } await engineHeartBeat(minimalEngine); @@ -56,10 +52,10 @@ test("engineHeartBeat runs only for PROCESS when beat_instance=PROCESS", async ( expect(timerHeartBeat).not.toHaveBeenCalled(); }); -test("engineHeartBeat runs for both TIMER and PROCESS when beat_instance=TIMER,PROCESS", async () => { +test("engineHeartBeat runs for both TIMER and PROCESS when beat_instances length is 2", async () => { const minimalEngine = { - beat_instance: 'TIMER', - beat_instances: 'TIMER,PROCESS', + current_instance: 'TIMER', + beat_instances: ['TIMER' , 'PROCESS'], } await engineHeartBeat(minimalEngine); @@ -69,6 +65,33 @@ test("engineHeartBeat runs for both TIMER and PROCESS when beat_instance=TIMER,P expect(timerHeartBeat).toHaveBeenCalled(); }); +test("getBeatInstances runs for TIMER_BATCH = 10", async () => { + process.env.TIMER_BATCH = 10; + + const beatInstances = getBeatInstances(); + + expect(beatInstances).toHaveLength(1); + process.env.TIMER_BATCH = 0; +}); + +test("getBeatInstances runs for PROCESS_BATCH = 10", async () => { + process.env.PROCESS_BATCH = 10; + + const beatInstances = getBeatInstances(); + + expect(beatInstances).toHaveLength(1); + process.env.PROCESS_BATCH = 0; +}); + +test("getBeatInstances runs for TIMER_BATCH = 10 and PROCESS_BATCH = 10", async () => { + process.env.TIMER_BATCH = 10; + process.env.PROCESS_BATCH = 10; + + const beatInstances = getBeatInstances(); + + expect(beatInstances).toHaveLength(2); +}); + const _clean = async () => { const persistor = PersistorProvider.getPersistor(...settings.persist_options); const activity_persist = persistor.getPersistInstance("Activity"); diff --git a/src/engine/heartbeat/timer.js b/src/engine/heartbeat/timer.js index 69f9a31..7f60c51 100644 --- a/src/engine/heartbeat/timer.js +++ b/src/engine/heartbeat/timer.js @@ -1,8 +1,8 @@ const emitter = require("../../core/utils/emitter"); const { Timer } = require("../../core/workflow/timer"); -const timerHeartBeat = async () => { - const TIMER_BATCH = process.env.TIMER_BATCH || 40; +const timerHeartBeat = async (TIMER_BATCH) => { + emitter.emit("ENGINE.HEARTBEAT", `HEARTBEAT INSTANCE TIMER`); const max_connection_pool = Timer.getPersist()._db.context.client.pool.max const connections = Timer.getPersist()._db.context.client.pool.free.length