diff --git a/build.gradle b/build.gradle
index 29d95e1b..8895174e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -80,7 +80,7 @@ dependencies {
     // miscellaneous dependency
     implementation 'com.google.code.gson:gson:2.8.9'
     implementation 'org.json:json:20210307'
-    implementation 'org.mifos:ph-ee-connector-common:1.8.1-SNAPSHOT'
+    implementation 'org.mifos:ph-ee-connector-common:1.9.1-SNAPSHOT'
     implementation 'org.apache.camel.springboot:camel-spring-boot-starter:3.4.0'
     implementation 'org.apache.camel:camel-undertow:3.4.0'
     implementation 'org.springframework.boot:spring-boot-starter:2.5.2'
diff --git a/src/main/java/org/mifos/processor/bulk/BatchTransactionValidator.java b/src/main/java/org/mifos/processor/bulk/BatchTransactionValidator.java
new file mode 100644
index 00000000..811b9d35
--- /dev/null
+++ b/src/main/java/org/mifos/processor/bulk/BatchTransactionValidator.java
@@ -0,0 +1,97 @@
+package org.mifos.processor.bulk;
+
+
+import org.mifos.connector.common.channel.dto.PhErrorDTO;
+import org.mifos.connector.common.exception.PaymentHubErrorCategory;
+import org.mifos.connector.common.validation.ValidatorBuilder;
+import org.mifos.processor.bulk.BatchTransactionValidatorsEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import static org.mifos.connector.common.exception.PaymentHubError.ExtValidationError;
+
+@Component
+public class BatchTransactionValidator {
+
+    private static final String RESOURCE = "batchTransactionValidator";
+    private static final String REQUEST_ID = "requestId";
+    private static final int EXPECTED_REQUEST_ID_LENGTH = 15;
+    private static final String FILE_NAME = "fileName";
+    private static final String PURPOSE = "purpose";
+    private static final String TYPE = "type";
+    private static final String TENANT = "tenant";
+    private static final String REGISTERING_INSTITUTION_ID = "registeringInstitutionId";
+    private static final String PROGRAM_ID = "programId";
+    private static final String CALLBACK_URL = "callbackUrl";
+
+    public Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    public PhErrorDTO validateBatchTransactions(String requestId, String fileName, String purpose, String type,
+            String tenant, String registeringInstitutionId, String programId, String callbackUrl) {
+        final ValidatorBuilder validatorBuilder = new ValidatorBuilder();
+        logger.info("Inside validation");
+
+        // Check for requestId
+        validatorBuilder.reset().resource(RESOURCE).parameter(REQUEST_ID).value(requestId)
+                .isNullWithFailureCode(BatchTransactionValidatorsEnum.INVALID_REQUEST_ID)
+                .validateFieldMaxLengthWithFailureCodeAndErrorParams(EXPECTED_REQUEST_ID_LENGTH,
+                        BatchTransactionValidatorsEnum.INVALID_REQUEST_ID_LENGTH);
+
+        // Check for fileName (optional)
+        if (fileName != null) {
+            validatorBuilder.reset().resource(RESOURCE).parameter(FILE_NAME).value(fileName)
+                    .validateFieldMaxLengthWithFailureCodeAndErrorParams(EXPECTED_REQUEST_ID_LENGTH,
+                            BatchTransactionValidatorsEnum.INVALID_FILE_NAME_LENGTH);
+        }
+
+        // Check for purpose
+        validatorBuilder.reset().resource(RESOURCE).parameter(PURPOSE).value(purpose)
+                .isNullWithFailureCode(BatchTransactionValidatorsEnum.INVALID_PURPOSE);
+
+        // Check for type
+        validatorBuilder.reset().resource(RESOURCE).parameter(TYPE).value(type)
+                .isNullOrEmpty();
+
+        // Check for tenant
+        validatorBuilder.reset().resource(RESOURCE).parameter(TENANT).value(tenant)
+                .isNullWithFailureCode(BatchTransactionValidatorsEnum.INVALID_TENANT);
+
+        // Check for registeringInstitutionId (optional)
+        if (registeringInstitutionId != null) {
+            validatorBuilder.reset().resource(RESOURCE).parameter(REGISTERING_INSTITUTION_ID).value(registeringInstitutionId)
+                    .validateFieldMaxLengthWithFailureCodeAndErrorParams(EXPECTED_REQUEST_ID_LENGTH,
+                            BatchTransactionValidatorsEnum.INVALID_REGISTERING_INSTITUTION_ID_LENGTH);
+        }
+
+        // Check for programId (optional)
+        if (programId != null) {
+            validatorBuilder.reset().resource(RESOURCE).parameter(PROGRAM_ID).value(programId)
+                    .validateFieldMaxLengthWithFailureCodeAndErrorParams(EXPECTED_REQUEST_ID_LENGTH,
+                            BatchTransactionValidatorsEnum.INVALID_PROGRAM_ID_LENGTH);
+        }
+
+        // Check for callbackUrl (optional)
+        if (callbackUrl != null) {
+            validatorBuilder.reset().resource(RESOURCE).parameter(CALLBACK_URL).value(callbackUrl)
+                    .isNullWithFailureCode(BatchTransactionValidatorsEnum.INVALID_CALLBACK_URL)
+                    .validateFieldMaxLengthWithFailureCodeAndErrorParams(2048, BatchTransactionValidatorsEnum.INVALID_CALLBACK_URL);
+        }
+
+        // If errors exist, build and return PhErrorDTO
+        if (validatorBuilder.hasError()) {
+            logger.info("Found error");
+            validatorBuilder.errorCategory(PaymentHubErrorCategory.Validation.toString())
+                    .errorCode(BatchTransactionValidatorsEnum.BATCH_TRANSACTION_VALIDATION_ERROR.getCode())
+                    .errorDescription(BatchTransactionValidatorsEnum.BATCH_TRANSACTION_VALIDATION_ERROR.getMessage())
+                    .developerMessage(BatchTransactionValidatorsEnum.BATCH_TRANSACTION_VALIDATION_ERROR.getMessage())
+                    .defaultUserMessage(BatchTransactionValidatorsEnum.BATCH_TRANSACTION_VALIDATION_ERROR.getMessage());
+
+            PhErrorDTO.PhErrorDTOBuilder phErrorDTOBuilder = new PhErrorDTO.PhErrorDTOBuilder(ExtValidationError.getErrorCode());
+            phErrorDTOBuilder.fromValidatorBuilder(validatorBuilder);
+            return phErrorDTOBuilder.build();
+        }
+
+        return null;
+    }
+}
diff --git a/src/main/java/org/mifos/processor/bulk/BatchTransactionValidatorsEnum.java b/src/main/java/org/mifos/processor/bulk/BatchTransactionValidatorsEnum.java
new file mode 100644
index 00000000..af69e615
--- /dev/null
+++ b/src/main/java/org/mifos/processor/bulk/BatchTransactionValidatorsEnum.java
@@ -0,0 +1,44 @@
+package org.mifos.processor.bulk;
+
+import org.mifos.connector.common.exception.PaymentHubErrorCategory;
+import org.mifos.connector.common.validation.ValidationCodeType;
+import org.springframework.stereotype.Component;
+
+public enum BatchTransactionValidatorsEnum implements ValidationCodeType {
+
+    BATCH_TRANSACTION_VALIDATION_ERROR("error.msg.batch.transaction.validation.errors", "Batch transaction validation failed"),
+    INVALID_REQUEST_ID("error.msg.schema.request.id.cannot.be.null.or.empty", "Request ID cannot be null or empty"),
+    INVALID_REQUEST_ID_LENGTH("error.msg.schema.request.id.length.is.invalid", "Request ID length is invalid"),
+    INVALID_FILE_NAME_LENGTH("error.msg.schema.file.name.length.is.invalid", "File name length is invalid"),
+    INVALID_PURPOSE("error.msg.schema.purpose.cannot.be.null.or.empty", "Purpose cannot be null or empty"),
+    INVALID_TYPE("error.msg.schema.type.cannot.be.null.or.empty", "Type cannot be null or empty"),
+    INVALID_TENANT("error.msg.schema.tenant.cannot.be.null.or.empty", "Tenant cannot be null or empty"),
+    INVALID_REGISTERING_INSTITUTION_ID_LENGTH("error.msg.schema.registering.institution.id.length.is.invalid", "Registering Institution ID length is invalid"),
+    INVALID_PROGRAM_ID_LENGTH("error.msg.schema.program.id.length.is.invalid", "Program ID length is invalid"),
+    INVALID_CALLBACK_URL("error.msg.schema.callback.url.cannot.be.null.or.empty", "Callback URL cannot be null or empty"),
+    INVALID_CALLBACK_URL_LENGTH("error.msg.schema.callback.url.length.is.invalid", "Callback URL length is invalid");
+
+    private final String code;
+    private final String category;
+    private final String message;
+
+    BatchTransactionValidatorsEnum(String code, String message) {
+        this.code = code;
+        this.category = PaymentHubErrorCategory.Validation.toString();
+        this.message = message;
+    }
+
+    public String getCode() {
+        return this.code;
+    }
+
+    public String getCategory() {
+        return this.category;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+}
+
+
diff --git a/src/main/java/org/mifos/processor/bulk/kafka/Consumers.java b/src/main/java/org/mifos/processor/bulk/kafka/Consumers.java
index 03769b77..b45588c6 100644
--- a/src/main/java/org/mifos/processor/bulk/kafka/Consumers.java
+++ b/src/main/java/org/mifos/processor/bulk/kafka/Consumers.java
@@ -1,116 +1,116 @@
-package org.mifos.processor.bulk.kafka;
-
-import static org.mifos.connector.common.mojaloop.type.InitiatorType.CONSUMER;
-import static org.mifos.connector.common.mojaloop.type.Scenario.TRANSFER;
-import static org.mifos.connector.common.mojaloop.type.TransactionRole.PAYER;
-import static org.mifos.processor.bulk.zeebe.ZeebeVariables.BATCH_ID;
-import static org.mifos.processor.bulk.zeebe.ZeebeVariables.GSMA_CHANNEL_REQUEST;
-import static org.mifos.processor.bulk.zeebe.ZeebeVariables.INITIATOR_FSPID;
-import static org.mifos.processor.bulk.zeebe.ZeebeVariables.IS_RTP_REQUEST;
-import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PARTY_ID;
-import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PARTY_ID_TYPE;
-import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PARTY_LOOKUP_FSPID;
-import static org.mifos.processor.bulk.zeebe.ZeebeVariables.TENANT_ID;
-import static org.mifos.processor.bulk.zeebe.ZeebeVariables.TRANSACTION_TYPE;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.util.HashMap;
-import java.util.Map;
-import lombok.extern.slf4j.Slf4j;
-import org.mifos.connector.common.channel.dto.TransactionChannelRequestDTO;
-import org.mifos.connector.common.gsma.dto.GSMATransaction;
-import org.mifos.connector.common.gsma.dto.GsmaParty;
-import org.mifos.connector.common.mojaloop.dto.MoneyData;
-import org.mifos.connector.common.mojaloop.dto.Party;
-import org.mifos.connector.common.mojaloop.dto.PartyIdInfo;
-import org.mifos.connector.common.mojaloop.dto.TransactionType;
-import org.mifos.connector.common.mojaloop.type.IdentifierType;
-import org.mifos.processor.bulk.schema.TransactionOlder;
-import org.mifos.processor.bulk.zeebe.ZeebeProcessStarter;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.stereotype.Service;
-
-@Service
-@Slf4j
-public class Consumers {
-
-    @Value("${bpmn.flows.international-remittance-payer}")
-    private String internationalRemittancePayer;
-
-    @Autowired
-    private ObjectMapper objectMapper;
-
-    @Autowired
-    private ZeebeProcessStarter zeebeProcessStarter;
-
-    @KafkaListener(topics = "${kafka.topic.gsma.name}", groupId = "group_id")
-    public void listenTopicGsma(String message) throws JsonProcessingException {
-        log.debug("Received Message in topic GSMA and group group_id: {}", message);
-        TransactionOlder transaction = objectMapper.readValue(message, TransactionOlder.class);
-        String tenantId = "ibank-usa";
-
-        GSMATransaction gsmaChannelRequest = new GSMATransaction();
-        gsmaChannelRequest.setAmount(transaction.getAmount());
-        gsmaChannelRequest.setCurrency(transaction.getCurrency());
-        gsmaChannelRequest.setRequestingLei("ibank-usa");
-        gsmaChannelRequest.setReceivingLei("ibank-india");
-        GsmaParty creditParty = new GsmaParty();
-        creditParty.setKey("msisdn");
-        creditParty.setValue(transaction.getAccountNumber());
-        GsmaParty debitParty = new GsmaParty();
-        debitParty.setKey("msisdn");
-        debitParty.setValue(transaction.getAccountNumber());
-        gsmaChannelRequest.setCreditParty(new GsmaParty[] { creditParty });
-        gsmaChannelRequest.setDebitParty(new GsmaParty[] { debitParty });
-        // gsmaChannelRequest.setInternationalTransferInformation().setReceivingAmount(gsmaChannelRequest.getAmount());
-
-        TransactionChannelRequestDTO channelRequest = new TransactionChannelRequestDTO(); // Fineract Object
-        Party payee = new Party(new PartyIdInfo(IdentifierType.MSISDN, transaction.getAccountNumber()));
-        Party payer = new Party(new PartyIdInfo(IdentifierType.MSISDN, "7543010"));
-
-        MoneyData moneyData = new MoneyData();
-        moneyData.setAmount(transaction.getAmount());
-        moneyData.setCurrency(transaction.getCurrency());
-
-        channelRequest.setPayer(payer);
-        channelRequest.setPayee(payee);
-        channelRequest.setAmount(moneyData);
-
-        TransactionType transactionType = new TransactionType();
-        transactionType.setInitiator(PAYER);
-        transactionType.setInitiatorType(CONSUMER);
-        transactionType.setScenario(TRANSFER);
-
-        Map extraVariables = new HashMap<>();
-        extraVariables.put(IS_RTP_REQUEST, false);
-        extraVariables.put(TRANSACTION_TYPE, "inttransfer");
-        extraVariables.put(TENANT_ID, tenantId);
-
-        extraVariables.put(BATCH_ID, transaction.getBatchId());
-
-        String tenantSpecificBpmn = internationalRemittancePayer.replace("{dfspid}", tenantId);
-        channelRequest.setTransactionType(transactionType);
-
-        PartyIdInfo requestedParty = (boolean) extraVariables.get(IS_RTP_REQUEST) ? channelRequest.getPayer().getPartyIdInfo()
-                : channelRequest.getPayee().getPartyIdInfo();
-        extraVariables.put(PARTY_ID_TYPE, requestedParty.getPartyIdType());
-        extraVariables.put(PARTY_ID, requestedParty.getPartyIdentifier());
-
-        extraVariables.put(GSMA_CHANNEL_REQUEST, objectMapper.writeValueAsString(gsmaChannelRequest));
-        extraVariables.put(PARTY_LOOKUP_FSPID, gsmaChannelRequest.getReceivingLei());
-        extraVariables.put(INITIATOR_FSPID, gsmaChannelRequest.getRequestingLei());
-
-        String transactionId = zeebeProcessStarter.startZeebeWorkflow(tenantSpecificBpmn, objectMapper.writeValueAsString(channelRequest),
-                extraVariables);
-
-        log.debug("GSMA Transaction Started with:{} ", transactionId);
-    }
-
-    @KafkaListener(topics = "${kafka.topic.slcb.name}", groupId = "group_id")
-    public void listenTopicSlcb(String message) {
-        log.debug("Received Message in topic SLCB and group group_id:{} ", message);
-    }
-}
+//package org.mifos.processor.bulk.kafka;
+//
+//import static org.mifos.connector.common.mojaloop.type.InitiatorType.CONSUMER;
+//import static org.mifos.connector.common.mojaloop.type.Scenario.TRANSFER;
+//import static org.mifos.connector.common.mojaloop.type.TransactionRole.PAYER;
+//import static org.mifos.processor.bulk.zeebe.ZeebeVariables.BATCH_ID;
+//import static org.mifos.processor.bulk.zeebe.ZeebeVariables.GSMA_CHANNEL_REQUEST;
+//import static org.mifos.processor.bulk.zeebe.ZeebeVariables.INITIATOR_FSPID;
+//import static org.mifos.processor.bulk.zeebe.ZeebeVariables.IS_RTP_REQUEST;
+//import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PARTY_ID;
+//import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PARTY_ID_TYPE;
+//import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PARTY_LOOKUP_FSPID;
+//import static org.mifos.processor.bulk.zeebe.ZeebeVariables.TENANT_ID;
+//import static org.mifos.processor.bulk.zeebe.ZeebeVariables.TRANSACTION_TYPE;
+//
+//import com.fasterxml.jackson.core.JsonProcessingException;
+//import com.fasterxml.jackson.databind.ObjectMapper;
+//import java.util.HashMap;
+//import java.util.Map;
+//import lombok.extern.slf4j.Slf4j;
+//import org.mifos.connector.common.channel.dto.TransactionChannelRequestDTO;
+//import org.mifos.connector.common.gsma.dto.GSMATransaction;
+//import org.mifos.connector.common.gsma.dto.GsmaParty;
+//import org.mifos.connector.common.mojaloop.dto.MoneyData;
+//import org.mifos.connector.common.mojaloop.dto.Party;
+//import org.mifos.connector.common.mojaloop.dto.PartyIdInfo;
+//import org.mifos.connector.common.mojaloop.dto.TransactionType;
+//import org.mifos.connector.common.mojaloop.type.IdentifierType;
+//import org.mifos.processor.bulk.schema.TransactionOlder;
+//import org.mifos.processor.bulk.zeebe.ZeebeProcessStarter;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.beans.factory.annotation.Value;
+//import org.springframework.kafka.annotation.KafkaListener;
+//import org.springframework.stereotype.Service;
+//
+//@Service
+//@Slf4j
+//public class Consumers {
+//
+//    @Value("${bpmn.flows.international-remittance-payer}")
+//    private String internationalRemittancePayer;
+//
+//    @Autowired
+//    private ObjectMapper objectMapper;
+//
+//    @Autowired
+//    private ZeebeProcessStarter zeebeProcessStarter;
+//
+//    @KafkaListener(topics = "${kafka.topic.gsma.name}", groupId = "group_id")
+//    public void listenTopicGsma(String message) throws JsonProcessingException {
+//        log.debug("Received Message in topic GSMA and group group_id: {}", message);
+//        TransactionOlder transaction = objectMapper.readValue(message, TransactionOlder.class);
+//        String tenantId = "ibank-usa";
+//
+//        GSMATransaction gsmaChannelRequest = new GSMATransaction();
+//        gsmaChannelRequest.setAmount(transaction.getAmount());
+//        gsmaChannelRequest.setCurrency(transaction.getCurrency());
+//        gsmaChannelRequest.setRequestingLei("ibank-usa");
+//        gsmaChannelRequest.setReceivingLei("ibank-india");
+//        GsmaParty creditParty = new GsmaParty();
+//        creditParty.setKey("msisdn");
+//        creditParty.setValue(transaction.getAccountNumber());
+//        GsmaParty debitParty = new GsmaParty();
+//        debitParty.setKey("msisdn");
+//        debitParty.setValue(transaction.getAccountNumber());
+//        gsmaChannelRequest.setCreditParty(new GsmaParty[] { creditParty });
+//        gsmaChannelRequest.setDebitParty(new GsmaParty[] { debitParty });
+//        // gsmaChannelRequest.setInternationalTransferInformation().setReceivingAmount(gsmaChannelRequest.getAmount());
+//
+//        TransactionChannelRequestDTO channelRequest = new TransactionChannelRequestDTO(); // Fineract Object
+//        Party payee = new Party(new PartyIdInfo(IdentifierType.MSISDN, transaction.getAccountNumber()));
+//        Party payer = new Party(new PartyIdInfo(IdentifierType.MSISDN, "7543010"));
+//
+//        MoneyData moneyData = new MoneyData();
+//        moneyData.setAmount(transaction.getAmount());
+//        moneyData.setCurrency(transaction.getCurrency());
+//
+//        channelRequest.setPayer(payer);
+//        channelRequest.setPayee(payee);
+//        channelRequest.setAmount(moneyData);
+//
+//        TransactionType transactionType = new TransactionType();
+//        transactionType.setInitiator(PAYER);
+//        transactionType.setInitiatorType(CONSUMER);
+//        transactionType.setScenario(TRANSFER);
+//
+//        Map extraVariables = new HashMap<>();
+//        extraVariables.put(IS_RTP_REQUEST, false);
+//        extraVariables.put(TRANSACTION_TYPE, "inttransfer");
+//        extraVariables.put(TENANT_ID, tenantId);
+//
+//        extraVariables.put(BATCH_ID, transaction.getBatchId());
+//
+//        String tenantSpecificBpmn = internationalRemittancePayer.replace("{dfspid}", tenantId);
+//        channelRequest.setTransactionType(transactionType);
+//
+//        PartyIdInfo requestedParty = (boolean) extraVariables.get(IS_RTP_REQUEST) ? channelRequest.getPayer().getPartyIdInfo()
+//                : channelRequest.getPayee().getPartyIdInfo();
+//        extraVariables.put(PARTY_ID_TYPE, requestedParty.getPartyIdType());
+//        extraVariables.put(PARTY_ID, requestedParty.getPartyIdentifier());
+//
+//        extraVariables.put(GSMA_CHANNEL_REQUEST, objectMapper.writeValueAsString(gsmaChannelRequest));
+//        extraVariables.put(PARTY_LOOKUP_FSPID, gsmaChannelRequest.getReceivingLei());
+//        extraVariables.put(INITIATOR_FSPID, gsmaChannelRequest.getRequestingLei());
+//
+//        String transactionId = zeebeProcessStarter.startZeebeWorkflow(tenantSpecificBpmn, objectMapper.writeValueAsString(channelRequest),
+//                extraVariables);
+//
+//        log.debug("GSMA Transaction Started with:{} ", transactionId);
+//    }
+//
+//    @KafkaListener(topics = "${kafka.topic.slcb.name}", groupId = "group_id")
+//    public void listenTopicSlcb(String message) {
+//        log.debug("Received Message in topic SLCB and group group_id:{} ", message);
+//    }
+//}
diff --git a/src/main/java/org/mifos/processor/bulk/kafka/config/KafkaConsumerConfig.java b/src/main/java/org/mifos/processor/bulk/kafka/config/KafkaConsumerConfig.java
index e9694b89..bb04a764 100644
--- a/src/main/java/org/mifos/processor/bulk/kafka/config/KafkaConsumerConfig.java
+++ b/src/main/java/org/mifos/processor/bulk/kafka/config/KafkaConsumerConfig.java
@@ -1,39 +1,39 @@
-package org.mifos.processor.bulk.kafka.config;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.annotation.EnableKafka;
-import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
-import org.springframework.kafka.core.ConsumerFactory;
-import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
-
-@EnableKafka
-@Configuration
-public class KafkaConsumerConfig {
-
-    @Value(value = "${kafka.bootstrapAddress}")
-    private String bootstrapAddress;
-
-    @Bean
-    public ConsumerFactory consumerFactory() {
-        Map props = new HashMap<>();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        return new DefaultKafkaConsumerFactory<>(props);
-    }
-
-    @Bean
-    public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
-        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
-        factory.setConsumerFactory(consumerFactory());
-        return factory;
-    }
-
-}
+//package org.mifos.processor.bulk.kafka.config;
+//
+//import java.util.HashMap;
+//import java.util.Map;
+//import org.apache.kafka.clients.consumer.ConsumerConfig;
+//import org.apache.kafka.common.serialization.StringDeserializer;
+//import org.springframework.beans.factory.annotation.Value;
+//import org.springframework.context.annotation.Bean;
+//import org.springframework.context.annotation.Configuration;
+//import org.springframework.kafka.annotation.EnableKafka;
+//import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+//import org.springframework.kafka.core.ConsumerFactory;
+//import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+//
+//@EnableKafka
+//@Configuration
+//public class KafkaConsumerConfig {
+//
+//    @Value(value = "${kafka.bootstrapAddress}")
+//    private String bootstrapAddress;
+//
+//    @Bean
+//    public ConsumerFactory consumerFactory() {
+//        Map props = new HashMap<>();
+//        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
+//        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
+//        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+//        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+//        return new DefaultKafkaConsumerFactory<>(props);
+//    }
+//
+//    @Bean
+//    public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
+//        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
+//        factory.setConsumerFactory(consumerFactory());
+//        return factory;
+//    }
+//
+//}
diff --git a/src/main/java/org/mifos/processor/bulk/kafka/config/KafkaProducerConfig.java b/src/main/java/org/mifos/processor/bulk/kafka/config/KafkaProducerConfig.java
index 11b9aa2a..5caa219d 100644
--- a/src/main/java/org/mifos/processor/bulk/kafka/config/KafkaProducerConfig.java
+++ b/src/main/java/org/mifos/processor/bulk/kafka/config/KafkaProducerConfig.java
@@ -1,34 +1,34 @@
-package org.mifos.processor.bulk.kafka.config;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.core.DefaultKafkaProducerFactory;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.core.ProducerFactory;
-
-@Configuration
-public class KafkaProducerConfig {
-
-    @Value(value = "${kafka.bootstrapAddress}")
-    private String bootstrapAddress;
-
-    @Bean
-    public ProducerFactory producerFactory() {
-        Map configProps = new HashMap<>();
-        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
-        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        return new DefaultKafkaProducerFactory<>(configProps);
-    }
-
-    @Bean
-    public KafkaTemplate kafkaTemplate() {
-        return new KafkaTemplate<>(producerFactory());
-    }
-
-}
+//package org.mifos.processor.bulk.kafka.config;
+//
+//import java.util.HashMap;
+//import java.util.Map;
+//import org.apache.kafka.clients.producer.ProducerConfig;
+//import org.apache.kafka.common.serialization.StringSerializer;
+//import org.springframework.beans.factory.annotation.Value;
+//import org.springframework.context.annotation.Bean;
+//import org.springframework.context.annotation.Configuration;
+//import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+//import org.springframework.kafka.core.KafkaTemplate;
+//import org.springframework.kafka.core.ProducerFactory;
+//
+//@Configuration
+//public class KafkaProducerConfig {
+//
+//    @Value(value = "${kafka.bootstrapAddress}")
+//    private String bootstrapAddress;
+//
+//    @Bean
+//    public ProducerFactory producerFactory() {
+//        Map configProps = new HashMap<>();
+//        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
+//        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+//        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+//        return new DefaultKafkaProducerFactory<>(configProps);
+//    }
+//
+//    @Bean
+//    public KafkaTemplate kafkaTemplate() {
+//        return new KafkaTemplate<>(producerFactory());
+//    }
+//
+//}
diff --git a/src/main/java/org/mifos/processor/bulk/kafka/config/KafkaTopicConfig.java b/src/main/java/org/mifos/processor/bulk/kafka/config/KafkaTopicConfig.java
index 7d2f2ff7..f7dd2735 100644
--- a/src/main/java/org/mifos/processor/bulk/kafka/config/KafkaTopicConfig.java
+++ b/src/main/java/org/mifos/processor/bulk/kafka/config/KafkaTopicConfig.java
@@ -1,40 +1,40 @@
-package org.mifos.processor.bulk.kafka.config;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.core.KafkaAdmin;
-
-@Configuration
-public class KafkaTopicConfig {
-
-    @Value(value = "${kafka.bootstrapAddress}")
-    private String bootstrapAddress;
-
-    @Value(value = "${kafka.topic.gsma.name}")
-    private String gsmaTopicName;
-
-    @Value(value = "${kafka.topic.slcb.name}")
-    private String slcbTopicName;
-
-    @Bean
-    public KafkaAdmin kafkaAdmin() {
-        Map configs = new HashMap<>();
-        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
-        return new KafkaAdmin(configs);
-    }
-
-    @Bean
-    public NewTopic gsmaTopic() {
-        return new NewTopic(gsmaTopicName, 1, (short) 1);
-    }
-
-    @Bean
-    public NewTopic slcbTopic() {
-        return new NewTopic(slcbTopicName, 1, (short) 1);
-    }
-}
+//package org.mifos.processor.bulk.kafka.config;
+//
+//import java.util.HashMap;
+//import java.util.Map;
+//import org.apache.kafka.clients.admin.AdminClientConfig;
+//import org.apache.kafka.clients.admin.NewTopic;
+//import org.springframework.beans.factory.annotation.Value;
+//import org.springframework.context.annotation.Bean;
+//import org.springframework.context.annotation.Configuration;
+//import org.springframework.kafka.core.KafkaAdmin;
+//
+//@Configuration
+//public class KafkaTopicConfig {
+//
+//    @Value(value = "${kafka.bootstrapAddress}")
+//    private String bootstrapAddress;
+//
+//    @Value(value = "${kafka.topic.gsma.name}")
+//    private String gsmaTopicName;
+//
+//    @Value(value = "${kafka.topic.slcb.name}")
+//    private String slcbTopicName;
+//
+//    @Bean
+//    public KafkaAdmin kafkaAdmin() {
+//        Map configs = new HashMap<>();
+//        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
+//        return new KafkaAdmin(configs);
+//    }
+//
+//    @Bean
+//    public NewTopic gsmaTopic() {
+//        return new NewTopic(gsmaTopicName, 1, (short) 1);
+//    }
+//
+//    @Bean
+//    public NewTopic slcbTopic() {
+//        return new NewTopic(slcbTopicName, 1, (short) 1);
+//    }
+//}
diff --git a/src/main/java/org/mifos/processor/bulk/zeebe/worker/AccountLookupWorker.java b/src/main/java/org/mifos/processor/bulk/zeebe/worker/AccountLookupWorker.java
index 6107c604..3e61fab6 100644
--- a/src/main/java/org/mifos/processor/bulk/zeebe/worker/AccountLookupWorker.java
+++ b/src/main/java/org/mifos/processor/bulk/zeebe/worker/AccountLookupWorker.java
@@ -18,6 +18,11 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.camunda.zeebe.client.ZeebeClient;
 import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.ProducerTemplate;
@@ -46,6 +51,9 @@ public class AccountLookupWorker extends BaseWorker {
     private String accountLookupCallback;
     @Value("${identity_account_mapper.account_lookup}")
     private String accountLookupEndpoint;
+    private static final int TIMEOUT_SECONDS = 10;
+    private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+
 
     @Override
     public void setup() {
@@ -54,36 +62,48 @@ public void setup() {
             logger.info("Job '{}' started from process '{}' with key {}", job.getType(), job.getBpmnProcessId(), job.getKey());
             Map existingVariables = job.getVariablesAsMap();
             logger.info(existingVariables.toString());
 
-            existingVariables.put(ACCOUNT_LOOKUP_RETRY_COUNT, 1);
-            existingVariables.put(CACHED_TRANSACTION_ID, job.getKey());
-
-            boolean isTransactionRequest = (boolean) existingVariables.get(IS_RTP_REQUEST);
-            String tenantId = (String) existingVariables.get(TENANT_ID);
-            Object channelRequest = existingVariables.get(CHANNEL_REQUEST);
-            TransactionChannelRequestDTO request = objectMapper.readValue((String) channelRequest, TransactionChannelRequestDTO.class);
-
-            existingVariables.put(INITIATOR_FSP_ID, tenantId);
-            PartyIdInfo requestedParty = isTransactionRequest ? request.getPayer().getPartyIdInfo() : request.getPayee().getPartyIdInfo();
-
-            String payeeIdentity = requestedParty.getPartyIdentifier();
-            String paymentModality = requestedParty.getPartyIdType().toString();
-
-            Exchange exchange = new DefaultExchange(camelContext);
-            exchange.setProperty(HOST, identityMapperURL);
-            exchange.setProperty(PAYEE_IDENTITY, payeeIdentity);
-            exchange.setProperty(PAYMENT_MODALITY, paymentModality);
-            exchange.setProperty(CALLBACK, identityMapperURL + accountLookupCallback);
-            exchange.setProperty(TRANSACTION_ID, existingVariables.get(TRANSACTION_ID));
-            exchange.setProperty("requestId", job.getKey());
-            exchange.setProperty(CHANNEL_REQUEST, channelRequest);
-            exchange.setProperty(ORIGIN_DATE, existingVariables.get(ORIGIN_DATE));
-            exchange.setProperty(TENANT_ID, tenantId);
-            exchange.setProperty(HEADER_REGISTERING_INSTITUTE_ID, existingVariables.get(HEADER_REGISTERING_INSTITUTE_ID));
-            producerTemplate.send("direct:send-account-lookup", exchange);
-
-            client.newCompleteCommand(job.getKey()).variables(existingVariables).send();
+            ScheduledFuture timeoutHandler = scheduler.schedule(() -> {
+                logger.error("Job '{}' with key {} is stuck.", job.getType(), job.getKey());
+                logger.info("Workflow stuck, terminating");
+            }, TIMEOUT_SECONDS, TimeUnit.SECONDS);
+            try {
+
+                existingVariables.put(ACCOUNT_LOOKUP_RETRY_COUNT, 1);
+                existingVariables.put(CACHED_TRANSACTION_ID, job.getKey());
+
+                boolean isTransactionRequest = (boolean) existingVariables.get(IS_RTP_REQUEST);
+                String tenantId = (String) existingVariables.get(TENANT_ID);
+                Object channelRequest = existingVariables.get(CHANNEL_REQUEST);
+                TransactionChannelRequestDTO request = objectMapper.readValue((String) channelRequest, TransactionChannelRequestDTO.class);
+
+                existingVariables.put(INITIATOR_FSP_ID, tenantId);
+                PartyIdInfo requestedParty = isTransactionRequest ? request.getPayer().getPartyIdInfo() : request.getPayee().getPartyIdInfo();
+
+                String payeeIdentity = requestedParty.getPartyIdentifier();
+                String paymentModality = requestedParty.getPartyIdType().toString();
+
+                Exchange exchange = new DefaultExchange(camelContext);
+                exchange.setProperty(HOST, identityMapperURL);
+                exchange.setProperty(PAYEE_IDENTITY, payeeIdentity);
+                exchange.setProperty(PAYMENT_MODALITY, paymentModality);
+                exchange.setProperty(CALLBACK, identityMapperURL + accountLookupCallback);
+                exchange.setProperty(TRANSACTION_ID, existingVariables.get(TRANSACTION_ID));
+                exchange.setProperty("requestId", job.getKey());
+                exchange.setProperty(CHANNEL_REQUEST, channelRequest);
+                exchange.setProperty(ORIGIN_DATE, existingVariables.get(ORIGIN_DATE));
+                exchange.setProperty(TENANT_ID, tenantId);
+                exchange.setProperty(HEADER_REGISTERING_INSTITUTE_ID, existingVariables.get(HEADER_REGISTERING_INSTITUTE_ID));
+                producerTemplate.send("direct:send-account-lookup", exchange);
+
+                client.newCompleteCommand(job.getKey()).variables(existingVariables).send();
+                timeoutHandler.cancel(false); // Cancel the timeout if the job completes successfully
+                logger.info("Job '{}' with key {} completed successfully.", job.getType(), job.getKey());
+            } catch (Exception e) {
+                timeoutHandler.cancel(false); // Cancel the timeout on error as well
+                logger.error("Error processing job '{}': {}", job.getType(), e.getMessage(), e);
+            }
+
         }).name(String.valueOf(ACCOUNT_LOOKUP)).open();
     }
-
 }
