diff --git a/src/dist/cfg/log4j2.xml b/src/dist/cfg/log4j2.xml index 226fec133..ee832ae31 100644 --- a/src/dist/cfg/log4j2.xml +++ b/src/dist/cfg/log4j2.xml @@ -344,6 +344,24 @@ + + + + + + + + + + + + + + + @@ -541,6 +559,16 @@ + + + + + + + + # Errors logger diff --git a/src/main/kotlin/com/lykke/matching/engine/balance/WalletOperationsProcessor.kt b/src/main/kotlin/com/lykke/matching/engine/balance/WalletOperationsProcessor.kt index 5b0c3b436..6fb6d3cfc 100644 --- a/src/main/kotlin/com/lykke/matching/engine/balance/WalletOperationsProcessor.kt +++ b/src/main/kotlin/com/lykke/matching/engine/balance/WalletOperationsProcessor.kt @@ -3,15 +3,9 @@ package com.lykke.matching.engine.balance import com.lykke.matching.engine.daos.WalletOperation import com.lykke.matching.engine.daos.wallet.AssetBalance import com.lykke.matching.engine.daos.wallet.Wallet -import com.lykke.matching.engine.database.PersistenceManager import com.lykke.matching.engine.database.common.entity.BalancesData -import com.lykke.matching.engine.database.common.entity.OrderBooksPersistenceData -import com.lykke.matching.engine.database.common.entity.PersistenceData -import com.lykke.matching.engine.deduplication.ProcessedMessage import com.lykke.matching.engine.holders.ApplicationSettingsHolder import com.lykke.matching.engine.holders.AssetsHolder -import com.lykke.matching.engine.holders.BalancesHolder -import com.lykke.matching.engine.outgoing.messages.BalanceUpdate import com.lykke.matching.engine.outgoing.messages.ClientBalanceUpdate import com.lykke.matching.engine.order.transaction.CurrentTransactionBalancesHolder import com.lykke.matching.engine.order.transaction.WalletAssetBalance @@ -20,21 +14,18 @@ import com.lykke.utils.logging.MetricsLogger import org.slf4j.Logger import org.slf4j.LoggerFactory import java.math.BigDecimal -import java.util.Date -class WalletOperationsProcessor(private val balancesHolder: BalancesHolder, - private val currentTransactionBalancesHolder: CurrentTransactionBalancesHolder, +class WalletOperationsProcessor(private val currentTransactionBalancesHolder: CurrentTransactionBalancesHolder, private val applicationSettingsHolder: ApplicationSettingsHolder, - private val persistenceManager: PersistenceManager, private val assetsHolder: AssetsHolder, - private val logger: Logger?): BalancesGetter { + private val logger: Logger?) : BalancesGetter { companion object { private val LOGGER = LoggerFactory.getLogger(WalletOperationsProcessor::class.java.name) private val METRICS_LOGGER = MetricsLogger.getLogger() } - private val clientBalanceUpdatesByClientIdAndAssetId = HashMap() + private val clientBalanceUpdatesByClientIdAndAssetId = HashMap() fun preProcess(operations: Collection, allowInvalidBalance: Boolean = false, @@ -60,6 +51,14 @@ class WalletOperationsProcessor(private val balancesHolder: BalancesHolder, changedAssetBalance.reserved } + validateChangedAssetBalances(changedAssetBalances, allowInvalidBalance) + + + changedAssetBalances.forEach { processChangedAssetBalance(it.value) } + return this + } + + private fun validateChangedAssetBalances(changedAssetBalances: HashMap, allowInvalidBalance: Boolean) { try { changedAssetBalances.values.forEach { validateBalanceChange(it) } } catch (e: BalanceException) { @@ -70,9 +69,6 @@ class WalletOperationsProcessor(private val balancesHolder: BalancesHolder, (logger ?: LOGGER).error(message) METRICS_LOGGER.logError(message, e) } - - changedAssetBalances.forEach { processChangedAssetBalance(it.value) } - return this } private fun processChangedAssetBalance(changedAssetBalance: ChangedAssetBalance) { @@ -114,23 +110,6 @@ class WalletOperationsProcessor(private val balancesHolder: BalancesHolder, return currentTransactionBalancesHolder.persistenceData() } - fun persistBalances(processedMessage: ProcessedMessage?, - orderBooksData: OrderBooksPersistenceData?, - stopOrderBooksData: OrderBooksPersistenceData?, - messageSequenceNumber: Long?): Boolean { - return persistenceManager.persist(PersistenceData(persistenceData(), - processedMessage, - orderBooksData, - stopOrderBooksData, - messageSequenceNumber)) - } - - fun sendNotification(id: String, type: String, messageId: String) { - if (clientBalanceUpdatesByClientIdAndAssetId.isNotEmpty()) { - balancesHolder.sendBalanceUpdate(BalanceUpdate(id, type, Date(), clientBalanceUpdatesByClientIdAndAssetId.values.toList(), messageId)) - } - } - fun getClientBalanceUpdates(): List { return clientBalanceUpdatesByClientIdAndAssetId.values.toList() } diff --git a/src/main/kotlin/com/lykke/matching/engine/balance/WalletOperationsProcessorFactory.kt b/src/main/kotlin/com/lykke/matching/engine/balance/WalletOperationsProcessorFactory.kt new file mode 100644 index 000000000..a1f6802d4 --- /dev/null +++ b/src/main/kotlin/com/lykke/matching/engine/balance/WalletOperationsProcessorFactory.kt @@ -0,0 +1,21 @@ +package com.lykke.matching.engine.balance + +import com.lykke.matching.engine.holders.ApplicationSettingsHolder +import com.lykke.matching.engine.holders.AssetsHolder +import com.lykke.matching.engine.order.transaction.CurrentTransactionBalancesHolderFactory +import org.slf4j.Logger +import org.springframework.stereotype.Component + +@Component +class WalletOperationsProcessorFactory(private val currentTransactionBalancesHolderFactory: CurrentTransactionBalancesHolderFactory, + private val applicationSettingsHolder: ApplicationSettingsHolder, + private val assetsHolder: AssetsHolder) { + fun create(logger: Logger?): WalletOperationsProcessor { + + return WalletOperationsProcessor( + currentTransactionBalancesHolderFactory.create(), + applicationSettingsHolder, + assetsHolder, + logger) + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/lykke/matching/engine/database/reconciliation/AccountsMigrationService.kt b/src/main/kotlin/com/lykke/matching/engine/database/reconciliation/AccountsMigrationService.kt index 8b3e1afb2..b91f3cc24 100644 --- a/src/main/kotlin/com/lykke/matching/engine/database/reconciliation/AccountsMigrationService.kt +++ b/src/main/kotlin/com/lykke/matching/engine/database/reconciliation/AccountsMigrationService.kt @@ -5,7 +5,7 @@ import com.lykke.matching.engine.database.Storage import com.lykke.matching.engine.database.azure.AzureWalletDatabaseAccessor import com.lykke.matching.engine.database.redis.accessor.impl.RedisWalletDatabaseAccessor import com.lykke.matching.engine.exception.MatchingEngineException -import com.lykke.matching.engine.holders.BalancesHolder +import com.lykke.matching.engine.services.BalancesService import com.lykke.matching.engine.utils.config.Config import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired @@ -17,9 +17,9 @@ import java.util.* @Component @Order(1) -class AccountsMigrationService @Autowired constructor (private val balancesHolder: BalancesHolder, - private val config: Config, - private val redisWalletDatabaseAccessor: Optional): ApplicationRunner { +class AccountsMigrationService @Autowired constructor (private val config: Config, + private val redisWalletDatabaseAccessor: Optional, + private val balancesService: BalancesService): ApplicationRunner { override fun run(args: ApplicationArguments?) { if (config.me.walletsMigration) { migrateAccounts() @@ -55,38 +55,41 @@ class AccountsMigrationService @Autowired constructor (private val balancesHolde } val startTime = Date().time - teeLog("Starting wallets migration from azure to redis; azure table: $azureAccountsTableName, redis: ${config.me.redis.host}.${config.me.redis.port}") + LOGGER.info("Starting wallets migration from azure to redis; azure table: $azureAccountsTableName, redis: ${config.me.redis.host}.${config.me.redis.port}") val wallets = azureDatabaseAccessor.loadWallets() val loadTime = Date().time - teeLog("Loaded ${wallets.size} wallets from azure (ms: ${loadTime - startTime})") - balancesHolder.insertOrUpdateWallets(wallets.values.toList(), null) + LOGGER.info("Loaded ${wallets.size} wallets from azure (ms: ${loadTime - startTime})") + val balancesSaved = balancesService.insertOrUpdateWallets(wallets.values.toList(), null) + if (!balancesSaved) { + LOGGER.error("Can not save balances data during migration from azure db to redis") + return + } val saveTime = Date().time - teeLog("Saved ${wallets.size} wallets to redis (ms: ${saveTime - loadTime})") + LOGGER.info("Saved ${wallets.size} wallets to redis (ms: ${saveTime - loadTime})") compare() } fun fromRedisToDb() { val startTime = Date().time - teeLog("Starting wallets migration from redis to azure; redis: ${config.me.redis.host}.${config.me.redis.port}, azure table: $azureAccountsTableName") + LOGGER.info("Starting wallets migration from redis to azure; redis: ${config.me.redis.host}.${config.me.redis.port}, azure table: $azureAccountsTableName") val loadTime = Date().time val wallets = redisWalletDatabaseAccessor.get().loadWallets() if (wallets.isEmpty()) { throw AccountsMigrationException("There are no wallets in redis ${config.me.redis.host}.${config.me.redis.port}") } - teeLog("Loaded ${wallets.size} wallets from redis (ms: ${loadTime - startTime})") - balancesHolder.insertOrUpdateWallets(wallets.values.toList(), null) + LOGGER.info("Loaded ${wallets.size} wallets from redis (ms: ${loadTime - startTime})") + val balancesSaved = balancesService.insertOrUpdateWallets(wallets.values.toList(), null) + if (!balancesSaved) { + LOGGER.error("Can not save balances data during migration from redis to azure db") + return + } val saveTime = Date().time - teeLog("Saved ${wallets.size} wallets to azure (ms: ${saveTime - loadTime})") + LOGGER.info("Saved ${wallets.size} wallets to azure (ms: ${saveTime - loadTime})") compare() } - private fun teeLog(message: String) { - println(message) - LOGGER.info(message) - } - /** Compares balances stored in redis & azure; logs comparison result */ private fun compare() { val azureWallets = azureDatabaseAccessor.loadWallets().filter { it.value.balances.isNotEmpty() } @@ -98,8 +101,8 @@ class AccountsMigrationService @Autowired constructor (private val balancesHolde val differentWallets = LinkedList() - teeLog("Comparison result. Differences: ") - teeLog("---------------------------------------------------------------------------------------------") + LOGGER.info("Comparison result. Differences: ") + LOGGER.info("---------------------------------------------------------------------------------------------") commonClients.forEach { val azureWallet = azureWallets[it] val redisWallet = redisWallets[it] @@ -107,19 +110,19 @@ class AccountsMigrationService @Autowired constructor (private val balancesHolde differentWallets.add(it) } } - teeLog("---------------------------------------------------------------------------------------------") - - teeLog("Total: ") - teeLog("azure clients count: ${azureWallets.size}") - teeLog("redis clients count: ${redisWallets.size}") - teeLog("only azure clients (count: ${onlyAzureClients.size}): $onlyAzureClients") - teeLog("only redis clients (count: ${onlyRedisClients.size}): $onlyRedisClients") - teeLog("clients with different wallets (count: ${differentWallets.size}): $differentWallets") + LOGGER.info("---------------------------------------------------------------------------------------------") + + LOGGER.info("Total: ") + LOGGER.info("azure clients count: ${azureWallets.size}") + LOGGER.info("redis clients count: ${redisWallets.size}") + LOGGER.info("only azure clients (count: ${onlyAzureClients.size}): $onlyAzureClients") + LOGGER.info("only redis clients (count: ${onlyRedisClients.size}): $onlyRedisClients") + LOGGER.info("clients with different wallets (count: ${differentWallets.size}): $differentWallets") } private fun compareBalances(azureWallet: Wallet, redisWallet: Wallet): Boolean { if (azureWallet.clientId != redisWallet.clientId) { - teeLog("different clients: ${azureWallet.clientId} & ${redisWallet.clientId}") + LOGGER.info("different clients: ${azureWallet.clientId} & ${redisWallet.clientId}") return false } val clientId = azureWallet.clientId @@ -130,7 +133,7 @@ class AccountsMigrationService @Autowired constructor (private val balancesHolde val onlyRedisAssets = redisBalances.keys.filterNot { azureBalances.keys.contains(it) } if (onlyAzureAssets.isNotEmpty() || onlyRedisAssets.isNotEmpty()) { - teeLog("different asset sets: $onlyAzureAssets & $onlyRedisAssets, client: $clientId") + LOGGER.info("different asset sets: $onlyAzureAssets & $onlyRedisAssets, client: $clientId") return false } @@ -139,11 +142,11 @@ class AccountsMigrationService @Autowired constructor (private val balancesHolde val azureBalance = azureBalances[it] val redisBalance = redisBalances[it] if (azureBalance!!.balance != redisBalance!!.balance) { - teeLog("different balances: ${azureBalance.balance} & ${redisBalance.balance}, client: $clientId") + LOGGER.info("different balances: ${azureBalance.balance} & ${redisBalance.balance}, client: $clientId") return false } if (azureBalance.reserved != redisBalance.reserved) { - teeLog("different reserved balances: ${azureBalance.reserved} & ${redisBalance.reserved}, client: $clientId") + LOGGER.info("different reserved balances: ${azureBalance.reserved} & ${redisBalance.reserved}, client: $clientId") return false } } diff --git a/src/main/kotlin/com/lykke/matching/engine/database/reconciliation/OrdersMigrationService.kt b/src/main/kotlin/com/lykke/matching/engine/database/reconciliation/OrdersMigrationService.kt index 3dba034ea..9f14af98f 100644 --- a/src/main/kotlin/com/lykke/matching/engine/database/reconciliation/OrdersMigrationService.kt +++ b/src/main/kotlin/com/lykke/matching/engine/database/reconciliation/OrdersMigrationService.kt @@ -45,7 +45,7 @@ class OrdersMigrationService(private val config: Config, return } if (config.me.storage != Storage.Redis) { - teeLog("Do not perform migration to files") + LOGGER.info("Do not perform migration to files") return } fromFilesToRedis() @@ -59,12 +59,12 @@ class OrdersMigrationService(private val config: Config, throw Exception("Stop orders already exist in redis ${config.me.redis.host}.${config.me.redis.port}/${config.me.redis.ordersDatabase}") } val startTime = Date().time - teeLog("Starting orders migration from files to redis; files dirs: ${config.me.orderBookPath}, ${config.me.stopOrderBookPath}" + + LOGGER.info("Starting orders migration from files to redis; files dirs: ${config.me.orderBookPath}, ${config.me.stopOrderBookPath}" + ", redis: ${config.me.redis.host}.${config.me.redis.port}/${config.me.redis.ordersDatabase}") val orders = fileOrderBookDatabaseAccessor.loadLimitOrders() val stopOrders = fileStopOrderBookDatabaseAccessor.loadStopLimitOrders() val loadTime = Date().time - teeLog("Loaded ${orders.size} orders from files (ms: ${loadTime - startTime})") + LOGGER.info("Loaded ${orders.size} orders from files (ms: ${loadTime - startTime})") persistenceManager.persist(PersistenceData(null, null, OrderBooksPersistenceData(mapOrdersToOrderBookPersistenceDataList(orders, LOGGER), @@ -77,11 +77,6 @@ class OrdersMigrationService(private val config: Config, genericLimitOrderService.update() genericStopLimitOrderService.update() val saveTime = Date().time - teeLog("Saved ${orders.size} orders and ${stopOrders.size} stop orders to redis (ms: ${saveTime - loadTime})") - } - - private fun teeLog(message: String) { - println(message) - LOGGER.info(message) + LOGGER.info("Saved ${orders.size} orders and ${stopOrders.size} stop orders to redis (ms: ${saveTime - loadTime})") } } \ No newline at end of file diff --git a/src/main/kotlin/com/lykke/matching/engine/holders/BalancesHolder.kt b/src/main/kotlin/com/lykke/matching/engine/holders/BalancesHolder.kt index 21659099d..8725482e4 100644 --- a/src/main/kotlin/com/lykke/matching/engine/holders/BalancesHolder.kt +++ b/src/main/kotlin/com/lykke/matching/engine/holders/BalancesHolder.kt @@ -1,31 +1,14 @@ package com.lykke.matching.engine.holders import com.lykke.matching.engine.balance.BalancesGetter -import com.lykke.matching.engine.balance.WalletOperationsProcessor import com.lykke.matching.engine.daos.wallet.AssetBalance import com.lykke.matching.engine.daos.wallet.Wallet -import com.lykke.matching.engine.database.PersistenceManager -import com.lykke.matching.engine.database.common.entity.BalancesData -import com.lykke.matching.engine.database.common.entity.PersistenceData -import com.lykke.matching.engine.deduplication.ProcessedMessage -import com.lykke.matching.engine.outgoing.messages.BalanceUpdate -import com.lykke.matching.engine.order.transaction.CurrentTransactionBalancesHolder -import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.stereotype.Component import java.math.BigDecimal -import java.util.concurrent.BlockingQueue @Component -class BalancesHolder(private val balancesDbAccessorsHolder: BalancesDatabaseAccessorsHolder, - private val persistenceManager: PersistenceManager, - private val assetsHolder: AssetsHolder, - private val balanceUpdateQueue: BlockingQueue, - private val applicationSettingsHolder: ApplicationSettingsHolder): BalancesGetter { - - companion object { - private val LOGGER = LoggerFactory.getLogger(BalancesHolder::class.java.name) - } +class BalancesHolder(private val balancesDbAccessorsHolder: BalancesDatabaseAccessorsHolder): BalancesGetter { lateinit var wallets: MutableMap var initialClientsCount = 0 @@ -85,65 +68,6 @@ class BalancesHolder(private val balancesDbAccessorsHolder: BalancesDatabaseAcce return BigDecimal.ZERO } - fun updateBalance(processedMessage: ProcessedMessage?, - messageSequenceNumber: Long?, - clientId: String, - assetId: String, - balance: BigDecimal): Boolean { - val currentTransactionBalancesHolder = createCurrentTransactionBalancesHolder() - currentTransactionBalancesHolder.updateBalance(clientId, assetId, balance) - val balancesData = currentTransactionBalancesHolder.persistenceData() - val persisted = persistenceManager.persist(PersistenceData(balancesData, processedMessage, null, null, messageSequenceNumber)) - if (!persisted) { - return false - } - currentTransactionBalancesHolder.apply() - return true - } - - fun updateReservedBalance(processedMessage: ProcessedMessage?, - messageSequenceNumber: Long?, - clientId: String, - assetId: String, - balance: BigDecimal, - skipForTrustedClient: Boolean = true): Boolean { - val currentTransactionBalancesHolder = createCurrentTransactionBalancesHolder() - currentTransactionBalancesHolder.updateReservedBalance(clientId, assetId, balance) - val balancesData = currentTransactionBalancesHolder.persistenceData() - val persisted = persistenceManager.persist(PersistenceData(balancesData, processedMessage, null, null, messageSequenceNumber)) - if (!persisted) { - return false - } - currentTransactionBalancesHolder.apply() - return true - } - - fun insertOrUpdateWallets(wallets: Collection, messageSequenceNumber: Long?) { - persistenceManager.persist(PersistenceData(BalancesData(wallets, wallets.flatMap { it.balances.values }), null, null, null, - messageSequenceNumber = messageSequenceNumber)) - update() - } - - fun sendBalanceUpdate(balanceUpdate: BalanceUpdate) { - if (balanceUpdate.balances.isNotEmpty()) { - LOGGER.info(balanceUpdate.toString()) - balanceUpdateQueue.put(balanceUpdate) - } - } - - fun isTrustedClient(clientId: String) = applicationSettingsHolder.isTrustedClient(clientId) - - fun createWalletProcessor(logger: Logger?): WalletOperationsProcessor { - return WalletOperationsProcessor(this, - createCurrentTransactionBalancesHolder(), - applicationSettingsHolder, - persistenceManager, - assetsHolder, - logger) - } - - private fun createCurrentTransactionBalancesHolder() = CurrentTransactionBalancesHolder(this) - fun setWallets(wallets: Collection) { wallets.forEach { wallet -> this.wallets[wallet.clientId] = wallet diff --git a/src/main/kotlin/com/lykke/matching/engine/order/transaction/CurrentTransactionBalancesHolderFactory.kt b/src/main/kotlin/com/lykke/matching/engine/order/transaction/CurrentTransactionBalancesHolderFactory.kt new file mode 100644 index 000000000..861538756 --- /dev/null +++ b/src/main/kotlin/com/lykke/matching/engine/order/transaction/CurrentTransactionBalancesHolderFactory.kt @@ -0,0 +1,11 @@ +package com.lykke.matching.engine.order.transaction + +import com.lykke.matching.engine.holders.BalancesHolder +import org.springframework.stereotype.Component + +@Component +class CurrentTransactionBalancesHolderFactory(val balancesHolder: BalancesHolder) { + fun create(): CurrentTransactionBalancesHolder { + return CurrentTransactionBalancesHolder(balancesHolder) + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/lykke/matching/engine/order/transaction/ExecutionContextFactory.kt b/src/main/kotlin/com/lykke/matching/engine/order/transaction/ExecutionContextFactory.kt index 818ff5284..7085be056 100644 --- a/src/main/kotlin/com/lykke/matching/engine/order/transaction/ExecutionContextFactory.kt +++ b/src/main/kotlin/com/lykke/matching/engine/order/transaction/ExecutionContextFactory.kt @@ -1,10 +1,11 @@ package com.lykke.matching.engine.order.transaction +import com.lykke.matching.engine.balance.WalletOperationsProcessor +import com.lykke.matching.engine.balance.WalletOperationsProcessorFactory import com.lykke.matching.engine.daos.Asset import com.lykke.matching.engine.daos.AssetPair import com.lykke.matching.engine.deduplication.ProcessedMessage import com.lykke.matching.engine.holders.AssetsHolder -import com.lykke.matching.engine.holders.BalancesHolder import com.lykke.matching.engine.messages.MessageType import com.lykke.matching.engine.services.GenericLimitOrderService import com.lykke.matching.engine.services.GenericStopLimitOrderService @@ -14,7 +15,7 @@ import org.springframework.stereotype.Component import java.util.Date @Component -class ExecutionContextFactory(private val balancesHolder: BalancesHolder, +class ExecutionContextFactory(private val walletOperationsProcessorFactory: WalletOperationsProcessorFactory, private val genericLimitOrderService: GenericLimitOrderService, private val genericStopLimitOrderService: GenericStopLimitOrderService, private val assetsHolder: AssetsHolder) { @@ -35,7 +36,7 @@ class ExecutionContextFactory(private val balancesHolder: BalancesHolder, assetPairsById, assetsById, preProcessorValidationResultsByOrderId, - balancesHolder.createWalletProcessor(logger), + walletOperationsProcessorFactory.create(logger), genericLimitOrderService.createCurrentTransactionOrderBooksHolder(), genericStopLimitOrderService.createCurrentTransactionOrderBooksHolder(), date, diff --git a/src/main/kotlin/com/lykke/matching/engine/outgoing/messages/CashInOutEventData.kt b/src/main/kotlin/com/lykke/matching/engine/outgoing/messages/CashInOutEventData.kt index eb35bfac1..b2a5e40d9 100644 --- a/src/main/kotlin/com/lykke/matching/engine/outgoing/messages/CashInOutEventData.kt +++ b/src/main/kotlin/com/lykke/matching/engine/outgoing/messages/CashInOutEventData.kt @@ -1,6 +1,5 @@ package com.lykke.matching.engine.outgoing.messages -import com.lykke.matching.engine.balance.WalletOperationsProcessor import com.lykke.matching.engine.daos.Asset import com.lykke.matching.engine.daos.OutgoingEventData import com.lykke.matching.engine.daos.WalletOperation @@ -12,7 +11,7 @@ class CashInOutEventData(val messageId: String, val sequenceNumber: Long, val now: Date, val timestamp: Date, - val walletProcessor: WalletOperationsProcessor, + val clientBalanceUpdates: List, val walletOperation: WalletOperation, val asset: Asset, val internalFees: List diff --git a/src/main/kotlin/com/lykke/matching/engine/outgoing/messages/CashTransferEventData.kt b/src/main/kotlin/com/lykke/matching/engine/outgoing/messages/CashTransferEventData.kt index cf87b28b5..0877fc289 100644 --- a/src/main/kotlin/com/lykke/matching/engine/outgoing/messages/CashTransferEventData.kt +++ b/src/main/kotlin/com/lykke/matching/engine/outgoing/messages/CashTransferEventData.kt @@ -1,13 +1,12 @@ package com.lykke.matching.engine.outgoing.messages -import com.lykke.matching.engine.balance.WalletOperationsProcessor import com.lykke.matching.engine.daos.OutgoingEventData import com.lykke.matching.engine.daos.TransferOperation import com.lykke.matching.engine.daos.fee.v2.Fee import java.util.* class CashTransferEventData(val messageId: String, - val walletProcessor: WalletOperationsProcessor, + val clientBalanceUpdates: List, val fees: List, val transferOperation: TransferOperation, val sequenceNumber: Long, diff --git a/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/OldFormatBalancesSender.kt b/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/OldFormatBalancesSender.kt new file mode 100644 index 000000000..885f89352 --- /dev/null +++ b/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/OldFormatBalancesSender.kt @@ -0,0 +1,12 @@ +package com.lykke.matching.engine.outgoing.senders.impl + +import com.lykke.matching.engine.messages.MessageType +import com.lykke.matching.engine.outgoing.messages.ClientBalanceUpdate + +@Deprecated("Old outgoing messages format is deprecated") +interface OldFormatBalancesSender { + fun sendBalanceUpdate(id: String, + type: MessageType, + messageId: String, + clientBalanceUpdates: List) +} \ No newline at end of file diff --git a/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashInOutEventSender.kt b/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashInOutEventSender.kt index 4c8be500d..822f361f0 100644 --- a/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashInOutEventSender.kt +++ b/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashInOutEventSender.kt @@ -20,7 +20,7 @@ class CashInOutEventSender(private val messageSender: MessageSender) : Specializ requestId = event.externalId, date = event.now, messageType = MessageType.CASH_IN_OUT_OPERATION, - clientBalanceUpdates = event.walletProcessor.getClientBalanceUpdates(), + clientBalanceUpdates = event.clientBalanceUpdates, cashInOperation = event.walletOperation, internalFees = event.internalFees) diff --git a/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashInOutOldEventSender.kt b/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashInOutOldEventSender.kt index 0a67ac3fc..d17bbba27 100644 --- a/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashInOutOldEventSender.kt +++ b/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashInOutOldEventSender.kt @@ -1,26 +1,37 @@ package com.lykke.matching.engine.outgoing.senders.impl.specialized import com.lykke.matching.engine.messages.MessageType -import com.lykke.matching.engine.outgoing.messages.CashInOutEventData +import com.lykke.matching.engine.outgoing.messages.BalanceUpdate import com.lykke.matching.engine.outgoing.messages.CashOperation +import com.lykke.matching.engine.outgoing.messages.CashInOutEventData +import com.lykke.matching.engine.outgoing.messages.ClientBalanceUpdate import com.lykke.matching.engine.outgoing.senders.SpecializedEventSender +import com.lykke.matching.engine.outgoing.senders.impl.OldFormatBalancesSender import com.lykke.matching.engine.utils.NumberUtils +import org.apache.juli.logging.LogFactory import org.springframework.stereotype.Component +import java.util.* import java.util.concurrent.BlockingQueue @Deprecated("Old format of outgoing message is deprecated") @Component -class CashInOutOldEventSender(private val rabbitCashInOutQueue: BlockingQueue) : SpecializedEventSender { +class CashInOutOldEventSender(private val rabbitCashInOutQueue: BlockingQueue, + private val balanceUpdateQueue: BlockingQueue) : SpecializedEventSender, + OldFormatBalancesSender { + + private companion object { + val LOGGER = LogFactory.getLog(CashInOutOldEventSender::class.java) + } + override fun getEventClass(): Class { return CashInOutEventData::class.java } override fun sendEvent(event: CashInOutEventData) { - event - .walletProcessor - .sendNotification(id = event.externalId, - type = MessageType.CASH_IN_OUT_OPERATION.name, - messageId = event.messageId) + sendBalanceUpdate(event.externalId, + MessageType.CASH_IN_OUT_OPERATION, + event.messageId, + event.clientBalanceUpdates) rabbitCashInOutQueue.put(CashOperation( id = event.externalId, @@ -32,4 +43,15 @@ class CashInOutOldEventSender(private val rabbitCashInOutQueue: BlockingQueue) { + if (clientBalanceUpdates.isNotEmpty()) { + val balanceUpdate = BalanceUpdate(id, type.name, Date(), clientBalanceUpdates, messageId) + LOGGER.info(balanceUpdate.toString()) + balanceUpdateQueue.put(balanceUpdate) + } + } } \ No newline at end of file diff --git a/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashTransferEventSender.kt b/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashTransferEventSender.kt index 89b5eefdc..b46424596 100644 --- a/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashTransferEventSender.kt +++ b/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashTransferEventSender.kt @@ -19,7 +19,7 @@ class CashTransferEventSender(val messageSender: MessageSender) : SpecializedEve event.transferOperation.externalId, event.now, MessageType.CASH_TRANSFER_OPERATION, - event.walletProcessor.getClientBalanceUpdates(), + event.clientBalanceUpdates, event.transferOperation, event.fees) diff --git a/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashTransferOldEventSender.kt b/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashTransferOldEventSender.kt index 0c02610a8..a3bc28ce9 100644 --- a/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashTransferOldEventSender.kt +++ b/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashTransferOldEventSender.kt @@ -5,7 +5,9 @@ import com.lykke.matching.engine.fee.singleFeeTransfer import com.lykke.matching.engine.messages.MessageType import com.lykke.matching.engine.outgoing.messages.CashTransferEventData import com.lykke.matching.engine.outgoing.messages.CashTransferOperation +import com.lykke.matching.engine.outgoing.messages.ClientBalanceUpdate import com.lykke.matching.engine.outgoing.senders.SpecializedEventSender +import com.lykke.matching.engine.outgoing.senders.impl.OldFormatBalancesSender import com.lykke.matching.engine.utils.NumberUtils import org.springframework.stereotype.Component import org.springframework.util.CollectionUtils @@ -13,7 +15,8 @@ import java.util.concurrent.BlockingQueue @Component @Deprecated("Old format of outgoing message is deprecated") -class CashTransferOldEventSender(private val notificationQueue: BlockingQueue) : SpecializedEventSender { +class CashTransferOldEventSender(private val notificationQueue: BlockingQueue, + private val oldFormatBalancesSender: OldFormatBalancesSender) : SpecializedEventSender { override fun getEventClass(): Class { return CashTransferEventData::class.java } @@ -21,7 +24,9 @@ class CashTransferOldEventSender(private val notificationQueue: BlockingQueue) { + oldFormatBalancesSender.sendBalanceUpdate(externalId, + MessageType.CASH_TRANSFER_OPERATION, + messageId, + clientBalanceUpdates) } } \ No newline at end of file diff --git a/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/OldFormatExecutionEventSender.kt b/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/OldFormatExecutionEventSender.kt index 19f6f02f6..f2f24d859 100644 --- a/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/OldFormatExecutionEventSender.kt +++ b/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/OldFormatExecutionEventSender.kt @@ -5,6 +5,7 @@ import com.lykke.matching.engine.order.transaction.ExecutionContext import com.lykke.matching.engine.outgoing.messages.LimitOrdersReport import com.lykke.matching.engine.outgoing.messages.MarketOrderWithTrades import com.lykke.matching.engine.outgoing.senders.SpecializedEventSender +import com.lykke.matching.engine.outgoing.senders.impl.OldFormatBalancesSender import com.lykke.matching.engine.utils.event.isThereClientEvent import com.lykke.matching.engine.utils.event.isThereTrustedClientEvent import org.springframework.stereotype.Component @@ -14,7 +15,8 @@ import java.util.concurrent.BlockingQueue @Component class OldFormatExecutionEventSender(private val clientLimitOrdersQueue: BlockingQueue, private val trustedClientsLimitOrdersQueue: BlockingQueue, - private val rabbitSwapQueue: BlockingQueue) : SpecializedEventSender { + private val rabbitSwapQueue: BlockingQueue, + private val oldFormatBalancesSender: OldFormatBalancesSender) : SpecializedEventSender { override fun getEventClass(): Class { return ExecutionData::class.java @@ -28,11 +30,10 @@ class OldFormatExecutionEventSender(private val clientLimitOrdersQueue: Blocking } private fun sendBalanceUpdateEvent(executionContext: ExecutionContext) { - executionContext - .walletOperationsProcessor - .sendNotification(id = executionContext.requestId, - type = executionContext.messageType.name, - messageId = executionContext.messageId) + oldFormatBalancesSender.sendBalanceUpdate(executionContext.requestId, + executionContext.messageType, + executionContext.messageId, + executionContext.walletOperationsProcessor.getClientBalanceUpdates()) } private fun sendTrustedClientsExecutionEventIfNeeded(executionContext: ExecutionContext) { diff --git a/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/ReservedCashInOutOldEventSender.kt b/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/ReservedCashInOutOldEventSender.kt index 353243cf9..35ed0eb33 100644 --- a/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/ReservedCashInOutOldEventSender.kt +++ b/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/ReservedCashInOutOldEventSender.kt @@ -4,20 +4,25 @@ import com.lykke.matching.engine.messages.MessageType import com.lykke.matching.engine.outgoing.messages.ReservedCashInOutEventData import com.lykke.matching.engine.outgoing.messages.ReservedCashOperation import com.lykke.matching.engine.outgoing.senders.SpecializedEventSender +import com.lykke.matching.engine.outgoing.senders.impl.OldFormatBalancesSender import com.lykke.matching.engine.utils.NumberUtils import org.springframework.stereotype.Component import java.util.concurrent.BlockingQueue @Deprecated("Old format of outgoing message is deprecated") @Component -class ReservedCashInOutOldEventSender(private val reservedCashOperationQueue: BlockingQueue) : SpecializedEventSender { +class ReservedCashInOutOldEventSender(private val reservedCashOperationQueue: BlockingQueue, + private val oldFormatBalancesSender: OldFormatBalancesSender) : SpecializedEventSender { override fun getEventClass(): Class { return ReservedCashInOutEventData::class.java } override fun sendEvent(event: ReservedCashInOutEventData) { - event.walletOperationsProcessor.sendNotification(event.requestId, MessageType.RESERVED_CASH_IN_OUT_OPERATION.name, event.messageId) + oldFormatBalancesSender.sendBalanceUpdate(id = event.requestId, + messageId = event.messageId, + clientBalanceUpdates = event.walletOperationsProcessor.getClientBalanceUpdates(), + type = MessageType.RESERVED_CASH_IN_OUT_OPERATION) reservedCashOperationQueue.put(ReservedCashOperation(event.requestId, event.walletOperation.clientId, diff --git a/src/main/kotlin/com/lykke/matching/engine/services/BalancesService.kt b/src/main/kotlin/com/lykke/matching/engine/services/BalancesService.kt new file mode 100644 index 000000000..4c4fb740c --- /dev/null +++ b/src/main/kotlin/com/lykke/matching/engine/services/BalancesService.kt @@ -0,0 +1,8 @@ +package com.lykke.matching.engine.services + +import com.lykke.matching.engine.daos.wallet.Wallet +import com.lykke.matching.engine.outgoing.messages.BalanceUpdate + +interface BalancesService { + fun insertOrUpdateWallets(wallets: Collection, messageSequenceNumber: Long?): Boolean +} \ No newline at end of file diff --git a/src/main/kotlin/com/lykke/matching/engine/services/BalancesServiceImpl.kt b/src/main/kotlin/com/lykke/matching/engine/services/BalancesServiceImpl.kt new file mode 100644 index 000000000..d9f9c44a7 --- /dev/null +++ b/src/main/kotlin/com/lykke/matching/engine/services/BalancesServiceImpl.kt @@ -0,0 +1,41 @@ +package com.lykke.matching.engine.services + +import com.lykke.matching.engine.daos.wallet.Wallet +import com.lykke.matching.engine.database.PersistenceManager +import com.lykke.matching.engine.database.common.entity.BalancesData +import com.lykke.matching.engine.database.common.entity.PersistenceData +import com.lykke.matching.engine.holders.BalancesHolder +import com.lykke.utils.logging.MetricsLogger +import org.slf4j.LoggerFactory +import org.springframework.stereotype.Service + +@Service +class BalancesServiceImpl(private val balancesHolder: BalancesHolder, + private val persistenceManager: PersistenceManager): BalancesService { + private companion object { + val LOGGER = LoggerFactory.getLogger(BalancesServiceImpl::class.java.name) + val METRICS_LOGGER = MetricsLogger.getLogger() + } + + /** + * Persists wallets to the db and updates values in cache, + * Note: this method will not send balance updates notifications + */ + override fun insertOrUpdateWallets(wallets: Collection, messageSequenceNumber: Long?): Boolean { + val updated = persistenceManager.persist(PersistenceData(BalancesData(wallets, + wallets.flatMap { it.balances.values }), + null, + null, + null, + messageSequenceNumber = messageSequenceNumber)) + if (!updated) { + val message = "Can not persist balances data, wallets: ${wallets.size}" + LOGGER.error(message) + METRICS_LOGGER.logError(message) + return false + } + + balancesHolder.setWallets(wallets) + return true + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/lykke/matching/engine/services/CashInOutOperationService.kt b/src/main/kotlin/com/lykke/matching/engine/services/CashInOutOperationService.kt index 20dc5f7e8..bace34983 100644 --- a/src/main/kotlin/com/lykke/matching/engine/services/CashInOutOperationService.kt +++ b/src/main/kotlin/com/lykke/matching/engine/services/CashInOutOperationService.kt @@ -1,8 +1,12 @@ package com.lykke.matching.engine.services import com.lykke.matching.engine.balance.BalanceException +import com.lykke.matching.engine.balance.WalletOperationsProcessorFactory import com.lykke.matching.engine.daos.context.CashInOutContext import com.lykke.matching.engine.daos.converters.CashInOutOperationConverter +import com.lykke.matching.engine.daos.fee.v2.Fee +import com.lykke.matching.engine.database.PersistenceManager +import com.lykke.matching.engine.database.common.entity.PersistenceData import com.lykke.matching.engine.fee.FeeException import com.lykke.matching.engine.fee.FeeProcessor import com.lykke.matching.engine.holders.BalancesHolder @@ -27,9 +31,11 @@ import java.util.* @Service class CashInOutOperationService(private val balancesHolder: BalancesHolder, private val feeProcessor: FeeProcessor, + private val walletOperationsProcessorFactory: WalletOperationsProcessorFactory, private val cashInOutOperationBusinessValidator: CashInOutOperationBusinessValidator, private val messageSequenceNumberHolder: MessageSequenceNumberHolder, - private val outgoingEventProcessor: OutgoingEventProcessor) : AbstractService { + private val outgoingEventProcessor: OutgoingEventProcessor, + private val persistenceManager: PersistenceManager) : AbstractService { override fun parseMessage(messageWrapper: MessageWrapper) { //do nothing } @@ -67,7 +73,7 @@ class CashInOutOperationService(private val balancesHolder: BalancesHolder, return } - val walletProcessor = balancesHolder.createWalletProcessor(LOGGER) + val walletProcessor = walletOperationsProcessorFactory.create(LOGGER) try { walletProcessor.preProcess(operations) } catch (e: BalanceException) { @@ -76,7 +82,11 @@ class CashInOutOperationService(private val balancesHolder: BalancesHolder, } val sequenceNumber = messageSequenceNumberHolder.getNewValue() - val updated = walletProcessor.persistBalances(cashInOutContext.processedMessage, null, null, sequenceNumber) + val updated = persistenceManager.persist(PersistenceData(walletProcessor.persistenceData(), + cashInOutContext.processedMessage, + null, + null, + sequenceNumber)) messageWrapper.triedToPersist = true messageWrapper.persisted = updated if (!updated) { @@ -91,7 +101,7 @@ class CashInOutOperationService(private val balancesHolder: BalancesHolder, sequenceNumber, now, cashInOutOperation.dateTime, - walletProcessor, + walletProcessor.getClientBalanceUpdates(), walletOperation, asset, fees)) diff --git a/src/main/kotlin/com/lykke/matching/engine/services/CashTransferOperationService.kt b/src/main/kotlin/com/lykke/matching/engine/services/CashTransferOperationService.kt index c21414145..05a5afeea 100644 --- a/src/main/kotlin/com/lykke/matching/engine/services/CashTransferOperationService.kt +++ b/src/main/kotlin/com/lykke/matching/engine/services/CashTransferOperationService.kt @@ -1,9 +1,12 @@ package com.lykke.matching.engine.services import com.lykke.matching.engine.balance.BalanceException +import com.lykke.matching.engine.balance.WalletOperationsProcessorFactory import com.lykke.matching.engine.daos.TransferOperation import com.lykke.matching.engine.daos.WalletOperation import com.lykke.matching.engine.daos.context.CashTransferContext +import com.lykke.matching.engine.database.PersistenceManager +import com.lykke.matching.engine.database.common.entity.PersistenceData import com.lykke.matching.engine.exception.PersistenceException import com.lykke.matching.engine.fee.FeeException import com.lykke.matching.engine.fee.FeeProcessor @@ -31,11 +34,13 @@ import java.util.concurrent.BlockingQueue @Service class CashTransferOperationService(private val balancesHolder: BalancesHolder, + private val walletOperationsProcessorFactory: WalletOperationsProcessorFactory, private val dbTransferOperationQueue: BlockingQueue, private val feeProcessor: FeeProcessor, private val cashTransferOperationBusinessValidator: CashTransferOperationBusinessValidator, private val messageSequenceNumberHolder: MessageSequenceNumberHolder, - private val outgoingEventProcessor: OutgoingEventProcessor) : AbstractService { + private val outgoingEventProcessor: OutgoingEventProcessor, + private val persistenceManager: PersistenceManager) : AbstractService { override fun parseMessage(messageWrapper: MessageWrapper) { //do nothing } @@ -95,11 +100,15 @@ class CashTransferOperationService(private val balancesHolder: BalancesHolder, val fees = feeProcessor.processFee(operation.fees, receiptOperation, operations, balancesGetter = balancesHolder) - val walletProcessor = balancesHolder.createWalletProcessor(LOGGER) - .preProcess(operations, true) + val walletProcessor = walletOperationsProcessorFactory.create(LOGGER) + walletProcessor.preProcess(operations, true) val sequenceNumber = messageSequenceNumberHolder.getNewValue() - val updated = walletProcessor.persistBalances(cashTransferContext.processedMessage, null, null, sequenceNumber) + val updated = persistenceManager.persist(PersistenceData(walletProcessor.persistenceData(), + cashTransferContext.processedMessage, + null, + null, + sequenceNumber)) messageWrapper.triedToPersist = true messageWrapper.persisted = updated if (!updated) { @@ -107,7 +116,7 @@ class CashTransferOperationService(private val balancesHolder: BalancesHolder, } walletProcessor.apply() outgoingEventProcessor.submitCashTransferEvent(CashTransferEventData(cashTransferContext.messageId, - walletProcessor, + walletProcessor.getClientBalanceUpdates(), fees, operation, sequenceNumber, diff --git a/src/main/kotlin/com/lykke/matching/engine/services/ReservedCashInOutOperationService.kt b/src/main/kotlin/com/lykke/matching/engine/services/ReservedCashInOutOperationService.kt index 264f7c859..6accb57b2 100644 --- a/src/main/kotlin/com/lykke/matching/engine/services/ReservedCashInOutOperationService.kt +++ b/src/main/kotlin/com/lykke/matching/engine/services/ReservedCashInOutOperationService.kt @@ -2,8 +2,10 @@ package com.lykke.matching.engine.services import com.lykke.matching.engine.balance.BalanceException import com.lykke.matching.engine.daos.WalletOperation +import com.lykke.matching.engine.balance.WalletOperationsProcessorFactory +import com.lykke.matching.engine.database.PersistenceManager +import com.lykke.matching.engine.database.common.entity.PersistenceData import com.lykke.matching.engine.holders.AssetsHolder -import com.lykke.matching.engine.holders.BalancesHolder import com.lykke.matching.engine.holders.MessageProcessingStatusHolder import com.lykke.matching.engine.holders.MessageSequenceNumberHolder import com.lykke.matching.engine.holders.UUIDHolder @@ -24,13 +26,14 @@ import java.math.BigDecimal import java.util.Date @Service -class ReservedCashInOutOperationService @Autowired constructor(private val assetsHolder: AssetsHolder, - private val balancesHolder: BalancesHolder, - private val reservedCashInOutOperationValidator: ReservedCashInOutOperationValidator, - private val messageProcessingStatusHolder: MessageProcessingStatusHolder, - private val uuidHolder: UUIDHolder, - private val messageSequenceNumberHolder: MessageSequenceNumberHolder, - private val outgoingEventProcessor: OutgoingEventProcessor) : AbstractService { +class ReservedCashInOutOperationService @Autowired constructor (private val assetsHolder: AssetsHolder, + private val walletOperationsProcessorFactory: WalletOperationsProcessorFactory, + private val reservedCashInOutOperationValidator: ReservedCashInOutOperationValidator, + private val messageProcessingStatusHolder: MessageProcessingStatusHolder, + private val persistenceManager: PersistenceManager, + private val messageSequenceNumberHolder: MessageSequenceNumberHolder, + private val uuidHolder: UUIDHolder, + private val outgoingEventProcessor: OutgoingEventProcessor) : AbstractService { companion object { private val LOGGER = LoggerFactory.getLogger(ReservedCashInOutOperationService::class.java.name) @@ -63,7 +66,9 @@ class ReservedCashInOutOperationService @Autowired constructor(private val asset return } - val walletProcessor = balancesHolder.createWalletProcessor(LOGGER) + val accuracy = asset.accuracy + + val walletProcessor = walletOperationsProcessorFactory.create(LOGGER) try { walletProcessor.preProcess(listOf(operation), allowTrustedClientReservedBalanceOperation = true) } catch (e: BalanceException) { @@ -73,7 +78,11 @@ class ReservedCashInOutOperationService @Autowired constructor(private val asset } val sequenceNumber = messageSequenceNumberHolder.getNewValue() - val updated = walletProcessor.persistBalances(messageWrapper.processedMessage, null, null, sequenceNumber) + val updated = persistenceManager.persist(PersistenceData(walletProcessor.persistenceData(), + messageWrapper.processedMessage, + null, + null, + sequenceNumber)) messageWrapper.triedToPersist = true messageWrapper.persisted = updated if (!updated) { diff --git a/src/main/kotlin/com/lykke/matching/engine/utils/balance/ReservedVolumesRecalculator.kt b/src/main/kotlin/com/lykke/matching/engine/utils/balance/ReservedVolumesRecalculator.kt index fe773ab90..6ed351d50 100644 --- a/src/main/kotlin/com/lykke/matching/engine/utils/balance/ReservedVolumesRecalculator.kt +++ b/src/main/kotlin/com/lykke/matching/engine/utils/balance/ReservedVolumesRecalculator.kt @@ -14,9 +14,10 @@ import com.lykke.matching.engine.holders.MessageSequenceNumberHolder import com.lykke.matching.engine.holders.OrdersDatabaseAccessorsHolder import com.lykke.matching.engine.holders.StopOrdersDatabaseAccessorsHolder import com.lykke.matching.engine.messages.MessageType -import com.lykke.matching.engine.outgoing.messages.BalanceUpdate import com.lykke.matching.engine.outgoing.messages.ClientBalanceUpdate import com.lykke.matching.engine.outgoing.messages.v2.builders.EventFactory +import com.lykke.matching.engine.outgoing.senders.impl.OldFormatBalancesSender +import com.lykke.matching.engine.services.BalancesService import com.lykke.matching.engine.outgoing.messages.v2.events.ReservedBalanceUpdateEvent import com.lykke.matching.engine.services.MessageSender import com.lykke.matching.engine.utils.NumberUtils @@ -44,18 +45,15 @@ class ReservedVolumesRecalculator @Autowired constructor(private val orderBookDa private val applicationSettingsHolder: ApplicationSettingsHolder, @Value("#{Config.me.correctReservedVolumes}") private val correctReservedVolumes: Boolean, private val messageSequenceNumberHolder: MessageSequenceNumberHolder, - private val messageSender: MessageSender) : ApplicationRunner { + private val messageSender: MessageSender, + private val balancesService: BalancesService, + private val oldFormatBalancesSender: OldFormatBalancesSender) : ApplicationRunner { override fun run(args: ApplicationArguments?) { correctReservedVolumesIfNeed() } companion object { private val LOGGER = LoggerFactory.getLogger(ReservedVolumesRecalculator::class.java.name) - - fun teeLog(message: String) { - println(message) - LOGGER.info(message) - } } fun correctReservedVolumesIfNeed() { @@ -63,7 +61,7 @@ class ReservedVolumesRecalculator @Autowired constructor(private val orderBookDa return } - teeLog("Starting order books analyze") + LOGGER.info("Starting order books analyze") recalculate() } @@ -104,7 +102,7 @@ class ReservedVolumesRecalculator @Autowired constructor(private val orderBookDa balance.orderIds.add(order.externalId) } catch (e: Exception) { val errorMessage = "Unable to handle order (id: ${order.externalId}): ${e.message}" - teeLog(errorMessage) + LOGGER.info(errorMessage) } } } @@ -133,7 +131,7 @@ class ReservedVolumesRecalculator @Autowired constructor(private val orderBookDa if (!NumberUtils.equalsIgnoreScale(oldBalance, newBalance.volume)) { val correction = ReservedVolumeCorrection(id, assetBalance.asset, newBalance.orderIds.joinToString(","), oldBalance, newBalance.volume) corrections.add(correction) - teeLog("1 $id, ${assetBalance.asset} : Old $oldBalance New $newBalance") + LOGGER.info("1 $id, ${assetBalance.asset} : Old $oldBalance New $newBalance") wallet.setReservedBalance(assetBalance.asset, newBalance.volume) updatedWallets.add(wallet) val balanceUpdate = ClientBalanceUpdate(id, @@ -148,7 +146,7 @@ class ReservedVolumesRecalculator @Autowired constructor(private val orderBookDa val orderIds = newBalance?.orderIds?.joinToString(",") val correction = ReservedVolumeCorrection(id, assetBalance.asset, orderIds, oldBalance, newBalance?.volume ?: BigDecimal.ZERO) corrections.add(correction) - teeLog("2 $id, ${assetBalance.asset} : Old $oldBalance New ${newBalance ?: 0.0}") + LOGGER.info("2 $id, ${assetBalance.asset} : Old $oldBalance New ${newBalance ?: 0.0}") wallet.setReservedBalance(assetBalance.asset, BigDecimal.ZERO) updatedWallets.add(wallet) val balanceUpdate = ClientBalanceUpdate(id, @@ -183,14 +181,21 @@ class ReservedVolumesRecalculator @Autowired constructor(private val orderBookDa listOf(clientBalanceUpdate), walletOperation)) } + val balancesPersisted = balancesService.insertOrUpdateWallets(updatedWallets, sequenceNumber) + + if (!balancesPersisted) { + LOGGER.error("Can not persist balances during reserved balance recalculation, updated wallets size: ${updatedWallets.size}") + return + } - balancesHolder.insertOrUpdateWallets(updatedWallets, sequenceNumber) reservedVolumesDatabaseAccessor.addCorrectionsInfo(corrections) - balancesHolder.sendBalanceUpdate(BalanceUpdate(operationId, MessageType.LIMIT_ORDER.name, now, balanceUpdates, operationId)) + oldFormatBalancesSender.sendBalanceUpdate(id = operationId, + messageId = operationId, + clientBalanceUpdates = balanceUpdates, + type = MessageType.LIMIT_ORDER) reservedBalanceUpdateEvents.forEach { messageSender.sendMessage(it) } - } - teeLog("Reserved volume recalculation finished") + LOGGER.info("Reserved volume recalculation finished") } } \ No newline at end of file diff --git a/src/test/kotlin/com/lykke/matching/engine/balance/WalletOperationsProcessorTest.kt b/src/test/kotlin/com/lykke/matching/engine/balance/WalletOperationsProcessorTest.kt index e64bb2a30..27f304d49 100644 --- a/src/test/kotlin/com/lykke/matching/engine/balance/WalletOperationsProcessorTest.kt +++ b/src/test/kotlin/com/lykke/matching/engine/balance/WalletOperationsProcessorTest.kt @@ -7,8 +7,10 @@ import com.lykke.matching.engine.daos.WalletOperation import com.lykke.matching.engine.daos.setting.AvailableSettingGroup import com.lykke.matching.engine.database.BackOfficeDatabaseAccessor import com.lykke.matching.engine.database.TestBackOfficeDatabaseAccessor -import com.lykke.matching.engine.database.TestSettingsDatabaseAccessor +import com.lykke.matching.engine.database.common.entity.PersistenceData +import com.lykke.matching.engine.messages.MessageType import com.lykke.matching.engine.outgoing.messages.BalanceUpdate +import com.lykke.matching.engine.outgoing.senders.impl.OldFormatBalancesSender import org.junit.Test import org.junit.runner.RunWith import org.springframework.boot.test.context.SpringBootTest @@ -45,7 +47,10 @@ class WalletOperationsProcessorTest : AbstractTest() { } @Autowired - private lateinit var settingsDatabaseAccessor: TestSettingsDatabaseAccessor + private lateinit var walletOperationsProcessorFactory: WalletOperationsProcessorFactory + + @Autowired + private lateinit var oldFormatBalancesSender: OldFormatBalancesSender @Test fun testPreProcessWalletOperations() { @@ -53,7 +58,7 @@ class WalletOperationsProcessorTest : AbstractTest() { testBalanceHolderWrapper.updateReservedBalance("Client1", "BTC", 0.1) initServices() - val walletOperationsProcessor = balancesHolder.createWalletProcessor(null) + val walletOperationsProcessor = walletOperationsProcessorFactory.create(null) walletOperationsProcessor.preProcess( listOf( @@ -75,8 +80,11 @@ class WalletOperationsProcessorTest : AbstractTest() { ) ) } - assertTrue(walletOperationsProcessor.persistBalances(null, null, null, null)) - walletOperationsProcessor.apply().sendNotification("id", "type", "test") + assertTrue(persistenceManager.persist(PersistenceData( walletOperationsProcessor.persistenceData(), null, null, null, null))) + walletOperationsProcessor.apply() + + oldFormatBalancesSender.sendBalanceUpdate("id", MessageType.CASH_IN_OUT_OPERATION, "test", + walletOperationsProcessor.getClientBalanceUpdates()) assertBalance("Client1", "BTC", 0.5, 0.0) assertBalance("Client2", "ETH", 3.0, 0.3) @@ -86,7 +94,7 @@ class WalletOperationsProcessorTest : AbstractTest() { val balanceUpdate = balanceUpdateHandlerTest.balanceUpdateQueue.poll() as BalanceUpdate assertEquals(2, balanceUpdate.balances.size) assertEquals("id", balanceUpdate.id) - assertEquals("type", balanceUpdate.type) + assertEquals(MessageType.CASH_IN_OUT_OPERATION.name, balanceUpdate.type) val clientBalanceUpdate1 = balanceUpdate.balances.first { it.id == "Client1" } assertNotNull(clientBalanceUpdate1) @@ -109,15 +117,15 @@ class WalletOperationsProcessorTest : AbstractTest() { fun testForceProcessInvalidWalletOperations() { initServices() - val walletOperationsProcessor = balancesHolder.createWalletProcessor(null) + val walletOperationsProcessor = walletOperationsProcessorFactory.create(null) walletOperationsProcessor.preProcess( listOf( WalletOperation("Client1", "BTC", BigDecimal.ZERO, BigDecimal.valueOf(-0.1)) ), true) - assertTrue(walletOperationsProcessor.persistBalances(null, null, null, null)) - walletOperationsProcessor.apply().sendNotification("id", "type", "test") + assertTrue(persistenceManager.persist(PersistenceData(walletOperationsProcessor.persistenceData(), null, null, null, null))) + walletOperationsProcessor.apply() assertBalance("Client1", "BTC", 0.0, -0.1) } @@ -151,7 +159,7 @@ class WalletOperationsProcessorTest : AbstractTest() { testBalanceHolderWrapper.updateBalance("TrustedClient1", "BTC", 1.0) testBalanceHolderWrapper.updateBalance("TrustedClient2", "EUR", 1.0) - val walletOperationsProcessor = balancesHolder.createWalletProcessor(null) + val walletOperationsProcessor = walletOperationsProcessorFactory.create(null) walletOperationsProcessor.preProcess(listOf( WalletOperation("TrustedClient1", "BTC", BigDecimal.ZERO, BigDecimal.valueOf(0.1)), @@ -175,7 +183,7 @@ class WalletOperationsProcessorTest : AbstractTest() { @Test fun testNotChangedBalance() { - val walletOperationsProcessor = balancesHolder.createWalletProcessor(null) + val walletOperationsProcessor = walletOperationsProcessorFactory.create(null) walletOperationsProcessor.preProcess(listOf( WalletOperation("Client1", "BTC", BigDecimal.valueOf(0.1), BigDecimal.valueOf(0.1)), diff --git a/src/test/kotlin/com/lykke/matching/engine/balance/util/TestBalanceHolderWrapper.kt b/src/test/kotlin/com/lykke/matching/engine/balance/util/TestBalanceHolderWrapper.kt index 5891e8b93..fd28cfef9 100644 --- a/src/test/kotlin/com/lykke/matching/engine/balance/util/TestBalanceHolderWrapper.kt +++ b/src/test/kotlin/com/lykke/matching/engine/balance/util/TestBalanceHolderWrapper.kt @@ -1,20 +1,26 @@ package com.lykke.matching.engine.balance.util +import com.lykke.matching.engine.daos.wallet.Wallet import com.lykke.matching.engine.holders.BalancesHolder -import com.lykke.matching.engine.notification.BalanceUpdateHandlerTest +import com.lykke.matching.engine.services.BalancesService import org.springframework.beans.factory.annotation.Autowired import java.math.BigDecimal -class TestBalanceHolderWrapper @Autowired constructor (private val balanceUpdateHandlerTest: BalanceUpdateHandlerTest, - private val balancesHolder: BalancesHolder) { +class TestBalanceHolderWrapper @Autowired constructor(private val balancesService: BalancesService, + private val balancesHolder: BalancesHolder) { fun updateBalance(clientId: String, assetId: String, balance: Double) { - balancesHolder.updateBalance(null, null, clientId, assetId, BigDecimal.valueOf(balance)) - balanceUpdateHandlerTest.clear() + val wallet = balancesHolder.wallets[clientId] ?: Wallet(clientId) + wallet.setBalance(assetId, BigDecimal.valueOf(balance) ) + + balancesService.insertOrUpdateWallets(listOf(wallet), null) } - fun updateReservedBalance(clientId: String, assetId: String, reservedBalance: Double, skip: Boolean = false) { - balancesHolder.updateReservedBalance(null, null, clientId, assetId, BigDecimal.valueOf(reservedBalance), skip) - balanceUpdateHandlerTest.clear() + fun updateReservedBalance(clientId: String, assetId: String, reservedBalance: Double) { + val wallet = balancesHolder.wallets[clientId] ?: Wallet(clientId) + wallet.setReservedBalance(assetId, BigDecimal.valueOf(reservedBalance) ) + + balancesService.insertOrUpdateWallets(listOf(wallet), null) + } } \ No newline at end of file diff --git a/src/test/kotlin/com/lykke/matching/engine/config/TestApplicationContext.kt b/src/test/kotlin/com/lykke/matching/engine/config/TestApplicationContext.kt index 340d14ad0..7ddaf5f9b 100644 --- a/src/test/kotlin/com/lykke/matching/engine/config/TestApplicationContext.kt +++ b/src/test/kotlin/com/lykke/matching/engine/config/TestApplicationContext.kt @@ -1,5 +1,6 @@ package com.lykke.matching.engine.config +import com.lykke.matching.engine.balance.WalletOperationsProcessorFactory import com.lykke.matching.engine.balance.util.TestBalanceHolderWrapper import com.lykke.matching.engine.config.spring.JsonConfig import com.lykke.matching.engine.config.spring.QueueConfig @@ -32,6 +33,7 @@ import com.lykke.matching.engine.order.process.PreviousLimitOrdersProcessor import com.lykke.matching.engine.order.process.StopOrderBookProcessor import com.lykke.matching.engine.order.process.common.LimitOrdersCancelExecutor import com.lykke.matching.engine.order.process.common.MatchingResultHandlingHelper +import com.lykke.matching.engine.order.transaction.CurrentTransactionBalancesHolderFactory import com.lykke.matching.engine.order.transaction.ExecutionContextFactory import com.lykke.matching.engine.order.utils.TestOrderBookWrapper import com.lykke.matching.engine.outgoing.messages.* @@ -40,6 +42,7 @@ import com.lykke.matching.engine.outgoing.messages.v2.events.ExecutionEvent import com.lykke.matching.engine.outgoing.senders.SpecializedEventSendersHolder import com.lykke.matching.engine.outgoing.senders.OutgoingEventProcessor import com.lykke.matching.engine.outgoing.senders.SpecializedEventSender +import com.lykke.matching.engine.outgoing.senders.impl.OldFormatBalancesSender import com.lykke.matching.engine.outgoing.senders.impl.SpecializedEventSendersHolderImpl import com.lykke.matching.engine.services.CashInOutOperationService import com.lykke.matching.engine.services.CashTransferOperationService @@ -115,12 +118,9 @@ open class TestApplicationContext { @Bean open fun balanceHolder(balancesDatabaseAccessorsHolder: BalancesDatabaseAccessorsHolder, - persistenceManager: PersistenceManager, balanceUpdateQueue: BlockingQueue, - applicationSettingsHolder: ApplicationSettingsHolder, - backOfficeDatabaseAccessor: BackOfficeDatabaseAccessor): BalancesHolder { - return BalancesHolder(balancesDatabaseAccessorsHolder, persistenceManager, assetHolder(backOfficeDatabaseAccessor), - balanceUpdateQueue, applicationSettingsHolder) + applicationSettingsHolder: ApplicationSettingsHolder): BalancesHolder { + return BalancesHolder(balancesDatabaseAccessorsHolder) } @Bean @@ -149,14 +149,15 @@ open class TestApplicationContext { stopOrdersDatabaseAccessorsHolder: StopOrdersDatabaseAccessorsHolder, testReservedVolumesDatabaseAccessor: TestReservedVolumesDatabaseAccessor, assetHolder: AssetsHolder, assetsPairsHolder: AssetsPairsHolder, - balancesHolder: BalancesHolder, applicationSettingsHolder: ApplicationSettingsHolder, + balancesHolder: BalancesHolder, + balancesService: BalancesService, applicationSettingsHolder: ApplicationSettingsHolder, messageSequenceNumberHolder: MessageSequenceNumberHolder, - messageSender: MessageSender): ReservedVolumesRecalculator { - + messageSender: MessageSender, + oldFormatBalancesSender: OldFormatBalancesSender): ReservedVolumesRecalculator { return ReservedVolumesRecalculator(testOrderDatabaseAccessorHolder, stopOrdersDatabaseAccessorsHolder, testReservedVolumesDatabaseAccessor, assetHolder, assetsPairsHolder, balancesHolder, applicationSettingsHolder, - false, messageSequenceNumberHolder, messageSender) + false, messageSequenceNumberHolder, messageSender, balancesService, oldFormatBalancesSender) } @Bean @@ -206,9 +207,9 @@ open class TestApplicationContext { } @Bean - open fun testBalanceHolderWrapper(balanceUpdateHandlerTest: BalanceUpdateHandlerTest, - balancesHolder: BalancesHolder): TestBalanceHolderWrapper { - return TestBalanceHolderWrapper(balanceUpdateHandlerTest, balancesHolder) + open fun testBalanceHolderWrapper(balancesHolder: BalancesHolder, + balancesService: BalancesService): TestBalanceHolderWrapper { + return TestBalanceHolderWrapper(balancesService, balancesHolder) } @Bean @@ -268,15 +269,19 @@ open class TestApplicationContext { @Bean open fun cashInOutOperationService(balancesHolder: BalancesHolder, + walletOperationsProcessorFactory: WalletOperationsProcessorFactory, feeProcessor: FeeProcessor, cashInOutOperationBusinessValidator: CashInOutOperationBusinessValidator, messageSequenceNumberHolder: MessageSequenceNumberHolder, - outgoingEventProcessor: OutgoingEventProcessor): CashInOutOperationService { + outgoingEventProcessor: OutgoingEventProcessor, + persistenceManager: PersistenceManager): CashInOutOperationService { return CashInOutOperationService(balancesHolder, feeProcessor, + walletOperationsProcessorFactory, cashInOutOperationBusinessValidator, messageSequenceNumberHolder, - outgoingEventProcessor) + outgoingEventProcessor, + persistenceManager) } @Bean @@ -304,20 +309,18 @@ open class TestApplicationContext { } @Bean - open fun reservedCashInOutOperation(balancesHolder: BalancesHolder, + open fun reservedCashInOutOperation(walletOperationsProcessorFactory: WalletOperationsProcessorFactory, assetsHolder: AssetsHolder, reservedCashInOutOperationValidator: ReservedCashInOutOperationValidator, messageProcessingStatusHolder: MessageProcessingStatusHolder, + persistenceManager: PersistenceManager, uuidHolder: UUIDHolder, + oldFormatBalancesSender: OldFormatBalancesSender, messageSequenceNumberHolder: MessageSequenceNumberHolder, outgoingEventProcessor: OutgoingEventProcessor): ReservedCashInOutOperationService { - return ReservedCashInOutOperationService(assetsHolder, - balancesHolder, - reservedCashInOutOperationValidator, - messageProcessingStatusHolder, - uuidHolder, - messageSequenceNumberHolder, - outgoingEventProcessor) + return ReservedCashInOutOperationService(assetsHolder, walletOperationsProcessorFactory, + reservedCashInOutOperationValidator, messageProcessingStatusHolder, persistenceManager, messageSequenceNumberHolder, + uuidHolder, outgoingEventProcessor) } @Bean @@ -578,12 +581,14 @@ open class TestApplicationContext { @Bean open fun cashTransferOperationService(balancesHolder: BalancesHolder, + walletOperationsProcessorFactory: WalletOperationsProcessorFactory, dbTransferOperationQueue: BlockingQueue, feeProcessor: FeeProcessor, cashTransferOperationBusinessValidator: CashTransferOperationBusinessValidator, messageSequenceNumberHolder: MessageSequenceNumberHolder, - outgoingEventProcessor: OutgoingEventProcessor): CashTransferOperationService { - return CashTransferOperationService(balancesHolder, dbTransferOperationQueue, feeProcessor, - cashTransferOperationBusinessValidator, messageSequenceNumberHolder, outgoingEventProcessor) + outgoingEventProcessor: OutgoingEventProcessor, + persistenceManager: PersistenceManager): CashTransferOperationService { + return CashTransferOperationService(balancesHolder, walletOperationsProcessorFactory, dbTransferOperationQueue, feeProcessor, + cashTransferOperationBusinessValidator, messageSequenceNumberHolder, outgoingEventProcessor, persistenceManager) } @Bean @@ -715,4 +720,22 @@ open class TestApplicationContext { open fun specializedEventSendersHolder(specializedEventSenders: List>): SpecializedEventSendersHolder { return SpecializedEventSendersHolderImpl(specializedEventSenders) } + + @Bean + open fun currentTransactionBalancesHolderFactory(balancesHolder: BalancesHolder): CurrentTransactionBalancesHolderFactory { + return CurrentTransactionBalancesHolderFactory(balancesHolder) + } + + @Bean + open fun walletOperationsProcessorFactory(currentTransactionBalancesHolderFactory: CurrentTransactionBalancesHolderFactory, + applicationSettingsHolder: ApplicationSettingsHolder, + assetsHolder: AssetsHolder): WalletOperationsProcessorFactory { + return WalletOperationsProcessorFactory(currentTransactionBalancesHolderFactory, applicationSettingsHolder, assetsHolder) + } + + @Bean + open fun balancesService(balancesHolder: BalancesHolder, + persistenceManager: PersistenceManager): BalancesService { + return BalancesServiceImpl(balancesHolder, persistenceManager) + } } \ No newline at end of file diff --git a/src/test/kotlin/com/lykke/matching/engine/config/TestExecutionContext.kt b/src/test/kotlin/com/lykke/matching/engine/config/TestExecutionContext.kt index eb536db0f..6ef5528dd 100644 --- a/src/test/kotlin/com/lykke/matching/engine/config/TestExecutionContext.kt +++ b/src/test/kotlin/com/lykke/matching/engine/config/TestExecutionContext.kt @@ -1,5 +1,6 @@ package com.lykke.matching.engine.config +import com.lykke.matching.engine.balance.WalletOperationsProcessorFactory import com.lykke.matching.engine.daos.ExecutionData import com.lykke.matching.engine.daos.LkkTrade import com.lykke.matching.engine.daos.OutgoingEventData @@ -8,7 +9,6 @@ import com.lykke.matching.engine.fee.FeeProcessor import com.lykke.matching.engine.holders.ApplicationSettingsHolder import com.lykke.matching.engine.holders.AssetsHolder import com.lykke.matching.engine.holders.AssetsPairsHolder -import com.lykke.matching.engine.holders.BalancesHolder import com.lykke.matching.engine.holders.MessageSequenceNumberHolder import com.lykke.matching.engine.holders.UUIDHolder import com.lykke.matching.engine.matching.MatchingEngine @@ -26,6 +26,7 @@ import com.lykke.matching.engine.order.process.common.LimitOrdersCancellerImpl import com.lykke.matching.engine.order.process.common.MatchingResultHandlingHelper import com.lykke.matching.engine.order.transaction.ExecutionContextFactory import com.lykke.matching.engine.order.transaction.ExecutionEventsSequenceNumbersGenerator +import com.lykke.matching.engine.outgoing.messages.BalanceUpdate import com.lykke.matching.engine.outgoing.messages.CashInOutEventData import com.lykke.matching.engine.outgoing.messages.CashOperation import com.lykke.matching.engine.outgoing.messages.CashTransferEventData @@ -38,6 +39,7 @@ import com.lykke.matching.engine.outgoing.messages.ReservedCashOperation import com.lykke.matching.engine.outgoing.senders.SpecializedEventSendersHolder import com.lykke.matching.engine.outgoing.senders.OutgoingEventProcessor import com.lykke.matching.engine.outgoing.senders.SpecializedEventSender +import com.lykke.matching.engine.outgoing.senders.impl.OldFormatBalancesSender import com.lykke.matching.engine.outgoing.senders.impl.specialized.CashInOutEventSender import com.lykke.matching.engine.outgoing.senders.impl.specialized.CashInOutOldEventSender import com.lykke.matching.engine.outgoing.senders.impl.OutgoingEventProcessorImpl @@ -77,11 +79,11 @@ open class TestExecutionContext { } @Bean - open fun executionContextFactory(balancesHolder: BalancesHolder, + open fun executionContextFactory(walletOperationsProcessorFactory: WalletOperationsProcessorFactory, genericLimitOrderService: GenericLimitOrderService, genericStopLimitOrderService: GenericStopLimitOrderService, assetsHolder: AssetsHolder): ExecutionContextFactory { - return ExecutionContextFactory(balancesHolder, + return ExecutionContextFactory(walletOperationsProcessorFactory, genericLimitOrderService, genericStopLimitOrderService, assetsHolder) @@ -106,8 +108,9 @@ open class TestExecutionContext { } @Bean - open fun cashTransferOldSender(notificationQueue: BlockingQueue): SpecializedEventSender { - return CashTransferOldEventSender(notificationQueue) + open fun cashTransferOldSender(notificationQueue: BlockingQueue, + oldFormatBalancesSender: OldFormatBalancesSender): SpecializedEventSender { + return CashTransferOldEventSender(notificationQueue, oldFormatBalancesSender) } @Bean @@ -127,11 +130,13 @@ open class TestExecutionContext { @Bean open fun specializedOldExecutionEventSender(clientLimitOrdersQueue: BlockingQueue, trustedClientsLimitOrdersQueue: BlockingQueue, - rabbitSwapQueue: BlockingQueue): SpecializedEventSender { + rabbitSwapQueue: BlockingQueue, + oldFormatBalancesSender: OldFormatBalancesSender): SpecializedEventSender { return OldFormatExecutionEventSender( clientLimitOrdersQueue, trustedClientsLimitOrdersQueue, - rabbitSwapQueue) + rabbitSwapQueue, + oldFormatBalancesSender) } @Bean @@ -145,14 +150,16 @@ open class TestExecutionContext { } @Bean - open fun specializedReservedCashInOutOldEventSender(reservedCashOperationQueue: BlockingQueue) + open fun specializedReservedCashInOutOldEventSender(reservedCashOperationQueue: BlockingQueue, + oldFormatBalancesSender: OldFormatBalancesSender) : SpecializedEventSender { - return ReservedCashInOutOldEventSender(reservedCashOperationQueue) + return ReservedCashInOutOldEventSender(reservedCashOperationQueue, oldFormatBalancesSender) } @Bean - open fun specializedCashInOutOldEventSender(rabbitCashInOutQueue: BlockingQueue): SpecializedEventSender { - return CashInOutOldEventSender(rabbitCashInOutQueue) + open fun specializedCashInOutOldEventSender(rabbitCashInOutQueue: BlockingQueue, + balanceUpdateQueue: BlockingQueue): CashInOutOldEventSender { + return CashInOutOldEventSender(rabbitCashInOutQueue, balanceUpdateQueue) } @Bean diff --git a/src/test/kotlin/com/lykke/matching/engine/fee/FeeProcessorTest.kt b/src/test/kotlin/com/lykke/matching/engine/fee/FeeProcessorTest.kt index add382108..269b55c4e 100644 --- a/src/test/kotlin/com/lykke/matching/engine/fee/FeeProcessorTest.kt +++ b/src/test/kotlin/com/lykke/matching/engine/fee/FeeProcessorTest.kt @@ -1,6 +1,7 @@ package com.lykke.matching.engine.fee import com.lykke.matching.engine.balance.BalancesGetter +import com.lykke.matching.engine.balance.WalletOperationsProcessorFactory import com.lykke.matching.engine.balance.util.TestBalanceHolderWrapper import com.lykke.matching.engine.config.TestApplicationContext import com.lykke.matching.engine.daos.Asset @@ -44,10 +45,10 @@ class FeeProcessorTest { private lateinit var feeProcessor: FeeProcessor @Autowired - lateinit var balancesHolder: BalancesHolder + lateinit var walletOperationsProcessorFactory: WalletOperationsProcessorFactory @Autowired - lateinit var testBalanceHolderWrapper: TestBalanceHolderWrapper + lateinit var testBalanceHolderWrapper: TestBalanceHolderWrapper @Autowired lateinit var testBackOfficeDatabaseAccessor: TestBackOfficeDatabaseAccessor @@ -59,9 +60,19 @@ class FeeProcessorTest { open fun testBackOfficeDatabaseAccessor(): TestBackOfficeDatabaseAccessor { val testBackOfficeDatabaseAccessor = TestBackOfficeDatabaseAccessor() testBackOfficeDatabaseAccessor.addAsset(Asset("USD", 2)) + testBackOfficeDatabaseAccessor.addAsset(Asset("EUR", 2)) + testBackOfficeDatabaseAccessor.addAsset(Asset("CHF", 4)) return testBackOfficeDatabaseAccessor } + + @Bean + @Primary + fun testDictionaryDatabaseAccessor(): TestDictionariesDatabaseAccessor { + val testDictionariesDatabaseAccessor = TestDictionariesDatabaseAccessor() + testDictionariesDatabaseAccessor.addAssetPair(AssetPair("EURUSD", "EUR", "USD", 5)) + return testDictionariesDatabaseAccessor + } } @Before @@ -74,9 +85,6 @@ class FeeProcessorTest { fun testNoPercentageFee() { testBalanceHolderWrapper.updateBalance("Client2", "EUR", 10.0) - testBackOfficeDatabaseAccessor.addAsset(Asset("EUR", 2)) - testDictionariesDatabaseAccessor.addAssetPair(AssetPair("EURUSD", "EUR", "USD", 5)) - val operations = LinkedList() operations.add(WalletOperation("Client1", "USD", BigDecimal.valueOf(-10.0))) operations.add(WalletOperation("Client2", "USD", BigDecimal.valueOf(10.0))) @@ -145,9 +153,6 @@ class FeeProcessorTest { fun testNoAbsoluteFee() { testBalanceHolderWrapper.updateBalance("Client2", "EUR", 0.09) - testBackOfficeDatabaseAccessor.addAsset(Asset("EUR", 2)) - testDictionariesDatabaseAccessor.addAssetPair(AssetPair("EURUSD", "EUR", "USD", 5)) - val operations = LinkedList() operations.add(WalletOperation("Client1", "USD", BigDecimal.valueOf(-0.5))) operations.add(WalletOperation("Client2", "USD", BigDecimal.valueOf(0.5))) @@ -171,9 +176,6 @@ class FeeProcessorTest { @Test fun testAbsoluteFeeCashout() { - testBackOfficeDatabaseAccessor.addAsset(Asset("EUR", 2)) - testDictionariesDatabaseAccessor.addAssetPair(AssetPair("EURUSD", "EUR", "USD", 5)) - val operations = LinkedList() operations.add(WalletOperation("Client1", "USD", BigDecimal.valueOf(-0.5))) val receiptOperation = operations[0] @@ -190,9 +192,6 @@ class FeeProcessorTest { @Test fun testPercentFeeCashout() { - testBackOfficeDatabaseAccessor.addAsset(Asset("EUR", 2)) - testDictionariesDatabaseAccessor.addAssetPair(AssetPair("EURUSD", "EUR", "USD", 5)) - val operations = LinkedList() operations.add(WalletOperation("Client1", "USD", BigDecimal.valueOf(-0.5))) val receiptOperation = operations[0] @@ -209,19 +208,18 @@ class FeeProcessorTest { @Test fun testAnotherAssetFee() { - testBalanceHolderWrapper.updateBalance("Client2", "EUR", 0.6543) - testBackOfficeDatabaseAccessor.addAsset(Asset("EUR", 4)) + testBalanceHolderWrapper.updateBalance("Client2", "CHF", 0.6543) val operations = LinkedList() operations.add(WalletOperation("Client1", "USD", BigDecimal.valueOf(-0.5))) operations.add(WalletOperation("Client2", "USD", BigDecimal.valueOf(0.5))) val receiptOperation = operations[1] - val feeInstructions = buildFeeInstructions(type = FeeType.CLIENT_FEE, size = 0.6543, sizeType = FeeSizeType.ABSOLUTE, targetClientId = "Client3", assetIds = listOf("EUR")) + val feeInstructions = buildFeeInstructions(type = FeeType.CLIENT_FEE, size = 0.6543, sizeType = FeeSizeType.ABSOLUTE, targetClientId = "Client3", assetIds = listOf("CHF")) val fees = feeProcessor.processFee(feeInstructions, receiptOperation, operations, balancesGetter = createBalancesGetter()) assertEquals(1, fees.size) assertEquals(BigDecimal.valueOf(0.6543), fees.first().transfer!!.volume) - assertEquals("EUR", fees.first().transfer!!.asset) + assertEquals("CHF", fees.first().transfer!!.asset) assertEquals(4, operations.size) } @@ -603,7 +601,7 @@ class FeeProcessorTest { } private fun createBalancesGetter(): BalancesGetter { - return balancesHolder.createWalletProcessor(null) + return walletOperationsProcessorFactory.create(null) } } diff --git a/src/test/kotlin/com/lykke/matching/engine/performance/AbstractPerformanceTest.kt b/src/test/kotlin/com/lykke/matching/engine/performance/AbstractPerformanceTest.kt index 708920c92..7dacd9882 100644 --- a/src/test/kotlin/com/lykke/matching/engine/performance/AbstractPerformanceTest.kt +++ b/src/test/kotlin/com/lykke/matching/engine/performance/AbstractPerformanceTest.kt @@ -1,5 +1,6 @@ package com.lykke.matching.engine.performance +import com.lykke.matching.engine.balance.WalletOperationsProcessorFactory import com.lykke.matching.engine.balance.util.TestBalanceHolderWrapper import com.lykke.matching.engine.daos.LkkTrade import com.lykke.matching.engine.daos.OutgoingEventData @@ -25,7 +26,6 @@ import com.lykke.matching.engine.holders.TestUUIDHolder import com.lykke.matching.engine.incoming.parsers.impl.LimitOrderCancelOperationContextParser import com.lykke.matching.engine.incoming.parsers.impl.LimitOrderMassCancelOperationContextParser import com.lykke.matching.engine.matching.MatchingEngine -import com.lykke.matching.engine.notification.BalanceUpdateHandlerTest import com.lykke.matching.engine.order.ExecutionDataApplyService import com.lykke.matching.engine.order.ExecutionPersistenceService import com.lykke.matching.engine.order.ExpiryOrdersQueue @@ -36,6 +36,7 @@ import com.lykke.matching.engine.order.process.StopLimitOrderProcessor import com.lykke.matching.engine.order.process.StopOrderBookProcessor import com.lykke.matching.engine.order.process.common.LimitOrdersCancellerImpl import com.lykke.matching.engine.order.process.common.MatchingResultHandlingHelper +import com.lykke.matching.engine.order.transaction.CurrentTransactionBalancesHolderFactory import com.lykke.matching.engine.order.transaction.ExecutionContextFactory import com.lykke.matching.engine.order.transaction.ExecutionEventsSequenceNumbersGenerator import com.lykke.matching.engine.outgoing.messages.BalanceUpdate @@ -80,6 +81,7 @@ abstract class AbstractPerformanceTest { protected lateinit var assetsHolder: AssetsHolder protected lateinit var balancesHolder: BalancesHolder + protected lateinit var balancesService: BalancesService protected lateinit var assetsPairsHolder: AssetsPairsHolder protected lateinit var assetCache: AssetsCache protected lateinit var balancesDatabaseAccessorsHolder: BalancesDatabaseAccessorsHolder @@ -129,6 +131,11 @@ abstract class AbstractPerformanceTest { val tradeInfoQueue = LinkedBlockingQueue() + val currentTransactionBalancesHolderFactory = CurrentTransactionBalancesHolderFactory(balancesHolder) + + val walletOperationsProcessorFactory = WalletOperationsProcessorFactory(currentTransactionBalancesHolderFactory, + applicationSettingsHolder, assetsHolder) + val outgoingEventData = LinkedBlockingQueue() val messageSender = MessageSender(rabbitEventsQueue, rabbitTrustedClientsEventsQueue) @@ -160,13 +167,10 @@ abstract class AbstractPerformanceTest { persistenceManager = TestPersistenceManager(balancesDatabaseAccessorsHolder.primaryAccessor, ordersDatabaseAccessorsHolder, stopOrdersDatabaseAccessorsHolder) - balancesHolder = BalancesHolder(balancesDatabaseAccessorsHolder, - persistenceManager, - assetsHolder, - balanceUpdateQueue, - applicationSettingsHolder) + balancesHolder = BalancesHolder(balancesDatabaseAccessorsHolder) + balancesService = BalancesServiceImpl(balancesHolder, persistenceManager) - testBalanceHolderWrapper = TestBalanceHolderWrapper(BalanceUpdateHandlerTest(balanceUpdateQueue), balancesHolder) + testBalanceHolderWrapper = TestBalanceHolderWrapper(balancesService, balancesHolder) assetPairsCache = AssetPairsCache(testDictionariesDatabaseAccessor, ApplicationEventPublisher {}) assetsPairsHolder = AssetsPairsHolder(assetPairsCache) @@ -207,7 +211,7 @@ abstract class AbstractPerformanceTest { executionPersistenceService, outgoingEventProcessor) - val executionContextFactory = ExecutionContextFactory(balancesHolder, + val executionContextFactory = ExecutionContextFactory(walletOperationsProcessorFactory, genericLimitOrderService, genericStopLimitOrderService, assetsHolder) diff --git a/src/test/kotlin/com/lykke/matching/engine/services/BalancesServiceTest.kt b/src/test/kotlin/com/lykke/matching/engine/services/BalancesServiceTest.kt new file mode 100644 index 000000000..5100db635 --- /dev/null +++ b/src/test/kotlin/com/lykke/matching/engine/services/BalancesServiceTest.kt @@ -0,0 +1,44 @@ +package com.lykke.matching.engine.services + +import com.lykke.matching.engine.config.TestApplicationContext +import com.lykke.matching.engine.daos.wallet.AssetBalance +import com.lykke.matching.engine.daos.wallet.Wallet +import com.lykke.matching.engine.holders.BalancesDatabaseAccessorsHolder +import com.lykke.matching.engine.holders.BalancesHolder +import com.lykke.matching.engine.notification.BalanceUpdateHandlerTest +import com.lykke.matching.engine.outgoing.messages.BalanceUpdate +import com.lykke.matching.engine.outgoing.messages.ClientBalanceUpdate +import com.lykke.matching.engine.utils.assertEquals +import org.junit.Test +import org.junit.runner.RunWith +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.test.annotation.DirtiesContext +import org.springframework.test.context.junit4.SpringRunner +import java.math.BigDecimal +import java.util.* +import kotlin.test.assertEquals + +@RunWith(SpringRunner::class) +@SpringBootTest(classes = [(TestApplicationContext::class)]) +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD) +class BalancesServiceTest { + + @Autowired + private lateinit var balancesService: BalancesService + + @Autowired + private lateinit var balancesHolder: BalancesHolder + + @Autowired + private lateinit var testBalancesDatabaseAccessorsHolder: BalancesDatabaseAccessorsHolder + + @Test + fun testInsertOrUpdateWallets() { + balancesService.insertOrUpdateWallets(listOf(Wallet("test", listOf(AssetBalance("test", "BTC", BigDecimal.valueOf(10))))), null) + + + assertEquals(BigDecimal.valueOf(10), balancesHolder.getBalance("test", "BTC")) + assertEquals(BigDecimal.valueOf(10), testBalancesDatabaseAccessorsHolder.primaryAccessor.loadWallets()["test"]?.balances!!["BTC"]?.balance) + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/lykke/matching/engine/services/CashInOutOperationServiceTest.kt b/src/test/kotlin/com/lykke/matching/engine/services/CashInOutOperationServiceTest.kt index 1fc81a797..865272829 100644 --- a/src/test/kotlin/com/lykke/matching/engine/services/CashInOutOperationServiceTest.kt +++ b/src/test/kotlin/com/lykke/matching/engine/services/CashInOutOperationServiceTest.kt @@ -54,6 +54,9 @@ class CashInOutOperationServiceTest : AbstractTest() { @Autowired private lateinit var reservedCashInOutOperationService: ReservedCashInOutOperationService + @Autowired + private lateinit var balancesService: BalancesService + @Autowired private lateinit var testSettingDatabaseAccessor: TestSettingsDatabaseAccessor @@ -295,7 +298,7 @@ class CashInOutOperationServiceTest : AbstractTest() { @Test fun testRounding() { - balancesHolder.insertOrUpdateWallets(listOf(Wallet("Client1", listOf(AssetBalance("Client1","Asset1", BigDecimal.valueOf(29.99))))), null) + balancesService.insertOrUpdateWallets(listOf(Wallet("Client1", listOf(AssetBalance("Client1","Asset1", BigDecimal.valueOf(29.99))))), null) cashInOutOperationService.processMessage(messageBuilder.buildCashInOutWrapper("Client1", "Asset1", -0.01)) val balance = testWalletDatabaseAccessor.getBalance("Client1", "Asset1") diff --git a/src/test/kotlin/com/lykke/matching/engine/services/InvalidBalanceTest.kt b/src/test/kotlin/com/lykke/matching/engine/services/InvalidBalanceTest.kt index e127745f5..bcb2acf04 100644 --- a/src/test/kotlin/com/lykke/matching/engine/services/InvalidBalanceTest.kt +++ b/src/test/kotlin/com/lykke/matching/engine/services/InvalidBalanceTest.kt @@ -224,10 +224,11 @@ class InvalidBalanceTest : AbstractTest() { IncomingLimitOrder(-0.05, 1010.0, "2"), IncomingLimitOrder(-0.1, 1100.0, "3") ))) - testBalanceHolderWrapper.updateReservedBalance("Client1", "ETH", reservedBalance = 0.09) + testSettingDatabaseAccessor.clear() applicationSettingsCache.update() applicationSettingsHolder.update() + testBalanceHolderWrapper.updateReservedBalance("Client1", "ETH", 0.09) clearMessageQueues() singleLimitOrderService.processMessage(messageBuilder.buildLimitOrderWrapper(buildLimitOrder(uid = "4", clientId = "Client2", assetId = "ETHUSD", volume = 0.25, price = 1100.0))) diff --git a/src/test/kotlin/com/lykke/matching/engine/services/MarketOrderServiceTest.kt b/src/test/kotlin/com/lykke/matching/engine/services/MarketOrderServiceTest.kt index cf29e019e..91b98294d 100644 --- a/src/test/kotlin/com/lykke/matching/engine/services/MarketOrderServiceTest.kt +++ b/src/test/kotlin/com/lykke/matching/engine/services/MarketOrderServiceTest.kt @@ -360,8 +360,7 @@ class MarketOrderServiceTest : AbstractTest() { assertEquals(OrderRejectReason.NOT_ENOUGH_FUNDS, eventMarketOrder.rejectReason) assertEquals(0, eventMarketOrder.trades?.size) - balancesHolder.updateBalance( - ProcessedMessage(MessageType.CASH_IN_OUT_OPERATION.type, System.currentTimeMillis(), "test"), 0, "Client4", "EUR", BigDecimal.valueOf(1000.0)) + testBalanceHolderWrapper.updateBalance("Client4", "EUR", 1000.0) marketOrderService.processMessage(buildMarketOrderWrapper(buildMarketOrder(clientId = "Client4", assetId = "EURUSD", volume = -1000.0))) assertEquals(1, rabbitSwapListener.getCount()) diff --git a/src/test/kotlin/com/lykke/matching/engine/services/MultiLimitOrderServiceTest.kt b/src/test/kotlin/com/lykke/matching/engine/services/MultiLimitOrderServiceTest.kt index 6782f05b7..ad57d783a 100644 --- a/src/test/kotlin/com/lykke/matching/engine/services/MultiLimitOrderServiceTest.kt +++ b/src/test/kotlin/com/lykke/matching/engine/services/MultiLimitOrderServiceTest.kt @@ -28,7 +28,6 @@ import org.junit.runner.RunWith import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest import org.springframework.boot.test.context.TestConfiguration -import org.springframework.context.ApplicationEventPublisher import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Primary import org.springframework.test.annotation.DirtiesContext diff --git a/src/test/kotlin/com/lykke/matching/engine/services/RoundingTest.kt b/src/test/kotlin/com/lykke/matching/engine/services/RoundingTest.kt index a6c41197b..2f9718f9c 100644 --- a/src/test/kotlin/com/lykke/matching/engine/services/RoundingTest.kt +++ b/src/test/kotlin/com/lykke/matching/engine/services/RoundingTest.kt @@ -5,6 +5,7 @@ import com.lykke.matching.engine.config.TestApplicationContext import com.lykke.matching.engine.daos.Asset import com.lykke.matching.engine.daos.AssetPair import com.lykke.matching.engine.database.TestBackOfficeDatabaseAccessor +import com.lykke.matching.engine.database.cache.AssetsCache import com.lykke.matching.engine.order.OrderStatus import com.lykke.matching.engine.outgoing.messages.MarketOrderWithTrades import com.lykke.matching.engine.utils.MessageBuilder.Companion.buildLimitOrder @@ -22,14 +23,13 @@ import org.springframework.test.context.junit4.SpringRunner import java.math.BigDecimal import kotlin.test.assertNotNull import com.lykke.matching.engine.utils.assertEquals +import org.springframework.beans.factory.annotation.Autowired import kotlin.test.assertEquals - - @RunWith(SpringRunner::class) @SpringBootTest(classes = [(TestApplicationContext::class), (RoundingTest.Config::class)]) @DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD) -class RoundingTest: AbstractTest() { +class RoundingTest : AbstractTest() { @TestConfiguration open class Config { @@ -49,6 +49,9 @@ class RoundingTest: AbstractTest() { } } + @Autowired + private lateinit var assetsCache: AssetsCache + @Before fun setUp() { testDictionariesDatabaseAccessor.addAssetPair(AssetPair("EURUSD", "EUR", "USD", 5)) @@ -110,7 +113,7 @@ class RoundingTest: AbstractTest() { assertEquals("USD", marketOrderReport.trades.first().limitAsset) assertEquals("Client3", marketOrderReport.trades.first().limitClientId) - assertEquals(BigDecimal.valueOf( 1.0), testWalletDatabaseAccessor.getBalance("Client3", "EUR")) + assertEquals(BigDecimal.valueOf(1.0), testWalletDatabaseAccessor.getBalance("Client3", "EUR")) assertEquals(BigDecimal.valueOf(998.89), testWalletDatabaseAccessor.getBalance("Client3", "USD")) assertEquals(BigDecimal.valueOf(1499.0), testWalletDatabaseAccessor.getBalance("Client4", "EUR")) assertEquals(BigDecimal.valueOf(1.11), testWalletDatabaseAccessor.getBalance("Client4", "USD")) @@ -179,7 +182,7 @@ class RoundingTest: AbstractTest() { testBalanceHolderWrapper.updateBalance("Client4", "CHF", 1.0) initServices() - marketOrderService.processMessage(buildMarketOrderWrapper(buildMarketOrder(clientId = "Client4", assetId = "BTCCHF", volume = -0.38, straight = false))) + marketOrderService.processMessage(buildMarketOrderWrapper(buildMarketOrder(clientId = "Client4", assetId = "BTCCHF", volume = -0.38, straight = false))) assertEquals(1, rabbitSwapListener.getCount()) val marketOrderReport = rabbitSwapListener.getQueue().poll() as MarketOrderWithTrades diff --git a/src/test/kotlin/com/lykke/matching/engine/services/StopLimitOrderTest.kt b/src/test/kotlin/com/lykke/matching/engine/services/StopLimitOrderTest.kt index ebaecfc79..f7cb5cafb 100644 --- a/src/test/kotlin/com/lykke/matching/engine/services/StopLimitOrderTest.kt +++ b/src/test/kotlin/com/lykke/matching/engine/services/StopLimitOrderTest.kt @@ -59,6 +59,7 @@ class StopLimitOrderTest : AbstractTest() { testBackOfficeDatabaseAccessor.addAsset(Asset("USD", 2)) testBackOfficeDatabaseAccessor.addAsset(Asset("BTC", 8)) + testBackOfficeDatabaseAccessor.addAsset(Asset("EUR", 8)) return testBackOfficeDatabaseAccessor } diff --git a/src/test/kotlin/com/lykke/matching/engine/services/validator/business/CashTransferOperationBusinessValidatorTest.kt b/src/test/kotlin/com/lykke/matching/engine/services/validator/business/CashTransferOperationBusinessValidatorTest.kt index 70397210e..c6533415f 100644 --- a/src/test/kotlin/com/lykke/matching/engine/services/validator/business/CashTransferOperationBusinessValidatorTest.kt +++ b/src/test/kotlin/com/lykke/matching/engine/services/validator/business/CashTransferOperationBusinessValidatorTest.kt @@ -1,5 +1,6 @@ package com.lykke.matching.engine.services.validator.business +import com.lykke.matching.engine.balance.WalletOperationsProcessorFactory import com.lykke.matching.engine.balance.util.TestBalanceHolderWrapper import com.lykke.matching.engine.config.TestApplicationContext import com.lykke.matching.engine.daos.Asset @@ -11,7 +12,7 @@ import com.lykke.matching.engine.incoming.parsers.impl.CashTransferContextParser import com.lykke.matching.engine.messages.MessageType import com.lykke.matching.engine.messages.MessageWrapper import com.lykke.matching.engine.messages.ProtocolMessages -import com.lykke.matching.engine.notification.BalanceUpdateHandlerTest +import com.lykke.matching.engine.services.BalancesService import com.lykke.matching.engine.services.validators.business.CashTransferOperationBusinessValidator import com.lykke.matching.engine.services.validators.impl.ValidationException import org.junit.Test @@ -54,8 +55,9 @@ class CashTransferOperationBusinessValidatorTest { @Bean @Primary - open fun testBalanceHolderWrapper(balanceUpdateHandlerTest: BalanceUpdateHandlerTest, balancesHolder: BalancesHolder): TestBalanceHolderWrapper { - val testBalanceHolderWrapper = TestBalanceHolderWrapper(balanceUpdateHandlerTest, balancesHolder) + open fun testBalanceHolderWrapper(balancesService: BalancesService, + balancesHolder: BalancesHolder): TestBalanceHolderWrapper { + val testBalanceHolderWrapper = TestBalanceHolderWrapper(balancesService, balancesHolder) testBalanceHolderWrapper.updateBalance(CLIENT_NAME1, ASSET_ID, 100.0) testBalanceHolderWrapper.updateReservedBalance(CLIENT_NAME1, ASSET_ID, 50.0) return testBalanceHolderWrapper diff --git a/src/test/kotlin/com/lykke/matching/engine/utils/balance/ReservedVolumesRecalculatorTest.kt b/src/test/kotlin/com/lykke/matching/engine/utils/balance/ReservedVolumesRecalculatorTest.kt index 97bdd4517..d4ddf0abc 100644 --- a/src/test/kotlin/com/lykke/matching/engine/utils/balance/ReservedVolumesRecalculatorTest.kt +++ b/src/test/kotlin/com/lykke/matching/engine/utils/balance/ReservedVolumesRecalculatorTest.kt @@ -7,6 +7,7 @@ import com.lykke.matching.engine.daos.AssetPair import com.lykke.matching.engine.daos.order.LimitOrderType import com.lykke.matching.engine.daos.setting.AvailableSettingGroup import com.lykke.matching.engine.database.* +import com.lykke.matching.engine.database.cache.ApplicationSettingsCache import com.lykke.matching.engine.holders.BalancesDatabaseAccessorsHolder import com.lykke.matching.engine.notification.BalanceUpdateHandlerTest import com.lykke.matching.engine.order.utils.TestOrderBookWrapper @@ -29,7 +30,6 @@ import org.springframework.test.context.junit4.SpringRunner import java.math.BigDecimal import kotlin.test.assertEquals import com.lykke.matching.engine.utils.assertEquals -import com.lykke.matching.engine.utils.getSetting import java.util.concurrent.BlockingQueue @RunWith(SpringRunner::class) @@ -60,15 +60,6 @@ class ReservedVolumesRecalculatorTest { return testDictionariesDatabaseAccessor } - - @Bean - @Primary - open fun testConfig(): TestSettingsDatabaseAccessor { - val testSettingsDatabaseAccessor = TestSettingsDatabaseAccessor() - testSettingsDatabaseAccessor.createOrUpdateSetting(AvailableSettingGroup.TRUSTED_CLIENTS, getSetting("trustedClient")) - testSettingsDatabaseAccessor.createOrUpdateSetting(AvailableSettingGroup.TRUSTED_CLIENTS, getSetting("trustedClient2")) - return testSettingsDatabaseAccessor - } } @Autowired @@ -96,6 +87,9 @@ class ReservedVolumesRecalculatorTest { @Autowired lateinit var balanceUpdateHandlerTest: BalanceUpdateHandlerTest + @Autowired + lateinit var applicationSettingsCache: ApplicationSettingsCache + @Before fun setUp() { testOrderBookWrapper.addLimitOrder(buildLimitOrder(clientId = "trustedClient", assetId = "BTCUSD", price = 10000.0, volume = -1.0, reservedVolume = 0.5)) @@ -108,10 +102,10 @@ class ReservedVolumesRecalculatorTest { testOrderBookWrapper.addStopLimitOrder(buildLimitOrder(uid = "4", clientId = "Client2", assetId = "BTCUSD", type = LimitOrderType.STOP_LIMIT, volume = 0.1, lowerLimitPrice = 10000.0, lowerPrice = 10900.0)) testBalanceHolderWrapper.updateBalance("trustedClient", "BTC", 10.0) - testBalanceHolderWrapper.updateReservedBalance("trustedClient", "BTC", 2.0, false) + testBalanceHolderWrapper.updateReservedBalance("trustedClient", "BTC", 2.0) // negative reserved balance testBalanceHolderWrapper.updateBalance("trustedClient2", "BTC", 1.0) - testBalanceHolderWrapper.updateReservedBalance("trustedClient2", "BTC", -0.001, false) + testBalanceHolderWrapper.updateReservedBalance("trustedClient2", "BTC", -0.001) testBalanceHolderWrapper.updateBalance("Client3", "BTC", 0.0) testBalanceHolderWrapper.updateReservedBalance("Client3", "BTC", -0.001) @@ -135,6 +129,9 @@ class ReservedVolumesRecalculatorTest { testBalanceHolderWrapper.updateBalance("Client2", "USD", 990.0) testBalanceHolderWrapper.updateReservedBalance("Client2", "USD", 1.0) + + applicationSettingsCache.createOrUpdateSettingValue(AvailableSettingGroup.TRUSTED_CLIENTS, "trustedClient", "trustedClient", true) + applicationSettingsCache.createOrUpdateSettingValue(AvailableSettingGroup.TRUSTED_CLIENTS, "trustedClient2", "trustedClient2", true) } @Test