Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e3f3bcf
feat: ApplicationEvent 경계 분리
iohyeon Mar 25, 2026
72d4d5b
feat: Outbox + Relay + 인프라 테이블
iohyeon Mar 25, 2026
e2a9634
feat: Outbox 패턴 구현 BEFORE_COMMIT으로 비즈니스 TX와 원자성 보장
iohyeon Mar 25, 2026
f9bd89f
feat: Outbox 패턴 구현 — Facade TX 내 직접 저장으로 비즈니스-이벤트 원자성 보장
iohyeon Mar 25, 2026
dd9a42e
feat: commerce-streamer Consumer 구현 — catalog-events 집계 + Relay 헤더 전달
iohyeon Mar 25, 2026
e7a6c0b
feat: 선착순 쿠폰 발급 Kafka 비동기 전환 — Outbox + 파티션 순차 처리로 동시성 제어
iohyeon Mar 25, 2026
56742c6
test: 선착순 쿠폰 비동기 발급 테스트 — Outbox 원자성 + Consumer 멱등성 + 재고 소진
iohyeon Mar 25, 2026
2c0f1ae
fix: Kafka 메시지 유실 구간 방어 — Producer 멱등성 + Consumer 중복 방지 + DLQ 격리
iohyeon Mar 26, 2026
40da025
fix: Consumer 재시도 + DLQ 동기화 kafka-pipeline-lab 신뢰성 패턴 적용
iohyeon Mar 26, 2026
f760b62
fix: 설정파일 추가
iohyeon Mar 26, 2026
e0230cc
fix: Outbox 멀티 인스턴스 중복 발행 방지 및 테스트 추가
iohyeon Mar 27, 2026
f1c277b
feat: Outbox 안정성 및 모니터링 개선
iohyeon Mar 27, 2026
d383d18
test: Outbox Relay 성능 테스트
iohyeon Mar 27, 2026
a514485
refactor: Consumer → Processor 분리로 self-invocation 해결 — @Transactiona…
iohyeon Mar 27, 2026
0428e0a
test: CouponIssueProcessor 테스트 — @Transactional 원자성 검증 테스트
iohyeon Mar 27, 2026
e452e0c
feat: Outbox Relay E2E 성능 테스트 — Phase 1 + Phase 2 (실제 Kafka 발행)
iohyeon Mar 28, 2026
f0eebb8
test: 테스트코드 설정 topic 추가
iohyeon Mar 28, 2026
f43741b
test: Outbox Pipeline 성능 테스트
iohyeon Mar 28, 2026
df094ef
fix: OrderItemSoldEvent 페이로드 파싱 버그 수정 + CouponIssueRequest 상태 전이 가드 추가
iohyeon Mar 28, 2026
a981838
fix: OrderItemSoldEvent 파싱 버그 수정 + 비즈니스 실패 시 TX 롤백으로 FAILED 미기록 수정
iohyeon Mar 29, 2026
3e04b1a
refactor: CouponIssueConsumer 인프라 예외 시 건별 DLQ 격리로 전환
iohyeon Apr 1, 2026
99c30dd
refactor: 좋아요 파이프라인 단일화 — product_metrics + products.like_count 동시 업데이트
iohyeon Apr 1, 2026
31ff975
refactor: ProductViewed 이벤트를 Kafka fire-and-forget 직접 발행으로 전환
iohyeon Apr 1, 2026
ffc93c9
feat: product_metrics 보정 스케줄러 추가 — Data Drift 대응
iohyeon Apr 1, 2026
d17dc4e
feat: DLQ 재처리 어드민 API 추가
iohyeon Apr 1, 2026
a231855
refactor: 동기 issueCoupon() 제거, 모든 발급이 Kafka 파이프라인으로 통일
iohyeon Apr 1, 2026
c148c64
Merge upstream/iohyeon into volume-7: resolve conflicts in OrderFacad…
iohyeon Apr 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/commerce-api/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ dependencies {
// add-ons
implementation(project(":modules:jpa"))
implementation(project(":modules:redis"))
implementation(project(":modules:kafka"))
implementation(project(":supports:jackson"))
implementation(project(":supports:logging"))
implementation(project(":supports:monitoring"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import com.loopers.domain.coupon.CouponService;
import com.loopers.domain.coupon.CouponTemplate;
import com.loopers.domain.coupon.IssuedCoupon;
import com.loopers.infrastructure.coupon.CouponIssueRequestEntity;
import com.loopers.infrastructure.coupon.CouponIssueRequestJpaRepository;
import com.loopers.infrastructure.outbox.OutboxEventService;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

Expand All @@ -22,18 +25,57 @@
public class CouponFacade {

private final CouponService couponService;
private final CouponIssueRequestJpaRepository couponIssueRequestRepository;
private final OutboxEventService outboxEventService;

public CouponFacade(CouponService couponService) {
public CouponFacade(CouponService couponService,
CouponIssueRequestJpaRepository couponIssueRequestRepository,
OutboxEventService outboxEventService) {
this.couponService = couponService;
this.couponIssueRequestRepository = couponIssueRequestRepository;
this.outboxEventService = outboxEventService;
}

/** 쿠폰 발급 */
/**
* 선착순 쿠폰 발급 요청 (비동기 — Kafka 기반)
*
* 1. 발급 요청 이력을 DB에 PENDING으로 저장
* 2. Outbox에 이벤트 저장 (같은 TX — 원자성)
* 3. 즉시 202 응답 → 유저가 결과를 폴링
* 4. Relay → Kafka → CouponIssueConsumer가 실제 발급
* 5. Consumer가 요청 이력을 ISSUED/FAILED로 업데이트
*/
@Transactional
public IssueCouponResult issueCoupon(Long templateId, Long userId) {
IssuedCoupon issued = couponService.issue(templateId, userId);
return new IssueCouponResult(issued.getId(), issued.getStatus().name());
public CouponIssueRequestResult requestCouponIssue(Long templateId, Long userId) {
String eventId = java.util.UUID.randomUUID().toString();

// 발급 요청 이력 저장 — 폴링 대상 + 추적용
CouponIssueRequestEntity request = CouponIssueRequestEntity.create(templateId, userId, eventId);
couponIssueRequestRepository.save(request);

// Outbox 저장 — 같은 TX (비즈니스 + Outbox 원자성)
outboxEventService.save(
"COUPON", templateId,
"CouponIssueRequestedEvent",
new CouponIssueRequestPayload(request.getId(), templateId, userId, eventId),
"coupon-issue-requests-v1",
String.valueOf(templateId) // key=couponTemplateId → 같은 쿠폰은 같은 파티션
);

return new CouponIssueRequestResult(request.getId(), eventId, "PENDING");
}

/** 발급 결과 폴링 */
@Transactional(readOnly = true)
public CouponIssueRequestResult getCouponIssueResult(Long requestId) {
CouponIssueRequestEntity request = couponIssueRequestRepository.findById(requestId)
.orElseThrow(() -> new IllegalArgumentException("발급 요청을 찾을 수 없습니다: " + requestId));
return new CouponIssueRequestResult(request.getId(), request.getEventId(), request.getStatus().name());
}

public record CouponIssueRequestPayload(Long requestId, Long templateId, Long userId, String eventId) {}
public record CouponIssueRequestResult(Long requestId, String eventId, String status) {}

/** 내 쿠폰 목록 조회 */
@Transactional(readOnly = true)
public CouponListResult getMyCoupons(Long userId) {
Expand Down Expand Up @@ -74,8 +116,6 @@ public AvailableCouponListResult getAvailableCoupons() {
return new AvailableCouponListResult(details);
}

public record IssueCouponResult(Long issuedCouponId, String status) {}

public record IssuedCouponDetail(
Long issuedCouponId, Long couponTemplateId,
String couponName, String discountType,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package com.loopers.application.coupon;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.loopers.domain.coupon.CouponService;
import com.loopers.domain.coupon.IssuedCoupon;
import com.loopers.support.error.CoreException;
import com.loopers.infrastructure.coupon.CouponIssueRequestEntity;
import com.loopers.infrastructure.coupon.CouponIssueRequestJpaRepository;
import com.loopers.infrastructure.event.EventHandledEntity;
import com.loopers.infrastructure.event.EventHandledJpaRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
* 쿠폰 발급 처리 — @Transactional 보장
*
* Consumer(Interfaces)에서 분리된 비즈니스 처리 Bean.
* → 프록시를 통한 호출 → @Transactional 정상 동작
* → 쿠폰 발급 + 상태 업데이트 + 멱등성 기록이 같은 TX
*
* 비즈니스 실패(재고 소진, 중복 발급)는 BusinessFailureException으로 래핑:
* → Consumer에서 재시도 없이 즉시 처리 (이미 FAILED 기록 완료)
*/
@Service
public class CouponIssueProcessor {

private static final Logger log = LoggerFactory.getLogger(CouponIssueProcessor.class);

private final ObjectMapper objectMapper;
private final CouponService couponService;
private final CouponIssueRequestJpaRepository couponIssueRequestRepository;
private final EventHandledJpaRepository eventHandledRepository;

public CouponIssueProcessor(ObjectMapper objectMapper,
CouponService couponService,
CouponIssueRequestJpaRepository couponIssueRequestRepository,
EventHandledJpaRepository eventHandledRepository) {
this.objectMapper = objectMapper;
this.couponService = couponService;
this.couponIssueRequestRepository = couponIssueRequestRepository;
this.eventHandledRepository = eventHandledRepository;
}

/**
* 쿠폰 발급 요청 처리
*
* @throws BusinessFailureException 비즈니스 실패 (재고 소진 등) — 재시도 불필요
* @throws RuntimeException 인프라 장애 — ErrorHandler가 DLQ로 격리
*/
@Transactional
public void process(String payload) {
JsonNode node;
try {
node = objectMapper.readTree(payload);
} catch (Exception e) {
log.error("[CouponProcessor] JSON 파싱 실패 — payload={}", payload, e);
return;
}

String eventId = node.path("eventId").asText(null);
Long requestId = node.path("requestId").asLong(0);
Long templateId = node.path("templateId").asLong(0);
Long userId = node.path("userId").asLong(0);

if (eventId == null || requestId == 0) {
log.warn("[CouponProcessor] 필수 필드 누락 — payload={}", payload);
return;
}

// 멱등성 체크
if (eventHandledRepository.existsByEventId(eventId)) {
log.warn("[CouponProcessor] 중복 스킵 — eventId={}", eventId);
return;
}

// 발급 요청 조회
CouponIssueRequestEntity request = couponIssueRequestRepository.findById(requestId)
.orElse(null);
if (request == null) {
log.error("[CouponProcessor] 발급 요청 없음 — requestId={}", requestId);
return;
}

// 쿠폰 발급 시도
try {
IssuedCoupon issued = couponService.issue(templateId, userId);

request.markIssued(issued.getId());
couponIssueRequestRepository.save(request);
eventHandledRepository.save(EventHandledEntity.of(eventId, "coupon-issue-requests-v1"));

log.info("[CouponProcessor] 발급 성공 — templateId={}, userId={}, issuedCouponId={}",
templateId, userId, issued.getId());

} catch (CoreException e) {
// 비즈니스 실패 (재고 소진, 발급 한도 초과 등)
// CouponService.issue()가 같은 TX에 참여하므로, CoreException throw 시
// TX가 rollback-only로 마킹된다. 여기서 markFailed()를 해도 커밋 시 롤백된다.
// → Consumer에서 별도 TX로 FAILED 기록을 위임한다.
throw new BusinessFailureException(
e.getMessage(), e, requestId, eventId);
}
}

/**
* 비즈니스 실패 시 FAILED 기록 — 별도 TX (REQUIRES_NEW)
*
* process()의 TX가 rollback-only 상태이므로, 새 TX에서 FAILED + event_handled를 저장한다.
*/
@Transactional(propagation = org.springframework.transaction.annotation.Propagation.REQUIRES_NEW)
public void markFailedInNewTx(Long requestId, String eventId, String reason) {
CouponIssueRequestEntity request = couponIssueRequestRepository.findById(requestId)
.orElse(null);
if (request != null) {
request.markFailed(reason);
couponIssueRequestRepository.save(request);
}
eventHandledRepository.save(EventHandledEntity.of(eventId, "coupon-issue-requests-v1"));
}

public static class BusinessFailureException extends RuntimeException {
private final Long requestId;
private final String eventId;

public BusinessFailureException(String message, Throwable cause, Long requestId, String eventId) {
super(message, cause);
this.requestId = requestId;
this.eventId = eventId;
}

public Long getRequestId() { return requestId; }
public String getEventId() { return eventId; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import com.loopers.domain.like.ProductLike;
import com.loopers.domain.product.Product;
import com.loopers.domain.product.ProductService;
import com.loopers.domain.common.event.ProductLikedEvent;
import com.loopers.infrastructure.outbox.OutboxEventService;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

Expand All @@ -32,34 +34,59 @@ public class LikeFacade {
private final ProductService productService;
private final BrandService brandService;
private final ProductCacheManager productCacheManager;
private final OutboxEventService outboxEventService;

public LikeFacade(LikeService likeService, BrandLikeService brandLikeService,
ProductService productService, BrandService brandService,
ProductCacheManager productCacheManager) {
ProductCacheManager productCacheManager,
OutboxEventService outboxEventService) {
this.likeService = likeService;
this.brandLikeService = brandLikeService;
this.productService = productService;
this.brandService = brandService;
this.productCacheManager = productCacheManager;
this.outboxEventService = outboxEventService;
}

/** 상품 좋아요 (상품 검증 → 좋아요 생성 → likeCount 증가 → 상세 캐시만 삭제) */
/**
* 상품 좋아요 (상품 검증 → 좋아요 생성 → Outbox 이벤트 발행 → 상세 캐시 삭제)
*
* products.like_count 직접 증분은 하지 않음:
* Outbox → Kafka → CatalogMetricsProcessor가
* product_metrics.like_count + products.like_count를 같은 TX에서 업데이트.
* 단일 파이프라인으로 정합성을 보장한다. (eventual consistency)
*/
@Transactional
public LikeResult likeProduct(Long userId, Long productId) {
Product product = productService.getDisplayableProduct(productId);
likeService.like(userId, productId);
productService.incrementLikeCount(productId);
productCacheManager.registerDetailOnlyEvictAfterCommit(productId);

// Outbox 저장 — 같은 TX (좋아요 집계 → catalog-events-v1)
outboxEventService.save("PRODUCT", productId,
"ProductLikedEvent", new ProductLikedEvent(userId, productId, true),
"catalog-events-v1", String.valueOf(productId));

return new LikeResult(product.getLikeCount() + 1);
}

/** 상품 좋아요 취소 (상품 존재 검증 → 좋아요 삭제 → likeCount 감소 → 상세 캐시만 삭제) */
/**
* 상품 좋아요 취소 (상품 존재 검증 → 좋아요 삭제 → Outbox 이벤트 발행 → 상세 캐시 삭제)
*
* products.like_count 직접 감소는 하지 않음:
* CatalogMetricsProcessor가 단일 파이프라인으로 처리.
*/
@Transactional
public LikeResult unlikeProduct(Long userId, Long productId) {
Product product = productService.getById(productId);
likeService.unlike(userId, productId);
productService.decrementLikeCount(productId);
productCacheManager.registerDetailOnlyEvictAfterCommit(productId);

// Outbox 저장 — 같은 TX (좋아요 취소 집계 → catalog-events-v1)
outboxEventService.save("PRODUCT", productId,
"ProductUnlikedEvent", new ProductLikedEvent(userId, productId, false),
"catalog-events-v1", String.valueOf(productId));

return new LikeResult(product.getLikeCount() - 1);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.loopers.support.error.PointErrorType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.loopers.infrastructure.outbox.OutboxEventService;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.Transactional;
Expand Down Expand Up @@ -66,14 +67,16 @@ public class OrderFacade {
private final PaymentFacade paymentFacade;
private final TransactionTemplate txTemplate;
private final OrderCacheManager orderCacheManager;
private final OutboxEventService outboxEventService;

public OrderFacade(OrderService orderService, UserAddressService userAddressService,
ProductService productService, BrandService brandService,
InventoryService inventoryService, CartItemService cartItemService,
CouponService couponService, PointService pointService,
PaymentFacade paymentFacade,
PlatformTransactionManager txManager,
OrderCacheManager orderCacheManager) {
OrderCacheManager orderCacheManager,
OutboxEventService outboxEventService) {
this.orderService = orderService;
this.userAddressService = userAddressService;
this.productService = productService;
Expand All @@ -83,6 +86,7 @@ public OrderFacade(OrderService orderService, UserAddressService userAddressServ
this.couponService = couponService;
this.pointService = pointService;
this.paymentFacade = paymentFacade;
this.outboxEventService = outboxEventService;
this.txTemplate = new TransactionTemplate(txManager);
this.txTemplate.setTimeout(30);
this.orderCacheManager = orderCacheManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.loopers.support.error.OrderErrorType;
import com.loopers.support.error.PaymentErrorType;
import com.loopers.support.error.PointErrorType;
import com.loopers.infrastructure.outbox.OutboxEventService;
import io.github.resilience4j.bulkhead.BulkheadFullException;
import io.github.resilience4j.bulkhead.annotation.Bulkhead;
import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
Expand Down Expand Up @@ -63,6 +64,7 @@ public class PaymentFacade {
private final CouponService couponService;
private final ProductService productService;
private final OrderCacheManager orderCacheManager;
private final OutboxEventService outboxEventService;
private final CompensationDlqRepository compensationDlqRepository;

private final org.springframework.transaction.support.TransactionTemplate txTemplate;
Expand All @@ -71,6 +73,7 @@ public PaymentFacade(OrderService orderService, PaymentService paymentService,
InventoryService inventoryService, PointService pointService,
CouponService couponService, ProductService productService,
OrderCacheManager orderCacheManager,
OutboxEventService outboxEventService,
CompensationDlqRepository compensationDlqRepository,
org.springframework.transaction.PlatformTransactionManager txManager) {
this.orderService = orderService;
Expand All @@ -80,6 +83,7 @@ public PaymentFacade(OrderService orderService, PaymentService paymentService,
this.couponService = couponService;
this.productService = productService;
this.orderCacheManager = orderCacheManager;
this.outboxEventService = outboxEventService;
this.compensationDlqRepository = compensationDlqRepository;
this.txTemplate = new org.springframework.transaction.support.TransactionTemplate(txManager);
this.txTemplate.setTimeout(30);
Expand Down Expand Up @@ -160,6 +164,7 @@ public PaymentRequestResult requestPayment(Long orderId, Long userId, String pay
Payment payment = paymentService.create(
orderId, order.getTotalAmount(), paymentMethod, generateIdempotencyKey());


Map<Long, Integer> productQtyMap = order.getItems().stream()
.collect(Collectors.toMap(OrderItem::getProductId, OrderItem::getQuantity));

Expand Down Expand Up @@ -288,6 +293,15 @@ public void confirmPayment(Long orderId, String pgTxnId) {
orderService.confirm(orderId, payment.getId(), payment.getPaymentMethod());
pointService.earn(order.getUserId(), order.getTotalAmount());

// Outbox 저장 — 같은 TX (판매량 집계 → catalog-events-v1)
for (var entry : productQtyMap.entrySet()) {
outboxEventService.save("PRODUCT", entry.getKey(),
"OrderItemSoldEvent",
new com.loopers.domain.common.event.OrderItemSoldEvent(
orderId, Map.of(entry.getKey(), entry.getValue())),
"catalog-events-v1", String.valueOf(entry.getKey()));
}

orderCacheManager.registerEvictAfterCommit(order.getUserId());
});
}
Expand Down
Loading