From 3d2ddcdbcae465aa310e17faddc4595d4e6a33e1 Mon Sep 17 00:00:00 2001 From: junoade Date: Mon, 22 Dec 2025 02:56:05 +0900 Subject: [PATCH 1/6] =?UTF-8?q?test:=20=EC=BB=A8=EC=8A=88=EB=A8=B8=20?= =?UTF-8?q?=EB=A9=B1=EB=93=B1=EC=84=B1=20=ED=85=8C=EC=8A=A4=ED=8A=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. kafka.yml 에 기재된 testImplementation(testFixtures(project(":modules:kafka"))) 에 대한 설정 코드들을 추가. 2. 그 후 테스트 코드 수행시, - 스프링컨텍스트, 카프카 토픽, 컨슈머그룹, 컨슈머 등 생성 -> 테스트 될 수 있도록 설정 - @BeforeEach 시 아래와 같이 떳고, 이는 컨슈머 컨테이너가 먼저 떠서 토픽 메타데이터를 조회하는데, 그 시점에 토픽이 없기 때문이다. ``` org.apache.kafka.clients.NetworkClient : [Consumer clientId=commerce-streamer-0, groupId=loopers-default-consumer] Error while fetching metadata with correlation id 2 : {order-events=UNKNOWN_TOPIC_OR_PARTITION} ``` --- .../idempotency/EventHandledServiceTest.java | 31 ++- modules/kafka/src/main/resources/kafka.yml | 8 +- .../KafkaTestContainersConfig.java | 47 +++++ .../java/com/loopers/utils/KafkaCleanUp.java | 189 ++++++++++++++++++ 4 files changed, 271 insertions(+), 4 deletions(-) create mode 100644 modules/kafka/src/testFixtures/java/com/loopers/testcontainers/KafkaTestContainersConfig.java create mode 100644 modules/kafka/src/testFixtures/java/com/loopers/utils/KafkaCleanUp.java diff --git a/apps/commerce-streamer/src/test/java/com/loopers/application/idempotency/EventHandledServiceTest.java b/apps/commerce-streamer/src/test/java/com/loopers/application/idempotency/EventHandledServiceTest.java index 2a48eb6f0..84c3b1286 100644 --- a/apps/commerce-streamer/src/test/java/com/loopers/application/idempotency/EventHandledServiceTest.java +++ b/apps/commerce-streamer/src/test/java/com/loopers/application/idempotency/EventHandledServiceTest.java @@ -3,18 +3,31 @@ import com.loopers.domain.ProductLikeMetricsModel; import com.loopers.infrastructure.EventHandleRepository; import com.loopers.infrastructure.ProductLikeMetricsRepository; +import com.loopers.testcontainers.KafkaTestContainersConfig; +import com.loopers.utils.KafkaCleanUp; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Import; import org.springframework.kafka.core.KafkaTemplate; -import org.testcontainers.junit.jupiter.Testcontainers; +import org.springframework.test.context.ActiveProfiles; + +import java.util.Map; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; -@Testcontainers +@ActiveProfiles("local") +@Import(KafkaTestContainersConfig.class) @SpringBootTest class EventHandledServiceTest { + + @Autowired + private KafkaCleanUp kafkaCleanUp; + @Autowired KafkaTemplate kafkaTemplate; @Autowired @@ -22,9 +35,20 @@ class EventHandledServiceTest { @Autowired EventHandleRepository handledRepo; + @Autowired + private KafkaProperties kafkaProperties; + + @BeforeEach + void setUp() { + // @BeforeEach는 이미 SpringBootTest 컨텍스트 + KafkaListener 컨테이너가 다 올라간 뒤에 실행 + // kafkaCleanUp.resetAllTestTopics(); + kafkaCleanUp.resetAllConsumerGroups(); + } + @Test @DisplayName("멱등성 테스트") void duplicate_message_should_be_applied_once() throws Exception { + long productId = 1L; ProductLikeMetricsModel metrics = metricsRepo.findById(productId) .orElseGet(() -> metricsRepo.save(ProductLikeMetricsModel.of(productId))); @@ -42,6 +66,9 @@ void duplicate_message_should_be_applied_once() throws Exception { // TODO - 카프카 브로커/컨슈머 테스트환경에서 설정필요 Thread.sleep(1500); + Map consumerProps = kafkaProperties.buildConsumerProperties(); + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "${spring.kafka.consumer.group-id}"); + // then // like 카운트는 1번만 증가 diff --git a/modules/kafka/src/main/resources/kafka.yml b/modules/kafka/src/main/resources/kafka.yml index 9609dbf85..b2e040955 100644 --- a/modules/kafka/src/main/resources/kafka.yml +++ b/modules/kafka/src/main/resources/kafka.yml @@ -15,6 +15,7 @@ spring: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer retries: 3 + ## TODO acks, enable.idempotence, max.in.flight.requests.per.connection 설정 등 공부하기 consumer: group-id: loopers-default-consumer key-deserializer: org.apache.kafka.common.serialization.StringDeserializer @@ -29,10 +30,13 @@ spring.config.activate.on-profile: local, test spring: kafka: - bootstrap-servers: localhost:19092 + # Testcontainers를 사용하는 경우 BOOTSTRAP_SERVERS가 자동으로 설정됨 + # 로컬 개발 환경에서는 localhost:19092 사용 + bootstrap-servers: ${BOOTSTRAP_SERVERS:localhost:19092} admin: properties: - bootstrap.servers: kafka:9092 + bootstrap.servers: ${BOOTSTRAP_SERVERS:localhost:19092} + auto-create: true --- spring.config.activate.on-profile: dev diff --git a/modules/kafka/src/testFixtures/java/com/loopers/testcontainers/KafkaTestContainersConfig.java b/modules/kafka/src/testFixtures/java/com/loopers/testcontainers/KafkaTestContainersConfig.java new file mode 100644 index 000000000..256f037d5 --- /dev/null +++ b/modules/kafka/src/testFixtures/java/com/loopers/testcontainers/KafkaTestContainersConfig.java @@ -0,0 +1,47 @@ +package com.loopers.testcontainers; + +import org.apache.kafka.clients.admin.NewTopic; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.TopicBuilder; +import org.testcontainers.kafka.ConfluentKafkaContainer; + + +/** + * Kafka Testcontainers 설정. + *

+ * 테스트 실행 시 자동으로 Kafka 컨테이너를 시작하고, + * Spring Boot의 Kafka 설정에 동적으로 포트를 주입합니다. + *

+ *

+ * 동작 방식: + * 1. Kafka 컨테이너를 시작 + * 2. 동적으로 할당된 포트를 System Property로 설정 + * 3. kafka.yml의 ${BOOTSTRAP_SERVERS}가 이 값을 사용 + *

+ */ +@Configuration +public class KafkaTestContainersConfig { + + private static final ConfluentKafkaContainer kafkaContainer; + + static { + // Kafka 컨테이너 생성 및 시작 + // ConfluentKafkaContainer는 confluentinc/cp-kafka 이미지를 사용 + kafkaContainer = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.5.0"); + kafkaContainer.start(); + + // Spring Boot의 Kafka 설정에 동적으로 포트 주입 + // kafka.yml의 ${BOOTSTRAP_SERVERS}가 이 값을 사용 + String bootstrapServers = kafkaContainer.getBootstrapServers(); + System.setProperty("BOOTSTRAP_SERVERS", bootstrapServers); + } + + @Bean + public NewTopic productLikeEvents() { + return TopicBuilder.name("product-like-events") + .partitions(1) + .replicas(1) + .build(); + } +} diff --git a/modules/kafka/src/testFixtures/java/com/loopers/utils/KafkaCleanUp.java b/modules/kafka/src/testFixtures/java/com/loopers/utils/KafkaCleanUp.java new file mode 100644 index 000000000..e7a453307 --- /dev/null +++ b/modules/kafka/src/testFixtures/java/com/loopers/utils/KafkaCleanUp.java @@ -0,0 +1,189 @@ +package com.loopers.utils; + +import org.apache.kafka.clients.admin.*; +import org.springframework.kafka.config.TopicBuilder; +import org.springframework.kafka.core.KafkaAdmin; +import org.springframework.stereotype.Component; + +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * Kafka 테스트 정리 유틸리티. + *

+ * 테스트 간 Kafka 메시지 격리를 위해 토픽을 삭제하고 재생성합니다. + *

+ *

+ * 사용 방법: + *

    + *
  • 통합 테스트에서 `@BeforeEach` 또는 `@AfterEach`에서 호출하여 테스트 간 격리 보장
  • + *
  • 단위 테스트는 Mock을 사용하므로 불필요
  • + *
+ *

+ *

+ * 주의: + * 프로덕션 환경에서는 사용하지 마세요. 테스트 환경에서만 사용해야 합니다. + *

+ */ +@Component +public class KafkaCleanUp { + + private static final List TEST_TOPICS = List.of( + "order-events", + "product-like-events", + "payment-events", + "coupon-events", + "user-events" + ); + + private final KafkaAdmin kafkaAdmin; + + public KafkaCleanUp(KafkaAdmin kafkaAdmin) { + this.kafkaAdmin = kafkaAdmin; + } + + /** + * 테스트용 토픽의 모든 메시지를 삭제합니다. + *

+ * 토픽을 삭제하고 재생성하여 모든 메시지를 제거합니다. + *

+ *

+ * 주의: 프로덕션 환경에서는 사용하지 마세요. + *

+ */ + public void deleteAllTestTopics() { + try (AdminClient adminClient = createAdminClient()) { + // 존재하는 토픽만 삭제 + Set existingTopics = adminClient.listTopics() + .names() + .get(5, TimeUnit.SECONDS); + + List topicsToDelete = TEST_TOPICS.stream() + .filter(existingTopics::contains) + .toList(); + + if (topicsToDelete.isEmpty()) { + return; + } + + // 토픽 삭제 (모든 메시지 제거) + DeleteTopicsResult deleteResult = adminClient.deleteTopics(topicsToDelete); + deleteResult.all().get(10, TimeUnit.SECONDS); + + // 토픽 삭제 후 재생성 대기 (Kafka가 토픽 삭제를 완료할 때까지) + Thread.sleep(1000); + } catch (Exception e) { + // 토픽이 없거나 이미 삭제된 경우 무시 + // 테스트 환경에서는 토픽이 없을 수 있음 + } + } + + /** + * 테스트용 토픽을 재생성합니다. + *

+ * 삭제된 토픽을 원래 설정으로 재생성합니다. + *

+ */ + public void recreateTestTopics() { + try (AdminClient adminClient = createAdminClient()) { + for (String topicName : TEST_TOPICS) { + try { + // 토픽이 이미 존재하는지 확인 + adminClient.describeTopics(Collections.singletonList(topicName)) + .allTopicNames() + .get(2, TimeUnit.SECONDS); + // 이미 존재하면 스킵 + continue; + } catch (Exception e) { + // 토픽이 없으면 생성 + } + + // 토픽 생성 + NewTopic newTopic = TopicBuilder.name(topicName) + .partitions(3) + .replicas(1) + .config("min.insync.replicas", "1") + .build(); + + adminClient.createTopics(Collections.singletonList(newTopic)) + .all() + .get(5, TimeUnit.SECONDS); + } + } catch (Exception e) { + // 토픽 생성 실패는 무시 (이미 존재할 수 있음) + } + } + + /** + * 테스트용 토픽을 삭제하고 재생성합니다. + *

+ * 모든 메시지를 제거하고 깨끗한 상태로 시작합니다. + *

+ */ + public void resetAllTestTopics() { + deleteAllTestTopics(); + recreateTestTopics(); + } + + /** + * 모든 Consumer Group을 삭제하여 offset을 리셋합니다. + *

+ * 테스트 간 격리를 위해 사용합니다. + *

+ *

+ * 주의: 모든 Consumer Group을 삭제하므로 프로덕션 환경에서는 사용하지 마세요. + *

+ */ + public void resetAllConsumerGroups() { + try (AdminClient adminClient = createAdminClient()) { + // 모든 Consumer Group 목록 조회 + Set consumerGroups = adminClient.listConsumerGroups() + .all() + .get(5, TimeUnit.SECONDS) + .stream() + .map(group -> group.groupId()) + .collect(java.util.stream.Collectors.toSet()); + + if (consumerGroups.isEmpty()) { + return; + } + + // Consumer Group 삭제 (offset 리셋) + DeleteConsumerGroupsResult deleteResult = adminClient.deleteConsumerGroups(consumerGroups); + deleteResult.all().get(5, TimeUnit.SECONDS); + } catch (Exception e) { + // Consumer Group이 없거나 이미 삭제된 경우 무시 + // 테스트 환경에서는 Consumer Group이 없을 수 있음 + } + } + + /** + * 특정 Consumer Group을 삭제합니다. + * + * @param groupId 삭제할 Consumer Group ID + */ + public void resetConsumerGroup(String groupId) { + try (AdminClient adminClient = createAdminClient()) { + DeleteConsumerGroupsResult deleteResult = adminClient.deleteConsumerGroups( + Collections.singletonList(groupId) + ); + deleteResult.all().get(5, TimeUnit.SECONDS); + } catch (Exception e) { + // Consumer Group이 없거나 이미 삭제된 경우 무시 + } + } + + /** + * AdminClient를 생성합니다. + */ + private AdminClient createAdminClient() { + Properties props = new Properties(); + Object bootstrapServers = kafkaAdmin.getConfigurationProperties() + .getOrDefault(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092"); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + return AdminClient.create(props); + } +} From 7c2114fbe6be3f41c30e98a0117a19b629467f1c Mon Sep 17 00:00:00 2001 From: junoade Date: Mon, 22 Dec 2025 13:58:34 +0900 Subject: [PATCH 2/6] =?UTF-8?q?init:=20redis=EB=A5=BC=20=ED=86=B5=ED=95=9C?= =?UTF-8?q?=20ranking=20=EC=A7=91=EA=B3=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit > 일단 돌아가게 구현 1. 초기 랭킹 ZSET 및 키 전략 - ProductLike 이벤트에 대해서 weight=0.2, score = weight * 1 로 계산하여 zset에 집계 - TTL : 2Day / Key: ranking:all:{yyyyMMdd} 2. 컨슈머 메시지 처리 및 집계 - (초기) streamer 모듈에 랭킹 집계 서비스 구현 및 컨슈머 리스너에 단일 메시지 단건 처리 --- .../ranking/RankingAggregationService.java | 32 +++++++++++++++++++ .../consumer/ProductLikeEventConsumer.java | 3 ++ .../java/com/loopers/ranking/RankingKey.java | 21 ++++++++++++ .../com/loopers/ranking/RankingPolicy.java | 7 ++++ .../ranking/RankingZSetRepository.java | 7 ++++ 5 files changed, 70 insertions(+) create mode 100644 apps/commerce-streamer/src/main/java/com/loopers/application/ranking/RankingAggregationService.java create mode 100644 modules/redis/src/main/java/com/loopers/ranking/RankingKey.java create mode 100644 modules/redis/src/main/java/com/loopers/ranking/RankingPolicy.java create mode 100644 modules/redis/src/main/java/com/loopers/ranking/RankingZSetRepository.java diff --git a/apps/commerce-streamer/src/main/java/com/loopers/application/ranking/RankingAggregationService.java b/apps/commerce-streamer/src/main/java/com/loopers/application/ranking/RankingAggregationService.java new file mode 100644 index 000000000..84d800882 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/application/ranking/RankingAggregationService.java @@ -0,0 +1,32 @@ +package com.loopers.application.ranking; + +import com.loopers.ranking.RankingKey; +import lombok.RequiredArgsConstructor; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Service; + +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.ZoneOffset; + +@Service +@RequiredArgsConstructor +public class RankingAggregationService { + private static final double LIKE_WEIGHT = 0.2d; + private static final Duration TTL = Duration.ofDays(2); + + private final StringRedisTemplate redisTemplate; + + public void applyLike(long productId, Instant occurredAt) { + LocalDate day = occurredAt.atZone(ZoneOffset.UTC).toLocalDate(); + String key = RankingKey.dailyAll(day); + String member = String.valueOf(productId); + + // Score = Weight * 1 + redisTemplate.opsForZSet().incrementScore(key, member, LIKE_WEIGHT); + + // test 설정 + redisTemplate.expire(key, TTL); + } +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/ProductLikeEventConsumer.java b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/ProductLikeEventConsumer.java index 282d0138e..c8bca55fa 100644 --- a/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/ProductLikeEventConsumer.java +++ b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/ProductLikeEventConsumer.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.loopers.application.idempotency.EventHandledService; import com.loopers.application.metrics.MetricsAggregationService; +import com.loopers.application.ranking.RankingAggregationService; import com.loopers.contract.event.ProductLikeEvent; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -22,6 +23,7 @@ public class ProductLikeEventConsumer { private final ObjectMapper objectMapper; private final EventHandledService eventHandledService; private final MetricsAggregationService metricsAggregationService; + private final RankingAggregationService rankingAggregationService; @KafkaListener(topics = "product-like-events", groupId = "${spring.kafka.consumer.group-id}") public void consume( @Header(KafkaHeaders.RECEIVED_KEY) String key, @@ -42,6 +44,7 @@ public void consume( @Header(KafkaHeaders.RECEIVED_KEY) String key, } metricsAggregationService.handleProductLiked(event.productId()); + rankingAggregationService.applyLike(event.productId(), event.occurredAt()); ack.acknowledge(); } diff --git a/modules/redis/src/main/java/com/loopers/ranking/RankingKey.java b/modules/redis/src/main/java/com/loopers/ranking/RankingKey.java new file mode 100644 index 000000000..363fe48dc --- /dev/null +++ b/modules/redis/src/main/java/com/loopers/ranking/RankingKey.java @@ -0,0 +1,21 @@ +package com.loopers.ranking; + +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; + +/** + * 랭킹 키를 정의합니다. + */ +public final class RankingKey { + + /** + * 일별 랭킹 레디스키를 정의합니다. + * ranking:all:{yyyyMMdd} + * @param date + * @return + */ + public static String dailyAll(LocalDate date) { + return "ranking:all:" + date.format(DateTimeFormatter.BASIC_ISO_DATE); + } + +} diff --git a/modules/redis/src/main/java/com/loopers/ranking/RankingPolicy.java b/modules/redis/src/main/java/com/loopers/ranking/RankingPolicy.java new file mode 100644 index 000000000..c3d4320b5 --- /dev/null +++ b/modules/redis/src/main/java/com/loopers/ranking/RankingPolicy.java @@ -0,0 +1,7 @@ +package com.loopers.ranking; + +/** + * 이벤트 타입으로부터 score를 계산하는 정책을 결정합니다 + */ +public class RankingPolicy { +} diff --git a/modules/redis/src/main/java/com/loopers/ranking/RankingZSetRepository.java b/modules/redis/src/main/java/com/loopers/ranking/RankingZSetRepository.java new file mode 100644 index 000000000..d3909250f --- /dev/null +++ b/modules/redis/src/main/java/com/loopers/ranking/RankingZSetRepository.java @@ -0,0 +1,7 @@ +package com.loopers.ranking; + +/** + * Redis ZSET 쓰기(zincrby + ttl + pipeline) + */ +public class RankingZSetRepository { +} From 30611963fb65598b2bb585964a8c9f23b394ad48 Mon Sep 17 00:00:00 2001 From: junoade Date: Mon, 22 Dec 2025 14:27:29 +0900 Subject: [PATCH 3/6] =?UTF-8?q?init:=20redis=EB=A5=BC=20=ED=86=B5=ED=95=9C?= =?UTF-8?q?=20ranking=20=EC=A1=B0=ED=9A=8C=20api?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit > 일단 돌아가게 구현 --- .../ranking/RankingQueryService.java | 35 +++++++++++++++++++ .../api/ranking/RankingV1Controller.java | 26 ++++++++++++++ .../loopers/ranking/DailyRankingResponse.java | 7 ++++ .../com/loopers/ranking/RankingEntry.java | 4 +++ .../ranking/RankingZSetRepository.java | 35 +++++++++++++++++++ 5 files changed, 107 insertions(+) create mode 100644 apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingQueryService.java create mode 100644 apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1Controller.java create mode 100644 modules/redis/src/main/java/com/loopers/ranking/DailyRankingResponse.java create mode 100644 modules/redis/src/main/java/com/loopers/ranking/RankingEntry.java diff --git a/apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingQueryService.java b/apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingQueryService.java new file mode 100644 index 000000000..922b13ed4 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingQueryService.java @@ -0,0 +1,35 @@ +package com.loopers.application.ranking; + +import com.loopers.ranking.DailyRankingResponse; +import com.loopers.ranking.RankingZSetRepository; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.time.LocalDate; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; + +@Slf4j +@Service +@RequiredArgsConstructor +public class RankingQueryService { + private final RankingZSetRepository rankingZSetRepository; + + public DailyRankingResponse getDailyPopularProducts(String date, int size) { + LocalDate target = (hasValidDate(date)) + ? LocalDate.now(ZoneOffset.UTC) + : LocalDate.parse(date, DateTimeFormatter.BASIC_ISO_DATE); + + int limit = (size <= 0) ? 20 : Math.min(size, 100); + + return new DailyRankingResponse( + target, + rankingZSetRepository.findTopDailyAllByLimit(target, limit) + ); + } + + private boolean hasValidDate(String date) { + return date == null || date.isBlank(); + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1Controller.java b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1Controller.java new file mode 100644 index 000000000..df7c0e6da --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1Controller.java @@ -0,0 +1,26 @@ +package com.loopers.interfaces.api.ranking; + +import com.loopers.application.ranking.RankingQueryService; +import com.loopers.ranking.DailyRankingResponse; +import lombok.RequiredArgsConstructor; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +@RequiredArgsConstructor +@RestController +@RequestMapping("/api/v1/ranking") +public class RankingV1Controller { + + private final RankingQueryService rankingQueryService; + + @GetMapping + public DailyRankingResponse getDailyRanking( + @RequestParam(required = false, name = "date") String date, + @RequestParam(defaultValue = "20", name = "size") int size + ) { + return rankingQueryService.getDailyPopularProducts(date, size); + } + +} diff --git a/modules/redis/src/main/java/com/loopers/ranking/DailyRankingResponse.java b/modules/redis/src/main/java/com/loopers/ranking/DailyRankingResponse.java new file mode 100644 index 000000000..212265202 --- /dev/null +++ b/modules/redis/src/main/java/com/loopers/ranking/DailyRankingResponse.java @@ -0,0 +1,7 @@ +package com.loopers.ranking; + +import java.time.LocalDate; +import java.util.List; + +public record DailyRankingResponse(LocalDate date, List rankingEntries) { +} diff --git a/modules/redis/src/main/java/com/loopers/ranking/RankingEntry.java b/modules/redis/src/main/java/com/loopers/ranking/RankingEntry.java new file mode 100644 index 000000000..317faa814 --- /dev/null +++ b/modules/redis/src/main/java/com/loopers/ranking/RankingEntry.java @@ -0,0 +1,4 @@ +package com.loopers.ranking; + +public record RankingEntry(long productId, double score) { +} diff --git a/modules/redis/src/main/java/com/loopers/ranking/RankingZSetRepository.java b/modules/redis/src/main/java/com/loopers/ranking/RankingZSetRepository.java index d3909250f..6011483c7 100644 --- a/modules/redis/src/main/java/com/loopers/ranking/RankingZSetRepository.java +++ b/modules/redis/src/main/java/com/loopers/ranking/RankingZSetRepository.java @@ -1,7 +1,42 @@ package com.loopers.ranking; +import lombok.RequiredArgsConstructor; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.core.ZSetOperations; +import org.springframework.stereotype.Repository; + +import java.time.LocalDate; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + /** * Redis ZSET 쓰기(zincrby + ttl + pipeline) */ +@Repository +@RequiredArgsConstructor public class RankingZSetRepository { + private final StringRedisTemplate redisTemplate; + + public List findTopDailyAllByLimit(LocalDate date, int limit) { + String key = RankingKey.dailyAll(date); + + Set> tuples = + redisTemplate.opsForZSet().reverseRangeWithScores(key, 0, limit - 1); + + List result = new ArrayList<>(); + + // TODO 랭킹정보가 없다면? + if(tuples == null) { + return result; + } + + for(ZSetOperations.TypedTuple tuple : tuples) { + if(tuple.getValue() == null) continue; + long productId = Long.parseLong(tuple.getValue()); + double score = tuple.getScore() == null ? 0d : tuple.getScore(); + result.add(new RankingEntry(productId, score)); + } + return result; + } } From fb6d5c0f664686812af00b947a19a9dedd02549c Mon Sep 17 00:00:00 2001 From: junoade Date: Tue, 23 Dec 2025 17:24:24 +0900 Subject: [PATCH 4/6] =?UTF-8?q?feat:=20=EC=9D=BC=EB=B3=84=20=EB=9E=AD?= =?UTF-8?q?=ED=82=B9=20ZSet=20=EC=A0=90=EC=88=98=20carry=20over=20?= =?UTF-8?q?=EA=B5=AC=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 오늘 ZSET 점수를 0.1 가중치로 내일 키로 복사하고 TTL을 설정 --- .../ranking/RankingAggregationService.java | 49 +++++++++++-- .../RankingAggregationServiceTest.java | 70 +++++++++++++++++++ 2 files changed, 114 insertions(+), 5 deletions(-) create mode 100644 apps/commerce-streamer/src/test/java/com/loopers/application/ranking/RankingAggregationServiceTest.java diff --git a/apps/commerce-streamer/src/main/java/com/loopers/application/ranking/RankingAggregationService.java b/apps/commerce-streamer/src/main/java/com/loopers/application/ranking/RankingAggregationService.java index 84d800882..2ec9e9744 100644 --- a/apps/commerce-streamer/src/main/java/com/loopers/application/ranking/RankingAggregationService.java +++ b/apps/commerce-streamer/src/main/java/com/loopers/application/ranking/RankingAggregationService.java @@ -2,24 +2,26 @@ import com.loopers.ranking.RankingKey; import lombok.RequiredArgsConstructor; + +import org.springframework.data.redis.connection.RedisZSetCommands; +import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; -import java.time.Duration; -import java.time.Instant; -import java.time.LocalDate; -import java.time.ZoneOffset; +import java.time.*; @Service @RequiredArgsConstructor public class RankingAggregationService { private static final double LIKE_WEIGHT = 0.2d; private static final Duration TTL = Duration.ofDays(2); + private static final double CARRY_OVER_WEIGHT = 0.1d; private final StringRedisTemplate redisTemplate; public void applyLike(long productId, Instant occurredAt) { - LocalDate day = occurredAt.atZone(ZoneOffset.UTC).toLocalDate(); + LocalDate day = occurredAt.atZone(ZoneId.systemDefault()).toLocalDate(); String key = RankingKey.dailyAll(day); String member = String.valueOf(productId); @@ -29,4 +31,41 @@ public void applyLike(long productId, Instant occurredAt) { // test 설정 redisTemplate.expire(key, TTL); } + + /** + * 일별 랭킹에 대한 전일자 Carry Over 스케줄러 + * - 매일 23:30분 수행합니다. + */ + @Scheduled(cron = "0 30 23 * * *") + public void carryOverDailyRanking() { + LocalDate today = LocalDate.now(ZoneId.systemDefault()); + LocalDate tomorrow = today.plusDays(1); + + String sourceKey = RankingKey.dailyAll(today); + String targetKey = RankingKey.dailyAll(tomorrow); + + carryOver(sourceKey, targetKey, CARRY_OVER_WEIGHT); + } + + private void carryOver(String sourceKey, String targetKey, double weight) { + redisTemplate.execute((RedisCallback) connection -> { + byte[] target = redisTemplate.getStringSerializer().serialize(targetKey); + byte[] source = redisTemplate.getStringSerializer().serialize(sourceKey); + // ZUNIONSTORE dest 1 src WEIGHTS 0.1 AGGREGATE SUM + Long result = (Long) connection.execute( + "ZUNIONSTORE", + target, + "1".getBytes(), + source, + "WEIGHTS".getBytes(), + String.valueOf(weight).getBytes(), + "AGGREGATE".getBytes(), + "SUM".getBytes() + ); + // 순서 주의할 것 + // ZUNIONSTORE target ... → 키 재생성 되므로 이전 TTL 소멸됨 + connection.expire(target, TTL.getSeconds()); + return result; + }); + } } diff --git a/apps/commerce-streamer/src/test/java/com/loopers/application/ranking/RankingAggregationServiceTest.java b/apps/commerce-streamer/src/test/java/com/loopers/application/ranking/RankingAggregationServiceTest.java new file mode 100644 index 000000000..8d6463169 --- /dev/null +++ b/apps/commerce-streamer/src/test/java/com/loopers/application/ranking/RankingAggregationServiceTest.java @@ -0,0 +1,70 @@ +package com.loopers.application.ranking; + +import com.loopers.ranking.RankingKey; +import com.loopers.testcontainers.RedisTestContainersConfig; +import com.loopers.utils.RedisCleanUp; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Import; +import org.springframework.data.redis.core.StringRedisTemplate; + +import java.time.LocalDate; +import java.time.ZoneId; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +@Import(RedisTestContainersConfig.class) +@SpringBootTest +class RankingAggregationServiceTest { + + @Autowired + RedisCleanUp redisCleanUp; + + @Autowired + private StringRedisTemplate redisTemplate; + + @Autowired + private RankingAggregationService rankingAggregationService; + + @BeforeEach + void setUp() { + redisCleanUp.truncateAll(); + } + + @Test + @DisplayName("일별 랭킹 carry over 테스트 - 오늘 ZSET 점수를 0.1 가중치로 내일 키로 복사하고 TTL을 설정한다") + void carryOver_should_copy_weighted_scores_to_tomorrow_key() { + // given + LocalDate today = LocalDate.now(ZoneId.systemDefault()); + LocalDate tomorrow = today.plusDays(1); + + String sourceKey = RankingKey.dailyAll(today); + String targetKey = RankingKey.dailyAll(tomorrow); + + // source ZSET 준비 (productId=11: 6.0, productId=22: 1.5) + redisTemplate.opsForZSet().add(sourceKey, "11", 6.0); + redisTemplate.opsForZSet().add(sourceKey, "22", 1.5); + + // when + rankingAggregationService.carryOverDailyRanking(); + + // then (0.1 가중치 적용) + Double s11 = redisTemplate.opsForZSet().score(targetKey, "11"); + Double s22 = redisTemplate.opsForZSet().score(targetKey, "22"); + + assertThat(s11).isNotNull(); + assertThat(s22).isNotNull(); + + assertThat(s11).isCloseTo(0.6, Assertions.offset(1e-9)); + assertThat(s22).isCloseTo(0.15, Assertions.offset(1e-9)); + + // TTL 확인 (expire를 먼저 걸고 unionstore를 하므로 TTL이 존재해야 함) + Long ttl = redisTemplate.getExpire(targetKey); // seconds + assertThat(ttl).isNotNull(); + assertThat(ttl).isGreaterThan(0); + } +} From ed75d4e3c7096fd35b2d13d114735ade11fd28ff Mon Sep 17 00:00:00 2001 From: junoade Date: Tue, 23 Dec 2025 17:33:28 +0900 Subject: [PATCH 5/6] =?UTF-8?q?feat:=20=EC=9D=B4=EB=B2=A4=ED=8A=B8=20?= =?UTF-8?q?=EC=BB=A8=EC=8A=88=EB=B0=8D=EC=8B=9C=20TTL=20=EC=84=A4=EC=A0=95?= =?UTF-8?q?=20=EA=B0=9C=EC=84=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Redis에서 EXPIRE는 쓰기 연산이다. - 네트워크 RTT - 매번 ZINCRBY + EXPIRE = 왕복 2번 - Kafka 이벤트당 1~2 RTT → TPS 높아질수록 체감 --- .../application/ranking/RankingAggregationService.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/apps/commerce-streamer/src/main/java/com/loopers/application/ranking/RankingAggregationService.java b/apps/commerce-streamer/src/main/java/com/loopers/application/ranking/RankingAggregationService.java index 2ec9e9744..7bf7b4709 100644 --- a/apps/commerce-streamer/src/main/java/com/loopers/application/ranking/RankingAggregationService.java +++ b/apps/commerce-streamer/src/main/java/com/loopers/application/ranking/RankingAggregationService.java @@ -28,8 +28,7 @@ public void applyLike(long productId, Instant occurredAt) { // Score = Weight * 1 redisTemplate.opsForZSet().incrementScore(key, member, LIKE_WEIGHT); - // test 설정 - redisTemplate.expire(key, TTL); + setTTLOnlyOnce(key); } /** @@ -68,4 +67,11 @@ private void carryOver(String sourceKey, String targetKey, double weight) { return result; }); } + + private void setTTLOnlyOnce(String key) { + Boolean exists = redisTemplate.hasKey(key); + if (Boolean.FALSE.equals(exists)) { + redisTemplate.expire(key, TTL); + } + } } From 9f4f62a6d498fe5b328bda4609f698424e957435 Mon Sep 17 00:00:00 2001 From: junoade Date: Fri, 26 Dec 2025 12:57:52 +0900 Subject: [PATCH 6/6] =?UTF-8?q?feat:=20=EB=9E=AD=ED=82=B9=20=EC=8A=A4?= =?UTF-8?q?=EC=BD=94=EC=96=B4=EB=B3=84=20=EC=83=81=ED=92=88=EC=83=81?= =?UTF-8?q?=EC=84=B8=EC=A0=95=EB=B3=B4=20=EC=A1=B0=ED=9A=8C=20=EB=B0=8F=20?= =?UTF-8?q?=EC=83=81=ED=92=88id=EC=97=90=20=EB=8C=80=ED=95=9C=20=EA=B8=88?= =?UTF-8?q?=EC=9D=BC=20=EC=9D=BC=EB=B3=84=20=EB=9E=AD=ED=82=B9=EC=A0=90?= =?UTF-8?q?=EC=88=98=20=EC=A1=B0=ED=9A=8C=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ranking/RankingQueryResponse.java | 17 ++++++ .../ranking/RankingQueryService.java | 59 +++++++++++++++++-- .../api/product/ProductV1Controller.java | 9 ++- .../interfaces/api/product/ProductV1Dto.java | 11 +++- .../api/ranking/RankingV1Controller.java | 4 +- .../ranking/RankingZSetRepository.java | 18 ++++++ 6 files changed, 106 insertions(+), 12 deletions(-) create mode 100644 apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingQueryResponse.java diff --git a/apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingQueryResponse.java b/apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingQueryResponse.java new file mode 100644 index 000000000..204ad90a1 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingQueryResponse.java @@ -0,0 +1,17 @@ +package com.loopers.application.ranking; + +import com.loopers.application.product.ProductLikeSummary; +import com.loopers.ranking.RankingEntry; + +import java.time.LocalDate; +import java.util.List; + +public record RankingQueryResponse( + LocalDate date, + List rankingEntries, + List productLikeSummary +) { + public static RankingQueryResponse of(LocalDate date, List rankingEntries, List productLikeSummary) { + return new RankingQueryResponse(date, rankingEntries, productLikeSummary); + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingQueryService.java b/apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingQueryService.java index 922b13ed4..f881e18ef 100644 --- a/apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingQueryService.java +++ b/apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingQueryService.java @@ -1,35 +1,82 @@ package com.loopers.application.ranking; +import com.loopers.application.like.event.ProductLikeEvent; +import com.loopers.application.product.ProductLikeSummary; +import com.loopers.application.product.ProductQueryService; +import com.loopers.domain.product.ProductSortType; import com.loopers.ranking.DailyRankingResponse; +import com.loopers.ranking.RankingEntry; import com.loopers.ranking.RankingZSetRepository; +import com.loopers.support.error.CoreException; +import com.loopers.support.error.ErrorType; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import java.time.LocalDate; +import java.time.ZoneId; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import java.util.OptionalDouble; @Slf4j @Service @RequiredArgsConstructor public class RankingQueryService { private final RankingZSetRepository rankingZSetRepository; + private final ProductQueryService productQueryService; - public DailyRankingResponse getDailyPopularProducts(String date, int size) { - LocalDate target = (hasValidDate(date)) - ? LocalDate.now(ZoneOffset.UTC) - : LocalDate.parse(date, DateTimeFormatter.BASIC_ISO_DATE); + @Transactional(readOnly = true) + public RankingQueryResponse getDailyPopularProducts(String date, int size) { + LocalDate target = initLocalDate(date); int limit = (size <= 0) ? 20 : Math.min(size, 100); - return new DailyRankingResponse( + List rankingEntries = rankingZSetRepository.findTopDailyAllByLimit(target, limit); + List productLikeSummaries = findProductSummaryFrom(rankingEntries); + + return new RankingQueryResponse( target, - rankingZSetRepository.findTopDailyAllByLimit(target, limit) + rankingEntries, + productLikeSummaries ); } + @Transactional(readOnly = true) + public OptionalDouble getDailyRankingScore(Long productId) { + LocalDate now = LocalDate.now(ZoneId.systemDefault()); + return rankingZSetRepository.findDailyRanking(now, productId); + } + private boolean hasValidDate(String date) { return date == null || date.isBlank(); } + + private LocalDate initLocalDate(String date) { + return (hasValidDate(date)) + ? LocalDate.now(ZoneId.systemDefault()) + : LocalDate.parse(date, DateTimeFormatter.BASIC_ISO_DATE); + + } + + private List findProductSummaryFrom(List rankingEntries) { + List result = new ArrayList<>(); + + for(RankingEntry rankingEntry : rankingEntries) { + ProductLikeSummary summary; + try { + summary = productQueryService.getProductLikeSummary(rankingEntry.productId()); + } catch (CoreException e) { + if(e.getErrorType() == ErrorType.NOT_FOUND) { + log.error("Could not find product like summary for {}", rankingEntry.productId()); + } + summary = null; + } + result.add(summary); + } + return result; + } } diff --git a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/product/ProductV1Controller.java b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/product/ProductV1Controller.java index d1a77b0c2..c766297f2 100644 --- a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/product/ProductV1Controller.java +++ b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/product/ProductV1Controller.java @@ -2,7 +2,9 @@ import com.loopers.application.product.ProductLikeSummary; import com.loopers.application.product.ProductQueryService; +import com.loopers.application.ranking.RankingQueryService; import com.loopers.domain.product.ProductSortType; +import com.loopers.ranking.RankingEntry; import com.loopers.support.tracking.general.UserActionType; import com.loopers.interfaces.api.ApiResponse; import com.loopers.support.tracking.annotation.TrackUserAction; @@ -12,11 +14,14 @@ import org.springframework.data.web.PageableDefault; import org.springframework.web.bind.annotation.*; +import java.util.OptionalDouble; + @RequiredArgsConstructor @RestController @RequestMapping("/api/v1/products") public class ProductV1Controller implements ProductV1ApiSpec{ private final ProductQueryService productQueryService; + private final RankingQueryService rankingQueryService; @Override @GetMapping @@ -41,7 +46,7 @@ public ApiResponse> getProd ) public ApiResponse> getProductDetail(@PathVariable("productId") Long productId) { ProductLikeSummary productLikeSummary = productQueryService.getProductLikeSummary(productId); - - return ApiResponse.success(ProductV1Dto.ProductDetailResponse.of(productLikeSummary)); + OptionalDouble rankingEntry = rankingQueryService.getDailyRankingScore(productId); + return ApiResponse.success(ProductV1Dto.ProductDetailResponse.of(productLikeSummary, rankingEntry)); } } diff --git a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/product/ProductV1Dto.java b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/product/ProductV1Dto.java index 9bc1a0ffd..bd3c4ed33 100644 --- a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/product/ProductV1Dto.java +++ b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/product/ProductV1Dto.java @@ -1,8 +1,10 @@ package com.loopers.interfaces.api.product; +import com.loopers.ranking.RankingEntry; import org.springframework.data.domain.Page; import java.util.List; +import java.util.OptionalDouble; public class ProductV1Dto { @@ -28,10 +30,15 @@ public static ProductListResponse of(Page page, List content) { } public record ProductDetailResponse( - T content + T content, + OptionalDouble rankingScore ){ static ProductDetailResponse of(T content) { - return new ProductDetailResponse<>(content); + return new ProductDetailResponse<>(content, null); + } + + static ProductDetailResponse of(T content, OptionalDouble rankingScore) { + return new ProductDetailResponse<>(content, rankingScore); } } } diff --git a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1Controller.java b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1Controller.java index df7c0e6da..abaf729d5 100644 --- a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1Controller.java +++ b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1Controller.java @@ -1,7 +1,7 @@ package com.loopers.interfaces.api.ranking; +import com.loopers.application.ranking.RankingQueryResponse; import com.loopers.application.ranking.RankingQueryService; -import com.loopers.ranking.DailyRankingResponse; import lombok.RequiredArgsConstructor; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; @@ -16,7 +16,7 @@ public class RankingV1Controller { private final RankingQueryService rankingQueryService; @GetMapping - public DailyRankingResponse getDailyRanking( + public RankingQueryResponse getDailyRanking( @RequestParam(required = false, name = "date") String date, @RequestParam(defaultValue = "20", name = "size") int size ) { diff --git a/modules/redis/src/main/java/com/loopers/ranking/RankingZSetRepository.java b/modules/redis/src/main/java/com/loopers/ranking/RankingZSetRepository.java index 6011483c7..a55b15d69 100644 --- a/modules/redis/src/main/java/com/loopers/ranking/RankingZSetRepository.java +++ b/modules/redis/src/main/java/com/loopers/ranking/RankingZSetRepository.java @@ -8,6 +8,7 @@ import java.time.LocalDate; import java.util.ArrayList; import java.util.List; +import java.util.OptionalDouble; import java.util.Set; /** @@ -39,4 +40,21 @@ public List findTopDailyAllByLimit(LocalDate date, int limit) { } return result; } + + /** + * 해당 상품의 순위가 함께 반환 합니다. + * - 순위에 없다면 null + * @param date + * @param productId + * @return + */ + public OptionalDouble findDailyRanking(LocalDate date, Long productId) { + String key = RankingKey.dailyAll(date); + String member = String.valueOf(productId); + Double score = redisTemplate.opsForZSet().score(key, member); + return score == null + ? OptionalDouble.empty() + : OptionalDouble.of(score); + } + }