From 946e6fc2798b415d4880549d473fc6943910b486 Mon Sep 17 00:00:00 2001 From: caniro Date: Sun, 8 Jun 2025 03:10:50 +0900 Subject: [PATCH 1/3] =?UTF-8?q?feat:=20=EC=B2=B4=EA=B2=B0=EC=9D=B4=20?= =?UTF-8?q?=EC=97=B0=EC=86=8D=EC=A0=81=EC=9C=BC=EB=A1=9C=20=EC=B2=98?= =?UTF-8?q?=EB=A6=AC=EB=90=98=EB=8F=84=EB=A1=9D=20=EB=A1=9C=EC=A7=81=20?= =?UTF-8?q?=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../coin/trade/application/TradeExecutor.java | 7 ++++--- .../trade/application/TradeFlowService.java | 20 ++++++++++++++----- .../coin/trade/application/TradeMatcher.java | 11 ---------- .../trade/application/TradeQueueManager.java | 10 +++------- .../application/TradeFlowServiceTest.java | 16 +++------------ 5 files changed, 25 insertions(+), 39 deletions(-) diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java b/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java index 1e96fbfa..a7256f70 100644 --- a/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java +++ b/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java @@ -12,11 +12,12 @@ import com.cleanengine.coin.user.domain.Wallet; import com.cleanengine.coin.user.info.application.AccountService; import com.cleanengine.coin.user.info.application.WalletService; -import jakarta.transaction.Transactional; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; import java.time.LocalDateTime; @@ -33,7 +34,7 @@ public class TradeExecutor { private final TradeExecutedEventPublisher tradeExecutedEventPublisher; private final TradeService tradeService; - @Transactional + @Transactional(propagation = Propagation.REQUIRES_NEW) public void executeTrade(WaitingOrders waitingOrders, TradePair tradePair, String ticker) { BuyOrder buyOrder = tradePair.getBuyOrder(); SellOrder sellOrder = tradePair.getSellOrder(); @@ -48,7 +49,7 @@ public void executeTrade(WaitingOrders waitingOrders, TradePair tr tradedPrice = tradeUnitPriceAndSize.tradedPrice(); if (approxEquals(tradedSize, 0.0)) { log.debug("체결 중단! 체결 시도 수량 : {}, 매수단가 : {}, 매도단가 : {}", tradedSize, buyOrder.getPrice(), sellOrder.getPrice()); - return; + return ; } this.writeTradingLog(buyOrder, sellOrder); diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeFlowService.java b/src/main/java/com/cleanengine/coin/trade/application/TradeFlowService.java index 5a7cab22..df367924 100644 --- a/src/main/java/com/cleanengine/coin/trade/application/TradeFlowService.java +++ b/src/main/java/com/cleanengine/coin/trade/application/TradeFlowService.java @@ -2,11 +2,10 @@ import com.cleanengine.coin.order.domain.Order; import com.cleanengine.coin.order.domain.spi.WaitingOrders; +import com.cleanengine.coin.order.domain.spi.WaitingOrdersManager; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Propagation; -import org.springframework.transaction.annotation.Transactional; import java.util.Optional; @@ -17,14 +16,25 @@ public class TradeFlowService { private final TradeMatcher tradeMatcher; private final TradeExecutor tradeExecutor; + private final WaitingOrdersManager waitingOrdersManager; - @Transactional(propagation = Propagation.REQUIRES_NEW) public void execMatchAndTrade(String ticker) { - WaitingOrders waitingOrders = tradeMatcher.getWaitingOrders(ticker); + WaitingOrders waitingOrders = waitingOrdersManager.getWaitingOrders(ticker); // TODO : peek() 해온 Order 객체들을 lock -> 체결 도중 취소 방지 Optional> tradePair = tradeMatcher.matchOrders(waitingOrders); + boolean continueProcessing = tradePair.isPresent(); - tradePair.ifPresent(orderOrderTradePair -> tradeExecutor.executeTrade(waitingOrders, orderOrderTradePair, ticker)); + while (continueProcessing) { + try { + tradeExecutor.executeTrade(waitingOrders, tradePair.get(), ticker); + tradePair = tradeMatcher.matchOrders(waitingOrders); + continueProcessing = tradePair.isPresent(); + } catch (Exception e) { + // TODO : 회복 필요 + log.error("Error processing trades for {}: {}", ticker, e.getMessage()); + continueProcessing = false; + } + } } } diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeMatcher.java b/src/main/java/com/cleanengine/coin/trade/application/TradeMatcher.java index 11bfa6ba..0fe3e3e4 100644 --- a/src/main/java/com/cleanengine/coin/trade/application/TradeMatcher.java +++ b/src/main/java/com/cleanengine/coin/trade/application/TradeMatcher.java @@ -5,7 +5,6 @@ import com.cleanengine.coin.order.domain.OrderType; import com.cleanengine.coin.order.domain.SellOrder; import com.cleanengine.coin.order.domain.spi.WaitingOrders; -import com.cleanengine.coin.order.domain.spi.WaitingOrdersManager; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -15,20 +14,10 @@ @Component public class TradeMatcher { - private final WaitingOrdersManager waitingOrdersManager; - // 1초마다 로깅 private long lastLogTime = 0; private static final long LOG_INTERVAL = 1000; - public TradeMatcher(WaitingOrdersManager waitingOrdersManager) { - this.waitingOrdersManager = waitingOrdersManager; - } - - public WaitingOrders getWaitingOrders(String ticker) { - return waitingOrdersManager.getWaitingOrders(ticker); - } - public Optional> matchOrders(WaitingOrders waitingOrders) { // 반환값 : 체결여부 this.writeQueueLog(waitingOrders); diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeQueueManager.java b/src/main/java/com/cleanengine/coin/trade/application/TradeQueueManager.java index 46d84d26..0b198e55 100644 --- a/src/main/java/com/cleanengine/coin/trade/application/TradeQueueManager.java +++ b/src/main/java/com/cleanengine/coin/trade/application/TradeQueueManager.java @@ -1,6 +1,5 @@ package com.cleanengine.coin.trade.application; -import com.cleanengine.coin.order.application.event.OrderCreated; import com.cleanengine.coin.order.application.event.OrderInsertedToQueue; import lombok.extern.slf4j.Slf4j; import org.springframework.context.event.EventListener; @@ -19,12 +18,9 @@ public TradeQueueManager(TradeFlowService tradeFlowService) { } @EventListener - public void handleOrderInserted(OrderInsertedToQueue event) { - try { - tradeFlowService.execMatchAndTrade(event.order().getTicker()); - } catch (Exception e) { - log.error("Error processing trades for {}: {}", event.order().getTicker(), e.getMessage()); - } + public void handleOrderInserted(OrderInsertedToQueue orderInsertedToQueue) { + String ticker = orderInsertedToQueue.order().getTicker(); + tradeFlowService.execMatchAndTrade(ticker); } } \ No newline at end of file diff --git a/src/test/java/com/cleanengine/coin/trade/application/TradeFlowServiceTest.java b/src/test/java/com/cleanengine/coin/trade/application/TradeFlowServiceTest.java index f3ad8b87..fd75eb3a 100644 --- a/src/test/java/com/cleanengine/coin/trade/application/TradeFlowServiceTest.java +++ b/src/test/java/com/cleanengine/coin/trade/application/TradeFlowServiceTest.java @@ -23,9 +23,9 @@ @ActiveProfiles({"dev", "it", "h2-mem"}) @SpringBootTest -@DisplayName("체결 처리 통합테스트") +@DisplayName("체결처리 h2 통합테스트") @Disabled -class TradeFlowServiceTest { +class TradeFlowServiceIntegrationTest { private static TradeBatchProcessor staticTradeBatchProcessor; @@ -41,10 +41,9 @@ class TradeFlowServiceTest { TradeBatchProcessor tradeBatchProcessor; @Autowired private WaitingOrdersManager waitingOrdersManager; - @Autowired - TradeMatcher tradeMatcher; private final String ticker = "BTC"; + private final WaitingOrders waitingOrders = waitingOrdersManager.getWaitingOrders(ticker); @BeforeEach void setUp() { @@ -76,7 +75,6 @@ void testLimitToLimitCompleteTrade() { buyOrderRepository.save(buyOrder); sellOrderRepository.save(sellOrder); - WaitingOrders waitingOrders = tradeMatcher.getWaitingOrders(ticker); waitingOrders.addOrder(buyOrder); waitingOrders.addOrder(sellOrder); @@ -122,7 +120,6 @@ void testLimitToLimitPartialTrade1() { buyOrderRepository.save(buyOrder); sellOrderRepository.save(sellOrder); - WaitingOrders waitingOrders = tradeMatcher.getWaitingOrders(ticker); waitingOrders.addOrder(buyOrder); waitingOrders.addOrder(sellOrder); @@ -161,7 +158,6 @@ void testLimitToLimitPartialTrade2() { buyOrderRepository.save(buyOrder); sellOrderRepository.save(sellOrder); - WaitingOrders waitingOrders = tradeMatcher.getWaitingOrders(ticker); waitingOrders.addOrder(buyOrder); waitingOrders.addOrder(sellOrder); @@ -200,7 +196,6 @@ void testMarketToLimitCompleteTrade1() { buyOrderRepository.save(buyOrder); sellOrderRepository.save(sellOrder); - WaitingOrders waitingOrders = tradeMatcher.getWaitingOrders(ticker); waitingOrders.addOrder(buyOrder); waitingOrders.addOrder(sellOrder); @@ -239,7 +234,6 @@ void testMarketToLimitCompleteTrade2() { buyOrderRepository.save(buyOrder); sellOrderRepository.save(sellOrder); - WaitingOrders waitingOrders = tradeMatcher.getWaitingOrders(ticker); waitingOrders.addOrder(buyOrder); waitingOrders.addOrder(sellOrder); @@ -278,7 +272,6 @@ void testMarketToLimitPartialTrade1() { buyOrderRepository.save(buyOrder); sellOrderRepository.save(sellOrder); - WaitingOrders waitingOrders = tradeMatcher.getWaitingOrders(ticker); waitingOrders.addOrder(buyOrder); waitingOrders.addOrder(sellOrder); @@ -317,7 +310,6 @@ void testMarketToLimitPartialTrade2() { buyOrderRepository.save(buyOrder); sellOrderRepository.save(sellOrder); - WaitingOrders waitingOrders = tradeMatcher.getWaitingOrders(ticker); waitingOrders.addOrder(buyOrder); waitingOrders.addOrder(sellOrder); @@ -361,7 +353,6 @@ void testMarketToLimitPartialTrade3() { buyOrderRepository.save(buyOrder); sellOrderRepository.save(sellOrder); - WaitingOrders waitingOrders = tradeMatcher.getWaitingOrders(ticker); waitingOrders.addOrder(buyOrder); waitingOrders.addOrder(sellOrder); @@ -400,7 +391,6 @@ void testMarketToLimitPartialTrade4() { buyOrderRepository.save(buyOrder); sellOrderRepository.save(sellOrder); - WaitingOrders waitingOrders = tradeMatcher.getWaitingOrders(ticker); waitingOrders.addOrder(buyOrder); waitingOrders.addOrder(sellOrder); From 96a23f205bca0209906c34c9517c8e8e1ab922b2 Mon Sep 17 00:00:00 2001 From: caniro Date: Sun, 8 Jun 2025 03:11:34 +0900 Subject: [PATCH 2/3] =?UTF-8?q?chore:=20=EC=B2=B4=EA=B2=B0=EC=99=84?= =?UTF-8?q?=EB=A3=8C=20Event=20=EC=88=98=EC=8B=A0=20=EC=8B=9C=20Transactio?= =?UTF-8?q?nalEventListener=EB=A1=9C=20=EC=88=98=EC=8B=A0=ED=95=98?= =?UTF-8?q?=EB=8F=84=EB=A1=9D=20=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/cleanengine/coin/chart/handler/TradeEventHandler.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/cleanengine/coin/chart/handler/TradeEventHandler.java b/src/main/java/com/cleanengine/coin/chart/handler/TradeEventHandler.java index 7b5d9270..db294c80 100644 --- a/src/main/java/com/cleanengine/coin/chart/handler/TradeEventHandler.java +++ b/src/main/java/com/cleanengine/coin/chart/handler/TradeEventHandler.java @@ -12,8 +12,8 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; -import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; +import org.springframework.transaction.event.TransactionalEventListener; @Component @RequiredArgsConstructor @@ -25,7 +25,7 @@ public class TradeEventHandler { private final RealTimeDataPrevRateService realTimeDataPrevRateService; //event로 이벤틀 처리해야한다. //eventListener는 void로 처리를 해야한다 - @EventListener + @TransactionalEventListener public void handleTradeEvent(TradeExecutedEvent event) { Trade trade = event.getTrade(); if (trade == null) { From 091bbe0803cd3ba3fb3dc229be1445f1ab6e4d33 Mon Sep 17 00:00:00 2001 From: caniro Date: Sun, 8 Jun 2025 03:20:14 +0900 Subject: [PATCH 3/3] =?UTF-8?q?chore:=20=ED=85=8C=EC=8A=A4=ED=8A=B8=20?= =?UTF-8?q?=ED=8C=8C=EC=9D=BC=EB=AA=85=20=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...eFlowServiceTest.java => TradeFlowServiceIntegrationTest.java} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/test/java/com/cleanengine/coin/trade/application/{TradeFlowServiceTest.java => TradeFlowServiceIntegrationTest.java} (100%) diff --git a/src/test/java/com/cleanengine/coin/trade/application/TradeFlowServiceTest.java b/src/test/java/com/cleanengine/coin/trade/application/TradeFlowServiceIntegrationTest.java similarity index 100% rename from src/test/java/com/cleanengine/coin/trade/application/TradeFlowServiceTest.java rename to src/test/java/com/cleanengine/coin/trade/application/TradeFlowServiceIntegrationTest.java