-
Notifications
You must be signed in to change notification settings - Fork 44
[volume-7] Kafka 이벤트 파이프라인 구축 #281
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dame2
Are you sure you want to change the base?
Changes from all commits
04b5ea0
5bd1a20
a8314a7
09c6b1b
424d5fd
8d6983c
27d67d5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| package com.loopers.application.event; | ||
|
|
||
| import java.time.ZonedDateTime; | ||
|
|
||
| /** | ||
| * 쿠폰 발급 요청 생성 이벤트. | ||
| * 발급 요청이 생성되었을 때 발행됨. | ||
| */ | ||
| public record CouponIssueRequestCreatedEvent( | ||
| String eventId, | ||
| String requestId, | ||
| Long couponId, | ||
| Long userId, | ||
| ZonedDateTime occurredAt | ||
| ) { | ||
| public static CouponIssueRequestCreatedEvent of( | ||
| String eventId, | ||
| String requestId, | ||
| Long couponId, | ||
| Long userId | ||
| ) { | ||
| return new CouponIssueRequestCreatedEvent(eventId, requestId, couponId, userId, ZonedDateTime.now()); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| package com.loopers.application.event; | ||
|
|
||
| import java.time.ZonedDateTime; | ||
|
|
||
| /** | ||
| * 좋아요 취소 이벤트. | ||
| * 사용자가 상품 좋아요를 취소했을 때 발행됨. | ||
| */ | ||
| public record LikeCanceledEvent( | ||
| Long likeId, | ||
| Long userId, | ||
| Long productId, | ||
| ZonedDateTime occurredAt | ||
| ) { | ||
| public static LikeCanceledEvent of(Long likeId, Long userId, Long productId) { | ||
| return new LikeCanceledEvent(likeId, userId, productId, ZonedDateTime.now()); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| package com.loopers.application.event; | ||
|
|
||
| import java.time.ZonedDateTime; | ||
|
|
||
| /** | ||
| * 좋아요 생성 이벤트. | ||
| * 사용자가 상품에 좋아요를 등록했을 때 발행됨. | ||
| */ | ||
| public record LikeCreatedEvent( | ||
| Long likeId, | ||
| Long userId, | ||
| Long productId, | ||
| ZonedDateTime occurredAt | ||
| ) { | ||
| public static LikeCreatedEvent of(Long likeId, Long userId, Long productId) { | ||
| return new LikeCreatedEvent(likeId, userId, productId, ZonedDateTime.now()); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| package com.loopers.application.event; | ||
|
|
||
| import java.time.ZonedDateTime; | ||
|
|
||
| /** | ||
| * 주문 완료 이벤트. | ||
| * 주문이 성공적으로 생성되었을 때 발행됨. | ||
| */ | ||
| public record OrderCompletedEvent( | ||
| Long orderId, | ||
| Long userId, | ||
| Long productId, | ||
| Integer quantity, | ||
| Long totalAmount, | ||
| ZonedDateTime occurredAt | ||
| ) { | ||
| public static OrderCompletedEvent of(Long orderId, Long userId, Long productId, Integer quantity, Long totalAmount) { | ||
| return new OrderCompletedEvent(orderId, userId, productId, quantity, totalAmount, ZonedDateTime.now()); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,17 @@ | ||
| package com.loopers.application.event; | ||
|
|
||
| import java.time.ZonedDateTime; | ||
|
|
||
| /** | ||
| * 상품 조회 이벤트. | ||
| * 사용자가 상품 상세 페이지를 조회했을 때 발행됨. | ||
| */ | ||
| public record ProductViewedEvent( | ||
| Long userId, | ||
| Long productId, | ||
| ZonedDateTime occurredAt | ||
| ) { | ||
| public static ProductViewedEvent of(Long userId, Long productId) { | ||
| return new ProductViewedEvent(userId, productId, ZonedDateTime.now()); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| package com.loopers.application.event; | ||
|
|
||
| import com.loopers.domain.useraction.ActionType; | ||
|
|
||
| import java.time.ZonedDateTime; | ||
|
|
||
| /** | ||
| * 유저 행동 이벤트. | ||
| * 사용자의 행동(조회, 클릭, 좋아요, 주문 등)을 기록하기 위해 발행됨. | ||
| */ | ||
| public record UserActionEvent( | ||
| Long userId, | ||
| ActionType actionType, | ||
| Long targetId, | ||
| ZonedDateTime occurredAt | ||
| ) { | ||
| public static UserActionEvent of(Long userId, ActionType actionType, Long targetId) { | ||
| return new UserActionEvent(userId, actionType, targetId, ZonedDateTime.now()); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| package com.loopers.application.event.listener; | ||
|
|
||
| import com.loopers.application.event.LikeCanceledEvent; | ||
| import com.loopers.application.event.LikeCreatedEvent; | ||
| import com.loopers.infrastructure.cache.LikeCountCacheService; | ||
| import lombok.RequiredArgsConstructor; | ||
| import lombok.extern.slf4j.Slf4j; | ||
| import org.springframework.stereotype.Component; | ||
| import org.springframework.transaction.event.TransactionPhase; | ||
| import org.springframework.transaction.event.TransactionalEventListener; | ||
|
|
||
| /** | ||
| * 좋아요 카운트 이벤트 리스너. | ||
| * 좋아요 생성/취소 이벤트 발생 시 Redis 카운터 갱신. | ||
| * Redis 작업만 수행하므로 별도 트랜잭션이 필요 없음. | ||
| */ | ||
| @Slf4j | ||
| @Component | ||
| @RequiredArgsConstructor | ||
| public class LikeCountEventListener { | ||
|
|
||
| private final LikeCountCacheService likeCountCacheService; | ||
|
|
||
| /** | ||
| * 좋아요 생성 시 카운터 증가. | ||
| */ | ||
| @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) | ||
| public void handleLikeCreated(LikeCreatedEvent event) { | ||
| try { | ||
| Long count = likeCountCacheService.increment(event.productId()); | ||
| log.debug("좋아요 카운터 증가: productId={}, count={}", event.productId(), count); | ||
| } catch (Exception e) { | ||
| log.warn("좋아요 카운터 증가 실패 (무시): productId={}", event.productId(), e); | ||
| } | ||
|
Comment on lines
+32
to
+34
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 예외를 무시만 하면 캐시-원본 불일치가 장기화될 수 있다 운영 관점에서 현재처럼 예외를 경고 로그만 남기고 종료하면 Redis 일시 장애 시 좋아요 카운트가 지속적으로 틀어질 수 있고, 자동 복구 경로가 없어 수동 정합화 비용이 커진다. 수정안은 실패 이벤트를 재시도 큐(또는 outbox 재발행)로 보내거나, 주기적 리컨실리에이션 작업을 추가해 최종 정합성을 보장하는 방식이다. 추가 테스트는 Redis 예외를 강제로 발생시킨 뒤 재시도/리컨실리에이션으로 카운트가 회복되는지 검증하면 된다. Also applies to: 45-47 🤖 Prompt for AI Agents |
||
| } | ||
|
|
||
| /** | ||
| * 좋아요 취소 시 카운터 감소. | ||
| */ | ||
| @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) | ||
| public void handleLikeCanceled(LikeCanceledEvent event) { | ||
| try { | ||
| Long count = likeCountCacheService.decrement(event.productId()); | ||
| log.debug("좋아요 카운터 감소: productId={}, count={}", event.productId(), count); | ||
| } catch (Exception e) { | ||
| log.warn("좋아요 카운터 감소 실패 (무시): productId={}", event.productId(), e); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| package com.loopers.application.event.listener; | ||
|
|
||
| import com.loopers.application.event.OrderCompletedEvent; | ||
| import lombok.extern.slf4j.Slf4j; | ||
| import org.springframework.stereotype.Component; | ||
| import org.springframework.transaction.event.TransactionPhase; | ||
| import org.springframework.transaction.event.TransactionalEventListener; | ||
|
|
||
| /** | ||
| * 주문 알림 이벤트 리스너. | ||
| * 주문 완료 시 알림 발송 (현재는 로그로 대체). | ||
| */ | ||
| @Slf4j | ||
| @Component | ||
| public class OrderNotificationListener { | ||
|
|
||
| /** | ||
| * 주문 완료 시 알림 발송. | ||
| */ | ||
| @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) | ||
| public void handleOrderCompleted(OrderCompletedEvent event) { | ||
| log.info("주문 완료 알림: orderId={}, userId={}, totalAmount={}", | ||
| event.orderId(), event.userId(), event.totalAmount()); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,111 @@ | ||
| package com.loopers.application.event.listener; | ||
|
|
||
| import com.fasterxml.jackson.core.JsonProcessingException; | ||
| import com.fasterxml.jackson.databind.ObjectMapper; | ||
| import com.loopers.application.event.CouponIssueRequestCreatedEvent; | ||
| import com.loopers.application.event.LikeCanceledEvent; | ||
| import com.loopers.application.event.LikeCreatedEvent; | ||
| import com.loopers.application.event.OrderCompletedEvent; | ||
| import com.loopers.application.event.ProductViewedEvent; | ||
| import com.loopers.domain.outbox.OutboxEvent; | ||
| import com.loopers.domain.outbox.OutboxEventRepository; | ||
| import com.loopers.event.AggregateType; | ||
| import com.loopers.event.EventType; | ||
| import com.loopers.event.KafkaTopics; | ||
| import lombok.RequiredArgsConstructor; | ||
| import lombok.extern.slf4j.Slf4j; | ||
| import org.springframework.stereotype.Component; | ||
| import org.springframework.transaction.event.TransactionPhase; | ||
| import org.springframework.transaction.event.TransactionalEventListener; | ||
|
|
||
| /** | ||
| * Transactional Outbox Pattern을 위한 이벤트 리스너. | ||
| * 트랜잭션 커밋 전에 Outbox 테이블에 이벤트를 저장. | ||
| */ | ||
| @Slf4j | ||
| @Component | ||
| @RequiredArgsConstructor | ||
| public class OutboxEventListener { | ||
|
|
||
| private final OutboxEventRepository outboxEventRepository; | ||
| private final ObjectMapper objectMapper; | ||
|
|
||
| @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT) | ||
| public void handleLikeCreatedEvent(LikeCreatedEvent event) { | ||
| saveOutboxEvent( | ||
| EventType.LIKE_CREATED, | ||
| AggregateType.PRODUCT, | ||
| String.valueOf(event.productId()), | ||
| KafkaTopics.CATALOG_EVENTS, | ||
| event | ||
| ); | ||
| } | ||
|
|
||
| @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT) | ||
| public void handleLikeCanceledEvent(LikeCanceledEvent event) { | ||
| saveOutboxEvent( | ||
| EventType.LIKE_CANCELED, | ||
| AggregateType.PRODUCT, | ||
| String.valueOf(event.productId()), | ||
| KafkaTopics.CATALOG_EVENTS, | ||
| event | ||
| ); | ||
| } | ||
|
|
||
| @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT) | ||
| public void handleOrderCompletedEvent(OrderCompletedEvent event) { | ||
| saveOutboxEvent( | ||
| EventType.ORDER_COMPLETED, | ||
| AggregateType.ORDER, | ||
| String.valueOf(event.orderId()), | ||
| KafkaTopics.ORDER_EVENTS, | ||
| event | ||
| ); | ||
| } | ||
|
|
||
| @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT) | ||
| public void handleProductViewedEvent(ProductViewedEvent event) { | ||
| saveOutboxEvent( | ||
| EventType.PRODUCT_VIEWED, | ||
| AggregateType.PRODUCT, | ||
| String.valueOf(event.productId()), | ||
| KafkaTopics.CATALOG_EVENTS, | ||
| event | ||
| ); | ||
| } | ||
|
|
||
| @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT) | ||
| public void handleCouponIssueRequestCreatedEvent(CouponIssueRequestCreatedEvent event) { | ||
| saveOutboxEvent( | ||
| EventType.COUPON_ISSUE_REQUESTED, | ||
| AggregateType.COUPON, | ||
| String.valueOf(event.couponId()), | ||
| KafkaTopics.COUPON_ISSUE_REQUESTS, | ||
| event | ||
| ); | ||
| } | ||
|
|
||
| private void saveOutboxEvent( | ||
| EventType eventType, | ||
| AggregateType aggregateType, | ||
| String aggregateId, | ||
| String topic, | ||
| Object event | ||
| ) { | ||
| try { | ||
| String payload = objectMapper.writeValueAsString(event); | ||
| OutboxEvent outboxEvent = OutboxEvent.create( | ||
| eventType.name(), | ||
| aggregateType.name(), | ||
| aggregateId, | ||
| topic, | ||
| payload | ||
| ); | ||
| outboxEventRepository.save(outboxEvent); | ||
| log.debug("Outbox event saved: type={}, aggregateId={}", eventType, aggregateId); | ||
| } catch (JsonProcessingException e) { | ||
| log.error("Failed to serialize event: {}", event, e); | ||
| throw new RuntimeException("Failed to serialize event for outbox", e); | ||
|
Comment on lines
+95
to
+108
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 직렬화 실패를 일반 RuntimeException과 전체 이벤트 로깅으로 처리하면 응답과 로그가 불안정하다 Line 95-108은 요청 트랜잭션 안에서 실행되므로, 여기서 🤖 Prompt for AI Agents |
||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,50 @@ | ||
| package com.loopers.application.event.listener; | ||
|
|
||
| import com.loopers.application.event.UserActionEvent; | ||
| import com.loopers.domain.useraction.UserActionLog; | ||
| import com.loopers.domain.useraction.UserActionLogRepository; | ||
| import lombok.RequiredArgsConstructor; | ||
| import lombok.extern.slf4j.Slf4j; | ||
| import org.springframework.scheduling.annotation.Async; | ||
| import org.springframework.stereotype.Component; | ||
| import org.springframework.transaction.annotation.Propagation; | ||
| import org.springframework.transaction.annotation.Transactional; | ||
| import org.springframework.transaction.event.TransactionPhase; | ||
| import org.springframework.transaction.event.TransactionalEventListener; | ||
|
|
||
| /** | ||
| * 사용자 행동 로그 이벤트 리스너. | ||
| * 사용자 행동 이벤트 발생 시 비동기로 로그 저장. | ||
| * @Async로 비동기 실행되므로 메인 트랜잭션과 커넥션 경쟁 없음. | ||
| */ | ||
| @Slf4j | ||
| @Component | ||
| @RequiredArgsConstructor | ||
| public class UserActionLogListener { | ||
|
|
||
| private final UserActionLogRepository userActionLogRepository; | ||
|
|
||
| /** | ||
| * 사용자 행동 로그 저장. | ||
| * @TransactionalEventListener와 함께 사용 시 REQUIRES_NEW 필수. | ||
| */ | ||
| @Async("eventExecutor") | ||
| @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) | ||
| @Transactional(propagation = Propagation.REQUIRES_NEW) | ||
| public void handleUserAction(UserActionEvent event) { | ||
| try { | ||
| UserActionLog actionLog = UserActionLog.create( | ||
| event.userId(), | ||
| event.actionType(), | ||
| event.targetId(), | ||
| event.occurredAt() | ||
| ); | ||
| userActionLogRepository.save(actionLog); | ||
| log.debug("사용자 행동 로그 저장: userId={}, actionType={}, targetId={}", | ||
| event.userId(), event.actionType(), event.targetId()); | ||
| } catch (Exception e) { | ||
| log.warn("사용자 행동 로그 저장 실패 (무시): userId={}, actionType={}", | ||
| event.userId(), event.actionType(), e); | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java
Length of output: 8871
리스너에
@Async를명시하지 않아 Redis I/O가 동기 실행되는 문제애플리케이션에는
@EnableAsync와eventExecutor빈이 설정되어 있지만,LikeCountEventListener의 핸들러가@Async어노테이션을 누락했다. 따라서@TransactionalEventListener(AFTER_COMMIT)만으로는 비동기 실행이 보장되지 않으며, Redis I/O가 동기로 실행되어 트랜잭션 커밋 직후에도 API 응답 지연과 스레드 점유가 증가한다. 같은 이벤트 리스너 구조를 사용하는UserActionLogListener는@Async("eventExecutor")를 명시하고 있으므로 이를 따르도록 수정해야 한다.수정안: 두 핸들러 메서드(
handleLikeCreated,handleLikeCanceled)에@Async("eventExecutor")어노테이션을 추가한다. 추가 테스트는 Redis 응답 지연을 주입했을 때 API 응답 시간이 리스너 처리 시간에 묶이지 않는지 검증하면 된다.🤖 Prompt for AI Agents