Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/commerce-api/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ dependencies {
// add-ons
implementation(project(":modules:jpa"))
implementation(project(":modules:redis"))
implementation(project(":modules:kafka"))
implementation(project(":supports:jackson"))
implementation(project(":supports:logging"))
implementation(project(":supports:monitoring"))
Expand Down Expand Up @@ -34,4 +35,5 @@ dependencies {
// test-fixtures
testImplementation(testFixtures(project(":modules:jpa")))
testImplementation(testFixtures(project(":modules:redis")))
testImplementation(testFixtures(project(":modules:kafka")))
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@
import com.loopers.domain.brand.Brand;
import com.loopers.domain.product.Product;
import com.loopers.domain.product.ProductDetail;
import com.loopers.domain.product.ProductEvent;
import com.loopers.domain.product.ProductEventPublisher;
import com.loopers.support.error.CoreException;
import com.loopers.support.error.ErrorType;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;
import java.util.Map;
Expand All @@ -30,6 +33,7 @@ public class CatalogFacade {
private final BrandService brandService;
private final ProductService productService;
private final ProductCacheService productCacheService;
private final ProductEventPublisher productEventPublisher;

/**
* ์ƒํ’ˆ ๋ชฉ๋ก์„ ์กฐํšŒํ•ฉ๋‹ˆ๋‹ค.
Expand Down Expand Up @@ -103,16 +107,20 @@ public ProductInfoList getProducts(Long brandId, String sort, int page, int size
* ์ƒํ’ˆ ์ •๋ณด๋ฅผ ์กฐํšŒํ•ฉ๋‹ˆ๋‹ค.
* <p>
* Redis ์บ์‹œ๋ฅผ ๋จผ์ € ํ™•์ธํ•˜๊ณ , ์บ์‹œ์— ์—†์œผ๋ฉด DB์—์„œ ์กฐํšŒํ•œ ํ›„ ์บ์‹œ์— ์ €์žฅํ•ฉ๋‹ˆ๋‹ค.
* ์ƒํ’ˆ ์กฐํšŒ ์‹œ ProductViewed ์ด๋ฒคํŠธ๋ฅผ ๋ฐœํ–‰ํ•˜์—ฌ ๋ฉ”ํŠธ๋ฆญ ์ง‘๊ณ„์— ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค.
* </p>
*
* @param productId ์ƒํ’ˆ ID
* @return ์ƒํ’ˆ ์ •๋ณด์™€ ์ข‹์•„์š” ์ˆ˜
* @throws CoreException ์ƒํ’ˆ์„ ์ฐพ์„ ์ˆ˜ ์—†๋Š” ๊ฒฝ์šฐ
*/
@Transactional(readOnly = true)
public ProductInfo getProduct(Long productId) {
// ์บ์‹œ์—์„œ ์กฐํšŒ ์‹œ๋„
ProductInfo cachedResult = productCacheService.getCachedProduct(productId);
if (cachedResult != null) {
// ์บ์‹œ ํžˆํŠธ ์‹œ์—๋„ ์กฐํšŒ ์ˆ˜ ์ง‘๊ณ„๋ฅผ ์œ„ํ•ด ์ด๋ฒคํŠธ ๋ฐœํ–‰
productEventPublisher.publish(ProductEvent.ProductViewed.from(productId));
return cachedResult;
}

Expand All @@ -133,6 +141,9 @@ public ProductInfo getProduct(Long productId) {
// ์บ์‹œ์— ์ €์žฅ
productCacheService.cacheProduct(productId, result);

// โœ… ์ƒํ’ˆ ์กฐํšŒ ์ด๋ฒคํŠธ ๋ฐœํ–‰ (๋ฉ”ํŠธ๋ฆญ ์ง‘๊ณ„์šฉ)
productEventPublisher.publish(ProductEvent.ProductViewed.from(productId));

// ๋กœ์ปฌ ์บ์‹œ์˜ ์ข‹์•„์š” ์ˆ˜ ๋ธํƒ€ ์ ์šฉ (DB ์กฐํšŒ ๊ฒฐ๊ณผ์—๋„ ๋ธํƒ€ ๋ฐ˜์˜)
return productCacheService.applyLikeCountDelta(result);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package com.loopers.application.outbox;

import com.loopers.domain.like.LikeEvent;
import com.loopers.domain.order.OrderEvent;
import com.loopers.domain.product.ProductEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

/**
* Outbox Bridge Event Listener.
* <p>
* ApplicationEvent๋ฅผ ๊ตฌ๋…ํ•˜์—ฌ ์™ธ๋ถ€ ์‹œ์Šคํ…œ(Kafka)์œผ๋กœ ์ „์†กํ•ด์•ผ ํ•˜๋Š” ์ด๋ฒคํŠธ๋ฅผ
* Transactional Outbox Pattern์„ ํ†ตํ•ด Outbox์— ์ €์žฅํ•ฉ๋‹ˆ๋‹ค.
* </p>
* <p>
* <b>ํ‘œ์ค€ ํŒจํ„ด:</b>
* <ul>
* <li>EventPublisher๋Š” ApplicationEvent๋งŒ ๋ฐœํ–‰ (๋‹จ์ผ ์ฑ…์ž„)</li>
* <li>์ด ์ปดํฌ๋„ŒํŠธ๊ฐ€ ApplicationEvent๋ฅผ ๊ตฌ๋…ํ•˜์—ฌ Outbox์— ์ €์žฅ (๊ด€์‹ฌ์‚ฌ ๋ถ„๋ฆฌ)</li>
* <li>ํŠธ๋žœ์žญ์…˜ ์ปค๋ฐ‹ ํ›„(AFTER_COMMIT) ์ฒ˜๋ฆฌํ•˜์—ฌ ์—๋Ÿฌ ๊ฒฉ๋ฆฌ</li>
* </ul>
* </p>
* <p>
* <b>์ฒ˜๋ฆฌ ์ด๋ฒคํŠธ:</b>
* <ul>
* <li><b>LikeEvent:</b> LikeAdded, LikeRemoved โ†’ like-events</li>
* <li><b>OrderEvent:</b> OrderCreated โ†’ order-events</li>
* <li><b>ProductEvent:</b> ProductViewed โ†’ product-events</li>
* </ul>
* </p>
*
* @author Loopers
* @version 1.0
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class OutboxBridgeEventListener {

private final OutboxEventService outboxEventService;

/**
* LikeAdded ์ด๋ฒคํŠธ๋ฅผ Outbox์— ์ €์žฅํ•ฉ๋‹ˆ๋‹ค.
*
* @param event LikeAdded ์ด๋ฒคํŠธ
*/
@Async
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleLikeAdded(LikeEvent.LikeAdded event) {
try {
outboxEventService.saveEvent(
"LikeAdded",
event.productId().toString(),
"Product",
event,
"like-events",
event.productId().toString()
);
log.debug("LikeAdded ์ด๋ฒคํŠธ๋ฅผ Outbox์— ์ €์žฅ: productId={}", event.productId());
} catch (Exception e) {
log.error("LikeAdded ์ด๋ฒคํŠธ Outbox ์ €์žฅ ์‹คํŒจ: productId={}", event.productId(), e);
// ์™ธ๋ถ€ ์‹œ์Šคํ…œ ์ „์†ก ์‹คํŒจ๋Š” ๋‚ด๋ถ€ ์ฒ˜๋ฆฌ์— ์˜ํ–ฅ ์—†์Œ
}
}

/**
* LikeRemoved ์ด๋ฒคํŠธ๋ฅผ Outbox์— ์ €์žฅํ•ฉ๋‹ˆ๋‹ค.
*
* @param event LikeRemoved ์ด๋ฒคํŠธ
*/
@Async
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleLikeRemoved(LikeEvent.LikeRemoved event) {
try {
outboxEventService.saveEvent(
"LikeRemoved",
event.productId().toString(),
"Product",
event,
"like-events",
event.productId().toString()
);
log.debug("LikeRemoved ์ด๋ฒคํŠธ๋ฅผ Outbox์— ์ €์žฅ: productId={}", event.productId());
} catch (Exception e) {
log.error("LikeRemoved ์ด๋ฒคํŠธ Outbox ์ €์žฅ ์‹คํŒจ: productId={}", event.productId(), e);
// ์™ธ๋ถ€ ์‹œ์Šคํ…œ ์ „์†ก ์‹คํŒจ๋Š” ๋‚ด๋ถ€ ์ฒ˜๋ฆฌ์— ์˜ํ–ฅ ์—†์Œ
}
}

/**
* OrderCreated ์ด๋ฒคํŠธ๋ฅผ Outbox์— ์ €์žฅํ•ฉ๋‹ˆ๋‹ค.
*
* @param event OrderCreated ์ด๋ฒคํŠธ
*/
@Async
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleOrderCreated(OrderEvent.OrderCreated event) {
try {
outboxEventService.saveEvent(
"OrderCreated",
event.orderId().toString(),
"Order",
event,
"order-events",
event.orderId().toString()
);
log.debug("OrderCreated ์ด๋ฒคํŠธ๋ฅผ Outbox์— ์ €์žฅ: orderId={}", event.orderId());
} catch (Exception e) {
log.error("OrderCreated ์ด๋ฒคํŠธ Outbox ์ €์žฅ ์‹คํŒจ: orderId={}", event.orderId(), e);
// ์™ธ๋ถ€ ์‹œ์Šคํ…œ ์ „์†ก ์‹คํŒจ๋Š” ๋‚ด๋ถ€ ์ฒ˜๋ฆฌ์— ์˜ํ–ฅ ์—†์Œ
}
}

/**
* ProductViewed ์ด๋ฒคํŠธ๋ฅผ Outbox์— ์ €์žฅํ•ฉ๋‹ˆ๋‹ค.
*
* @param event ProductViewed ์ด๋ฒคํŠธ
*/
@Async
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleProductViewed(ProductEvent.ProductViewed event) {
try {
outboxEventService.saveEvent(
"ProductViewed",
event.productId().toString(),
"Product",
event,
"product-events",
event.productId().toString()
);
log.debug("ProductViewed ์ด๋ฒคํŠธ๋ฅผ Outbox์— ์ €์žฅ: productId={}", event.productId());
} catch (Exception e) {
log.error("ProductViewed ์ด๋ฒคํŠธ Outbox ์ €์žฅ ์‹คํŒจ: productId={}", event.productId(), e);
// ์™ธ๋ถ€ ์‹œ์Šคํ…œ ์ „์†ก ์‹คํŒจ๋Š” ๋‚ด๋ถ€ ์ฒ˜๋ฆฌ์— ์˜ํ–ฅ ์—†์Œ
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package com.loopers.application.outbox;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.loopers.domain.outbox.OutboxEvent;
import com.loopers.domain.outbox.OutboxEventRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.UUID;

/**
* Outbox ์ด๋ฒคํŠธ ์ €์žฅ ์„œ๋น„์Šค.
* <p>
* ๋„๋ฉ”์ธ ํŠธ๋žœ์žญ์…˜๊ณผ ๊ฐ™์€ ํŠธ๋žœ์žญ์…˜์—์„œ Outbox์— ์ด๋ฒคํŠธ๋ฅผ ์ €์žฅํ•ฉ๋‹ˆ๋‹ค.
* Application ๋ ˆ์ด์–ด์— ์œ„์น˜ํ•˜์—ฌ ๋น„์ฆˆ๋‹ˆ์Šค ๋กœ์ง(์ด๋ฒคํŠธ ์ €์žฅ ๊ฒฐ์ •)์„ ์ฒ˜๋ฆฌํ•ฉ๋‹ˆ๋‹ค.
* </p>
*
* @author Loopers
* @version 1.0
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class OutboxEventService {

private final OutboxEventRepository outboxEventRepository;
private final ObjectMapper objectMapper;

/**
* Kafka๋กœ ์ „์†กํ•  ์ด๋ฒคํŠธ๋ฅผ Outbox์— ์ €์žฅํ•ฉ๋‹ˆ๋‹ค.
* <p>
* ๋„๋ฉ”์ธ ํŠธ๋žœ์žญ์…˜๊ณผ ๊ฐ™์€ ํŠธ๋žœ์žญ์…˜์—์„œ ์‹คํ–‰๋˜์–ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.
* ์ง‘๊ณ„ ID๋ณ„๋กœ ์ˆœ์ฐจ์ ์ธ ๋ฒ„์ „์„ ์ž๋™์œผ๋กœ ๋ถ€์—ฌํ•ฉ๋‹ˆ๋‹ค.
* </p>
* <p>
* ๋ฒ„์ „ ์ถฉ๋Œ ์‹œ ์ตœ๋Œ€ 3ํšŒ๊นŒ์ง€ ์žฌ์‹œ๋„ํ•ฉ๋‹ˆ๋‹ค.
* ์œ ๋‹ˆํฌ ์ œ์•ฝ ์กฐ๊ฑด์„ ํ†ตํ•ด ๊ฒฝ์Ÿ ์กฐ๊ฑด์„ ๊ฐ์ง€ํ•˜๊ณ  ์žฌ์‹œ๋„ํ•ฉ๋‹ˆ๋‹ค.
* </p>
*
* @param eventType ์ด๋ฒคํŠธ ํƒ€์ž… (์˜ˆ: "OrderCreated", "LikeAdded")
* @param aggregateId ์ง‘๊ณ„ ID (์˜ˆ: orderId, productId)
* @param aggregateType ์ง‘๊ณ„ ํƒ€์ž… (์˜ˆ: "Order", "Product")
* @param event ์ด๋ฒคํŠธ ๊ฐ์ฒด
* @param topic Kafka ํ† ํ”ฝ ์ด๋ฆ„
* @param partitionKey ํŒŒํ‹ฐ์…˜ ํ‚ค
*/
@Transactional
public void saveEvent(
String eventType,
String aggregateId,
String aggregateType,
Object event,
String topic,
String partitionKey
) {
int maxRetries = 3;
for (int i = 0; i < maxRetries; i++) {
try {
String eventId = UUID.randomUUID().toString();
String payload = objectMapper.writeValueAsString(event);

// ์ง‘๊ณ„ ID๋ณ„ ์ตœ์‹  ๋ฒ„์ „ ์กฐํšŒ ํ›„ +1
Long latestVersion = outboxEventRepository.findLatestVersionByAggregateId(aggregateId, aggregateType);
Long nextVersion = latestVersion + 1L;

OutboxEvent outboxEvent = OutboxEvent.builder()
.eventId(eventId)
.eventType(eventType)
.aggregateId(aggregateId)
.aggregateType(aggregateType)
.payload(payload)
.topic(topic)
.partitionKey(partitionKey)
.version(nextVersion)
.build();

outboxEventRepository.save(outboxEvent);
log.debug("Outbox ์ด๋ฒคํŠธ ์ €์žฅ: eventType={}, aggregateId={}, topic={}, version={}",
eventType, aggregateId, topic, nextVersion);
return; // ์„ฑ๊ณต
} catch (DataIntegrityViolationException e) {
// ์œ ๋‹ˆํฌ ์ œ์•ฝ ์กฐ๊ฑด ์œ„๋ฐ˜ (๋ฒ„์ „ ์ถฉ๋Œ)
if (i == maxRetries - 1) {
log.error("Outbox ์ด๋ฒคํŠธ ์ €์žฅ ์‹คํŒจ (์ตœ๋Œ€ ์žฌ์‹œ๋„ ํšŸ์ˆ˜ ์ดˆ๊ณผ): eventType={}, aggregateId={}, retryCount={}",
eventType, aggregateId, i + 1, e);
throw new RuntimeException("Outbox ์ด๋ฒคํŠธ ์ €์žฅ ์‹คํŒจ: ๋ฒ„์ „ ์ถฉ๋Œ", e);
}
log.warn("Outbox ์ด๋ฒคํŠธ ์ €์žฅ ์žฌ์‹œ๋„: eventType={}, aggregateId={}, retryCount={}",
eventType, aggregateId, i + 1);
} catch (Exception e) {
log.error("Outbox ์ด๋ฒคํŠธ ์ €์žฅ ์‹คํŒจ: eventType={}, aggregateId={}",
eventType, aggregateId, e);
throw new RuntimeException("Outbox ์ด๋ฒคํŠธ ์ €์žฅ ์‹คํŒจ", e);
}
}
}
}
Loading