diff --git a/src/main/java/org/mifos/processor/bulk/zeebe/worker/PartyLookupWorker.java b/src/main/java/org/mifos/processor/bulk/zeebe/worker/PartyLookupWorker.java
index 0b8a0301..ad336ff8 100644
--- a/src/main/java/org/mifos/processor/bulk/zeebe/worker/PartyLookupWorker.java
+++ b/src/main/java/org/mifos/processor/bulk/zeebe/worker/PartyLookupWorker.java
@@ -3,23 +3,41 @@
 import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PARTY_LOOKUP_FAILED;
 
 import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
 import org.springframework.stereotype.Component;
 
 @Component
 public class PartyLookupWorker extends BaseWorker {
 
+    private static final int TIMEOUT_SECONDS = 3;
+    private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+
     @Override
     public void setup() {
         newWorker(Worker.PARTY_LOOKUP, (client, job) -> {
-            logger.debug("Job '{}' started from process '{}' with key {}", job.getType(), job.getBpmnProcessId(), job.getKey());
+            long jobKey = job.getKey();
+            logger.debug("Job '{}' started from process '{}' with key {}", job.getType(), job.getBpmnProcessId(), jobKey);
             Map variables = job.getVariablesAsMap();
+            ScheduledFuture timeoutHandler = scheduler.schedule(() -> {
+                logger.error("Job '{}' with key {} is stuck.", job.getType(), jobKey);
+                logger.info("Job stuck terminating ----------");
+            }, 3, TimeUnit.SECONDS);
 
-            if (workerConfig.isPartyLookUpWorkerEnabled) {
-                variables.put(PARTY_LOOKUP_FAILED, false);
+            try {
+                if (workerConfig.isPartyLookUpWorkerEnabled) {
+                    variables.put(PARTY_LOOKUP_FAILED, false);
+                }
+                client.newCompleteCommand(job.getKey()).variables(variables).send().join();
+                timeoutHandler.cancel(false); // Cancel the timeout if the job completes successfully
+                logger.info("Job '{}' with key {} completed successfully.", job.getType(), jobKey);
+            } catch (Exception e) {
+                timeoutHandler.cancel(false); // Cancel the timeout on error as well
+                logger.error("Error processing job '{}': {}", job.getType(), e.getMessage(), e);
             }
-
-            client.newCompleteCommand(job.getKey()).variables(variables).send();
         });
     }
-
 }
diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml
index 8ed37cf5..e54a7363 100644
--- a/src/main/resources/application.yaml
+++ b/src/main/resources/application.yaml
@@ -1,5 +1,5 @@
 camel:
-  server-port: 5000
+  server-port: 5001
   disable-ssl: false
   springboot:
     main-run-controller: true
@@ -8,7 +8,7 @@ camel:
       auto-discover-object-mapper: true
 
 kafka:
-  bootstrapAddress: "kafka:9092"
+  #bootstrapAddress: "kafka:9092"
   topic:
     gsma:
       name: gsma
@@ -27,7 +27,7 @@ zeebe:
   # number-of-workers: 8
   # evenly-allocated-max-jobs: "#{${zeebe.client.max-execution-threads} / ${zeebe.client.number-of-workers}}"
   broker:
-    contactpoint: "zeebe-zeebe-gateway:26500"
+    contactpoint: "localhost:26500"
 
 operations-app:
   contactpoint: "https://ops-bk.sandbox.mifos.io"
@@ -50,10 +50,10 @@ channel:
 cloud:
   aws:
     enabled: true
-    s3BaseUrl: "https://s3.ap-south-1.amazonaws.com"
+    s3BaseUrl: "http://127.0.0.1:9000"
     credentials:
-      access-key: ${AWS_ACCESS_KEY:access_key_from_aws}
-      secret-key: ${AWS_SECRET_KEY:secret_key_from_aws}
+      access-key: root
+      secret-key: password
     region:
       static: ap-south-1
 
@@ -133,13 +133,13 @@ callback-phases:
   - 100
 
 server:
-  ssl:
-    key-alias: "tomcat-https"
-    key-store: "classpath:keystore.jks"
-    key-store-type: JKS
-    key-password: "password"
-    key-store-password: "password"
-  port: 8443
+#  ssl:
+#    key-alias: "tomcat-https"
+#    key-store: "classpath:keystore.jks"
+#    key-store-type: JKS
+#    key-password: "password"
+#    key-store-password: "password"
+  port: 8444
 
 security: