Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/commerce-api/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ dependencies {
// add-ons
implementation(project(":modules:jpa"))
implementation(project(":modules:redis"))
implementation(project(":modules:kafka"))
implementation(project(":supports:jackson"))
implementation(project(":supports:logging"))
implementation(project(":supports:monitoring"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import java.util.TimeZone;

Expand All @@ -12,6 +13,7 @@
* REST API 서버를 기동하며, 타임존을 Asia/Seoul로 설정하고 스케줄링을 활성화한다.
*/
@ConfigurationPropertiesScan
@EnableAsync
@EnableScheduling
@SpringBootApplication
public class CommerceApiApplication {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package com.loopers.application.coupon;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.loopers.domain.coupon.CouponDeduplicationCache;
import com.loopers.domain.coupon.CouponIssueMetrics;
import com.loopers.domain.coupon.CouponIssueRequestMessage;
import com.loopers.domain.coupon.CouponIssueResultModel;
import com.loopers.domain.coupon.CouponIssueResultRepository;
import com.loopers.domain.coupon.CouponRemainingCache;
import com.loopers.support.error.CoreException;
import com.loopers.support.error.ErrorType;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
* 선착순 쿠폰 발급 Facade — Thin Producer 전략.
*
* <p>Producer에서 DB TX를 사용하지 않는다.
* Redis SETNX(중복 거절) + Redis DECR(수량 게이트키퍼) + Kafka send만 수행.
* 모든 DB 작업은 Consumer(commerce-streamer)에서 단일 TX로 원자적 처리.</p>
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class CouponIssueFacade {

private final KafkaTemplate<Object, Object> kafkaTemplate;
private final ObjectMapper objectMapper;
private final CouponRemainingCache couponRemainingCache;
private final CouponDeduplicationCache couponDeduplicationCache;
private final CouponIssueResultRepository resultRepository;
private final CouponIssueMetrics couponIssueMetrics;

public String requestRushIssue(Long userId, Long couponId) {
String requestId = UUID.randomUUID().toString();

// Phase 1: Redis SETNX — 중복 요청 거절
if (!tryDeduplication(userId, couponId)) {
throw new CoreException(ErrorType.CONFLICT, "이미 발급 요청한 쿠폰입니다");
}

// Phase 2: Redis DECR — 수량 소진 즉시 거절
if (!tryDecrementRemaining(couponId)) {
removeDeduplication(userId, couponId);
throw new CoreException(ErrorType.BAD_REQUEST, "쿠폰이 모두 소진되었습니다.");
}

// Phase 3: Kafka send — Consumer에게 처리 위임
try {
String jsonMessage = objectMapper.writeValueAsString(
new CouponIssueRequestMessage(requestId, userId, couponId, LocalDateTime.now()));
kafkaTemplate.send("coupon-issue-requests",
couponId.toString(), jsonMessage).get(5, TimeUnit.SECONDS);
} catch (Exception e) {
restoreRemaining(couponId);
removeDeduplication(userId, couponId);
throw new CoreException(ErrorType.INTERNAL_ERROR, "발급 요청 실패. 다시 시도해주세요.");
}

return requestId;
}

public CouponIssueResultModel getIssueResult(String requestId) {
return resultRepository.findById(requestId)
.orElseThrow(() -> new CoreException(ErrorType.NOT_FOUND, "발급 처리 중입니다"));
}

private boolean tryDeduplication(Long userId, Long couponId) {
try {
return couponDeduplicationCache.trySetIfAbsent(userId, couponId, Duration.ofHours(24));
} catch (Exception e) {
log.warn("[Redis장애] 중복 확인 실패, Consumer에서 방어", e);
return true;
}
}

private void removeDeduplication(Long userId, Long couponId) {
try {
couponDeduplicationCache.delete(userId, couponId);
} catch (Exception e) {
log.warn("[Redis장애] dedup key 삭제 실패, TTL(24h) 후 자동 만료", e);
}
}

private boolean tryDecrementRemaining(Long couponId) {
try {
Long remaining = couponRemainingCache.decrementAndGet(couponId);
if (remaining == null || remaining < 0) {
restoreRemaining(couponId);
return false;
}
return true;
} catch (Exception e) {
log.warn("[Redis장애] DECR 실패, DB CAS fallback", e);
couponIssueMetrics.incrementRedisFallback();
return true;
}
}

private void restoreRemaining(Long couponId) {
try {
couponRemainingCache.increment(couponId);
} catch (Exception e) {
log.warn("[Redis장애] INCR 복원 실패, 동기화 배치에서 보정", e);
couponIssueMetrics.incrementIncrRestoreFail();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package com.loopers.application.event;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.loopers.domain.like.event.ProductLikedEvent;
import com.loopers.domain.like.event.ProductUnlikedEvent;
import com.loopers.domain.product.ProductService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

import java.time.LocalDateTime;
import java.util.Map;

/**
* 좋아요 이벤트 핸들러.
*
* <p>트랜잭션 커밋 후(AFTER_COMMIT) 비동기로 실행되며,
* 상품의 좋아요 수를 증감시킨다. REQUIRES_NEW 트랜잭션으로 독립 실행된다.</p>
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class LikeEventHandler {

private final ProductService productService;
private final KafkaTemplate<Object, Object> kafkaTemplate;
private final ObjectMapper objectMapper;

/**
* 좋아요 등록 이벤트를 처리한다 (좋아요 수 증가).
*/
@Async("eventTaskExecutor")
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void handleProductLiked(ProductLikedEvent event) {
try {
productService.incrementLikeCount(event.productId());
log.info("[ProductLiked] userId={}, productId={}", event.userId(), event.productId());

// Kafka direct send: 카탈로그 이벤트 (Outbox 불필요 — 비핵심 지표)
sendCatalogEvent("PRODUCT_LIKED", event.productId(), event.userId());
} catch (Exception e) {
log.error("[ProductLiked] Failed to increment like count. userId={}, productId={}",
event.userId(), event.productId(), e);
}
}

/**
* 좋아요 취소 이벤트를 처리한다 (좋아요 수 감소).
*/
@Async("eventTaskExecutor")
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void handleProductUnliked(ProductUnlikedEvent event) {
try {
productService.decrementLikeCount(event.productId());
log.info("[ProductUnliked] userId={}, productId={}", event.userId(), event.productId());

// Kafka direct send: 카탈로그 이벤트 (Outbox 불필요 — 비핵심 지표)
sendCatalogEvent("PRODUCT_UNLIKED", event.productId(), event.userId());
} catch (Exception e) {
log.error("[ProductUnliked] Failed to decrement like count. userId={}, productId={}",
event.userId(), event.productId(), e);
}
}

private void sendCatalogEvent(String eventType, Long productId, Long userId) {
try {
Map<String, Object> message = Map.of(
"eventType", eventType,
"productId", productId,
"userId", userId,
"occurredAt", LocalDateTime.now().toString()
);
String jsonMessage = objectMapper.writeValueAsString(message);
kafkaTemplate.send("catalog-events", String.valueOf(productId), jsonMessage)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("[CatalogEvent] Kafka 발행 실패 — eventType={}, productId={}",
eventType, productId, ex);
}
});
} catch (Exception e) {
log.warn("[CatalogEvent] Kafka 전송 실패 — eventType={}, productId={}", eventType, productId, e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.loopers.application.event;

import com.loopers.domain.order.event.OrderCancelledEvent;
import com.loopers.domain.order.event.OrderCreatedEvent;
import com.loopers.domain.order.event.OrderExpiredEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

/**
* 주문 이벤트 핸들러.
*
* <p>트랜잭션 커밋 후(AFTER_COMMIT) 비동기로 실행되며,
* 주문 생성/취소/만료 이벤트를 처리한다.</p>
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class OrderEventHandler {

/**
* 주문 생성 이벤트를 처리한다 (로깅).
*/
@Async("eventTaskExecutor")
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleOrderCreated(OrderCreatedEvent event) {
log.info("[OrderCreated] orderId={}, userId={}, orderType={}, totalAmount={}",
event.orderId(), event.userId(), event.orderType(), event.totalAmount());
}

/**
* 주문 취소 이벤트를 처리한다 (로깅).
*/
@Async("eventTaskExecutor")
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleOrderCancelled(OrderCancelledEvent event) {
log.info("[OrderCancelled] orderId={}, userId={}",
event.orderId(), event.userId());
}

/**
* 주문 만료 이벤트를 처리한다 (로깅).
*/
@Async("eventTaskExecutor")
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleOrderExpired(OrderExpiredEvent event) {
log.info("[OrderExpired] orderId={}, userId={}",
event.orderId(), event.userId());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.loopers.application.event;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.loopers.domain.product.event.ProductViewedEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

import java.time.LocalDateTime;
import java.util.Map;

/**
* 상품 조회 이벤트 핸들러.
*
* <p>트랜잭션 커밋 후(AFTER_COMMIT) 비동기로 실행되며,
* 상품 조회 이벤트를 로깅하고 Kafka로 전송한다.</p>
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class ProductViewEventHandler {

private final KafkaTemplate<Object, Object> kafkaTemplate;
private final ObjectMapper objectMapper;

/**
* 상품 조회 이벤트를 처리한다 (로깅 + Kafka 전송).
*/
@Async("eventTaskExecutor")
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleProductViewed(ProductViewedEvent event) {
log.info("[ProductViewed] productId={}, userId={}", event.productId(), event.userId());

try {
Map<String, Object> message = Map.of(
"eventType", "PRODUCT_VIEWED",
"productId", event.productId(),
"userId", event.userId(),
"occurredAt", LocalDateTime.now().toString()
);
String jsonMessage = objectMapper.writeValueAsString(message);
kafkaTemplate.send("catalog-events", String.valueOf(event.productId()), jsonMessage)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("[CatalogEvent] Kafka 발행 실패 — PRODUCT_VIEWED, productId={}",
event.productId(), ex);
}
});
} catch (Exception e) {
log.warn("[CatalogEvent] Kafka 전송 실패 — PRODUCT_VIEWED, productId={}", event.productId(), e);
}
}
}
Loading