Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/dist/cfg/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,11 @@
</AsyncLogger>

# Multi Limit Order
<Logger name="MultiLimitOrderPreProcessing"
level="debug" additivity="false">
<AppenderRef ref="MultiLimitLog"/>
</Logger>

<AsyncLogger name="com.lykke.matching.engine.services.validators.impl.MultiLimitOrderValidatorImpl"
level="debug" additivity="false">
<AppenderRef ref="MultiLimitLog"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.lykke.matching.engine.incoming.preprocessor.impl.CashInOutPreprocesso
import com.lykke.matching.engine.incoming.preprocessor.impl.CashTransferPreprocessor
import com.lykke.matching.engine.incoming.preprocessor.impl.LimitOrderCancelOperationPreprocessor
import com.lykke.matching.engine.incoming.preprocessor.impl.LimitOrderMassCancelOperationPreprocessor
import com.lykke.matching.engine.incoming.preprocessor.impl.MultilimitOrderPreprocessor
import com.lykke.matching.engine.incoming.preprocessor.impl.SingleLimitOrderPreprocessor
import com.lykke.matching.engine.messages.MessageWrapper
import com.lykke.utils.logging.ThrottlingLogger
Expand Down Expand Up @@ -38,6 +39,17 @@ open class InputQueueListenerConfig {
"CashInOutInputQueueListener")
}

@Bean
open fun multilimitOrderListener(multilimitOrderInputQueue: BlockingQueue<MessageWrapper>,
multilimitOrderPreprocessor: MultilimitOrderPreprocessor,
@Qualifier("multiLimitOrderPreProcessingLogger")
logger: ThrottlingLogger): InputQueueListener {
return InputQueueListener(multilimitOrderInputQueue,
multilimitOrderPreprocessor,
logger,
"MultilimitOrderListener")
}

