diff --git a/src/dist/cfg/log4j2.xml b/src/dist/cfg/log4j2.xml
index 226fec133..ee832ae31 100644
--- a/src/dist/cfg/log4j2.xml
+++ b/src/dist/cfg/log4j2.xml
@@ -344,6 +344,24 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -541,6 +559,16 @@
+
+
+
+
+
+
+
+
# Errors logger
diff --git a/src/main/kotlin/com/lykke/matching/engine/balance/WalletOperationsProcessor.kt b/src/main/kotlin/com/lykke/matching/engine/balance/WalletOperationsProcessor.kt
index 5b0c3b436..6fb6d3cfc 100644
--- a/src/main/kotlin/com/lykke/matching/engine/balance/WalletOperationsProcessor.kt
+++ b/src/main/kotlin/com/lykke/matching/engine/balance/WalletOperationsProcessor.kt
@@ -3,15 +3,9 @@ package com.lykke.matching.engine.balance
import com.lykke.matching.engine.daos.WalletOperation
import com.lykke.matching.engine.daos.wallet.AssetBalance
import com.lykke.matching.engine.daos.wallet.Wallet
-import com.lykke.matching.engine.database.PersistenceManager
import com.lykke.matching.engine.database.common.entity.BalancesData
-import com.lykke.matching.engine.database.common.entity.OrderBooksPersistenceData
-import com.lykke.matching.engine.database.common.entity.PersistenceData
-import com.lykke.matching.engine.deduplication.ProcessedMessage
import com.lykke.matching.engine.holders.ApplicationSettingsHolder
import com.lykke.matching.engine.holders.AssetsHolder
-import com.lykke.matching.engine.holders.BalancesHolder
-import com.lykke.matching.engine.outgoing.messages.BalanceUpdate
import com.lykke.matching.engine.outgoing.messages.ClientBalanceUpdate
import com.lykke.matching.engine.order.transaction.CurrentTransactionBalancesHolder
import com.lykke.matching.engine.order.transaction.WalletAssetBalance
@@ -20,21 +14,18 @@ import com.lykke.utils.logging.MetricsLogger
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.math.BigDecimal
-import java.util.Date
-class WalletOperationsProcessor(private val balancesHolder: BalancesHolder,
- private val currentTransactionBalancesHolder: CurrentTransactionBalancesHolder,
+class WalletOperationsProcessor(private val currentTransactionBalancesHolder: CurrentTransactionBalancesHolder,
private val applicationSettingsHolder: ApplicationSettingsHolder,
- private val persistenceManager: PersistenceManager,
private val assetsHolder: AssetsHolder,
- private val logger: Logger?): BalancesGetter {
+ private val logger: Logger?) : BalancesGetter {
companion object {
private val LOGGER = LoggerFactory.getLogger(WalletOperationsProcessor::class.java.name)
private val METRICS_LOGGER = MetricsLogger.getLogger()
}
- private val clientBalanceUpdatesByClientIdAndAssetId = HashMap()
+ private val clientBalanceUpdatesByClientIdAndAssetId = HashMap()
fun preProcess(operations: Collection,
allowInvalidBalance: Boolean = false,
@@ -60,6 +51,14 @@ class WalletOperationsProcessor(private val balancesHolder: BalancesHolder,
changedAssetBalance.reserved
}
+ validateChangedAssetBalances(changedAssetBalances, allowInvalidBalance)
+
+
+ changedAssetBalances.forEach { processChangedAssetBalance(it.value) }
+ return this
+ }
+
+ private fun validateChangedAssetBalances(changedAssetBalances: HashMap, allowInvalidBalance: Boolean) {
try {
changedAssetBalances.values.forEach { validateBalanceChange(it) }
} catch (e: BalanceException) {
@@ -70,9 +69,6 @@ class WalletOperationsProcessor(private val balancesHolder: BalancesHolder,
(logger ?: LOGGER).error(message)
METRICS_LOGGER.logError(message, e)
}
-
- changedAssetBalances.forEach { processChangedAssetBalance(it.value) }
- return this
}
private fun processChangedAssetBalance(changedAssetBalance: ChangedAssetBalance) {
@@ -114,23 +110,6 @@ class WalletOperationsProcessor(private val balancesHolder: BalancesHolder,
return currentTransactionBalancesHolder.persistenceData()
}
- fun persistBalances(processedMessage: ProcessedMessage?,
- orderBooksData: OrderBooksPersistenceData?,
- stopOrderBooksData: OrderBooksPersistenceData?,
- messageSequenceNumber: Long?): Boolean {
- return persistenceManager.persist(PersistenceData(persistenceData(),
- processedMessage,
- orderBooksData,
- stopOrderBooksData,
- messageSequenceNumber))
- }
-
- fun sendNotification(id: String, type: String, messageId: String) {
- if (clientBalanceUpdatesByClientIdAndAssetId.isNotEmpty()) {
- balancesHolder.sendBalanceUpdate(BalanceUpdate(id, type, Date(), clientBalanceUpdatesByClientIdAndAssetId.values.toList(), messageId))
- }
- }
-
fun getClientBalanceUpdates(): List {
return clientBalanceUpdatesByClientIdAndAssetId.values.toList()
}
diff --git a/src/main/kotlin/com/lykke/matching/engine/balance/WalletOperationsProcessorFactory.kt b/src/main/kotlin/com/lykke/matching/engine/balance/WalletOperationsProcessorFactory.kt
new file mode 100644
index 000000000..a1f6802d4
--- /dev/null
+++ b/src/main/kotlin/com/lykke/matching/engine/balance/WalletOperationsProcessorFactory.kt
@@ -0,0 +1,21 @@
+package com.lykke.matching.engine.balance
+
+import com.lykke.matching.engine.holders.ApplicationSettingsHolder
+import com.lykke.matching.engine.holders.AssetsHolder
+import com.lykke.matching.engine.order.transaction.CurrentTransactionBalancesHolderFactory
+import org.slf4j.Logger
+import org.springframework.stereotype.Component
+
+@Component
+class WalletOperationsProcessorFactory(private val currentTransactionBalancesHolderFactory: CurrentTransactionBalancesHolderFactory,
+ private val applicationSettingsHolder: ApplicationSettingsHolder,
+ private val assetsHolder: AssetsHolder) {
+ fun create(logger: Logger?): WalletOperationsProcessor {
+
+ return WalletOperationsProcessor(
+ currentTransactionBalancesHolderFactory.create(),
+ applicationSettingsHolder,
+ assetsHolder,
+ logger)
+ }
+}
\ No newline at end of file
diff --git a/src/main/kotlin/com/lykke/matching/engine/database/reconciliation/AccountsMigrationService.kt b/src/main/kotlin/com/lykke/matching/engine/database/reconciliation/AccountsMigrationService.kt
index 8b3e1afb2..b91f3cc24 100644
--- a/src/main/kotlin/com/lykke/matching/engine/database/reconciliation/AccountsMigrationService.kt
+++ b/src/main/kotlin/com/lykke/matching/engine/database/reconciliation/AccountsMigrationService.kt
@@ -5,7 +5,7 @@ import com.lykke.matching.engine.database.Storage
import com.lykke.matching.engine.database.azure.AzureWalletDatabaseAccessor
import com.lykke.matching.engine.database.redis.accessor.impl.RedisWalletDatabaseAccessor
import com.lykke.matching.engine.exception.MatchingEngineException
-import com.lykke.matching.engine.holders.BalancesHolder
+import com.lykke.matching.engine.services.BalancesService
import com.lykke.matching.engine.utils.config.Config
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
@@ -17,9 +17,9 @@ import java.util.*
@Component
@Order(1)
-class AccountsMigrationService @Autowired constructor (private val balancesHolder: BalancesHolder,
- private val config: Config,
- private val redisWalletDatabaseAccessor: Optional): ApplicationRunner {
+class AccountsMigrationService @Autowired constructor (private val config: Config,
+ private val redisWalletDatabaseAccessor: Optional,
+ private val balancesService: BalancesService): ApplicationRunner {
override fun run(args: ApplicationArguments?) {
if (config.me.walletsMigration) {
migrateAccounts()
@@ -55,38 +55,41 @@ class AccountsMigrationService @Autowired constructor (private val balancesHolde
}
val startTime = Date().time
- teeLog("Starting wallets migration from azure to redis; azure table: $azureAccountsTableName, redis: ${config.me.redis.host}.${config.me.redis.port}")
+ LOGGER.info("Starting wallets migration from azure to redis; azure table: $azureAccountsTableName, redis: ${config.me.redis.host}.${config.me.redis.port}")
val wallets = azureDatabaseAccessor.loadWallets()
val loadTime = Date().time
- teeLog("Loaded ${wallets.size} wallets from azure (ms: ${loadTime - startTime})")
- balancesHolder.insertOrUpdateWallets(wallets.values.toList(), null)
+ LOGGER.info("Loaded ${wallets.size} wallets from azure (ms: ${loadTime - startTime})")
+ val balancesSaved = balancesService.insertOrUpdateWallets(wallets.values.toList(), null)
+ if (!balancesSaved) {
+ LOGGER.error("Can not save balances data during migration from azure db to redis")
+ return
+ }
val saveTime = Date().time
- teeLog("Saved ${wallets.size} wallets to redis (ms: ${saveTime - loadTime})")
+ LOGGER.info("Saved ${wallets.size} wallets to redis (ms: ${saveTime - loadTime})")
compare()
}
fun fromRedisToDb() {
val startTime = Date().time
- teeLog("Starting wallets migration from redis to azure; redis: ${config.me.redis.host}.${config.me.redis.port}, azure table: $azureAccountsTableName")
+ LOGGER.info("Starting wallets migration from redis to azure; redis: ${config.me.redis.host}.${config.me.redis.port}, azure table: $azureAccountsTableName")
val loadTime = Date().time
val wallets = redisWalletDatabaseAccessor.get().loadWallets()
if (wallets.isEmpty()) {
throw AccountsMigrationException("There are no wallets in redis ${config.me.redis.host}.${config.me.redis.port}")
}
- teeLog("Loaded ${wallets.size} wallets from redis (ms: ${loadTime - startTime})")
- balancesHolder.insertOrUpdateWallets(wallets.values.toList(), null)
+ LOGGER.info("Loaded ${wallets.size} wallets from redis (ms: ${loadTime - startTime})")
+ val balancesSaved = balancesService.insertOrUpdateWallets(wallets.values.toList(), null)
+ if (!balancesSaved) {
+ LOGGER.error("Can not save balances data during migration from redis to azure db")
+ return
+ }
val saveTime = Date().time
- teeLog("Saved ${wallets.size} wallets to azure (ms: ${saveTime - loadTime})")
+ LOGGER.info("Saved ${wallets.size} wallets to azure (ms: ${saveTime - loadTime})")
compare()
}
- private fun teeLog(message: String) {
- println(message)
- LOGGER.info(message)
- }
-
/** Compares balances stored in redis & azure; logs comparison result */
private fun compare() {
val azureWallets = azureDatabaseAccessor.loadWallets().filter { it.value.balances.isNotEmpty() }
@@ -98,8 +101,8 @@ class AccountsMigrationService @Autowired constructor (private val balancesHolde
val differentWallets = LinkedList()
- teeLog("Comparison result. Differences: ")
- teeLog("---------------------------------------------------------------------------------------------")
+ LOGGER.info("Comparison result. Differences: ")
+ LOGGER.info("---------------------------------------------------------------------------------------------")
commonClients.forEach {
val azureWallet = azureWallets[it]
val redisWallet = redisWallets[it]
@@ -107,19 +110,19 @@ class AccountsMigrationService @Autowired constructor (private val balancesHolde
differentWallets.add(it)
}
}
- teeLog("---------------------------------------------------------------------------------------------")
-
- teeLog("Total: ")
- teeLog("azure clients count: ${azureWallets.size}")
- teeLog("redis clients count: ${redisWallets.size}")
- teeLog("only azure clients (count: ${onlyAzureClients.size}): $onlyAzureClients")
- teeLog("only redis clients (count: ${onlyRedisClients.size}): $onlyRedisClients")
- teeLog("clients with different wallets (count: ${differentWallets.size}): $differentWallets")
+ LOGGER.info("---------------------------------------------------------------------------------------------")
+
+ LOGGER.info("Total: ")
+ LOGGER.info("azure clients count: ${azureWallets.size}")
+ LOGGER.info("redis clients count: ${redisWallets.size}")
+ LOGGER.info("only azure clients (count: ${onlyAzureClients.size}): $onlyAzureClients")
+ LOGGER.info("only redis clients (count: ${onlyRedisClients.size}): $onlyRedisClients")
+ LOGGER.info("clients with different wallets (count: ${differentWallets.size}): $differentWallets")
}
private fun compareBalances(azureWallet: Wallet, redisWallet: Wallet): Boolean {
if (azureWallet.clientId != redisWallet.clientId) {
- teeLog("different clients: ${azureWallet.clientId} & ${redisWallet.clientId}")
+ LOGGER.info("different clients: ${azureWallet.clientId} & ${redisWallet.clientId}")
return false
}
val clientId = azureWallet.clientId
@@ -130,7 +133,7 @@ class AccountsMigrationService @Autowired constructor (private val balancesHolde
val onlyRedisAssets = redisBalances.keys.filterNot { azureBalances.keys.contains(it) }
if (onlyAzureAssets.isNotEmpty() || onlyRedisAssets.isNotEmpty()) {
- teeLog("different asset sets: $onlyAzureAssets & $onlyRedisAssets, client: $clientId")
+ LOGGER.info("different asset sets: $onlyAzureAssets & $onlyRedisAssets, client: $clientId")
return false
}
@@ -139,11 +142,11 @@ class AccountsMigrationService @Autowired constructor (private val balancesHolde
val azureBalance = azureBalances[it]
val redisBalance = redisBalances[it]
if (azureBalance!!.balance != redisBalance!!.balance) {
- teeLog("different balances: ${azureBalance.balance} & ${redisBalance.balance}, client: $clientId")
+ LOGGER.info("different balances: ${azureBalance.balance} & ${redisBalance.balance}, client: $clientId")
return false
}
if (azureBalance.reserved != redisBalance.reserved) {
- teeLog("different reserved balances: ${azureBalance.reserved} & ${redisBalance.reserved}, client: $clientId")
+ LOGGER.info("different reserved balances: ${azureBalance.reserved} & ${redisBalance.reserved}, client: $clientId")
return false
}
}
diff --git a/src/main/kotlin/com/lykke/matching/engine/database/reconciliation/OrdersMigrationService.kt b/src/main/kotlin/com/lykke/matching/engine/database/reconciliation/OrdersMigrationService.kt
index 3dba034ea..9f14af98f 100644
--- a/src/main/kotlin/com/lykke/matching/engine/database/reconciliation/OrdersMigrationService.kt
+++ b/src/main/kotlin/com/lykke/matching/engine/database/reconciliation/OrdersMigrationService.kt
@@ -45,7 +45,7 @@ class OrdersMigrationService(private val config: Config,
return
}
if (config.me.storage != Storage.Redis) {
- teeLog("Do not perform migration to files")
+ LOGGER.info("Do not perform migration to files")
return
}
fromFilesToRedis()
@@ -59,12 +59,12 @@ class OrdersMigrationService(private val config: Config,
throw Exception("Stop orders already exist in redis ${config.me.redis.host}.${config.me.redis.port}/${config.me.redis.ordersDatabase}")
}
val startTime = Date().time
- teeLog("Starting orders migration from files to redis; files dirs: ${config.me.orderBookPath}, ${config.me.stopOrderBookPath}" +
+ LOGGER.info("Starting orders migration from files to redis; files dirs: ${config.me.orderBookPath}, ${config.me.stopOrderBookPath}" +
", redis: ${config.me.redis.host}.${config.me.redis.port}/${config.me.redis.ordersDatabase}")
val orders = fileOrderBookDatabaseAccessor.loadLimitOrders()
val stopOrders = fileStopOrderBookDatabaseAccessor.loadStopLimitOrders()
val loadTime = Date().time
- teeLog("Loaded ${orders.size} orders from files (ms: ${loadTime - startTime})")
+ LOGGER.info("Loaded ${orders.size} orders from files (ms: ${loadTime - startTime})")
persistenceManager.persist(PersistenceData(null,
null,
OrderBooksPersistenceData(mapOrdersToOrderBookPersistenceDataList(orders, LOGGER),
@@ -77,11 +77,6 @@ class OrdersMigrationService(private val config: Config,
genericLimitOrderService.update()
genericStopLimitOrderService.update()
val saveTime = Date().time
- teeLog("Saved ${orders.size} orders and ${stopOrders.size} stop orders to redis (ms: ${saveTime - loadTime})")
- }
-
- private fun teeLog(message: String) {
- println(message)
- LOGGER.info(message)
+ LOGGER.info("Saved ${orders.size} orders and ${stopOrders.size} stop orders to redis (ms: ${saveTime - loadTime})")
}
}
\ No newline at end of file
diff --git a/src/main/kotlin/com/lykke/matching/engine/holders/BalancesHolder.kt b/src/main/kotlin/com/lykke/matching/engine/holders/BalancesHolder.kt
index 21659099d..8725482e4 100644
--- a/src/main/kotlin/com/lykke/matching/engine/holders/BalancesHolder.kt
+++ b/src/main/kotlin/com/lykke/matching/engine/holders/BalancesHolder.kt
@@ -1,31 +1,14 @@
package com.lykke.matching.engine.holders
import com.lykke.matching.engine.balance.BalancesGetter
-import com.lykke.matching.engine.balance.WalletOperationsProcessor
import com.lykke.matching.engine.daos.wallet.AssetBalance
import com.lykke.matching.engine.daos.wallet.Wallet
-import com.lykke.matching.engine.database.PersistenceManager
-import com.lykke.matching.engine.database.common.entity.BalancesData
-import com.lykke.matching.engine.database.common.entity.PersistenceData
-import com.lykke.matching.engine.deduplication.ProcessedMessage
-import com.lykke.matching.engine.outgoing.messages.BalanceUpdate
-import com.lykke.matching.engine.order.transaction.CurrentTransactionBalancesHolder
-import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import java.math.BigDecimal
-import java.util.concurrent.BlockingQueue
@Component
-class BalancesHolder(private val balancesDbAccessorsHolder: BalancesDatabaseAccessorsHolder,
- private val persistenceManager: PersistenceManager,
- private val assetsHolder: AssetsHolder,
- private val balanceUpdateQueue: BlockingQueue,
- private val applicationSettingsHolder: ApplicationSettingsHolder): BalancesGetter {
-
- companion object {
- private val LOGGER = LoggerFactory.getLogger(BalancesHolder::class.java.name)
- }
+class BalancesHolder(private val balancesDbAccessorsHolder: BalancesDatabaseAccessorsHolder): BalancesGetter {
lateinit var wallets: MutableMap
var initialClientsCount = 0
@@ -85,65 +68,6 @@ class BalancesHolder(private val balancesDbAccessorsHolder: BalancesDatabaseAcce
return BigDecimal.ZERO
}
- fun updateBalance(processedMessage: ProcessedMessage?,
- messageSequenceNumber: Long?,
- clientId: String,
- assetId: String,
- balance: BigDecimal): Boolean {
- val currentTransactionBalancesHolder = createCurrentTransactionBalancesHolder()
- currentTransactionBalancesHolder.updateBalance(clientId, assetId, balance)
- val balancesData = currentTransactionBalancesHolder.persistenceData()
- val persisted = persistenceManager.persist(PersistenceData(balancesData, processedMessage, null, null, messageSequenceNumber))
- if (!persisted) {
- return false
- }
- currentTransactionBalancesHolder.apply()
- return true
- }
-
- fun updateReservedBalance(processedMessage: ProcessedMessage?,
- messageSequenceNumber: Long?,
- clientId: String,
- assetId: String,
- balance: BigDecimal,
- skipForTrustedClient: Boolean = true): Boolean {
- val currentTransactionBalancesHolder = createCurrentTransactionBalancesHolder()
- currentTransactionBalancesHolder.updateReservedBalance(clientId, assetId, balance)
- val balancesData = currentTransactionBalancesHolder.persistenceData()
- val persisted = persistenceManager.persist(PersistenceData(balancesData, processedMessage, null, null, messageSequenceNumber))
- if (!persisted) {
- return false
- }
- currentTransactionBalancesHolder.apply()
- return true
- }
-
- fun insertOrUpdateWallets(wallets: Collection, messageSequenceNumber: Long?) {
- persistenceManager.persist(PersistenceData(BalancesData(wallets, wallets.flatMap { it.balances.values }), null, null, null,
- messageSequenceNumber = messageSequenceNumber))
- update()
- }
-
- fun sendBalanceUpdate(balanceUpdate: BalanceUpdate) {
- if (balanceUpdate.balances.isNotEmpty()) {
- LOGGER.info(balanceUpdate.toString())
- balanceUpdateQueue.put(balanceUpdate)
- }
- }
-
- fun isTrustedClient(clientId: String) = applicationSettingsHolder.isTrustedClient(clientId)
-
- fun createWalletProcessor(logger: Logger?): WalletOperationsProcessor {
- return WalletOperationsProcessor(this,
- createCurrentTransactionBalancesHolder(),
- applicationSettingsHolder,
- persistenceManager,
- assetsHolder,
- logger)
- }
-
- private fun createCurrentTransactionBalancesHolder() = CurrentTransactionBalancesHolder(this)
-
fun setWallets(wallets: Collection) {
wallets.forEach { wallet ->
this.wallets[wallet.clientId] = wallet
diff --git a/src/main/kotlin/com/lykke/matching/engine/order/transaction/CurrentTransactionBalancesHolderFactory.kt b/src/main/kotlin/com/lykke/matching/engine/order/transaction/CurrentTransactionBalancesHolderFactory.kt
new file mode 100644
index 000000000..861538756
--- /dev/null
+++ b/src/main/kotlin/com/lykke/matching/engine/order/transaction/CurrentTransactionBalancesHolderFactory.kt
@@ -0,0 +1,11 @@
+package com.lykke.matching.engine.order.transaction
+
+import com.lykke.matching.engine.holders.BalancesHolder
+import org.springframework.stereotype.Component
+
+@Component
+class CurrentTransactionBalancesHolderFactory(val balancesHolder: BalancesHolder) {
+ fun create(): CurrentTransactionBalancesHolder {
+ return CurrentTransactionBalancesHolder(balancesHolder)
+ }
+}
\ No newline at end of file
diff --git a/src/main/kotlin/com/lykke/matching/engine/order/transaction/ExecutionContextFactory.kt b/src/main/kotlin/com/lykke/matching/engine/order/transaction/ExecutionContextFactory.kt
index 818ff5284..7085be056 100644
--- a/src/main/kotlin/com/lykke/matching/engine/order/transaction/ExecutionContextFactory.kt
+++ b/src/main/kotlin/com/lykke/matching/engine/order/transaction/ExecutionContextFactory.kt
@@ -1,10 +1,11 @@
package com.lykke.matching.engine.order.transaction
+import com.lykke.matching.engine.balance.WalletOperationsProcessor
+import com.lykke.matching.engine.balance.WalletOperationsProcessorFactory
import com.lykke.matching.engine.daos.Asset
import com.lykke.matching.engine.daos.AssetPair
import com.lykke.matching.engine.deduplication.ProcessedMessage
import com.lykke.matching.engine.holders.AssetsHolder
-import com.lykke.matching.engine.holders.BalancesHolder
import com.lykke.matching.engine.messages.MessageType
import com.lykke.matching.engine.services.GenericLimitOrderService
import com.lykke.matching.engine.services.GenericStopLimitOrderService
@@ -14,7 +15,7 @@ import org.springframework.stereotype.Component
import java.util.Date
@Component
-class ExecutionContextFactory(private val balancesHolder: BalancesHolder,
+class ExecutionContextFactory(private val walletOperationsProcessorFactory: WalletOperationsProcessorFactory,
private val genericLimitOrderService: GenericLimitOrderService,
private val genericStopLimitOrderService: GenericStopLimitOrderService,
private val assetsHolder: AssetsHolder) {
@@ -35,7 +36,7 @@ class ExecutionContextFactory(private val balancesHolder: BalancesHolder,
assetPairsById,
assetsById,
preProcessorValidationResultsByOrderId,
- balancesHolder.createWalletProcessor(logger),
+ walletOperationsProcessorFactory.create(logger),
genericLimitOrderService.createCurrentTransactionOrderBooksHolder(),
genericStopLimitOrderService.createCurrentTransactionOrderBooksHolder(),
date,
diff --git a/src/main/kotlin/com/lykke/matching/engine/outgoing/messages/CashInOutEventData.kt b/src/main/kotlin/com/lykke/matching/engine/outgoing/messages/CashInOutEventData.kt
index eb35bfac1..b2a5e40d9 100644
--- a/src/main/kotlin/com/lykke/matching/engine/outgoing/messages/CashInOutEventData.kt
+++ b/src/main/kotlin/com/lykke/matching/engine/outgoing/messages/CashInOutEventData.kt
@@ -1,6 +1,5 @@
package com.lykke.matching.engine.outgoing.messages
-import com.lykke.matching.engine.balance.WalletOperationsProcessor
import com.lykke.matching.engine.daos.Asset
import com.lykke.matching.engine.daos.OutgoingEventData
import com.lykke.matching.engine.daos.WalletOperation
@@ -12,7 +11,7 @@ class CashInOutEventData(val messageId: String,
val sequenceNumber: Long,
val now: Date,
val timestamp: Date,
- val walletProcessor: WalletOperationsProcessor,
+ val clientBalanceUpdates: List,
val walletOperation: WalletOperation,
val asset: Asset,
val internalFees: List
diff --git a/src/main/kotlin/com/lykke/matching/engine/outgoing/messages/CashTransferEventData.kt b/src/main/kotlin/com/lykke/matching/engine/outgoing/messages/CashTransferEventData.kt
index cf87b28b5..0877fc289 100644
--- a/src/main/kotlin/com/lykke/matching/engine/outgoing/messages/CashTransferEventData.kt
+++ b/src/main/kotlin/com/lykke/matching/engine/outgoing/messages/CashTransferEventData.kt
@@ -1,13 +1,12 @@
package com.lykke.matching.engine.outgoing.messages
-import com.lykke.matching.engine.balance.WalletOperationsProcessor
import com.lykke.matching.engine.daos.OutgoingEventData
import com.lykke.matching.engine.daos.TransferOperation
import com.lykke.matching.engine.daos.fee.v2.Fee
import java.util.*
class CashTransferEventData(val messageId: String,
- val walletProcessor: WalletOperationsProcessor,
+ val clientBalanceUpdates: List,
val fees: List,
val transferOperation: TransferOperation,
val sequenceNumber: Long,
diff --git a/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/OldFormatBalancesSender.kt b/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/OldFormatBalancesSender.kt
new file mode 100644
index 000000000..885f89352
--- /dev/null
+++ b/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/OldFormatBalancesSender.kt
@@ -0,0 +1,12 @@
+package com.lykke.matching.engine.outgoing.senders.impl
+
+import com.lykke.matching.engine.messages.MessageType
+import com.lykke.matching.engine.outgoing.messages.ClientBalanceUpdate
+
+@Deprecated("Old outgoing messages format is deprecated")
+interface OldFormatBalancesSender {
+ fun sendBalanceUpdate(id: String,
+ type: MessageType,
+ messageId: String,
+ clientBalanceUpdates: List)
+}
\ No newline at end of file
diff --git a/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashInOutEventSender.kt b/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashInOutEventSender.kt
index 4c8be500d..822f361f0 100644
--- a/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashInOutEventSender.kt
+++ b/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashInOutEventSender.kt
@@ -20,7 +20,7 @@ class CashInOutEventSender(private val messageSender: MessageSender) : Specializ
requestId = event.externalId,
date = event.now,
messageType = MessageType.CASH_IN_OUT_OPERATION,
- clientBalanceUpdates = event.walletProcessor.getClientBalanceUpdates(),
+ clientBalanceUpdates = event.clientBalanceUpdates,
cashInOperation = event.walletOperation,
internalFees = event.internalFees)
diff --git a/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashInOutOldEventSender.kt b/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashInOutOldEventSender.kt
index 0a67ac3fc..d17bbba27 100644
--- a/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashInOutOldEventSender.kt
+++ b/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashInOutOldEventSender.kt
@@ -1,26 +1,37 @@
package com.lykke.matching.engine.outgoing.senders.impl.specialized
import com.lykke.matching.engine.messages.MessageType
-import com.lykke.matching.engine.outgoing.messages.CashInOutEventData
+import com.lykke.matching.engine.outgoing.messages.BalanceUpdate
import com.lykke.matching.engine.outgoing.messages.CashOperation
+import com.lykke.matching.engine.outgoing.messages.CashInOutEventData
+import com.lykke.matching.engine.outgoing.messages.ClientBalanceUpdate
import com.lykke.matching.engine.outgoing.senders.SpecializedEventSender
+import com.lykke.matching.engine.outgoing.senders.impl.OldFormatBalancesSender
import com.lykke.matching.engine.utils.NumberUtils
+import org.apache.juli.logging.LogFactory
import org.springframework.stereotype.Component
+import java.util.*
import java.util.concurrent.BlockingQueue
@Deprecated("Old format of outgoing message is deprecated")
@Component
-class CashInOutOldEventSender(private val rabbitCashInOutQueue: BlockingQueue) : SpecializedEventSender {
+class CashInOutOldEventSender(private val rabbitCashInOutQueue: BlockingQueue,
+ private val balanceUpdateQueue: BlockingQueue) : SpecializedEventSender,
+ OldFormatBalancesSender {
+
+ private companion object {
+ val LOGGER = LogFactory.getLog(CashInOutOldEventSender::class.java)
+ }
+
override fun getEventClass(): Class {
return CashInOutEventData::class.java
}
override fun sendEvent(event: CashInOutEventData) {
- event
- .walletProcessor
- .sendNotification(id = event.externalId,
- type = MessageType.CASH_IN_OUT_OPERATION.name,
- messageId = event.messageId)
+ sendBalanceUpdate(event.externalId,
+ MessageType.CASH_IN_OUT_OPERATION,
+ event.messageId,
+ event.clientBalanceUpdates)
rabbitCashInOutQueue.put(CashOperation(
id = event.externalId,
@@ -32,4 +43,15 @@ class CashInOutOldEventSender(private val rabbitCashInOutQueue: BlockingQueue) {
+ if (clientBalanceUpdates.isNotEmpty()) {
+ val balanceUpdate = BalanceUpdate(id, type.name, Date(), clientBalanceUpdates, messageId)
+ LOGGER.info(balanceUpdate.toString())
+ balanceUpdateQueue.put(balanceUpdate)
+ }
+ }
}
\ No newline at end of file
diff --git a/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashTransferEventSender.kt b/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashTransferEventSender.kt
index 89b5eefdc..b46424596 100644
--- a/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashTransferEventSender.kt
+++ b/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashTransferEventSender.kt
@@ -19,7 +19,7 @@ class CashTransferEventSender(val messageSender: MessageSender) : SpecializedEve
event.transferOperation.externalId,
event.now,
MessageType.CASH_TRANSFER_OPERATION,
- event.walletProcessor.getClientBalanceUpdates(),
+ event.clientBalanceUpdates,
event.transferOperation,
event.fees)
diff --git a/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashTransferOldEventSender.kt b/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashTransferOldEventSender.kt
index 0c02610a8..a3bc28ce9 100644
--- a/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashTransferOldEventSender.kt
+++ b/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/CashTransferOldEventSender.kt
@@ -5,7 +5,9 @@ import com.lykke.matching.engine.fee.singleFeeTransfer
import com.lykke.matching.engine.messages.MessageType
import com.lykke.matching.engine.outgoing.messages.CashTransferEventData
import com.lykke.matching.engine.outgoing.messages.CashTransferOperation
+import com.lykke.matching.engine.outgoing.messages.ClientBalanceUpdate
import com.lykke.matching.engine.outgoing.senders.SpecializedEventSender
+import com.lykke.matching.engine.outgoing.senders.impl.OldFormatBalancesSender
import com.lykke.matching.engine.utils.NumberUtils
import org.springframework.stereotype.Component
import org.springframework.util.CollectionUtils
@@ -13,7 +15,8 @@ import java.util.concurrent.BlockingQueue
@Component
@Deprecated("Old format of outgoing message is deprecated")
-class CashTransferOldEventSender(private val notificationQueue: BlockingQueue) : SpecializedEventSender {
+class CashTransferOldEventSender(private val notificationQueue: BlockingQueue,
+ private val oldFormatBalancesSender: OldFormatBalancesSender) : SpecializedEventSender {
override fun getEventClass(): Class {
return CashTransferEventData::class.java
}
@@ -21,7 +24,9 @@ class CashTransferOldEventSender(private val notificationQueue: BlockingQueue) {
+ oldFormatBalancesSender.sendBalanceUpdate(externalId,
+ MessageType.CASH_TRANSFER_OPERATION,
+ messageId,
+ clientBalanceUpdates)
}
}
\ No newline at end of file
diff --git a/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/OldFormatExecutionEventSender.kt b/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/OldFormatExecutionEventSender.kt
index 19f6f02f6..f2f24d859 100644
--- a/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/OldFormatExecutionEventSender.kt
+++ b/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/OldFormatExecutionEventSender.kt
@@ -5,6 +5,7 @@ import com.lykke.matching.engine.order.transaction.ExecutionContext
import com.lykke.matching.engine.outgoing.messages.LimitOrdersReport
import com.lykke.matching.engine.outgoing.messages.MarketOrderWithTrades
import com.lykke.matching.engine.outgoing.senders.SpecializedEventSender
+import com.lykke.matching.engine.outgoing.senders.impl.OldFormatBalancesSender
import com.lykke.matching.engine.utils.event.isThereClientEvent
import com.lykke.matching.engine.utils.event.isThereTrustedClientEvent
import org.springframework.stereotype.Component
@@ -14,7 +15,8 @@ import java.util.concurrent.BlockingQueue
@Component
class OldFormatExecutionEventSender(private val clientLimitOrdersQueue: BlockingQueue,
private val trustedClientsLimitOrdersQueue: BlockingQueue,
- private val rabbitSwapQueue: BlockingQueue) : SpecializedEventSender {
+ private val rabbitSwapQueue: BlockingQueue,
+ private val oldFormatBalancesSender: OldFormatBalancesSender) : SpecializedEventSender {
override fun getEventClass(): Class {
return ExecutionData::class.java
@@ -28,11 +30,10 @@ class OldFormatExecutionEventSender(private val clientLimitOrdersQueue: Blocking
}
private fun sendBalanceUpdateEvent(executionContext: ExecutionContext) {
- executionContext
- .walletOperationsProcessor
- .sendNotification(id = executionContext.requestId,
- type = executionContext.messageType.name,
- messageId = executionContext.messageId)
+ oldFormatBalancesSender.sendBalanceUpdate(executionContext.requestId,
+ executionContext.messageType,
+ executionContext.messageId,
+ executionContext.walletOperationsProcessor.getClientBalanceUpdates())
}
private fun sendTrustedClientsExecutionEventIfNeeded(executionContext: ExecutionContext) {
diff --git a/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/ReservedCashInOutOldEventSender.kt b/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/ReservedCashInOutOldEventSender.kt
index 353243cf9..35ed0eb33 100644
--- a/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/ReservedCashInOutOldEventSender.kt
+++ b/src/main/kotlin/com/lykke/matching/engine/outgoing/senders/impl/specialized/ReservedCashInOutOldEventSender.kt
@@ -4,20 +4,25 @@ import com.lykke.matching.engine.messages.MessageType
import com.lykke.matching.engine.outgoing.messages.ReservedCashInOutEventData
import com.lykke.matching.engine.outgoing.messages.ReservedCashOperation
import com.lykke.matching.engine.outgoing.senders.SpecializedEventSender
+import com.lykke.matching.engine.outgoing.senders.impl.OldFormatBalancesSender
import com.lykke.matching.engine.utils.NumberUtils
import org.springframework.stereotype.Component
import java.util.concurrent.BlockingQueue
@Deprecated("Old format of outgoing message is deprecated")
@Component
-class ReservedCashInOutOldEventSender(private val reservedCashOperationQueue: BlockingQueue) : SpecializedEventSender {
+class ReservedCashInOutOldEventSender(private val reservedCashOperationQueue: BlockingQueue,
+ private val oldFormatBalancesSender: OldFormatBalancesSender) : SpecializedEventSender {
override fun getEventClass(): Class {
return ReservedCashInOutEventData::class.java
}
override fun sendEvent(event: ReservedCashInOutEventData) {
- event.walletOperationsProcessor.sendNotification(event.requestId, MessageType.RESERVED_CASH_IN_OUT_OPERATION.name, event.messageId)
+ oldFormatBalancesSender.sendBalanceUpdate(id = event.requestId,
+ messageId = event.messageId,
+ clientBalanceUpdates = event.walletOperationsProcessor.getClientBalanceUpdates(),
+ type = MessageType.RESERVED_CASH_IN_OUT_OPERATION)
reservedCashOperationQueue.put(ReservedCashOperation(event.requestId,
event.walletOperation.clientId,
diff --git a/src/main/kotlin/com/lykke/matching/engine/services/BalancesService.kt b/src/main/kotlin/com/lykke/matching/engine/services/BalancesService.kt
new file mode 100644
index 000000000..4c4fb740c
--- /dev/null
+++ b/src/main/kotlin/com/lykke/matching/engine/services/BalancesService.kt
@@ -0,0 +1,8 @@
+package com.lykke.matching.engine.services
+
+import com.lykke.matching.engine.daos.wallet.Wallet
+import com.lykke.matching.engine.outgoing.messages.BalanceUpdate
+
+interface BalancesService {
+ fun insertOrUpdateWallets(wallets: Collection, messageSequenceNumber: Long?): Boolean
+}
\ No newline at end of file
diff --git a/src/main/kotlin/com/lykke/matching/engine/services/BalancesServiceImpl.kt b/src/main/kotlin/com/lykke/matching/engine/services/BalancesServiceImpl.kt
new file mode 100644
index 000000000..d9f9c44a7
--- /dev/null
+++ b/src/main/kotlin/com/lykke/matching/engine/services/BalancesServiceImpl.kt
@@ -0,0 +1,41 @@
+package com.lykke.matching.engine.services
+
+import com.lykke.matching.engine.daos.wallet.Wallet
+import com.lykke.matching.engine.database.PersistenceManager
+import com.lykke.matching.engine.database.common.entity.BalancesData
+import com.lykke.matching.engine.database.common.entity.PersistenceData
+import com.lykke.matching.engine.holders.BalancesHolder
+import com.lykke.utils.logging.MetricsLogger
+import org.slf4j.LoggerFactory
+import org.springframework.stereotype.Service
+
+@Service
+class BalancesServiceImpl(private val balancesHolder: BalancesHolder,
+ private val persistenceManager: PersistenceManager): BalancesService {
+ private companion object {
+ val LOGGER = LoggerFactory.getLogger(BalancesServiceImpl::class.java.name)
+ val METRICS_LOGGER = MetricsLogger.getLogger()
+ }
+
+ /**
+ * Persists wallets to the db and updates values in cache,
+ * Note: this method will not send balance updates notifications
+ */
+ override fun insertOrUpdateWallets(wallets: Collection, messageSequenceNumber: Long?): Boolean {
+ val updated = persistenceManager.persist(PersistenceData(BalancesData(wallets,
+ wallets.flatMap { it.balances.values }),
+ null,
+ null,
+ null,
+ messageSequenceNumber = messageSequenceNumber))
+ if (!updated) {
+ val message = "Can not persist balances data, wallets: ${wallets.size}"
+ LOGGER.error(message)
+ METRICS_LOGGER.logError(message)
+ return false
+ }
+
+ balancesHolder.setWallets(wallets)
+ return true
+ }
+}
\ No newline at end of file
diff --git a/src/main/kotlin/com/lykke/matching/engine/services/CashInOutOperationService.kt b/src/main/kotlin/com/lykke/matching/engine/services/CashInOutOperationService.kt
index 20dc5f7e8..bace34983 100644
--- a/src/main/kotlin/com/lykke/matching/engine/services/CashInOutOperationService.kt
+++ b/src/main/kotlin/com/lykke/matching/engine/services/CashInOutOperationService.kt
@@ -1,8 +1,12 @@
package com.lykke.matching.engine.services
import com.lykke.matching.engine.balance.BalanceException
+import com.lykke.matching.engine.balance.WalletOperationsProcessorFactory
import com.lykke.matching.engine.daos.context.CashInOutContext
import com.lykke.matching.engine.daos.converters.CashInOutOperationConverter
+import com.lykke.matching.engine.daos.fee.v2.Fee
+import com.lykke.matching.engine.database.PersistenceManager
+import com.lykke.matching.engine.database.common.entity.PersistenceData
import com.lykke.matching.engine.fee.FeeException
import com.lykke.matching.engine.fee.FeeProcessor
import com.lykke.matching.engine.holders.BalancesHolder
@@ -27,9 +31,11 @@ import java.util.*
@Service
class CashInOutOperationService(private val balancesHolder: BalancesHolder,
private val feeProcessor: FeeProcessor,
+ private val walletOperationsProcessorFactory: WalletOperationsProcessorFactory,
private val cashInOutOperationBusinessValidator: CashInOutOperationBusinessValidator,
private val messageSequenceNumberHolder: MessageSequenceNumberHolder,
- private val outgoingEventProcessor: OutgoingEventProcessor) : AbstractService {
+ private val outgoingEventProcessor: OutgoingEventProcessor,
+ private val persistenceManager: PersistenceManager) : AbstractService {
override fun parseMessage(messageWrapper: MessageWrapper) {
//do nothing
}
@@ -67,7 +73,7 @@ class CashInOutOperationService(private val balancesHolder: BalancesHolder,
return
}
- val walletProcessor = balancesHolder.createWalletProcessor(LOGGER)
+ val walletProcessor = walletOperationsProcessorFactory.create(LOGGER)
try {
walletProcessor.preProcess(operations)
} catch (e: BalanceException) {
@@ -76,7 +82,11 @@ class CashInOutOperationService(private val balancesHolder: BalancesHolder,
}
val sequenceNumber = messageSequenceNumberHolder.getNewValue()
- val updated = walletProcessor.persistBalances(cashInOutContext.processedMessage, null, null, sequenceNumber)
+ val updated = persistenceManager.persist(PersistenceData(walletProcessor.persistenceData(),
+ cashInOutContext.processedMessage,
+ null,
+ null,
+ sequenceNumber))
messageWrapper.triedToPersist = true
messageWrapper.persisted = updated
if (!updated) {
@@ -91,7 +101,7 @@ class CashInOutOperationService(private val balancesHolder: BalancesHolder,
sequenceNumber,
now,
cashInOutOperation.dateTime,
- walletProcessor,
+ walletProcessor.getClientBalanceUpdates(),
walletOperation,
asset,
fees))
diff --git a/src/main/kotlin/com/lykke/matching/engine/services/CashTransferOperationService.kt b/src/main/kotlin/com/lykke/matching/engine/services/CashTransferOperationService.kt
index c21414145..05a5afeea 100644
--- a/src/main/kotlin/com/lykke/matching/engine/services/CashTransferOperationService.kt
+++ b/src/main/kotlin/com/lykke/matching/engine/services/CashTransferOperationService.kt
@@ -1,9 +1,12 @@
package com.lykke.matching.engine.services
import com.lykke.matching.engine.balance.BalanceException
+import com.lykke.matching.engine.balance.WalletOperationsProcessorFactory
import com.lykke.matching.engine.daos.TransferOperation
import com.lykke.matching.engine.daos.WalletOperation
import com.lykke.matching.engine.daos.context.CashTransferContext
+import com.lykke.matching.engine.database.PersistenceManager
+import com.lykke.matching.engine.database.common.entity.PersistenceData
import com.lykke.matching.engine.exception.PersistenceException
import com.lykke.matching.engine.fee.FeeException
import com.lykke.matching.engine.fee.FeeProcessor
@@ -31,11 +34,13 @@ import java.util.concurrent.BlockingQueue
@Service
class CashTransferOperationService(private val balancesHolder: BalancesHolder,
+ private val walletOperationsProcessorFactory: WalletOperationsProcessorFactory,
private val dbTransferOperationQueue: BlockingQueue,
private val feeProcessor: FeeProcessor,
private val cashTransferOperationBusinessValidator: CashTransferOperationBusinessValidator,
private val messageSequenceNumberHolder: MessageSequenceNumberHolder,
- private val outgoingEventProcessor: OutgoingEventProcessor) : AbstractService {
+ private val outgoingEventProcessor: OutgoingEventProcessor,
+ private val persistenceManager: PersistenceManager) : AbstractService {
override fun parseMessage(messageWrapper: MessageWrapper) {
//do nothing
}
@@ -95,11 +100,15 @@ class CashTransferOperationService(private val balancesHolder: BalancesHolder,
val fees = feeProcessor.processFee(operation.fees, receiptOperation, operations, balancesGetter = balancesHolder)
- val walletProcessor = balancesHolder.createWalletProcessor(LOGGER)
- .preProcess(operations, true)
+ val walletProcessor = walletOperationsProcessorFactory.create(LOGGER)
+ walletProcessor.preProcess(operations, true)
val sequenceNumber = messageSequenceNumberHolder.getNewValue()
- val updated = walletProcessor.persistBalances(cashTransferContext.processedMessage, null, null, sequenceNumber)
+ val updated = persistenceManager.persist(PersistenceData(walletProcessor.persistenceData(),
+ cashTransferContext.processedMessage,
+ null,
+ null,
+ sequenceNumber))
messageWrapper.triedToPersist = true
messageWrapper.persisted = updated
if (!updated) {
@@ -107,7 +116,7 @@ class CashTransferOperationService(private val balancesHolder: BalancesHolder,
}
walletProcessor.apply()
outgoingEventProcessor.submitCashTransferEvent(CashTransferEventData(cashTransferContext.messageId,
- walletProcessor,
+ walletProcessor.getClientBalanceUpdates(),
fees,
operation,
sequenceNumber,
diff --git a/src/main/kotlin/com/lykke/matching/engine/services/ReservedCashInOutOperationService.kt b/src/main/kotlin/com/lykke/matching/engine/services/ReservedCashInOutOperationService.kt
index 264f7c859..6accb57b2 100644
--- a/src/main/kotlin/com/lykke/matching/engine/services/ReservedCashInOutOperationService.kt
+++ b/src/main/kotlin/com/lykke/matching/engine/services/ReservedCashInOutOperationService.kt
@@ -2,8 +2,10 @@ package com.lykke.matching.engine.services
import com.lykke.matching.engine.balance.BalanceException
import com.lykke.matching.engine.daos.WalletOperation
+import com.lykke.matching.engine.balance.WalletOperationsProcessorFactory
+import com.lykke.matching.engine.database.PersistenceManager
+import com.lykke.matching.engine.database.common.entity.PersistenceData
import com.lykke.matching.engine.holders.AssetsHolder
-import com.lykke.matching.engine.holders.BalancesHolder
import com.lykke.matching.engine.holders.MessageProcessingStatusHolder
import com.lykke.matching.engine.holders.MessageSequenceNumberHolder
import com.lykke.matching.engine.holders.UUIDHolder
@@ -24,13 +26,14 @@ import java.math.BigDecimal
import java.util.Date
@Service
-class ReservedCashInOutOperationService @Autowired constructor(private val assetsHolder: AssetsHolder,
- private val balancesHolder: BalancesHolder,
- private val reservedCashInOutOperationValidator: ReservedCashInOutOperationValidator,
- private val messageProcessingStatusHolder: MessageProcessingStatusHolder,
- private val uuidHolder: UUIDHolder,
- private val messageSequenceNumberHolder: MessageSequenceNumberHolder,
- private val outgoingEventProcessor: OutgoingEventProcessor) : AbstractService {
+class ReservedCashInOutOperationService @Autowired constructor (private val assetsHolder: AssetsHolder,
+ private val walletOperationsProcessorFactory: WalletOperationsProcessorFactory,
+ private val reservedCashInOutOperationValidator: ReservedCashInOutOperationValidator,
+ private val messageProcessingStatusHolder: MessageProcessingStatusHolder,
+ private val persistenceManager: PersistenceManager,
+ private val messageSequenceNumberHolder: MessageSequenceNumberHolder,
+ private val uuidHolder: UUIDHolder,
+ private val outgoingEventProcessor: OutgoingEventProcessor) : AbstractService {
companion object {
private val LOGGER = LoggerFactory.getLogger(ReservedCashInOutOperationService::class.java.name)
@@ -63,7 +66,9 @@ class ReservedCashInOutOperationService @Autowired constructor(private val asset
return
}
- val walletProcessor = balancesHolder.createWalletProcessor(LOGGER)
+ val accuracy = asset.accuracy
+
+ val walletProcessor = walletOperationsProcessorFactory.create(LOGGER)
try {
walletProcessor.preProcess(listOf(operation), allowTrustedClientReservedBalanceOperation = true)
} catch (e: BalanceException) {
@@ -73,7 +78,11 @@ class ReservedCashInOutOperationService @Autowired constructor(private val asset
}
val sequenceNumber = messageSequenceNumberHolder.getNewValue()
- val updated = walletProcessor.persistBalances(messageWrapper.processedMessage, null, null, sequenceNumber)
+ val updated = persistenceManager.persist(PersistenceData(walletProcessor.persistenceData(),
+ messageWrapper.processedMessage,
+ null,
+ null,
+ sequenceNumber))
messageWrapper.triedToPersist = true
messageWrapper.persisted = updated
if (!updated) {
diff --git a/src/main/kotlin/com/lykke/matching/engine/utils/balance/ReservedVolumesRecalculator.kt b/src/main/kotlin/com/lykke/matching/engine/utils/balance/ReservedVolumesRecalculator.kt
index fe773ab90..6ed351d50 100644
--- a/src/main/kotlin/com/lykke/matching/engine/utils/balance/ReservedVolumesRecalculator.kt
+++ b/src/main/kotlin/com/lykke/matching/engine/utils/balance/ReservedVolumesRecalculator.kt
@@ -14,9 +14,10 @@ import com.lykke.matching.engine.holders.MessageSequenceNumberHolder
import com.lykke.matching.engine.holders.OrdersDatabaseAccessorsHolder
import com.lykke.matching.engine.holders.StopOrdersDatabaseAccessorsHolder
import com.lykke.matching.engine.messages.MessageType
-import com.lykke.matching.engine.outgoing.messages.BalanceUpdate
import com.lykke.matching.engine.outgoing.messages.ClientBalanceUpdate
import com.lykke.matching.engine.outgoing.messages.v2.builders.EventFactory
+import com.lykke.matching.engine.outgoing.senders.impl.OldFormatBalancesSender
+import com.lykke.matching.engine.services.BalancesService
import com.lykke.matching.engine.outgoing.messages.v2.events.ReservedBalanceUpdateEvent
import com.lykke.matching.engine.services.MessageSender
import com.lykke.matching.engine.utils.NumberUtils
@@ -44,18 +45,15 @@ class ReservedVolumesRecalculator @Autowired constructor(private val orderBookDa
private val applicationSettingsHolder: ApplicationSettingsHolder,
@Value("#{Config.me.correctReservedVolumes}") private val correctReservedVolumes: Boolean,
private val messageSequenceNumberHolder: MessageSequenceNumberHolder,
- private val messageSender: MessageSender) : ApplicationRunner {
+ private val messageSender: MessageSender,
+ private val balancesService: BalancesService,
+ private val oldFormatBalancesSender: OldFormatBalancesSender) : ApplicationRunner {
override fun run(args: ApplicationArguments?) {
correctReservedVolumesIfNeed()
}
companion object {
private val LOGGER = LoggerFactory.getLogger(ReservedVolumesRecalculator::class.java.name)
-
- fun teeLog(message: String) {
- println(message)
- LOGGER.info(message)
- }
}
fun correctReservedVolumesIfNeed() {
@@ -63,7 +61,7 @@ class ReservedVolumesRecalculator @Autowired constructor(private val orderBookDa
return
}
- teeLog("Starting order books analyze")
+ LOGGER.info("Starting order books analyze")
recalculate()
}
@@ -104,7 +102,7 @@ class ReservedVolumesRecalculator @Autowired constructor(private val orderBookDa
balance.orderIds.add(order.externalId)
} catch (e: Exception) {
val errorMessage = "Unable to handle order (id: ${order.externalId}): ${e.message}"
- teeLog(errorMessage)
+ LOGGER.info(errorMessage)
}
}
}
@@ -133,7 +131,7 @@ class ReservedVolumesRecalculator @Autowired constructor(private val orderBookDa
if (!NumberUtils.equalsIgnoreScale(oldBalance, newBalance.volume)) {
val correction = ReservedVolumeCorrection(id, assetBalance.asset, newBalance.orderIds.joinToString(","), oldBalance, newBalance.volume)
corrections.add(correction)
- teeLog("1 $id, ${assetBalance.asset} : Old $oldBalance New $newBalance")
+ LOGGER.info("1 $id, ${assetBalance.asset} : Old $oldBalance New $newBalance")
wallet.setReservedBalance(assetBalance.asset, newBalance.volume)
updatedWallets.add(wallet)
val balanceUpdate = ClientBalanceUpdate(id,
@@ -148,7 +146,7 @@ class ReservedVolumesRecalculator @Autowired constructor(private val orderBookDa
val orderIds = newBalance?.orderIds?.joinToString(",")
val correction = ReservedVolumeCorrection(id, assetBalance.asset, orderIds, oldBalance, newBalance?.volume ?: BigDecimal.ZERO)
corrections.add(correction)
- teeLog("2 $id, ${assetBalance.asset} : Old $oldBalance New ${newBalance ?: 0.0}")
+ LOGGER.info("2 $id, ${assetBalance.asset} : Old $oldBalance New ${newBalance ?: 0.0}")
wallet.setReservedBalance(assetBalance.asset, BigDecimal.ZERO)
updatedWallets.add(wallet)
val balanceUpdate = ClientBalanceUpdate(id,
@@ -183,14 +181,21 @@ class ReservedVolumesRecalculator @Autowired constructor(private val orderBookDa
listOf(clientBalanceUpdate),
walletOperation))
}
+ val balancesPersisted = balancesService.insertOrUpdateWallets(updatedWallets, sequenceNumber)
+
+ if (!balancesPersisted) {
+ LOGGER.error("Can not persist balances during reserved balance recalculation, updated wallets size: ${updatedWallets.size}")
+ return
+ }
- balancesHolder.insertOrUpdateWallets(updatedWallets, sequenceNumber)
reservedVolumesDatabaseAccessor.addCorrectionsInfo(corrections)
- balancesHolder.sendBalanceUpdate(BalanceUpdate(operationId, MessageType.LIMIT_ORDER.name, now, balanceUpdates, operationId))
+ oldFormatBalancesSender.sendBalanceUpdate(id = operationId,
+ messageId = operationId,
+ clientBalanceUpdates = balanceUpdates,
+ type = MessageType.LIMIT_ORDER)
reservedBalanceUpdateEvents.forEach { messageSender.sendMessage(it) }
-
}
- teeLog("Reserved volume recalculation finished")
+ LOGGER.info("Reserved volume recalculation finished")
}
}
\ No newline at end of file
diff --git a/src/test/kotlin/com/lykke/matching/engine/balance/WalletOperationsProcessorTest.kt b/src/test/kotlin/com/lykke/matching/engine/balance/WalletOperationsProcessorTest.kt
index e64bb2a30..27f304d49 100644
--- a/src/test/kotlin/com/lykke/matching/engine/balance/WalletOperationsProcessorTest.kt
+++ b/src/test/kotlin/com/lykke/matching/engine/balance/WalletOperationsProcessorTest.kt
@@ -7,8 +7,10 @@ import com.lykke.matching.engine.daos.WalletOperation
import com.lykke.matching.engine.daos.setting.AvailableSettingGroup
import com.lykke.matching.engine.database.BackOfficeDatabaseAccessor
import com.lykke.matching.engine.database.TestBackOfficeDatabaseAccessor
-import com.lykke.matching.engine.database.TestSettingsDatabaseAccessor
+import com.lykke.matching.engine.database.common.entity.PersistenceData
+import com.lykke.matching.engine.messages.MessageType
import com.lykke.matching.engine.outgoing.messages.BalanceUpdate
+import com.lykke.matching.engine.outgoing.senders.impl.OldFormatBalancesSender
import org.junit.Test
import org.junit.runner.RunWith
import org.springframework.boot.test.context.SpringBootTest
@@ -45,7 +47,10 @@ class WalletOperationsProcessorTest : AbstractTest() {
}
@Autowired
- private lateinit var settingsDatabaseAccessor: TestSettingsDatabaseAccessor
+ private lateinit var walletOperationsProcessorFactory: WalletOperationsProcessorFactory
+
+ @Autowired
+ private lateinit var oldFormatBalancesSender: OldFormatBalancesSender
@Test
fun testPreProcessWalletOperations() {
@@ -53,7 +58,7 @@ class WalletOperationsProcessorTest : AbstractTest() {
testBalanceHolderWrapper.updateReservedBalance("Client1", "BTC", 0.1)
initServices()
- val walletOperationsProcessor = balancesHolder.createWalletProcessor(null)
+ val walletOperationsProcessor = walletOperationsProcessorFactory.create(null)
walletOperationsProcessor.preProcess(
listOf(
@@ -75,8 +80,11 @@ class WalletOperationsProcessorTest : AbstractTest() {
)
)
}
- assertTrue(walletOperationsProcessor.persistBalances(null, null, null, null))
- walletOperationsProcessor.apply().sendNotification("id", "type", "test")
+ assertTrue(persistenceManager.persist(PersistenceData( walletOperationsProcessor.persistenceData(), null, null, null, null)))
+ walletOperationsProcessor.apply()
+
+ oldFormatBalancesSender.sendBalanceUpdate("id", MessageType.CASH_IN_OUT_OPERATION, "test",
+ walletOperationsProcessor.getClientBalanceUpdates())
assertBalance("Client1", "BTC", 0.5, 0.0)
assertBalance("Client2", "ETH", 3.0, 0.3)
@@ -86,7 +94,7 @@ class WalletOperationsProcessorTest : AbstractTest() {
val balanceUpdate = balanceUpdateHandlerTest.balanceUpdateQueue.poll() as BalanceUpdate
assertEquals(2, balanceUpdate.balances.size)
assertEquals("id", balanceUpdate.id)
- assertEquals("type", balanceUpdate.type)
+ assertEquals(MessageType.CASH_IN_OUT_OPERATION.name, balanceUpdate.type)
val clientBalanceUpdate1 = balanceUpdate.balances.first { it.id == "Client1" }
assertNotNull(clientBalanceUpdate1)
@@ -109,15 +117,15 @@ class WalletOperationsProcessorTest : AbstractTest() {
fun testForceProcessInvalidWalletOperations() {
initServices()
- val walletOperationsProcessor = balancesHolder.createWalletProcessor(null)
+ val walletOperationsProcessor = walletOperationsProcessorFactory.create(null)
walletOperationsProcessor.preProcess(
listOf(
WalletOperation("Client1", "BTC", BigDecimal.ZERO, BigDecimal.valueOf(-0.1))
), true)
- assertTrue(walletOperationsProcessor.persistBalances(null, null, null, null))
- walletOperationsProcessor.apply().sendNotification("id", "type", "test")
+ assertTrue(persistenceManager.persist(PersistenceData(walletOperationsProcessor.persistenceData(), null, null, null, null)))
+ walletOperationsProcessor.apply()
assertBalance("Client1", "BTC", 0.0, -0.1)
}
@@ -151,7 +159,7 @@ class WalletOperationsProcessorTest : AbstractTest() {
testBalanceHolderWrapper.updateBalance("TrustedClient1", "BTC", 1.0)
testBalanceHolderWrapper.updateBalance("TrustedClient2", "EUR", 1.0)
- val walletOperationsProcessor = balancesHolder.createWalletProcessor(null)
+ val walletOperationsProcessor = walletOperationsProcessorFactory.create(null)
walletOperationsProcessor.preProcess(listOf(
WalletOperation("TrustedClient1", "BTC", BigDecimal.ZERO, BigDecimal.valueOf(0.1)),
@@ -175,7 +183,7 @@ class WalletOperationsProcessorTest : AbstractTest() {
@Test
fun testNotChangedBalance() {
- val walletOperationsProcessor = balancesHolder.createWalletProcessor(null)
+ val walletOperationsProcessor = walletOperationsProcessorFactory.create(null)
walletOperationsProcessor.preProcess(listOf(
WalletOperation("Client1", "BTC", BigDecimal.valueOf(0.1), BigDecimal.valueOf(0.1)),
diff --git a/src/test/kotlin/com/lykke/matching/engine/balance/util/TestBalanceHolderWrapper.kt b/src/test/kotlin/com/lykke/matching/engine/balance/util/TestBalanceHolderWrapper.kt
index 5891e8b93..fd28cfef9 100644
--- a/src/test/kotlin/com/lykke/matching/engine/balance/util/TestBalanceHolderWrapper.kt
+++ b/src/test/kotlin/com/lykke/matching/engine/balance/util/TestBalanceHolderWrapper.kt
@@ -1,20 +1,26 @@
package com.lykke.matching.engine.balance.util
+import com.lykke.matching.engine.daos.wallet.Wallet
import com.lykke.matching.engine.holders.BalancesHolder
-import com.lykke.matching.engine.notification.BalanceUpdateHandlerTest
+import com.lykke.matching.engine.services.BalancesService
import org.springframework.beans.factory.annotation.Autowired
import java.math.BigDecimal
-class TestBalanceHolderWrapper @Autowired constructor (private val balanceUpdateHandlerTest: BalanceUpdateHandlerTest,
- private val balancesHolder: BalancesHolder) {
+class TestBalanceHolderWrapper @Autowired constructor(private val balancesService: BalancesService,
+ private val balancesHolder: BalancesHolder) {
fun updateBalance(clientId: String, assetId: String, balance: Double) {
- balancesHolder.updateBalance(null, null, clientId, assetId, BigDecimal.valueOf(balance))
- balanceUpdateHandlerTest.clear()
+ val wallet = balancesHolder.wallets[clientId] ?: Wallet(clientId)
+ wallet.setBalance(assetId, BigDecimal.valueOf(balance) )
+
+ balancesService.insertOrUpdateWallets(listOf(wallet), null)
}
- fun updateReservedBalance(clientId: String, assetId: String, reservedBalance: Double, skip: Boolean = false) {
- balancesHolder.updateReservedBalance(null, null, clientId, assetId, BigDecimal.valueOf(reservedBalance), skip)
- balanceUpdateHandlerTest.clear()
+ fun updateReservedBalance(clientId: String, assetId: String, reservedBalance: Double) {
+ val wallet = balancesHolder.wallets[clientId] ?: Wallet(clientId)
+ wallet.setReservedBalance(assetId, BigDecimal.valueOf(reservedBalance) )
+
+ balancesService.insertOrUpdateWallets(listOf(wallet), null)
+
}
}
\ No newline at end of file
diff --git a/src/test/kotlin/com/lykke/matching/engine/config/TestApplicationContext.kt b/src/test/kotlin/com/lykke/matching/engine/config/TestApplicationContext.kt
index 340d14ad0..7ddaf5f9b 100644
--- a/src/test/kotlin/com/lykke/matching/engine/config/TestApplicationContext.kt
+++ b/src/test/kotlin/com/lykke/matching/engine/config/TestApplicationContext.kt
@@ -1,5 +1,6 @@
package com.lykke.matching.engine.config
+import com.lykke.matching.engine.balance.WalletOperationsProcessorFactory
import com.lykke.matching.engine.balance.util.TestBalanceHolderWrapper
import com.lykke.matching.engine.config.spring.JsonConfig
import com.lykke.matching.engine.config.spring.QueueConfig
@@ -32,6 +33,7 @@ import com.lykke.matching.engine.order.process.PreviousLimitOrdersProcessor
import com.lykke.matching.engine.order.process.StopOrderBookProcessor
import com.lykke.matching.engine.order.process.common.LimitOrdersCancelExecutor
import com.lykke.matching.engine.order.process.common.MatchingResultHandlingHelper
+import com.lykke.matching.engine.order.transaction.CurrentTransactionBalancesHolderFactory
import com.lykke.matching.engine.order.transaction.ExecutionContextFactory
import com.lykke.matching.engine.order.utils.TestOrderBookWrapper
import com.lykke.matching.engine.outgoing.messages.*
@@ -40,6 +42,7 @@ import com.lykke.matching.engine.outgoing.messages.v2.events.ExecutionEvent
import com.lykke.matching.engine.outgoing.senders.SpecializedEventSendersHolder
import com.lykke.matching.engine.outgoing.senders.OutgoingEventProcessor
import com.lykke.matching.engine.outgoing.senders.SpecializedEventSender
+import com.lykke.matching.engine.outgoing.senders.impl.OldFormatBalancesSender
import com.lykke.matching.engine.outgoing.senders.impl.SpecializedEventSendersHolderImpl
import com.lykke.matching.engine.services.CashInOutOperationService
import com.lykke.matching.engine.services.CashTransferOperationService
@@ -115,12 +118,9 @@ open class TestApplicationContext {
@Bean
open fun balanceHolder(balancesDatabaseAccessorsHolder: BalancesDatabaseAccessorsHolder,
- persistenceManager: PersistenceManager,
balanceUpdateQueue: BlockingQueue,
- applicationSettingsHolder: ApplicationSettingsHolder,
- backOfficeDatabaseAccessor: BackOfficeDatabaseAccessor): BalancesHolder {
- return BalancesHolder(balancesDatabaseAccessorsHolder, persistenceManager, assetHolder(backOfficeDatabaseAccessor),
- balanceUpdateQueue, applicationSettingsHolder)
+ applicationSettingsHolder: ApplicationSettingsHolder): BalancesHolder {
+ return BalancesHolder(balancesDatabaseAccessorsHolder)
}
@Bean
@@ -149,14 +149,15 @@ open class TestApplicationContext {
stopOrdersDatabaseAccessorsHolder: StopOrdersDatabaseAccessorsHolder,
testReservedVolumesDatabaseAccessor: TestReservedVolumesDatabaseAccessor,
assetHolder: AssetsHolder, assetsPairsHolder: AssetsPairsHolder,
- balancesHolder: BalancesHolder, applicationSettingsHolder: ApplicationSettingsHolder,
+ balancesHolder: BalancesHolder,
+ balancesService: BalancesService, applicationSettingsHolder: ApplicationSettingsHolder,
messageSequenceNumberHolder: MessageSequenceNumberHolder,
- messageSender: MessageSender): ReservedVolumesRecalculator {
-
+ messageSender: MessageSender,
+ oldFormatBalancesSender: OldFormatBalancesSender): ReservedVolumesRecalculator {
return ReservedVolumesRecalculator(testOrderDatabaseAccessorHolder, stopOrdersDatabaseAccessorsHolder,
testReservedVolumesDatabaseAccessor, assetHolder,
assetsPairsHolder, balancesHolder, applicationSettingsHolder,
- false, messageSequenceNumberHolder, messageSender)
+ false, messageSequenceNumberHolder, messageSender, balancesService, oldFormatBalancesSender)
}
@Bean
@@ -206,9 +207,9 @@ open class TestApplicationContext {
}
@Bean
- open fun testBalanceHolderWrapper(balanceUpdateHandlerTest: BalanceUpdateHandlerTest,
- balancesHolder: BalancesHolder): TestBalanceHolderWrapper {
- return TestBalanceHolderWrapper(balanceUpdateHandlerTest, balancesHolder)
+ open fun testBalanceHolderWrapper(balancesHolder: BalancesHolder,
+ balancesService: BalancesService): TestBalanceHolderWrapper {
+ return TestBalanceHolderWrapper(balancesService, balancesHolder)
}
@Bean
@@ -268,15 +269,19 @@ open class TestApplicationContext {
@Bean
open fun cashInOutOperationService(balancesHolder: BalancesHolder,
+ walletOperationsProcessorFactory: WalletOperationsProcessorFactory,
feeProcessor: FeeProcessor,
cashInOutOperationBusinessValidator: CashInOutOperationBusinessValidator,
messageSequenceNumberHolder: MessageSequenceNumberHolder,
- outgoingEventProcessor: OutgoingEventProcessor): CashInOutOperationService {
+ outgoingEventProcessor: OutgoingEventProcessor,
+ persistenceManager: PersistenceManager): CashInOutOperationService {
return CashInOutOperationService(balancesHolder,
feeProcessor,
+ walletOperationsProcessorFactory,
cashInOutOperationBusinessValidator,
messageSequenceNumberHolder,
- outgoingEventProcessor)
+ outgoingEventProcessor,
+ persistenceManager)
}
@Bean
@@ -304,20 +309,18 @@ open class TestApplicationContext {
}
@Bean
- open fun reservedCashInOutOperation(balancesHolder: BalancesHolder,
+ open fun reservedCashInOutOperation(walletOperationsProcessorFactory: WalletOperationsProcessorFactory,
assetsHolder: AssetsHolder,
reservedCashInOutOperationValidator: ReservedCashInOutOperationValidator,
messageProcessingStatusHolder: MessageProcessingStatusHolder,
+ persistenceManager: PersistenceManager,
uuidHolder: UUIDHolder,
+ oldFormatBalancesSender: OldFormatBalancesSender,
messageSequenceNumberHolder: MessageSequenceNumberHolder,
outgoingEventProcessor: OutgoingEventProcessor): ReservedCashInOutOperationService {
- return ReservedCashInOutOperationService(assetsHolder,
- balancesHolder,
- reservedCashInOutOperationValidator,
- messageProcessingStatusHolder,
- uuidHolder,
- messageSequenceNumberHolder,
- outgoingEventProcessor)
+ return ReservedCashInOutOperationService(assetsHolder, walletOperationsProcessorFactory,
+ reservedCashInOutOperationValidator, messageProcessingStatusHolder, persistenceManager, messageSequenceNumberHolder,
+ uuidHolder, outgoingEventProcessor)
}
@Bean
@@ -578,12 +581,14 @@ open class TestApplicationContext {
@Bean
open fun cashTransferOperationService(balancesHolder: BalancesHolder,
+ walletOperationsProcessorFactory: WalletOperationsProcessorFactory,
dbTransferOperationQueue: BlockingQueue, feeProcessor: FeeProcessor,
cashTransferOperationBusinessValidator: CashTransferOperationBusinessValidator,
messageSequenceNumberHolder: MessageSequenceNumberHolder,
- outgoingEventProcessor: OutgoingEventProcessor): CashTransferOperationService {
- return CashTransferOperationService(balancesHolder, dbTransferOperationQueue, feeProcessor,
- cashTransferOperationBusinessValidator, messageSequenceNumberHolder, outgoingEventProcessor)
+ outgoingEventProcessor: OutgoingEventProcessor,
+ persistenceManager: PersistenceManager): CashTransferOperationService {
+ return CashTransferOperationService(balancesHolder, walletOperationsProcessorFactory, dbTransferOperationQueue, feeProcessor,
+ cashTransferOperationBusinessValidator, messageSequenceNumberHolder, outgoingEventProcessor, persistenceManager)
}
@Bean
@@ -715,4 +720,22 @@ open class TestApplicationContext {
open fun specializedEventSendersHolder(specializedEventSenders: List>): SpecializedEventSendersHolder {
return SpecializedEventSendersHolderImpl(specializedEventSenders)
}
+
+ @Bean
+ open fun currentTransactionBalancesHolderFactory(balancesHolder: BalancesHolder): CurrentTransactionBalancesHolderFactory {
+ return CurrentTransactionBalancesHolderFactory(balancesHolder)
+ }
+
+ @Bean
+ open fun walletOperationsProcessorFactory(currentTransactionBalancesHolderFactory: CurrentTransactionBalancesHolderFactory,
+ applicationSettingsHolder: ApplicationSettingsHolder,
+ assetsHolder: AssetsHolder): WalletOperationsProcessorFactory {
+ return WalletOperationsProcessorFactory(currentTransactionBalancesHolderFactory, applicationSettingsHolder, assetsHolder)
+ }
+
+ @Bean
+ open fun balancesService(balancesHolder: BalancesHolder,
+ persistenceManager: PersistenceManager): BalancesService {
+ return BalancesServiceImpl(balancesHolder, persistenceManager)
+ }
}
\ No newline at end of file
diff --git a/src/test/kotlin/com/lykke/matching/engine/config/TestExecutionContext.kt b/src/test/kotlin/com/lykke/matching/engine/config/TestExecutionContext.kt
index eb536db0f..6ef5528dd 100644
--- a/src/test/kotlin/com/lykke/matching/engine/config/TestExecutionContext.kt
+++ b/src/test/kotlin/com/lykke/matching/engine/config/TestExecutionContext.kt
@@ -1,5 +1,6 @@
package com.lykke.matching.engine.config
+import com.lykke.matching.engine.balance.WalletOperationsProcessorFactory
import com.lykke.matching.engine.daos.ExecutionData
import com.lykke.matching.engine.daos.LkkTrade
import com.lykke.matching.engine.daos.OutgoingEventData
@@ -8,7 +9,6 @@ import com.lykke.matching.engine.fee.FeeProcessor
import com.lykke.matching.engine.holders.ApplicationSettingsHolder
import com.lykke.matching.engine.holders.AssetsHolder
import com.lykke.matching.engine.holders.AssetsPairsHolder
-import com.lykke.matching.engine.holders.BalancesHolder
import com.lykke.matching.engine.holders.MessageSequenceNumberHolder
import com.lykke.matching.engine.holders.UUIDHolder
import com.lykke.matching.engine.matching.MatchingEngine
@@ -26,6 +26,7 @@ import com.lykke.matching.engine.order.process.common.LimitOrdersCancellerImpl
import com.lykke.matching.engine.order.process.common.MatchingResultHandlingHelper
import com.lykke.matching.engine.order.transaction.ExecutionContextFactory
import com.lykke.matching.engine.order.transaction.ExecutionEventsSequenceNumbersGenerator
+import com.lykke.matching.engine.outgoing.messages.BalanceUpdate
import com.lykke.matching.engine.outgoing.messages.CashInOutEventData
import com.lykke.matching.engine.outgoing.messages.CashOperation
import com.lykke.matching.engine.outgoing.messages.CashTransferEventData
@@ -38,6 +39,7 @@ import com.lykke.matching.engine.outgoing.messages.ReservedCashOperation
import com.lykke.matching.engine.outgoing.senders.SpecializedEventSendersHolder
import com.lykke.matching.engine.outgoing.senders.OutgoingEventProcessor
import com.lykke.matching.engine.outgoing.senders.SpecializedEventSender
+import com.lykke.matching.engine.outgoing.senders.impl.OldFormatBalancesSender
import com.lykke.matching.engine.outgoing.senders.impl.specialized.CashInOutEventSender
import com.lykke.matching.engine.outgoing.senders.impl.specialized.CashInOutOldEventSender
import com.lykke.matching.engine.outgoing.senders.impl.OutgoingEventProcessorImpl
@@ -77,11 +79,11 @@ open class TestExecutionContext {
}
@Bean
- open fun executionContextFactory(balancesHolder: BalancesHolder,
+ open fun executionContextFactory(walletOperationsProcessorFactory: WalletOperationsProcessorFactory,
genericLimitOrderService: GenericLimitOrderService,
genericStopLimitOrderService: GenericStopLimitOrderService,
assetsHolder: AssetsHolder): ExecutionContextFactory {
- return ExecutionContextFactory(balancesHolder,
+ return ExecutionContextFactory(walletOperationsProcessorFactory,
genericLimitOrderService,
genericStopLimitOrderService,
assetsHolder)
@@ -106,8 +108,9 @@ open class TestExecutionContext {
}
@Bean
- open fun cashTransferOldSender(notificationQueue: BlockingQueue): SpecializedEventSender {
- return CashTransferOldEventSender(notificationQueue)
+ open fun cashTransferOldSender(notificationQueue: BlockingQueue,
+ oldFormatBalancesSender: OldFormatBalancesSender): SpecializedEventSender {
+ return CashTransferOldEventSender(notificationQueue, oldFormatBalancesSender)
}
@Bean
@@ -127,11 +130,13 @@ open class TestExecutionContext {
@Bean
open fun specializedOldExecutionEventSender(clientLimitOrdersQueue: BlockingQueue,
trustedClientsLimitOrdersQueue: BlockingQueue,
- rabbitSwapQueue: BlockingQueue): SpecializedEventSender {
+ rabbitSwapQueue: BlockingQueue,
+ oldFormatBalancesSender: OldFormatBalancesSender): SpecializedEventSender {
return OldFormatExecutionEventSender(
clientLimitOrdersQueue,
trustedClientsLimitOrdersQueue,
- rabbitSwapQueue)
+ rabbitSwapQueue,
+ oldFormatBalancesSender)
}
@Bean
@@ -145,14 +150,16 @@ open class TestExecutionContext {
}
@Bean
- open fun specializedReservedCashInOutOldEventSender(reservedCashOperationQueue: BlockingQueue)
+ open fun specializedReservedCashInOutOldEventSender(reservedCashOperationQueue: BlockingQueue,
+ oldFormatBalancesSender: OldFormatBalancesSender)
: SpecializedEventSender {
- return ReservedCashInOutOldEventSender(reservedCashOperationQueue)
+ return ReservedCashInOutOldEventSender(reservedCashOperationQueue, oldFormatBalancesSender)
}
@Bean
- open fun specializedCashInOutOldEventSender(rabbitCashInOutQueue: BlockingQueue): SpecializedEventSender {
- return CashInOutOldEventSender(rabbitCashInOutQueue)
+ open fun specializedCashInOutOldEventSender(rabbitCashInOutQueue: BlockingQueue,
+ balanceUpdateQueue: BlockingQueue): CashInOutOldEventSender {
+ return CashInOutOldEventSender(rabbitCashInOutQueue, balanceUpdateQueue)
}
@Bean
diff --git a/src/test/kotlin/com/lykke/matching/engine/fee/FeeProcessorTest.kt b/src/test/kotlin/com/lykke/matching/engine/fee/FeeProcessorTest.kt
index add382108..269b55c4e 100644
--- a/src/test/kotlin/com/lykke/matching/engine/fee/FeeProcessorTest.kt
+++ b/src/test/kotlin/com/lykke/matching/engine/fee/FeeProcessorTest.kt
@@ -1,6 +1,7 @@
package com.lykke.matching.engine.fee
import com.lykke.matching.engine.balance.BalancesGetter
+import com.lykke.matching.engine.balance.WalletOperationsProcessorFactory
import com.lykke.matching.engine.balance.util.TestBalanceHolderWrapper
import com.lykke.matching.engine.config.TestApplicationContext
import com.lykke.matching.engine.daos.Asset
@@ -44,10 +45,10 @@ class FeeProcessorTest {
private lateinit var feeProcessor: FeeProcessor
@Autowired
- lateinit var balancesHolder: BalancesHolder
+ lateinit var walletOperationsProcessorFactory: WalletOperationsProcessorFactory
@Autowired
- lateinit var testBalanceHolderWrapper: TestBalanceHolderWrapper
+ lateinit var testBalanceHolderWrapper: TestBalanceHolderWrapper
@Autowired
lateinit var testBackOfficeDatabaseAccessor: TestBackOfficeDatabaseAccessor
@@ -59,9 +60,19 @@ class FeeProcessorTest {
open fun testBackOfficeDatabaseAccessor(): TestBackOfficeDatabaseAccessor {
val testBackOfficeDatabaseAccessor = TestBackOfficeDatabaseAccessor()
testBackOfficeDatabaseAccessor.addAsset(Asset("USD", 2))
+ testBackOfficeDatabaseAccessor.addAsset(Asset("EUR", 2))
+ testBackOfficeDatabaseAccessor.addAsset(Asset("CHF", 4))
return testBackOfficeDatabaseAccessor
}
+
+ @Bean
+ @Primary
+ fun testDictionaryDatabaseAccessor(): TestDictionariesDatabaseAccessor {
+ val testDictionariesDatabaseAccessor = TestDictionariesDatabaseAccessor()
+ testDictionariesDatabaseAccessor.addAssetPair(AssetPair("EURUSD", "EUR", "USD", 5))
+ return testDictionariesDatabaseAccessor
+ }
}
@Before
@@ -74,9 +85,6 @@ class FeeProcessorTest {
fun testNoPercentageFee() {
testBalanceHolderWrapper.updateBalance("Client2", "EUR", 10.0)
- testBackOfficeDatabaseAccessor.addAsset(Asset("EUR", 2))
- testDictionariesDatabaseAccessor.addAssetPair(AssetPair("EURUSD", "EUR", "USD", 5))
-
val operations = LinkedList()
operations.add(WalletOperation("Client1", "USD", BigDecimal.valueOf(-10.0)))
operations.add(WalletOperation("Client2", "USD", BigDecimal.valueOf(10.0)))
@@ -145,9 +153,6 @@ class FeeProcessorTest {
fun testNoAbsoluteFee() {
testBalanceHolderWrapper.updateBalance("Client2", "EUR", 0.09)
- testBackOfficeDatabaseAccessor.addAsset(Asset("EUR", 2))
- testDictionariesDatabaseAccessor.addAssetPair(AssetPair("EURUSD", "EUR", "USD", 5))
-
val operations = LinkedList()
operations.add(WalletOperation("Client1", "USD", BigDecimal.valueOf(-0.5)))
operations.add(WalletOperation("Client2", "USD", BigDecimal.valueOf(0.5)))
@@ -171,9 +176,6 @@ class FeeProcessorTest {
@Test
fun testAbsoluteFeeCashout() {
- testBackOfficeDatabaseAccessor.addAsset(Asset("EUR", 2))
- testDictionariesDatabaseAccessor.addAssetPair(AssetPair("EURUSD", "EUR", "USD", 5))
-
val operations = LinkedList()
operations.add(WalletOperation("Client1", "USD", BigDecimal.valueOf(-0.5)))
val receiptOperation = operations[0]
@@ -190,9 +192,6 @@ class FeeProcessorTest {
@Test
fun testPercentFeeCashout() {
- testBackOfficeDatabaseAccessor.addAsset(Asset("EUR", 2))
- testDictionariesDatabaseAccessor.addAssetPair(AssetPair("EURUSD", "EUR", "USD", 5))
-
val operations = LinkedList()
operations.add(WalletOperation("Client1", "USD", BigDecimal.valueOf(-0.5)))
val receiptOperation = operations[0]
@@ -209,19 +208,18 @@ class FeeProcessorTest {
@Test
fun testAnotherAssetFee() {
- testBalanceHolderWrapper.updateBalance("Client2", "EUR", 0.6543)
- testBackOfficeDatabaseAccessor.addAsset(Asset("EUR", 4))
+ testBalanceHolderWrapper.updateBalance("Client2", "CHF", 0.6543)
val operations = LinkedList()
operations.add(WalletOperation("Client1", "USD", BigDecimal.valueOf(-0.5)))
operations.add(WalletOperation("Client2", "USD", BigDecimal.valueOf(0.5)))
val receiptOperation = operations[1]
- val feeInstructions = buildFeeInstructions(type = FeeType.CLIENT_FEE, size = 0.6543, sizeType = FeeSizeType.ABSOLUTE, targetClientId = "Client3", assetIds = listOf("EUR"))
+ val feeInstructions = buildFeeInstructions(type = FeeType.CLIENT_FEE, size = 0.6543, sizeType = FeeSizeType.ABSOLUTE, targetClientId = "Client3", assetIds = listOf("CHF"))
val fees = feeProcessor.processFee(feeInstructions, receiptOperation, operations, balancesGetter = createBalancesGetter())
assertEquals(1, fees.size)
assertEquals(BigDecimal.valueOf(0.6543), fees.first().transfer!!.volume)
- assertEquals("EUR", fees.first().transfer!!.asset)
+ assertEquals("CHF", fees.first().transfer!!.asset)
assertEquals(4, operations.size)
}
@@ -603,7 +601,7 @@ class FeeProcessorTest {
}
private fun createBalancesGetter(): BalancesGetter {
- return balancesHolder.createWalletProcessor(null)
+ return walletOperationsProcessorFactory.create(null)
}
}
diff --git a/src/test/kotlin/com/lykke/matching/engine/performance/AbstractPerformanceTest.kt b/src/test/kotlin/com/lykke/matching/engine/performance/AbstractPerformanceTest.kt
index 708920c92..7dacd9882 100644
--- a/src/test/kotlin/com/lykke/matching/engine/performance/AbstractPerformanceTest.kt
+++ b/src/test/kotlin/com/lykke/matching/engine/performance/AbstractPerformanceTest.kt
@@ -1,5 +1,6 @@
package com.lykke.matching.engine.performance
+import com.lykke.matching.engine.balance.WalletOperationsProcessorFactory
import com.lykke.matching.engine.balance.util.TestBalanceHolderWrapper
import com.lykke.matching.engine.daos.LkkTrade
import com.lykke.matching.engine.daos.OutgoingEventData
@@ -25,7 +26,6 @@ import com.lykke.matching.engine.holders.TestUUIDHolder
import com.lykke.matching.engine.incoming.parsers.impl.LimitOrderCancelOperationContextParser
import com.lykke.matching.engine.incoming.parsers.impl.LimitOrderMassCancelOperationContextParser
import com.lykke.matching.engine.matching.MatchingEngine
-import com.lykke.matching.engine.notification.BalanceUpdateHandlerTest
import com.lykke.matching.engine.order.ExecutionDataApplyService
import com.lykke.matching.engine.order.ExecutionPersistenceService
import com.lykke.matching.engine.order.ExpiryOrdersQueue
@@ -36,6 +36,7 @@ import com.lykke.matching.engine.order.process.StopLimitOrderProcessor
import com.lykke.matching.engine.order.process.StopOrderBookProcessor
import com.lykke.matching.engine.order.process.common.LimitOrdersCancellerImpl
import com.lykke.matching.engine.order.process.common.MatchingResultHandlingHelper
+import com.lykke.matching.engine.order.transaction.CurrentTransactionBalancesHolderFactory
import com.lykke.matching.engine.order.transaction.ExecutionContextFactory
import com.lykke.matching.engine.order.transaction.ExecutionEventsSequenceNumbersGenerator
import com.lykke.matching.engine.outgoing.messages.BalanceUpdate
@@ -80,6 +81,7 @@ abstract class AbstractPerformanceTest {
protected lateinit var assetsHolder: AssetsHolder
protected lateinit var balancesHolder: BalancesHolder
+ protected lateinit var balancesService: BalancesService
protected lateinit var assetsPairsHolder: AssetsPairsHolder
protected lateinit var assetCache: AssetsCache
protected lateinit var balancesDatabaseAccessorsHolder: BalancesDatabaseAccessorsHolder
@@ -129,6 +131,11 @@ abstract class AbstractPerformanceTest {
val tradeInfoQueue = LinkedBlockingQueue()
+ val currentTransactionBalancesHolderFactory = CurrentTransactionBalancesHolderFactory(balancesHolder)
+
+ val walletOperationsProcessorFactory = WalletOperationsProcessorFactory(currentTransactionBalancesHolderFactory,
+ applicationSettingsHolder, assetsHolder)
+
val outgoingEventData = LinkedBlockingQueue()
val messageSender = MessageSender(rabbitEventsQueue, rabbitTrustedClientsEventsQueue)
@@ -160,13 +167,10 @@ abstract class AbstractPerformanceTest {
persistenceManager = TestPersistenceManager(balancesDatabaseAccessorsHolder.primaryAccessor,
ordersDatabaseAccessorsHolder,
stopOrdersDatabaseAccessorsHolder)
- balancesHolder = BalancesHolder(balancesDatabaseAccessorsHolder,
- persistenceManager,
- assetsHolder,
- balanceUpdateQueue,
- applicationSettingsHolder)
+ balancesHolder = BalancesHolder(balancesDatabaseAccessorsHolder)
+ balancesService = BalancesServiceImpl(balancesHolder, persistenceManager)
- testBalanceHolderWrapper = TestBalanceHolderWrapper(BalanceUpdateHandlerTest(balanceUpdateQueue), balancesHolder)
+ testBalanceHolderWrapper = TestBalanceHolderWrapper(balancesService, balancesHolder)
assetPairsCache = AssetPairsCache(testDictionariesDatabaseAccessor, ApplicationEventPublisher {})
assetsPairsHolder = AssetsPairsHolder(assetPairsCache)
@@ -207,7 +211,7 @@ abstract class AbstractPerformanceTest {
executionPersistenceService,
outgoingEventProcessor)
- val executionContextFactory = ExecutionContextFactory(balancesHolder,
+ val executionContextFactory = ExecutionContextFactory(walletOperationsProcessorFactory,
genericLimitOrderService,
genericStopLimitOrderService,
assetsHolder)
diff --git a/src/test/kotlin/com/lykke/matching/engine/services/BalancesServiceTest.kt b/src/test/kotlin/com/lykke/matching/engine/services/BalancesServiceTest.kt
new file mode 100644
index 000000000..5100db635
--- /dev/null
+++ b/src/test/kotlin/com/lykke/matching/engine/services/BalancesServiceTest.kt
@@ -0,0 +1,44 @@
+package com.lykke.matching.engine.services
+
+import com.lykke.matching.engine.config.TestApplicationContext
+import com.lykke.matching.engine.daos.wallet.AssetBalance
+import com.lykke.matching.engine.daos.wallet.Wallet
+import com.lykke.matching.engine.holders.BalancesDatabaseAccessorsHolder
+import com.lykke.matching.engine.holders.BalancesHolder
+import com.lykke.matching.engine.notification.BalanceUpdateHandlerTest
+import com.lykke.matching.engine.outgoing.messages.BalanceUpdate
+import com.lykke.matching.engine.outgoing.messages.ClientBalanceUpdate
+import com.lykke.matching.engine.utils.assertEquals
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.test.annotation.DirtiesContext
+import org.springframework.test.context.junit4.SpringRunner
+import java.math.BigDecimal
+import java.util.*
+import kotlin.test.assertEquals
+
+@RunWith(SpringRunner::class)
+@SpringBootTest(classes = [(TestApplicationContext::class)])
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
+class BalancesServiceTest {
+
+ @Autowired
+ private lateinit var balancesService: BalancesService
+
+ @Autowired
+ private lateinit var balancesHolder: BalancesHolder
+
+ @Autowired
+ private lateinit var testBalancesDatabaseAccessorsHolder: BalancesDatabaseAccessorsHolder
+
+ @Test
+ fun testInsertOrUpdateWallets() {
+ balancesService.insertOrUpdateWallets(listOf(Wallet("test", listOf(AssetBalance("test", "BTC", BigDecimal.valueOf(10))))), null)
+
+
+ assertEquals(BigDecimal.valueOf(10), balancesHolder.getBalance("test", "BTC"))
+ assertEquals(BigDecimal.valueOf(10), testBalancesDatabaseAccessorsHolder.primaryAccessor.loadWallets()["test"]?.balances!!["BTC"]?.balance)
+ }
+}
\ No newline at end of file
diff --git a/src/test/kotlin/com/lykke/matching/engine/services/CashInOutOperationServiceTest.kt b/src/test/kotlin/com/lykke/matching/engine/services/CashInOutOperationServiceTest.kt
index 1fc81a797..865272829 100644
--- a/src/test/kotlin/com/lykke/matching/engine/services/CashInOutOperationServiceTest.kt
+++ b/src/test/kotlin/com/lykke/matching/engine/services/CashInOutOperationServiceTest.kt
@@ -54,6 +54,9 @@ class CashInOutOperationServiceTest : AbstractTest() {
@Autowired
private lateinit var reservedCashInOutOperationService: ReservedCashInOutOperationService
+ @Autowired
+ private lateinit var balancesService: BalancesService
+
@Autowired
private lateinit var testSettingDatabaseAccessor: TestSettingsDatabaseAccessor
@@ -295,7 +298,7 @@ class CashInOutOperationServiceTest : AbstractTest() {
@Test
fun testRounding() {
- balancesHolder.insertOrUpdateWallets(listOf(Wallet("Client1", listOf(AssetBalance("Client1","Asset1", BigDecimal.valueOf(29.99))))), null)
+ balancesService.insertOrUpdateWallets(listOf(Wallet("Client1", listOf(AssetBalance("Client1","Asset1", BigDecimal.valueOf(29.99))))), null)
cashInOutOperationService.processMessage(messageBuilder.buildCashInOutWrapper("Client1", "Asset1", -0.01))
val balance = testWalletDatabaseAccessor.getBalance("Client1", "Asset1")
diff --git a/src/test/kotlin/com/lykke/matching/engine/services/InvalidBalanceTest.kt b/src/test/kotlin/com/lykke/matching/engine/services/InvalidBalanceTest.kt
index e127745f5..bcb2acf04 100644
--- a/src/test/kotlin/com/lykke/matching/engine/services/InvalidBalanceTest.kt
+++ b/src/test/kotlin/com/lykke/matching/engine/services/InvalidBalanceTest.kt
@@ -224,10 +224,11 @@ class InvalidBalanceTest : AbstractTest() {
IncomingLimitOrder(-0.05, 1010.0, "2"),
IncomingLimitOrder(-0.1, 1100.0, "3")
)))
- testBalanceHolderWrapper.updateReservedBalance("Client1", "ETH", reservedBalance = 0.09)
+
testSettingDatabaseAccessor.clear()
applicationSettingsCache.update()
applicationSettingsHolder.update()
+ testBalanceHolderWrapper.updateReservedBalance("Client1", "ETH", 0.09)
clearMessageQueues()
singleLimitOrderService.processMessage(messageBuilder.buildLimitOrderWrapper(buildLimitOrder(uid = "4", clientId = "Client2", assetId = "ETHUSD", volume = 0.25, price = 1100.0)))
diff --git a/src/test/kotlin/com/lykke/matching/engine/services/MarketOrderServiceTest.kt b/src/test/kotlin/com/lykke/matching/engine/services/MarketOrderServiceTest.kt
index cf29e019e..91b98294d 100644
--- a/src/test/kotlin/com/lykke/matching/engine/services/MarketOrderServiceTest.kt
+++ b/src/test/kotlin/com/lykke/matching/engine/services/MarketOrderServiceTest.kt
@@ -360,8 +360,7 @@ class MarketOrderServiceTest : AbstractTest() {
assertEquals(OrderRejectReason.NOT_ENOUGH_FUNDS, eventMarketOrder.rejectReason)
assertEquals(0, eventMarketOrder.trades?.size)
- balancesHolder.updateBalance(
- ProcessedMessage(MessageType.CASH_IN_OUT_OPERATION.type, System.currentTimeMillis(), "test"), 0, "Client4", "EUR", BigDecimal.valueOf(1000.0))
+ testBalanceHolderWrapper.updateBalance("Client4", "EUR", 1000.0)
marketOrderService.processMessage(buildMarketOrderWrapper(buildMarketOrder(clientId = "Client4", assetId = "EURUSD", volume = -1000.0)))
assertEquals(1, rabbitSwapListener.getCount())
diff --git a/src/test/kotlin/com/lykke/matching/engine/services/MultiLimitOrderServiceTest.kt b/src/test/kotlin/com/lykke/matching/engine/services/MultiLimitOrderServiceTest.kt
index 6782f05b7..ad57d783a 100644
--- a/src/test/kotlin/com/lykke/matching/engine/services/MultiLimitOrderServiceTest.kt
+++ b/src/test/kotlin/com/lykke/matching/engine/services/MultiLimitOrderServiceTest.kt
@@ -28,7 +28,6 @@ import org.junit.runner.RunWith
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.boot.test.context.TestConfiguration
-import org.springframework.context.ApplicationEventPublisher
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Primary
import org.springframework.test.annotation.DirtiesContext
diff --git a/src/test/kotlin/com/lykke/matching/engine/services/RoundingTest.kt b/src/test/kotlin/com/lykke/matching/engine/services/RoundingTest.kt
index a6c41197b..2f9718f9c 100644
--- a/src/test/kotlin/com/lykke/matching/engine/services/RoundingTest.kt
+++ b/src/test/kotlin/com/lykke/matching/engine/services/RoundingTest.kt
@@ -5,6 +5,7 @@ import com.lykke.matching.engine.config.TestApplicationContext
import com.lykke.matching.engine.daos.Asset
import com.lykke.matching.engine.daos.AssetPair
import com.lykke.matching.engine.database.TestBackOfficeDatabaseAccessor
+import com.lykke.matching.engine.database.cache.AssetsCache
import com.lykke.matching.engine.order.OrderStatus
import com.lykke.matching.engine.outgoing.messages.MarketOrderWithTrades
import com.lykke.matching.engine.utils.MessageBuilder.Companion.buildLimitOrder
@@ -22,14 +23,13 @@ import org.springframework.test.context.junit4.SpringRunner
import java.math.BigDecimal
import kotlin.test.assertNotNull
import com.lykke.matching.engine.utils.assertEquals
+import org.springframework.beans.factory.annotation.Autowired
import kotlin.test.assertEquals
-
-
@RunWith(SpringRunner::class)
@SpringBootTest(classes = [(TestApplicationContext::class), (RoundingTest.Config::class)])
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
-class RoundingTest: AbstractTest() {
+class RoundingTest : AbstractTest() {
@TestConfiguration
open class Config {
@@ -49,6 +49,9 @@ class RoundingTest: AbstractTest() {
}
}
+ @Autowired
+ private lateinit var assetsCache: AssetsCache
+
@Before
fun setUp() {
testDictionariesDatabaseAccessor.addAssetPair(AssetPair("EURUSD", "EUR", "USD", 5))
@@ -110,7 +113,7 @@ class RoundingTest: AbstractTest() {
assertEquals("USD", marketOrderReport.trades.first().limitAsset)
assertEquals("Client3", marketOrderReport.trades.first().limitClientId)
- assertEquals(BigDecimal.valueOf( 1.0), testWalletDatabaseAccessor.getBalance("Client3", "EUR"))
+ assertEquals(BigDecimal.valueOf(1.0), testWalletDatabaseAccessor.getBalance("Client3", "EUR"))
assertEquals(BigDecimal.valueOf(998.89), testWalletDatabaseAccessor.getBalance("Client3", "USD"))
assertEquals(BigDecimal.valueOf(1499.0), testWalletDatabaseAccessor.getBalance("Client4", "EUR"))
assertEquals(BigDecimal.valueOf(1.11), testWalletDatabaseAccessor.getBalance("Client4", "USD"))
@@ -179,7 +182,7 @@ class RoundingTest: AbstractTest() {
testBalanceHolderWrapper.updateBalance("Client4", "CHF", 1.0)
initServices()
- marketOrderService.processMessage(buildMarketOrderWrapper(buildMarketOrder(clientId = "Client4", assetId = "BTCCHF", volume = -0.38, straight = false)))
+ marketOrderService.processMessage(buildMarketOrderWrapper(buildMarketOrder(clientId = "Client4", assetId = "BTCCHF", volume = -0.38, straight = false)))
assertEquals(1, rabbitSwapListener.getCount())
val marketOrderReport = rabbitSwapListener.getQueue().poll() as MarketOrderWithTrades
diff --git a/src/test/kotlin/com/lykke/matching/engine/services/StopLimitOrderTest.kt b/src/test/kotlin/com/lykke/matching/engine/services/StopLimitOrderTest.kt
index ebaecfc79..f7cb5cafb 100644
--- a/src/test/kotlin/com/lykke/matching/engine/services/StopLimitOrderTest.kt
+++ b/src/test/kotlin/com/lykke/matching/engine/services/StopLimitOrderTest.kt
@@ -59,6 +59,7 @@ class StopLimitOrderTest : AbstractTest() {
testBackOfficeDatabaseAccessor.addAsset(Asset("USD", 2))
testBackOfficeDatabaseAccessor.addAsset(Asset("BTC", 8))
+ testBackOfficeDatabaseAccessor.addAsset(Asset("EUR", 8))
return testBackOfficeDatabaseAccessor
}
diff --git a/src/test/kotlin/com/lykke/matching/engine/services/validator/business/CashTransferOperationBusinessValidatorTest.kt b/src/test/kotlin/com/lykke/matching/engine/services/validator/business/CashTransferOperationBusinessValidatorTest.kt
index 70397210e..c6533415f 100644
--- a/src/test/kotlin/com/lykke/matching/engine/services/validator/business/CashTransferOperationBusinessValidatorTest.kt
+++ b/src/test/kotlin/com/lykke/matching/engine/services/validator/business/CashTransferOperationBusinessValidatorTest.kt
@@ -1,5 +1,6 @@
package com.lykke.matching.engine.services.validator.business
+import com.lykke.matching.engine.balance.WalletOperationsProcessorFactory
import com.lykke.matching.engine.balance.util.TestBalanceHolderWrapper
import com.lykke.matching.engine.config.TestApplicationContext
import com.lykke.matching.engine.daos.Asset
@@ -11,7 +12,7 @@ import com.lykke.matching.engine.incoming.parsers.impl.CashTransferContextParser
import com.lykke.matching.engine.messages.MessageType
import com.lykke.matching.engine.messages.MessageWrapper
import com.lykke.matching.engine.messages.ProtocolMessages
-import com.lykke.matching.engine.notification.BalanceUpdateHandlerTest
+import com.lykke.matching.engine.services.BalancesService
import com.lykke.matching.engine.services.validators.business.CashTransferOperationBusinessValidator
import com.lykke.matching.engine.services.validators.impl.ValidationException
import org.junit.Test
@@ -54,8 +55,9 @@ class CashTransferOperationBusinessValidatorTest {
@Bean
@Primary
- open fun testBalanceHolderWrapper(balanceUpdateHandlerTest: BalanceUpdateHandlerTest, balancesHolder: BalancesHolder): TestBalanceHolderWrapper {
- val testBalanceHolderWrapper = TestBalanceHolderWrapper(balanceUpdateHandlerTest, balancesHolder)
+ open fun testBalanceHolderWrapper(balancesService: BalancesService,
+ balancesHolder: BalancesHolder): TestBalanceHolderWrapper {
+ val testBalanceHolderWrapper = TestBalanceHolderWrapper(balancesService, balancesHolder)
testBalanceHolderWrapper.updateBalance(CLIENT_NAME1, ASSET_ID, 100.0)
testBalanceHolderWrapper.updateReservedBalance(CLIENT_NAME1, ASSET_ID, 50.0)
return testBalanceHolderWrapper
diff --git a/src/test/kotlin/com/lykke/matching/engine/utils/balance/ReservedVolumesRecalculatorTest.kt b/src/test/kotlin/com/lykke/matching/engine/utils/balance/ReservedVolumesRecalculatorTest.kt
index 97bdd4517..d4ddf0abc 100644
--- a/src/test/kotlin/com/lykke/matching/engine/utils/balance/ReservedVolumesRecalculatorTest.kt
+++ b/src/test/kotlin/com/lykke/matching/engine/utils/balance/ReservedVolumesRecalculatorTest.kt
@@ -7,6 +7,7 @@ import com.lykke.matching.engine.daos.AssetPair
import com.lykke.matching.engine.daos.order.LimitOrderType
import com.lykke.matching.engine.daos.setting.AvailableSettingGroup
import com.lykke.matching.engine.database.*
+import com.lykke.matching.engine.database.cache.ApplicationSettingsCache
import com.lykke.matching.engine.holders.BalancesDatabaseAccessorsHolder
import com.lykke.matching.engine.notification.BalanceUpdateHandlerTest
import com.lykke.matching.engine.order.utils.TestOrderBookWrapper
@@ -29,7 +30,6 @@ import org.springframework.test.context.junit4.SpringRunner
import java.math.BigDecimal
import kotlin.test.assertEquals
import com.lykke.matching.engine.utils.assertEquals
-import com.lykke.matching.engine.utils.getSetting
import java.util.concurrent.BlockingQueue
@RunWith(SpringRunner::class)
@@ -60,15 +60,6 @@ class ReservedVolumesRecalculatorTest {
return testDictionariesDatabaseAccessor
}
-
- @Bean
- @Primary
- open fun testConfig(): TestSettingsDatabaseAccessor {
- val testSettingsDatabaseAccessor = TestSettingsDatabaseAccessor()
- testSettingsDatabaseAccessor.createOrUpdateSetting(AvailableSettingGroup.TRUSTED_CLIENTS, getSetting("trustedClient"))
- testSettingsDatabaseAccessor.createOrUpdateSetting(AvailableSettingGroup.TRUSTED_CLIENTS, getSetting("trustedClient2"))
- return testSettingsDatabaseAccessor
- }
}
@Autowired
@@ -96,6 +87,9 @@ class ReservedVolumesRecalculatorTest {
@Autowired
lateinit var balanceUpdateHandlerTest: BalanceUpdateHandlerTest
+ @Autowired
+ lateinit var applicationSettingsCache: ApplicationSettingsCache
+
@Before
fun setUp() {
testOrderBookWrapper.addLimitOrder(buildLimitOrder(clientId = "trustedClient", assetId = "BTCUSD", price = 10000.0, volume = -1.0, reservedVolume = 0.5))
@@ -108,10 +102,10 @@ class ReservedVolumesRecalculatorTest {
testOrderBookWrapper.addStopLimitOrder(buildLimitOrder(uid = "4", clientId = "Client2", assetId = "BTCUSD", type = LimitOrderType.STOP_LIMIT, volume = 0.1, lowerLimitPrice = 10000.0, lowerPrice = 10900.0))
testBalanceHolderWrapper.updateBalance("trustedClient", "BTC", 10.0)
- testBalanceHolderWrapper.updateReservedBalance("trustedClient", "BTC", 2.0, false)
+ testBalanceHolderWrapper.updateReservedBalance("trustedClient", "BTC", 2.0)
// negative reserved balance
testBalanceHolderWrapper.updateBalance("trustedClient2", "BTC", 1.0)
- testBalanceHolderWrapper.updateReservedBalance("trustedClient2", "BTC", -0.001, false)
+ testBalanceHolderWrapper.updateReservedBalance("trustedClient2", "BTC", -0.001)
testBalanceHolderWrapper.updateBalance("Client3", "BTC", 0.0)
testBalanceHolderWrapper.updateReservedBalance("Client3", "BTC", -0.001)
@@ -135,6 +129,9 @@ class ReservedVolumesRecalculatorTest {
testBalanceHolderWrapper.updateBalance("Client2", "USD", 990.0)
testBalanceHolderWrapper.updateReservedBalance("Client2", "USD", 1.0)
+
+ applicationSettingsCache.createOrUpdateSettingValue(AvailableSettingGroup.TRUSTED_CLIENTS, "trustedClient", "trustedClient", true)
+ applicationSettingsCache.createOrUpdateSettingValue(AvailableSettingGroup.TRUSTED_CLIENTS, "trustedClient2", "trustedClient2", true)
}
@Test