diff --git a/apps/commerce-api/build.gradle.kts b/apps/commerce-api/build.gradle.kts index 93503b4ab..fd1c97d2d 100644 --- a/apps/commerce-api/build.gradle.kts +++ b/apps/commerce-api/build.gradle.kts @@ -2,6 +2,7 @@ dependencies { // add-ons implementation(project(":modules:jpa")) implementation(project(":modules:redis")) + implementation(project(":modules:kafka")) implementation(project(":supports:jackson")) implementation(project(":supports:logging")) implementation(project(":supports:monitoring")) @@ -22,6 +23,10 @@ dependencies { testImplementation("net.datafaker:datafaker:2.0.2") + // Kafka + testImplementation(testFixtures(project(":modules:kafka"))) + testImplementation("org.springframework.kafka:spring-kafka-test") + // Resilience4j implementation("io.github.resilience4j:resilience4j-spring-boot3") implementation("org.springframework.boot:spring-boot-starter-aop") diff --git a/apps/commerce-api/src/main/java/com/loopers/application/order/OrderFacade.java b/apps/commerce-api/src/main/java/com/loopers/application/order/OrderFacade.java index e66e6a4f5..a8b169f4c 100644 --- a/apps/commerce-api/src/main/java/com/loopers/application/order/OrderFacade.java +++ b/apps/commerce-api/src/main/java/com/loopers/application/order/OrderFacade.java @@ -90,6 +90,11 @@ public OrderInfo createOrder(OrderPlaceCommand command) { public void useCoupon(Long couponId) { log.info("쿠폰 사용 처리: couponId={}", couponId); Coupon coupon = couponService.getCouponWithOptimisticLock(couponId); + if (!coupon.canUse()) { + log.info("이미 사용된 쿠폰입니다 (멱등성 처리): couponId={}", couponId); + return; + } + coupon.use(); couponService.save(coupon); } diff --git a/apps/commerce-api/src/main/java/com/loopers/domain/outbox/OutboxEvent.java b/apps/commerce-api/src/main/java/com/loopers/domain/outbox/OutboxEvent.java new file mode 100644 index 000000000..69c9fac75 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/domain/outbox/OutboxEvent.java @@ -0,0 +1,96 @@ +package com.loopers.domain.outbox; + +import jakarta.persistence.*; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; +import java.util.UUID; + +@Entity +@Table(name = "outbox_event") +@Getter +@NoArgsConstructor(access = AccessLevel.PROTECTED) +public class OutboxEvent { + + @Id + private String id; + + @Column(name = "aggregate_type", nullable = false) + private String aggregateType; + + @Column(name = "aggregate_id", nullable = false) + private String aggregateId; + + @Column(name = "event_type", nullable = false) + private String eventType; + + @Column(name = "topic", nullable = false) + private String topic; + + @Column(name = "partition_key") + private String partitionKey; + + @Column(name = "payload", columnDefinition = "TEXT", nullable = false) + private String payload; + + @Column(name = "status", nullable = false) + @Enumerated(EnumType.STRING) + private OutboxStatus status; + + @Column(name = "created_at", nullable = false) + private LocalDateTime createdAt; + + @Column(name = "processed_at") + private LocalDateTime processedAt; + + @Column(name = "retry_count") + private int retryCount; + + @Column(name = "last_error") + private String lastError; + + public static OutboxEvent create( + String aggregateType, + String aggregateId, + String eventType, + String topic, + String partitionKey, + String payload + ) { + OutboxEvent event = new OutboxEvent(); + event.id = UUID.randomUUID().toString(); + event.aggregateType = aggregateType; + event.aggregateId = aggregateId; + event.eventType = eventType; + event.topic = topic; + event.partitionKey = partitionKey; + event.payload = payload; + event.status = OutboxStatus.PENDING; + event.createdAt = LocalDateTime.now(); + event.retryCount = 0; + return event; + } + + public void markAsProcessed() { + this.status = OutboxStatus.PROCESSED; + this.processedAt = LocalDateTime.now(); + } + + public void markAsFailed(String error) { + this.status = OutboxStatus.FAILED; + this.lastError = error; + this.retryCount++; + } + + public void markForRetry() { + this.status = OutboxStatus.PENDING; + } + + public enum OutboxStatus { + PENDING, + PROCESSED, + FAILED + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/domain/outbox/OutboxEventRepository.java b/apps/commerce-api/src/main/java/com/loopers/domain/outbox/OutboxEventRepository.java new file mode 100644 index 000000000..6830c846a --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/domain/outbox/OutboxEventRepository.java @@ -0,0 +1,9 @@ +package com.loopers.domain.outbox; + +import java.util.List; + +public interface OutboxEventRepository { + OutboxEvent save(OutboxEvent event); + List findPendingEvents(int limit); + List findFailedEventsForRetry(int maxRetryCount, int limit); +} diff --git a/apps/commerce-api/src/main/java/com/loopers/domain/outbox/OutboxService.java b/apps/commerce-api/src/main/java/com/loopers/domain/outbox/OutboxService.java new file mode 100644 index 000000000..1ff7366d9 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/domain/outbox/OutboxService.java @@ -0,0 +1,46 @@ +package com.loopers.domain.outbox; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +@Slf4j +@Service +@RequiredArgsConstructor +public class OutboxService { + + private final OutboxEventRepository outboxEventRepository; + private final ObjectMapper objectMapper; + + @Transactional + public void saveEvent( + String aggregateType, + String aggregateId, + String eventType, + String topic, + String partitionKey, + Object payload + ) { + try { + String payloadJson = objectMapper.writeValueAsString(payload); + OutboxEvent event = OutboxEvent.create( + aggregateType, + aggregateId, + eventType, + topic, + partitionKey, + payloadJson + ); + outboxEventRepository.save(event); + log.debug("Outbox 이벤트 저장: aggregateType={}, aggregateId={}, eventType={}", + aggregateType, aggregateId, eventType); + } catch (JsonProcessingException e) { + log.error("Outbox 이벤트 직렬화 실패: aggregateType={}, aggregateId={}", + aggregateType, aggregateId, e); + throw new RuntimeException("이벤트 직렬화 실패", e); + } + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/domain/payment/event/PaymentSucceededEvent.java b/apps/commerce-api/src/main/java/com/loopers/domain/payment/event/PaymentSucceededEvent.java index 3ed138cbd..634b39b4a 100644 --- a/apps/commerce-api/src/main/java/com/loopers/domain/payment/event/PaymentSucceededEvent.java +++ b/apps/commerce-api/src/main/java/com/loopers/domain/payment/event/PaymentSucceededEvent.java @@ -12,6 +12,7 @@ public record PaymentSucceededEvent( Long couponId, Long amount, PaymentMethod paymentMethod, + String transactionId, ZonedDateTime paidAt ) { public static PaymentSucceededEvent of(Payment payment, Long couponId, ZonedDateTime paidAt) { @@ -22,6 +23,7 @@ public static PaymentSucceededEvent of(Payment payment, Long couponId, ZonedDate couponId, payment.getAmountValue(), payment.getPaymentMethod(), + payment.getTransactionId(), paidAt != null ? paidAt : ZonedDateTime.now() ); } diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/dto/LikeChangedDto.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/dto/LikeChangedDto.java new file mode 100644 index 000000000..8a1c8e7ad --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/dto/LikeChangedDto.java @@ -0,0 +1,17 @@ +package com.loopers.infrastructure.kafka.dto; + +import java.util.UUID; + +public record LikeChangedDto( + String eventId, + Long productId, + String likeType +) { + public static LikeChangedDto of(Long productId, String likeType) { + return new LikeChangedDto( + UUID.randomUUID().toString(), + productId, + likeType + ); + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/dto/OrderEventDto.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/dto/OrderEventDto.java new file mode 100644 index 000000000..2035bac47 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/dto/OrderEventDto.java @@ -0,0 +1,67 @@ +package com.loopers.infrastructure.kafka.dto; + +import java.time.LocalDateTime; +import java.util.List; +import java.util.UUID; + +public record OrderEventDto( + String eventId, + Long orderId, + Long userId, + String orderStatus, + Long totalAmount, + Long discountAmount, + List items, + LocalDateTime occurredAt +) { + public static OrderEventDto created( + Long orderId, + Long userId, + Long totalAmount, + Long discountAmount, + List items + ) { + return new OrderEventDto( + UUID.randomUUID().toString(), + orderId, + userId, + "CREATED", + totalAmount, + discountAmount, + items, + LocalDateTime.now() + ); + } + + public static OrderEventDto completed(Long orderId, Long userId) { + return new OrderEventDto( + UUID.randomUUID().toString(), + orderId, + userId, + "COMPLETED", + null, + null, + null, + LocalDateTime.now() + ); + } + + public static OrderEventDto failed(Long orderId, Long userId) { + return new OrderEventDto( + UUID.randomUUID().toString(), + orderId, + userId, + "FAILED", + null, + null, + null, + LocalDateTime.now() + ); + } + + public record OrderItemDto( + Long productId, + int quantity, + Long unitPrice + ) {} +} diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/dto/PaymentEventDto.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/dto/PaymentEventDto.java new file mode 100644 index 000000000..ee02b8dea --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/dto/PaymentEventDto.java @@ -0,0 +1,54 @@ +package com.loopers.infrastructure.kafka.dto; + +import java.time.LocalDateTime; +import java.util.UUID; + +public record PaymentEventDto( + String eventId, + Long orderId, + Long userId, + String paymentStatus, + String transactionId, + Long amount, + String failureReason, + LocalDateTime occurredAt +) { + public static PaymentEventDto success(Long orderId, Long userId, String transactionId, Long amount) { + return new PaymentEventDto( + UUID.randomUUID().toString(), + orderId, + userId, + "SUCCESS", + transactionId, + amount, + null, + LocalDateTime.now() + ); + } + + public static PaymentEventDto failed(Long orderId, Long userId, String failureReason) { + return new PaymentEventDto( + UUID.randomUUID().toString(), + orderId, + userId, + "FAILED", + null, + null, + failureReason, + LocalDateTime.now() + ); + } + + public static PaymentEventDto pending(Long orderId, Long userId, String transactionId) { + return new PaymentEventDto( + UUID.randomUUID().toString(), + orderId, + userId, + "PENDING", + transactionId, + null, + null, + LocalDateTime.now() + ); + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/dto/ProductViewedDto.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/dto/ProductViewedDto.java new file mode 100644 index 000000000..b66bf7242 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/dto/ProductViewedDto.java @@ -0,0 +1,15 @@ +package com.loopers.infrastructure.kafka.dto; + +import java.util.UUID; + +public record ProductViewedDto( + String eventId, + Long productId +) { + public static ProductViewedDto of(Long productId) { + return new ProductViewedDto( + UUID.randomUUID().toString(), + productId + ); + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/dto/StockChangedDto.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/dto/StockChangedDto.java new file mode 100644 index 000000000..b9df77737 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/dto/StockChangedDto.java @@ -0,0 +1,19 @@ +package com.loopers.infrastructure.kafka.dto; + +import java.util.UUID; + +public record StockChangedDto( + String eventId, + Long productId, + int stock, + String changedType +) { + public static StockChangedDto of(Long productId, int stock, String changedType) { + return new StockChangedDto( + UUID.randomUUID().toString(), + productId, + stock, + changedType + ); + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/dto/UserActionDto.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/dto/UserActionDto.java new file mode 100644 index 000000000..d584e20bc --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/dto/UserActionDto.java @@ -0,0 +1,21 @@ +package com.loopers.infrastructure.kafka.dto; + +import java.util.UUID; + +public record UserActionDto( + String eventId, + Long userId, + String actionType, + String targetType, + Long targetId +) { + public static UserActionDto of(Long userId, String actionType, String targetType, Long targetId) { + return new UserActionDto( + UUID.randomUUID().toString(), + userId, + actionType, + targetType, + targetId + ); + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/producer/LikeChangedEventProducer.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/producer/LikeChangedEventProducer.java new file mode 100644 index 000000000..c8c0d08c8 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/producer/LikeChangedEventProducer.java @@ -0,0 +1,32 @@ +package com.loopers.infrastructure.kafka.producer; + +import com.loopers.infrastructure.kafka.dto.LikeChangedDto; +import io.github.resilience4j.retry.annotation.Retry; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +@RequiredArgsConstructor +public class LikeChangedEventProducer { + + private final KafkaTemplate kafkaTemplate; + + @Value("${kafka.topic.product-like-name}") + private String likeChangedTopic; + + @Retry(name = "kafkaProducer", fallbackMethod = "likeChangedFallback") + public void sendLikeChangedEvent(Long productId, String likeType) { + LikeChangedDto event = LikeChangedDto.of(productId, likeType); + kafkaTemplate.send(likeChangedTopic, productId.toString(), event); + log.info("좋아요 변경 이벤트 발행: productId={}, likeType={}", productId, likeType); + } + + public void likeChangedFallback(Long productId, String likeType, Throwable ex) { + log.error("좋아요 변경 이벤트 발행 실패 (재시도 후): productId={}, likeType={}", + productId, likeType, ex); + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/producer/OrderEventProducer.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/producer/OrderEventProducer.java new file mode 100644 index 000000000..c4e06d07c --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/producer/OrderEventProducer.java @@ -0,0 +1,66 @@ +package com.loopers.infrastructure.kafka.producer; + +import com.loopers.infrastructure.kafka.dto.OrderEventDto; +import io.github.resilience4j.retry.annotation.Retry; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +import java.util.List; + +@Slf4j +@Component +@RequiredArgsConstructor +public class OrderEventProducer { + + private final KafkaTemplate kafkaTemplate; + + @Value("${kafka.topic.order-events-name}") + private String orderEventsTopic; + + @Retry(name = "kafkaProducer", fallbackMethod = "orderEventFallback") + public void sendOrderCreatedEvent( + Long orderId, + Long userId, + Long totalAmount, + Long discountAmount, + List items + ) { + OrderEventDto event = OrderEventDto.created(orderId, userId, totalAmount, discountAmount, items); + kafkaTemplate.send(orderEventsTopic, orderId.toString(), event); + log.info("주문 생성 이벤트 발행: orderId={}, userId={}", orderId, userId); + } + + @Retry(name = "kafkaProducer", fallbackMethod = "orderStatusEventFallback") + public void sendOrderCompletedEvent(Long orderId, Long userId) { + OrderEventDto event = OrderEventDto.completed(orderId, userId); + kafkaTemplate.send(orderEventsTopic, orderId.toString(), event); + log.info("주문 완료 이벤트 발행: orderId={}, userId={}", orderId, userId); + } + + @Retry(name = "kafkaProducer", fallbackMethod = "orderStatusEventFallback") + public void sendOrderFailedEvent(Long orderId, Long userId) { + OrderEventDto event = OrderEventDto.failed(orderId, userId); + kafkaTemplate.send(orderEventsTopic, orderId.toString(), event); + log.info("주문 실패 이벤트 발행: orderId={}, userId={}", orderId, userId); + } + + public void orderEventFallback( + Long orderId, + Long userId, + Long totalAmount, + Long discountAmount, + List items, + Throwable ex + ) { + log.error("주문 생성 이벤트 발행 실패 (재시도 후): orderId={}, userId={}", + orderId, userId, ex); + } + + public void orderStatusEventFallback(Long orderId, Long userId, Throwable ex) { + log.error("주문 상태 이벤트 발행 실패 (재시도 후): orderId={}, userId={}", + orderId, userId, ex); + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/producer/PaymentEventProducer.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/producer/PaymentEventProducer.java new file mode 100644 index 000000000..97fda0f85 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/producer/PaymentEventProducer.java @@ -0,0 +1,106 @@ +package com.loopers.infrastructure.kafka.producer; + +import com.loopers.domain.outbox.OutboxService; +import com.loopers.infrastructure.kafka.dto.PaymentEventDto; +import io.github.resilience4j.retry.annotation.Retry; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; + +@Slf4j +@Component +@RequiredArgsConstructor +public class PaymentEventProducer { + + private final KafkaTemplate kafkaTemplate; + private final OutboxService outboxService; + + @Value("${kafka.topic.payment-events-name}") + private String paymentEventsTopic; + + @Retry(name = "kafkaProducer", fallbackMethod = "paymentSuccessFallback") + public void sendPaymentSuccessEvent(Long orderId, Long userId, String transactionId, Long amount) { + PaymentEventDto event = PaymentEventDto.success(orderId, userId, transactionId, amount); + kafkaTemplate.send(paymentEventsTopic, orderId.toString(), event); + log.info("결제 성공 이벤트 발행: orderId={}, transactionId={}", orderId, transactionId); + } + + @Retry(name = "kafkaProducer", fallbackMethod = "paymentFailedFallback") + public void sendPaymentFailedEvent(Long orderId, Long userId, String failureReason) { + PaymentEventDto event = PaymentEventDto.failed(orderId, userId, failureReason); + kafkaTemplate.send(paymentEventsTopic, orderId.toString(), event); + log.info("결제 실패 이벤트 발행: orderId={}, reason={}", orderId, failureReason); + } + + @Retry(name = "kafkaProducer", fallbackMethod = "paymentPendingFallback") + public void sendPaymentPendingEvent(Long orderId, Long userId, String transactionId) { + PaymentEventDto event = PaymentEventDto.pending(orderId, userId, transactionId); + kafkaTemplate.send(paymentEventsTopic, orderId.toString(), event); + log.info("결제 대기 이벤트 발행: orderId={}, transactionId={}", orderId, transactionId); + } + + @Transactional(propagation = Propagation.REQUIRES_NEW) + public void paymentSuccessFallback(Long orderId, Long userId, String transactionId, Long amount, Throwable ex) { + log.error("결제 성공 이벤트 발행 실패, Outbox에 저장: orderId={}", orderId, ex); + + try { + PaymentEventDto event = PaymentEventDto.success(orderId, userId, transactionId, amount); + + outboxService.saveEvent( + "PAYMENT", + orderId.toString(), + "PAYMENT_SUCCESS", + paymentEventsTopic, + orderId.toString(), + event + ); + } catch (Exception e) { + log.error("Fallback Outbox 저장 실패: orderId={}", orderId, e); + // 최후의 수단: 별도 실패 테이블에 저장하거나 알림 발송 + } + } + + @Transactional(propagation = Propagation.REQUIRES_NEW) + public void paymentFailedFallback(Long orderId, Long userId, String failureReason, Throwable ex) { + log.error("결제 실패 이벤트 발행 실패, Outbox에 저장: orderId={}", orderId, ex); + + try { + PaymentEventDto event = PaymentEventDto.failed(orderId, userId, failureReason); + + outboxService.saveEvent( + "PAYMENT", + orderId.toString(), + "PAYMENT_FAILED", + paymentEventsTopic, + orderId.toString(), + event + ); + } catch (Exception e) { + log.error("Fallback Outbox 저장 실패: orderId={}", orderId, e); + } + } + + @Transactional(propagation = Propagation.REQUIRES_NEW) + public void paymentPendingFallback(Long orderId, Long userId, String transactionId, Throwable ex) { + log.error("결제 대기 이벤트 발행 실패, Outbox에 저장: orderId={}", orderId, ex); + + try { + PaymentEventDto event = PaymentEventDto.pending(orderId, userId, transactionId); + + outboxService.saveEvent( + "PAYMENT", + orderId.toString(), + "PAYMENT_PENDING", + paymentEventsTopic, + orderId.toString(), + event + ); + } catch (Exception e) { + log.error("Fallback Outbox 저장 실패: orderId={}", orderId, e); + } + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/producer/ProductViewedEventProducer.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/producer/ProductViewedEventProducer.java new file mode 100644 index 000000000..8f34a3376 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/producer/ProductViewedEventProducer.java @@ -0,0 +1,31 @@ +package com.loopers.infrastructure.kafka.producer; + +import com.loopers.infrastructure.kafka.dto.ProductViewedDto; +import io.github.resilience4j.retry.annotation.Retry; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +@RequiredArgsConstructor +public class ProductViewedEventProducer { + + private final KafkaTemplate kafkaTemplate; + + @Value("${kafka.topic.product-view-name}") + private String productViewTopic; + + @Retry(name = "kafkaProducer", fallbackMethod = "productViewedFallback") + public void sendProductViewedEvent(Long productId) { + ProductViewedDto event = ProductViewedDto.of(productId); + kafkaTemplate.send(productViewTopic, productId.toString(), event); + log.info("상품 조회 이벤트 발행: productId={}", productId); + } + + public void productViewedFallback(Long productId, Throwable ex) { + log.error("상품 조회 이벤트 발행 실패 (재시도 후): productId={}", productId, ex); + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/producer/StockChangedEventProducer.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/producer/StockChangedEventProducer.java new file mode 100644 index 000000000..10aedec29 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/producer/StockChangedEventProducer.java @@ -0,0 +1,33 @@ +package com.loopers.infrastructure.kafka.producer; + +import com.loopers.infrastructure.kafka.dto.StockChangedDto; +import io.github.resilience4j.retry.annotation.Retry; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +@RequiredArgsConstructor +public class StockChangedEventProducer { + + private final KafkaTemplate kafkaTemplate; + + @Value("${kafka.topic.product-stock-name}") + private String stockChangedTopic; + + @Retry(name = "kafkaProducer", fallbackMethod = "stockChangedFallback") + public void sendStockChangedEvent(Long productId, int stock, String changedType) { + StockChangedDto event = StockChangedDto.of(productId, stock, changedType); + kafkaTemplate.send(stockChangedTopic, productId.toString(), event); + log.info("재고 변경 이벤트 발행: productId={}, stock={}, changedType={}", + productId, stock, changedType); + } + + public void stockChangedFallback(Long productId, int stock, String changedType, Throwable ex) { + log.error("재고 변경 이벤트 발행 실패 (재시도 후): productId={}, stock={}, changedType={}", + productId, stock, changedType, ex); + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/producer/UserActionEventProducer.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/producer/UserActionEventProducer.java new file mode 100644 index 000000000..3b15dec34 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/producer/UserActionEventProducer.java @@ -0,0 +1,32 @@ +package com.loopers.infrastructure.kafka.producer; + +import com.loopers.infrastructure.kafka.dto.UserActionDto; +import io.github.resilience4j.retry.annotation.Retry; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +@RequiredArgsConstructor +public class UserActionEventProducer { + + private final KafkaTemplate kafkaTemplate; + + @Value("${kafka.topic.user-action-name}") + private String userActionTopic; + + @Retry(name = "kafkaProducer", fallbackMethod = "userActionFallback") + public void sendUserActionEvent(Long userId, String actionType, String targetType, Long targetId) { + UserActionDto event = UserActionDto.of(userId, actionType, targetType, targetId); + kafkaTemplate.send(userActionTopic, userId.toString(), event); + log.info("유저 행동 이벤트 발행: userId={}, actionType={}, targetType={}, targetId={}", + userId, actionType, targetType, targetId); + } + + public void userActionFallback(Long userId, String actionType, String targetType, Long targetId, Throwable ex) { + log.error("유저 행동 이벤트 발행 실패 (재시도 후): userId={}, actionType={}", userId, actionType, ex); + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxEventJpaRepository.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxEventJpaRepository.java new file mode 100644 index 000000000..b9938504f --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxEventJpaRepository.java @@ -0,0 +1,17 @@ +package com.loopers.infrastructure.outbox; + +import com.loopers.domain.outbox.OutboxEvent; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; + +import java.util.List; + +public interface OutboxEventJpaRepository extends JpaRepository { + + @Query("SELECT o FROM OutboxEvent o WHERE o.status = 'PENDING' ORDER BY o.createdAt ASC LIMIT :limit") + List findPendingEvents(@Param("limit") int limit); + + @Query("SELECT o FROM OutboxEvent o WHERE o.status = 'FAILED' AND o.retryCount < :maxRetryCount ORDER BY o.createdAt ASC LIMIT :limit") + List findFailedEventsForRetry(@Param("maxRetryCount") int maxRetryCount, @Param("limit") int limit); +} diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxEventPublisher.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxEventPublisher.java new file mode 100644 index 000000000..d95b9c796 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxEventPublisher.java @@ -0,0 +1,67 @@ +package com.loopers.infrastructure.outbox; + +import com.loopers.domain.outbox.OutboxEvent; +import com.loopers.domain.outbox.OutboxEventRepository; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.util.List; + +@Slf4j +@Component +@RequiredArgsConstructor +public class OutboxEventPublisher { + + private final OutboxEventRepository outboxEventRepository; + private final KafkaTemplate kafkaTemplate; + + private static final int BATCH_SIZE = 100; + private static final int MAX_RETRY_COUNT = 5; + + /** + * 1초마다 Pending 상태의 Outbox 이벤트를 Kafka로 발행 + */ + @Scheduled(fixedDelay = 1000) + @Transactional + public void publishPendingEvents() { + List pendingEvents = outboxEventRepository.findPendingEvents(BATCH_SIZE); + + for (OutboxEvent event : pendingEvents) { + try { + kafkaTemplate.send( + event.getTopic(), + event.getPartitionKey(), + event.getPayload() + ).get(); // 동기적으로 전송 확인 + + event.markAsProcessed(); + outboxEventRepository.save(event); + + log.debug("Outbox 이벤트 발행 성공: id={}, topic={}", event.getId(), event.getTopic()); + } catch (Exception e) { + event.markAsFailed(e.getMessage()); + outboxEventRepository.save(event); + log.error("Outbox 이벤트 발행 실패: id={}, topic={}", event.getId(), event.getTopic(), e); + } + } + } + + /** + * 5분마다 실패한 이벤트 재시도 + */ + @Scheduled(fixedDelay = 300000) + @Transactional + public void retryFailedEvents() { + List failedEvents = outboxEventRepository.findFailedEventsForRetry(MAX_RETRY_COUNT, BATCH_SIZE); + + for (OutboxEvent event : failedEvents) { + event.markForRetry(); + outboxEventRepository.save(event); + log.info("실패한 Outbox 이벤트 재시도 대기열로 이동: id={}", event.getId()); + } + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxEventRepositoryImpl.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxEventRepositoryImpl.java new file mode 100644 index 000000000..8099cbeae --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxEventRepositoryImpl.java @@ -0,0 +1,30 @@ +package com.loopers.infrastructure.outbox; + +import com.loopers.domain.outbox.OutboxEvent; +import com.loopers.domain.outbox.OutboxEventRepository; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Repository; + +import java.util.List; + +@Repository +@RequiredArgsConstructor +public class OutboxEventRepositoryImpl implements OutboxEventRepository { + + private final OutboxEventJpaRepository outboxEventJpaRepository; + + @Override + public OutboxEvent save(OutboxEvent event) { + return outboxEventJpaRepository.save(event); + } + + @Override + public List findPendingEvents(int limit) { + return outboxEventJpaRepository.findPendingEvents(limit); + } + + @Override + public List findFailedEventsForRetry(int maxRetryCount, int limit) { + return outboxEventJpaRepository.findFailedEventsForRetry(maxRetryCount, limit); + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/listener/OrderEventListener.java b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/listener/OrderEventListener.java index 84baf5d1a..016b7d0ea 100644 --- a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/listener/OrderEventListener.java +++ b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/listener/OrderEventListener.java @@ -2,11 +2,17 @@ import com.loopers.application.order.OrderFacade; import com.loopers.domain.order.OrderCreatedEvent; +import com.loopers.domain.outbox.OutboxService; import com.loopers.domain.user.UserActionEvent; +import com.loopers.infrastructure.kafka.dto.OrderEventDto; +import com.loopers.infrastructure.kafka.dto.StockChangedDto; +import com.loopers.infrastructure.kafka.producer.OrderEventProducer; +import com.loopers.infrastructure.kafka.producer.StockChangedEventProducer; import com.loopers.infrastructure.platform.DataPlatformSender; import com.loopers.infrastructure.platform.OrderResultMessage; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationEventPublisher; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; @@ -23,8 +29,17 @@ public class OrderEventListener { private final OrderFacade orderFacade; + private final OutboxService outboxService; private final DataPlatformSender dataPlatformSender; private final ApplicationEventPublisher eventPublisher; + private final OrderEventProducer orderEventProducer; + private final StockChangedEventProducer stockChangedEventProducer; + + @Value("${kafka.topic.order-events-name}") + private String orderEventsTopic; + + @Value("${kafka.topic.product-stock-name}") + private String productStockTopic; /** * 주문 생성 후 쿠폰 사용 처리 @@ -78,6 +93,36 @@ public void handleDataPlatformSend(OrderCreatedEvent event) { } } + /** + * 주문 생성 후 Kafka 이벤트 발행 + */ + @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT) + public void handleKafkaEventPublish(OrderCreatedEvent event) { + log.info("주문 생성 Outbox 이벤트 저장: orderId={}", event.orderId()); + + List items = event.items().stream() + .map(item -> new OrderEventDto.OrderItemDto( + item.productId(), item.quantity(), item.unitPrice())) + .toList(); + + OrderEventDto dto = OrderEventDto.created( + event.orderId(), + event.userId(), + event.totalAmount(), + event.discountAmount(), + items + ); + + outboxService.saveEvent( + "ORDER", + event.orderId().toString(), + "ORDER_CREATED", + orderEventsTopic, + event.orderId().toString(), + dto + ); + } + /** * 주문 생성 후 유저 행동 로깅 이벤트 발행 */ @@ -90,4 +135,26 @@ public void handleUserActionLogging(OrderCreatedEvent event) { UserActionEvent.orderCreate(event.userId(), event.orderId(), event.totalAmount()) ); } + + @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT) + public void handleStockChangedOutboxEvent(OrderCreatedEvent event) { + log.info("재고 변경 Outbox 이벤트 저장: orderId={}", event.orderId()); + + for (OrderCreatedEvent.OrderItemInfo item : event.items()) { + StockChangedDto dto = StockChangedDto.of( + item.productId(), + item.quantity(), + "DECREASED" + ); + + outboxService.saveEvent( + "PRODUCT", + item.productId().toString(), + "STOCK_DECREASED", + productStockTopic, + item.productId().toString(), + dto + ); + } + } } diff --git a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/listener/PaymentEventListener.java b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/listener/PaymentEventListener.java index e66d6ca81..78e6976c5 100644 --- a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/listener/PaymentEventListener.java +++ b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/listener/PaymentEventListener.java @@ -4,6 +4,8 @@ import com.loopers.domain.payment.event.PaymentFailedEvent; import com.loopers.domain.payment.event.PaymentSucceededEvent; import com.loopers.domain.user.UserActionEvent; +import com.loopers.infrastructure.kafka.producer.OrderEventProducer; +import com.loopers.infrastructure.kafka.producer.PaymentEventProducer; import com.loopers.infrastructure.platform.DataPlatformSender; import com.loopers.infrastructure.platform.OrderResultMessage; import com.loopers.infrastructure.platform.PaymentResultMessage; @@ -25,6 +27,8 @@ public class PaymentEventListener { private final OrderFacade orderFacade; private final DataPlatformSender dataPlatformSender; private final ApplicationEventPublisher eventPublisher; + private final PaymentEventProducer paymentEventProducer; + private final OrderEventProducer orderEventProducer; /** * 결제 성공 시 주문 완료 처리 + 데이터 플랫폼 전송 @@ -50,6 +54,30 @@ public void handleOrderCompletion(PaymentSucceededEvent event) { } } + /** + * 결제 성공 시 Kafka 이벤트 발행 + */ + @Async + @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) + public void handlePaymentSuccessKafkaEvent(PaymentSucceededEvent event) { + log.info("결제 성공 Kafka 이벤트 발행: orderId={}, paymentId={}", event.orderId(), event.paymentId()); + + try { + // Payment 이벤트 + paymentEventProducer.sendPaymentSuccessEvent( + event.orderId(), + event.userId(), + event.transactionId(), + event.amount() + ); + + // Order 완료 이벤트 + orderEventProducer.sendOrderCompletedEvent(event.orderId(), event.userId()); + } catch (Exception e) { + log.error("결제 성공 Kafka 이벤트 발행 실패: orderId={}", event.orderId(), e); + } + } + /** * 결제 성공 시 결제 데이터 플랫폼 전송 */ @@ -110,6 +138,29 @@ public void handlePaymentFailedCompensation(PaymentFailedEvent event) { } } + /** + * 결제 실패 시 Kafka 이벤트 발행 + */ + @Async + @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) + public void handlePaymentFailedKafkaEvent(PaymentFailedEvent event) { + log.info("결제 실패 Kafka 이벤트 발행: orderId={}", event.orderId()); + + try { + // Payment 실패 이벤트 + paymentEventProducer.sendPaymentFailedEvent( + event.orderId(), + event.userId(), + event.reason() + ); + + // Order 실패 이벤트 + orderEventProducer.sendOrderFailedEvent(event.orderId(), event.userId()); + } catch (Exception e) { + log.error("결제 실패 Kafka 이벤트 발행 실패: orderId={}", event.orderId(), e); + } + } + /** * 결제 실패 시 결제 데이터 플랫폼 전송 */ diff --git a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/listener/ProductLikeEventListener.java b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/listener/ProductLikeEventListener.java new file mode 100644 index 000000000..851c6fb26 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/listener/ProductLikeEventListener.java @@ -0,0 +1,47 @@ +package com.loopers.interfaces.api.listener; + +import com.loopers.domain.like.LikeEvent; +import com.loopers.domain.outbox.OutboxService; +import com.loopers.infrastructure.kafka.dto.LikeChangedDto; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import org.springframework.transaction.event.TransactionPhase; +import org.springframework.transaction.event.TransactionalEventListener; + +@Slf4j +@Component +@RequiredArgsConstructor +public class ProductLikeEventListener { + + private final OutboxService outboxService; + + @Value("${kafka.topic.product-like-name}") + private String productLikeTopic; + + @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT) + public void handleLikeChangedOutboxEvent(LikeEvent event) { + log.debug("좋아요 Outbox 이벤트 저장: productId={}, action={}", + event.productId(), event.action()); + + String likeType = switch (event.action()) { + case ADDED -> "LIKED"; + case REMOVED -> "UNLIKED"; + }; + + LikeChangedDto payload = LikeChangedDto.of( + event.productId(), + likeType + ); + + outboxService.saveEvent( + "PRODUCT", + event.productId().toString(), + "LIKE_CHANGED", + productLikeTopic, + event.productId().toString(), + payload + ); + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/listener/UserActionEventListener.java b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/listener/UserActionEventListener.java index 067504159..a5e495207 100644 --- a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/listener/UserActionEventListener.java +++ b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/listener/UserActionEventListener.java @@ -1,6 +1,8 @@ package com.loopers.interfaces.api.listener; import com.loopers.domain.user.UserActionEvent; +import com.loopers.infrastructure.kafka.producer.ProductViewedEventProducer; +import com.loopers.infrastructure.kafka.producer.UserActionEventProducer; import com.loopers.infrastructure.platform.DataPlatformSender; import com.loopers.infrastructure.platform.UserActionMessage; import lombok.RequiredArgsConstructor; @@ -15,15 +17,16 @@ public class UserActionEventListener { private final DataPlatformSender dataPlatformSender; + private final UserActionEventProducer userActionEventProducer; + private final ProductViewedEventProducer productViewedEventProducer; /** - * 유저 행동 이벤트 → 데이터 플랫폼 전송 - * - 모든 유저 행동 로깅을 여기서 통합 처리 + * 유저 행동 이벤트 → 데이터 플랫폼 전송 + Kafka 이벤트 발행 */ @Async @EventListener public void handleUserAction(UserActionEvent event) { - log.info("[UserAction] type={}, loginId={}, target={}:{}, metadata={}", + log.info("[UserAction] type={}, userId={}, target={}:{}, metadata={}", event.actionType(), event.userId(), event.targetType(), @@ -31,21 +34,45 @@ public void handleUserAction(UserActionEvent event) { event.metadata()); try { - if (isLikeAction(event)) { + if (isLikeOrViewAction(event)) { UserActionMessage message = convertToUserActionMessage(event); dataPlatformSender.sendUserAction(message); - } else { - // 주문/결제 등은 별도 메시지 타입으로 처리되므로 로깅만 - log.info("[DataPlatform] UserAction logged: type={}, loginId={}, targetId={}", - event.actionType(), event.userId(), event.targetId()); } } catch (Exception e) { - log.error("유저 행동 데이터 플랫폼 전송 실패: loginId={}, action={}, reason={}", - event.userId(), event.actionType(), e.getMessage()); + log.error("DataPlatform 전송 실패: userId={}, actionType={}", + event.userId(), event.actionType(), e); + // DataPlatform 실패는 무시하고 계속 진행 + } + + try { + publishKafkaEvent(event); + } catch (Exception e) { + log.error("Kafka 이벤트 발행 실패: userId={}, actionType={}", + event.userId(), event.actionType(), e); + } + } + + private void publishKafkaEvent(UserActionEvent event) { + try { + // 상품 조회 이벤트는 별도 Producer로 발행 + if (event.actionType() == UserActionEvent.ActionType.PRODUCT_VIEW) { + productViewedEventProducer.sendProductViewedEvent(event.targetId()); + } + + // 모든 유저 행동은 UserAction 토픽으로 발행 + userActionEventProducer.sendUserActionEvent( + event.userId(), + event.actionType().name(), + event.targetType(), + event.targetId() + ); + } catch (Exception e) { + log.error("Kafka 이벤트 발행 실패: userId={}, actionType={}", + event.userId(), event.actionType(), e); } } - private boolean isLikeAction(UserActionEvent event) { + private boolean isLikeOrViewAction(UserActionEvent event) { return event.actionType() == UserActionEvent.ActionType.PRODUCT_LIKE || event.actionType() == UserActionEvent.ActionType.PRODUCT_UNLIKE || event.actionType() == UserActionEvent.ActionType.PRODUCT_VIEW; diff --git a/apps/commerce-api/src/main/resources/application.yml b/apps/commerce-api/src/main/resources/application.yml index 65a807b99..6de93231e 100644 --- a/apps/commerce-api/src/main/resources/application.yml +++ b/apps/commerce-api/src/main/resources/application.yml @@ -22,6 +22,7 @@ spring: import: - jpa.yml - redis.yml + - kafka.yml - logging.yml - monitoring.yml @@ -34,7 +35,14 @@ cache: version: product: v1 - +kafka: + topic: + user-action-name: user-action-events + product-like-name: product-like-metrics + product-stock-name: product-stock-metrics + product-view-name: product-view-metrics + order-events-name: order-events + payment-events-name: payment-events resilience4j: circuitbreaker: @@ -63,6 +71,13 @@ resilience4j: - com.loopers.support.error.CoreException exponential-backoff-multiplier: 2 enable-exponential-backoff: true + kafkaProducer: + max-attempts: 3 + wait-duration: 1s + retry-exceptions: + - org.apache.kafka.common.errors.TimeoutException + - org.apache.kafka.common.errors.NetworkException + fail-after-max-attempts: true feign: client: diff --git a/apps/commerce-streamer/src/main/java/com/loopers/application/auditlog/AuditLogCommand.java b/apps/commerce-streamer/src/main/java/com/loopers/application/auditlog/AuditLogCommand.java new file mode 100644 index 000000000..77de087bc --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/application/auditlog/AuditLogCommand.java @@ -0,0 +1,25 @@ +package com.loopers.application.auditlog; + +public record AuditLogCommand( + String eventId, + Long userId, + String actionType, + String targetType, + Long targetId, + String payload +) { + public static AuditLogCommand of(String payload, String eventType) { + return new AuditLogCommand( + null, + null, + eventType, + null, + null, + payload + ); + } + + public String eventType() { + return actionType; + } +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/application/auditlog/AuditLogFacade.java b/apps/commerce-streamer/src/main/java/com/loopers/application/auditlog/AuditLogFacade.java new file mode 100644 index 000000000..1bfa65f57 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/application/auditlog/AuditLogFacade.java @@ -0,0 +1,36 @@ +package com.loopers.application.auditlog; + +import com.loopers.domain.auditlog.AuditLogService; +import com.loopers.domain.eventhandled.EventHandledDomainType; +import com.loopers.domain.eventhandled.EventHandledService; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +@Component +@RequiredArgsConstructor +public class AuditLogFacade { + + private final AuditLogService auditLogService; + private final EventHandledService eventHandledService; + + private static final EventHandledDomainType DOMAIN_TYPE = EventHandledDomainType.AUDIT_LOG; + + @Transactional + public void processAuditLog(AuditLogCommand command) { + if (eventHandledService.isEventHandled(command.eventId(), DOMAIN_TYPE)) { + return; + } + + auditLogService.saveAuditLog( + command.eventId(), + command.userId(), + command.actionType(), + command.targetType(), + command.targetId(), + command.payload() + ); + + eventHandledService.saveEventHandled(command.eventId(), DOMAIN_TYPE, command.eventType()); + } +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/application/metrics/MetricsType.java b/apps/commerce-streamer/src/main/java/com/loopers/application/metrics/MetricsType.java new file mode 100644 index 000000000..227c86614 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/application/metrics/MetricsType.java @@ -0,0 +1,7 @@ +package com.loopers.application.metrics; + +public enum MetricsType { + LIKE, + STOCK, + VIEW +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/application/metrics/ProductMetricsCommand.java b/apps/commerce-streamer/src/main/java/com/loopers/application/metrics/ProductMetricsCommand.java new file mode 100644 index 000000000..b89605b92 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/application/metrics/ProductMetricsCommand.java @@ -0,0 +1,47 @@ +package com.loopers.application.metrics; + +import com.loopers.interfaces.dto.ProductLikePayload; +import com.loopers.interfaces.dto.ProductStockPayload; +import com.loopers.interfaces.dto.ProductViewPayload; + +public record ProductMetricsCommand( + String eventId, + Long productId, + MetricsType metricsType, + String likeType, + Integer stock, + String changedType +) { + public static ProductMetricsCommand from(ProductLikePayload payload) { + return new ProductMetricsCommand( + payload.eventId(), + payload.productId(), + MetricsType.LIKE, + payload.likeType(), + null, + null + ); + } + + public static ProductMetricsCommand from(ProductStockPayload payload) { + return new ProductMetricsCommand( + payload.eventId(), + payload.productId(), + MetricsType.STOCK, + null, + payload.stock(), + payload.changedType() + ); + } + + public static ProductMetricsCommand from(ProductViewPayload payload) { + return new ProductMetricsCommand( + payload.eventId(), + payload.productId(), + MetricsType.VIEW, + null, + null, + null + ); + } +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/application/metrics/ProductMetricsFacade.java b/apps/commerce-streamer/src/main/java/com/loopers/application/metrics/ProductMetricsFacade.java new file mode 100644 index 000000000..726bab15a --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/application/metrics/ProductMetricsFacade.java @@ -0,0 +1,96 @@ +package com.loopers.application.metrics; + +import com.loopers.domain.eventhandled.EventHandledDomainType; +import com.loopers.domain.eventhandled.EventHandledService; +import com.loopers.domain.metrics.ProductMetricsService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.time.Clock; +import java.time.LocalDate; +import java.util.Set; + +@Slf4j +@Component +@RequiredArgsConstructor +public class ProductMetricsFacade { + + private final ProductMetricsService productMetricsService; + private final EventHandledService eventHandledService; + private final RedisTemplate redisTemplate; + private final Clock clock; + + private static final EventHandledDomainType DOMAIN_TYPE = EventHandledDomainType.METRICS; + private static final String PRODUCT_CACHE_KEY_PATTERN = "product:*:detail:%d"; + private static final String PRODUCT_LIST_CACHE_KEY_PATTERN = "product:*:list:*"; + + public LocalDate today() { + return LocalDate.now(clock); + } + + @Transactional + public void processLikeMetrics(ProductMetricsCommand command) { + if (eventHandledService.isEventHandled(command.eventId(), DOMAIN_TYPE)) { + return; + } + + LocalDate date = today(); + productMetricsService.processLikeMetrics(command.productId(), command.likeType(), date); + eventHandledService.saveEventHandled(command.eventId(), DOMAIN_TYPE, command.metricsType().toString()); + } + + @Transactional + public void processStockMetrics(ProductMetricsCommand command) { + if (eventHandledService.isEventHandled(command.eventId())) { + return; + } + + LocalDate date = today(); + productMetricsService.processStockMetrics(command.productId(), command.stock(), command.changedType(), date); + eventHandledService.saveEventHandled(command.eventId(), DOMAIN_TYPE, command.metricsType().toString()); + + // 재고 변경 시 캐시 무효화 + invalidateProductCache(command.productId()); + } + + @Transactional + public void processViewMetrics(ProductMetricsCommand command) { + if (eventHandledService.isEventHandled(command.eventId())) { + return; + } + + LocalDate date = today(); + productMetricsService.processViewMetrics(command.productId(), date); + eventHandledService.saveEventHandled(command.eventId(), DOMAIN_TYPE, command.metricsType().toString()); + } + + /** + * 상품 캐시 무효화 + * - 상품 상세 캐시 삭제 + * - 상품 목록 캐시 삭제 (해당 상품이 포함된 목록 캐시) + */ + private void invalidateProductCache(Long productId) { + try { + // 상품 상세 캐시 삭제 (버전별로 삭제) + String detailKeyPattern = String.format("product:*:detail:%d", productId); + Set detailKeys = redisTemplate.keys(detailKeyPattern); + if (detailKeys != null && !detailKeys.isEmpty()) { + redisTemplate.delete(detailKeys); + log.info("상품 상세 캐시 무효화 완료: productId={}, keys={}", productId, detailKeys.size()); + } + + // 상품 목록 캐시 삭제 + Set listKeys = redisTemplate.keys("product:*:list:*"); + if (listKeys != null && !listKeys.isEmpty()) { + redisTemplate.delete(listKeys); + log.info("상품 목록 캐시 무효화 완료: productId={}, keys={}", productId, listKeys.size()); + } + } catch (Exception e) { + log.error("캐시 무효화 실패: productId={}", productId, e); + // 캐시 무효화 실패는 비즈니스 로직에 영향을 주지 않도록 예외 전파하지 않음 + } + } +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/domain/auditlog/AuditLog.java b/apps/commerce-streamer/src/main/java/com/loopers/domain/auditlog/AuditLog.java new file mode 100644 index 000000000..db519d8dc --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/domain/auditlog/AuditLog.java @@ -0,0 +1,59 @@ +package com.loopers.domain.auditlog; + +import jakarta.persistence.*; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; + +@Entity +@Getter +@Table(name = "audit_log") +@NoArgsConstructor(access = AccessLevel.PROTECTED) +public class AuditLog { + + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Long id; + + @Column(name = "event_id", unique = true) + private String eventId; + + @Column(name = "user_id") + private Long userId; + + @Column(name = "action_type") + private String actionType; + + @Column(name = "target_type") + private String targetType; + + @Column(name = "target_id") + private Long targetId; + + @Column(name = "payload", columnDefinition = "TEXT") + private String payload; + + @Column(name = "created_at") + private LocalDateTime createdAt; + + public static AuditLog create( + String eventId, + Long userId, + String actionType, + String targetType, + Long targetId, + String payload + ) { + AuditLog auditLog = new AuditLog(); + auditLog.eventId = eventId; + auditLog.userId = userId; + auditLog.actionType = actionType; + auditLog.targetType = targetType; + auditLog.targetId = targetId; + auditLog.payload = payload; + auditLog.createdAt = LocalDateTime.now(); + return auditLog; + } +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/domain/auditlog/AuditLogRepository.java b/apps/commerce-streamer/src/main/java/com/loopers/domain/auditlog/AuditLogRepository.java new file mode 100644 index 000000000..9d91bcd88 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/domain/auditlog/AuditLogRepository.java @@ -0,0 +1,6 @@ +package com.loopers.domain.auditlog; + +public interface AuditLogRepository { + + AuditLog save(AuditLog auditLog); +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/domain/auditlog/AuditLogService.java b/apps/commerce-streamer/src/main/java/com/loopers/domain/auditlog/AuditLogService.java new file mode 100644 index 000000000..310f55c69 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/domain/auditlog/AuditLogService.java @@ -0,0 +1,25 @@ +package com.loopers.domain.auditlog; + +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +@Service +@RequiredArgsConstructor +public class AuditLogService { + + private final AuditLogRepository auditLogRepository; + + @Transactional + public void saveAuditLog( + String eventId, + Long userId, + String actionType, + String targetType, + Long targetId, + String payload + ) { + AuditLog auditLog = AuditLog.create(eventId, userId, actionType, targetType, targetId, payload); + auditLogRepository.save(auditLog); + } +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/domain/dlq/DlqMessage.java b/apps/commerce-streamer/src/main/java/com/loopers/domain/dlq/DlqMessage.java new file mode 100644 index 000000000..41328b17b --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/domain/dlq/DlqMessage.java @@ -0,0 +1,96 @@ +package com.loopers.domain.dlq; + +import jakarta.persistence.*; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; +import java.util.UUID; + +@Entity +@Getter +@Table(name = "dlq_message") +@NoArgsConstructor(access = AccessLevel.PROTECTED) +public class DlqMessage { + + @Id + private String id; + + @Column(name = "original_topic", nullable = false) + private String originalTopic; + + @Column(name = "partition_num") + private Integer partitionNum; + + @Column(name = "offset_num") + private Long offsetNum; + + @Column(name = "message_key") + private String messageKey; + + @Column(name = "payload", columnDefinition = "TEXT") + private String payload; + + @Column(name = "error_message", columnDefinition = "TEXT") + private String errorMessage; + + @Column(name = "created_at", nullable = false) + private LocalDateTime createdAt; + + @Column(name = "processed_at") + private LocalDateTime processedAt; + + @Column(name = "retry_count") + private int retryCount; + + @Enumerated(EnumType.STRING) + @Column(name = "status", nullable = false) + private DlqStatus status; + + public static DlqMessage create( + String originalTopic, + Integer partitionNum, + Long offsetNum, + String messageKey, + String payload, + String errorMessage + ) { + DlqMessage dlqMessage = new DlqMessage(); + dlqMessage.id = UUID.randomUUID().toString(); + dlqMessage.originalTopic = originalTopic; + dlqMessage.partitionNum = partitionNum; + dlqMessage.offsetNum = offsetNum; + dlqMessage.messageKey = messageKey; + dlqMessage.payload = payload; + dlqMessage.errorMessage = errorMessage; + dlqMessage.createdAt = LocalDateTime.now(); + dlqMessage.retryCount = 0; + dlqMessage.status = DlqStatus.PENDING; + return dlqMessage; + } + + public void incrementRetryCount() { + this.retryCount++; + } + + public void markAsResolved() { + this.status = DlqStatus.RESOLVED; + this.processedAt = LocalDateTime.now(); + } + + public void markAsAbandoned() { + this.status = DlqStatus.ABANDONED; + this.processedAt = LocalDateTime.now(); + } + + public boolean canRetry(int maxRetryCount) { + return this.retryCount < maxRetryCount && this.status == DlqStatus.PENDING; + } + + public enum DlqStatus { + PENDING, + RESOLVED, + ABANDONED + } +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/domain/dlq/DlqMessageRepository.java b/apps/commerce-streamer/src/main/java/com/loopers/domain/dlq/DlqMessageRepository.java new file mode 100644 index 000000000..ecd500f14 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/domain/dlq/DlqMessageRepository.java @@ -0,0 +1,19 @@ +package com.loopers.domain.dlq; + +import java.util.List; +import java.util.Optional; + +public interface DlqMessageRepository { + + DlqMessage save(DlqMessage dlqMessage); + + Optional findById(String id); + + List findByStatus(DlqMessage.DlqStatus status); + + List findPendingMessagesForRetry(int maxRetryCount, int limit); + + List findAll(); + + long countByStatus(DlqMessage.DlqStatus status); +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/domain/dlq/DlqMessageService.java b/apps/commerce-streamer/src/main/java/com/loopers/domain/dlq/DlqMessageService.java new file mode 100644 index 000000000..edf27aeaa --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/domain/dlq/DlqMessageService.java @@ -0,0 +1,81 @@ +package com.loopers.domain.dlq; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.util.List; + +@Slf4j +@Service +@RequiredArgsConstructor +public class DlqMessageService { + + private final DlqMessageRepository dlqMessageRepository; + + private static final int MAX_RETRY_COUNT = 5; + + @Transactional + public DlqMessage saveDlqMessage( + String originalTopic, + Integer partitionNum, + Long offsetNum, + String messageKey, + String payload, + String errorMessage + ) { + DlqMessage dlqMessage = DlqMessage.create( + originalTopic, + partitionNum, + offsetNum, + messageKey, + payload, + errorMessage + ); + DlqMessage saved = dlqMessageRepository.save(dlqMessage); + log.info("DLQ 메시지 저장: id={}, originalTopic={}", saved.getId(), originalTopic); + return saved; + } + + @Transactional(readOnly = true) + public List getPendingMessages() { + return dlqMessageRepository.findByStatus(DlqMessage.DlqStatus.PENDING); + } + + @Transactional(readOnly = true) + public List getMessagesForRetry(int limit) { + return dlqMessageRepository.findPendingMessagesForRetry(MAX_RETRY_COUNT, limit); + } + + @Transactional + public void markAsResolved(String id) { + dlqMessageRepository.findById(id).ifPresent(message -> { + message.markAsResolved(); + dlqMessageRepository.save(message); + log.info("DLQ 메시지 해결 완료: id={}", id); + }); + } + + @Transactional + public void markAsAbandoned(String id) { + dlqMessageRepository.findById(id).ifPresent(message -> { + message.markAsAbandoned(); + dlqMessageRepository.save(message); + log.warn("DLQ 메시지 포기 처리: id={}", id); + }); + } + + @Transactional + public void incrementRetryCount(String id) { + dlqMessageRepository.findById(id).ifPresent(message -> { + message.incrementRetryCount(); + dlqMessageRepository.save(message); + }); + } + + @Transactional(readOnly = true) + public long countPendingMessages() { + return dlqMessageRepository.countByStatus(DlqMessage.DlqStatus.PENDING); + } +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/domain/eventhandled/EventHandled.java b/apps/commerce-streamer/src/main/java/com/loopers/domain/eventhandled/EventHandled.java new file mode 100644 index 000000000..b668c59c7 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/domain/eventhandled/EventHandled.java @@ -0,0 +1,42 @@ +package com.loopers.domain.eventhandled; + +import jakarta.persistence.*; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; + +@Entity +@Getter +@Table( + name = "event_handled", + uniqueConstraints = { + @UniqueConstraint(columnNames = {"event_id", "domain_type"}) + } +) +@NoArgsConstructor(access = AccessLevel.PROTECTED) +public class EventHandled { + + @Id + private String eventId; + + @Column(name = "domain_type") + @Enumerated(EnumType.STRING) + private EventHandledDomainType domainType; + + @Column(name = "event_type") + private String eventType; + + @Column(name = "processed_at") + private LocalDateTime processedAt; + + public static EventHandled create(String eventId, EventHandledDomainType domainType, String eventType) { + EventHandled eventHandled = new EventHandled(); + eventHandled.eventId = eventId; + eventHandled.domainType = domainType; + eventHandled.eventType = eventType; + eventHandled.processedAt = LocalDateTime.now(); + return eventHandled; + } +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/domain/eventhandled/EventHandledDomainType.java b/apps/commerce-streamer/src/main/java/com/loopers/domain/eventhandled/EventHandledDomainType.java new file mode 100644 index 000000000..57ead51fd --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/domain/eventhandled/EventHandledDomainType.java @@ -0,0 +1,6 @@ +package com.loopers.domain.eventhandled; + +public enum EventHandledDomainType { + METRICS, + AUDIT_LOG +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/domain/eventhandled/EventHandledRepository.java b/apps/commerce-streamer/src/main/java/com/loopers/domain/eventhandled/EventHandledRepository.java new file mode 100644 index 000000000..572b617e8 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/domain/eventhandled/EventHandledRepository.java @@ -0,0 +1,10 @@ +package com.loopers.domain.eventhandled; + +public interface EventHandledRepository { + + boolean existsByEventId(String eventId); + + boolean existsByEventIdAndDomainType(String eventId, EventHandledDomainType domainType); + + EventHandled save(EventHandled eventHandled); +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/domain/eventhandled/EventHandledService.java b/apps/commerce-streamer/src/main/java/com/loopers/domain/eventhandled/EventHandledService.java new file mode 100644 index 000000000..02fed7dff --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/domain/eventhandled/EventHandledService.java @@ -0,0 +1,28 @@ +package com.loopers.domain.eventhandled; + +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +@Service +@RequiredArgsConstructor +public class EventHandledService { + + private final EventHandledRepository eventHandledRepository; + + @Transactional(readOnly = true) + public boolean isEventHandled(String eventId) { + return eventHandledRepository.existsByEventId(eventId); + } + + @Transactional(readOnly = true) + public boolean isEventHandled(String eventId, EventHandledDomainType domainType) { + return eventHandledRepository.existsByEventIdAndDomainType(eventId, domainType); + } + + @Transactional + public void saveEventHandled(String eventId, EventHandledDomainType domainType, String eventType) { + EventHandled eventHandled = EventHandled.create(eventId, domainType, eventType); + eventHandledRepository.save(eventHandled); + } +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/domain/metrics/ProductMetrics.java b/apps/commerce-streamer/src/main/java/com/loopers/domain/metrics/ProductMetrics.java new file mode 100644 index 000000000..6e24c27ce --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/domain/metrics/ProductMetrics.java @@ -0,0 +1,56 @@ +package com.loopers.domain.metrics; + +import jakarta.persistence.*; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.NoArgsConstructor; + +import java.time.LocalDate; + +@Entity +@Getter +@Table(name = "product_metrics") +@NoArgsConstructor(access = AccessLevel.PROTECTED) +public class ProductMetrics { + + @EmbeddedId + private ProductMetricsId id; + + @Column(name = "likes_delta") + private int likesDelta; + + @Column(name = "sales_delta") + private int salesDelta; + + @Column(name = "views_delta") + private int viewsDelta; + + public static ProductMetrics create(Long productId, LocalDate date) { + ProductMetrics productMetrics = new ProductMetrics(); + productMetrics.id = ProductMetricsId.create(productId, date); + productMetrics.likesDelta = 0; + productMetrics.salesDelta = 0; + productMetrics.viewsDelta = 0; + return productMetrics; + } + + public void incrementLikes() { + this.likesDelta++; + } + + public void decrementLikes() { + this.likesDelta--; + } + + public void incrementSales(int quantity) { + this.salesDelta += quantity; + } + + public void decrementSales(int quantity) { + this.salesDelta -= quantity; + } + + public void incrementViews() { + this.viewsDelta++; + } +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/domain/metrics/ProductMetricsId.java b/apps/commerce-streamer/src/main/java/com/loopers/domain/metrics/ProductMetricsId.java new file mode 100644 index 000000000..de268d0a9 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/domain/metrics/ProductMetricsId.java @@ -0,0 +1,33 @@ +package com.loopers.domain.metrics; + +import jakarta.persistence.Column; +import jakarta.persistence.Embeddable; +import lombok.AccessLevel; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; + +import java.io.Serializable; +import java.time.LocalDate; + +@Embeddable +@Getter +@EqualsAndHashCode +@NoArgsConstructor(access = AccessLevel.PROTECTED) +public class ProductMetricsId implements Serializable { + + @Column(name = "product_id") + private Long productId; + + @Column(name = "metrics_date") + private LocalDate metricsDate; + + private ProductMetricsId(Long productId, LocalDate metricsDate) { + this.productId = productId; + this.metricsDate = metricsDate; + } + + public static ProductMetricsId create(Long productId, LocalDate date) { + return new ProductMetricsId(productId, date); + } +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/domain/metrics/ProductMetricsRepository.java b/apps/commerce-streamer/src/main/java/com/loopers/domain/metrics/ProductMetricsRepository.java new file mode 100644 index 000000000..631eaeaad --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/domain/metrics/ProductMetricsRepository.java @@ -0,0 +1,11 @@ +package com.loopers.domain.metrics; + +import java.time.LocalDate; +import java.util.Optional; + +public interface ProductMetricsRepository { + + Optional findByProductIdAndDate(Long productId, LocalDate date); + + ProductMetrics save(ProductMetrics productMetrics); +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/domain/metrics/ProductMetricsService.java b/apps/commerce-streamer/src/main/java/com/loopers/domain/metrics/ProductMetricsService.java new file mode 100644 index 000000000..d568cccef --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/domain/metrics/ProductMetricsService.java @@ -0,0 +1,52 @@ +package com.loopers.domain.metrics; + +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDate; + +@Service +@RequiredArgsConstructor +public class ProductMetricsService { + + private final ProductMetricsRepository productMetricsRepository; + + @Transactional + public void processLikeMetrics(Long productId, String likeType, LocalDate date) { + ProductMetrics metrics = getOrCreateMetrics(productId, date); + + if ("LIKED".equals(likeType)) { + metrics.incrementLikes(); + } else if ("UNLIKED".equals(likeType)) { + metrics.decrementLikes(); + } + + productMetricsRepository.save(metrics); + } + + @Transactional + public void processStockMetrics(Long productId, int stock, String changedType, LocalDate date) { + ProductMetrics metrics = getOrCreateMetrics(productId, date); + + if ("DECREASED".equals(changedType)) { + metrics.incrementSales(stock); + } else if ("RESTORED".equals(changedType)) { + metrics.decrementSales(stock); + } + + productMetricsRepository.save(metrics); + } + + @Transactional + public void processViewMetrics(Long productId, LocalDate date) { + ProductMetrics metrics = getOrCreateMetrics(productId, date); + metrics.incrementViews(); + productMetricsRepository.save(metrics); + } + + private ProductMetrics getOrCreateMetrics(Long productId, LocalDate date) { + return productMetricsRepository.findByProductIdAndDate(productId, date) + .orElseGet(() -> ProductMetrics.create(productId, date)); + } +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/auditlog/AuditLogJpaRepository.java b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/auditlog/AuditLogJpaRepository.java new file mode 100644 index 000000000..2f736717d --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/auditlog/AuditLogJpaRepository.java @@ -0,0 +1,7 @@ +package com.loopers.infrastructure.auditlog; + +import com.loopers.domain.auditlog.AuditLog; +import org.springframework.data.jpa.repository.JpaRepository; + +public interface AuditLogJpaRepository extends JpaRepository { +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/auditlog/AuditLogRepositoryImpl.java b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/auditlog/AuditLogRepositoryImpl.java new file mode 100644 index 000000000..d9d6e0c9c --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/auditlog/AuditLogRepositoryImpl.java @@ -0,0 +1,18 @@ +package com.loopers.infrastructure.auditlog; + +import com.loopers.domain.auditlog.AuditLog; +import com.loopers.domain.auditlog.AuditLogRepository; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Repository; + +@Repository +@RequiredArgsConstructor +public class AuditLogRepositoryImpl implements AuditLogRepository { + + private final AuditLogJpaRepository auditLogJpaRepository; + + @Override + public AuditLog save(AuditLog auditLog) { + return auditLogJpaRepository.save(auditLog); + } +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/dlq/DlqMessageJpaRepository.java b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/dlq/DlqMessageJpaRepository.java new file mode 100644 index 000000000..3ca3e57ac --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/dlq/DlqMessageJpaRepository.java @@ -0,0 +1,21 @@ +package com.loopers.infrastructure.dlq; + +import com.loopers.domain.dlq.DlqMessage; +import org.springframework.data.domain.Pageable; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; + +import java.util.List; + +public interface DlqMessageJpaRepository extends JpaRepository { + + List findByStatus(DlqMessage.DlqStatus status); + + @Query("SELECT d FROM DlqMessage d WHERE d.status = :status AND d.retryCount < :maxRetryCount ORDER BY d.createdAt ASC") + List findPendingMessagesForRetry(@Param("status") DlqMessage.DlqStatus status, + @Param("maxRetryCount") int maxRetryCount, + Pageable pageable); + + long countByStatus(DlqMessage.DlqStatus status); +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/dlq/DlqMessageRepositoryImpl.java b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/dlq/DlqMessageRepositoryImpl.java new file mode 100644 index 000000000..d94838b88 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/dlq/DlqMessageRepositoryImpl.java @@ -0,0 +1,47 @@ +package com.loopers.infrastructure.dlq; + +import com.loopers.domain.dlq.DlqMessage; +import com.loopers.domain.dlq.DlqMessageRepository; +import lombok.RequiredArgsConstructor; +import org.springframework.data.domain.PageRequest; +import org.springframework.stereotype.Repository; + +import java.util.List; +import java.util.Optional; + +@Repository +@RequiredArgsConstructor +public class DlqMessageRepositoryImpl implements DlqMessageRepository { + + private final DlqMessageJpaRepository dlqMessageJpaRepository; + + @Override + public DlqMessage save(DlqMessage dlqMessage) { + return dlqMessageJpaRepository.save(dlqMessage); + } + + @Override + public Optional findById(String id) { + return dlqMessageJpaRepository.findById(id); + } + + @Override + public List findByStatus(DlqMessage.DlqStatus status) { + return dlqMessageJpaRepository.findByStatus(status); + } + + @Override + public List findPendingMessagesForRetry(int maxRetryCount, int limit) { + return dlqMessageJpaRepository.findPendingMessagesForRetry(DlqMessage.DlqStatus.PENDING, maxRetryCount, PageRequest.of(0, 100)); + } + + @Override + public List findAll() { + return dlqMessageJpaRepository.findAll(); + } + + @Override + public long countByStatus(DlqMessage.DlqStatus status) { + return dlqMessageJpaRepository.countByStatus(status); + } +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/eventhandled/EventHandledJpaRepository.java b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/eventhandled/EventHandledJpaRepository.java new file mode 100644 index 000000000..7a93d420c --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/eventhandled/EventHandledJpaRepository.java @@ -0,0 +1,12 @@ +package com.loopers.infrastructure.eventhandled; + +import com.loopers.domain.eventhandled.EventHandled; +import com.loopers.domain.eventhandled.EventHandledDomainType; +import org.springframework.data.jpa.repository.JpaRepository; + +public interface EventHandledJpaRepository extends JpaRepository { + + boolean existsByEventId(String eventId); + + boolean existsByEventIdAndDomainType(String eventId, EventHandledDomainType domainType); +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/eventhandled/EventHandledRepositoryImpl.java b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/eventhandled/EventHandledRepositoryImpl.java new file mode 100644 index 000000000..4500c36c3 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/eventhandled/EventHandledRepositoryImpl.java @@ -0,0 +1,29 @@ +package com.loopers.infrastructure.eventhandled; + +import com.loopers.domain.eventhandled.EventHandled; +import com.loopers.domain.eventhandled.EventHandledDomainType; +import com.loopers.domain.eventhandled.EventHandledRepository; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Repository; + +@Repository +@RequiredArgsConstructor +public class EventHandledRepositoryImpl implements EventHandledRepository { + + private final EventHandledJpaRepository eventHandledJpaRepository; + + @Override + public boolean existsByEventId(String eventId) { + return eventHandledJpaRepository.existsByEventId(eventId); + } + + @Override + public boolean existsByEventIdAndDomainType(String eventId, EventHandledDomainType domainType) { + return eventHandledJpaRepository.existsByEventIdAndDomainType(eventId, domainType); + } + + @Override + public EventHandled save(EventHandled eventHandled) { + return eventHandledJpaRepository.save(eventHandled); + } +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/metrics/ProductMetricsJpaRepository.java b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/metrics/ProductMetricsJpaRepository.java new file mode 100644 index 000000000..3d4c385ae --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/metrics/ProductMetricsJpaRepository.java @@ -0,0 +1,16 @@ +package com.loopers.infrastructure.metrics; + +import com.loopers.domain.metrics.ProductMetrics; +import com.loopers.domain.metrics.ProductMetricsId; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; + +import java.time.LocalDate; +import java.util.Optional; + +public interface ProductMetricsJpaRepository extends JpaRepository { + + @Query("SELECT pm FROM ProductMetrics pm WHERE pm.id.productId = :productId AND pm.id.metricsDate = :date") + Optional findByProductIdAndDate(@Param("productId") Long productId, @Param("date") LocalDate date); +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/metrics/ProductMetricsRepositoryImpl.java b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/metrics/ProductMetricsRepositoryImpl.java new file mode 100644 index 000000000..b4ec46a99 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/metrics/ProductMetricsRepositoryImpl.java @@ -0,0 +1,26 @@ +package com.loopers.infrastructure.metrics; + +import com.loopers.domain.metrics.ProductMetrics; +import com.loopers.domain.metrics.ProductMetricsRepository; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Repository; + +import java.time.LocalDate; +import java.util.Optional; + +@Repository +@RequiredArgsConstructor +public class ProductMetricsRepositoryImpl implements ProductMetricsRepository { + + private final ProductMetricsJpaRepository productMetricsJpaRepository; + + @Override + public Optional findByProductIdAndDate(Long productId, LocalDate date) { + return productMetricsJpaRepository.findByProductIdAndDate(productId, date); + } + + @Override + public ProductMetrics save(ProductMetrics productMetrics) { + return productMetricsJpaRepository.save(productMetrics); + } +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/AuditLogConsumer.java b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/AuditLogConsumer.java new file mode 100644 index 000000000..f3323c537 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/AuditLogConsumer.java @@ -0,0 +1,78 @@ +package com.loopers.interfaces.consumer; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.loopers.application.auditlog.AuditLogCommand; +import com.loopers.application.auditlog.AuditLogFacade; +import com.loopers.confg.kafka.KafkaConfig; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.stereotype.Component; + +import java.nio.charset.StandardCharsets; +import java.util.List; + +@Slf4j +@Component +@RequiredArgsConstructor +public class AuditLogConsumer { + + private final AuditLogFacade auditLogFacade; + private final ObjectMapper objectMapper; + + @KafkaListener( + topics = "${kafka.topic.user-action-name}", + groupId = "${kafka.consumer.audit-log-group}", + containerFactory = KafkaConfig.BATCH_LISTENER + ) + public void listen( + List> records, + Acknowledgment acknowledgment + ) { + try { + for (ConsumerRecord record : records) { + String payload = record.value(); + String userId = record.key(); + + String eventType = ""; + if (record.headers().lastHeader("eventType") != null) { + eventType = new String(record.headers().lastHeader("eventType").value(), StandardCharsets.UTF_8); + } + + if (userId == null || userId.isBlank()) { + log.warn("userId is blank, payload = {}", payload); + continue; + } + + try { + AuditLogCommand command = parsePayload(payload, eventType); + auditLogFacade.processAuditLog(command); + } catch (Exception e) { + log.error("AuditLog 처리 실패: payload={}", payload, e); + } + } + } finally { + acknowledgment.acknowledge(); + } + } + + private AuditLogCommand parsePayload(String payload, String eventType) { + try { + JsonNode node = objectMapper.readTree(payload); + return new AuditLogCommand( + node.has("eventId") ? node.get("eventId").asText() : null, + node.has("userId") ? node.get("userId").asLong() : null, + node.has("actionType") ? node.get("actionType").asText() : eventType, + node.has("targetType") ? node.get("targetType").asText() : null, + node.has("targetId") ? node.get("targetId").asLong() : null, + payload + ); + } catch (Exception e) { + log.error("Payload 파싱 실패: {}", payload, e); + return new AuditLogCommand(null, null, eventType, null, null, payload); + } + } +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/DemoKafkaConsumer.java b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/DemoKafkaConsumer.java deleted file mode 100644 index ba862cec6..000000000 --- a/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/DemoKafkaConsumer.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.loopers.interfaces.consumer; - -import com.loopers.confg.kafka.KafkaConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.support.Acknowledgment; -import org.springframework.stereotype.Component; - -import java.util.List; - -@Component -public class DemoKafkaConsumer { - @KafkaListener( - topics = {"${demo-kafka.test.topic-name}"}, - containerFactory = KafkaConfig.BATCH_LISTENER - ) - public void demoListener( - List> messages, - Acknowledgment acknowledgment - ){ - System.out.println(messages); - acknowledgment.acknowledge(); - } -} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/DlqConsumer.java b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/DlqConsumer.java new file mode 100644 index 000000000..7c23feed5 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/DlqConsumer.java @@ -0,0 +1,67 @@ +package com.loopers.interfaces.consumer; + +import com.loopers.confg.kafka.KafkaConfig; +import com.loopers.domain.dlq.DlqMessageService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.stereotype.Component; + +import java.util.List; + +@Slf4j +@Component +@RequiredArgsConstructor +public class DlqConsumer { + + private final DlqMessageService dlqMessageService; + + @KafkaListener( + topicPattern = ".*\\.DLT", + groupId = "${kafka.consumer.dlq-group:dlq-consumer-group}", + containerFactory = KafkaConfig.BATCH_LISTENER + ) + public void consume( + List> records, + Acknowledgment acknowledgment + ) { + for (ConsumerRecord record : records) { + log.error("DLQ 메시지 수신 - topic: {}, partition: {}, offset: {}, key: {}", + record.topic(), + record.partition(), + record.offset(), + record.key() + ); + + processDlqRecord(record); + } + + acknowledgment.acknowledge(); + } + + private void processDlqRecord(ConsumerRecord record) { + try { + String originalTopic = extractOriginalTopic(record.topic()); + + dlqMessageService.saveDlqMessage( + originalTopic, + record.partition(), + record.offset(), + record.key(), + record.value(), + "Message failed after max retries" + ); + } catch (Exception e) { + log.error("DLQ 메시지 저장 실패: topic={}, key={}", record.topic(), record.key(), e); + } + } + + private String extractOriginalTopic(String dlqTopic) { + if (dlqTopic.endsWith(".DLT")) { + return dlqTopic.substring(0, dlqTopic.length() - 4); + } + return dlqTopic; + } +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/ProductMetricsConsumer.java b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/ProductMetricsConsumer.java new file mode 100644 index 000000000..979f47553 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/ProductMetricsConsumer.java @@ -0,0 +1,95 @@ +package com.loopers.interfaces.consumer; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.loopers.application.metrics.ProductMetricsCommand; +import com.loopers.application.metrics.ProductMetricsFacade; +import com.loopers.confg.kafka.KafkaConfig; +import com.loopers.interfaces.dto.ProductLikePayload; +import com.loopers.interfaces.dto.ProductStockPayload; +import com.loopers.interfaces.dto.ProductViewPayload; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Set; + +@Slf4j +@Component +@RequiredArgsConstructor +public class ProductMetricsConsumer { + + private final ProductMetricsFacade productMetricsFacade; + private final ObjectMapper objectMapper; + + @Value("${kafka.topic.product-like-name}") + private String productLikeTopic; + + @Value("${kafka.topic.product-stock-name}") + private String productStockTopic; + + @Value("${kafka.topic.product-view-name}") + private String productViewTopic; + + @KafkaListener( + topics = {"${kafka.topic.product-like-name}", "${kafka.topic.product-stock-name}", "${kafka.topic.product-view-name}"}, + groupId = "${kafka.consumer.product-metrics-group}", + containerFactory = KafkaConfig.BATCH_LISTENER + ) + public void listen( + List> records, + Acknowledgment acknowledgment + ) { + try { + for (ConsumerRecord record : records) { + String payload = record.value(); + String topic = record.topic(); + + if (topic == null) { + log.warn("Received null topic for payload: {}", payload); + continue; + } + + try { + processPayload(topic, payload); + } catch (Exception e) { + log.error("메트릭 처리 실패: topic={}, payload={}", topic, payload, e); + // 개별 메시지 실패는 로깅 후 계속 진행 + } + } + } finally { + // 모든 메시지 처리 후 manual ack + acknowledgment.acknowledge(); + } + } + + private void processPayload(String topic, String payload) throws JsonProcessingException { + Set allowedTopics = Set.of(productLikeTopic, productStockTopic, productViewTopic); + + if (!allowedTopics.contains(topic)) { + log.warn("허용되지 않은 토픽: {}", topic); + return; + } + + if (topic.contains("product-like")) { + ProductLikePayload likePayload = objectMapper.readValue(payload, ProductLikePayload.class); + ProductMetricsCommand likeCommand = ProductMetricsCommand.from(likePayload); + productMetricsFacade.processLikeMetrics(likeCommand); + + } else if (topic.contains("product-stock")) { + ProductStockPayload stockPayload = objectMapper.readValue(payload, ProductStockPayload.class); + ProductMetricsCommand stockCommand = ProductMetricsCommand.from(stockPayload); + productMetricsFacade.processStockMetrics(stockCommand); + + } else if (topic.contains("product-view")) { + ProductViewPayload viewPayload = objectMapper.readValue(payload, ProductViewPayload.class); + ProductMetricsCommand viewCommand = ProductMetricsCommand.from(viewPayload); + productMetricsFacade.processViewMetrics(viewCommand); + } + } +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/interfaces/dto/ProductLikePayload.java b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/dto/ProductLikePayload.java new file mode 100644 index 000000000..e2811ecd3 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/dto/ProductLikePayload.java @@ -0,0 +1,8 @@ +package com.loopers.interfaces.dto; + +public record ProductLikePayload( + String eventId, + Long productId, + String likeType +) { +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/interfaces/dto/ProductStockPayload.java b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/dto/ProductStockPayload.java new file mode 100644 index 000000000..49f1e4767 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/dto/ProductStockPayload.java @@ -0,0 +1,9 @@ +package com.loopers.interfaces.dto; + +public record ProductStockPayload( + String eventId, + Long productId, + int stock, + String changedType +) { +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/interfaces/dto/ProductViewPayload.java b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/dto/ProductViewPayload.java new file mode 100644 index 000000000..88d337d04 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/dto/ProductViewPayload.java @@ -0,0 +1,7 @@ +package com.loopers.interfaces.dto; + +public record ProductViewPayload( + String eventId, + Long productId +) { +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/support/config/TimeConfig.java b/apps/commerce-streamer/src/main/java/com/loopers/support/config/TimeConfig.java new file mode 100644 index 000000000..fc9eb1717 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/support/config/TimeConfig.java @@ -0,0 +1,15 @@ +package com.loopers.support.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.time.Clock; + +@Configuration +public class TimeConfig { + + @Bean + public Clock clock() { + return Clock.systemDefaultZone(); + } +} diff --git a/apps/commerce-streamer/src/main/resources/application.yml b/apps/commerce-streamer/src/main/resources/application.yml index 0651bc2bd..56dd4ddd1 100644 --- a/apps/commerce-streamer/src/main/resources/application.yml +++ b/apps/commerce-streamer/src/main/resources/application.yml @@ -24,6 +24,16 @@ spring: - kafka.yml - logging.yml - monitoring.yml + - +kafka: + topic: + user-action-name: user-action-events + product-like-name: product-like-metrics + product-stock-name: product-stock-metrics + product-view-name: product-view-metrics + consumer: + audit-log-group: audit-log-group + product-metrics-group: product-metrics-group demo-kafka: test: diff --git a/apps/commerce-streamer/src/test/java/com/loopers/CommerceStreamerContextTest.java b/apps/commerce-streamer/src/test/java/com/loopers/CommerceStreamerContextTest.java new file mode 100644 index 000000000..d3dafe19e --- /dev/null +++ b/apps/commerce-streamer/src/test/java/com/loopers/CommerceStreamerContextTest.java @@ -0,0 +1,13 @@ +package com.loopers; + +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest +class CommerceStreamerContextTest { + + @Test + void contextLoads() { + // 컨텍스트 로드 확인 + } +} diff --git a/apps/commerce-streamer/src/test/java/com/loopers/interfaces/AuditLogConsumerTest.java b/apps/commerce-streamer/src/test/java/com/loopers/interfaces/AuditLogConsumerTest.java new file mode 100644 index 000000000..aadc4738d --- /dev/null +++ b/apps/commerce-streamer/src/test/java/com/loopers/interfaces/AuditLogConsumerTest.java @@ -0,0 +1,85 @@ +package com.loopers.interfaces; + +import com.loopers.application.auditlog.AuditLogCommand; +import com.loopers.application.auditlog.AuditLogFacade; +import com.loopers.interfaces.consumer.AuditLogConsumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import java.nio.charset.StandardCharsets; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.*; + +class AuditLogConsumerTest { + + private final AuditLogFacade facade = mock(AuditLogFacade.class); + private final AuditLogConsumer consumer = new AuditLogConsumer(facade, new com.fasterxml.jackson.databind.ObjectMapper()); + + private ConsumerRecord makeRecord( + String topic, String key, String value, String eventTypeHeader + ) { + ConsumerRecord rec = new ConsumerRecord<>(topic, 0, 0L, key, value); + if (eventTypeHeader != null) { + rec.headers().add("eventType", eventTypeHeader.getBytes(StandardCharsets.UTF_8)); + } + return rec; + } + + @Test + @DisplayName("userAction 토픽 → JSON 파싱 → Facade.processAuditLog 호출") + void listen_withMapPayload() { + // given + String topic = "user-action-events"; + String userId = "oyy"; + + String jsonValue = """ + { + "eventId": "evt-audit-123", + "traceId": "sdwers3rdgsdf", + "userId": 100, + "actionType": "PAYMENT_PROCESS", + "targetType": "ORDER", + "targetId": 4, + "payload": { + "orderId": 4, + "paymentId": 2, + "totalPrice": 40000, + "paymentMethod": "POINT" + }, + "occurredAt": "2025-09-04T12:34:56" + } + """; + + ConsumerRecord record = makeRecord(topic, userId, jsonValue, "PAYMENT_PROCESS"); + + // when + consumer.listen(List.of(record), mock(org.springframework.kafka.support.Acknowledgment.class)); + + // then + ArgumentCaptor captor = ArgumentCaptor.forClass(AuditLogCommand.class); + verify(facade, times(1)).processAuditLog(captor.capture()); + + AuditLogCommand captured = captor.getValue(); + assertThat(captured.eventId()).isEqualTo("evt-audit-123"); + assertThat(captured.userId()).isEqualTo(100L); + assertThat(captured.actionType()).isEqualTo("PAYMENT_PROCESS"); + } + + @Test + @DisplayName("userId가 blank면 처리하지 않음") + void listen_blankUserId_skip() { + // given + String jsonValue = "{\"eventId\": \"evt-123\"}"; + ConsumerRecord record = makeRecord("user-action-events", "", jsonValue, null); + + // when + consumer.listen(List.of(record), mock(org.springframework.kafka.support.Acknowledgment.class)); + + // then + verify(facade, never()).processAuditLog(any()); + } +} diff --git a/apps/commerce-streamer/src/test/java/com/loopers/interfaces/DlqConsumerTest.java b/apps/commerce-streamer/src/test/java/com/loopers/interfaces/DlqConsumerTest.java new file mode 100644 index 000000000..1175d1b46 --- /dev/null +++ b/apps/commerce-streamer/src/test/java/com/loopers/interfaces/DlqConsumerTest.java @@ -0,0 +1,227 @@ +package com.loopers.interfaces; + +import com.loopers.domain.dlq.DlqMessageService; +import com.loopers.interfaces.consumer.DlqConsumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.springframework.kafka.support.Acknowledgment; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.*; + +class DlqConsumerTest { + + private final DlqMessageService dlqMessageService = mock(DlqMessageService.class); + private final DlqConsumer consumer = new DlqConsumer(dlqMessageService); + + private ConsumerRecord makeRecord( + String topic, int partition, long offset, String key, String value + ) { + return new ConsumerRecord<>(topic, partition, offset, key, value); + } + + @Nested + @DisplayName("DLQ 메시지 소비") + class ConsumeDlqMessage { + + @Test + @DisplayName("DLQ 메시지 → DlqMessageService.saveDlqMessage 호출") + void consume_callsSaveDlqMessage() { + // given + String dlqTopic = "product-like-metrics.DLT"; + String key = "123"; + String value = "{\"eventId\":\"evt-001\",\"productId\":123}"; + int partition = 0; + long offset = 100L; + + ConsumerRecord record = makeRecord(dlqTopic, partition, offset, key, value); + Acknowledgment ack = mock(Acknowledgment.class); + + // when + consumer.consume(List.of(record), ack); + + // then + verify(dlqMessageService, times(1)).saveDlqMessage( + eq("product-like-metrics"), // originalTopic (.DLT 제거) + eq(partition), + eq(offset), + eq(key), + eq(value), + anyString() + ); + verify(ack, times(1)).acknowledge(); + } + + @Test + @DisplayName(".DLT 접미사가 제거된 원본 토픽명이 저장된다") + void consume_extractsOriginalTopic() { + // given + String dlqTopic = "order-events.DLT"; + ConsumerRecord record = makeRecord(dlqTopic, 0, 0L, "order-456", "{}"); + Acknowledgment ack = mock(Acknowledgment.class); + + // when + consumer.consume(List.of(record), ack); + + // then + ArgumentCaptor topicCaptor = ArgumentCaptor.forClass(String.class); + verify(dlqMessageService).saveDlqMessage( + topicCaptor.capture(), + anyInt(), + anyLong(), + anyString(), + anyString(), + anyString() + ); + + assertThat(topicCaptor.getValue()).isEqualTo("order-events"); + } + + @Test + @DisplayName("여러 DLQ 메시지 배치 처리") + void consume_batchMessages() { + // given + List> records = List.of( + makeRecord("product-like-metrics.DLT", 0, 1L, "key-1", "{\"id\":1}"), + makeRecord("product-stock-metrics.DLT", 0, 2L, "key-2", "{\"id\":2}"), + makeRecord("order-events.DLT", 0, 3L, "key-3", "{\"id\":3}") + ); + Acknowledgment ack = mock(Acknowledgment.class); + + // when + consumer.consume(records, ack); + + // then + verify(dlqMessageService, times(3)).saveDlqMessage( + anyString(), anyInt(), anyLong(), anyString(), anyString(), anyString() + ); + verify(ack, times(1)).acknowledge(); + } + + @Test + @DisplayName("partition, offset, key, value가 정확히 전달된다") + void consume_passesCorrectParameters() { + // given + String dlqTopic = "user-action-events.DLT"; + int partition = 2; + long offset = 999L; + String key = "user-123"; + String value = "{\"eventId\":\"evt-fail\",\"userId\":123}"; + + ConsumerRecord record = makeRecord(dlqTopic, partition, offset, key, value); + Acknowledgment ack = mock(Acknowledgment.class); + + // when + consumer.consume(List.of(record), ack); + + // then + ArgumentCaptor partitionCaptor = ArgumentCaptor.forClass(Integer.class); + ArgumentCaptor offsetCaptor = ArgumentCaptor.forClass(Long.class); + ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor valueCaptor = ArgumentCaptor.forClass(String.class); + + verify(dlqMessageService).saveDlqMessage( + eq("user-action-events"), + partitionCaptor.capture(), + offsetCaptor.capture(), + keyCaptor.capture(), + valueCaptor.capture(), + anyString() + ); + + assertThat(partitionCaptor.getValue()).isEqualTo(partition); + assertThat(offsetCaptor.getValue()).isEqualTo(offset); + assertThat(keyCaptor.getValue()).isEqualTo(key); + assertThat(valueCaptor.getValue()).isEqualTo(value); + } + } + + @Nested + @DisplayName("예외 처리") + class ExceptionHandling { + + @Test + @DisplayName("저장 실패해도 다음 메시지 처리 및 acknowledge 호출") + void consume_continuesOnException() { + // given + List> records = List.of( + makeRecord("topic-1.DLT", 0, 1L, "key-1", "{}"), + makeRecord("topic-2.DLT", 0, 2L, "key-2", "{}"), + makeRecord("topic-3.DLT", 0, 3L, "key-3", "{}") + ); + Acknowledgment ack = mock(Acknowledgment.class); + + // 두 번째 호출에서 예외 발생 + doNothing() + .doThrow(new RuntimeException("DB 저장 실패")) + .doNothing() + .when(dlqMessageService).saveDlqMessage( + anyString(), anyInt(), anyLong(), anyString(), anyString(), anyString() + ); + + // when + consumer.consume(records, ack); + + // then - 3번 모두 호출 시도, acknowledge도 호출 + verify(dlqMessageService, times(3)).saveDlqMessage( + anyString(), anyInt(), anyLong(), anyString(), anyString(), anyString() + ); + verify(ack, times(1)).acknowledge(); + } + + @Test + @DisplayName("빈 레코드 리스트도 정상 처리") + void consume_emptyRecords() { + // given + Acknowledgment ack = mock(Acknowledgment.class); + + // when + consumer.consume(List.of(), ack); + + // then + verify(dlqMessageService, never()).saveDlqMessage( + anyString(), anyInt(), anyLong(), anyString(), anyString(), anyString() + ); + verify(ack, times(1)).acknowledge(); + } + } + + @Nested + @DisplayName("토픽명 추출") + class TopicExtraction { + + @Test + @DisplayName("다양한 DLT 토픽 패턴에서 원본 토픽 추출") + void consume_variousTopicPatterns() { + // given + List> records = List.of( + makeRecord("simple.DLT", 0, 1L, "k1", "{}"), + makeRecord("multi-word-topic.DLT", 0, 2L, "k2", "{}"), + makeRecord("namespace.topic.name.DLT", 0, 3L, "k3", "{}") + ); + Acknowledgment ack = mock(Acknowledgment.class); + + // when + consumer.consume(records, ack); + + // then + ArgumentCaptor topicCaptor = ArgumentCaptor.forClass(String.class); + verify(dlqMessageService, times(3)).saveDlqMessage( + topicCaptor.capture(), + anyInt(), anyLong(), anyString(), anyString(), anyString() + ); + + List capturedTopics = topicCaptor.getAllValues(); + assertThat(capturedTopics).containsExactly( + "simple", + "multi-word-topic", + "namespace.topic.name" + ); + } + } +} diff --git a/apps/commerce-streamer/src/test/java/com/loopers/interfaces/ProductMetricsConsumerTest.java b/apps/commerce-streamer/src/test/java/com/loopers/interfaces/ProductMetricsConsumerTest.java new file mode 100644 index 000000000..21d384f24 --- /dev/null +++ b/apps/commerce-streamer/src/test/java/com/loopers/interfaces/ProductMetricsConsumerTest.java @@ -0,0 +1,141 @@ +package com.loopers.interfaces; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.loopers.application.metrics.ProductMetricsCommand; +import com.loopers.application.metrics.ProductMetricsFacade; +import com.loopers.interfaces.consumer.ProductMetricsConsumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.springframework.kafka.support.Acknowledgment; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.*; + +class ProductMetricsConsumerTest { + + private final ProductMetricsFacade facade = mock(ProductMetricsFacade.class); + private final ObjectMapper objectMapper = new ObjectMapper(); + private final ProductMetricsConsumer consumer = new ProductMetricsConsumer(facade, objectMapper); + + private ConsumerRecord makeRecord(String topic, String key, String value) { + return new ConsumerRecord<>(topic, 0, 0L, key, value); + } + + @Test + @DisplayName("product-like 토픽 → processLikeMetrics 호출") + void listen_likeEvent() { + // given + String topic = "product-like-metrics"; + String key = "123"; + String value = """ + { + "eventId": "evt-like-001", + "productId": 123, + "likeType": "LIKED" + } + """; + + ConsumerRecord record = makeRecord(topic, key, value); + Acknowledgment ack = mock(Acknowledgment.class); + + // when + consumer.listen(List.of(record), ack); + + // then + ArgumentCaptor captor = ArgumentCaptor.forClass(ProductMetricsCommand.class); + verify(facade, times(1)).processLikeMetrics(captor.capture()); + verify(ack, times(1)).acknowledge(); + + ProductMetricsCommand captured = captor.getValue(); + assertThat(captured.eventId()).isEqualTo("evt-like-001"); + assertThat(captured.productId()).isEqualTo(123L); + assertThat(captured.likeType()).isEqualTo("LIKED"); + } + + @Test + @DisplayName("product-stock 토픽 → processStockMetrics 호출") + void listen_stockEvent() { + // given + String topic = "product-stock-metrics"; + String key = "456"; + String value = """ + { + "eventId": "evt-stock-001", + "productId": 456, + "stock": 5, + "changedType": "DECREASED" + } + """; + + ConsumerRecord record = makeRecord(topic, key, value); + Acknowledgment ack = mock(Acknowledgment.class); + + // when + consumer.listen(List.of(record), ack); + + // then + ArgumentCaptor captor = ArgumentCaptor.forClass(ProductMetricsCommand.class); + verify(facade, times(1)).processStockMetrics(captor.capture()); + + ProductMetricsCommand captured = captor.getValue(); + assertThat(captured.eventId()).isEqualTo("evt-stock-001"); + assertThat(captured.productId()).isEqualTo(456L); + assertThat(captured.stock()).isEqualTo(5); + assertThat(captured.changedType()).isEqualTo("DECREASED"); + } + + @Test + @DisplayName("product-view 토픽 → processViewMetrics 호출") + void listen_viewEvent() { + // given + String topic = "product-view-metrics"; + String key = "789"; + String value = """ + { + "eventId": "evt-view-001", + "productId": 789 + } + """; + + ConsumerRecord record = makeRecord(topic, key, value); + Acknowledgment ack = mock(Acknowledgment.class); + + // when + consumer.listen(List.of(record), ack); + + // then + verify(facade, times(1)).processViewMetrics(any()); + verify(ack, times(1)).acknowledge(); + } + + @Test + @DisplayName("여러 메시지 배치 처리 후 acknowledge 1회 호출") + void listen_batchMessages() { + // given + List> records = List.of( + makeRecord("product-like-metrics", "1", """ + {"eventId": "evt-1", "productId": 1, "likeType": "LIKED"} + """), + makeRecord("product-like-metrics", "2", """ + {"eventId": "evt-2", "productId": 2, "likeType": "UNLIKED"} + """), + makeRecord("product-view-metrics", "3", """ + {"eventId": "evt-3", "productId": 3} + """) + ); + + Acknowledgment ack = mock(Acknowledgment.class); + + // when + consumer.listen(records, ack); + + // then + verify(facade, times(2)).processLikeMetrics(any()); + verify(facade, times(1)).processViewMetrics(any()); + verify(ack, times(1)).acknowledge(); + } +} diff --git a/modules/kafka/src/main/java/com/loopers/confg/kafka/KafkaConfig.java b/modules/kafka/src/main/java/com/loopers/confg/kafka/KafkaConfig.java index a73842775..57f9e0479 100644 --- a/modules/kafka/src/main/java/com/loopers/confg/kafka/KafkaConfig.java +++ b/modules/kafka/src/main/java/com/loopers/confg/kafka/KafkaConfig.java @@ -1,6 +1,7 @@ package com.loopers.confg.kafka; import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -9,13 +10,18 @@ import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.*; +import org.springframework.kafka.listener.CommonErrorHandler; import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; +import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.kafka.support.converter.BatchMessagingMessageConverter; import org.springframework.kafka.support.converter.ByteArrayJsonMessageConverter; +import org.springframework.util.backoff.FixedBackOff; import java.util.HashMap; import java.util.Map; +@Slf4j @EnableKafka @Configuration @EnableConfigurationProperties(KafkaProperties.class) @@ -29,6 +35,10 @@ public class KafkaConfig { public static final int HEARTBEAT_INTERVAL_MS = 20 * 1000; // heartbeat interval = 20s ( 1/3 of session_timeout ) public static final int MAX_POLL_INTERVAL_MS = 2 * 60 * 1000; // max poll interval = 2m + // Dead Letter Queue 설정 값 + private static final long DLQ_RETRY_INTERVAL = 1000L; + private static final long DLQ_MAX_ATTEMPTS = 3L; + @Bean public ProducerFactory producerFactory(KafkaProperties kafkaProperties) { Map props = new HashMap<>(kafkaProperties.buildProducerProperties()); @@ -51,10 +61,34 @@ public ByteArrayJsonMessageConverter jsonMessageConverter(ObjectMapper objectMap return new ByteArrayJsonMessageConverter(objectMapper); } + @Bean + public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(KafkaTemplate kafkaTemplate) { + return new DeadLetterPublishingRecoverer(kafkaTemplate, + (record, exception) -> { + log.error("메시지 처리 실패, DLQ로 전송: topic={}, partition={}, offset={}, key={}", + record.topic(), record.partition(), record.offset(), record.key(), exception); + return new org.apache.kafka.common.TopicPartition( + record.topic() + ".DLT", + -1 + ); + }); + } + + @Bean + public CommonErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer) { + DefaultErrorHandler errorHandler = new DefaultErrorHandler( + recoverer, + new FixedBackOff(DLQ_RETRY_INTERVAL, DLQ_MAX_ATTEMPTS) + ); + errorHandler.addNotRetryableExceptions(IllegalArgumentException.class); + return errorHandler; + } + @Bean(name = BATCH_LISTENER) public ConcurrentKafkaListenerContainerFactory defaultBatchListenerContainerFactory( KafkaProperties kafkaProperties, - ByteArrayJsonMessageConverter converter + ByteArrayJsonMessageConverter converter, + CommonErrorHandler errorHandler ) { Map consumerConfig = new HashMap<>(kafkaProperties.buildConsumerProperties()); consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLLING_SIZE); @@ -70,6 +104,7 @@ public ConcurrentKafkaListenerContainerFactory defaultBatchListe factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter)); factory.setConcurrency(3); factory.setBatchListener(true); + factory.setCommonErrorHandler(errorHandler); return factory; } } diff --git a/modules/kafka/src/main/resources/kafka.yml b/modules/kafka/src/main/resources/kafka.yml index 9609dbf85..3a0dfe0bb 100644 --- a/modules/kafka/src/main/resources/kafka.yml +++ b/modules/kafka/src/main/resources/kafka.yml @@ -15,12 +15,16 @@ spring: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer retries: 3 + acks: all + properties: + enable.idempotence: true + max.in.flight.requests.per.connection: 5 consumer: group-id: loopers-default-consumer key-deserializer: org.apache.kafka.common.serialization.StringDeserializer - value-serializer: org.apache.kafka.common.serialization.ByteArrayDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer properties: - enable-auto-commit: false + enable.auto.commit: false listener: ack-mode: manual