@Bean
open fun limitOrderCancelInputQueueListener(limitOrderCancelInputQueue: BlockingQueue<MessageWrapper>,
limitOrderCancelOperationPreprocessor: LimitOrderCancelOperationPreprocessor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ open class LoggerConfig {
return LoggerFactory.getLogger("AppStarter")
}

@Bean
open fun multiLimitOrderPreProcessingLogger(): ThrottlingLogger {
return ThrottlingLogger.getLogger("MultiLimitOrderPreProcessing")
}

@Bean
open fun singleLimitOrderPreProcessingLogger(): ThrottlingLogger {
return ThrottlingLogger.getLogger("SingleLimitOrderPreProcessing")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ open class QueueConfig {
return LinkedBlockingQueue<MessageWrapper>()
}

@Bean
@InputQueue
open fun multilimitOrderInputQueue(): BlockingQueue<MessageWrapper> {
return LinkedBlockingQueue<MessageWrapper>()
}

@Bean
@InputQueue
open fun cashInOutInputQueue(): BlockingQueue<MessageWrapper> {
Expand Down
32 changes: 32 additions & 0 deletions src/main/kotlin/com/lykke/matching/engine/daos/LimitOrder.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.lykke.matching.engine.daos.fee.v2.NewLimitOrderFeeInstruction
import com.lykke.matching.engine.daos.order.OrderTimeInForce
import com.lykke.matching.engine.daos.order.LimitOrderType
import com.lykke.matching.engine.daos.v2.LimitOrderFeeInstruction
import com.lykke.matching.engine.utils.NumberUtils
import org.nustaq.serialization.annotations.Version
import java.io.Serializable
import java.math.BigDecimal
Expand Down Expand Up @@ -104,4 +105,35 @@ class LimitOrder(id: String,
fun isExpired(date: Date): Boolean {
return hasExpiryTime() && !expiryTime!!.after(date)
}

override fun toString(): String {
return "id: $externalId" +
if(previousExternalId != null) ", previousExternalId: $previousExternalId" else "" +
if(parentOrderExternalId != null) ", parentOrderExternalId: $previousExternalId" else "" +
if(childOrderExternalId != null) ", childOrderExternalId: $childOrderExternalId" else "" +

", type: $type" +
", client: $clientId" +
", assetPair: $assetPairId" +
", status: $status" +

", volume: ${NumberUtils.roundForPrint(volume)}" +
(if (reservedLimitVolume != null) ", reservedLimitVolume: $reservedLimitVolume" else "") +
", remainingVolume: $remainingVolume" +
", price: ${NumberUtils.roundForPrint(price)}" +
(if (lowerLimitPrice != null) ", lowerLimitPrice: ${NumberUtils.roundForPrint(lowerLimitPrice)}" else "") +
(if (lowerPrice != null) ", lowerPrice: ${NumberUtils.roundForPrint(lowerPrice)}" else "") +
(if (upperLimitPrice != null) ", upperLimitPrice: ${NumberUtils.roundForPrint(upperLimitPrice)}" else "") +
(if (upperPrice != null) ", upperPrice: ${NumberUtils.roundForPrint(upperPrice)}" else "") +

", createdAt: $createdAt" +
(if (statusDate != null) ", statusDate: $statusDate" else "") +
(if (registered != null) ", registered: $registered" else "") +
(if (lastMatchTime != null) ", lastMatchTime: $lastMatchTime" else "") +

", fee: $fee" +
", fees: $fees" +
(if (timeInForce != null) ", timeInForce: $timeInForce" else "") +
(if (expiryTime != null) ", expiryTime: $expiryTime" else "")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.lykke.matching.engine.daos.context
import com.lykke.matching.engine.deduplication.ProcessedMessage
import com.lykke.matching.engine.messages.MessageType

class LimitOrderMassCancelOperationContext(val uid: String,
data class LimitOrderMassCancelOperationContext(val uid: String,
val messageId: String,
val clientId: String?,
val processedMessage: ProcessedMessage,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.lykke.matching.engine.daos.context

import com.lykke.matching.engine.daos.Asset
import com.lykke.matching.engine.daos.AssetPair
import com.lykke.matching.engine.daos.MultiLimitOrder
import com.lykke.matching.engine.services.validators.MultilimitOrderValidationResult

data class MultilimitOrderContext(val assetPair: AssetPair?,
val baseAsset: Asset?,
val quotingAsset: Asset?,
val isTrustedClient: Boolean,
val multiLimitOrder: MultiLimitOrder,
var multilimitOrderValidationResult: MultilimitOrderValidationResult? = null)
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import com.lykke.matching.engine.daos.AssetPair
import com.lykke.matching.engine.daos.LimitOrder
import com.lykke.matching.engine.deduplication.ProcessedMessage
import com.lykke.matching.engine.services.validators.impl.OrderValidationResult
import com.lykke.matching.engine.utils.NumberUtils

class SingleLimitOrderContext(val messageId: String,
data class SingleLimitOrderContext(val messageId: String,
val limitOrder: LimitOrder,
val isCancelOrders: Boolean,
val assetPair: AssetPair?,
Expand All @@ -30,25 +29,10 @@ class SingleLimitOrderContext(val messageId: String,
builder.processedMessage)

override fun toString(): String {
val order = this.limitOrder

return "id: ${limitOrder.externalId}" +
", messageId: $messageId" +
", type: ${order.type}" +
", client: ${order.clientId}" +
return ", messageId: $messageId" +
", isTrustedClient: $isTrustedClient" +
", assetPair: ${order.assetPairId}" +
", volume: ${NumberUtils.roundForPrint(order.volume)}" +
", price: ${NumberUtils.roundForPrint(order.price)}" +
(if (order.lowerLimitPrice != null) ", lowerLimitPrice: ${NumberUtils.roundForPrint(order.lowerLimitPrice)}" else "") +
(if (order.lowerPrice != null) ", lowerPrice: ${NumberUtils.roundForPrint(order.lowerPrice)}" else "") +
(if (order.upperLimitPrice != null) ", upperLimitPrice: ${NumberUtils.roundForPrint(order.upperLimitPrice)}" else "") +
(if (order.upperPrice != null) ", upperPrice: ${NumberUtils.roundForPrint(order.upperPrice)}" else "") +
", cancel: $isCancelOrders" +
", fee: ${order.fee}" +
", fees: ${order.fees}" +
(if (order.timeInForce != null) ", timeInForce=${order.timeInForce}" else "") +
(if (order.expiryTime != null) ", expiryTime=${order.expiryTime}" else "")
", limitOrder: $limitOrder"
}

class Builder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class MessageRouter(
private val cashTransferInputQueue: BlockingQueue<MessageWrapper>,
private val limitOrderCancelInputQueue: BlockingQueue<MessageWrapper>,
private val limitOrderMassCancelInputQueue: BlockingQueue<MessageWrapper>,
private val multilimitOrderInputQueue: BlockingQueue<MessageWrapper>,
val preProcessedMessageQueue: BlockingQueue<MessageWrapper>
) {
fun process(wrapper: MessageWrapper) {
Expand All @@ -21,6 +22,7 @@ class MessageRouter(
MessageType.LIMIT_ORDER.type -> limitOrderInputQueue.put(wrapper)
MessageType.LIMIT_ORDER_CANCEL.type -> limitOrderCancelInputQueue.put(wrapper)
MessageType.LIMIT_ORDER_MASS_CANCEL.type -> limitOrderMassCancelInputQueue.put(wrapper)
MessageType.MULTI_LIMIT_ORDER.type -> multilimitOrderInputQueue.put(wrapper)

else -> preProcessedMessageQueue.put(wrapper)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import com.lykke.utils.logging.MetricsLogger
import com.lykke.utils.logging.ThrottlingLogger
import java.util.concurrent.BlockingQueue
import javax.annotation.PostConstruct
import javax.annotation.PreDestroy

class InputQueueListener(private val inputQueue: BlockingQueue<MessageWrapper>,
private val preProcessor: MessagePreprocessor,
Expand All @@ -20,14 +21,34 @@ class InputQueueListener(private val inputQueue: BlockingQueue<MessageWrapper>,
@PostConstruct
fun init() = start()

@PreDestroy
fun shutdown() {
this.interrupt()
}

override fun run() {
while (true) {
try {
preProcessor.preProcess(inputQueue.take())
if(!process()) {
return
}
} catch (e: Exception) {
logger.error(ERROR_MESSAGE, e)
METRICS_LOGGER.logError(ERROR_MESSAGE, e)
}
}
}


private fun process(): Boolean {
try {
val messageWrapper = inputQueue.take()
preProcessor.preProcess(messageWrapper)
} catch(e: InterruptedException) {
this.interrupt()
return false
}

return true
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.lykke.matching.engine.incoming.parsers.data

import com.lykke.matching.engine.messages.MessageWrapper

class MultilimitOrderParsedData(messageWrapper: MessageWrapper, val inputAssetPairId: String) : ParsedData(messageWrapper)
Loading