diff --git a/apps/commerce-api/build.gradle.kts b/apps/commerce-api/build.gradle.kts index 21aa025af..9f57f5ffa 100644 --- a/apps/commerce-api/build.gradle.kts +++ b/apps/commerce-api/build.gradle.kts @@ -2,6 +2,7 @@ dependencies { // add-ons implementation(project(":modules:jpa")) implementation(project(":modules:redis")) + implementation(project(":modules:kafka")) implementation(project(":supports:jackson")) implementation(project(":supports:logging")) implementation(project(":supports:monitoring")) diff --git a/apps/commerce-api/src/main/java/com/loopers/application/event/listener/OrderStatusEventListener.java b/apps/commerce-api/src/main/java/com/loopers/application/event/listener/OrderStatusEventListener.java index 8a5a3ef6e..fe5ae1b1d 100644 --- a/apps/commerce-api/src/main/java/com/loopers/application/event/listener/OrderStatusEventListener.java +++ b/apps/commerce-api/src/main/java/com/loopers/application/event/listener/OrderStatusEventListener.java @@ -50,6 +50,13 @@ public void handlePaymentCompleted(PaymentCompletedEvent event) { order.getOrderNo(), order.getMemberId(), order.getTotalPrice(), + order.getItems().stream() + .map(item -> new OrderCompletedEvent.OrderItemInfo( + item.getProductId(), + item.getQuantity(), + item.getUnitPrice() + )) + .toList(), java.time.LocalDateTime.now() )); diff --git a/apps/commerce-api/src/main/java/com/loopers/application/event/order/OrderCompletedEvent.java b/apps/commerce-api/src/main/java/com/loopers/application/event/order/OrderCompletedEvent.java index c74b543a0..98f72b87b 100644 --- a/apps/commerce-api/src/main/java/com/loopers/application/event/order/OrderCompletedEvent.java +++ b/apps/commerce-api/src/main/java/com/loopers/application/event/order/OrderCompletedEvent.java @@ -2,11 +2,18 @@ import com.loopers.domain.common.vo.Money; import java.time.LocalDateTime; +import java.util.List; public record OrderCompletedEvent( String orderNo, Long memberId, Money totalPrice, + List items, LocalDateTime completedAt ) { + public record OrderItemInfo( + Long productId, + int quantity, + Money price + ) {} } diff --git a/apps/commerce-api/src/main/java/com/loopers/application/event/product/ProductViewedEvent.java b/apps/commerce-api/src/main/java/com/loopers/application/event/product/ProductViewedEvent.java new file mode 100644 index 000000000..e67d3abdc --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/application/event/product/ProductViewedEvent.java @@ -0,0 +1,11 @@ +package com.loopers.application.event.product; + +import java.time.LocalDateTime; + +public record ProductViewedEvent( + Long memberId, // nullable (비로그인 사용자도 추적) + Long productId, + Long brandId, + LocalDateTime viewedAt +) { +} \ No newline at end of file diff --git a/apps/commerce-api/src/main/java/com/loopers/application/like/LikeFacade.java b/apps/commerce-api/src/main/java/com/loopers/application/like/LikeFacade.java index 54ae37aa5..e43e4c4c0 100644 --- a/apps/commerce-api/src/main/java/com/loopers/application/like/LikeFacade.java +++ b/apps/commerce-api/src/main/java/com/loopers/application/like/LikeFacade.java @@ -3,9 +3,9 @@ import com.loopers.application.event.like.ProductLikedEvent; import com.loopers.application.event.like.ProductUnlikedEvent; import com.loopers.application.event.tracking.UserActionEvent; +import com.loopers.domain.like.repository.LikeRepository; import com.loopers.domain.like.service.LikeService; import com.loopers.domain.product.Product; -import com.loopers.domain.product.repository.ProductRepository; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationEventPublisher; @@ -22,11 +22,17 @@ public class LikeFacade { private final LikeService likeService; - private final ProductRepository productRepository; + private final LikeRepository likeRepository; private final ApplicationEventPublisher eventPublisher; public void likeProduct(Long memberId, Long productId) { - // 1. DB에 좋아요 저장 (Redis 로직은 LikeService에서 제거됨) + // 멱등성: 이미 좋아요한 경우 early return (좋아요가 존재하면 상품도 존재함) + if (likeRepository.existsByMemberIdAndProductId(memberId, productId)) { + log.debug("[LikeFacade] 이미 좋아요한 상품 - memberId: {}, productId: {}", memberId, productId); + return; + } + + // 1. DB에 좋아요 저장 Product product = likeService.like(memberId, productId); // 2. 이벤트 발행 (Redis 업데이트는 비동기 리스너에서 처리) @@ -49,7 +55,13 @@ public void likeProduct(Long memberId, Long productId) { } public void unlikeProduct(Long memberId, Long productId) { - // 1. DB에서 좋아요 삭제 (Redis 로직은 LikeService에서 제거됨) + // 멱등성: 좋아요하지 않은 경우 early return + if (!likeRepository.existsByMemberIdAndProductId(memberId, productId)) { + log.debug("[LikeFacade] 좋아요하지 않은 상품 - memberId: {}, productId: {}", memberId, productId); + return; + } + + // 1. DB에서 좋아요 삭제 Product product = likeService.unlike(memberId, productId); // 2. 이벤트 발행 (Redis 업데이트는 비동기 리스너에서 처리) diff --git a/apps/commerce-api/src/main/java/com/loopers/application/product/ProductFacade.java b/apps/commerce-api/src/main/java/com/loopers/application/product/ProductFacade.java index fc093b960..b18658643 100644 --- a/apps/commerce-api/src/main/java/com/loopers/application/product/ProductFacade.java +++ b/apps/commerce-api/src/main/java/com/loopers/application/product/ProductFacade.java @@ -1,18 +1,23 @@ package com.loopers.application.product; +import com.loopers.application.event.product.ProductViewedEvent; import com.loopers.domain.like.service.LikeReadService; +import com.loopers.domain.product.Product; +import com.loopers.domain.product.repository.ProductRepository; import com.loopers.domain.product.service.ProductReadService; import com.loopers.domain.product.command.ProductSearchFilter; import com.loopers.domain.product.enums.ProductSortCondition; import com.loopers.infrastructure.cache.ProductDetailCache; import com.loopers.infrastructure.cache.ProductListCache; import lombok.RequiredArgsConstructor; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Pageable; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; +import java.time.LocalDateTime; import java.util.List; @RequiredArgsConstructor @@ -24,6 +29,8 @@ public class ProductFacade { private final LikeReadService likeReadService; private final ProductDetailCache productDetailCache; private final ProductListCache productListCache; + private final ProductRepository productRepository; + private final ApplicationEventPublisher eventPublisher; @Transactional(readOnly = true) public Page getProducts(ProductSearchCommand command) { @@ -93,16 +100,17 @@ public ProductDetailInfo getProductDetail(Long productId, Long memberIdOrNull) { return result; }); - // 2. 로그인하지 않은 경우 바로 반환 - if (memberIdOrNull == null) { - return cachedInfo; // isLikedByMember=false 그대로 - } + // 2. Product 엔티티 조회 (brandId 획득용) + Product product = productRepository.findById(productId) + .orElseThrow(() -> new com.loopers.support.error.CoreException( + com.loopers.support.error.ErrorType.NOT_FOUND, + "상품을 찾을 수 없습니다.")); - // 3. isLikedByMember만 동적 계산 - boolean isLiked = likeReadService.isLikedBy(memberIdOrNull, productId); + // 3. isLikedByMember 동적 계산 + boolean isLiked = memberIdOrNull != null && likeReadService.isLikedBy(memberIdOrNull, productId); // 4. isLikedByMember 필드만 교체해서 반환 - return ProductDetailInfo.builder() + ProductDetailInfo result = ProductDetailInfo.builder() .id(cachedInfo.getId()) .name(cachedInfo.getName()) .description(cachedInfo.getDescription()) @@ -113,6 +121,16 @@ public ProductDetailInfo getProductDetail(Long productId, Long memberIdOrNull) { .likeCount(cachedInfo.getLikeCount()) .isLikedByMember(isLiked) // ⭐ 동적 계산 .build(); + + // 5. ProductViewedEvent 발행 (조회수 집계) + eventPublisher.publishEvent(new ProductViewedEvent( + memberIdOrNull, // 비로그인 사용자는 null + productId, + product.getBrandId(), + LocalDateTime.now() + )); + + return result; } @Transactional(readOnly = true) diff --git a/apps/commerce-api/src/main/java/com/loopers/config/AsyncConfig.java b/apps/commerce-api/src/main/java/com/loopers/config/AsyncConfig.java index 4c4f260d1..4bb4eac7c 100644 --- a/apps/commerce-api/src/main/java/com/loopers/config/AsyncConfig.java +++ b/apps/commerce-api/src/main/java/com/loopers/config/AsyncConfig.java @@ -10,7 +10,6 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; /** @@ -35,7 +34,7 @@ public class AsyncConfig implements AsyncConfigurer { */ @Bean(name = "taskExecutor") @Override - public Executor getAsyncExecutor() { + public ThreadPoolTaskExecutor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); // 5 → 10 executor.setMaxPoolSize(20); // 10 → 20 diff --git a/apps/commerce-api/src/main/java/com/loopers/domain/like/service/LikeService.java b/apps/commerce-api/src/main/java/com/loopers/domain/like/service/LikeService.java index fe7e396c6..e06b42b3e 100644 --- a/apps/commerce-api/src/main/java/com/loopers/domain/like/service/LikeService.java +++ b/apps/commerce-api/src/main/java/com/loopers/domain/like/service/LikeService.java @@ -28,22 +28,16 @@ public class LikeService { * - DB에는 Like 레코드만 저장 * - 좋아요 카운트는 이벤트 리스너에서 Redis에 업데이트 * - 스케줄러가 주기적으로 Redis → DB 동기화 + * - 멱등성은 Facade에서 보장 (중복 체크는 Facade 책임) * * @return 좋아요한 상품 (이벤트 발행용 brandId 포함) */ public Product like(Long memberId, Long productId) { - // 중복 좋아요 방지 (멱등성) - if (likeRepository.existsByMemberIdAndProductId(memberId, productId)) { - log.debug("[LikeService] 이미 좋아요한 상품 - memberId: {}, productId: {}", memberId, productId); - return productRepository.findById(productId) - .orElseThrow(() -> new CoreException(ErrorType.NOT_FOUND, "상품을 찾을 수 없습니다.")); - } - - // 1. 상품 존재 확인 (비관적 락 제거) + // 1. 상품 존재 확인 Product product = productRepository.findById(productId) .orElseThrow(() -> new CoreException(ErrorType.NOT_FOUND, "상품을 찾을 수 없습니다.")); - // 2. DB에 Like 레코드만 저장 (카운트는 Redis에서 관리) + // 2. DB에 Like 레코드 저장 (카운트는 Redis에서 관리) likeRepository.save(new Like(memberId, productId)); log.info("[LikeService] 좋아요 저장 완료 - memberId: {}, productId: {}", memberId, productId); @@ -56,22 +50,16 @@ public Product like(Long memberId, Long productId) { * - DB에서 Like 레코드만 삭제 * - 좋아요 카운트는 이벤트 리스너에서 Redis에 업데이트 * - 스케줄러가 주기적으로 Redis → DB 동기화 + * - 멱등성은 Facade에서 보장 (존재 여부 체크는 Facade 책임) * * @return 좋아요 취소한 상품 (이벤트 발행용 brandId 포함) */ public Product unlike(Long memberId, Long productId) { - // 좋아요 없으면 스킵 (멱등성) - if (!likeRepository.existsByMemberIdAndProductId(memberId, productId)) { - log.debug("[LikeService] 좋아요하지 않은 상품 - memberId: {}, productId: {}", memberId, productId); - return productRepository.findById(productId) - .orElseThrow(() -> new CoreException(ErrorType.NOT_FOUND, "상품을 찾을 수 없습니다.")); - } - - // 1. 상품 존재 확인 (비관적 락 제거) + // 1. 상품 존재 확인 Product product = productRepository.findById(productId) .orElseThrow(() -> new CoreException(ErrorType.NOT_FOUND, "상품을 찾을 수 없습니다.")); - // 2. DB에서 Like 레코드만 삭제 (카운트는 Redis에서 관리) + // 2. DB에서 Like 레코드 삭제 (카운트는 Redis에서 관리) likeRepository.deleteByMemberIdAndProductId(memberId, productId); log.info("[LikeService] 좋아요 취소 완료 - memberId: {}, productId: {}", memberId, productId); diff --git a/apps/commerce-api/src/main/java/com/loopers/domain/order/service/OrderPlacementService.java b/apps/commerce-api/src/main/java/com/loopers/domain/order/service/OrderPlacementService.java index c1bc0f44e..23be65b5b 100644 --- a/apps/commerce-api/src/main/java/com/loopers/domain/order/service/OrderPlacementService.java +++ b/apps/commerce-api/src/main/java/com/loopers/domain/order/service/OrderPlacementService.java @@ -11,14 +11,17 @@ import com.loopers.domain.order.repository.OrderRepository; import com.loopers.domain.product.Product; import com.loopers.domain.product.repository.ProductRepository; +import com.loopers.infrastructure.cache.CacheInvalidationService; import com.loopers.support.error.CoreException; import com.loopers.support.error.ErrorType; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; +@Slf4j @RequiredArgsConstructor @Component public class OrderPlacementService { @@ -27,6 +30,7 @@ public class OrderPlacementService { private final ProductRepository productRepository; private final MemberRepository memberRepository; private final MemberCouponRepository memberCouponRepository; + private final CacheInvalidationService cacheInvalidationService; public Order placeOrder(OrderPlacementCommand command) { validateMemberExists(command.getMemberId()); @@ -83,6 +87,13 @@ private List processOrderLines(List orderLines) { throw new CoreException(ErrorType.BAD_REQUEST, "재고가 부족합니다."); } + // 재고 소진 시 캐시 무효화 + int remainingStock = productRepository.getStockQuantity(product.getId()); + if (remainingStock == 0) { + log.info("[Order] Stock depleted for productId={}, invalidating cache", product.getId()); + cacheInvalidationService.invalidateOnStockDepletion(product.getId()); + } + items.add(new OrderItem(product.getId(), line.getQuantity(), product.getPrice())); } diff --git a/apps/commerce-api/src/main/java/com/loopers/domain/product/repository/ProductRepository.java b/apps/commerce-api/src/main/java/com/loopers/domain/product/repository/ProductRepository.java index 0a586b3c4..95c18a2f3 100644 --- a/apps/commerce-api/src/main/java/com/loopers/domain/product/repository/ProductRepository.java +++ b/apps/commerce-api/src/main/java/com/loopers/domain/product/repository/ProductRepository.java @@ -26,6 +26,8 @@ public interface ProductRepository { int increaseStock(Long productId, int quantity); + int getStockQuantity(Long productId); + int incrementLikeCount(Long productId); int decrementLikeCount(Long productId); diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/cache/CacheInvalidationService.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/cache/CacheInvalidationService.java index df294921b..8eb094bb2 100644 --- a/apps/commerce-api/src/main/java/com/loopers/infrastructure/cache/CacheInvalidationService.java +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/cache/CacheInvalidationService.java @@ -35,4 +35,13 @@ public void invalidateOnProductUpdate(Long productId) { log.info("[CacheInvalidation] Invalidating cache for product update, productId={}", productId); productDetailCache.delete(productId); } + + /** + * Invalidate cache when product stock is depleted + */ + public void invalidateOnStockDepletion(Long productId) { + log.info("[CacheInvalidation] Invalidating cache for stock depletion, productId={}", productId); + productDetailCache.delete(productId); + // Note: Product list cache는 TTL(60초)에 의존하여 자동 무효화 + } } diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/cache/MemberLikesCache.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/cache/MemberLikesCache.java index 8e532330b..02a68970d 100644 --- a/apps/commerce-api/src/main/java/com/loopers/infrastructure/cache/MemberLikesCache.java +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/cache/MemberLikesCache.java @@ -79,9 +79,10 @@ public Set findLikedProductIds(Long memberId, List productIds) { return Set.of(); } - // productIds 중 좋아요한 것만 필터링 + // productIds 중 좋아요한 것만 필터링 (변환 실패한 값은 제외) Set likedSet = allLiked.stream() - .map(obj -> (Long) obj) + .map(this::toLongOrNull) + .filter(v -> v != null) .collect(Collectors.toSet()); return productIds.stream() @@ -107,7 +108,8 @@ public Set findAll(Long memberId) { } return members.stream() - .map(obj -> (Long) obj) + .map(this::toLongOrNull) + .filter(v -> v != null) .collect(Collectors.toSet()); } catch (Exception e) { @@ -165,4 +167,29 @@ public void delete(Long memberId) { log.warn("[MemberLikesCache] delete failed, error={}", e.getMessage()); } } + + /** + * Redis 값을 Long으로 변환 (변환 실패 시 null 반환) + */ + private Long toLongOrNull(Object value) { + if (value == null) { + return null; + } + if (value instanceof Long) { + return (Long) value; + } + if (value instanceof Integer) { + return ((Integer) value).longValue(); + } + if (value instanceof String) { + try { + return Long.parseLong((String) value); + } catch (NumberFormatException e) { + log.warn("[MemberLikesCache] Failed to parse String to Long: {}", value); + return null; + } + } + log.warn("[MemberLikesCache] Unsupported value type: {}", value.getClass().getSimpleName()); + return null; + } } diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/cache/ProductLikeCountCache.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/cache/ProductLikeCountCache.java index 861aab618..edcbb16ed 100644 --- a/apps/commerce-api/src/main/java/com/loopers/infrastructure/cache/ProductLikeCountCache.java +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/cache/ProductLikeCountCache.java @@ -60,16 +60,7 @@ public Long get(Long productId) { try { String key = CacheKeyGenerator.productLikeCountKey(productId); Object value = cacheRedisTemplate.opsForValue().get(key); - - if (value == null) { - return null; - } - - if (value instanceof Integer) { - return ((Integer) value).longValue(); - } - - return (Long) value; + return toLongOrNull(value); } catch (Exception e) { log.warn("[ProductLikeCountCache] get failed, error={}", e.getMessage()); return null; @@ -109,11 +100,9 @@ public Map getAllCounts() { // key에서 productId 추출 Long productId = extractProductIdFromKey(key); Object value = cacheRedisTemplate.opsForValue().get(key); + Long count = toLongOrNull(value); - if (value != null && productId != null) { - Long count = value instanceof Integer - ? ((Integer) value).longValue() - : (Long) value; + if (count != null && productId != null) { result.put(productId, count); } } catch (Exception e) { @@ -155,4 +144,29 @@ private Long extractProductIdFromKey(String key) { return null; } } + + /** + * Redis 값을 Long으로 변환 (변환 실패 시 null 반환) + */ + private Long toLongOrNull(Object value) { + if (value == null) { + return null; + } + if (value instanceof Long) { + return (Long) value; + } + if (value instanceof Integer) { + return ((Integer) value).longValue(); + } + if (value instanceof String) { + try { + return Long.parseLong((String) value); + } catch (NumberFormatException e) { + log.warn("[ProductLikeCountCache] Failed to parse String to Long: {}", value); + return null; + } + } + log.warn("[ProductLikeCountCache] Unsupported value type: {}", value.getClass().getSimpleName()); + return null; + } } diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/KafkaTopicRouter.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/KafkaTopicRouter.java new file mode 100644 index 000000000..506201f41 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/KafkaTopicRouter.java @@ -0,0 +1,40 @@ +package com.loopers.infrastructure.kafka; + +import org.springframework.stereotype.Component; + +/** + * Kafka Topic Router + * - 이벤트 타입에 따라 적절한 Kafka 토픽을 반환 + * - 토픽 네이밍 규칙: loopers.commerce.{event-name}-v1 + */ +@Component +public class KafkaTopicRouter { + + private static final String TOPIC_PREFIX = "loopers.commerce."; + private static final String TOPIC_VERSION = "-v1"; + + /** + * 이벤트 타입에 맞는 토픽 이름 반환 + * + * @param eventType 이벤트 타입 (ORDER_PLACED, PRODUCT_LIKED 등) + * @return Kafka 토픽 이름 + */ + public String getTopicName(String eventType) { + return switch (eventType) { + // Order Events + case "ORDER_PLACED" -> TOPIC_PREFIX + "order-placed" + TOPIC_VERSION; + case "ORDER_COMPLETED" -> TOPIC_PREFIX + "order-completed" + TOPIC_VERSION; + + // Payment Events + case "PAYMENT_COMPLETED" -> TOPIC_PREFIX + "payment-completed" + TOPIC_VERSION; + + // Product Events + case "PRODUCT_LIKED" -> TOPIC_PREFIX + "product-liked" + TOPIC_VERSION; + case "PRODUCT_UNLIKED" -> TOPIC_PREFIX + "product-unliked" + TOPIC_VERSION; + case "PRODUCT_VIEWED" -> TOPIC_PREFIX + "product-viewed" + TOPIC_VERSION; + + default -> throw new IllegalArgumentException("Unknown event type: " + eventType); + }; + } + +} diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/event/KafkaEventEnvelope.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/event/KafkaEventEnvelope.java new file mode 100644 index 000000000..4ab98e62b --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/event/KafkaEventEnvelope.java @@ -0,0 +1,43 @@ +package com.loopers.infrastructure.kafka.event; + +import java.time.ZonedDateTime; + +/** + * Kafka 이벤트 Envelope + * - Application Event를 Kafka로 전송할 때 메타데이터를 포함하기 위한 래퍼 + * - 멱등성 체크 및 순서 보장을 위한 정보 포함 + * + * @param Application Event 타입 (OrderPlacedEvent, ProductLikedEvent 등) + */ +public record KafkaEventEnvelope( + /** + * 이벤트 ID (Outbox Event의 ID) + * - Consumer에서 멱등성 체크에 사용 + */ + String eventId, + + /** + * 이벤트 타입 + * - 예: ORDER_PLACED, PRODUCT_LIKED, PAYMENT_COMPLETED + */ + String eventType, + + /** + * Partition Key + * - Kafka 파티션 분배 기준 + * - 같은 값은 항상 같은 파티션으로 전송되어 순서 보장 + */ + String partitionKey, + + /** + * 실제 이벤트 Payload + * - Application Event 객체 (OrderPlacedEvent, ProductLikedEvent 등) + */ + T payload, + + /** + * 이벤트 발생 시각 + */ + ZonedDateTime occurredAt +) { +} diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/KafkaOutboxEventListener.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/KafkaOutboxEventListener.java new file mode 100644 index 000000000..011635370 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/KafkaOutboxEventListener.java @@ -0,0 +1,113 @@ +package com.loopers.infrastructure.outbox; + +import com.loopers.application.event.like.ProductLikedEvent; +import com.loopers.application.event.like.ProductUnlikedEvent; +import com.loopers.application.event.order.OrderCompletedEvent; +import com.loopers.application.event.order.OrderPlacedEvent; +import com.loopers.application.event.payment.PaymentCompletedEvent; +import com.loopers.application.event.product.ProductViewedEvent; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.event.TransactionPhase; +import org.springframework.transaction.event.TransactionalEventListener; + +/** + * Kafka Outbox Event Listener + * - Application Event를 받아서 Outbox 테이블에 저장 + * - BEFORE_COMMIT: 같은 트랜잭션 내에서 처리되어 원자성 보장 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class KafkaOutboxEventListener { + + private final OutboxEventWriter outboxWriter; + + /** + * 주문 생성 이벤트 → Outbox 저장 + */ + @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT) + public void handleOrderPlaced(OrderPlacedEvent event) { + log.debug("[Outbox] OrderPlacedEvent 수신 - orderNo: {}", event.orderNo()); + + outboxWriter.write( + event.orderNo(), // partition key + "ORDER_PLACED", // event type + event // payload + ); + } + + /** + * 주문 완료 이벤트 → Outbox 저장 + */ + @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT) + public void handleOrderCompleted(OrderCompletedEvent event) { + log.debug("[Outbox] OrderCompletedEvent 수신 - orderNo: {}", event.orderNo()); + + outboxWriter.write( + event.orderNo(), + "ORDER_COMPLETED", + event + ); + } + + /** + * 결제 완료 이벤트 → Outbox 저장 + */ + @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT) + public void handlePaymentCompleted(PaymentCompletedEvent event) { + log.debug("[Outbox] PaymentCompletedEvent 수신 - orderNo: {}", event.orderNo()); + + outboxWriter.write( + event.orderNo(), + "PAYMENT_COMPLETED", + event + ); + } + + /** + * 상품 좋아요 이벤트 → Outbox 저장 + */ + @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT) + public void handleProductLiked(ProductLikedEvent event) { + log.debug("[Outbox] ProductLikedEvent 수신 - productId: {}", event.productId()); + + outboxWriter.write( + String.valueOf(event.productId()), // partition key + "PRODUCT_LIKED", + event + ); + } + + /** + * 상품 좋아요 취소 이벤트 → Outbox 저장 + */ + @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT) + public void handleProductUnliked(ProductUnlikedEvent event) { + log.debug("[Outbox] ProductUnlikedEvent 수신 - productId: {}", event.productId()); + + outboxWriter.write( + String.valueOf(event.productId()), + "PRODUCT_UNLIKED", + event + ); + } + + /** + * 상품 조회 이벤트 → Outbox 저장 + */ + @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT) + @Transactional(propagation = Propagation.REQUIRES_NEW) + public void handleProductViewed(ProductViewedEvent event) { + log.debug("[Outbox] ProductViewedEvent 수신 - productId: {}", event.productId()); + + outboxWriter.write( + String.valueOf(event.productId()), // partition key + "PRODUCT_VIEWED", + event + ); + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxEvent.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxEvent.java new file mode 100644 index 000000000..5c7e56b8f --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxEvent.java @@ -0,0 +1,123 @@ +package com.loopers.infrastructure.outbox; + +import com.loopers.domain.BaseEntity; +import jakarta.persistence.*; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.NoArgsConstructor; + +import java.time.ZonedDateTime; + +/** + * Transactional Outbox Pattern + * - 이벤트를 DB에 먼저 저장하여 트랜잭션 원자성 보장 + * - 별도 Poller가 PENDING 이벤트를 Kafka로 발행 + */ +@Entity +@Table(name = "outbox_event", indexes = { + @Index(name = "idx_status_created", columnList = "status, created_at"), + @Index(name = "idx_event_type_key", columnList = "event_type, partition_key") +}) +@Getter +@NoArgsConstructor(access = AccessLevel.PROTECTED) +public class OutboxEvent extends BaseEntity { + + /** + * Kafka Partition Key + * - 같은 엔티티의 이벤트는 같은 파티션으로 전송되어 순서 보장 + * - 예: productId "1", orderNo "ORD-20250101-001" + */ + @Column(name = "partition_key", nullable = false, length = 100) + private String partitionKey; + + /** + * 이벤트 타입 + * - 예: PRODUCT_LIKED, ORDER_PLACED, PAYMENT_COMPLETED + */ + @Column(name = "event_type", nullable = false, length = 100) + private String eventType; + + /** + * 이벤트 Payload (JSON) + * - Application Event 객체를 JSON으로 직렬화한 값 + * - 예: OrderPlacedEvent, ProductLikedEvent 등 + */ + @Column(name = "payload", columnDefinition = "JSON", nullable = false) + private String payload; + + /** + * 발행 상태 + */ + @Enumerated(EnumType.STRING) + @Column(name = "status", nullable = false, length = 20) + private OutboxStatus status = OutboxStatus.PENDING; + + /** + * Kafka 발행 완료 시각 + */ + @Column(name = "published_at") + private ZonedDateTime publishedAt; + + /** + * 재시도 횟수 + */ + @Column(name = "retry_count", nullable = false) + private int retryCount = 0; + + /** + * 실패 원인 + */ + @Column(name = "error_message", columnDefinition = "TEXT") + private String errorMessage; + + /** + * Outbox 이벤트 생성 + */ + public static OutboxEvent create( + String partitionKey, + String eventType, + String payload + ) { + OutboxEvent event = new OutboxEvent(); + event.partitionKey = partitionKey; + event.eventType = eventType; + event.payload = payload; + event.status = OutboxStatus.PENDING; + event.retryCount = 0; + return event; + } + + /** + * Kafka 발행 성공 처리 + */ + public void markAsPublished() { + this.status = OutboxStatus.PUBLISHED; + this.publishedAt = ZonedDateTime.now(); + } + + /** + * Kafka 발행 실패 처리 + */ + public void markAsFailed(String errorMessage) { + this.status = OutboxStatus.FAILED; + this.retryCount++; + this.errorMessage = errorMessage; + } + + /** + * 재시도 가능 여부 확인 + * @param maxRetryCount 최대 재시도 횟수 + */ + public boolean canRetry(int maxRetryCount) { + return this.status == OutboxStatus.FAILED && this.retryCount < maxRetryCount; + } + + /** + * 발행 상태 + */ + public enum OutboxStatus { + PENDING, // 발행 대기 중 + PUBLISHED, // 발행 완료 + FAILED // 발행 실패 + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxEventCleaner.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxEventCleaner.java new file mode 100644 index 000000000..3e0f75928 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxEventCleaner.java @@ -0,0 +1,105 @@ +package com.loopers.infrastructure.outbox; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.time.ZonedDateTime; + +/** + * Outbox Event Cleaner + * - 오래된 PUBLISHED 이벤트 정리 + * - 디스크 공간 절약 및 쿼리 성능 유지 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class OutboxEventCleaner { + + private final OutboxEventRepository outboxRepository; + + /** + * 7일 이상 지난 PUBLISHED 이벤트 삭제 + * - 매일 새벽 3시에 실행 + * - PUBLISHED 상태만 삭제 (PENDING, FAILED는 유지) + */ + @Scheduled(cron = "0 0 3 * * *") // 매일 새벽 3시 + @Transactional + public void cleanOldPublishedEvents() { + ZonedDateTime cutoffDate = ZonedDateTime.now().minusDays(7); + + log.info("[OutboxCleaner] Starting cleanup of published events older than {}", cutoffDate); + + try { + int deleted = outboxRepository.deletePublishedEventsOlderThan( + OutboxEvent.OutboxStatus.PUBLISHED, + cutoffDate + ); + + log.info("[OutboxCleaner] Deleted {} old published events", deleted); + + } catch (Exception e) { + log.error("[OutboxCleaner] Failed to clean old published events", e); + } + } + + /** + * 30일 이상 지난 FAILED 이벤트 삭제 + * - 매주 일요일 새벽 4시에 실행 + * - 재시도 횟수를 초과한 FAILED 이벤트는 30일 후 삭제 + */ + @Scheduled(cron = "0 0 4 * * SUN") // 매주 일요일 새벽 4시 + @Transactional + public void cleanOldFailedEvents() { + ZonedDateTime cutoffDate = ZonedDateTime.now().minusDays(30); + + log.info("[OutboxCleaner] Starting cleanup of failed events older than {}", cutoffDate); + + try { + int deleted = outboxRepository.deleteFailedEventsOlderThan( + OutboxEvent.OutboxStatus.FAILED, + cutoffDate, + 3 // 최대 재시도 횟수 초과한 이벤트만 삭제 + ); + + log.info("[OutboxCleaner] Deleted {} old failed events", deleted); + + } catch (Exception e) { + log.error("[OutboxCleaner] Failed to clean old failed events", e); + } + } + + /** + * Outbox 테이블 통계 로깅 + * - 매시간 정각에 실행 + * - PENDING, PUBLISHED, FAILED 상태별 이벤트 개수 확인 + */ + @Scheduled(cron = "0 0 * * * *") // 매시간 정각 + @Transactional(readOnly = true) + public void logOutboxStatistics() { + try { + long pendingCount = outboxRepository.countByStatus(OutboxEvent.OutboxStatus.PENDING); + long publishedCount = outboxRepository.countByStatus(OutboxEvent.OutboxStatus.PUBLISHED); + long failedCount = outboxRepository.countByStatus(OutboxEvent.OutboxStatus.FAILED); + long totalCount = outboxRepository.count(); + + log.info("[OutboxCleaner] Outbox statistics - Total: {}, PENDING: {}, PUBLISHED: {}, FAILED: {}", + totalCount, pendingCount, publishedCount, failedCount); + + // PENDING 이벤트가 1000개 이상이면 경고 + if (pendingCount > 1000) { + log.warn("[OutboxCleaner] WARNING: Too many PENDING events ({}). Check OutboxEventPoller!", pendingCount); + } + + // FAILED 이벤트가 100개 이상이면 경고 + if (failedCount > 100) { + log.warn("[OutboxCleaner] WARNING: Too many FAILED events ({}). Check Kafka connection!", failedCount); + } + + } catch (Exception e) { + log.error("[OutboxCleaner] Failed to log outbox statistics", e); + } + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxEventPoller.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxEventPoller.java new file mode 100644 index 000000000..74db28bf2 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxEventPoller.java @@ -0,0 +1,144 @@ +package com.loopers.infrastructure.outbox; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.loopers.infrastructure.kafka.KafkaTopicRouter; +import com.loopers.infrastructure.kafka.event.KafkaEventEnvelope; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.util.List; +import java.util.concurrent.ExecutionException; + +/** + * Outbox Event Poller + * - 주기적으로 PENDING 상태의 이벤트를 조회하여 Kafka로 발행 + * - 5초마다 실행 (fixedDelay) + * - At Least Once 보장: acks=all, idempotence=true + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class OutboxEventPoller { + + private static final int BATCH_SIZE = 100; + private static final int MAX_RETRY_COUNT = 3; + + private final OutboxEventRepository outboxRepository; + private final KafkaTemplate kafkaTemplate; + private final KafkaTopicRouter topicRouter; + private final ObjectMapper objectMapper; + + /** + * PENDING 이벤트 폴링 및 Kafka 발행 + * - 5초마다 실행 + */ + @Scheduled(fixedDelay = 5000, initialDelay = 5000) + @Transactional + public void pollAndPublish() { + List pendingEvents = outboxRepository.findByStatusOrderByCreatedAtAsc( + OutboxEvent.OutboxStatus.PENDING, + BATCH_SIZE + ); + + if (pendingEvents.isEmpty()) { + return; + } + + log.info("[OutboxPoller] Processing {} pending events", pendingEvents.size()); + + int successCount = 0; + int failureCount = 0; + + for (OutboxEvent event : pendingEvents) { + try { + publishToKafka(event); + event.markAsPublished(); + outboxRepository.save(event); + successCount++; + + } catch (Exception e) { + log.error("[OutboxPoller] Failed to publish event: id={}, type={}", + event.getId(), event.getEventType(), e); + event.markAsFailed(e.getMessage()); + outboxRepository.save(event); + failureCount++; + } + } + + log.info("[OutboxPoller] Completed - success: {}, failed: {}", successCount, failureCount); + } + + /** + * FAILED 이벤트 재시도 + * - 30초마다 실행 + */ + @Scheduled(fixedDelay = 30000, initialDelay = 10000) + @Transactional + public void retryFailedEvents() { + List retryableEvents = outboxRepository.findRetryableEvents( + MAX_RETRY_COUNT, + BATCH_SIZE + ); + + if (retryableEvents.isEmpty()) { + return; + } + + log.info("[OutboxPoller] Retrying {} failed events", retryableEvents.size()); + + int successCount = 0; + int failureCount = 0; + + for (OutboxEvent event : retryableEvents) { + try { + publishToKafka(event); + event.markAsPublished(); + outboxRepository.save(event); + successCount++; + log.info("[OutboxPoller] Retry success - id: {}, retryCount: {}", + event.getId(), event.getRetryCount()); + + } catch (Exception e) { + event.markAsFailed(e.getMessage()); + outboxRepository.save(event); + failureCount++; + log.warn("[OutboxPoller] Retry failed - id: {}, retryCount: {}/{}", + event.getId(), event.getRetryCount(), MAX_RETRY_COUNT); + } + } + + log.info("[OutboxPoller] Retry completed - success: {}, failed: {}", successCount, failureCount); + } + + /** + * Outbox 이벤트를 Kafka로 발행 + */ + private void publishToKafka(OutboxEvent outboxEvent) throws Exception { + String topic = topicRouter.getTopicName(outboxEvent.getEventType()); + String key = outboxEvent.getPartitionKey(); + + // Envelope로 감싸서 전송 + KafkaEventEnvelope envelope = new KafkaEventEnvelope<>( + String.valueOf(outboxEvent.getId()), + outboxEvent.getEventType(), + outboxEvent.getPartitionKey(), + objectMapper.readValue(outboxEvent.getPayload(), Object.class), + outboxEvent.getCreatedAt() + ); + + // 동기 전송 (실패 시 예외 발생) + try { + kafkaTemplate.send(topic, key, envelope).get(); + + log.debug("[OutboxPoller] Published to Kafka - topic: {}, key: {}, eventId: {}", + topic, key, outboxEvent.getId()); + + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Kafka send failed", e); + } + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxEventRepository.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxEventRepository.java new file mode 100644 index 000000000..379c632f3 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxEventRepository.java @@ -0,0 +1,78 @@ +package com.loopers.infrastructure.outbox; + +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Modifying; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; + +import java.time.ZonedDateTime; +import java.util.List; + +public interface OutboxEventRepository extends JpaRepository { + + /** + * PENDING 상태의 이벤트를 생성 시각 순으로 조회 + * @param limit 조회 개수 + */ + @Query("SELECT o FROM OutboxEvent o WHERE o.status = :status ORDER BY o.createdAt ASC LIMIT :limit") + List findByStatusOrderByCreatedAtAsc( + @Param("status") OutboxEvent.OutboxStatus status, + @Param("limit") int limit + ); + + /** + * 재시도 가능한 FAILED 이벤트 조회 + * @param maxRetryCount 최대 재시도 횟수 + * @param limit 조회 개수 + */ + @Query("SELECT o FROM OutboxEvent o WHERE o.status = 'FAILED' AND o.retryCount < :maxRetryCount ORDER BY o.createdAt ASC LIMIT :limit") + List findRetryableEvents( + @Param("maxRetryCount") int maxRetryCount, + @Param("limit") int limit + ); + + /** + * 특정 파티션 키의 이벤트 조회 (디버깅용) + */ + List findByPartitionKeyOrderByCreatedAtDesc(String partitionKey); + + /** + * 특정 이벤트 타입 조회 (디버깅용) + */ + List findByEventTypeOrderByCreatedAtDesc(String eventType); + + /** + * 오래된 PUBLISHED 이벤트 삭제 + * @param status 이벤트 상태 (PUBLISHED) + * @param publishedBefore 발행 완료 시각 기준 + * @return 삭제된 이벤트 개수 + */ + @Modifying + @Query("DELETE FROM OutboxEvent o WHERE o.status = :status AND o.publishedAt < :publishedBefore") + int deletePublishedEventsOlderThan( + @Param("status") OutboxEvent.OutboxStatus status, + @Param("publishedBefore") ZonedDateTime publishedBefore + ); + + /** + * 오래된 FAILED 이벤트 삭제 (재시도 횟수 초과) + * @param status 이벤트 상태 (FAILED) + * @param createdBefore 생성 시각 기준 + * @param maxRetryCount 최대 재시도 횟수 + * @return 삭제된 이벤트 개수 + */ + @Modifying + @Query("DELETE FROM OutboxEvent o WHERE o.status = :status AND o.createdAt < :createdBefore AND o.retryCount >= :maxRetryCount") + int deleteFailedEventsOlderThan( + @Param("status") OutboxEvent.OutboxStatus status, + @Param("createdBefore") ZonedDateTime createdBefore, + @Param("maxRetryCount") int maxRetryCount + ); + + /** + * 상태별 이벤트 개수 조회 + * @param status 이벤트 상태 + * @return 이벤트 개수 + */ + long countByStatus(OutboxEvent.OutboxStatus status); +} diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxEventWriter.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxEventWriter.java new file mode 100644 index 000000000..3d8975b4a --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxEventWriter.java @@ -0,0 +1,50 @@ +package com.loopers.infrastructure.outbox; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * Outbox 이벤트 저장 서비스 + * - Application Event를 Outbox 테이블에 저장 + * - JSON 직렬화 처리 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class OutboxEventWriter { + + private final OutboxEventRepository outboxRepository; + private final ObjectMapper objectMapper; + + /** + * Application Event를 Outbox 테이블에 저장 + * + * @param partitionKey Kafka partition key (productId, orderNo 등) + * @param eventType 이벤트 타입 (PRODUCT_LIKED, ORDER_PLACED 등) + * @param eventPayload Application Event 객체 + */ + public void write(String partitionKey, String eventType, Object eventPayload) { + try { + String payload = objectMapper.writeValueAsString(eventPayload); + + OutboxEvent outboxEvent = OutboxEvent.create( + partitionKey, + eventType, + payload + ); + + outboxRepository.save(outboxEvent); + + log.debug("[Outbox] 이벤트 저장 완료 - type: {}, key: {}, id: {}", + eventType, partitionKey, outboxEvent.getId()); + + } catch (JsonProcessingException e) { + log.error("[Outbox] 이벤트 직렬화 실패 - type: {}, key: {}", + eventType, partitionKey, e); + throw new IllegalStateException("Failed to serialize event", e); + } + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/product/ProductRepositoryImpl.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/product/ProductRepositoryImpl.java index 91e443985..0ecdb8a07 100644 --- a/apps/commerce-api/src/main/java/com/loopers/infrastructure/product/ProductRepositoryImpl.java +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/product/ProductRepositoryImpl.java @@ -138,6 +138,16 @@ public int increaseStock(Long productId, int quantity) { .execute()); } + @Override + public int getStockQuantity(Long productId) { + Integer stockQuantity = queryFactory + .select(product.stock.quantity) + .from(product) + .where(product.id.eq(productId)) + .fetchOne(); + return stockQuantity != null ? stockQuantity : 0; + } + @Override public int incrementLikeCount(Long productId) { return Math.toIntExact(queryFactory diff --git a/apps/commerce-api/src/main/resources/application.yml b/apps/commerce-api/src/main/resources/application.yml index 30fab38db..2fd21d1c8 100644 --- a/apps/commerce-api/src/main/resources/application.yml +++ b/apps/commerce-api/src/main/resources/application.yml @@ -21,6 +21,7 @@ spring: import: - jpa.yml - redis.yml + - kafka.yml - logging.yml - monitoring.yml - resilience4j.yml diff --git a/apps/commerce-api/src/test/java/com/loopers/application/event/integration/LikeEventIntegrationTest.java b/apps/commerce-api/src/test/java/com/loopers/application/event/integration/LikeEventIntegrationTest.java index 3f123bc9b..02204584d 100644 --- a/apps/commerce-api/src/test/java/com/loopers/application/event/integration/LikeEventIntegrationTest.java +++ b/apps/commerce-api/src/test/java/com/loopers/application/event/integration/LikeEventIntegrationTest.java @@ -5,7 +5,7 @@ import com.loopers.domain.common.vo.Money; import com.loopers.domain.like.Like; import com.loopers.domain.like.repository.LikeRepository; -import com.loopers.domain.like.service.LikeService; +import com.loopers.application.like.LikeFacade; import com.loopers.domain.members.Member; import com.loopers.domain.members.enums.Gender; import com.loopers.domain.members.repository.MemberRepository; @@ -14,6 +14,7 @@ import com.loopers.domain.product.vo.Stock; import com.loopers.infrastructure.cache.MemberLikesCache; import com.loopers.infrastructure.cache.ProductLikeCountCache; +import com.loopers.testcontainers.RedisTestContainersConfig; import com.loopers.utils.DatabaseCleanUp; import jakarta.persistence.EntityManager; import org.junit.jupiter.api.AfterEach; @@ -25,6 +26,7 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.test.context.event.ApplicationEvents; import org.springframework.test.context.event.RecordApplicationEvents; +import org.springframework.context.annotation.Import; import java.util.concurrent.TimeUnit; @@ -41,12 +43,13 @@ * 4. 여러 좋아요/취소 작업 후 데이터 일치 확인 */ @SpringBootTest +@Import(RedisTestContainersConfig.class) @RecordApplicationEvents @DisplayName("좋아요 이벤트 통합 테스트") class LikeEventIntegrationTest { @Autowired - private LikeService likeService; + private LikeFacade likeFacade; @Autowired private LikeRepository likeRepository; @@ -84,14 +87,13 @@ void setUp() { redisTemplate.getConnectionFactory().getConnection().flushDb(); // 테스트 회원 생성 - testMember = new Member("like-user", "like@example.com", "password123", "1995-05-05", Gender.FEMALE); + testMember = new Member("likeuser", "like@example.com", "password123", "1995-05-05", Gender.FEMALE); testMember = memberRepository.save(testMember); // 테스트 상품 생성 testProduct = new Product(10L, "인기 상품", "좋아요 테스트용", Money.of(50000), Stock.of(50)); testProduct = productRepository.save(testProduct); - entityManager.flush(); entityManager.clear(); } @@ -109,7 +111,7 @@ void tearDown() { Long productId = testProduct.getId(); // when - likeService.like(memberId, productId); + likeFacade.likeProduct(memberId, productId); // then - DB에 Like 레코드 저장 확인 entityManager.clear(); @@ -142,7 +144,7 @@ void tearDown() { // given - 먼저 좋아요 생성 Long memberId = testMember.getId(); Long productId = testProduct.getId(); - likeService.like(memberId, productId); + likeFacade.likeProduct(memberId, productId); // 비동기 처리 대기 await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> { @@ -152,7 +154,7 @@ void tearDown() { entityManager.clear(); // when - 좋아요 취소 - likeService.unlike(memberId, productId); + likeFacade.unlikeProduct(memberId, productId); // then - DB에서 Like 레코드 삭제 확인 entityManager.clear(); @@ -190,9 +192,9 @@ void tearDown() { Long productId = testProduct.getId(); // when - 3명이 좋아요 - likeService.like(testMember.getId(), productId); - likeService.like(member2.getId(), productId); - likeService.like(member3.getId(), productId); + likeFacade.likeProduct(testMember.getId(), productId); + likeFacade.likeProduct(member2.getId(), productId); + likeFacade.likeProduct(member3.getId(), productId); // then - DB에 3개 레코드 저장 확인 entityManager.clear(); @@ -217,12 +219,12 @@ void tearDown() { Long productId = testProduct.getId(); // when - 복잡한 좋아요/취소 시나리오 - likeService.like(testMember.getId(), productId); // +1 = 1 - likeService.like(member2.getId(), productId); // +1 = 2 - likeService.like(member3.getId(), productId); // +1 = 3 - likeService.unlike(member2.getId(), productId); // -1 = 2 - likeService.like(member4.getId(), productId); // +1 = 3 - likeService.unlike(testMember.getId(), productId); // -1 = 2 + likeFacade.likeProduct(testMember.getId(), productId); // +1 = 1 + likeFacade.likeProduct(member2.getId(), productId); // +1 = 2 + likeFacade.likeProduct(member3.getId(), productId); // +1 = 3 + likeFacade.unlikeProduct(member2.getId(), productId); // -1 = 2 + likeFacade.likeProduct(member4.getId(), productId); // +1 = 3 + likeFacade.unlikeProduct(testMember.getId(), productId); // -1 = 2 // 비동기 처리 완료 대기 await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { @@ -252,9 +254,9 @@ void tearDown() { Long productId = testProduct.getId(); // when - 동일 회원이 같은 상품에 3번 좋아요 - likeService.like(memberId, productId); - likeService.like(memberId, productId); - likeService.like(memberId, productId); + likeFacade.likeProduct(memberId, productId); + likeFacade.likeProduct(memberId, productId); + likeFacade.likeProduct(memberId, productId); // then - DB에는 1개만 저장 entityManager.clear(); diff --git a/apps/commerce-api/src/test/java/com/loopers/application/event/integration/OrderPaymentEventIntegrationTest.java b/apps/commerce-api/src/test/java/com/loopers/application/event/integration/OrderPaymentEventIntegrationTest.java index 93b70db46..22b449cb7 100644 --- a/apps/commerce-api/src/test/java/com/loopers/application/event/integration/OrderPaymentEventIntegrationTest.java +++ b/apps/commerce-api/src/test/java/com/loopers/application/event/integration/OrderPaymentEventIntegrationTest.java @@ -25,7 +25,7 @@ import org.springframework.context.ApplicationEventPublisher; import org.springframework.test.context.event.ApplicationEvents; import org.springframework.test.context.event.RecordApplicationEvents; -import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionTemplate; import java.time.LocalDateTime; import java.util.List; @@ -69,6 +69,9 @@ class OrderPaymentEventIntegrationTest { @Autowired private DatabaseCleanUp databaseCleanUp; + @Autowired + private TransactionTemplate transactionTemplate; + private Member testMember; private Product testProduct; private Order testOrder; @@ -76,7 +79,7 @@ class OrderPaymentEventIntegrationTest { @BeforeEach void setUp() { // 테스트 회원 생성 - testMember = new Member("test-user", "test@example.com", "password123", "1990-01-01", Gender.MALE); + testMember = new Member("testuser", "test@example.com", "password123", "1990-01-01", Gender.MALE); testMember = memberRepository.save(testMember); // 테스트 상품 생성 (재고 100개) @@ -92,7 +95,6 @@ void setUp() { testProduct.decreaseStock(5); productRepository.save(testProduct); - entityManager.flush(); entityManager.clear(); } @@ -103,7 +105,6 @@ void tearDown() { @Test @DisplayName("결제 성공 시 주문 상태가 PAID로 변경되고 OrderCompletedEvent가 발행된다") - @Transactional void 결제_성공_시_주문_완료() { // given String orderNo = testOrder.getOrderNo(); @@ -118,7 +119,7 @@ void tearDown() { ); // when - eventPublisher.publishEvent(paymentEvent); + transactionTemplate.executeWithoutResult(status -> eventPublisher.publishEvent(paymentEvent)); // 비동기 처리 대기 (최대 3초) await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> { @@ -139,7 +140,6 @@ void tearDown() { @Test @DisplayName("결제 실패 시 주문이 취소되고 재고가 복구된다") - @Transactional void 결제_실패_시_주문_취소_및_재고_복구() { // given String orderNo = testOrder.getOrderNo(); @@ -160,7 +160,7 @@ void tearDown() { ); // when - eventPublisher.publishEvent(paymentEvent); + transactionTemplate.executeWithoutResult(status -> eventPublisher.publishEvent(paymentEvent)); // 비동기 처리 대기 await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> { @@ -179,13 +179,11 @@ void tearDown() { @Test @DisplayName("이미 결제 완료된 주문에 중복 이벤트 발행 시 멱등성 보장") - @Transactional void 중복_이벤트_멱등성_보장() { // given - 주문을 먼저 PAID 상태로 변경 String orderNo = testOrder.getOrderNo(); testOrder.markAsPaid(); orderRepository.save(testOrder); - entityManager.flush(); entityManager.clear(); PaymentCompletedEvent paymentEvent = new PaymentCompletedEvent( @@ -197,7 +195,7 @@ void tearDown() { ); // when - 중복 이벤트 발행 - eventPublisher.publishEvent(paymentEvent); + transactionTemplate.executeWithoutResult(status -> eventPublisher.publishEvent(paymentEvent)); // 비동기 처리 대기 await().atMost(3, TimeUnit.SECONDS).pollInterval(100, TimeUnit.MILLISECONDS).untilAsserted(() -> { @@ -228,7 +226,7 @@ void tearDown() { ); // when - eventPublisher.publishEvent(paymentEvent); + transactionTemplate.executeWithoutResult(status -> eventPublisher.publishEvent(paymentEvent)); // 비동기 처리 및 트랜잭션 커밋 대기 await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { diff --git a/apps/commerce-api/src/test/java/com/loopers/domain/like/LikeConcurrencyTest.java b/apps/commerce-api/src/test/java/com/loopers/domain/like/LikeConcurrencyTest.java index 87ad27a16..653f9a699 100644 --- a/apps/commerce-api/src/test/java/com/loopers/domain/like/LikeConcurrencyTest.java +++ b/apps/commerce-api/src/test/java/com/loopers/domain/like/LikeConcurrencyTest.java @@ -6,6 +6,7 @@ import com.loopers.domain.brand.Brand; import com.loopers.domain.brand.repository.BrandRepository; import com.loopers.domain.common.vo.Money; +import com.loopers.domain.like.repository.LikeRepository; import com.loopers.domain.members.enums.Gender; import com.loopers.domain.product.Product; import com.loopers.domain.product.repository.ProductRepository; @@ -39,6 +40,9 @@ class LikeConcurrencyTest { @Autowired private ProductRepository productRepository; + @Autowired + private LikeRepository likeRepository; + @Autowired private BrandRepository brandRepository; @@ -93,8 +97,8 @@ void shouldHandleConcurrentLikes_whenMultipleUsersLikeSameProduct() throws Inter } // then - Product result = productRepository.findById(productId).orElseThrow(); - assertThat(result.getLikeCount()).isEqualTo(threadCount); + long likeCount = likeRepository.countByProductId(productId); + assertThat(likeCount).isEqualTo(threadCount); } @Test @@ -124,8 +128,8 @@ void shouldHandleConcurrentUnlikes_whenMultipleUsersUnlikeSameProduct() throws I } // 좋아요 개수 확인 - Product beforeUnlike = productRepository.findById(productId).orElseThrow(); - assertThat(beforeUnlike.getLikeCount()).isEqualTo(threadCount); + long beforeUnlikeCount = likeRepository.countByProductId(productId); + assertThat(beforeUnlikeCount).isEqualTo(threadCount); CountDownLatch latch = new CountDownLatch(threadCount); @@ -146,8 +150,8 @@ void shouldHandleConcurrentUnlikes_whenMultipleUsersUnlikeSameProduct() throws I } // then - Product result = productRepository.findById(productId).orElseThrow(); - assertThat(result.getLikeCount()).isEqualTo(0); + long likeCount = likeRepository.countByProductId(productId); + assertThat(likeCount).isEqualTo(0); } @Test @@ -212,7 +216,7 @@ void shouldHandleConcurrentMixedLikes_whenMultipleUsersLikeAndUnlikeSameProduct( } // then: likeCount명만 좋아요 상태여야 함 - Product result = productRepository.findById(productId).orElseThrow(); - assertThat(result.getLikeCount()).isEqualTo(likeCount); + long finalLikeCount = likeRepository.countByProductId(productId); + assertThat(finalLikeCount).isEqualTo(likeCount); } } diff --git a/apps/commerce-api/src/test/java/com/loopers/domain/like/LikeServiceIntegrationTest.java b/apps/commerce-api/src/test/java/com/loopers/domain/like/LikeServiceIntegrationTest.java index 69c265a5e..c72057eb2 100644 --- a/apps/commerce-api/src/test/java/com/loopers/domain/like/LikeServiceIntegrationTest.java +++ b/apps/commerce-api/src/test/java/com/loopers/domain/like/LikeServiceIntegrationTest.java @@ -11,7 +11,6 @@ import com.loopers.domain.product.vo.Stock; import com.loopers.utils.DatabaseCleanUp; import org.junit.jupiter.api.*; -import jakarta.persistence.EntityManager; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.transaction.annotation.Transactional; @@ -36,9 +35,6 @@ class LikeServiceIntegrationTest { @Autowired private DatabaseCleanUp cleanUp; - @Autowired - private EntityManager entityManager; - @AfterEach void tearDown() { cleanUp.truncateAllTables(); @@ -70,11 +66,6 @@ void likeSuccess() { // then Like saved = likeRepository.findByMemberIdAndProductId(member.getId(), product.getId()).orElse(null); assertThat(saved).isNotNull(); - - entityManager.flush(); - entityManager.clear(); // 1차 캐시 클리어 - Product updated = productRepository.findById(product.getId()).get(); - assertThat(updated.getLikeCount()).isEqualTo(1); } @Test @@ -93,11 +84,6 @@ void duplicateLike() { // then long likeCount = likeRepository.countByProductId(product.getId()); assertThat(likeCount).isEqualTo(1L); - - entityManager.flush(); - entityManager.clear(); // 1차 캐시 클리어 - Product updated = productRepository.findById(product.getId()).get(); - assertThat(updated.getLikeCount()).isEqualTo(1); // 증가 X } @Test @@ -116,9 +102,6 @@ void unlikeSuccess() { // then Like like = likeRepository.findByMemberIdAndProductId(member.getId(), product.getId()).orElse(null); assertThat(like).isNull(); - - Product updated = productRepository.findById(product.getId()).get(); - assertThat(updated.getLikeCount()).isEqualTo(0); } @Test diff --git a/apps/commerce-api/src/test/java/com/loopers/domain/order/OrderPlacementServiceTest.java b/apps/commerce-api/src/test/java/com/loopers/domain/order/OrderPlacementServiceTest.java index 6b449049b..82b54d72c 100644 --- a/apps/commerce-api/src/test/java/com/loopers/domain/order/OrderPlacementServiceTest.java +++ b/apps/commerce-api/src/test/java/com/loopers/domain/order/OrderPlacementServiceTest.java @@ -15,6 +15,7 @@ import com.loopers.domain.product.InMemoryProductRepository; import com.loopers.domain.product.Product; import com.loopers.domain.product.vo.Stock; +import com.loopers.infrastructure.cache.CacheInvalidationService; import com.loopers.support.TestEntityUtils; import com.loopers.support.error.CoreException; import org.junit.jupiter.api.BeforeEach; @@ -26,6 +27,7 @@ import java.util.List; import static org.assertj.core.api.Assertions.*; +import static org.mockito.Mockito.mock; class OrderPlacementServiceTest { @@ -34,6 +36,7 @@ class OrderPlacementServiceTest { private InMemoryMemberRepository memberRepository; private InMemoryPointRepository pointRepository; private InMemoryMemberCouponRepository memberCouponRepository; + private CacheInvalidationService cacheInvalidationService; private OrderPlacementService orderPlacementService; private long memberSequence; @@ -44,12 +47,14 @@ void setUp() { memberRepository = new InMemoryMemberRepository(); pointRepository = new InMemoryPointRepository(); memberCouponRepository = new InMemoryMemberCouponRepository(); + cacheInvalidationService = mock(CacheInvalidationService.class); memberSequence = 0L; orderPlacementService = new OrderPlacementService( orderRepository, productRepository, memberRepository, - memberCouponRepository + memberCouponRepository, + cacheInvalidationService ); } diff --git a/apps/commerce-api/src/test/java/com/loopers/domain/product/InMemoryProductRepository.java b/apps/commerce-api/src/test/java/com/loopers/domain/product/InMemoryProductRepository.java index df9cfa159..78c9e5f2e 100644 --- a/apps/commerce-api/src/test/java/com/loopers/domain/product/InMemoryProductRepository.java +++ b/apps/commerce-api/src/test/java/com/loopers/domain/product/InMemoryProductRepository.java @@ -95,6 +95,15 @@ public int increaseStock(Long productId, int quantity) { return 1; } + @Override + public int getStockQuantity(Long productId) { + Product product = store.get(productId); + if (product == null) { + return 0; + } + return product.getStock().getQuantity(); + } + @Override public int incrementLikeCount(Long productId) { Product product = store.get(productId); diff --git a/apps/commerce-api/src/test/java/com/loopers/infrastructure/kafka/KafkaConnectionTest.java b/apps/commerce-api/src/test/java/com/loopers/infrastructure/kafka/KafkaConnectionTest.java new file mode 100644 index 000000000..8a700ed21 --- /dev/null +++ b/apps/commerce-api/src/test/java/com/loopers/infrastructure/kafka/KafkaConnectionTest.java @@ -0,0 +1,75 @@ +package com.loopers.infrastructure.kafka; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.ImportAutoConfiguration; +import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.test.context.ActiveProfiles; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +@SpringBootTest( + classes = {com.loopers.confg.kafka.KafkaConfig.class}, + properties = { + "spring.kafka.bootstrap-servers=localhost:19092", + "spring.kafka.producer.acks=all", + "spring.kafka.producer.properties.enable.idempotence=true", + "spring.kafka.producer.retries=3", + "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer", + "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer" + } +) +@ImportAutoConfiguration(JacksonAutoConfiguration.class) +@ActiveProfiles("test") +class KafkaConnectionTest { + + @Autowired + private KafkaTemplate kafkaTemplate; + + @Test + @DisplayName("Kafka Producer 연결 테스트") + void kafkaProducerConnectionTest() throws Exception { + // given + String topic = "demo.internal.topic-v1"; + String key = "test-key"; + String message = "Hello Kafka!"; + + // when + CompletableFuture> future = + kafkaTemplate.send(topic, key, message); + + // then + SendResult result = future.get(10, TimeUnit.SECONDS); + assertThat(result).isNotNull(); + assertThat(result.getRecordMetadata().topic()).isEqualTo(topic); + + System.out.println("✅ Kafka Producer 연결 성공!"); + System.out.println("Topic: " + result.getRecordMetadata().topic()); + System.out.println("Partition: " + result.getRecordMetadata().partition()); + System.out.println("Offset: " + result.getRecordMetadata().offset()); + } + + @Test + @DisplayName("Kafka Producer acks=all 설정 확인") + void kafkaProducerAcksConfigTest() { + // given + var producerFactory = kafkaTemplate.getProducerFactory(); + var configs = producerFactory.getConfigurationProperties(); + + // then + assertThat(configs.get("acks")).isEqualTo("all"); + assertThat(Boolean.parseBoolean(String.valueOf(configs.get("enable.idempotence")))).isTrue(); + + System.out.println("✅ Producer 설정 확인:"); + System.out.println("acks: " + configs.get("acks")); + System.out.println("enable.idempotence: " + configs.get("enable.idempotence")); + System.out.println("retries: " + configs.get("retries")); + } +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/application/event/like/ProductLikedEvent.java b/apps/commerce-streamer/src/main/java/com/loopers/application/event/like/ProductLikedEvent.java new file mode 100644 index 000000000..90e451090 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/application/event/like/ProductLikedEvent.java @@ -0,0 +1,11 @@ +package com.loopers.application.event.like; + +import java.time.LocalDateTime; + +public record ProductLikedEvent( + Long memberId, + Long productId, + Long brandId, + LocalDateTime likedAt +) { +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/application/event/like/ProductUnlikedEvent.java b/apps/commerce-streamer/src/main/java/com/loopers/application/event/like/ProductUnlikedEvent.java new file mode 100644 index 000000000..ce60c397a --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/application/event/like/ProductUnlikedEvent.java @@ -0,0 +1,11 @@ +package com.loopers.application.event.like; + +import java.time.LocalDateTime; + +public record ProductUnlikedEvent( + Long memberId, + Long productId, + Long brandId, + LocalDateTime unlikedAt +) { +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/application/event/order/OrderCompletedEvent.java b/apps/commerce-streamer/src/main/java/com/loopers/application/event/order/OrderCompletedEvent.java new file mode 100644 index 000000000..3e37c0072 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/application/event/order/OrderCompletedEvent.java @@ -0,0 +1,19 @@ +package com.loopers.application.event.order; + +import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.util.List; + +public record OrderCompletedEvent( + String orderNo, + Long memberId, + BigDecimal totalPrice, + List items, + LocalDateTime completedAt +) { + public record OrderItemInfo( + Long productId, + int quantity, + BigDecimal price + ) {} +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/application/event/product/ProductViewedEvent.java b/apps/commerce-streamer/src/main/java/com/loopers/application/event/product/ProductViewedEvent.java new file mode 100644 index 000000000..9d59de626 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/application/event/product/ProductViewedEvent.java @@ -0,0 +1,11 @@ +package com.loopers.application.event.product; + +import java.time.LocalDateTime; + +public record ProductViewedEvent( + Long memberId, // nullable (비로그인 사용자도 추적) + Long productId, + Long brandId, + LocalDateTime viewedAt +) { +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/application/metrics/MetricsAggregationService.java b/apps/commerce-streamer/src/main/java/com/loopers/application/metrics/MetricsAggregationService.java new file mode 100644 index 000000000..cea286f32 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/application/metrics/MetricsAggregationService.java @@ -0,0 +1,140 @@ +package com.loopers.application.metrics; + +import com.loopers.application.event.like.ProductLikedEvent; +import com.loopers.application.event.like.ProductUnlikedEvent; +import com.loopers.application.event.order.OrderCompletedEvent; +import com.loopers.application.event.product.ProductViewedEvent; +import com.loopers.domain.event.EventHandled; +import com.loopers.domain.metrics.ProductMetrics; +import com.loopers.infrastructure.persistence.EventHandledRepository; +import com.loopers.infrastructure.persistence.ProductMetricsRepository; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +/** + * 메트릭 집계 서비스 + * - Kafka 이벤트를 받아서 ProductMetrics 테이블 업데이트 + * - 멱등성 보장: event_handled 테이블로 중복 처리 방지 + */ +@Slf4j +@Service +@RequiredArgsConstructor +@Transactional +public class MetricsAggregationService { + + private final ProductMetricsRepository metricsRepository; + private final EventHandledRepository eventHandledRepository; + + /** + * 상품 좋아요 이벤트 처리 + */ + public void handleProductLiked(String eventId, ProductLikedEvent payload) { + // 멱등성 체크 + if (eventHandledRepository.existsById(eventId)) { + log.warn("[Metrics] Duplicate event ignored - eventId: {}, type: PRODUCT_LIKED", eventId); + return; + } + + // ProductMetrics 업데이트 + ProductMetrics metrics = metricsRepository.findById(payload.productId()) + .orElse(ProductMetrics.create(payload.productId())); + + metrics.incrementLikeCount(); + metricsRepository.save(metrics); + + // 처리 완료 기록 + eventHandledRepository.save( + EventHandled.create(eventId, "PRODUCT_LIKED", String.valueOf(payload.productId())) + ); + + log.info("[Metrics] Like count incremented - productId: {}, count: {}", + payload.productId(), metrics.getLikeCount()); + } + + /** + * 상품 좋아요 취소 이벤트 처리 + */ + public void handleProductUnliked(String eventId, ProductUnlikedEvent payload) { + // 멱등성 체크 + if (eventHandledRepository.existsById(eventId)) { + log.warn("[Metrics] Duplicate event ignored - eventId: {}, type: PRODUCT_UNLIKED", eventId); + return; + } + + // ProductMetrics 업데이트 + ProductMetrics metrics = metricsRepository.findById(payload.productId()) + .orElse(ProductMetrics.create(payload.productId())); + + metrics.decrementLikeCount(); + metricsRepository.save(metrics); + + // 처리 완료 기록 + eventHandledRepository.save( + EventHandled.create(eventId, "PRODUCT_UNLIKED", String.valueOf(payload.productId())) + ); + + log.info("[Metrics] Like count decremented - productId: {}, count: {}", + payload.productId(), metrics.getLikeCount()); + } + + /** + * 주문 완료 이벤트 처리 (판매량 집계) + */ + public void handleOrderCompleted(String eventId, OrderCompletedEvent payload) { + // 멱등성 체크 + if (eventHandledRepository.existsById(eventId)) { + log.warn("[Metrics] Duplicate event ignored - eventId: {}, type: ORDER_COMPLETED", eventId); + return; + } + + // 각 주문 아이템별로 ProductMetrics 업데이트 + for (var item : payload.items()) { + ProductMetrics metrics = metricsRepository.findById(item.productId()) + .orElse(ProductMetrics.create(item.productId())); + + // 판매 금액 = 수량 * 단가 + long totalAmount = item.quantity() * item.price().longValue(); + metrics.addSales(item.quantity(), totalAmount); + metricsRepository.save(metrics); + + log.debug("[Metrics] Sales updated - productId: {}, quantity: {}, amount: {}", + item.productId(), item.quantity(), totalAmount); + } + + // 처리 완료 기록 + eventHandledRepository.save( + EventHandled.create(eventId, "ORDER_COMPLETED", payload.orderNo()) + ); + + log.info("[Metrics] Sales aggregated for order: {} ({} items)", + payload.orderNo(), payload.items().size()); + } + + /** + * 상품 조회 이벤트 처리 (조회수 집계) + */ + public void handleProductViewed(String eventId, ProductViewedEvent payload) { + // 멱등성 체크 + if (eventHandledRepository.existsById(eventId)) { + log.warn("[Metrics] Duplicate event ignored - eventId: {}, type: PRODUCT_VIEWED", eventId); + return; + } + + // ProductMetrics 업데이트 + ProductMetrics metrics = metricsRepository.findById(payload.productId()) + .orElse(ProductMetrics.create(payload.productId())); + + metrics.incrementViewCount(); + metricsRepository.save(metrics); + + // 처리 완료 기록 + eventHandledRepository.save( + EventHandled.create(eventId, "PRODUCT_VIEWED", String.valueOf(payload.productId())) + ); + + log.info("[Metrics] View count incremented - productId: {}, count: {}", + payload.productId(), metrics.getViewCount()); + } +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/confg/kafka/KafkaConfig.java b/apps/commerce-streamer/src/main/java/com/loopers/confg/kafka/KafkaConfig.java new file mode 100644 index 000000000..484616963 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/confg/kafka/KafkaConfig.java @@ -0,0 +1,35 @@ +package com.loopers.confg.kafka; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.listener.ContainerProperties; + +/** + * Kafka Consumer Configuration + */ +@Configuration +public class KafkaConfig { + + public static final String BATCH_LISTENER = "batchListenerContainerFactory"; + + /** + * 배치 리스너 컨테이너 팩토리 + * - 한 번에 여러 메시지를 처리 + * - Manual Ack 모드 + */ + @Bean(name = BATCH_LISTENER) + public ConcurrentKafkaListenerContainerFactory batchListenerContainerFactory( + ConsumerFactory consumerFactory + ) { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + + factory.setConsumerFactory(consumerFactory); + factory.setBatchListener(true); // 배치 리스너 활성화 + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); // Manual Ack + + return factory; + } +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/domain/dlq/DlqMessage.java b/apps/commerce-streamer/src/main/java/com/loopers/domain/dlq/DlqMessage.java new file mode 100644 index 000000000..1badf9261 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/domain/dlq/DlqMessage.java @@ -0,0 +1,154 @@ +package com.loopers.domain.dlq; + +import jakarta.persistence.*; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.NoArgsConstructor; + +import java.time.ZonedDateTime; + +/** + * Dead Letter Queue Message + * - Consumer에서 처리 실패한 메시지를 저장 + * - 디버깅 및 재처리를 위한 이력 보관 + */ +@Entity +@Table(name = "dlq_message", indexes = { + @Index(name = "idx_original_topic", columnList = "original_topic"), + @Index(name = "idx_failed_at", columnList = "failed_at"), + @Index(name = "idx_error_type", columnList = "error_type") +}) +@Getter +@NoArgsConstructor(access = AccessLevel.PROTECTED) +public class DlqMessage { + + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Long id; + + /** + * 원본 토픽 이름 + */ + @Column(name = "original_topic", nullable = false, length = 200) + private String originalTopic; + + /** + * 원본 파티션 + */ + @Column(name = "original_partition") + private Integer originalPartition; + + /** + * 원본 오프셋 + */ + @Column(name = "original_offset") + private Long originalOffset; + + /** + * 메시지 키 + */ + @Column(name = "message_key", length = 200) + private String messageKey; + + /** + * 메시지 값 (원본 JSON) + */ + @Column(name = "message_value", columnDefinition = "TEXT", nullable = false) + private String messageValue; + + /** + * 에러 타입 + */ + @Column(name = "error_type", nullable = false, length = 100) + private String errorType; + + /** + * 에러 메시지 + */ + @Column(name = "error_message", columnDefinition = "TEXT") + private String errorMessage; + + /** + * 스택 트레이스 + */ + @Column(name = "stack_trace", columnDefinition = "TEXT") + private String stackTrace; + + /** + * 실패 시각 + */ + @Column(name = "failed_at", nullable = false) + private ZonedDateTime failedAt; + + /** + * 재처리 완료 여부 + */ + @Column(name = "resolved", nullable = false) + private boolean resolved = false; + + /** + * DLQ 메시지 생성 + */ + public static DlqMessage create( + String originalTopic, + Integer originalPartition, + Long originalOffset, + String messageKey, + String messageValue, + Exception exception + ) { + DlqMessage dlqMessage = new DlqMessage(); + dlqMessage.originalTopic = originalTopic; + dlqMessage.originalPartition = originalPartition; + dlqMessage.originalOffset = originalOffset; + dlqMessage.messageKey = messageKey; + dlqMessage.messageValue = messageValue; + dlqMessage.errorType = resolveErrorType(exception); + dlqMessage.errorMessage = exception.getMessage(); + dlqMessage.stackTrace = getStackTraceAsString(exception); + dlqMessage.failedAt = ZonedDateTime.now(); + dlqMessage.resolved = false; + return dlqMessage; + } + + /** + * 재처리 완료 처리 + */ + public void markAsResolved() { + this.resolved = true; + } + + /** + * 스택 트레이스를 문자열로 변환 + */ + private static String getStackTraceAsString(Exception exception) { + StringBuilder sb = new StringBuilder(); + sb.append(exception.toString()).append("\n"); + + StackTraceElement[] stackTrace = exception.getStackTrace(); + int maxLines = Math.min(stackTrace.length, 10); // 최대 10줄만 저장 + + for (int i = 0; i < maxLines; i++) { + sb.append("\tat ").append(stackTrace[i]).append("\n"); + } + + if (stackTrace.length > maxLines) { + sb.append("\t... ").append(stackTrace.length - maxLines).append(" more"); + } + + return sb.toString(); + } + + private static String resolveErrorType(Exception exception) { + String simpleName = exception.getClass().getSimpleName(); + if (!simpleName.isEmpty()) { + return simpleName; + } + // 익명 클래스의 경우 simpleName이 빈 문자열이므로 부모 클래스 이름 사용 + Class superClass = exception.getClass().getSuperclass(); + if (superClass != null) { + return superClass.getSimpleName(); + } + return exception.getClass().getName(); + } +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/domain/event/EventHandled.java b/apps/commerce-streamer/src/main/java/com/loopers/domain/event/EventHandled.java new file mode 100644 index 000000000..7a4efdb80 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/domain/event/EventHandled.java @@ -0,0 +1,61 @@ +package com.loopers.domain.event; + +import jakarta.persistence.*; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.NoArgsConstructor; + +import java.time.ZonedDateTime; + +/** + * 이벤트 처리 이력 (멱등성 보장) + * - 중복 이벤트 처리 방지를 위한 테이블 + * - event_id를 PK로 사용하여 같은 이벤트는 한 번만 처리 + */ +@Entity +@Table(name = "event_handled", indexes = { + @Index(name = "idx_event_type_key", columnList = "event_type, partition_key"), + @Index(name = "idx_handled_at", columnList = "handled_at") +}) +@Getter +@NoArgsConstructor(access = AccessLevel.PROTECTED) +public class EventHandled { + + /** + * 이벤트 ID (Outbox Event의 ID) + * - Kafka 메시지의 eventId와 동일 + */ + @Id + @Column(name = "event_id", length = 100) + private String eventId; + + /** + * 이벤트 타입 + */ + @Column(name = "event_type", nullable = false, length = 100) + private String eventType; + + /** + * Partition Key (디버깅용) + */ + @Column(name = "partition_key", nullable = false, length = 100) + private String partitionKey; + + /** + * 처리 완료 시각 + */ + @Column(name = "handled_at", nullable = false) + private ZonedDateTime handledAt; + + /** + * 이벤트 처리 완료 기록 + */ + public static EventHandled create(String eventId, String eventType, String partitionKey) { + EventHandled handled = new EventHandled(); + handled.eventId = eventId; + handled.eventType = eventType; + handled.partitionKey = partitionKey; + handled.handledAt = ZonedDateTime.now(); + return handled; + } +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/domain/metrics/ProductMetrics.java b/apps/commerce-streamer/src/main/java/com/loopers/domain/metrics/ProductMetrics.java new file mode 100644 index 000000000..459f3c699 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/domain/metrics/ProductMetrics.java @@ -0,0 +1,122 @@ +package com.loopers.domain.metrics; + +import jakarta.persistence.*; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.NoArgsConstructor; + +import java.time.ZonedDateTime; + +/** + * 상품별 집계 메트릭 + * - 좋아요 수, 조회 수, 판매량 등을 실시간 집계 + * - Kafka Consumer가 이벤트를 받아서 업데이트 + * - event_handled 테이블로 멱등성 보장 (중복 처리 방지) + */ +@Entity +@Table(name = "product_metrics", indexes = { + @Index(name = "idx_last_updated", columnList = "last_updated"), + @Index(name = "idx_like_count", columnList = "like_count"), + @Index(name = "idx_sales_count", columnList = "sales_count") +}) +@Getter +@NoArgsConstructor(access = AccessLevel.PROTECTED) +public class ProductMetrics { + + /** + * 상품 ID (PK) + */ + @Id + @Column(name = "product_id") + private Long productId; + + /** + * 좋아요 수 + */ + @Column(name = "like_count", nullable = false) + private long likeCount = 0; + + /** + * 조회 수 (상세 페이지) + */ + @Column(name = "view_count", nullable = false) + private long viewCount = 0; + + /** + * 판매 수량 + */ + @Column(name = "sales_count", nullable = false) + private long salesCount = 0; + + /** + * 판매 금액 + */ + @Column(name = "sales_amount", nullable = false) + private long salesAmount = 0; + + /** + * 마지막 업데이트 시각 + */ + @Column(name = "last_updated", nullable = false) + private ZonedDateTime lastUpdated; + + /** + * 상품 메트릭 초기 생성 + */ + public static ProductMetrics create(Long productId) { + ProductMetrics metrics = new ProductMetrics(); + metrics.productId = productId; + metrics.likeCount = 0; + metrics.viewCount = 0; + metrics.salesCount = 0; + metrics.salesAmount = 0; + metrics.lastUpdated = ZonedDateTime.now(); + return metrics; + } + + /** + * 좋아요 수 증가 + */ + public void incrementLikeCount() { + this.likeCount++; + this.lastUpdated = ZonedDateTime.now(); + } + + /** + * 좋아요 수 감소 + */ + public void decrementLikeCount() { + this.likeCount = Math.max(0, this.likeCount - 1); + this.lastUpdated = ZonedDateTime.now(); + } + + /** + * 조회 수 증가 + */ + public void incrementViewCount() { + this.viewCount++; + this.lastUpdated = ZonedDateTime.now(); + } + + /** + * 판매 데이터 추가 + * @param quantity 판매 수량 + * @param amount 판매 금액 + */ + public void addSales(int quantity, long amount) { + this.salesCount += quantity; + this.salesAmount += amount; + this.lastUpdated = ZonedDateTime.now(); + } + + /** + * 모든 메트릭 초기화 (테스트용) + */ + public void reset() { + this.likeCount = 0; + this.viewCount = 0; + this.salesCount = 0; + this.salesAmount = 0; + this.lastUpdated = ZonedDateTime.now(); + } +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/kafka/DlqPublisher.java b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/kafka/DlqPublisher.java new file mode 100644 index 000000000..0f0c2bc31 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/kafka/DlqPublisher.java @@ -0,0 +1,86 @@ +package com.loopers.infrastructure.kafka; + +import com.loopers.domain.dlq.DlqMessage; +import com.loopers.infrastructure.persistence.DlqMessageRepository; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +/** + * DLQ Publisher + * - Consumer에서 처리 실패한 메시지를 DLQ 테이블에 저장 + * - 역직렬화 실패, 비즈니스 로직 에러 등 복구 불가능한 에러 처리 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class DlqPublisher { + + private final DlqMessageRepository dlqMessageRepository; + + /** + * Consumer Record를 DLQ에 저장 + * + * @param record 실패한 Kafka 메시지 + * @param exception 발생한 예외 + */ + @Transactional + public void publishToDlq(ConsumerRecord record, Exception exception) { + try { + DlqMessage dlqMessage = DlqMessage.create( + record.topic(), + record.partition(), + record.offset(), + record.key(), + record.value(), + exception + ); + + dlqMessageRepository.save(dlqMessage); + + log.error("[DLQ] Message saved to DLQ - topic: {}, partition: {}, offset: {}, key: {}, error: {}", + record.topic(), + record.partition(), + record.offset(), + record.key(), + exception.getClass().getSimpleName()); + + } catch (Exception e) { + // DLQ 저장마저 실패한 경우 (심각한 상황) + log.error("[DLQ] CRITICAL: Failed to save message to DLQ - topic: {}, partition: {}, offset: {}", + record.topic(), + record.partition(), + record.offset(), + e); + // 여기서는 예외를 던지지 않음 (무한 루프 방지) + } + } + + /** + * 에러 타입에 따라 DLQ 전송 여부 결정 + * + * @param exception 발생한 예외 + * @return true = DLQ로 전송, false = 재시도 필요 + */ + public boolean shouldSendToDlq(Exception exception) { + // 역직렬화 에러 → DLQ (재시도 불가능) + if (exception instanceof com.fasterxml.jackson.core.JsonProcessingException) { + return true; + } + + // IllegalArgumentException → DLQ (데이터 오류) + if (exception instanceof IllegalArgumentException) { + return true; + } + + // NullPointerException → DLQ (데이터 오류) + if (exception instanceof NullPointerException) { + return true; + } + + // 그 외의 에러는 일시적 장애일 수 있으므로 재시도 + return false; + } +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/kafka/event/KafkaEventEnvelope.java b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/kafka/event/KafkaEventEnvelope.java new file mode 100644 index 000000000..7e8d3b90d --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/kafka/event/KafkaEventEnvelope.java @@ -0,0 +1,42 @@ +package com.loopers.infrastructure.kafka.event; + +import java.time.ZonedDateTime; + +/** + * Kafka 이벤트 Envelope + * - Kafka로부터 수신한 이벤트의 메타데이터 포함 + * - 멱등성 체크 및 순서 보장을 위한 정보 포함 + * + * @param Application Event 타입 (OrderPlacedEvent, ProductLikedEvent 등) + */ +public record KafkaEventEnvelope( + /** + * 이벤트 ID (Outbox Event의 ID) + * - Consumer에서 멱등성 체크에 사용 + */ + String eventId, + + /** + * 이벤트 타입 + * - 예: ORDER_PLACED, PRODUCT_LIKED, PAYMENT_COMPLETED + */ + String eventType, + + /** + * Partition Key + * - 디버깅 및 로깅용 + */ + String partitionKey, + + /** + * 실제 이벤트 Payload + * - Application Event 객체 (OrderPlacedEvent, ProductLikedEvent 등) + */ + T payload, + + /** + * 이벤트 발생 시각 + */ + ZonedDateTime occurredAt +) { +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/persistence/DlqMessageRepository.java b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/persistence/DlqMessageRepository.java new file mode 100644 index 000000000..2527e98bb --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/persistence/DlqMessageRepository.java @@ -0,0 +1,30 @@ +package com.loopers.infrastructure.persistence; + +import com.loopers.domain.dlq.DlqMessage; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Query; + +import java.time.ZonedDateTime; +import java.util.List; + +/** + * DLQ Message Repository + */ +public interface DlqMessageRepository extends JpaRepository { + + /** + * 미해결 DLQ 메시지 조회 + */ + List findByResolvedFalseOrderByFailedAtDesc(); + + /** + * 특정 토픽의 DLQ 메시지 조회 + */ + List findByOriginalTopicOrderByFailedAtDesc(String originalTopic); + + /** + * 특정 기간 이전의 해결된 DLQ 메시지 삭제 (정리용) + */ + @Query("DELETE FROM DlqMessage d WHERE d.resolved = true AND d.failedAt < :cutoffDate") + int deleteResolvedMessagesOlderThan(ZonedDateTime cutoffDate); +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/persistence/EventHandledRepository.java b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/persistence/EventHandledRepository.java new file mode 100644 index 000000000..bc9094264 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/persistence/EventHandledRepository.java @@ -0,0 +1,30 @@ +package com.loopers.infrastructure.persistence; + +import com.loopers.domain.event.EventHandled; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; + +import java.time.ZonedDateTime; +import java.util.List; + +public interface EventHandledRepository extends JpaRepository { + + /** + * 이벤트 처리 여부 확인 (멱등성 체크) + */ + boolean existsById(String eventId); + + /** + * 특정 기간 이전의 처리 완료 이벤트 삭제 (정리용) + * @param before 기준 시각 + */ + @Query("DELETE FROM EventHandled e WHERE e.handledAt < :before") + void deleteOldEvents(@Param("before") ZonedDateTime before); + + /** + * 특정 파티션 키의 처리 이력 조회 (디버깅용) + */ + @Query("SELECT e FROM EventHandled e WHERE e.partitionKey = :partitionKey ORDER BY e.handledAt DESC") + List findByPartitionKey(@Param("partitionKey") String partitionKey); +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/persistence/ProductMetricsRepository.java b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/persistence/ProductMetricsRepository.java new file mode 100644 index 000000000..8284b3383 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/persistence/ProductMetricsRepository.java @@ -0,0 +1,40 @@ +package com.loopers.infrastructure.persistence; + +import com.loopers.domain.metrics.ProductMetrics; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Lock; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; + +import jakarta.persistence.LockModeType; +import java.util.List; +import java.util.Optional; + +public interface ProductMetricsRepository extends JpaRepository { + + /** + * 비관적 락을 사용한 조회 (동시성 제어) + * - 동시에 여러 이벤트가 들어올 때 데이터 정합성 보장 + */ + @Lock(LockModeType.PESSIMISTIC_WRITE) + @Query("SELECT pm FROM ProductMetrics pm WHERE pm.productId = :productId") + Optional findByIdWithLock(@Param("productId") Long productId); + + /** + * 좋아요 수 상위 N개 상품 조회 + */ + @Query("SELECT pm FROM ProductMetrics pm ORDER BY pm.likeCount DESC LIMIT :limit") + List findTopByLikeCount(@Param("limit") int limit); + + /** + * 판매량 상위 N개 상품 조회 + */ + @Query("SELECT pm FROM ProductMetrics pm ORDER BY pm.salesCount DESC LIMIT :limit") + List findTopBySalesCount(@Param("limit") int limit); + + /** + * 조회수 상위 N개 상품 조회 + */ + @Query("SELECT pm FROM ProductMetrics pm ORDER BY pm.viewCount DESC LIMIT :limit") + List findTopByViewCount(@Param("limit") int limit); +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/ProductMetricsConsumer.java b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/ProductMetricsConsumer.java new file mode 100644 index 000000000..27752823b --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/ProductMetricsConsumer.java @@ -0,0 +1,246 @@ +package com.loopers.interfaces.consumer; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.loopers.application.event.like.ProductLikedEvent; +import com.loopers.application.event.like.ProductUnlikedEvent; +import com.loopers.application.event.order.OrderCompletedEvent; +import com.loopers.application.event.product.ProductViewedEvent; +import com.loopers.application.metrics.MetricsAggregationService; +import com.loopers.confg.kafka.KafkaConfig; +import com.loopers.infrastructure.kafka.DlqPublisher; +import com.loopers.infrastructure.kafka.event.KafkaEventEnvelope; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; + +/** + * Product Metrics Kafka Consumer + * - Product 관련 이벤트를 수신하여 메트릭 집계 + * - Manual Ack: 처리 성공 후에만 offset commit + * - Batch Listener: 한 번에 여러 메시지 처리 + * - 에러 처리: 복구 불가능한 에러는 DLQ로 전송, 일시적 에러는 Kafka 재시도 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class ProductMetricsConsumer { + + private final MetricsAggregationService aggregationService; + private final ObjectMapper objectMapper; + private final DlqPublisher dlqPublisher; + + /** + * 상품 좋아요 이벤트 Consumer + */ + @KafkaListener( + topics = {"loopers.commerce.product-liked-v1"}, + groupId = "metrics-aggregator", + containerFactory = KafkaConfig.BATCH_LISTENER + ) + public void consumeProductLiked( + List> records, + Acknowledgment ack + ) { + log.info("[Consumer] Received {} product-liked events", records.size()); + + List> failedRecords = new ArrayList<>(); + + for (ConsumerRecord record : records) { + try { + KafkaEventEnvelope envelope = + objectMapper.readValue( + record.value(), + new TypeReference>() {} + ); + + aggregationService.handleProductLiked( + envelope.eventId(), + envelope.payload() + ); + + } catch (Exception e) { + log.error("[Consumer] Failed to process product-liked event - offset: {}, key: {}", + record.offset(), record.key(), e); + handleFailedRecord(record, e, failedRecords); + } + } + + // 실패한 레코드가 있으면 예외를 던져서 재처리 + if (!failedRecords.isEmpty()) { + log.warn("[Consumer] {} records failed, will retry", failedRecords.size()); + throw new RuntimeException( + String.format("Failed to process %d records", failedRecords.size()) + ); + } + + ack.acknowledge(); // 모두 성공 시에만 커밋 + log.debug("[Consumer] Acknowledged {} product-liked events", records.size()); + } + + /** + * 상품 좋아요 취소 이벤트 Consumer + */ + @KafkaListener( + topics = {"loopers.commerce.product-unliked-v1"}, + groupId = "metrics-aggregator", + containerFactory = KafkaConfig.BATCH_LISTENER + ) + public void consumeProductUnliked( + List> records, + Acknowledgment ack + ) { + log.info("[Consumer] Received {} product-unliked events", records.size()); + + List> failedRecords = new ArrayList<>(); + + for (ConsumerRecord record : records) { + try { + KafkaEventEnvelope envelope = + objectMapper.readValue( + record.value(), + new TypeReference>() {} + ); + + aggregationService.handleProductUnliked( + envelope.eventId(), + envelope.payload() + ); + + } catch (Exception e) { + log.error("[Consumer] Failed to process product-unliked event - offset: {}, key: {}", + record.offset(), record.key(), e); + handleFailedRecord(record, e, failedRecords); + } + } + + if (!failedRecords.isEmpty()) { + log.warn("[Consumer] {} records failed, will retry", failedRecords.size()); + throw new RuntimeException( + String.format("Failed to process %d records", failedRecords.size()) + ); + } + + ack.acknowledge(); + log.debug("[Consumer] Acknowledged {} product-unliked events", records.size()); + } + + /** + * 주문 완료 이벤트 Consumer (판매량 집계용) + */ + @KafkaListener( + topics = {"loopers.commerce.order-completed-v1"}, + groupId = "metrics-aggregator", + containerFactory = KafkaConfig.BATCH_LISTENER + ) + public void consumeOrderCompleted( + List> records, + Acknowledgment ack + ) { + log.info("[Consumer] Received {} order-completed events", records.size()); + + List> failedRecords = new ArrayList<>(); + + for (ConsumerRecord record : records) { + try { + KafkaEventEnvelope envelope = + objectMapper.readValue( + record.value(), + new TypeReference>() {} + ); + + aggregationService.handleOrderCompleted( + envelope.eventId(), + envelope.payload() + ); + + } catch (Exception e) { + log.error("[Consumer] Failed to process order-completed event - offset: {}, key: {}", + record.offset(), record.key(), e); + handleFailedRecord(record, e, failedRecords); + } + } + + if (!failedRecords.isEmpty()) { + log.warn("[Consumer] {} records failed, will retry", failedRecords.size()); + throw new RuntimeException( + String.format("Failed to process %d records", failedRecords.size()) + ); + } + + ack.acknowledge(); + log.debug("[Consumer] Acknowledged {} order-completed events", records.size()); + } + + /** + * 상품 조회 이벤트 Consumer (조회수 집계용) + */ + @KafkaListener( + topics = {"loopers.commerce.product-viewed-v1"}, + groupId = "metrics-aggregator", + containerFactory = KafkaConfig.BATCH_LISTENER + ) + public void consumeProductViewed( + List> records, + Acknowledgment ack + ) { + log.info("[Consumer] Received {} product-viewed events", records.size()); + + List> failedRecords = new ArrayList<>(); + + for (ConsumerRecord record : records) { + try { + KafkaEventEnvelope envelope = + objectMapper.readValue( + record.value(), + new TypeReference>() {} + ); + + aggregationService.handleProductViewed( + envelope.eventId(), + envelope.payload() + ); + + } catch (Exception e) { + log.error("[Consumer] Failed to process product-viewed event - offset: {}, key: {}", + record.offset(), record.key(), e); + handleFailedRecord(record, e, failedRecords); + } + } + + if (!failedRecords.isEmpty()) { + log.warn("[Consumer] {} records failed, will retry", failedRecords.size()); + throw new RuntimeException( + String.format("Failed to process %d records", failedRecords.size()) + ); + } + + ack.acknowledge(); + log.debug("[Consumer] Acknowledged {} product-viewed events", records.size()); + } + + /** + * 실패한 레코드 처리 공통 메서드 + * - 복구 불가능한 에러: DLQ로 전송 + * - 일시적 에러: 배치 재처리 (Kafka가 자동으로 재시도) + */ + private void handleFailedRecord( + ConsumerRecord record, + Exception exception, + List> failedRecords + ) { + if (dlqPublisher.shouldSendToDlq(exception)) { + // 복구 불가능한 에러 → 즉시 DLQ로 전송 + dlqPublisher.publishToDlq(record, exception); + } else { + // 일시적 에러 → 재시도 대상에 추가 + failedRecords.add(record); + } + } +} diff --git a/apps/commerce-streamer/src/test/java/com/loopers/application/metrics/MetricsAggregationServiceIdempotencyTest.java b/apps/commerce-streamer/src/test/java/com/loopers/application/metrics/MetricsAggregationServiceIdempotencyTest.java new file mode 100644 index 000000000..a9f922461 --- /dev/null +++ b/apps/commerce-streamer/src/test/java/com/loopers/application/metrics/MetricsAggregationServiceIdempotencyTest.java @@ -0,0 +1,251 @@ +package com.loopers.application.metrics; + +import com.loopers.application.event.like.ProductLikedEvent; +import com.loopers.application.event.like.ProductUnlikedEvent; +import com.loopers.application.event.order.OrderCompletedEvent; +import com.loopers.application.event.product.ProductViewedEvent; +import com.loopers.config.TestConfig; +import com.loopers.domain.metrics.ProductMetrics; +import com.loopers.infrastructure.persistence.EventHandledRepository; +import com.loopers.infrastructure.persistence.ProductMetricsRepository; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest; +import org.springframework.context.annotation.Import; +import org.springframework.test.context.ActiveProfiles; + +import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * 메트릭 집계 서비스 멱등성 테스트 + * - 중복 이벤트 처리 방지 검증 + * - event_handled 테이블 기반 멱등성 보장 + */ +@DataJpaTest +@Import({MetricsAggregationService.class}) +@ActiveProfiles("test") +class MetricsAggregationServiceIdempotencyTest { + + @Autowired + private MetricsAggregationService aggregationService; + + @Autowired + private ProductMetricsRepository metricsRepository; + + @Autowired + private EventHandledRepository eventHandledRepository; + + private static final Long PRODUCT_ID = 100L; + private static final Long MEMBER_ID = 1L; + private static final Long BRAND_ID = 10L; + + @BeforeEach + void setUp() { + // 테스트용 ProductMetrics 초기화 + ProductMetrics metrics = ProductMetrics.create(PRODUCT_ID); + metricsRepository.save(metrics); + } + + @Test + @DisplayName("좋아요 이벤트 중복 처리 방지 - 동일한 eventId로 두 번 호출 시 한 번만 처리됨") + void shouldIgnoreDuplicateProductLikedEvent() { + // given + String eventId = "event-liked-001"; + ProductLikedEvent event = new ProductLikedEvent( + MEMBER_ID, + PRODUCT_ID, + BRAND_ID, + LocalDateTime.now() + ); + + // when - 첫 번째 이벤트 처리 + aggregationService.handleProductLiked(eventId, event); + + // then - 좋아요 수가 1로 증가 + ProductMetrics metrics = metricsRepository.findById(PRODUCT_ID).orElseThrow(); + assertThat(metrics.getLikeCount()).isEqualTo(1); + assertThat(eventHandledRepository.existsById(eventId)).isTrue(); + + // when - 동일한 eventId로 두 번째 이벤트 처리 (중복) + aggregationService.handleProductLiked(eventId, event); + + // then - 좋아요 수가 여전히 1 (증가하지 않음) + metrics = metricsRepository.findById(PRODUCT_ID).orElseThrow(); + assertThat(metrics.getLikeCount()).isEqualTo(1); + } + + @Test + @DisplayName("좋아요 취소 이벤트 중복 처리 방지") + void shouldIgnoreDuplicateProductUnlikedEvent() { + // given - 좋아요 수를 먼저 1로 설정 + ProductMetrics metrics = metricsRepository.findById(PRODUCT_ID).orElseThrow(); + metrics.incrementLikeCount(); + metricsRepository.save(metrics); + + String eventId = "event-unliked-001"; + ProductUnlikedEvent event = new ProductUnlikedEvent( + MEMBER_ID, + PRODUCT_ID, + BRAND_ID, + LocalDateTime.now() + ); + + // when - 첫 번째 이벤트 처리 + aggregationService.handleProductUnliked(eventId, event); + + // then - 좋아요 수가 0으로 감소 + metrics = metricsRepository.findById(PRODUCT_ID).orElseThrow(); + assertThat(metrics.getLikeCount()).isEqualTo(0); + assertThat(eventHandledRepository.existsById(eventId)).isTrue(); + + // when - 동일한 eventId로 두 번째 이벤트 처리 (중복) + aggregationService.handleProductUnliked(eventId, event); + + // then - 좋아요 수가 여전히 0 (감소하지 않음) + metrics = metricsRepository.findById(PRODUCT_ID).orElseThrow(); + assertThat(metrics.getLikeCount()).isEqualTo(0); + } + + @Test + @DisplayName("주문 완료 이벤트 중복 처리 방지") + void shouldIgnoreDuplicateOrderCompletedEvent() { + // given + String eventId = "event-order-001"; + OrderCompletedEvent event = new OrderCompletedEvent( + "ORDER-001", + MEMBER_ID, + BigDecimal.valueOf(50000), + List.of( + new OrderCompletedEvent.OrderItemInfo( + PRODUCT_ID, + 2, + BigDecimal.valueOf(25000) + ) + ), + LocalDateTime.now() + ); + + // when - 첫 번째 이벤트 처리 + aggregationService.handleOrderCompleted(eventId, event); + + // then - 판매량 증가 + ProductMetrics metrics = metricsRepository.findById(PRODUCT_ID).orElseThrow(); + assertThat(metrics.getSalesCount()).isEqualTo(2); + assertThat(metrics.getSalesAmount()).isEqualTo(50000); + assertThat(eventHandledRepository.existsById(eventId)).isTrue(); + + // when - 동일한 eventId로 두 번째 이벤트 처리 (중복) + aggregationService.handleOrderCompleted(eventId, event); + + // then - 판매량이 여전히 2 (증가하지 않음) + metrics = metricsRepository.findById(PRODUCT_ID).orElseThrow(); + assertThat(metrics.getSalesCount()).isEqualTo(2); + assertThat(metrics.getSalesAmount()).isEqualTo(50000); + } + + @Test + @DisplayName("서로 다른 eventId는 각각 처리됨") + void shouldProcessDifferentEventIds() { + // given + String eventId1 = "event-liked-001"; + String eventId2 = "event-liked-002"; + ProductLikedEvent event1 = new ProductLikedEvent( + MEMBER_ID, + PRODUCT_ID, + BRAND_ID, + LocalDateTime.now() + ); + ProductLikedEvent event2 = new ProductLikedEvent( + MEMBER_ID + 1, + PRODUCT_ID, + BRAND_ID, + LocalDateTime.now() + ); + + // when - 서로 다른 eventId로 두 번 호출 + aggregationService.handleProductLiked(eventId1, event1); + aggregationService.handleProductLiked(eventId2, event2); + + // then - 좋아요 수가 2로 증가 + ProductMetrics metrics = metricsRepository.findById(PRODUCT_ID).orElseThrow(); + assertThat(metrics.getLikeCount()).isEqualTo(2); + assertThat(eventHandledRepository.existsById(eventId1)).isTrue(); + assertThat(eventHandledRepository.existsById(eventId2)).isTrue(); + } + + @Test + @DisplayName("상품 조회 이벤트 중복 처리 방지 - 동일한 eventId로 두 번 호출 시 한 번만 처리됨") + void shouldIgnoreDuplicateProductViewedEvent() { + // given + String eventId = "event-viewed-001"; + ProductViewedEvent event = new ProductViewedEvent( + MEMBER_ID, + PRODUCT_ID, + BRAND_ID, + LocalDateTime.now() + ); + + // when - 첫 번째 이벤트 처리 + aggregationService.handleProductViewed(eventId, event); + + // then - 조회 수가 1로 증가 + ProductMetrics metrics = metricsRepository.findById(PRODUCT_ID).orElseThrow(); + assertThat(metrics.getViewCount()).isEqualTo(1); + assertThat(eventHandledRepository.existsById(eventId)).isTrue(); + + // when - 동일한 eventId로 두 번째 이벤트 처리 (중복) + aggregationService.handleProductViewed(eventId, event); + + // then - 조회 수가 여전히 1 (증가하지 않음) + metrics = metricsRepository.findById(PRODUCT_ID).orElseThrow(); + assertThat(metrics.getViewCount()).isEqualTo(1); + } + + @Test + @DisplayName("비로그인 사용자 조회도 집계됨") + void shouldAggregateViewFromAnonymousUser() { + // given + String eventId = "event-viewed-002"; + ProductViewedEvent event = new ProductViewedEvent( + null, // 비로그인 사용자 + PRODUCT_ID, + BRAND_ID, + LocalDateTime.now() + ); + + // when + aggregationService.handleProductViewed(eventId, event); + + // then + ProductMetrics metrics = metricsRepository.findById(PRODUCT_ID).orElseThrow(); + assertThat(metrics.getViewCount()).isEqualTo(1); + } + + @Test + @DisplayName("여러 조회 이벤트가 누적됨") + void shouldAccumulateMultipleViewEvents() { + // given + String eventId1 = "event-viewed-003"; + String eventId2 = "event-viewed-004"; + String eventId3 = "event-viewed-005"; + + ProductViewedEvent event1 = new ProductViewedEvent(MEMBER_ID, PRODUCT_ID, BRAND_ID, LocalDateTime.now()); + ProductViewedEvent event2 = new ProductViewedEvent(MEMBER_ID + 1, PRODUCT_ID, BRAND_ID, LocalDateTime.now()); + ProductViewedEvent event3 = new ProductViewedEvent(null, PRODUCT_ID, BRAND_ID, LocalDateTime.now()); + + // when + aggregationService.handleProductViewed(eventId1, event1); + aggregationService.handleProductViewed(eventId2, event2); + aggregationService.handleProductViewed(eventId3, event3); + + // then + ProductMetrics metrics = metricsRepository.findById(PRODUCT_ID).orElseThrow(); + assertThat(metrics.getViewCount()).isEqualTo(3); + } +} diff --git a/apps/commerce-streamer/src/test/java/com/loopers/config/TestConfig.java b/apps/commerce-streamer/src/test/java/com/loopers/config/TestConfig.java new file mode 100644 index 000000000..c7a92084e --- /dev/null +++ b/apps/commerce-streamer/src/test/java/com/loopers/config/TestConfig.java @@ -0,0 +1,17 @@ +package com.loopers.config; + +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.ComponentScan; + +/** + * 테스트 환경 설정 + * - Kafka 자동 설정 비활성화 + * - JPA, Redis만 활성화 + */ +@TestConfiguration +@EnableAutoConfiguration(exclude = {KafkaAutoConfiguration.class}) +@ComponentScan(basePackages = "com.loopers") +public class TestConfig { +} diff --git a/apps/commerce-streamer/src/test/java/com/loopers/infrastructure/kafka/DlqPublisherTest.java b/apps/commerce-streamer/src/test/java/com/loopers/infrastructure/kafka/DlqPublisherTest.java new file mode 100644 index 000000000..3988a519d --- /dev/null +++ b/apps/commerce-streamer/src/test/java/com/loopers/infrastructure/kafka/DlqPublisherTest.java @@ -0,0 +1,118 @@ +package com.loopers.infrastructure.kafka; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.loopers.domain.dlq.DlqMessage; +import com.loopers.infrastructure.persistence.DlqMessageRepository; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.transaction.annotation.Transactional; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * DLQ Publisher 테스트 + */ +@SpringBootTest +@ActiveProfiles("test") +@Transactional +class DlqPublisherTest { + + @Autowired + private DlqPublisher dlqPublisher; + + @Autowired + private DlqMessageRepository dlqMessageRepository; + + @Test + @DisplayName("역직렬화 에러는 DLQ로 전송해야 함") + void shouldSendJsonProcessingExceptionToDlq() { + // given + JsonProcessingException exception = new JsonProcessingException("Invalid JSON") {}; + + // when + boolean shouldSendToDlq = dlqPublisher.shouldSendToDlq(exception); + + // then + assertThat(shouldSendToDlq).isTrue(); + } + + @Test + @DisplayName("IllegalArgumentException은 DLQ로 전송해야 함") + void shouldSendIllegalArgumentExceptionToDlq() { + // given + IllegalArgumentException exception = new IllegalArgumentException("Invalid argument"); + + // when + boolean shouldSendToDlq = dlqPublisher.shouldSendToDlq(exception); + + // then + assertThat(shouldSendToDlq).isTrue(); + } + + @Test + @DisplayName("RuntimeException은 재시도해야 함 (DLQ 전송 안 함)") + void shouldRetryRuntimeException() { + // given + RuntimeException exception = new RuntimeException("Temporary error"); + + // when + boolean shouldSendToDlq = dlqPublisher.shouldSendToDlq(exception); + + // then + assertThat(shouldSendToDlq).isFalse(); + } + + @Test + @DisplayName("실패한 메시지를 DLQ 테이블에 저장") + void shouldSaveFailedMessageToDlq() { + // given + ConsumerRecord record = new ConsumerRecord<>( + "loopers.commerce.product-liked-v1", + 0, + 100L, + "product-123", + "{\"invalid\":\"json}" + ); + JsonProcessingException exception = new JsonProcessingException("Parse error") {}; + + // when + dlqPublisher.publishToDlq(record, exception); + + // then + DlqMessage savedMessage = dlqMessageRepository.findAll().get(0); + assertThat(savedMessage.getOriginalTopic()).isEqualTo("loopers.commerce.product-liked-v1"); + assertThat(savedMessage.getOriginalPartition()).isEqualTo(0); + assertThat(savedMessage.getOriginalOffset()).isEqualTo(100L); + assertThat(savedMessage.getMessageKey()).isEqualTo("product-123"); + assertThat(savedMessage.getMessageValue()).isEqualTo("{\"invalid\":\"json}"); + assertThat(savedMessage.getErrorType()).isEqualTo("JsonProcessingException"); + assertThat(savedMessage.isResolved()).isFalse(); + } + + @Test + @DisplayName("DLQ 메시지는 스택 트레이스를 포함해야 함") + void shouldIncludeStackTrace() { + // given + ConsumerRecord record = new ConsumerRecord<>( + "test-topic", + 0, + 1L, + "key", + "value" + ); + Exception exception = new RuntimeException("Test error"); + + // when + dlqPublisher.publishToDlq(record, exception); + + // then + DlqMessage savedMessage = dlqMessageRepository.findAll().get(0); + assertThat(savedMessage.getStackTrace()).isNotNull(); + assertThat(savedMessage.getStackTrace()).contains("RuntimeException"); + assertThat(savedMessage.getStackTrace()).contains("Test error"); + } +} diff --git a/apps/commerce-streamer/src/test/resources/application-test.yml b/apps/commerce-streamer/src/test/resources/application-test.yml new file mode 100644 index 000000000..fd7e1568c --- /dev/null +++ b/apps/commerce-streamer/src/test/resources/application-test.yml @@ -0,0 +1,18 @@ +spring: + datasource: + driver-class-name: org.testcontainers.jdbc.ContainerDatabaseDriver + url: jdbc:tc:mysql:8.0:///test_db?TC_DAEMON=true + jpa: + hibernate: + ddl-auto: create-drop + show-sql: true + properties: + hibernate: + format_sql: true + kafka: + enabled: false # 테스트에서는 Kafka 비활성화 + +logging: + level: + com.loopers: DEBUG + org.springframework.kafka: DEBUG diff --git a/modules/kafka/src/main/java/com/loopers/confg/kafka/KafkaConfig.java b/modules/kafka/src/main/java/com/loopers/confg/kafka/KafkaConfig.java index a73842775..8cd13702b 100644 --- a/modules/kafka/src/main/java/com/loopers/confg/kafka/KafkaConfig.java +++ b/modules/kafka/src/main/java/com/loopers/confg/kafka/KafkaConfig.java @@ -30,19 +30,19 @@ public class KafkaConfig { public static final int MAX_POLL_INTERVAL_MS = 2 * 60 * 1000; // max poll interval = 2m @Bean - public ProducerFactory producerFactory(KafkaProperties kafkaProperties) { + public ProducerFactory producerFactory(KafkaProperties kafkaProperties) { Map props = new HashMap<>(kafkaProperties.buildProducerProperties()); return new DefaultKafkaProducerFactory<>(props); } @Bean - public ConsumerFactory consumerFactory(KafkaProperties kafkaProperties) { + public ConsumerFactory consumerFactory(KafkaProperties kafkaProperties) { Map props = new HashMap<>(kafkaProperties.buildConsumerProperties()); return new DefaultKafkaConsumerFactory<>(props); } @Bean - public KafkaTemplate kafkaTemplate(ProducerFactory producerFactory) { + public KafkaTemplate kafkaTemplate(ProducerFactory producerFactory) { return new KafkaTemplate<>(producerFactory); } @@ -52,7 +52,7 @@ public ByteArrayJsonMessageConverter jsonMessageConverter(ObjectMapper objectMap } @Bean(name = BATCH_LISTENER) - public ConcurrentKafkaListenerContainerFactory defaultBatchListenerContainerFactory( + public ConcurrentKafkaListenerContainerFactory defaultBatchListenerContainerFactory( KafkaProperties kafkaProperties, ByteArrayJsonMessageConverter converter ) { @@ -64,7 +64,7 @@ public ConcurrentKafkaListenerContainerFactory defaultBatchListe consumerConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, HEARTBEAT_INTERVAL_MS); consumerConfig.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, MAX_POLL_INTERVAL_MS); - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig)); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); // 수동 커밋 factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter)); diff --git a/modules/kafka/src/main/resources/kafka.yml b/modules/kafka/src/main/resources/kafka.yml index 9609dbf85..40d5c8b3e 100644 --- a/modules/kafka/src/main/resources/kafka.yml +++ b/modules/kafka/src/main/resources/kafka.yml @@ -15,6 +15,10 @@ spring: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer retries: 3 + acks: all # At Least Once 보장: 모든 in-sync replica가 확인할 때까지 대기 + properties: + enable.idempotence: true # 멱등성 프로듀서: 중복 발행 방지 + max.in.flight.requests.per.connection: 5 # 순서 보장 consumer: group-id: loopers-default-consumer key-deserializer: org.apache.kafka.common.serialization.StringDeserializer diff --git a/modules/redis/src/testFixtures/java/com/loopers/testcontainers/RedisTestContainersConfig.java b/modules/redis/src/testFixtures/java/com/loopers/testcontainers/RedisTestContainersConfig.java index 35bf94f06..09496e0e8 100644 --- a/modules/redis/src/testFixtures/java/com/loopers/testcontainers/RedisTestContainersConfig.java +++ b/modules/redis/src/testFixtures/java/com/loopers/testcontainers/RedisTestContainersConfig.java @@ -10,13 +10,13 @@ public class RedisTestContainersConfig { static { redisContainer.start(); - } - - public RedisTestContainersConfig() { System.setProperty("datasource.redis.database", "0"); System.setProperty("datasource.redis.master.host", redisContainer.getHost()); System.setProperty("datasource.redis.master.port", String.valueOf(redisContainer.getFirstMappedPort())); System.setProperty("datasource.redis.replicas[0].host", redisContainer.getHost()); System.setProperty("datasource.redis.replicas[0].port", String.valueOf(redisContainer.getFirstMappedPort())); } + + public RedisTestContainersConfig() { + } }