Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions src/core/workflow/nodes/timer.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,38 @@ class TimerSystemTaskNode extends SystemTaskNode {
return bag;
}

async run({ bag = {}, input = {}, external_input = {}, actor_data = {}, environment = {}, parameters = {} }, lisp) {
const hrt_run_start = process.hrtime();
try {
const execution_data = this._preProcessing({ bag, input, actor_data, environment, parameters });
const [result, status] = await this._run(execution_data, lisp);

let next_node_id = this.next(result);
if (status === ProcessStatus.PENDING) {
next_node_id = this.id;
}

const hrt_run_interval = process.hrtime(hrt_run_start);
const time_elapsed = Math.ceil(hrt_run_interval[0] * 1000 + hrt_run_interval[1] / 1000000);
const node_extract = this._spec?.extract;

return {
node_id: this.id,
bag: this._setBag(bag, result, parameters?._extract, node_extract),
external_input: external_input,
result: result,
error: null,
status: status,
next_node_id: next_node_id,
time_elapsed: time_elapsed,
};
} catch (err) {
const hrt_run_interval = process.hrtime(hrt_run_start);
const time_elapsed = Math.ceil(hrt_run_interval[0] * 1000 + hrt_run_interval[1] / 1000000);
return this._processError(err, { bag, external_input, time_elapsed });
}
}

// eslint-disable-next-line no-unused-vars
async _run(execution_data = {}) {
if (!execution_data.timeout) {
Expand Down
20 changes: 19 additions & 1 deletion src/core/workflow/process.js
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,25 @@ class Process extends PersistedEntity {
case ProcessStatus.EXPIRED:
break;
case ProcessStatus.PENDING:
await this.runPendingProcess(timer.params.actor_data, trx);
const step_number = await this.getNextStepNumber();
this.state.result.step_number = step_number;
const current_node = this._blueprint.fetchNode(this._state.node_id);
const next_node_id = current_node.next(this.state.result);
this.state = new ProcessState(
this.id,
step_number,
this._state.node_id,
this._state.bag,
null,
this.state.result,
null,
ProcessStatus.RUNNING,
next_node_id,
this._state.actor_data,
null
);
await this.save(trx);
await this._notifyProcessState({});
break;
case ProcessStatus.RUNNING:
//TODO: Avaliar como expirar um processo running.
Expand Down
12 changes: 6 additions & 6 deletions src/engine/tests/integration/engine_processes.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -406,9 +406,9 @@ test("run process with http retries", async () => {

workflow_process = await engine.runProcess(workflow_process.id, actors_.simpleton);

await sleep(5000);
await sleep(7000);

expect(process_state_history).toHaveLength(6);
expect(process_state_history).toHaveLength(8);

let http_script = process_state_history[2];
expect(http_script.node_id).toEqual("2");
Expand All @@ -420,23 +420,23 @@ test("run process with http retries", async () => {
timeout: 1,
});

http_script = process_state_history[3];
http_script = process_state_history[4];
expect(http_script.node_id).toEqual("2");
expect(http_script.result).toEqual({
attempt: 2,
data: "",
status: 503,
step_number: 4,
step_number: 5,
timeout: 1,
});

http_script = process_state_history[4];
http_script = process_state_history[6];
expect(http_script.node_id).toEqual("2");
expect(http_script.result).toEqual({
attempt: 3,
data: "",
status: 503,
step_number: 5,
step_number: 7,
timeout: 1,
});
} finally {
Expand Down