-
Notifications
You must be signed in to change notification settings - Fork 44
[volume-7] 이벤트 기반 아키텍처 및 Kafka 파이프라인 구현 #280
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
1f4a0c2
3c9a194
9bf13df
bc8c08c
ba51f0c
eee6bfa
d2beb8f
f2081b8
f190694
f53fa47
a29ec87
1ab241c
91f4347
b1e0f27
e28a027
1bc9b92
8b77efd
6da80b6
bb28fd0
799e35b
5546dbe
aaff975
e47c35e
7cfabf9
14dcaab
7059b6f
3b954ad
7b07682
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| package com.loopers.application; | ||
|
|
||
| import com.fasterxml.jackson.core.JsonProcessingException; | ||
| import com.fasterxml.jackson.databind.ObjectMapper; | ||
|
|
||
| public class OutboxEventHelper { | ||
|
|
||
| private OutboxEventHelper() { | ||
| } | ||
|
|
||
| public static String toJson(ObjectMapper objectMapper, Object value) { | ||
| try { | ||
| return objectMapper.writeValueAsString(value); | ||
| } catch (JsonProcessingException e) { | ||
| throw new RuntimeException("Outbox payload 직렬화 실패", e); | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| package com.loopers.application.coupon; | ||
|
|
||
| import com.loopers.domain.coupon.CouponIssueRequest; | ||
| import com.loopers.domain.coupon.CouponIssueRequestStatus; | ||
|
|
||
| public record CouponIssueRequestInfo( | ||
| Long requestId, | ||
| Long templateId, | ||
| CouponIssueRequestStatus status, | ||
| String failReason | ||
| ) { | ||
| public static CouponIssueRequestInfo from(CouponIssueRequest request) { | ||
| return new CouponIssueRequestInfo( | ||
| request.getId(), | ||
| request.getTemplateId(), | ||
| request.getStatus(), | ||
| request.getFailReason() | ||
| ); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,6 +8,7 @@ public record CreateCouponTemplateCommand( | |
| DiscountType discountType, | ||
| Integer discountValue, | ||
| Integer minOrderAmount, | ||
| ZonedDateTime expiredAt | ||
| ZonedDateTime expiredAt, | ||
| Integer maxIssuanceCount | ||
| ) { | ||
|
Comment on lines
+11
to
13
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
# Description: CouponTemplate 도메인에서 maxIssuanceCount 검증 로직 확인
ast-grep --pattern $'public static CouponTemplate create($$$) {
$$$
}'
rg -n "maxIssuanceCount" --type=java -g '*CouponTemplate.java' -C5Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java Length of output: 7614
도메인 클래스
이 검증을 🤖 Prompt for AI Agents |
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| package com.loopers.application.like; | ||
|
|
||
| import com.loopers.domain.like.LikedEvent; | ||
| import com.loopers.domain.like.UnlikedEvent; | ||
| import com.loopers.domain.product.ProductService; | ||
| import lombok.RequiredArgsConstructor; | ||
| import lombok.extern.slf4j.Slf4j; | ||
| import org.springframework.stereotype.Component; | ||
| import org.springframework.transaction.annotation.Propagation; | ||
| import org.springframework.transaction.annotation.Transactional; | ||
| import org.springframework.transaction.event.TransactionPhase; | ||
| import org.springframework.transaction.event.TransactionalEventListener; | ||
|
|
||
| @Slf4j | ||
| @Component | ||
| @RequiredArgsConstructor | ||
| public class LikeEventListener { | ||
|
|
||
| private final ProductService productService; | ||
|
|
||
| @Transactional(propagation = Propagation.REQUIRES_NEW) | ||
| @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) | ||
| public void handleLiked(LikedEvent event) { | ||
| log.info("좋아요 이벤트 수신. productId={}", event.getProductId()); | ||
| productService.increaseLikeCount(event.getProductId()); | ||
| } | ||
|
|
||
| @Transactional(propagation = Propagation.REQUIRES_NEW) | ||
| @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) | ||
| public void handleUnliked(UnlikedEvent event) { | ||
| log.info("좋아요 취소 이벤트 수신. productId={}", event.getProductId()); | ||
| productService.decreaseLikeCount(event.getProductId()); | ||
| } | ||
|
Comment on lines
+21
to
+33
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 리스너 실패 시 복구 메커니즘이 없다.
운영 관점에서 고려할 사항:
해결 방안:
🛡️ 에러 핸들링 추가 예시+import lombok.extern.slf4j.Slf4j;
+
`@Slf4j`
`@Component`
`@RequiredArgsConstructor`
public class LikeEventListener {
private final ProductService productService;
`@Transactional`(propagation = Propagation.REQUIRES_NEW)
`@TransactionalEventListener`(phase = TransactionPhase.AFTER_COMMIT)
public void handleLiked(LikedEvent event) {
- log.info("좋아요 이벤트 수신. productId={}", event.getProductId());
- productService.increaseLikeCount(event.getProductId());
+ try {
+ log.info("좋아요 이벤트 수신. productId={}", event.getProductId());
+ productService.increaseLikeCount(event.getProductId());
+ } catch (Exception e) {
+ log.error("좋아요 카운트 증가 실패. productId={}", event.getProductId(), e);
+ // TODO: 재시도 큐 저장 또는 알림 발송
+ }
}🤖 Prompt for AI Agents |
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,8 +1,17 @@ | ||
| package com.loopers.application.like; | ||
|
|
||
| import com.fasterxml.jackson.databind.ObjectMapper; | ||
| import com.loopers.application.OutboxEventHelper; | ||
| import com.loopers.domain.like.LikeAction; | ||
| import com.loopers.domain.like.LikedEvent; | ||
| import com.loopers.domain.like.LikeService; | ||
| import com.loopers.domain.product.ProductService; | ||
| import com.loopers.domain.like.UnlikedEvent; | ||
| import com.loopers.domain.outbox.OutboxEvent; | ||
| import com.loopers.domain.outbox.OutboxEventRepository; | ||
| import java.time.ZonedDateTime; | ||
| import java.util.Map; | ||
| import lombok.RequiredArgsConstructor; | ||
| import org.springframework.context.ApplicationEventPublisher; | ||
| import org.springframework.stereotype.Service; | ||
| import org.springframework.transaction.annotation.Transactional; | ||
|
|
||
|
|
@@ -12,15 +21,30 @@ | |
| public class LikeFacade { | ||
|
|
||
| private final LikeService likeService; | ||
| private final ProductService productService; | ||
| private final ApplicationEventPublisher eventPublisher; | ||
| private final OutboxEventRepository outboxEventRepository; | ||
| private final ObjectMapper objectMapper; | ||
|
|
||
| public void toggleLike(Long productId, Long userId) { | ||
| public LikeAction toggleLike(Long productId, Long userId) { | ||
| if (likeService.isLiked(productId, userId)) { | ||
| likeService.unlike(productId, userId); | ||
| productService.decreaseLikeCount(productId); | ||
| eventPublisher.publishEvent(new UnlikedEvent(productId)); | ||
| outboxEventRepository.save(OutboxEvent.create( | ||
| "catalog-events", | ||
| OutboxEventHelper.toJson(objectMapper, Map.of("type", "UNLIKED", "productId", productId, "occurredAt", ZonedDateTime.now().toString())), | ||
| String.valueOf(productId) | ||
| )); | ||
| return LikeAction.UNLIKED; | ||
| } else { | ||
| likeService.like(productId, userId); | ||
| productService.increaseLikeCount(productId); | ||
| eventPublisher.publishEvent(new LikedEvent(productId)); | ||
| outboxEventRepository.save(OutboxEvent.create( | ||
| "catalog-events", | ||
| OutboxEventHelper.toJson(objectMapper, Map.of("type", "LIKED", "productId", productId, "occurredAt", ZonedDateTime.now().toString())), | ||
| String.valueOf(productId) | ||
| )); | ||
| return LikeAction.LIKED; | ||
| } | ||
| } | ||
|
Comment on lines
+28
to
48
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 좋아요 카운트 업데이트의 이중 경로가 데이터 불일치를 유발할 수 있다. 현재 구조에서 좋아요 카운트 업데이트가 두 경로로 발생한다:
이후 Kafka consumer가 같은 이벤트를 처리하면 카운트가 한 번만 증가/감소해야 하는데, 리스너 재시도 시 중복 처리될 수 있다. in-process 리스너와 Kafka consumer 중 하나의 경로만 사용하거나, 멱등성 보장 로직이 필요하다. |
||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| package com.loopers.application.logging; | ||
|
|
||
| import com.loopers.domain.like.ProductLikedEvent; | ||
| import com.loopers.domain.like.ProductUnlikedEvent; | ||
| import com.loopers.domain.order.OrderRequestedEvent; | ||
| import com.loopers.domain.product.ProductViewedEvent; | ||
| import lombok.extern.slf4j.Slf4j; | ||
| import org.springframework.context.event.EventListener; | ||
| import org.springframework.stereotype.Component; | ||
|
|
||
| @Slf4j | ||
| @Component | ||
| public class UserActionLogListener { | ||
|
|
||
| @EventListener | ||
| public void handleProductViewed(ProductViewedEvent event) { | ||
| log.info("상품 조회. productId={}", event.getProductId()); | ||
| } | ||
|
|
||
| @EventListener | ||
| public void handleProductLiked(ProductLikedEvent event) { | ||
| log.info("좋아요. productId={}, userId={}", event.getProductId(), event.getUserId()); | ||
| } | ||
|
|
||
| @EventListener | ||
| public void handleProductUnliked(ProductUnlikedEvent event) { | ||
| log.info("좋아요 취소. productId={}, userId={}", event.getProductId(), event.getUserId()); | ||
| } | ||
|
|
||
| @EventListener | ||
| public void handleOrderRequested(OrderRequestedEvent event) { | ||
| log.info("주문 요청. userId={}", event.getUserId()); | ||
|
Comment on lines
+21
to
+32
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 원문 운영 관점에서 사용자 식별자가 일반 로그 파이프라인에 그대로 남으면 접근 통제, 보관 기간, 파기 범위가 불필요하게 커진다. As per coding guidelines 🤖 Prompt for AI Agents |
||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,37 @@ | ||
| package com.loopers.application.order; | ||
|
|
||
| import com.loopers.domain.coupon.CouponTemplate; | ||
| import com.loopers.domain.coupon.CouponService; | ||
| import com.loopers.domain.coupon.CouponTemplateService; | ||
| import com.loopers.domain.order.OrderConfirmedEvent; | ||
| import lombok.RequiredArgsConstructor; | ||
| import lombok.extern.slf4j.Slf4j; | ||
| import org.springframework.beans.factory.annotation.Value; | ||
| import org.springframework.stereotype.Component; | ||
| import org.springframework.transaction.annotation.Propagation; | ||
| import org.springframework.transaction.annotation.Transactional; | ||
| import org.springframework.transaction.event.TransactionPhase; | ||
| import org.springframework.transaction.event.TransactionalEventListener; | ||
|
|
||
| @Slf4j | ||
| @Component | ||
| @RequiredArgsConstructor | ||
| public class OrderConfirmedEventListener { | ||
|
|
||
| private final CouponTemplateService couponTemplateService; | ||
| private final CouponService couponService; | ||
|
|
||
| @Value("${app.order-confirmed-coupon-template-id:1}") | ||
| private Long couponTemplateId; | ||
|
|
||
| @Transactional(propagation = Propagation.REQUIRES_NEW) | ||
| @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) | ||
| public void handle(OrderConfirmedEvent event) { | ||
| log.info("주문 확정 이벤트 수신. orderId={}, userId={}", event.getOrderId(), event.getUserId()); | ||
|
|
||
| CouponTemplate template = couponTemplateService.getById(couponTemplateId); | ||
| couponService.issue(event.getUserId(), template); | ||
|
|
||
| log.info("쿠폰 발급 완료. userId={}, templateId={}", event.getUserId(), couponTemplateId); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,32 @@ | ||
| package com.loopers.application.outbox; | ||
|
|
||
| import com.loopers.domain.outbox.OutboxEvent; | ||
| import com.loopers.domain.outbox.OutboxEventRepository; | ||
| import java.util.List; | ||
| import lombok.RequiredArgsConstructor; | ||
| import lombok.extern.slf4j.Slf4j; | ||
| import org.springframework.kafka.core.KafkaTemplate; | ||
| import org.springframework.scheduling.annotation.Scheduled; | ||
| import org.springframework.stereotype.Component; | ||
|
|
||
| @Slf4j | ||
| @Component | ||
| @RequiredArgsConstructor | ||
| public class OutboxRelayScheduler { | ||
|
|
||
| private final OutboxEventRepository outboxEventRepository; | ||
| private final KafkaTemplate<Object, Object> kafkaTemplate; | ||
|
|
||
| @Scheduled(fixedDelay = 5000) | ||
| public void relay() { | ||
| List<OutboxEvent> unpublished = outboxEventRepository.findUnpublished(); | ||
| for (OutboxEvent event : unpublished) { | ||
| try { | ||
| kafkaTemplate.send(event.getEventType(), event.getPartitionKey(), event.getPayload()); | ||
| outboxEventRepository.markPublished(event.getId()); | ||
|
Comment on lines
+24
to
+26
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
제안: 전송 완료 대기 후 markPublished 호출-kafkaTemplate.send(event.getEventType(), event.getPartitionKey(), event.getPayload());
-outboxEventRepository.markPublished(event.getId());
+kafkaTemplate.send(event.getEventType(), event.getPartitionKey(), event.getPayload())
+ .get(10, TimeUnit.SECONDS); // 동기 대기
+outboxEventRepository.markPublished(event.getId());추가 테스트: Kafka 브로커 장애 상황에서 이벤트가 재발행 대상으로 유지되는지 검증 필요하다. 🤖 Prompt for AI Agents |
||
| } catch (Exception e) { | ||
| log.error("Failed to publish outbox event. id={}, eventType={}", event.getId(), event.getEventType(), e); | ||
| } | ||
| } | ||
| } | ||
|
Comment on lines
+20
to
+31
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
# OutboxEventJpaRepository에서 `@Transactional` 및 Lock 어노테이션 확인
echo "=== Checking OutboxEventJpaRepository for transaction and lock annotations ==="
ast-grep --pattern '@Modifying
`@Query`($_)
void markPublished($$$)'
echo ""
echo "=== Checking findByPublishedAtIsNull for locking ==="
rg -n -A3 "findByPublishedAtIsNull|findUnpublished" --type=javaRepository: Loopers-dev-lab/loop-pack-be-l2-vol3-java Length of output: 2529 🏁 Script executed: # Check OutboxEventJpaRepository for all annotations and method signatures
echo "=== Full OutboxEventJpaRepository.java ==="
find . -name "OutboxEventJpaRepository.java" -type f -exec cat -n {} \;
echo ""
echo "=== Full OutboxEventRepositoryImpl.java ==="
find . -name "OutboxEventRepositoryImpl.java" -type f -exec cat -n {} \;
echo ""
echo "=== Full OutboxRelayScheduler.java with class-level annotations ==="
find . -name "OutboxRelayScheduler.java" -type f -exec head -50 {} \;Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java Length of output: 3454 다중 인스턴스 환경에서 트랜잭션 누락 및 중복 메시지 발행 문제가 발생한다.
수정 방안:
동시 스케줄러 실행 환경에서 중복 메시지가 발행되지 않는지 확인하는 통합 테스트를 추가해야 한다. 🤖 Prompt for AI Agents |
||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.