Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
1f4a0c2
feat : 결제시 주문 완료 이벤트 발행 코드 추가
Mar 23, 2026
3c9a194
feat : 주문 완료 이벤트 리스너 추가
Mar 23, 2026
9bf13df
docs : md 업데이트
Mar 23, 2026
bc8c08c
feat : 좋아요 eventual consistency 적용
Mar 23, 2026
ba51f0c
feat : 유저 행동에 대한 로깅을 이벤트로 처리
Mar 23, 2026
eee6bfa
docs : md 업데이트
Mar 23, 2026
d2beb8f
docs : md 업데이트
Mar 24, 2026
f2081b8
chore : 카프카 설정 추가
Mar 24, 2026
f190694
feat : OutboxEvent 도메인 & 엔티티 & repo 추가
Mar 24, 2026
f53fa47
docs : md 업데이트
Mar 24, 2026
a29ec87
feat : Outbox 저장 로직 구현
Mar 24, 2026
1ab241c
docs : md 업데이트
Mar 24, 2026
91f4347
docs : md 업데이트
Mar 25, 2026
b1e0f27
feat : 스케줄러를 사용해서 kafka에 메시지 발행
Mar 25, 2026
e28a027
feat : product_metrics 도메인 및 jpa 엔티티 추가
Mar 25, 2026
1bc9b92
docs : md 업데이트
Mar 25, 2026
8b77efd
feat : 멱등 처리를 위한 event_handled 테이블과 레포지토리 추가
Mar 25, 2026
6da80b6
docs : md 업데이트
Mar 25, 2026
bb28fd0
chore : kafka 모듈 사용을 위한 import
Mar 25, 2026
799e35b
feat : ProductMetrics Consumer 구현
Mar 25, 2026
5546dbe
docs : md 업데이트
Mar 25, 2026
aaff975
feat : CouponTemplate에 선착순 수량 제한 필드 추가
Mar 27, 2026
e47c35e
feat: 선착순 쿠폰 발급 요청/조회 API 구현
Mar 27, 2026
7cfabf9
feat : 선착순 쿠폰 발급 Consumer 구현
Mar 27, 2026
14dcaab
refactor: CouponFacade Kafka 발행을 Outbox Pattern으로 교체
Mar 27, 2026
7059b6f
refactor: Kafka 토픽 설계를 도메인별 묶음 토픽으로 개선
Mar 27, 2026
3b954ad
feat: occurredAt 기반 stale 이벤트 필터링 구현
Mar 27, 2026
7b07682
docs: md 업데이트
Mar 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/commerce-api/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ dependencies {
// add-ons
implementation(project(":modules:jpa"))
implementation(project(":modules:redis"))
implementation(project(":modules:kafka"))
implementation(project(":supports:jackson"))
implementation(project(":supports:logging"))
implementation(project(":supports:monitoring"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.loopers.application;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class OutboxEventHelper {

private OutboxEventHelper() {
}

public static String toJson(ObjectMapper objectMapper, Object value) {
try {
return objectMapper.writeValueAsString(value);
} catch (JsonProcessingException e) {
throw new RuntimeException("Outbox payload 직렬화 실패", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
package com.loopers.application.coupon;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.loopers.application.OutboxEventHelper;
import com.loopers.domain.coupon.Coupon;
import com.loopers.domain.coupon.CouponIssueRequest;
import com.loopers.domain.coupon.CouponIssueRequestRepository;
import com.loopers.domain.coupon.CouponService;
import com.loopers.domain.coupon.CouponTemplate;
import com.loopers.domain.coupon.CouponTemplateService;
import com.loopers.domain.outbox.OutboxEvent;
import com.loopers.domain.outbox.OutboxEventRepository;
import com.loopers.support.error.CoreException;
import com.loopers.support.error.ErrorMessage;
import com.loopers.support.error.ErrorType;
import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
Expand All @@ -15,11 +25,14 @@ public class CouponFacade {

private final CouponTemplateService couponTemplateService;
private final CouponService couponService;
private final CouponIssueRequestRepository couponIssueRequestRepository;
private final OutboxEventRepository outboxEventRepository;
private final ObjectMapper objectMapper;

@Transactional
public CouponTemplateInfo createCouponTemplate(CreateCouponTemplateCommand command) {
CouponTemplate template = couponTemplateService.create(
command.name(), command.discountType(), command.discountValue(), command.minOrderAmount(), command.expiredAt()
command.name(), command.discountType(), command.discountValue(), command.minOrderAmount(), command.expiredAt(), command.maxIssuanceCount()
);
return CouponTemplateInfo.from(template);
}
Expand Down Expand Up @@ -63,4 +76,36 @@ public List<CouponInfo> getMyCoupons(Long userId) {
.map(CouponInfo::from)
.toList();
}

@Transactional
public CouponIssueRequestInfo requestCouponIssue(Long userId, Long templateId) {
couponTemplateService.getById(templateId); // 템플릿 존재 여부 확인

CouponIssueRequest request = CouponIssueRequest.create(userId, templateId);
CouponIssueRequest saved = couponIssueRequestRepository.save(request);

outboxEventRepository.save(OutboxEvent.create(
"coupon-issue-requests",
OutboxEventHelper.toJson(objectMapper, Map.of(
"requestId", saved.getId(),
"userId", userId,
"templateId", templateId
)),
String.valueOf(templateId)
));

return CouponIssueRequestInfo.from(saved);
}

@Transactional(readOnly = true)
public CouponIssueRequestInfo getCouponIssueRequest(Long userId, Long requestId) {
CouponIssueRequest request = couponIssueRequestRepository.findById(requestId)
.orElseThrow(() -> new CoreException(ErrorType.NOT_FOUND, ErrorMessage.Coupon.COUPON_ISSUE_REQUEST_NOT_FOUND));

if (!request.getUserId().equals(userId)) {
throw new CoreException(ErrorType.NOT_FOUND, ErrorMessage.Coupon.COUPON_ISSUE_REQUEST_NOT_FOUND);
}

return CouponIssueRequestInfo.from(request);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.loopers.application.coupon;

import com.loopers.domain.coupon.CouponIssueRequest;
import com.loopers.domain.coupon.CouponIssueRequestStatus;

public record CouponIssueRequestInfo(
Long requestId,
Long templateId,
CouponIssueRequestStatus status,
String failReason
) {
public static CouponIssueRequestInfo from(CouponIssueRequest request) {
return new CouponIssueRequestInfo(
request.getId(),
request.getTemplateId(),
request.getStatus(),
request.getFailReason()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ public record CouponTemplateInfo(
DiscountType discountType,
Integer discountValue,
Integer minOrderAmount,
ZonedDateTime expiredAt
ZonedDateTime expiredAt,
Integer maxIssuanceCount,
Integer issuedCount
) {
public static CouponTemplateInfo from(CouponTemplate template) {
return new CouponTemplateInfo(
Expand All @@ -19,7 +21,9 @@ public static CouponTemplateInfo from(CouponTemplate template) {
template.getType(),
template.getDiscountValue(),
template.getMinOrderAmount().value(),
template.getExpiredAt()
template.getExpiredAt(),
template.getMaxIssuanceCount(),
template.getIssuedCount()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ public record CreateCouponTemplateCommand(
DiscountType discountType,
Integer discountValue,
Integer minOrderAmount,
ZonedDateTime expiredAt
ZonedDateTime expiredAt,
Integer maxIssuanceCount
) {
Comment on lines +11 to 13
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Description: CouponTemplate 도메인에서 maxIssuanceCount 검증 로직 확인

ast-grep --pattern $'public static CouponTemplate create($$$) {
  $$$
}'

rg -n "maxIssuanceCount" --type=java -g '*CouponTemplate.java' -C5

Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java

Length of output: 7614


maxIssuanceCount 필드에 대한 도메인 검증이 누락되었다.

도메인 클래스 CouponTemplate.create() 메서드에서 discountTypeexpiredAtvalidateDiscountType(), validateExpiredAt() 메서드로 검증되지만, maxIssuanceCount는 검증 로직이 없이 직접 전달된다. maxIssuanceCount가 null인 경우(무제한)는 허용되더라도, 0 또는 음수 값에 대한 방어가 필요하다.

validateMaxIssuanceCount() 메서드를 추가하여 다음을 검증해야 한다:

  • null이 아닌 경우, 1 이상의 값만 허용
  • 0 또는 음수가 입력될 경우 적절한 예외 발생

이 검증을 create() 메서드에서 호출하고, 추가 테스트로 유효하지 않은 값 입력 시 예외 발생을 확인해야 한다.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@apps/commerce-api/src/main/java/com/loopers/application/coupon/CreateCouponTemplateCommand.java`
around lines 11 - 13, The CouponTemplate.create() flow is missing validation for
maxIssuanceCount; add a validateMaxIssuanceCount(Integer maxIssuanceCount)
method and call it from CouponTemplate.create() alongside validateDiscountType()
and validateExpiredAt(); the validator should allow null (meaning unlimited) but
throw an IllegalArgumentException (or the project’s domain validation exception)
if maxIssuanceCount is non-null and < 1, and include a clear message;
update/create unit tests to assert that null is accepted and that 0 or negative
values cause the expected exception.

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.loopers.application.like;

import com.loopers.domain.like.LikedEvent;
import com.loopers.domain.like.UnlikedEvent;
import com.loopers.domain.product.ProductService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

@Slf4j
@Component
@RequiredArgsConstructor
public class LikeEventListener {

private final ProductService productService;

@Transactional(propagation = Propagation.REQUIRES_NEW)
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleLiked(LikedEvent event) {
log.info("좋아요 이벤트 수신. productId={}", event.getProductId());
productService.increaseLikeCount(event.getProductId());
}

@Transactional(propagation = Propagation.REQUIRES_NEW)
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleUnliked(UnlikedEvent event) {
log.info("좋아요 취소 이벤트 수신. productId={}", event.getProductId());
productService.decreaseLikeCount(event.getProductId());
}
Comment on lines +21 to +33
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

리스너 실패 시 복구 메커니즘이 없다.

AFTER_COMMIT 단계에서 실행되므로 이 리스너가 실패해도 메인 트랜잭션(like 테이블 변경)은 이미 커밋된 상태다. 실패한 카운트 업데이트는 별도 재시도 없이 유실된다.

운영 관점에서 고려할 사항:

  1. ProductService.increaseLikeCount()가 예외를 던지면 로그만 남고 카운트 불일치 발생
  2. 재시도 로직이 없어 일시적 DB 장애 시 데이터 정합성 깨짐
  3. 모니터링/알림 없이 silent failure 발생

해결 방안:

  • 방안 1: try-catch로 감싸고 실패 시 별도 재시도 큐에 저장
  • 방안 2: 이 리스너를 제거하고 Kafka consumer(ProductMetricsProcessor)에서만 카운트 업데이트 (단일 경로)
  • 방안 3: @Retryable 적용하여 일시적 실패 재시도
🛡️ 에러 핸들링 추가 예시
+import lombok.extern.slf4j.Slf4j;
+
 `@Slf4j`
 `@Component`
 `@RequiredArgsConstructor`
 public class LikeEventListener {
 
     private final ProductService productService;
 
     `@Transactional`(propagation = Propagation.REQUIRES_NEW)
     `@TransactionalEventListener`(phase = TransactionPhase.AFTER_COMMIT)
     public void handleLiked(LikedEvent event) {
-        log.info("좋아요 이벤트 수신. productId={}", event.getProductId());
-        productService.increaseLikeCount(event.getProductId());
+        try {
+            log.info("좋아요 이벤트 수신. productId={}", event.getProductId());
+            productService.increaseLikeCount(event.getProductId());
+        } catch (Exception e) {
+            log.error("좋아요 카운트 증가 실패. productId={}", event.getProductId(), e);
+            // TODO: 재시도 큐 저장 또는 알림 발송
+        }
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@apps/commerce-api/src/main/java/com/loopers/application/like/LikeEventListener.java`
around lines 21 - 33, The AFTER_COMMIT listeners handleLiked and handleUnliked
call productService.increaseLikeCount/decreaseLikeCount with no recovery, so
failures after the main transaction cause silent loss; wrap each handler in
resilient error-handling: catch exceptions around productService calls, log them
with context, and on failure either enqueue a retry task/message (persist
minimal retry record or push to a retry queue) or annotate the handler with a
retry mechanism (e.g., `@Retryable`) so transient DB errors are retried;
alternatively, remove these listeners and consolidate count updates into the
existing ProductMetricsProcessor Kafka consumer to ensure a single, reliable
update path—apply one of these fixes to handleLiked and handleUnliked and add
monitoring/logging for retries.

}
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
package com.loopers.application.like;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.loopers.application.OutboxEventHelper;
import com.loopers.domain.like.LikeAction;
import com.loopers.domain.like.LikedEvent;
import com.loopers.domain.like.LikeService;
import com.loopers.domain.product.ProductService;
import com.loopers.domain.like.UnlikedEvent;
import com.loopers.domain.outbox.OutboxEvent;
import com.loopers.domain.outbox.OutboxEventRepository;
import java.time.ZonedDateTime;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

Expand All @@ -12,15 +21,30 @@
public class LikeFacade {

private final LikeService likeService;
private final ProductService productService;
private final ApplicationEventPublisher eventPublisher;
private final OutboxEventRepository outboxEventRepository;
private final ObjectMapper objectMapper;

public void toggleLike(Long productId, Long userId) {
public LikeAction toggleLike(Long productId, Long userId) {
if (likeService.isLiked(productId, userId)) {
likeService.unlike(productId, userId);
productService.decreaseLikeCount(productId);
eventPublisher.publishEvent(new UnlikedEvent(productId));
outboxEventRepository.save(OutboxEvent.create(
"catalog-events",
OutboxEventHelper.toJson(objectMapper, Map.of("type", "UNLIKED", "productId", productId, "occurredAt", ZonedDateTime.now().toString())),
String.valueOf(productId)
));
return LikeAction.UNLIKED;
} else {
likeService.like(productId, userId);
productService.increaseLikeCount(productId);
eventPublisher.publishEvent(new LikedEvent(productId));
outboxEventRepository.save(OutboxEvent.create(
"catalog-events",
OutboxEventHelper.toJson(objectMapper, Map.of("type", "LIKED", "productId", productId, "occurredAt", ZonedDateTime.now().toString())),
String.valueOf(productId)
));
return LikeAction.LIKED;
}
}
Comment on lines +28 to 48
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

좋아요 카운트 업데이트의 이중 경로가 데이터 불일치를 유발할 수 있다.

현재 구조에서 좋아요 카운트 업데이트가 두 경로로 발생한다:

  1. LikeEventListenerLikedEvent/UnlikedEvent를 받아 ProductService.increaseLikeCount()/decreaseLikeCount() 호출 (in-process, AFTER_COMMIT)
  2. Outbox → Kafka → ProductMetricsProcessor가 메트릭 업데이트

LikeEventListenerAFTER_COMMIT + REQUIRES_NEW로 동작하므로, 메인 트랜잭션(like 테이블 + outbox)이 커밋된 후 리스너가 실패하면:

  • like 테이블: 변경됨
  • outbox: 저장됨 (Kafka로 발행 예정)
  • product.likeCount: 업데이트 실패

이후 Kafka consumer가 같은 이벤트를 처리하면 카운트가 한 번만 증가/감소해야 하는데, 리스너 재시도 시 중복 처리될 수 있다.

in-process 리스너와 Kafka consumer 중 하나의 경로만 사용하거나, 멱등성 보장 로직이 필요하다.


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.loopers.application.logging;

import com.loopers.domain.like.ProductLikedEvent;
import com.loopers.domain.like.ProductUnlikedEvent;
import com.loopers.domain.order.OrderRequestedEvent;
import com.loopers.domain.product.ProductViewedEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class UserActionLogListener {

@EventListener
public void handleProductViewed(ProductViewedEvent event) {
log.info("상품 조회. productId={}", event.getProductId());
}

@EventListener
public void handleProductLiked(ProductLikedEvent event) {
log.info("좋아요. productId={}, userId={}", event.getProductId(), event.getUserId());
}

@EventListener
public void handleProductUnliked(ProductUnlikedEvent event) {
log.info("좋아요 취소. productId={}, userId={}", event.getProductId(), event.getUserId());
}

@EventListener
public void handleOrderRequested(OrderRequestedEvent event) {
log.info("주문 요청. userId={}", event.getUserId());
Comment on lines +21 to +32
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

원문 userIdinfo 로그에 남기지 않는 편이 안전하다.

운영 관점에서 사용자 식별자가 일반 로그 파이프라인에 그대로 남으면 접근 통제, 보관 기간, 파기 범위가 불필요하게 커진다. userId는 마스킹 또는 해시 처리하거나 별도 감사 로그 sink로 분리하는 것이 좋다. 로그 캡처 테스트를 추가해 원문 userId가 출력되지 않는지 검증해야 한다.

As per coding guidelines 로깅 시 민감정보 노출 가능성을 점검한다.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@apps/commerce-api/src/main/java/com/loopers/application/logging/UserActionLogListener.java`
around lines 21 - 32, 로그에 원문 userId가 남지 않도록 handleProductLiked,
handleProductUnliked, handleOrderRequested 메서드의 로깅을 변경하세요: 원문 userId를 직접 출력하지 말고
마스킹(hash 또는 mask)한 값(예: maskedUserId 또는 hashedUserId)을 생성해 로그에 사용하거나 민감정보 전용 감사
sink로 분리하고 info 로그에서는 제거하세요; 또한 로그 캡처 단위 테스트를 추가해 원시 userId가 출력되지 않는지 검증하도록 하세요.

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.loopers.application.order;

import com.loopers.domain.coupon.CouponTemplate;
import com.loopers.domain.coupon.CouponService;
import com.loopers.domain.coupon.CouponTemplateService;
import com.loopers.domain.order.OrderConfirmedEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

@Slf4j
@Component
@RequiredArgsConstructor
public class OrderConfirmedEventListener {

private final CouponTemplateService couponTemplateService;
private final CouponService couponService;

@Value("${app.order-confirmed-coupon-template-id:1}")
private Long couponTemplateId;

@Transactional(propagation = Propagation.REQUIRES_NEW)
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handle(OrderConfirmedEvent event) {
log.info("주문 확정 이벤트 수신. orderId={}, userId={}", event.getOrderId(), event.getUserId());

CouponTemplate template = couponTemplateService.getById(couponTemplateId);
couponService.issue(event.getUserId(), template);

log.info("쿠폰 발급 완료. userId={}, templateId={}", event.getUserId(), couponTemplateId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.loopers.application.outbox;

import com.loopers.domain.outbox.OutboxEvent;
import com.loopers.domain.outbox.OutboxEventRepository;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class OutboxRelayScheduler {

private final OutboxEventRepository outboxEventRepository;
private final KafkaTemplate<Object, Object> kafkaTemplate;

@Scheduled(fixedDelay = 5000)
public void relay() {
List<OutboxEvent> unpublished = outboxEventRepository.findUnpublished();
for (OutboxEvent event : unpublished) {
try {
kafkaTemplate.send(event.getEventType(), event.getPartitionKey(), event.getPayload());
outboxEventRepository.markPublished(event.getId());
Comment on lines +24 to +26
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

KafkaTemplate.send()가 비동기이므로 전송 완료 전에 markPublished가 호출된다.

send()CompletableFuture를 반환하며 비동기로 실행된다. 현재 코드는 전송 완료를 기다리지 않고 바로 markPublished()를 호출하므로, Kafka 전송 실패 시에도 이벤트가 발행된 것으로 표시되어 메시지가 유실된다.

제안: 전송 완료 대기 후 markPublished 호출
-kafkaTemplate.send(event.getEventType(), event.getPartitionKey(), event.getPayload());
-outboxEventRepository.markPublished(event.getId());
+kafkaTemplate.send(event.getEventType(), event.getPartitionKey(), event.getPayload())
+    .get(10, TimeUnit.SECONDS); // 동기 대기
+outboxEventRepository.markPublished(event.getId());

추가 테스트: Kafka 브로커 장애 상황에서 이벤트가 재발행 대상으로 유지되는지 검증 필요하다.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@apps/commerce-api/src/main/java/com/loopers/application/outbox/OutboxRelayScheduler.java`
around lines 24 - 26, KafkaTemplate.send(...) is asynchronous so
markPublished(event.getId()) is called before the send completes; change
OutboxRelayScheduler to wait for the send result and only call
outboxEventRepository.markPublished(...) on successful send, and handle failures
by logging and leaving the event unmarked (or scheduling retry). Concretely,
replace the direct kafkaTemplate.send(event.getEventType(),
event.getPartitionKey(), event.getPayload()) + immediate
outboxEventRepository.markPublished(...) with waiting on the send Future (e.g.,
use the returned Future/ListenableFuture from kafkaTemplate.send(...).get() or
addCallback/onSuccess/onFailure) and call
outboxEventRepository.markPublished(event.getId()) inside the success handler
and handle exceptions in the failure handler (log and do not mark as published).

} catch (Exception e) {
log.error("Failed to publish outbox event. id={}, eventType={}", event.getId(), event.getEventType(), e);
}
}
}
Comment on lines +20 to +31
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# OutboxEventJpaRepository에서 `@Transactional` 및 Lock 어노테이션 확인
echo "=== Checking OutboxEventJpaRepository for transaction and lock annotations ==="
ast-grep --pattern '@Modifying
`@Query`($_)
void markPublished($$$)'

echo ""
echo "=== Checking findByPublishedAtIsNull for locking ==="
rg -n -A3 "findByPublishedAtIsNull|findUnpublished" --type=java

Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java

Length of output: 2529


🏁 Script executed:

# Check OutboxEventJpaRepository for all annotations and method signatures
echo "=== Full OutboxEventJpaRepository.java ==="
find . -name "OutboxEventJpaRepository.java" -type f -exec cat -n {} \;

echo ""
echo "=== Full OutboxEventRepositoryImpl.java ==="
find . -name "OutboxEventRepositoryImpl.java" -type f -exec cat -n {} \;

echo ""
echo "=== Full OutboxRelayScheduler.java with class-level annotations ==="
find . -name "OutboxRelayScheduler.java" -type f -exec head -50 {} \;

Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java

Length of output: 3454


다중 인스턴스 환경에서 트랜잭션 누락 및 중복 메시지 발행 문제가 발생한다.

relay() 메서드에 @Transactional이 없어 markPublished() 호출 시 TransactionRequiredException이 발생한다. @Modifying 어노테이션이 있는 쿼리는 명시적인 트랜잭션 경계가 필수이다. 또한 findUnpublished()에 비관적 잠금이 없어 여러 스케줄러 인스턴스가 동시에 같은 이벤트를 조회하여 Kafka에 중복 메시지를 발행한다. Outbox 패턴의 신뢰성 보장이 무너진다.

수정 방안:

  1. relay() 메서드에 @Transactional 추가
  2. findUnpublished() 메서드에 SELECT ... FOR UPDATE SKIP LOCKED 적용하고 락 타임아웃 힌트(jakarta.persistence.lock.timeout=3000) 추가
  3. 쿼리 결과를 제한(예: LIMIT 100)하여 대량 백로그 시 메모리 부담 방지

동시 스케줄러 실행 환경에서 중복 메시지가 발행되지 않는지 확인하는 통합 테스트를 추가해야 한다.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@apps/commerce-api/src/main/java/com/loopers/application/outbox/OutboxRelayScheduler.java`
around lines 20 - 31, Add a transactional boundary to the scheduler by
annotating relay() with `@Transactional` so markPublished(...) executes inside a
transaction; change outboxEventRepository.findUnpublished() to use a SELECT ...
FOR UPDATE SKIP LOCKED (with a jakarta.persistence.lock.timeout=3000 hint) and
limit the result set (e.g., LIMIT 100) to avoid memory spikes and prevent
multiple scheduler instances from selecting the same rows; keep relay()
iterating over the locked rows and calling kafkaTemplate.send(...) then
outboxEventRepository.markPublished(...); finally add an integration test that
runs multiple scheduler instances concurrently and asserts no duplicate Kafka
messages are produced for the same outbox id.

}
Loading