Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
9b8e511
LWDEV-10200 remove obsolete balances functionality in balances holder
olpapchenko Dec 17, 2018
c45efd9
LWDEV-9478 do now use persistence manager in wallets processor
olpapchenko Dec 18, 2018
fee59c8
LWDEV-9478 create wallet operations
olpapchenko Dec 18, 2018
998c1c7
LWDEV-9478 add console log, add balances service
olpapchenko Dec 19, 2018
ca9b6bd
LWDEV-9478 move send notifications to balances service
olpapchenko Dec 20, 2018
db51a55
LWDEV-9478 fix tests
olpapchenko Dec 20, 2018
0da37d4
Merge branch 'master' into LWDEV-9478
olpapchenko Dec 21, 2018
0ca9b06
LWDEV-9478 add balance update service test
olpapchenko Dec 21, 2018
ea27377
LWDEV-9478 fix tests
olpapchenko Dec 21, 2018
4bb6de5
Merge branch 'master' into LWDEV-9478
olpapchenko Jan 14, 2019
4930d30
LWDEV-9478 merge fixes
olpapchenko Jan 14, 2019
1095a04
Merge branch 'master' into LWDEV-9478
olpapchenko Apr 8, 2019
97bf4f9
LWDEV-9478 merge fixes
olpapchenko Apr 8, 2019
ed59aa2
LWDEV-9478 merge fixes
olpapchenko Apr 8, 2019
20f6d7c
Merge branch 'master' into LWDEV-9478
olpapchenko Apr 10, 2019
ef3a087
LWDEV-9478 fix logs
olpapchenko Apr 11, 2019
fd928b3
LWDEV-9478 fixes
olpapchenko Apr 16, 2019
344aeeb
LWDEV-9478 fixes
olpapchenko Apr 16, 2019
715c789
LWDEV-9478 remove send notifications from wallets processor
olpapchenko Apr 17, 2019
3a1fdee
Merge branch 'master' into LWDEV-9478
olpapchenko Apr 24, 2019
8f6f188
Merge branch 'master' into LWDEV-9478
olpapchenko Apr 26, 2019
e61bfbe
LWDEV-9478 merge fixes
olpapchenko Apr 26, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions src/dist/cfg/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,24 @@
<AppenderRef ref="ConsoleLog"/>
</Logger>

<Logger name="com.lykke.matching.engine.database.reconciliation.AccountsMigrationService"
level="info" additivity="false">
<AppenderRef ref="ServerLog"/>
<AppenderRef ref="ConsoleLog"/>
</Logger>

<Logger name="com.lykke.matching.engine.database.reconciliation.OrdersMigrationService"
level="info" additivity="false">
<AppenderRef ref="ServerLog"/>
<AppenderRef ref="ConsoleLog"/>
</Logger>

<Logger name="com.lykke.matching.engine.utils.balance.ReservedVolumesRecalculator"
level="info" additivity="false">
<AppenderRef ref="ServerLog"/>
<AppenderRef ref="ConsoleLog"/>
</Logger>

<AsyncLogger name="com.lykke.matching.engine.services.events.listeners.MessageProcessingSwitchListener"
level="info" additivity="false">
<AppenderRef ref="ServerLog"/>
Expand Down Expand Up @@ -541,6 +559,16 @@
<AppenderRef ref="BalanceUpdatesLog"/>
</AsyncLogger>

<AsyncLogger name="com.lykke.matching.engine.services.BalancesServiceImpl"
level="info" additivity="false">
<AppenderRef ref="BalanceUpdatesLog"/>
</AsyncLogger>

<AsyncLogger name="com.lykke.matching.engine.outgoing.senders.impl.specialized.CashInOutOldEventSender"
level="info" additivity="false">
<AppenderRef ref="BalanceUpdatesLog"/>
</AsyncLogger>

# Errors logger
<AsyncLogger name="com.lykke.utils.logging.MetricsLogger"
level="debug" additivity="false">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,9 @@ package com.lykke.matching.engine.balance
import com.lykke.matching.engine.daos.WalletOperation
import com.lykke.matching.engine.daos.wallet.AssetBalance
import com.lykke.matching.engine.daos.wallet.Wallet
import com.lykke.matching.engine.database.PersistenceManager
import com.lykke.matching.engine.database.common.entity.BalancesData
import com.lykke.matching.engine.database.common.entity.OrderBooksPersistenceData
import com.lykke.matching.engine.database.common.entity.PersistenceData
import com.lykke.matching.engine.deduplication.ProcessedMessage
import com.lykke.matching.engine.holders.ApplicationSettingsHolder
import com.lykke.matching.engine.holders.AssetsHolder
import com.lykke.matching.engine.holders.BalancesHolder
import com.lykke.matching.engine.outgoing.messages.BalanceUpdate
import com.lykke.matching.engine.outgoing.messages.ClientBalanceUpdate
import com.lykke.matching.engine.order.transaction.CurrentTransactionBalancesHolder
import com.lykke.matching.engine.order.transaction.WalletAssetBalance
Expand All @@ -20,21 +14,18 @@ import com.lykke.utils.logging.MetricsLogger
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.math.BigDecimal
import java.util.Date

