From bcf08685adde49b6c69a862bd34a2eecc2f04405 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=9D=B4=EC=A0=95=EB=B0=B0?= Date: Fri, 26 Dec 2025 17:50:14 +0900 Subject: [PATCH] =?UTF-8?q?round9:=20=EB=9E=AD=ED=82=B9=20=EC=8B=9C?= =?UTF-8?q?=EC=8A=A4=ED=85=9C=20=EB=8F=84=EC=9E=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../application/product/ProductFacade.java | 30 ++ .../application/ranking/RankingFacade.java | 107 ++++++ .../application/ranking/RankingInfo.java | 77 +++++ .../api/product/ProductV1Controller.java | 7 +- .../interfaces/api/product/ProductV1Dto.java | 27 +- .../api/ranking/RankingV1ApiSpec.java | 29 ++ .../api/ranking/RankingV1Controller.java | 41 +++ .../interfaces/api/ranking/RankingV1Dto.java | 87 +++++ .../src/main/resources/application.yml | 7 + .../domain/event/outbox/OutboxEvent.java | 2 +- .../ranking/RankingScoreCalculator.java | 121 +++++++ .../domain/ranking/RankingService.java | 150 ++++++++ .../loopers/CommerceStreamerApplication.java | 2 + .../listener/RankingEventListener.java | 321 ++++++++++++++++++ .../scheduler/RankingCarryOverScheduler.java | 93 +++++ .../src/main/resources/application.yml | 4 +- .../src/main/resources/monitoring.yml | 6 +- 17 files changed, 1101 insertions(+), 10 deletions(-) create mode 100644 apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingFacade.java create mode 100644 apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingInfo.java create mode 100644 apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1ApiSpec.java create mode 100644 apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1Controller.java create mode 100644 apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1Dto.java create mode 100644 apps/commerce-core/src/main/java/com/loopers/domain/ranking/RankingScoreCalculator.java create mode 100644 apps/commerce-core/src/main/java/com/loopers/domain/ranking/RankingService.java create mode 100644 apps/commerce-streamer/src/main/java/com/loopers/infrastructure/listener/RankingEventListener.java create mode 100644 apps/commerce-streamer/src/main/java/com/loopers/infrastructure/scheduler/RankingCarryOverScheduler.java diff --git a/apps/commerce-api/src/main/java/com/loopers/application/product/ProductFacade.java b/apps/commerce-api/src/main/java/com/loopers/application/product/ProductFacade.java index 67419958d..b1a2f68a0 100644 --- a/apps/commerce-api/src/main/java/com/loopers/application/product/ProductFacade.java +++ b/apps/commerce-api/src/main/java/com/loopers/application/product/ProductFacade.java @@ -10,6 +10,7 @@ import com.loopers.domain.supply.Supply; import com.loopers.domain.supply.SupplyService; import com.loopers.infrastructure.cache.product.ProductCacheService; +import com.loopers.application.ranking.RankingFacade; import com.loopers.support.error.CoreException; import com.loopers.support.error.ErrorType; import lombok.RequiredArgsConstructor; @@ -18,6 +19,8 @@ import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; import java.util.*; import java.util.stream.Collectors; @@ -30,6 +33,7 @@ public class ProductFacade { private final SupplyService supplyService; private final ProductCacheService productCacheService; private final ProductViewedEventPublisher productViewedEventPublisher; + private final RankingFacade rankingFacade; @Transactional public ProductInfo createProduct(ProductCreateRequest request) { @@ -268,6 +272,32 @@ public ProductInfo getProductDetail(Long productId) { productCacheService.setProductDetail(productId, Optional.of(productInfo)); return productInfo; } + + /** + * 상품 상세 조회 (랭킹 정보 포함) + * @param productId 상품 ID + * @return 상품 정보와 랭킹 정보를 포함한 응답 + */ + @Transactional(readOnly = true) + public ProductInfoWithRanking getProductDetailWithRanking(Long productId) { + ProductInfo productInfo = getProductDetail(productId); + + // 랭킹 정보 조회 + String today = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd")); + com.loopers.application.ranking.RankingInfo.ProductRankingInfo rankingInfo = + rankingFacade.getProductRanking(productId, today); + + return new ProductInfoWithRanking(productInfo, rankingInfo); + } + + /** + * 상품 정보와 랭킹 정보를 포함한 응답 + */ + public record ProductInfoWithRanking( + ProductInfo productInfo, + com.loopers.application.ranking.RankingInfo.ProductRankingInfo rankingInfo + ) { + } private void invalidateBrandListCache(Long brandId) { try { diff --git a/apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingFacade.java b/apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingFacade.java new file mode 100644 index 000000000..d5e9cd266 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingFacade.java @@ -0,0 +1,107 @@ +package com.loopers.application.ranking; + +import com.loopers.domain.product.Product; +import com.loopers.domain.product.ProductService; +import com.loopers.domain.ranking.RankingService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.domain.Pageable; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * 랭킹 Facade + *

+ * 랭킹 조회 로직을 담당하며, ZSET에서 랭킹 정보를 조회하고 + * 상품 정보를 Aggregation하여 반환합니다. + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class RankingFacade { + + private final RankingService rankingService; + private final ProductService productService; + + /** + * 랭킹 페이지 조회 + * 1. ZSET에서 Top-N 상품 ID 조회 + * 2. 상품 정보 조회 및 Aggregation + * 3. 순위 정보 포함하여 반환 + */ + @Transactional(readOnly = true) + public RankingInfo.RankingsPageResponse getRankings(String date, Pageable pageable) { + String rankingKey = rankingService.getRankingKey(date); + + // 페이지네이션 계산 + long start = pageable.getPageNumber() * pageable.getPageSize(); + long end = start + pageable.getPageSize() - 1; + + // ZSET에서 상품 ID와 점수 조회 + List entries = + rankingService.getTopNWithScores(rankingKey, start, end); + + if (entries.isEmpty()) { + return RankingInfo.RankingsPageResponse.empty(pageable); + } + + // 상품 ID 리스트 추출 + List productIds = entries.stream() + .map(RankingService.RankingEntry::getProductId) + .collect(Collectors.toList()); + + // 상품 정보 조회 + Map productMap = productService.getProductMapByIds(productIds); + + // 랭킹 정보와 상품 정보 결합 + List rankingItems = new ArrayList<>(); + long rank = start + 1; // 1부터 시작하는 순위 + + for (RankingService.RankingEntry entry : entries) { + Product product = productMap.get(entry.getProductId()); + if (product != null) { + rankingItems.add(RankingInfo.RankingItem.of( + rank++, + entry.getProductId(), + entry.getScore(), + product + )); + } + } + + // 전체 랭킹 수 조회 (총 페이지 수 계산용) + Long totalSize = rankingService.getRankingSize(rankingKey); + + return RankingInfo.RankingsPageResponse.of( + rankingItems, + pageable.getPageNumber(), + pageable.getPageSize(), + totalSize != null ? totalSize.intValue() : 0 + ); + } + + /** + * 특정 상품의 랭킹 정보 조회 + */ + @Transactional(readOnly = true) + public RankingInfo.ProductRankingInfo getProductRanking(Long productId, String date) { + String rankingKey = rankingService.getRankingKey(date); + Long rank = rankingService.getRank(rankingKey, productId); + Double score = rankingService.getScore(rankingKey, productId); + + if (rank == null || score == null) { + return null; // 랭킹에 없음 + } + + return RankingInfo.ProductRankingInfo.of( + rank + 1, // 1부터 시작하는 순위로 변환 + score + ); + } +} + diff --git a/apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingInfo.java b/apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingInfo.java new file mode 100644 index 000000000..a7c95fa2c --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingInfo.java @@ -0,0 +1,77 @@ +package com.loopers.application.ranking; + +import com.loopers.domain.product.Product; +import org.springframework.data.domain.Pageable; + +import java.util.Collections; +import java.util.List; + +/** + * 랭킹 정보 DTO + */ +public class RankingInfo { + + /** + * 랭킹 페이지 응답 + */ + public record RankingsPageResponse( + List items, + int page, + int size, + int total + ) { + public static RankingsPageResponse empty(Pageable pageable) { + return new RankingsPageResponse( + Collections.emptyList(), + pageable.getPageNumber(), + pageable.getPageSize(), + 0 + ); + } + + public static RankingsPageResponse of( + List items, + int page, + int size, + int total + ) { + return new RankingsPageResponse(items, page, size, total); + } + } + + /** + * 랭킹 항목 (상품 정보 + 순위 + 점수) + */ + public record RankingItem( + Long rank, + Long productId, + Double score, + String productName, + Long brandId, + Integer price + ) { + public static RankingItem of(Long rank, Long productId, Double score, Product product) { + return new RankingItem( + rank, + productId, + score, + product.getName(), + product.getBrandId(), + product.getPrice().amount() + ); + } + } + + /** + * 상품 랭킹 정보 (상세 조회용) + */ + public record ProductRankingInfo( + Long rank, + Double score + ) { + public static ProductRankingInfo of(Long rank, Double score) { + return new ProductRankingInfo(rank, score); + } + } +} + diff --git a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/product/ProductV1Controller.java b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/product/ProductV1Controller.java index 82deb485c..638e0bdad 100644 --- a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/product/ProductV1Controller.java +++ b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/product/ProductV1Controller.java @@ -79,8 +79,11 @@ private Pageable normalizePageable(Pageable pageable) { @RequestMapping(method = RequestMethod.GET, path = "/{productId}") @Override public ApiResponse getProductDetail(@PathVariable Long productId) { - ProductInfo info = productFacade.getProductDetail(productId); - ProductV1Dto.ProductResponse response = ProductV1Dto.ProductResponse.from(info); + ProductFacade.ProductInfoWithRanking infoWithRanking = productFacade.getProductDetailWithRanking(productId); + ProductV1Dto.ProductResponse response = ProductV1Dto.ProductResponse.from( + infoWithRanking.productInfo(), + ProductV1Dto.RankingInfo.from(infoWithRanking.rankingInfo()) + ); return ApiResponse.success(response); } } diff --git a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/product/ProductV1Dto.java b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/product/ProductV1Dto.java index ad65c9515..0e31e1d2a 100644 --- a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/product/ProductV1Dto.java +++ b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/product/ProductV1Dto.java @@ -20,18 +20,39 @@ public record ProductResponse( String brand, int price, int likes, - int stock + int stock, + RankingInfo rankingInfo ) { - public static ProductResponse from(ProductInfo info) { + public static ProductResponse from(ProductInfo info, RankingInfo rankingInfo) { return new ProductResponse( info.id(), info.name(), info.brand(), info.price(), info.likes(), - info.stock() + info.stock(), + rankingInfo ); } + + public static ProductResponse from(ProductInfo info) { + return from(info, null); + } + } + + /** + * 상품 랭킹 정보 + */ + public record RankingInfo( + Long rank, + Double score + ) { + public static RankingInfo from(com.loopers.application.ranking.RankingInfo.ProductRankingInfo info) { + if (info == null) { + return null; + } + return new RankingInfo(info.rank(), info.score()); + } } public record ProductsPageResponse( diff --git a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1ApiSpec.java b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1ApiSpec.java new file mode 100644 index 000000000..9d75dd580 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1ApiSpec.java @@ -0,0 +1,29 @@ +package com.loopers.interfaces.api.ranking; + +import com.loopers.interfaces.api.ApiResponse; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.tags.Tag; + +@Tag(name = "Ranking V1 API", description = "랭킹 API 입니다.") +public interface RankingV1ApiSpec { + + @Operation( + method = "GET", + summary = "랭킹 페이지 조회", + description = "일간 랭킹을 페이지네이션으로 조회합니다." + ) + ApiResponse getRankings( + @Parameter(description = "날짜 (yyyyMMdd 형식, 미지정 시 오늘 날짜)", example = "20241219") + @Schema(description = "날짜 (yyyyMMdd 형식, 미지정 시 오늘 날짜)", example = "20241219") + String date, + @Parameter(description = "페이지 크기", example = "20") + @Schema(description = "페이지 크기", example = "20") + int size, + @Parameter(description = "페이지 번호 (0부터 시작)", example = "0") + @Schema(description = "페이지 번호 (0부터 시작)", example = "0") + int page + ); +} + diff --git a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1Controller.java b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1Controller.java new file mode 100644 index 000000000..d257468ac --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1Controller.java @@ -0,0 +1,41 @@ +package com.loopers.interfaces.api.ranking; + +import com.loopers.application.ranking.RankingFacade; +import com.loopers.application.ranking.RankingInfo; +import com.loopers.interfaces.api.ApiResponse; +import lombok.RequiredArgsConstructor; +import org.springframework.data.domain.PageRequest; +import org.springframework.data.domain.Pageable; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; + +@RequiredArgsConstructor +@RestController +@RequestMapping("/api/v1/rankings") +public class RankingV1Controller implements RankingV1ApiSpec { + + private final RankingFacade rankingFacade; + + @GetMapping + @Override + public ApiResponse getRankings( + @RequestParam(required = false) String date, + @RequestParam(defaultValue = "20") int size, + @RequestParam(defaultValue = "0") int page + ) { + String rankingDate = date != null ? date : + LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd")); + + Pageable pageable = PageRequest.of(page, size); + RankingInfo.RankingsPageResponse response = + rankingFacade.getRankings(rankingDate, pageable); + + return ApiResponse.success(RankingV1Dto.RankingsPageResponse.from(response)); + } +} + diff --git a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1Dto.java b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1Dto.java new file mode 100644 index 000000000..69fccc735 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1Dto.java @@ -0,0 +1,87 @@ +package com.loopers.interfaces.api.ranking; + +import com.loopers.application.ranking.RankingInfo; +import io.swagger.v3.oas.annotations.media.Schema; + +import java.util.List; + +/** + * 랭킹 API DTO + */ +public class RankingV1Dto { + + /** + * 랭킹 페이지 응답 + */ + @Schema(description = "랭킹 페이지 응답") + public record RankingsPageResponse( + @Schema(description = "랭킹 목록") + List items, + @Schema(description = "현재 페이지 번호") + int page, + @Schema(description = "페이지 크기") + int size, + @Schema(description = "전체 랭킹 수") + int total + ) { + public static RankingsPageResponse from(RankingInfo.RankingsPageResponse info) { + return new RankingsPageResponse( + info.items().stream() + .map(RankingItem::from) + .toList(), + info.page(), + info.size(), + info.total() + ); + } + } + + /** + * 랭킹 항목 + */ + @Schema(description = "랭킹 항목") + public record RankingItem( + @Schema(description = "순위 (1부터 시작)") + Long rank, + @Schema(description = "상품 ID") + Long productId, + @Schema(description = "랭킹 점수") + Double score, + @Schema(description = "상품명") + String productName, + @Schema(description = "브랜드 ID") + Long brandId, + @Schema(description = "가격") + Integer price + ) { + public static RankingItem from(RankingInfo.RankingItem info) { + return new RankingItem( + info.rank(), + info.productId(), + info.score(), + info.productName(), + info.brandId(), + info.price() + ); + } + } + + /** + * 상품 랭킹 정보 (상세 조회용) + */ + @Schema(description = "상품 랭킹 정보") + public record ProductRankingInfo( + @Schema(description = "순위 (null이면 랭킹에 없음)") + Long rank, + @Schema(description = "랭킹 점수") + Double score + ) { + public static ProductRankingInfo from(RankingInfo.ProductRankingInfo info) { + if (info == null) { + return null; + } + return new ProductRankingInfo(info.rank(), info.score()); + } + } +} + diff --git a/apps/commerce-api/src/main/resources/application.yml b/apps/commerce-api/src/main/resources/application.yml index cd024a7e1..e6b4d8f8c 100644 --- a/apps/commerce-api/src/main/resources/application.yml +++ b/apps/commerce-api/src/main/resources/application.yml @@ -99,6 +99,13 @@ spring: activate: on-profile: local, test +server: + port: 8080 # commerce-api 기본 포트 + +management: + server: + port: 8081 # actuator 포트 (commerce-streamer와 분리) + --- spring: config: diff --git a/apps/commerce-core/src/main/java/com/loopers/domain/event/outbox/OutboxEvent.java b/apps/commerce-core/src/main/java/com/loopers/domain/event/outbox/OutboxEvent.java index a909a7262..3e3bfc3fd 100644 --- a/apps/commerce-core/src/main/java/com/loopers/domain/event/outbox/OutboxEvent.java +++ b/apps/commerce-core/src/main/java/com/loopers/domain/event/outbox/OutboxEvent.java @@ -10,7 +10,7 @@ import java.util.UUID; @Entity -@Table(name = "outbox_events", indexes = { +@Table(name = "tb_outbox_events", indexes = { @Index(name = "idx_outbox_status_created", columnList = "status, created_at") }) @Getter diff --git a/apps/commerce-core/src/main/java/com/loopers/domain/ranking/RankingScoreCalculator.java b/apps/commerce-core/src/main/java/com/loopers/domain/ranking/RankingScoreCalculator.java new file mode 100644 index 000000000..71626f2ce --- /dev/null +++ b/apps/commerce-core/src/main/java/com/loopers/domain/ranking/RankingScoreCalculator.java @@ -0,0 +1,121 @@ +package com.loopers.domain.ranking; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.Map; + +/** + * 랭킹 점수 계산기 + *

+ * 이벤트 타입별 가중치를 적용하여 랭킹 점수를 계산합니다: + * - 조회(ProductViewed): Weight = 0.1, Score = 1 + * - 좋아요(ProductLiked): Weight = 0.2, Score = 1 + * - 주문(OrderPaid): Weight = 0.7, Score = price * amount + */ +@Slf4j +@Component +public class RankingScoreCalculator { + + // 가중치 상수 + private static final double WEIGHT_VIEW = 0.1; + private static final double WEIGHT_LIKE = 0.2; + private static final double WEIGHT_ORDER = 0.7; + + /** + * 조회 이벤트 점수 계산 + * + * @return Weight(0.1) * Score(1) = 0.1 + */ + public double calculateViewScore() { + return WEIGHT_VIEW * 1.0; + } + + /** + * 좋아요 이벤트 점수 계산 + * + * @return Weight(0.2) * Score(1) = 0.2 + */ + public double calculateLikeScore() { + return WEIGHT_LIKE * 1.0; + } + + /** + * 주문 이벤트 점수 계산 + * + * @param price 상품 가격 + * @param amount 주문 수량 + * @return Weight(0.7) * Score(price * amount) + */ + public double calculateOrderScore(Long price, Integer amount) { + if (price == null || price <= 0 || amount == null || amount <= 0) { + log.warn("Invalid price or amount for order score calculation: price={}, amount={}", price, amount); + return 0.0; + } + // 정규화를 위해 log 적용 고려 (선택사항) + // return WEIGHT_ORDER * Math.log(1 + price * amount); + return WEIGHT_ORDER * price * amount; + } + + /** + * 이벤트 타입별 점수 계산 + * + * @param eventType 이벤트 타입 (ProductViewed, ProductLiked, OrderPaid 등) + * @param eventData 이벤트 데이터 (price, amount 등 포함) + * @return 계산된 점수 + */ + public double calculateScore(String eventType, Map eventData) { + switch (eventType) { + case "ProductViewed": + return calculateViewScore(); + case "ProductLiked": + return calculateLikeScore(); + case "OrderPaid": + Long price = extractPrice(eventData); + Integer amount = extractAmount(eventData); + return calculateOrderScore(price, amount); + default: + log.debug("Unknown event type for ranking score: {}", eventType); + return 0.0; + } + } + + private Long extractPrice(Map eventData) { + if (eventData == null) { + return null; + } + Object priceObj = eventData.get("price"); + if (priceObj == null) { + return null; + } + if (priceObj instanceof Number) { + return ((Number) priceObj).longValue(); + } + try { + return Long.parseLong(priceObj.toString()); + } catch (NumberFormatException e) { + log.warn("Failed to parse price: {}", priceObj, e); + return null; + } + } + + private Integer extractAmount(Map eventData) { + if (eventData == null) { + return null; + } + Object amountObj = eventData.get("amount"); + if (amountObj == null) { + return null; + } + if (amountObj instanceof Number) { + return ((Number) amountObj).intValue(); + } + try { + return Integer.parseInt(amountObj.toString()); + } catch (NumberFormatException e) { + log.warn("Failed to parse amount: {}", amountObj, e); + return null; + } + } +} + diff --git a/apps/commerce-core/src/main/java/com/loopers/domain/ranking/RankingService.java b/apps/commerce-core/src/main/java/com/loopers/domain/ranking/RankingService.java new file mode 100644 index 000000000..ed5fe4da6 --- /dev/null +++ b/apps/commerce-core/src/main/java/com/loopers/domain/ranking/RankingService.java @@ -0,0 +1,150 @@ +package com.loopers.domain.ranking; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.ZSetOperations; +import org.springframework.stereotype.Component; + +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * Redis ZSET 기반 랭킹 서비스 + *

+ * 일간 랭킹을 관리하며, 다음과 같은 기능을 제공합니다: + * - ZSET 점수 증가 (ZINCRBY) + * - Top-N 조회 (ZREVRANGE) + * - 개별 상품 순위 조회 (ZREVRANK) + * - 개별 상품 점수 조회 (ZSCORE) + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class RankingService { + + private static final String RANKING_KEY_PREFIX = "ranking:all:"; + private static final long TTL_SECONDS = 2 * 24 * 60 * 60; // 2일 + + private final RedisTemplate redisTemplate; + + /** + * 일간 랭킹 키 생성 + * + * @param date yyyyMMdd 형식의 날짜 문자열 + * @return ranking:all:{yyyyMMdd} + */ + public String getRankingKey(String date) { + return RANKING_KEY_PREFIX + date; + } + + /** + * 오늘 날짜의 랭킹 키 반환 + */ + public String getTodayRankingKey() { + return getRankingKey(LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"))); + } + + /** + * ZSET에 점수 증가 (ZINCRBY) + * + * @param key 랭킹 키 + * @param productId 상품 ID (member) + * @param score 증가할 점수 + * @return 증가 후 총 점수 + */ + public Double incrementScore(String key, Long productId, double score) { + // TTL 설정 (키가 없을 때만) + if (Boolean.FALSE.equals(redisTemplate.hasKey(key))) { + redisTemplate.expire(key, TTL_SECONDS, TimeUnit.SECONDS); + } + + return redisTemplate.opsForZSet().incrementScore(key, productId.toString(), score); + } + + /** + * Top-N 상품 ID 조회 (ZREVRANGE) + * + * @param key 랭킹 키 + * @param start 시작 인덱스 (0부터) + * @param end 종료 인덱스 (N-1) + * @return 상품 ID 리스트 (점수 내림차순) + */ + public List getTopNProductIds(String key, long start, long end) { + Set members = redisTemplate.opsForZSet().reverseRange(key, start, end); + if (members == null || members.isEmpty()) { + return Collections.emptyList(); + } + return members.stream() + .map(Long::parseLong) + .collect(Collectors.toList()); + } + + /** + * Top-N 상품 ID와 점수 조회 (ZREVRANGE WITHSCORES) + */ + public List getTopNWithScores(String key, long start, long end) { + Set> tuples = + redisTemplate.opsForZSet().reverseRangeWithScores(key, start, end); + if (tuples == null || tuples.isEmpty()) { + return Collections.emptyList(); + } + return tuples.stream() + .map(tuple -> new RankingEntry( + Long.parseLong(tuple.getValue()), + tuple.getScore() != null ? tuple.getScore() : 0.0 + )) + .collect(Collectors.toList()); + } + + /** + * 특정 상품의 순위 조회 (ZREVRANK) + * + * @param key 랭킹 키 + * @param productId 상품 ID + * @return 순위 (0부터 시작, null이면 랭킹에 없음) + */ + public Long getRank(String key, Long productId) { + return redisTemplate.opsForZSet().reverseRank(key, productId.toString()); + } + + /** + * 특정 상품의 점수 조회 (ZSCORE) + */ + public Double getScore(String key, Long productId) { + return redisTemplate.opsForZSet().score(key, productId.toString()); + } + + /** + * 랭킹에 포함된 상품 수 조회 (ZCARD) + */ + public Long getRankingSize(String key) { + Long size = redisTemplate.opsForZSet().zCard(key); + return size != null ? size : 0L; + } + + /** + * TTL 설정 + */ + public void setTtl(String key, long seconds) { + redisTemplate.expire(key, seconds, TimeUnit.SECONDS); + } + + /** + * 랭킹 엔트리 (상품 ID + 점수) + */ + @Getter + @AllArgsConstructor + public static class RankingEntry { + private Long productId; + private Double score; + } +} + diff --git a/apps/commerce-streamer/src/main/java/com/loopers/CommerceStreamerApplication.java b/apps/commerce-streamer/src/main/java/com/loopers/CommerceStreamerApplication.java index df57a58fc..40aff1668 100644 --- a/apps/commerce-streamer/src/main/java/com/loopers/CommerceStreamerApplication.java +++ b/apps/commerce-streamer/src/main/java/com/loopers/CommerceStreamerApplication.java @@ -4,6 +4,7 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.ConfigurationPropertiesScan; +import org.springframework.scheduling.annotation.EnableScheduling; import java.util.TimeZone; @@ -16,6 +17,7 @@ "com.loopers.config", // JPA 모듈의 JpaConfig, DataSourceConfig 등 설정 클래스 스캔 "com.loopers.confg" // Kafka 모듈의 설정 클래스 스캔 (KafkaConfig) }) +@EnableScheduling // @EntityScan은 JpaConfig에서 이미 com.loopers 전체를 스캔하므로 불필요 // @EnableJpaRepositories도 JpaConfig에서 이미 com.loopers.infrastructure를 스캔하므로 불필요 public class CommerceStreamerApplication { diff --git a/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/listener/RankingEventListener.java b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/listener/RankingEventListener.java new file mode 100644 index 000000000..7477a6295 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/listener/RankingEventListener.java @@ -0,0 +1,321 @@ +package com.loopers.infrastructure.listener; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.loopers.confg.kafka.KafkaConfig; +import com.loopers.domain.order.Order; +import com.loopers.domain.order.OrderRepository; +import com.loopers.domain.ranking.RankingScoreCalculator; +import com.loopers.domain.ranking.RankingService; +import com.loopers.infrastructure.dlq.DlqService; +import com.loopers.infrastructure.idempotency.IdempotencyService; +import com.loopers.support.error.CoreException; +import com.loopers.support.error.ErrorType; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 랭킹 관련 Kafka 이벤트 리스너 + *

+ * catalog-events와 order-events 토픽에서 이벤트를 소비하여 + * Redis ZSET에 랭킹 점수를 적재합니다. + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class RankingEventListener { + + private final IdempotencyService idempotencyService; + private final RankingService rankingService; + private final RankingScoreCalculator scoreCalculator; + private final OrderRepository orderRepository; + private final DlqService dlqService; + private final ObjectMapper objectMapper; + + // 메시지별 재시도 횟수 추적 (메모리 기반, 재시작 시 초기화됨) + private final Map retryCountMap = new ConcurrentHashMap<>(); + + private static final int MAX_RETRY_COUNT = 3; + + /** + * catalog-events 토픽에서 랭킹 관련 이벤트 처리 + * - ProductViewed: 조회 점수 추가 + * - ProductLiked: 좋아요 점수 추가 + * - ProductUnliked: 좋아요 점수 차감 (음수) + */ + @KafkaListener( + topics = "catalog-events", + groupId = "commerce-ranking-consumer-group", + containerFactory = KafkaConfig.BATCH_LISTENER + ) + @Transactional + public void handleCatalogEventsForRanking( + List> records, + Acknowledgment acknowledgment + ) { + log.debug("Received {} messages from catalog-events for ranking", records.size()); + + String todayKey = rankingService.getTodayRankingKey(); + + for (ConsumerRecord record : records) { + try { + Map message = parseMessage(record.value()); + String eventId = extractEventId(message); + String eventType = extractEventType(message); + String aggregateId = record.key() != null ? + record.key().toString() : extractAggregateId(message); + + if (eventId == null || eventType == null || aggregateId == null) { + log.warn("Missing required fields: eventId={}, eventType={}, aggregateId={}", + eventId, eventType, aggregateId); + continue; + } + + // 멱등성 체크 (랭킹 처리용 별도 ID 사용) + String rankingEventId = eventId + ":ranking"; + if (idempotencyService.isAlreadyHandled(rankingEventId)) { + log.debug("Event already handled for ranking: eventId={}", eventId); + continue; + } + + // 랭킹 점수 계산 및 적재 + Long productId = Long.parseLong(aggregateId); + double score = scoreCalculator.calculateScore(eventType, message); + + if (score != 0.0) { + // ProductUnliked의 경우 음수 점수로 차감 + if ("ProductUnliked".equals(eventType)) { + score = -scoreCalculator.calculateLikeScore(); + } + + Double newScore = rankingService.incrementScore(todayKey, productId, score); + log.debug("Updated ranking score: productId={}, score={}, newTotal={}", + productId, score, newScore); + } + + // 멱등성 기록 (랭킹 처리용 별도 기록) + idempotencyService.markAsHandled( + rankingEventId, + eventType, + aggregateId, + "RankingEventListener" + ); + + } catch (Exception e) { + String messageKey = record.key() != null ? record.key().toString() : + extractEventId(parseMessage(record.value())); + int retryCount = retryCountMap.getOrDefault(messageKey, 0) + 1; + retryCountMap.put(messageKey, retryCount); + + log.error("Failed to process ranking event: topic={}, key={}, offset={}, retryCount={}", + record.topic(), record.key(), record.offset(), retryCount, e); + + // 재시도 횟수 초과 시 DLQ로 전송 + if (retryCount >= MAX_RETRY_COUNT) { + try { + dlqService.sendToDlq( + record.topic(), + record.key(), + record.value(), + e.getMessage(), + retryCount + ); + retryCountMap.remove(messageKey); + log.warn("Message sent to DLQ after {} retries: topic={}, key={}", + retryCount, record.topic(), record.key()); + continue; + } catch (Exception dlqException) { + log.error("Failed to send message to DLQ: topic={}, key={}", + record.topic(), record.key(), dlqException); + throw e; + } + } else { + throw e; + } + } + } + + acknowledgment.acknowledge(); + log.debug("Acknowledged {} ranking messages from catalog-events", records.size()); + } + + /** + * order-events 토픽에서 주문 완료 이벤트 처리 + * - OrderPaid: 주문 점수 추가 (각 주문 항목별로 처리) + */ + @KafkaListener( + topics = "order-events", + groupId = "commerce-ranking-consumer-group", + containerFactory = KafkaConfig.BATCH_LISTENER + ) + @Transactional + public void handleOrderEventsForRanking( + List> records, + Acknowledgment acknowledgment + ) { + log.debug("Received {} messages from order-events for ranking", records.size()); + + String todayKey = rankingService.getTodayRankingKey(); + + for (ConsumerRecord record : records) { + try { + Map message = parseMessage(record.value()); + String eventId = extractEventId(message); + String eventType = extractEventType(message); + + if (!"OrderPaid".equals(eventType)) { + continue; // 주문 완료 이벤트만 처리 + } + + // 멱등성 체크 + String rankingEventId = eventId + ":ranking"; + if (idempotencyService.isAlreadyHandled(rankingEventId)) { + log.debug("Order event already handled for ranking: eventId={}", eventId); + continue; + } + + // orderId 추출 + Object orderIdObj = message.get("orderId"); + if (orderIdObj == null) { + log.warn("OrderPaid event missing orderId: {}", message); + continue; + } + + Long orderId; + try { + orderId = Long.parseLong(orderIdObj.toString()); + } catch (NumberFormatException e) { + log.error("Failed to parse orderId: {}", orderIdObj, e); + continue; + } + + // Order 엔티티 조회하여 주문 항목 가져오기 + Order order = orderRepository.findById(orderId) + .orElseThrow(() -> new CoreException(ErrorType.NOT_FOUND, + "주문을 찾을 수 없습니다: orderId=" + orderId)); + + // 각 주문 항목에 대해 랭킹 점수 추가 + order.getOrderItems().forEach(item -> { + Long productId = item.getProductId(); + Integer quantity = item.getQuantity(); + Long price = Long.valueOf(item.getPricePerItem().amount()); + + if (productId == null || quantity == null || quantity <= 0 || price == null || price <= 0) { + log.warn("Invalid OrderItem: productId={}, quantity={}, price={}", + productId, quantity, price); + return; + } + + try { + double score = scoreCalculator.calculateOrderScore(price, quantity); + Double newScore = rankingService.incrementScore(todayKey, productId, score); + log.debug("Updated ranking score from order: productId={}, score={}, newTotal={}", + productId, score, newScore); + } catch (Exception e) { + log.error("Failed to update ranking score: productId={}, quantity={}, price={}", + productId, quantity, price, e); + throw e; // 트랜잭션 롤백을 위해 예외 재발생 + } + }); + + // 멱등성 기록 + idempotencyService.markAsHandled( + rankingEventId, + eventType, + record.key() != null ? record.key().toString() : orderId.toString(), + "RankingEventListener" + ); + + } catch (Exception e) { + String messageKey = record.key() != null ? record.key().toString() : + extractEventId(parseMessage(record.value())); + int retryCount = retryCountMap.getOrDefault(messageKey, 0) + 1; + retryCountMap.put(messageKey, retryCount); + + log.error("Failed to process order ranking event: topic={}, key={}, offset={}, retryCount={}", + record.topic(), record.key(), record.offset(), retryCount, e); + + // 재시도 횟수 초과 시 DLQ로 전송 + if (retryCount >= MAX_RETRY_COUNT) { + try { + dlqService.sendToDlq( + record.topic(), + record.key(), + record.value(), + e.getMessage(), + retryCount + ); + retryCountMap.remove(messageKey); + log.warn("Message sent to DLQ after {} retries: topic={}, key={}", + retryCount, record.topic(), record.key()); + continue; + } catch (Exception dlqException) { + log.error("Failed to send message to DLQ: topic={}, key={}", + record.topic(), record.key(), dlqException); + throw e; + } + } else { + throw e; + } + } + } + + acknowledgment.acknowledge(); + log.debug("Acknowledged {} ranking messages from order-events", records.size()); + } + + @SuppressWarnings("unchecked") + private Map parseMessage(Object value) { + if (value instanceof Map) { + return (Map) value; + } + if (value instanceof String) { + try { + return objectMapper.readValue((String) value, Map.class); + } catch (Exception e) { + log.error("Failed to parse message as JSON", e); + throw new RuntimeException("Failed to parse message", e); + } + } + throw new IllegalArgumentException("Unsupported message type: " + value.getClass()); + } + + private String extractEventId(Map message) { + if (message.containsKey("eventId")) { + return message.get("eventId").toString(); + } + if (message.containsKey("id")) { + return message.get("id").toString(); + } + return null; + } + + private String extractEventType(Map message) { + if (message.containsKey("eventType")) { + return message.get("eventType").toString(); + } + if (message.containsKey("type")) { + return message.get("type").toString(); + } + return null; + } + + private String extractAggregateId(Map message) { + if (message.containsKey("aggregateId")) { + return message.get("aggregateId").toString(); + } + if (message.containsKey("productId")) { + return message.get("productId").toString(); + } + return null; + } +} + diff --git a/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/scheduler/RankingCarryOverScheduler.java b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/scheduler/RankingCarryOverScheduler.java new file mode 100644 index 000000000..2afc7251c --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/scheduler/RankingCarryOverScheduler.java @@ -0,0 +1,93 @@ +package com.loopers.infrastructure.scheduler; + +import com.loopers.domain.ranking.RankingService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.script.DefaultRedisScript; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; +import java.util.Arrays; + +/** + * 랭킹 Score Carry-Over 스케줄러 + *

+ * 매일 23시 50분에 실행되어 전날 점수의 10%를 오늘 랭킹에 복사합니다. + * 이를 통해 콜드 스타트 문제를 완화합니다. + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class RankingCarryOverScheduler { + + private static final double CARRY_OVER_WEIGHT = 0.1; // 10% + private static final long TTL_SECONDS = 2 * 24 * 60 * 60; // 2일 + + private final RankingService rankingService; + private final RedisTemplate redisTemplate; + + /** + * Lua 스크립트: 전날 점수의 일부를 오늘 랭킹에 복사 + */ + private static final String CARRY_OVER_SCRIPT = + "local yesterdayKey = KEYS[1]\n" + + "local todayKey = KEYS[2]\n" + + "local weight = tonumber(ARGV[1])\n" + + "local ttl = tonumber(ARGV[2])\n" + + "\n" + + "local members = redis.call('ZRANGE', yesterdayKey, 0, -1, 'WITHSCORES')\n" + + "local count = 0\n" + + "for i = 1, #members, 2 do\n" + + " local productId = members[i]\n" + + " local score = tonumber(members[i + 1]) * weight\n" + + " redis.call('ZINCRBY', todayKey, score, productId)\n" + + " count = count + 1\n" + + "end\n" + + "\n" + + "redis.call('EXPIRE', todayKey, ttl)\n" + + "return count"; + + /** + * 매일 23시 50분에 실행 (다음 날 랭킹 미리 생성) + * 전날 점수의 10%를 오늘 랭킹에 복사 + */ + @Scheduled(cron = "0 50 23 * * *") // 매일 23:50 + public void carryOverRankingScore() { + try { + LocalDate yesterday = LocalDate.now().minusDays(1); + LocalDate today = LocalDate.now(); + + String yesterdayKey = rankingService.getRankingKey( + yesterday.format(DateTimeFormatter.ofPattern("yyyyMMdd")) + ); + String todayKey = rankingService.getRankingKey( + today.format(DateTimeFormatter.ofPattern("yyyyMMdd")) + ); + + // 전날 랭킹이 존재하는지 확인 + if (Boolean.FALSE.equals(redisTemplate.hasKey(yesterdayKey))) { + log.info("Yesterday ranking key does not exist, skipping carry-over: {}", yesterdayKey); + return; + } + + // Lua 스크립트 실행 + DefaultRedisScript script = new DefaultRedisScript<>(CARRY_OVER_SCRIPT, Long.class); + Long carriedOverCount = redisTemplate.execute( + script, + Arrays.asList(yesterdayKey, todayKey), + String.valueOf(CARRY_OVER_WEIGHT), + String.valueOf(TTL_SECONDS) + ); + + log.info("Carried over {} products from {} to {} with weight {}", + carriedOverCount, yesterdayKey, todayKey, CARRY_OVER_WEIGHT); + + } catch (Exception e) { + log.error("Failed to carry over ranking scores", e); + } + } +} + diff --git a/apps/commerce-streamer/src/main/resources/application.yml b/apps/commerce-streamer/src/main/resources/application.yml index ee106f97f..1c74d8b59 100644 --- a/apps/commerce-streamer/src/main/resources/application.yml +++ b/apps/commerce-streamer/src/main/resources/application.yml @@ -36,11 +36,11 @@ spring: on-profile: local, test server: - port: 8081 # commerce-streamer 기본 포트 + port: 8090 # commerce-streamer 기본 포트 (commerce-api Actuator 8081과 충돌 방지) management: server: - port: 8084 # actuator 포트 (commerce-api:8081, pg-simulator:8082와 분리) + port: 8091 # actuator 포트 (commerce-api:8081, pg-simulator:8083와 분리) --- spring: diff --git a/supports/monitoring/src/main/resources/monitoring.yml b/supports/monitoring/src/main/resources/monitoring.yml index c6a87a9cf..5d1fe8f5e 100644 --- a/supports/monitoring/src/main/resources/monitoring.yml +++ b/supports/monitoring/src/main/resources/monitoring.yml @@ -30,8 +30,10 @@ management: enabled: true readinessState: enabled: true - server: - port: 8081 + # server.port는 각 애플리케이션의 application.yml에서 설정 + # commerce-api: 8080 (기본), actuator: 8081 + # commerce-streamer: 8090 (기본), actuator: 8091 + # pg-simulator: 8082 (기본), actuator: 8083 observations: annotations: enabled: true