diff --git a/src/main/java/org/mifos/processor/bulk/camel/routes/BatchAggregateRoute.java b/src/main/java/org/mifos/processor/bulk/camel/routes/BatchAggregateRoute.java index 17b92c09..403913d2 100644 --- a/src/main/java/org/mifos/processor/bulk/camel/routes/BatchAggregateRoute.java +++ b/src/main/java/org/mifos/processor/bulk/camel/routes/BatchAggregateRoute.java @@ -65,8 +65,6 @@ public void configure() throws Exception { } exchange.setProperty(COMPLETION_RATE, percentage); - - producerTemplate.send(RouteId.SEND_CALLBACK.getValue(), exchange); }).otherwise().log(LoggingLevel.ERROR, "Batch aggregate request unsuccessful").process(exchange -> { exchange.setProperty(BATCH_STATUS_FAILED, true); exchange.setProperty(ERROR_DESCRIPTION, exchange.getIn().getBody(String.class)); diff --git a/src/main/java/org/mifos/processor/bulk/zeebe/worker/AggregateWorker.java b/src/main/java/org/mifos/processor/bulk/zeebe/worker/AggregateWorker.java index 9bdd5d3f..4188735b 100644 --- a/src/main/java/org/mifos/processor/bulk/zeebe/worker/AggregateWorker.java +++ b/src/main/java/org/mifos/processor/bulk/zeebe/worker/AggregateWorker.java @@ -9,6 +9,7 @@ import static org.mifos.processor.bulk.zeebe.ZeebeVariables.ERROR_CODE; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.ERROR_DESCRIPTION; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.MAX_CALLBACK_RETRY; +import static org.mifos.processor.bulk.zeebe.ZeebeVariables.MAX_STATUS_RETRY; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PHASES; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PHASE_COUNT; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.RETRY; @@ -47,19 +48,21 @@ public void setup() { sendToCamelRoute(RouteId.BATCH_AGGREGATE, exchange); Boolean batchStatusFailed = exchange.getProperty(BATCH_STATUS_FAILED, Boolean.class); - if (batchStatusFailed == null || !batchStatusFailed) { - if (exchange.getException() != null && exchange.getException().getMessage() != null - && exchange.getException().getMessage().contains("404")) { - logger.error("An error occurred, retrying"); - successRate = 0; - } else { - successRate = exchange.getProperty(COMPLETION_RATE, Long.class).intValue(); - } - } else { + // if (batchStatusFailed == null || !batchStatusFailed) { + if (exchange.getException() != null && exchange.getException().getMessage() != null + && exchange.getException().getMessage().contains("404")) { + logger.error("An error occurred, retrying"); + successRate = 0; variables.put(ERROR_CODE, exchange.getProperty(ERROR_CODE)); variables.put(ERROR_DESCRIPTION, exchange.getProperty(ERROR_DESCRIPTION)); - logger.info("Error: {}, {}", variables.get(ERROR_CODE), variables.get(ERROR_DESCRIPTION)); + logger.info("Retry: {} , Error cause: {}, message: {}",retry, exchange.getException().getCause(), exchange.getException().getMessage()); + } else { + logger.info("BATCH SUCCESS retry: {} , and maxRetry: {}",retry,variables.get(MAX_STATUS_RETRY)); + successRate = exchange.getProperty(COMPLETION_RATE, Long.class).intValue(); } + // } else { + + // } variables.put(COMPLETION_RATE, successRate); variables.put(RETRY, ++retry); diff --git a/src/main/java/org/mifos/processor/bulk/zeebe/worker/SendCallbackWorker.java b/src/main/java/org/mifos/processor/bulk/zeebe/worker/SendCallbackWorker.java index a34df04b..723e64b2 100644 --- a/src/main/java/org/mifos/processor/bulk/zeebe/worker/SendCallbackWorker.java +++ b/src/main/java/org/mifos/processor/bulk/zeebe/worker/SendCallbackWorker.java @@ -1,16 +1,20 @@ package org.mifos.processor.bulk.zeebe.worker; import static org.mifos.processor.bulk.camel.config.CamelProperties.CALLBACK_RESPONSE_CODE; +import static org.mifos.processor.bulk.zeebe.ZeebeVariables.BATCH_ID; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.CALLBACK; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.CALLBACK_RETRY; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.CALLBACK_SUCCESS; +import static org.mifos.processor.bulk.zeebe.ZeebeVariables.CLIENT_CORRELATION_ID; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.COMPLETION_RATE; +import static org.mifos.processor.bulk.zeebe.ZeebeVariables.COMPLETION_THRESHOLD; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.ERROR_CODE; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.ERROR_DESCRIPTION; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.MAX_CALLBACK_RETRY; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.MAX_STATUS_RETRY; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PHASES; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PHASE_COUNT; +import static org.mifos.processor.bulk.zeebe.ZeebeVariables.RETRY; import java.util.Map; import org.apache.camel.Exchange; @@ -41,7 +45,15 @@ public void setup() { exchange.setProperty(COMPLETION_RATE, variables.get(COMPLETION_RATE)); exchange.setProperty(PHASES, variables.get(PHASES)); exchange.setProperty(PHASE_COUNT, variables.get(PHASE_COUNT)); - sendToCamelRoute(RouteId.SEND_CALLBACK, exchange); + exchange.setProperty(BATCH_ID, variables.get(BATCH_ID)); + exchange.setProperty(CLIENT_CORRELATION_ID, variables.get(CLIENT_CORRELATION_ID)); + Integer maxRetry = Integer.parseInt(variables.get(MAX_STATUS_RETRY).toString()); + Integer completionRate = Integer.parseInt(variables.get(COMPLETION_RATE).toString()); + Integer completionThreshold = Integer.parseInt(variables.get(COMPLETION_THRESHOLD).toString()); + Integer statusRetry = Integer.parseInt(variables.get(RETRY).toString()); + if (statusRetry >= maxRetry || completionRate >= completionThreshold) { + sendToCamelRoute(RouteId.SEND_CALLBACK, exchange); + } } Boolean callbackSuccess = exchange.getProperty(CALLBACK_SUCCESS, Boolean.class); if (callbackSuccess == null || !callbackSuccess) { diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index b2c65cb6..c4012d69 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -111,7 +111,7 @@ config: completion-threshold-check: enable: false completion-threshold: 95 # in percentage - max-retry: 15 #can be as high as 30 + max-retry: 3 #can be as high as 30 delay: 2 # in seconds deduplication: enabled: true