class WalletOperationsProcessor(private val balancesHolder: BalancesHolder,
private val currentTransactionBalancesHolder: CurrentTransactionBalancesHolder,
class WalletOperationsProcessor(private val currentTransactionBalancesHolder: CurrentTransactionBalancesHolder,
private val applicationSettingsHolder: ApplicationSettingsHolder,
private val persistenceManager: PersistenceManager,
private val assetsHolder: AssetsHolder,
private val logger: Logger?): BalancesGetter {
private val logger: Logger?) : BalancesGetter {

companion object {
private val LOGGER = LoggerFactory.getLogger(WalletOperationsProcessor::class.java.name)
private val METRICS_LOGGER = MetricsLogger.getLogger()
}

private val clientBalanceUpdatesByClientIdAndAssetId = HashMap<String, ClientBalanceUpdate>()
private val clientBalanceUpdatesByClientIdAndAssetId = HashMap<String, ClientBalanceUpdate>()

fun preProcess(operations: Collection<WalletOperation>,
allowInvalidBalance: Boolean = false,
Expand All @@ -60,6 +51,14 @@ class WalletOperationsProcessor(private val balancesHolder: BalancesHolder,
changedAssetBalance.reserved
}

validateChangedAssetBalances(changedAssetBalances, allowInvalidBalance)


changedAssetBalances.forEach { processChangedAssetBalance(it.value) }
return this
}

private fun validateChangedAssetBalances(changedAssetBalances: HashMap<String, ChangedAssetBalance>, allowInvalidBalance: Boolean) {
try {
changedAssetBalances.values.forEach { validateBalanceChange(it) }
} catch (e: BalanceException) {
Expand All @@ -70,9 +69,6 @@ class WalletOperationsProcessor(private val balancesHolder: BalancesHolder,
(logger ?: LOGGER).error(message)
METRICS_LOGGER.logError(message, e)
}

changedAssetBalances.forEach { processChangedAssetBalance(it.value) }
return this
}

private fun processChangedAssetBalance(changedAssetBalance: ChangedAssetBalance) {
Expand Down Expand Up @@ -114,23 +110,6 @@ class WalletOperationsProcessor(private val balancesHolder: BalancesHolder,
return currentTransactionBalancesHolder.persistenceData()
}

fun persistBalances(processedMessage: ProcessedMessage?,
orderBooksData: OrderBooksPersistenceData?,
stopOrderBooksData: OrderBooksPersistenceData?,
messageSequenceNumber: Long?): Boolean {
return persistenceManager.persist(PersistenceData(persistenceData(),
processedMessage,
orderBooksData,
stopOrderBooksData,
messageSequenceNumber))
}

fun sendNotification(id: String, type: String, messageId: String) {
if (clientBalanceUpdatesByClientIdAndAssetId.isNotEmpty()) {
balancesHolder.sendBalanceUpdate(BalanceUpdate(id, type, Date(), clientBalanceUpdatesByClientIdAndAssetId.values.toList(), messageId))
}
}

fun getClientBalanceUpdates(): List<ClientBalanceUpdate> {
return clientBalanceUpdatesByClientIdAndAssetId.values.toList()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.lykke.matching.engine.balance

import com.lykke.matching.engine.holders.ApplicationSettingsHolder
import com.lykke.matching.engine.holders.AssetsHolder
import com.lykke.matching.engine.order.transaction.CurrentTransactionBalancesHolderFactory
import org.slf4j.Logger
import org.springframework.stereotype.Component

@Component
class WalletOperationsProcessorFactory(private val currentTransactionBalancesHolderFactory: CurrentTransactionBalancesHolderFactory,
private val applicationSettingsHolder: ApplicationSettingsHolder,
private val assetsHolder: AssetsHolder) {
fun create(logger: Logger?): WalletOperationsProcessor {

return WalletOperationsProcessor(
currentTransactionBalancesHolderFactory.create(),
applicationSettingsHolder,
assetsHolder,
logger)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import com.lykke.matching.engine.database.Storage
import com.lykke.matching.engine.database.azure.AzureWalletDatabaseAccessor
import com.lykke.matching.engine.database.redis.accessor.impl.RedisWalletDatabaseAccessor
import com.lykke.matching.engine.exception.MatchingEngineException
import com.lykke.matching.engine.holders.BalancesHolder
import com.lykke.matching.engine.services.BalancesService
import com.lykke.matching.engine.utils.config.Config
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
Expand All @@ -17,9 +17,9 @@ import java.util.*

@Component
@Order(1)
class AccountsMigrationService @Autowired constructor (private val balancesHolder: BalancesHolder,
private val config: Config,
private val redisWalletDatabaseAccessor: Optional<RedisWalletDatabaseAccessor>): ApplicationRunner {
class AccountsMigrationService @Autowired constructor (private val config: Config,
private val redisWalletDatabaseAccessor: Optional<RedisWalletDatabaseAccessor>,
private val balancesService: BalancesService): ApplicationRunner {
override fun run(args: ApplicationArguments?) {
if (config.me.walletsMigration) {
migrateAccounts()
Expand Down Expand Up @@ -55,38 +55,41 @@ class AccountsMigrationService @Autowired constructor (private val balancesHolde
}

val startTime = Date().time
teeLog("Starting wallets migration from azure to redis; azure table: $azureAccountsTableName, redis: ${config.me.redis.host}.${config.me.redis.port}")
LOGGER.info("Starting wallets migration from azure to redis; azure table: $azureAccountsTableName, redis: ${config.me.redis.host}.${config.me.redis.port}")
val wallets = azureDatabaseAccessor.loadWallets()
val loadTime = Date().time
teeLog("Loaded ${wallets.size} wallets from azure (ms: ${loadTime - startTime})")
balancesHolder.insertOrUpdateWallets(wallets.values.toList(), null)
LOGGER.info("Loaded ${wallets.size} wallets from azure (ms: ${loadTime - startTime})")
val balancesSaved = balancesService.insertOrUpdateWallets(wallets.values.toList(), null)
if (!balancesSaved) {
LOGGER.error("Can not save balances data during migration from azure db to redis")
return
}
val saveTime = Date().time
teeLog("Saved ${wallets.size} wallets to redis (ms: ${saveTime - loadTime})")
LOGGER.info("Saved ${wallets.size} wallets to redis (ms: ${saveTime - loadTime})")

compare()
}

fun fromRedisToDb() {
val startTime = Date().time
teeLog("Starting wallets migration from redis to azure; redis: ${config.me.redis.host}.${config.me.redis.port}, azure table: $azureAccountsTableName")
LOGGER.info("Starting wallets migration from redis to azure; redis: ${config.me.redis.host}.${config.me.redis.port}, azure table: $azureAccountsTableName")
val loadTime = Date().time
val wallets = redisWalletDatabaseAccessor.get().loadWallets()
if (wallets.isEmpty()) {
throw AccountsMigrationException("There are no wallets in redis ${config.me.redis.host}.${config.me.redis.port}")
}
teeLog("Loaded ${wallets.size} wallets from redis (ms: ${loadTime - startTime})")
balancesHolder.insertOrUpdateWallets(wallets.values.toList(), null)
LOGGER.info("Loaded ${wallets.size} wallets from redis (ms: ${loadTime - startTime})")
val balancesSaved = balancesService.insertOrUpdateWallets(wallets.values.toList(), null)
if (!balancesSaved) {
LOGGER.error("Can not save balances data during migration from redis to azure db")
return
}
val saveTime = Date().time
teeLog("Saved ${wallets.size} wallets to azure (ms: ${saveTime - loadTime})")
LOGGER.info("Saved ${wallets.size} wallets to azure (ms: ${saveTime - loadTime})")

compare()
}

private fun teeLog(message: String) {
println(message)
LOGGER.info(message)
}

/** Compares balances stored in redis & azure; logs comparison result */
private fun compare() {
val azureWallets = azureDatabaseAccessor.loadWallets().filter { it.value.balances.isNotEmpty() }
Expand All @@ -98,28 +101,28 @@ class AccountsMigrationService @Autowired constructor (private val balancesHolde

val differentWallets = LinkedList<String>()

teeLog("Comparison result. Differences: ")
teeLog("---------------------------------------------------------------------------------------------")
LOGGER.info("Comparison result. Differences: ")
LOGGER.info("---------------------------------------------------------------------------------------------")
commonClients.forEach {
val azureWallet = azureWallets[it]
val redisWallet = redisWallets[it]
if (!compareBalances(azureWallet!!, redisWallet!!)) {
differentWallets.add(it)
}
}
teeLog("---------------------------------------------------------------------------------------------")

teeLog("Total: ")
teeLog("azure clients count: ${azureWallets.size}")
teeLog("redis clients count: ${redisWallets.size}")
teeLog("only azure clients (count: ${onlyAzureClients.size}): $onlyAzureClients")
teeLog("only redis clients (count: ${onlyRedisClients.size}): $onlyRedisClients")
teeLog("clients with different wallets (count: ${differentWallets.size}): $differentWallets")
LOGGER.info("---------------------------------------------------------------------------------------------")

LOGGER.info("Total: ")
LOGGER.info("azure clients count: ${azureWallets.size}")
LOGGER.info("redis clients count: ${redisWallets.size}")
LOGGER.info("only azure clients (count: ${onlyAzureClients.size}): $onlyAzureClients")
LOGGER.info("only redis clients (count: ${onlyRedisClients.size}): $onlyRedisClients")
LOGGER.info("clients with different wallets (count: ${differentWallets.size}): $differentWallets")
}

private fun compareBalances(azureWallet: Wallet, redisWallet: Wallet): Boolean {
if (azureWallet.clientId != redisWallet.clientId) {
teeLog("different clients: ${azureWallet.clientId} & ${redisWallet.clientId}")
LOGGER.info("different clients: ${azureWallet.clientId} & ${redisWallet.clientId}")
return false
}
val clientId = azureWallet.clientId
Expand All @@ -130,7 +133,7 @@ class AccountsMigrationService @Autowired constructor (private val balancesHolde
val onlyRedisAssets = redisBalances.keys.filterNot { azureBalances.keys.contains(it) }

if (onlyAzureAssets.isNotEmpty() || onlyRedisAssets.isNotEmpty()) {
teeLog("different asset sets: $onlyAzureAssets & $onlyRedisAssets, client: $clientId")
LOGGER.info("different asset sets: $onlyAzureAssets & $onlyRedisAssets, client: $clientId")
return false
}

Expand All @@ -139,11 +142,11 @@ class AccountsMigrationService @Autowired constructor (private val balancesHolde
val azureBalance = azureBalances[it]
val redisBalance = redisBalances[it]
if (azureBalance!!.balance != redisBalance!!.balance) {
teeLog("different balances: ${azureBalance.balance} & ${redisBalance.balance}, client: $clientId")
LOGGER.info("different balances: ${azureBalance.balance} & ${redisBalance.balance}, client: $clientId")
return false
}
if (azureBalance.reserved != redisBalance.reserved) {
teeLog("different reserved balances: ${azureBalance.reserved} & ${redisBalance.reserved}, client: $clientId")
LOGGER.info("different reserved balances: ${azureBalance.reserved} & ${redisBalance.reserved}, client: $clientId")
return false
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class OrdersMigrationService(private val config: Config,
return
}
if (config.me.storage != Storage.Redis) {
teeLog("Do not perform migration to files")
LOGGER.info("Do not perform migration to files")
return
}
fromFilesToRedis()
Expand All @@ -59,12 +59,12 @@ class OrdersMigrationService(private val config: Config,
throw Exception("Stop orders already exist in redis ${config.me.redis.host}.${config.me.redis.port}/${config.me.redis.ordersDatabase}")
}
val startTime = Date().time
teeLog("Starting orders migration from files to redis; files dirs: ${config.me.orderBookPath}, ${config.me.stopOrderBookPath}" +
LOGGER.info("Starting orders migration from files to redis; files dirs: ${config.me.orderBookPath}, ${config.me.stopOrderBookPath}" +
", redis: ${config.me.redis.host}.${config.me.redis.port}/${config.me.redis.ordersDatabase}")
val orders = fileOrderBookDatabaseAccessor.loadLimitOrders()
val stopOrders = fileStopOrderBookDatabaseAccessor.loadStopLimitOrders()
val loadTime = Date().time
teeLog("Loaded ${orders.size} orders from files (ms: ${loadTime - startTime})")
LOGGER.info("Loaded ${orders.size} orders from files (ms: ${loadTime - startTime})")
persistenceManager.persist(PersistenceData(null,
null,
OrderBooksPersistenceData(mapOrdersToOrderBookPersistenceDataList(orders, LOGGER),
Expand All @@ -77,11 +77,6 @@ class OrdersMigrationService(private val config: Config,
genericLimitOrderService.update()
genericStopLimitOrderService.update()
val saveTime = Date().time
teeLog("Saved ${orders.size} orders and ${stopOrders.size} stop orders to redis (ms: ${saveTime - loadTime})")
}

private fun teeLog(message: String) {
println(message)
LOGGER.info(message)
LOGGER.info("Saved ${orders.size} orders and ${stopOrders.size} stop orders to redis (ms: ${saveTime - loadTime})")
}
}
Loading