Skip to content

[volume-7] Kafka 이벤트 파이프라인 구축 #281

Open
dame2 wants to merge 7 commits intoLoopers-dev-lab:dame2from
dame2:volume-7
Open

[volume-7] Kafka 이벤트 파이프라인 구축 #281
dame2 wants to merge 7 commits intoLoopers-dev-lab:dame2from
dame2:volume-7

Conversation

@dame2
Copy link
Copy Markdown

@dame2 dame2 commented Mar 27, 2026

📌 Summary

  • 배경: 주문/좋아요 등 도메인 이벤트가 동기적으로 처리되어 시스템 간 결합도가 높고, 선착순 쿠폰 발급 시 동시성 이슈 발생
  • 목표: ApplicationEvent + Kafka를 활용한 이벤트 기반 아키텍처 구축 및 Transactional Outbox Pattern 적용
  • 결과: 시스템 간 느슨한 결합, 메시지 유실 방지, 선착순 쿠폰 동시성 제어 완료

🧭 Context & Decision

문제 정의

  • 현재 동작/제약: 좋아요/주문 등의 부가 로직(Redis 캐시 갱신, 메트릭 집계)이 동기적으로 처리됨
  • 문제(또는 리스크):
    • 부가 로직 실패 시 주 기능까지 롤백
    • 시스템 간 강한 결합
    • 메시지 유실 가능성
  • 성공 기준(완료 정의):
    • 부가 로직 실패와 무관하게 주 기능 성공
    • Kafka를 통한 시스템 간 비동기 통신
    • 정확히 한 번(exactly-once) 메시지 처리

선택지와 결정

1. 이벤트 분리 판단 기준

고려한 대안:

  • A: 모든 부가 로직을 이벤트로 분리 (일관성 우선)
  • B: 실패 허용도 기준으로 분리 여부 판단 (해당 로직이 실패해도 주요 트랜잭션은 성공해야 하는가?)
  • C: 응답 시간 기여도 기준으로 분리 (유저 응답에 필요 없는 처리만 분리)

최종 결정:

  • B: 실패 허용도 기준

트레이드오프:

  • 장점:
    • 판단 기준이 명확하고 일관성 있게 적용 가능 — "이게 실패하면 주문이 실패해야 하는가?"라는 질문 하나로 결정
    • 주요 트랜잭션의 안정성이 부가 로직 장애에 영향받지 않음
  • 단점:
    • 분리된 부가 로직(집계, 알림)의 실패를 별도로 모니터링하고 복구하는 체계가 필요
    • eventual consistency를 수용해야 하므로 like_count가 일시적으로 부정확할 수 있음

2. 이벤트 클래스 설계 방식

고려한 대안:

  • A: 도메인 Entity 객체를 이벤트에 직접 담기 (OrderCompletedEvent(Order order))
  • B: 필요한 필드만 원시 타입으로 추출해서 담기 (OrderCompletedEvent(Long orderId, Long userId, ...))
  • C: 범용 이벤트 클래스 + payload Map 사용 (DomainEvent(String type, Map<String, Object> payload))

최종 결정:

  • B: 필요한 필드만 원시 타입으로 추출

트레이드오프:

  • 장점:
    • 이벤트가 Entity의 변경에 영향받지 않음 (Entity에 필드가 추가돼도 이벤트는 그대로)
    • 리스너가 추가 DB 조회 없이 처리 가능 — 필요한 정보가 이벤트에 다 들어있음
    • 이벤트 객체가 불변(immutable)이므로 멀티스레드 환경에서 안전
  • 단점:
    • Entity와 이벤트 사이에 필드 매핑 코드가 필요 (코드량 증가)
    • 이벤트에 담을 필드를 매번 판단해야 함 — 너무 적으면 리스너가 추가 조회 필요, 너무 많으면 불필요한 데이터 전달

3. @TransactionalEventListener phase 선택

고려한 대안:

  • A: 모든 리스너에 AFTER_COMMIT 적용 (커밋 확정 후 처리)
  • B: 리스너별로 phase를 다르게 설정 (BEFORE_COMMIT, AFTER_COMMIT 혼용)
  • C: @EventListener + @Async 조합 (트랜잭션 phase 무관하게 비동기 처리)

최종 결정:

  • B: 리스너별로 phase를 다르게 설정
    • 좋아요 집계, 알림, 로깅 → AFTER_COMMIT
    • Outbox INSERT (Step 2) → BEFORE_COMMIT

트레이드오프:

  • 장점:
    • 목적에 맞는 정확한 타이밍에 실행 — 커밋 전에 실행해야 하는 것(Outbox)과 커밋 후에 실행해야 하는 것(알림)을 구분
    • AFTER_COMMIT 리스너 실패가 원래 트랜잭션을 롤백시키지 않음
  • 단점:
    • phase별 동작 차이를 이해해야 함 — AFTER_COMMIT 시점에는 기존 트랜잭션이 끝났으므로 DB 쓰기가 필요하면 REQUIRES_NEW 필요
    • BEFORE_COMMIT 리스너가 예외를 던지면 원래 트랜잭션도 롤백됨 — Outbox INSERT 실패가 비즈니스 로직까지 롤백시킬 수 있음

4. 좋아요 집계 리스너의 트랜잭션 전파 방식

고려한 대안:

  • A: REQUIRES_NEW — 별도 트랜잭션으로 실행
  • B: 기존 트랜잭션에 참여 (별도 설정 없음)
  • C: 트랜잭션 없이 실행 (NOT_SUPPORTED)

최종 결정:

  • A: @Transactional(propagation = REQUIRES_NEW)

트레이드오프:

  • 장점:
    • 집계 실패가 좋아요 INSERT에 영향을 주지 않음 — 좋아요는 이미 커밋됐고 집계는 독립 트랜잭션
    • AFTER_COMMIT 시점에서 실행되므로 기존 트랜잭션이 이미 끝난 상태 — REQUIRES_NEW가 아니면 트랜잭션 컨텍스트 자체가 없음
  • 단점:
    • DB 커넥션을 추가로 하나 더 사용
    • 좋아요와 집계 사이에 일시적 불일치 발생 가능 (좋아요는 됐는데 like_count 갱신은 아직 안 된 순간)

5. 유저 행동 로깅의 비동기 처리 방식

고려한 대안:

  • A: @Async + @TransactionalEventListener(AFTER_COMMIT) 조합
  • B: @TransactionalEventListener(AFTER_COMMIT)만 사용 (동기)
  • C: 별도 스레드풀 + BlockingQueue로 직접 구현

최종 결정:

  • A: @Async + @TransactionalEventListener(AFTER_COMMIT)

트레이드오프:

  • 장점:
    • 로깅이 메인 스레드를 블로킹하지 않음 — 유저 응답 시간에 영향 없음
    • Spring 기본 인프라만으로 구현 가능 (@EnableAsync + 스레드풀 설정)
  • 단점:
    • @Async 리스너에서 예외가 발생하면 호출자에게 전파되지 않음 — 별도 에러 핸들링 필요
    • 스레드풀이 가득 차면 이벤트가 거부될 수 있음 (queueCapacity 초과 시)
    • 애플리케이션이 갑자기 종료되면 큐에 쌓인 로깅 이벤트가 유실될 수 있음

6. Outbox 발행 메커니즘 선택

고려한 대안:

  • A: Polling Publisher — @Scheduled로 주기적 DB 조회 후 Kafka 발행
  • B: CDC (Change Data Capture) — Debezium으로 binlog 캡처 후 자동 발행
  • C: Kafka Transaction — KafkaTemplateexecuteInTransaction()으로 DB + Kafka 묶기

최종 결정:

  • A: Polling Publisher

트레이드오프:

  • 장점:
    • 별도 인프라(Debezium, Kafka Connect) 없이 Spring 코드만으로 구현 가능
    • 디버깅이 직관적 — outbox_event 테이블의 published 컬럼으로 현재 상태 확인 가능
    • 장애 시 outbox 테이블에 데이터가 남아있으므로 수동 복구 가능
  • 단점:
    • polling 주기만큼 발행 지연 발생 (1초 간격이면 최대 1초 지연)
    • 주기적 SELECT 쿼리가 DB에 부하를 줌 — row가 많아지면 인덱스 필요
    • polling 주기를 짧게 하면 DB 부하 증가, 길게 하면 지연 증가하는 트레이드오프

7. Outbox 기록 시점 (리스너 phase)

고려한 대안:

  • A: BEFORE_COMMIT — 비즈니스 트랜잭션과 같은 트랜잭션에서 outbox INSERT
  • B: AFTER_COMMIT + REQUIRES_NEW — 비즈니스 커밋 확인 후 별도 트랜잭션으로 outbox INSERT
  • C: 서비스 레이어에서 직접 outbox INSERT (이벤트 리스너 미사용)

최종 결정:

  • A: @TransactionalEventListener(phase = BEFORE_COMMIT)

트레이드오프:

  • 장점:
    • 비즈니스 데이터와 outbox 기록이 원자적 — 하나가 실패하면 둘 다 롤백
    • Transactional Outbox 패턴의 핵심 보장(At Least Once)이 DB 트랜잭션 레벨에서 달성됨
  • 단점:
    • Outbox INSERT 실패가 비즈니스 트랜잭션 전체를 롤백시킴 — outbox 테이블 장애가 주문 실패로 이어질 수 있음
    • BEFORE_COMMITAFTER_COMMIT 리스너가 같은 이벤트에 혼재하면 실행 순서 파악이 복잡해짐

8. Consumer 멱등 처리 전략

고려한 대안:

  • A: event_handled 테이블 PK 기반 — eventId로 중복 체크
  • B: Redis SET 기반 — SISMEMBER로 빠른 중복 체크
  • C: DB Upsert만으로 멱등 보장 — INSERT ... ON DUPLICATE KEY UPDATE로 결과적 멱등

최종 결정:

  • A: event_handled 테이블 PK 기반

트레이드오프:

  • 장점:
    • 별도 인프라(Redis) 없이 DB만으로 구현 가능
    • 멱등 체크와 비즈니스 처리를 같은 DB 트랜잭션으로 묶을 수 있음 — 원자성 보장
    • 어떤 이벤트가 처리됐는지 DB에서 직접 확인 가능 (운영/디버깅 용이)
  • 단점:
    • 이벤트 수가 많아지면 event_handled 테이블이 커짐 — 주기적 정리(TTL) 필요
    • Redis 대비 조회 속도가 느림 (DB I/O)
    • PK 충돌 시 예외 처리가 필요 (INSERT 시 DuplicateKeyException)

9. event_handled와 이벤트 로그 테이블 분리 여부

고려한 대안:

  • A: 분리 — event_handled(멱등 체크 전용) + event_log(감사/디버깅용) 별도 테이블
  • B: 통합 — 하나의 테이블에서 멱등 체크 + 로그 용도 겸용
  • C: event_handled만 사용, 로그는 별도 로깅 시스템(ELK 등)으로 위임

최종 결정:

  • A: 테이블 분리

트레이드오프:

  • 장점:
    • 목적과 생명주기가 다름 — event_handled는 멱등 체크 전용이라 PK 조회만 하면 되고 일정 기간 후 삭제 가능. 로그 테이블은 장기 보관하며 더 많은 컬럼(payload, 처리 결과 등)을 가짐
    • 멱등 체크 쿼리가 가벼움 — PK 하나로 존재 여부만 확인, 불필요한 컬럼이 없음
    • 보관 정책을 독립적으로 운영 가능 — event_handled는 7일, 로그는 90일 등
  • 단점:
    • 테이블이 2개로 늘어남 — 관리 포인트 증가
    • 두 테이블 모두에 INSERT해야 하므로 쓰기 비용 증가

10. Kafka Producer 직렬화 방식

고려한 대안:

  • A: JsonSerializer — 객체를 JSON으로 직렬화
  • B: StringSerializer + 수동 JSON 변환 — ObjectMapper로 직접 직렬화 후 String 전송
  • C: Avro + Schema Registry — 스키마 기반 직렬화

최종 결정:

  • A: Spring Kafka JsonSerializer

트레이드오프:

  • 장점:
    • Spring Kafka가 직렬화/역직렬화를 자동 처리 — 보일러플레이트 코드 최소화
    • JSON이므로 Kafka 콘솔에서 메시지 내용을 사람이 읽을 수 있음 (디버깅 용이)
  • 단점:
    • 스키마 강제가 없으므로 Producer/Consumer 간 메시지 포맷 불일치 시 런타임 에러 발생
    • Avro 대비 메시지 크기가 큼 (필드명이 포함되므로)
    • Consumer 쪽에서 trusted.packages 설정이 필요 — 보안상 "*" 대신 구체적 패키지 지정이 권장됨

11. product_metrics Upsert 방식

고려한 대안:

  • A: INSERT ... ON DUPLICATE KEY UPDATE — SQL 레벨에서 원자적 upsert
  • B: findById → 존재하면 update, 없으면 insert (JPA 방식)
  • C: JPA merge() 사용

최종 결정:

  • A: INSERT ... ON DUPLICATE KEY UPDATE (Native Query)

트레이드오프:

  • 장점:
    • 하나의 쿼리로 존재 여부 확인 + 삽입/갱신이 원자적으로 처리됨
    • SELECT → INSERT/UPDATE 패턴 대비 race condition 위험 없음
    • 쿼리 수가 절반 (1회 vs 2회)
  • 단점:
    • MySQL 종속 SQL — DB 변경 시 쿼리 수정 필요
    • JPA 영속성 컨텍스트와 동기화되지 않음 — Native Query 실행 후 같은 트랜잭션에서 JPA로 조회하면 캐시된 이전 값이 반환될 수 있음
    • 복잡한 갱신 로직(조건부 업데이트 등)에는 쿼리가 길어짐

🏗️ Design Overview

변경 범위

  • 영향 받는 모듈/도메인: commerce-api (Producer), commerce-collector (Consumer)
  • 신규 추가:
    • supports/kafka-events/: 공유 이벤트 DTO
    • apps/commerce-collector/: Kafka Consumer 서버
    • Outbox 도메인 및 JPA
    • FCFS 쿠폰 도메인
  • 제거/대체: 없음

주요 컴포넌트 책임

  • OutboxEventListener: @TransactionalEventListener(BEFORE_COMMIT)로 Outbox 테이블에 이벤트 저장
  • OutboxPublisher: @scheduled로 PENDING 이벤트 폴링 후 Kafka 발행
  • MetricsEventConsumer: catalog-events, order-events 구독하여 product_metrics 집계
  • CouponIssueConsumer: coupon-issue-requests 구독, SELECT...FOR UPDATE로 동시성 제어
  • LikeCountEventListener: @async로 Redis 카운터 비동기 갱신

🔁 Flow Diagram

1. 좋아요 이벤트 흐름

sequenceDiagram
    autonumber
    participant Client
    participant LikeApplicationService
    participant LikeRepository
    participant EventPublisher
    participant LikeCountEventListener
    participant OutboxEventListener
    participant Redis
    participant OutboxDB

    Client->>LikeApplicationService: like(userId, productId)
    LikeApplicationService->>LikeRepository: save(like)
    LikeApplicationService->>EventPublisher: publishEvent(LikeCreatedEvent)
    
    par Async - Redis 캐시
        EventPublisher->>LikeCountEventListener: @Async AFTER_COMMIT
        LikeCountEventListener->>Redis: INCR like:count:{productId}
    and Sync - Outbox 저장
        EventPublisher->>OutboxEventListener: @TransactionalEventListener BEFORE_COMMIT
        OutboxEventListener->>OutboxDB: INSERT outbox_event
    end
    
    LikeApplicationService-->>Client: 200 OK
Loading

2. Outbox → Kafka 발행 흐름

sequenceDiagram
    autonumber
    participant OutboxPublisher
    participant OutboxDB
    participant KafkaTemplate
    participant Kafka
    
    loop Every 1 second
        OutboxPublisher->>OutboxDB: SELECT WHERE status=PENDING LIMIT 100
        OutboxDB-->>OutboxPublisher: List<OutboxEvent>
        
        loop For each event
            OutboxPublisher->>KafkaTemplate: send(topic, aggregateId, envelope)
            KafkaTemplate->>Kafka: produce
            
            alt Success
                Kafka-->>KafkaTemplate: ack
                OutboxPublisher->>OutboxDB: UPDATE status=SENT
            else Failure
                OutboxPublisher->>OutboxDB: UPDATE retry_count++
            end
        end
    end
Loading

3. 선착순 쿠폰 발급 흐름

sequenceDiagram
    autonumber
    participant Client
    participant FcfsCouponV1Api
    participant CouponIssueApplicationService
    participant CouponIssueRequestRepository
    participant EventPublisher
    participant Kafka
    participant CouponIssueConsumer
    participant FcfsCouponRepository
    participant FcfsIssuedCouponRepository

    Client->>FcfsCouponV1Api: POST /fcfs-coupons/{id}/issue
    FcfsCouponV1Api->>CouponIssueApplicationService: requestIssue(couponId, userId)
    CouponIssueApplicationService->>CouponIssueRequestRepository: save(PENDING)
    CouponIssueApplicationService->>EventPublisher: publishEvent(CouponIssueRequestCreatedEvent)
    FcfsCouponV1Api-->>Client: 202 Accepted {requestId}
    
    Note over Kafka: OutboxPublisher 발행
    
    Kafka->>CouponIssueConsumer: consume(CouponIssueRequestedEvent)
    CouponIssueConsumer->>FcfsCouponRepository: findByIdWithLock(couponId)
    Note right of CouponIssueConsumer: SELECT ... FOR UPDATE
    
    alt 수량 남음 & 미발급
        CouponIssueConsumer->>FcfsCouponRepository: incrementIssuedCount
        CouponIssueConsumer->>FcfsIssuedCouponRepository: save(issuedCoupon)
        CouponIssueConsumer->>CouponIssueRequestRepository: updateStatus(ISSUED)
    else 수량 소진 or 중복
        CouponIssueConsumer->>CouponIssueRequestRepository: updateStatus(FAILED, reason)
    end
    
    CouponIssueConsumer->>Kafka: ack.acknowledge()
    
    Client->>FcfsCouponV1Api: GET /fcfs-coupons/issue-result/{requestId}
    FcfsCouponV1Api-->>Client: {status: ISSUED/FAILED}
Loading

4. 메트릭 집계 Consumer 흐름

sequenceDiagram
    autonumber
    participant Kafka
    participant MetricsEventConsumer
    participant EventHandledRepository
    participant ProductMetricsRepository
    
    Kafka->>MetricsEventConsumer: consume(EventEnvelope)
    MetricsEventConsumer->>EventHandledRepository: existsByEventId(eventId)
    
    alt 이미 처리됨
        MetricsEventConsumer-->>Kafka: ack (skip)
    else 미처리
        MetricsEventConsumer->>ProductMetricsRepository: getOrCreate(productId)
        
        alt LIKE_CREATED
            MetricsEventConsumer->>ProductMetricsRepository: incrementLikeCount
        else PRODUCT_VIEWED
            MetricsEventConsumer->>ProductMetricsRepository: incrementViewCount
        else ORDER_COMPLETED
            MetricsEventConsumer->>ProductMetricsRepository: addOrder(qty, amount)
        end
        
        MetricsEventConsumer->>EventHandledRepository: save(eventId, type)
        MetricsEventConsumer-->>Kafka: ack.acknowledge()
    end
Loading

변경 목적

동기식 도메인 이벤트 부작용(Redis 캐시 업데이트, 메트릭 집계)을 분리하고 Kafka를 통한 느슨한 결합을 달성하며, Transactional Outbox 패턴으로 메시지 손실을 방지하고 FCFS 쿠폰 발급 시 동시성을 제어합니다.

핵심 변경점

  • Shared Module: supports/kafka-eventsEventEnvelope, EventType(5가지), AggregateType(4가지), KafkaTopics(3개) 정의
  • Outbox Pattern: OutboxEvent domain + JPA entity(인덱싱), OutboxEventListener(BEFORE_COMMIT), OutboxPublisher(1초 주기 폴링)로 pending 이벤트 발행
  • ApplicationEvent 기반: 5가지 이벤트 발행 및 @TransactionalEventListener 구현, UserActionEvent 비동기 로깅
  • FCFS 쿠폰 플로우: POST /api/v1/fcfs-coupons/{id}/issueCouponIssueRequest(PENDING) 생성 → Outbox 이벤트 발행 → commerce-collector에서 SELECT...FOR UPDATE + 중복 검증으로 동시성 제어
  • commerce-collector: 신규 모듈로 CouponIssueConsumer, MetricsEventConsumer 구현, 이벤트 순서 검증(warn만 로깅, 처리 계속) 및 멱등성 강화

리스크/주의사항

Outbox 폴링(1초 간격)으로 인한 최대 1초 지연 발생 가능합니다. 메트릭 컨슈머의 validateEventOrder()는 비정렬 이벤트를 경고만 하고 처리를 계속하므로, 시스템 시간 스큐 상황에서 메트릭 수집 정확성이 영향받을 수 있으며 이를 수락 가능한지 확인 필요합니다. SELECT...FOR UPDATE 기반 잠금이 고부하 상황에서 데이터베이스 경합을 유발할 수 있습니다.

테스트/검증

150개 동시 요청 테스트(100/150 성공, 50 실패 검증), Outbox integration 테스트, 메트릭 멱등성 검증, Kafka 발행 실패 시 재시도 증가 검증이 포함되어 있으며, Kafka 설정에 acks=all, enable.idempotence=true, max.in.flight.requests.per.connection=5 적용으로 배달 보증을 강화했습니다.

dame2 and others added 7 commits March 27, 2026 13:47
- EventEnvelope: Kafka 메시지 전송을 위한 이벤트 봉투
- EventType: 이벤트 타입 enum (LIKE_CREATED, ORDER_COMPLETED 등)
- AggregateType: 집계 타입 enum (PRODUCT, ORDER, COUPON)
- KafkaTopics: Kafka 토픽 상수 정의
- CouponIssueRequestedEvent: 쿠폰 발급 요청 이벤트 DTO

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- OutboxEvent: Outbox 도메인 모델 (PENDING, SENT, FAILED 상태)
- OutboxEventJpaEntity: JPA 엔티티 (status, created_at 인덱스 포함)
- OutboxPublisher: 스케줄러 기반 이벤트 발행 (fixedDelay=1000ms)
  - @ConditionalOnBean(KafkaTemplate.class)로 Kafka 미설정 시 비활성화
  - aggregateId를 파티션 키로 사용하여 순서 보장
  - 실패 시 retry_count 증가

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- LikeCreatedEvent, LikeCanceledEvent: 좋아요 이벤트
- OrderCompletedEvent: 주문 완료 이벤트
- ProductViewedEvent: 상품 조회 이벤트
- UserActionEvent: 유저 행동 로깅 이벤트
- CouponIssueRequestCreatedEvent: 쿠폰 발급 요청 이벤트

이벤트 리스너:
- LikeCountEventListener: @async로 Redis 카운터 비동기 갱신
- OrderNotificationListener: 주문 알림 처리
- OutboxEventListener: @TransactionalEventListener(BEFORE_COMMIT)로 Outbox 저장
- UserActionLogListener: @async + REQUIRES_NEW로 유저 행동 로깅

인프라:
- AsyncConfig: 이벤트 처리용 ThreadPoolTaskExecutor 설정
- KafkaTopicConfig: Kafka 토픽 자동 생성 설정
- UserActionLog: 유저 행동 로그 도메인 및 JPA 구현

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
도메인:
- FcfsCoupon: 선착순 쿠폰 (totalQuantity, issuedCount)
- CouponIssueRequest: 발급 요청 (PENDING → ISSUED/FAILED)
- FcfsIssuedCoupon: 발급된 쿠폰 (couponId + userId unique)

API:
- POST /api/v1/fcfs-coupons/{couponId}/issue: 발급 요청 (202 Accepted)
- GET /api/v1/fcfs-coupons/issue-result/{requestId}: 결과 조회 (polling)

Application Service:
- CouponIssueApplicationService: 요청 저장 후 Outbox 이벤트 발행

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
모듈 구조:
- CollectorApplication: Kafka Consumer 전용 서버
- MetricsEventConsumer: 메트릭 집계 Consumer
- CouponIssueConsumer: 선착순 쿠폰 발급 Consumer

MetricsEventConsumer:
- catalog-events, order-events 토픽 구독
- event_handled 테이블로 멱등성 보장
- product_metrics upsert (like, view, order 집계)
- lastEventTimestamp 추적으로 이벤트 순서 검증
- manual Ack 사용

CouponIssueConsumer:
- coupon-issue-requests 토픽 구독
- concurrency=1로 순서 보장
- SELECT...FOR UPDATE 비관적 락
- 중복 발급 방지 (couponId + userId unique)

동시성 테스트:
- 150개 동시 요청 → 100개만 발급 검증
- 중복 발급 방지 검증

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
이벤트 리스너 테스트:
- EventListenerIntegrationTest: Redis 카운터 비동기 갱신 검증
- OutboxEventListenerTest: Outbox 이벤트 저장 검증

Outbox 테스트:
- OutboxPublisherTest: 이벤트 발행 로직 검증
- OutboxIntegrationTest: E2E 통합 테스트

FCFS 쿠폰 테스트:
- CouponIssueApplicationServiceTest: 발급 요청 및 결과 조회 검증

테스트 인프라:
- TestKafkaAutoConfiguration: Mock KafkaTemplate 제공
- Fake Repository 구현체들

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Kafka 설정:
- acks=all: 모든 replica 확인 후 응답 (데이터 내구성)
- enable.idempotence=true: 중복 메시지 방지 (정확히 한 번 전송)
- max.in.flight.requests.per.connection=5

settings.gradle.kts:
- commerce-collector 모듈 추가
- kafka-events 모듈 추가

기존 서비스 수정:
- LikeApplicationService: LikeCreatedEvent/LikeCanceledEvent 발행
- OrderApplicationService: OrderCompletedEvent 발행
- ProductService: ProductViewedEvent 발행

테스트 수정:
- 이벤트 발행 검증 추가
- TestRedisConfiguration import

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Mar 27, 2026

📝 Walkthrough

Walkthrough

이벤트 기반 아키텍처와 트랜잭션 아웃박스 패턴을 도입하여 commerce-api의 좋아요, 주문, 상품 조회 등의 도메인 이벤트를 Kafka를 통해 비동기로 발행한다. FCFS 쿠폰 발급 시스템을 추가하고, 사용자 액션 로깅 및 제품 메트릭 수집을 위한 commerce-collector 애플리케이션을 신규로 구축한다.

Changes

Cohort / File(s) Summary
Kafka 공통 설정
supports/kafka-events/*, modules/kafka/src/main/resources/kafka.yml
EventEnvelope, EventType, AggregateType, KafkaTopics 등 Kafka 메시지 구조 정의 및 producer 설정 강화(acks=all, idempotence 활성화)
commerce-api 부트스트랩 및 설정
apps/commerce-api/build.gradle.kts, apps/commerce-api/src/main/java/com/loopers/CommerceApiApplication.java, apps/commerce-api/src/main/java/com/loopers/config/AsyncConfig.java, apps/commerce-api/src/main/java/com/loopers/config/KafkaTopicConfig.java, apps/commerce-api/src/main/resources/application.yml
Kafka 모듈 의존성 추가, @EnableScheduling 활성화, 비동기 실행기(eventExecutor) 및 Kafka 주제 bean 정의
도메인 이벤트 클래스
apps/commerce-api/src/main/java/com/loopers/application/event/*.java
CouponIssueRequestCreatedEvent, LikeCreatedEvent, LikeCanceledEvent, OrderCompletedEvent, ProductViewedEvent, UserActionEvent 신규 생성
트랜잭션 아웃박스 구현
apps/commerce-api/src/main/java/com/loopers/domain/outbox/*, apps/commerce-api/src/main/java/com/loopers/infrastructure/persistence/jpa/outbox/*, apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxPublisher.java
OutboxEvent 도메인 모델, 저장소, JPA 엔티티/매퍼, 주기적 폴링 발행자 구현
이벤트 리스너
apps/commerce-api/src/main/java/com/loopers/application/event/listener/*.java
OutboxEventListener(트랜잭션 커밋 전 아웃박스 저장), LikeCountEventListener(Redis 카운터 갱신), OrderNotificationListener, UserActionLogListener 추가
FCFS 쿠폰 도메인
apps/commerce-api/src/main/java/com/loopers/domain/fcfs/*
CouponIssueRequest, CouponIssueRequestStatus, FcfsCoupon, FcfsIssuedCoupon 및 해당 저장소 인터페이스 정의
FCFS 쿠폰 비즈니스 로직
apps/commerce-api/src/main/java/com/loopers/application/fcfs/*
CouponIssueApplicationService(요청 발행 및 결과 조회), 응답 DTO(CouponIssueRequestResult, CouponIssueResultResponse) 추가
FCFS 쿠폰 영속성
apps/commerce-api/src/main/java/com/loopers/infrastructure/persistence/jpa/fcfs/*
JPA 엔티티(CouponIssueRequestJpaEntity, FcfsCouponJpaEntity, FcfsIssuedCouponJpaEntity), 저장소 구현체, 매퍼 클래스
FCFS 쿠폰 API
apps/commerce-api/src/main/java/com/loopers/interfaces/api/fcfs/*
FcfsCouponV1Api 컨트롤러 및 요청/응답 DTO 정의
사용자 액션 로깅 도메인
apps/commerce-api/src/main/java/com/loopers/domain/useraction/*
ActionType enum, UserActionLog 도메인 모델 및 저장소 인터페이스
사용자 액션 로깅 영속성
apps/commerce-api/src/main/java/com/loopers/infrastructure/persistence/jpa/useraction/*
UserActionLogJpaEntity, 저장소 구현체, 매퍼 클래스
commerce-api 서비스 수정
apps/commerce-api/src/main/java/com/loopers/application/like/LikeApplicationService.java, apps/commerce-api/src/main/java/com/loopers/application/order/OrderApplicationService.java, apps/commerce-api/src/main/java/com/loopers/application/product/ProductService.java
좋아요/취소 시 이벤트 발행, 주문 시 이벤트 발행, 제품 조회 시 조건부 이벤트 발행(userId 존재 시)
commerce-api 테스트
apps/commerce-api/src/test/java/com/loopers/application/event/*, apps/commerce-api/src/test/java/com/loopers/application/*/...*Test.java, apps/commerce-api/src/test/java/com/loopers/infrastructure/outbox/*, apps/commerce-api/src/test/java/com/loopers/config/TestKafkaAutoConfiguration.java
이벤트 리스너 통합 테스트, 서비스 단위 테스트 업데이트, 아웃박스 통합 테스트, Kafka 목 설정
commerce-api 테스트 유틸
apps/commerce-api/src/test/java/com/loopers/fake/*
FakeApplicationEventPublisher, FakeCouponIssueRequestRepository, FakeFcfsCouponRepository, FakeOutboxEventRepository, FakeUserActionLogRepository
commerce-collector 신규 애플리케이션
apps/commerce-collector/build.gradle.kts, apps/commerce-collector/src/main/java/com/loopers/CollectorApplication.java, apps/commerce-collector/src/main/resources/application.yml
새로운 Spring Boot 애플리케이션 진입점 및 빌드 설정
commerce-collector 이벤트 소비자
apps/commerce-collector/src/main/java/com/loopers/application/consumer/*.java
CouponIssueConsumer(쿠폰 발급 처리), MetricsEventConsumer(제품 메트릭 집계)
commerce-collector 메트릭 도메인
apps/commerce-collector/src/main/java/com/loopers/domain/metrics/*
ProductMetrics 도메인 모델 및 저장소 인터페이스
commerce-collector 메트릭 영속성
apps/commerce-collector/src/main/java/com/loopers/infrastructure/persistence/jpa/metrics/*
ProductMetricsJpaEntity, 저장소 구현체, 매퍼
commerce-collector 이벤트 핸들링 추적
apps/commerce-collector/src/main/java/com/loopers/domain/eventhandled/*, apps/commerce-collector/src/main/java/com/loopers/infrastructure/persistence/jpa/eventhandled/*
EventHandled 도메인 모델, JPA 엔티티/저장소/매퍼로 중복 처리 방지
commerce-collector 영속성
apps/commerce-collector/src/main/java/com/loopers/infrastructure/persistence/jpa/fcfs/*
FCFS 쿠폰 JPA 엔티티 및 저장소(commerce-api와 중복)
commerce-collector 테스트
apps/commerce-collector/src/test/java/com/loopers/application/consumer/*, apps/commerce-collector/src/test/java/com/loopers/concurrency/FcfsCouponIssueConcurrencyTest.java, apps/commerce-collector/src/test/resources/application-test.yml
컨슈머 단위 테스트, 동시성 테스트, 테스트 설정
commerce-collector 테스트 유틸
apps/commerce-collector/src/test/java/com/loopers/fake/*
FakeEventHandledRepository, FakeProductMetricsRepository
루트 설정
build.gradle.kts, settings.gradle.kts
awaitility 의존성 추가, commerce-collector 및 kafka-events 모듈 포함

Sequence Diagram

sequenceDiagram
    participant User
    participant CommerceAPI
    participant AppEventPublisher
    participant OutboxListener
    participant Database
    participant OutboxPublisher
    participant Kafka
    participant MetricsCollector

    User->>CommerceAPI: 좋아요 생성 요청
    CommerceAPI->>Database: Like 저장
    CommerceAPI->>AppEventPublisher: LikeCreatedEvent 발행
    AppEventPublisher->>OutboxListener: transactional event 수신
    OutboxListener->>Database: OutboxEvent INSERT (BEFORE_COMMIT)
    Database-->>OutboxListener: 저장 완료
    Database-->>CommerceAPI: 트랜잭션 커밋
    CommerceAPI-->>User: 응답

    OutboxPublisher->>Database: PENDING 이벤트 조회 (1초 간격)
    Database-->>OutboxPublisher: OutboxEvent 반환
    OutboxPublisher->>OutboxPublisher: EventEnvelope 생성
    OutboxPublisher->>Kafka: ProducerRecord 전송
    Kafka-->>OutboxPublisher: 전송 성공
    OutboxPublisher->>Database: OutboxEvent 상태를 SENT로 갱신

    Kafka->>MetricsCollector: 메시지 수신
    MetricsCollector->>Database: idempotency 확인 (EventHandled)
    Database-->>MetricsCollector: 미존재
    MetricsCollector->>Database: ProductMetrics 갱신 (likeCount++)
    MetricsCollector->>Database: EventHandled 저장
    Database-->>MetricsCollector: 저장 완료
    MetricsCollector->>Kafka: acknowledge
Loading

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120분

복합적인 이유로 매우 높은 검토 난도를 요구한다:

  • 광범위한 범위: commerce-api 및 commerce-collector 전체에 걸친 아키텍처 변경, 50+ 신규 파일
  • 높은 로직 밀도: 이벤트 기반 아키텍처, 트랜잭션 아웃박스 패턴, 비동기 처리, Kafka 통합, FCFS 동시성 제어 등 복잡한 개념들의 상호작용
  • 이질적 변경: 도메인 모델, 영속성 계층, 애플리케이션 계층, API 계층, 이벤트 리스너, 동시성 처리 등 여러 영역의 서로 다른 관심사
  • 운영 위험: 아웃박스 발행자의 스케줄 동작(fixedDelay), Kafka 전송 실패 시 재시도 로직, 중복 처리 방지(idempotency), FCFS 쿠폰의 동시성 안전성 등에 대한 세밀한 검토 필요
  • 테스트 충분성: 많은 통합 테스트가 추가되었지만, 아웃박스 발행자 오류 상황, Kafka 부분 장애, 네트워크 분할 시나리오 등 엣지 케이스에 대한 검토 필요

주요 관찰사항 및 운영 관점 지적

1. 아웃박스 발행자의 오류 처리 전략 부재

문제점: OutboxPublisher.publishPendingEvents()에서 Kafka 전송 실패 시 retryCount만 증가하고, 재시도 상한값(max retry)이나 DLQ(Dead Letter Queue) 처리가 없다. fixedDelay=1000으로 1초 주기 폴링하면, 계속 실패하는 이벤트가 무한 재시도될 수 있다.

운영 영향:

  • 특정 이벤트가 Kafka로 전송되지 못하면, 메트릭 수집 지연 및 불일치 발생
  • 데이터베이스에 PENDING 상태의 이벤트가 누적되어 성능 저하
  • 알람 없이 조용히 실패할 수 있음

수정안:

  • retryCount 초과 시(예: 5회) 상태를 FAILED로 변경하고 로깅
  • DLQ 주제로 최종 실패 이벤트 발행
  • OutboxEventRepositorycountByStatus(FAILED) 쿼리 추가 및 모니터링

추가 테스트:

  • retryCount 초과 후 자동 DLQ 발행 여부 확인
  • 연속 실패 시 PENDING 이벤트 누적량 모니터링

2. FCFS 쿠폰 동시성 제어: 낙관적 락 부재

문제점: CouponIssueConsumer에서 쿠폰 발급 시 findByIdWithLock(Long id) (비관적 락)으로 쿠폰을 읽지만, 이미 발급된 쿠폰 확인(existsByCouponIdAndUserId)과 발급 카운트 증가(incrementIssuedCount)가 별도 쿼리로 실행된다. 다음 레이스 컨디션이 가능하다:

  • 스레드 A: 쿠폰 락 획득, canIssue() = true 확인
  • 스레드 B: 동일 쿠폰 락 획득(A 해제 후), issuedCount 증가
  • 스레드 A: 이미 issuedCount 감소했지만, A도 증가 실행 → 초과 발급

운영 영향:

  • FCFS 쿠폰 초과 발급으로 재고 불일치
  • 고객 분쟁(쿠폰 중복 발급, 미발급 불일치)

수정안:

  • incrementIssuedCount 전에 최종 canIssue() 재확인 또는 원자적 UPDATE 쿼리 사용
  • 쿠폰 비관적 락 범위 확대: 락 획득 후 모든 갱신 작업 완료 후 해제

추가 테스트:

  • FcfsCouponIssueConcurrencyTest 확대: 150 동시 요청 + 재고 감소 추적
  • 이미 발급된 쿠폰에 대한 중복 발급 시도 차단 확인

3. 이벤트 핸들링 중복 처리: eventId 기반만 사용

문제점: MetricsEventConsumerCouponIssueConsumer에서 EventHandledRepository.existsByEventId(eventId) 기반으로 중복 처리를 방지한다. 그러나 EventEnvelope.eventId는 producer(commerce-api의 OutboxEventListener)에서 생성되는데, 만약 Kafka 재전송으로 동일 eventId가 여러 번 도착하면 첫 번째만 처리되고 나머지는 무시된다. 장기적으로 데이터 정합성이 문제될 수 있다.

운영 영향:

  • Kafka 메시지 재전송 시 일부 이벤트만 처리되어 메트릭/상태 불일치
  • 모니터링 시 누락된 이벤트 감지 어려움

수정안:

  • EventHandled 저장 시 transaction 격리 수준 확인(READ_COMMITTED 이상)
  • existsByEventId 확인 후 저장 전 재확인(Double-Check Locking) 또는 unique constraint 활용
  • 중복 이벤트 감지 시 로그 레벨을 WARN으로 올림

추가 테스트:

  • 동일 eventId의 중복 메시지 도착 시뮬레이션
  • EventHandled unique constraint 위반 시 예외 처리 확인

4. 비동기 이벤트 리스너 예외 처리 미흡

문제점: UserActionLogListener에서 @Async("eventExecutor")로 비동기 실행되며, try-catch에서 warn 로깅 후 예외를 삼킨다. 그러나 eventExecutor가 사용 불가(스레드 풀 고갈, 네트워크 오류 등)일 경우 리스너 태스크 자체가 실행되지 않는다.

운영 영향:

  • 사용자 액션 로그 손실 가능
  • 장시간 로그 누락이 감지되지 않을 수 있음

수정안:

  • 비동기 예외를 위해 AsyncUncaughtExceptionHandler 구현 추가
  • eventExecutor 태스크 거부 시 폴백 메커니즘(예: 동기 처리 또는 큐 적재)
  • 스케줄 모니터링: 주기적으로 executor 상태(활성 스레드, 큐 크기) 로깅

추가 테스트:

  • eventExecutor 스레드 풀 고갈 시나리오: 큐 오버플로우 처리 확인
  • 비동기 콜백 예외 발생 시 AsyncUncaughtExceptionHandler 호출 확인

5. Kafka Topic 생성 자동화의 위험

문제점: KafkaTopicConfig에서 3 파티션, 1 레플리카로 하드코딩하여 자동 주제 생성한다. 프로덕션 환경에서는 권장되지 않으며, 운영 요구사항(파티션 수, 레플리카 수, retention, 압축)과 맞지 않을 수 있다.

운영 영향:

  • 프로덕션 배포 시 자동 생성된 주제로 인한 성능/확장성 문제
  • 높은 부하에서 파티션 수 부족으로 처리량 제한
  • 레플리카 1개로 인한 가용성 저하 가능성

수정안:

  • 프로덕션 프로필에서는 자동 생성 비활성화(spring.kafka.admin.auto-create: false)
  • 별도 Kafka 초기화 스크립트/IaC로 주제 수동 생성
  • 환경별 주제 설정 파일화(kafka-local.yml, kafka-prd.yml)

추가 테스트:

  • CI/CD 파이프라인에서 주제 존재 여부 확인 테스트
  • 프로덕션 프로필 활성화 시 자동 생성 비활성화 확인

Possibly related PRs

  • PR #275: 같은 이벤트 기반 아키텍처, 트랜잭션 아웃박스, Kafka 통합, 쿠폰 발급/컨슈머 로직을 구현하고 있어 변경사항이 상당히 겹친다.
  • PR #240: CommerceApiApplication 부트스트랩 변경(@EnableScheduling 추가)이 동일 파일을 수정한다.
  • PR #195: 좋아요 카운트 갱신 경로를 이벤트 기반으로 변경하는 부분에서, 기존 직접 업데이트 방식과의 전환점이 되는 변경사항이다.
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

@dame2 dame2 changed the title Volume 7 [volume-7] Kafka 이벤트 파이프라인 구축 Mar 27, 2026
Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 20

Note

Due to the large number of review comments, Critical, Major severity comments were prioritized as inline comments.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
apps/commerce-api/src/test/java/com/loopers/concurrency/LikeConcurrencyTest.java (1)

75-76: ⚠️ Potential issue | 🟡 Minor

latch.await() 타임아웃 미설정으로 테스트 행(hang) 위험이 있다.

CountDownLatch.await()에 타임아웃이 없으면 스레드 예외 발생 시 latch.countDown()이 호출되지 않아 테스트가 무한 대기할 수 있다. CI 환경에서 타임아웃 없는 대기는 파이프라인 중단의 원인이 된다.

동일한 패턴이 Line 115, 169에도 존재한다.

🔧 수정안
-        latch.await();
-        executor.shutdown();
+        boolean completed = latch.await(30, TimeUnit.SECONDS);
+        executor.shutdown();
+        executor.awaitTermination(5, TimeUnit.SECONDS);
+        assertThat(completed).isTrue();

import java.util.concurrent.TimeUnit; 추가 필요하다.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@apps/commerce-api/src/test/java/com/loopers/concurrency/LikeConcurrencyTest.java`
around lines 75 - 76, In LikeConcurrencyTest replace bare CountDownLatch.await()
calls with timed waits: call latch.await(TIMEOUT, TimeUnit.SECONDS) (add import
java.util.concurrent.TimeUnit) and assert that the boolean result is true (fail
the test if false) so tests don't hang; update each location where latch.await()
is used (the CountDownLatch variable named latch in LikeConcurrencyTest) and
keep executor.shutdown() as-is after the timed await.
apps/commerce-api/src/main/resources/application.yml (1)

20-28: ⚠️ Potential issue | 🟠 Major

kafka.yml의 dev, qa, prd 환경 프로필에 Kafka 브로커 및 보안 설정이 누락되어 있다.

kafka.ymlmodules/kafka/src/main/resources/에 존재하고 정상 import되고 있으나, dev, qa, prd 프로필 섹션이 공백 상태다. 현재 기본값은 ${BOOTSTRAP_SERVERS} 환경 변수에만 의존하고 있으며, 프로덕션 환경에서 이 변수가 누락되면 서비스 시작 실패로 이어진다.

다음 항목들을 보완해야 한다:

  1. dev, qa, prd 환경별 브로커 주소 명시: 환경 변수가 아닌 명시적 설정값 필요
  2. 보안 설정 추가: SASL/SSL/TLS 인증서 경로, 암호화 설정 등 운영 환경 보안 요구사항 반영
  3. 컨슈머 그룹 ID 분리: 현재 모든 환경에서 loopers-default-consumer로 고정되어 환경별 격리 필요
  4. 타임아웃 및 커넥션 설정 검증: 환경별로 다른 부하 특성에 대응하는 설정 필요

환경별 명시적 설정 없이 배포할 경우 프로덕션 메시지 유실, 장애 전파 위험이 있으므로, 각 환경별 kafka.yml 프로필 섹션을 보완하고 환경 변수 미설정 시나리오에 대한 테스트를 추가로 진행하기 바란다.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/commerce-api/src/main/resources/application.yml` around lines 20 - 28,
kafka.yml’s dev/qa/prd profile blocks are empty and must be populated: for each
profile (dev, qa, prd) add explicit bootstrap-servers (do not rely on
${BOOTSTRAP_SERVERS}), add security settings (SASL mechanism, jaas config or
username/password, ssl enabled, truststore/keystore file paths and passwords)
under the same profile, replace the hardcoded consumer.group.id
"loopers-default-consumer" with environment/profile-specific group IDs (e.g.,
loopers-{env}-consumer), and tune connection/timeouts per profile
(request.timeout.ms, session.timeout.ms, retries/backoff,
connections.max.idle.ms) to match expected load; also add validation/tests that
startup fails with a clear error or fallback when BOOTSTRAP_SERVERS is missing
so deployments won’t silently use empty config.
modules/kafka/src/main/resources/kafka.yml (1)

25-25: ⚠️ Potential issue | 🟡 Minor

Consumer 설정에 오타가 있다.

Line 25의 value-serializervalue-deserializer여야 한다. Consumer는 역직렬화(deserialize)를 수행하므로 이 설정이 올바르지 않으면 메시지 소비 시 예상치 못한 동작이 발생할 수 있다.

수정안
    consumer:
      group-id: loopers-default-consumer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
-      value-serializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
+      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@modules/kafka/src/main/resources/kafka.yml` at line 25, The consumer config
has a typo: replace the incorrect key `value-serializer` with
`value-deserializer` in the Kafka consumer configuration (the line containing
`value-serializer: org.apache.kafka.common.serialization.ByteArrayDeserializer`)
so the consumer uses the deserializer class for value deserialization; ensure
the config now reads `value-deserializer:
org.apache.kafka.common.serialization.ByteArrayDeserializer`.
🟡 Minor comments (10)
supports/kafka-events/src/main/java/com/loopers/event/EventEnvelope.java-16-23 (1)

16-23: ⚠️ Potential issue | 🟡 Minor

필수 필드의 null 방어를 record 생성 시점에 강제하는 편이 안정적이다

운영 관점에서 필수 필드 null이 허용되면 전송 시점이 아니라 소비 시점에 지연 실패가 발생해 장애 원인 추적 시간이 길어진다. 수정안은 record compact constructor에서 Objects.requireNonNull로 즉시 실패시키는 방식이다. 추가 테스트는 각 필수 필드별 null 입력 시 NullPointerException(또는 프로젝트 표준 예외)이 즉시 발생하는지 검증하면 된다.

제안 diff
+import java.util.Objects;
@@
 public record EventEnvelope(
@@
 ) {
+    public EventEnvelope {
+        Objects.requireNonNull(eventId, "eventId");
+        Objects.requireNonNull(eventType, "eventType");
+        Objects.requireNonNull(aggregateType, "aggregateType");
+        Objects.requireNonNull(aggregateId, "aggregateId");
+        Objects.requireNonNull(timestamp, "timestamp");
+        Objects.requireNonNull(payload, "payload");
+    }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@supports/kafka-events/src/main/java/com/loopers/event/EventEnvelope.java`
around lines 16 - 23, Add a compact constructor to the EventEnvelope record that
enforces non-null for required fields by calling Objects.requireNonNull for each
field (eventId, eventType, aggregateType, aggregateId, timestamp, payload) so
creation fails fast; update or add unit tests that attempt to construct
EventEnvelope with each of those fields set to null and assert a
NullPointerException is thrown.
apps/commerce-api/src/main/java/com/loopers/infrastructure/persistence/jpa/fcfs/CouponIssueRequestJpaRepository.java-26-28 (1)

26-28: ⚠️ Potential issue | 🟡 Minor

countByStatus 쿼리의 성능 최적화를 위해 status 단일 컬럼 인덱스 추가가 필요하다.

countByCouponIdAndStatus의 복합 인덱스 (coupon_id, status)는 정의되어 있으나, countByStatusstatus만을 조건으로 하므로 전체 테이블 스캔을 유발할 수 있다. 운영 환경에서 데이터가 증가하면 성능 병목이 될 수 있으므로, status 컬럼에 대한 별도의 인덱스를 DB 마이그레이션 스크립트에 추가하고 성능 개선을 검증해야 한다.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@apps/commerce-api/src/main/java/com/loopers/infrastructure/persistence/jpa/fcfs/CouponIssueRequestJpaRepository.java`
around lines 26 - 28, countByStatus 조회가 status만으로 필터링하기 때문에 전체 테이블 스캔을 유발할 수 있으니
CouponIssueRequestJpaRepository의 countByStatus 메서드 성능 개선을 위해 DB 마이그레이션에 status
단일 컬럼 인덱스(예: idx_coupon_issue_request_status)를 추가하고 배포 전 마이그레이션 스크립트와 스키마에 해당
인덱스가 반영되었는지 확인하세요; 기존 복합 인덱스 (coupon_id, status)를 유지하되 단일 컬럼 인덱스가 충돌하지 않도록 이름을
명확히 지정하고, 추가 후 EXPLAIN/쿼리 플랜 및 벤치마크로 countByStatus 성능 개선을 검증하세요.
apps/commerce-api/src/test/java/com/loopers/fake/FakeCouponIssueRequestRepository.java-42-58 (1)

42-58: ⚠️ Potential issue | 🟡 Minor

updateStatus에서 updatedAt 타임스탬프가 갱신되지 않는다.

실제 JPA 구현에서는 @PreUpdate 또는 @LastModifiedDate를 통해 updatedAt이 자동 갱신될 가능성이 높다. 그러나 Fake 구현에서는 기존 updatedAt 값을 그대로 사용하고 있어, 실제 동작과 불일치가 발생할 수 있다.

테스트 시 updatedAt 변경 여부를 검증하는 케이스가 있다면 실패할 수 있으므로, 실제 구현과 동작을 맞추는 것이 테스트 신뢰성 향상에 도움이 된다.

🐛 updatedAt 갱신 로직 추가 제안
     `@Override`
     public void updateStatus(String requestId, CouponIssueRequestStatus status, String failureReason) {
         CouponIssueRequest request = store.get(requestId);
         if (request != null) {
             CouponIssueRequest updated = CouponIssueRequest.reconstitute(
                 request.getId(),
                 request.getRequestId(),
                 request.getCouponId(),
                 request.getUserId(),
                 status,
                 failureReason,
                 request.getCreatedAt(),
-                request.getUpdatedAt()
+                java.time.Instant.now()
             );
             store.put(requestId, updated);
         }
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@apps/commerce-api/src/test/java/com/loopers/fake/FakeCouponIssueRequestRepository.java`
around lines 42 - 58, The FakeCouponIssueRequestRepository.updateStatus
currently reuses the old updatedAt, causing tests to diverge from JPA behavior;
change updateStatus (in FakeCouponIssueRequestRepository) to set updatedAt to
the current timestamp when calling CouponIssueRequest.reconstitute (keep
createdAt as-is, update status and failureReason as already done) so the fake
mirrors `@PreUpdate/`@LastModifiedDate behavior.
apps/commerce-collector/src/main/java/com/loopers/infrastructure/persistence/jpa/fcfs/CouponIssueRequestJpaRepository.java-18-24 (1)

18-24: ⚠️ Potential issue | 🟡 Minor

@Modifying 사용 시 영속성 컨텍스트 동기화 문제가 발생할 수 있다.

JPQL 벌크 업데이트는 영속성 컨텍스트를 우회한다. 동일 트랜잭션 내에서 해당 엔티티를 이미 로드한 경우, 메모리 상의 엔티티와 DB 상태가 불일치한다. 운영 환경에서 예기치 않은 데이터 불일치가 발생할 수 있다.

🛠️ clearAutomatically 옵션 추가 제안
-    `@Modifying`
+    `@Modifying`(clearAutomatically = true)
     `@Query`("UPDATE CouponIssueRequestJpaEntity r SET r.status = :status, r.failureReason = :failureReason WHERE r.requestId = :requestId")
     void updateStatus(
         `@Param`("requestId") String requestId,
         `@Param`("status") CouponIssueRequestStatus status,
         `@Param`("failureReason") String failureReason
     );

단, clearAutomatically = true는 영속성 컨텍스트 전체를 flush 후 clear하므로, 다른 엔티티에도 영향을 줄 수 있다. 호출 시점과 트랜잭션 경계를 고려하여 테스트해야 한다.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@apps/commerce-collector/src/main/java/com/loopers/infrastructure/persistence/jpa/fcfs/CouponIssueRequestJpaRepository.java`
around lines 18 - 24, The JPQL bulk update in
CouponIssueRequestJpaRepository.updateStatus bypasses the persistence context
and can leave loaded entities stale; update the `@Modifying` annotation on the
updateStatus method to include clearAutomatically = true so the EntityManager
flushes and clears the persistence context after the bulk update, and verify
this behavior against transaction boundaries (since clearAutomatically will
flush and clear all managed entities in the current context) to avoid unintended
side effects on other entities or operations.
apps/commerce-api/src/main/java/com/loopers/application/event/listener/UserActionLogListener.java-43-47 (1)

43-47: ⚠️ Potential issue | 🟡 Minor

로그에 userId를 포함하는 것이 개인정보 로깅 정책에 부합하는지 확인해야 한다.

userId가 개인 식별 정보(PII)로 취급되는 경우, debug/warn 레벨 로그에 직접 포함하면 GDPR/CCPA 등 개인정보 규정 위반 가능성이 있다. 로그 수집 시스템에서 마스킹 처리가 되지 않으면 민감 정보가 노출될 수 있다.

운영 환경의 로깅 정책을 검토하고, 필요 시 userId를 해시 처리하거나 로그 레벨을 trace로 낮추는 것을 고려해야 한다.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@apps/commerce-api/src/main/java/com/loopers/application/event/listener/UserActionLogListener.java`
around lines 43 - 47, The logs in UserActionLogListener currently include raw
userId in debug/warn calls which may violate PII policies; update the logging to
either (a) remove userId from these messages, (b) lower the level to trace, or
(c) log a non-reversible identifier instead (e.g., hashed/salted userId) by
introducing a helper like hashUserId(String) and using that value in the log
calls inside the UserActionLogListener event handler; ensure the chosen approach
is applied consistently for both the debug and warn branches and document which
policy-driven option you picked.
apps/commerce-api/src/test/java/com/loopers/application/event/EventListenerIntegrationTest.java-121-137 (1)

121-137: ⚠️ Potential issue | 🟡 Minor

테스트명과 실제 검증 내용 불일치

테스트명은 "캐시 실패해도 좋아요는 유지된다"이지만, 실제로는 캐시 실패 상황을 시뮬레이션하지 않고 단순히 좋아요 결과가 즉시 반환되는지만 확인한다. 테스트명을 실제 검증 내용에 맞게 수정하거나, 캐시 실패 시나리오를 실제로 테스트해야 한다.

♻️ 수정안: 테스트명 변경
     `@Test`
-    `@DisplayName`("좋아요 DB 저장과 Redis 캐시 갱신이 분리되어 있어 캐시 실패해도 좋아요는 유지된다")
+    `@DisplayName`("좋아요 결과는 이벤트 처리 완료와 무관하게 즉시 반환된다")
     void 좋아요_저장_성공_검증() {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@apps/commerce-api/src/test/java/com/loopers/application/event/EventListenerIntegrationTest.java`
around lines 121 - 137, The test method 좋아요_저장_성공_검증 currently only verifies the
immediate LikeResult return but its DisplayName claims "캐시 실패해도 좋아요는 유지된다";
either rename the test DisplayName to match the current assertion (e.g., "좋아요
결과가 즉시 반환된다") or modify the test to simulate a Redis/cache failure and assert DB
persistence: in the latter case, arrange a cache client or event handler to
throw when updating (or mock the cache updater used by
LikeApplicationService.like/event listener), call
likeApplicationService.like(userId, product.getId()), then assert the LikeResult
is returned and verify the like record exists in productRepository (or the like
persistence method) even though the cache update failed; update the test method
name or implementation accordingly.
apps/commerce-api/src/test/java/com/loopers/infrastructure/outbox/OutboxIntegrationTest.java-28-30 (1)

28-30: ⚠️ Potential issue | 🟡 Minor

통합 테스트에서 스케줄링된 OutboxPublisher를 비활성화하거나 명시적으로 격리하라

현재 테스트는 @SpringBootTest와 @EnableScheduling이 활성화된 전체 애플리케이션 컨텍스트에서 실행된다. TestKafkaAutoConfiguration이 제공하는 Mockito 목(mock)은 콜백을 자동으로 실행하지 않으므로 이벤트가 PENDING 상태로 유지되지만, 스케줄링된 publisher가 테스트 환경에 존재하는 것 자체가 테스트 격리를 약화시킨다.

outbox 저장 동작을 검증하는 테스트는 publisher와 분리되어야 한다. 스케줄링을 비활성화하거나 publisher를 명시적으로 mock 처리하고, "outbox 저장 완료" 상태만 검증하도록 수정하라. LIKE_CREATED/LIKE_CANCELED 이벤트가 각각 정확히 1건 저장되는지 확인하는 별도의 단위 테스트를 추가해야 한다.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@apps/commerce-api/src/test/java/com/loopers/infrastructure/outbox/OutboxIntegrationTest.java`
around lines 28 - 30, The integration test OutboxIntegrationTest runs with the
full scheduled OutboxPublisher active; stop scheduling or isolate the publisher
so the test only verifies persistence. Modify the test configuration to disable
scheduling (e.g., set spring.scheduling.enabled=false for the test context) or
replace the OutboxPublisher bean with a Mockito mock in the test
(mockBean(OutboxPublisher.class)/@Import a test config that defines a no-op
OutboxPublisher) so the scheduled publish job does not run; keep
TestRedisConfiguration and TestKafkaAutoConfiguration as-is but ensure their
mocks do not trigger scheduling. Then make the integration test assert only that
saving produces PENDING outbox entries (verify LIKE_CREATED and LIKE_CANCELED
counts), and add separate unit tests for OutboxPublisher (or a dedicated
Publisher unit test) that mock repositories and verify the publisher processes
exactly one LIKE_CREATED and one LIKE_CANCELED event when triggered.
apps/commerce-collector/src/main/java/com/loopers/domain/metrics/ProductMetrics.java-96-100 (1)

96-100: ⚠️ Potential issue | 🟡 Minor

addOrder에서 음수 값 검증 누락

addOrder에서 quantityamount에 대한 검증이 없다. 음수 값이 전달되면 메트릭이 의도치 않게 감소하여 데이터 정합성이 깨질 수 있다. 이벤트 페이로드 파싱 오류나 악의적인 데이터로 인해 메트릭이 손상될 수 있다.

🛡️ 방어 코드 추가 제안
     public void addOrder(int quantity, long amount) {
+        if (quantity < 0 || amount < 0) {
+            throw new IllegalArgumentException("quantity and amount must be non-negative");
+        }
         this.orderCount += quantity;
         this.orderTotalAmount += amount;
         this.updatedAt = Instant.now();
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@apps/commerce-collector/src/main/java/com/loopers/domain/metrics/ProductMetrics.java`
around lines 96 - 100, addOrder에서 quantity와 amount에 대한 음수 검증이 빠져 있어 음수 입력 시 메트릭이
감소할 수 있으니, ProductMetrics.addOrder 메서드 시작 부분에서 quantity와 amount가 0 이상인지 확인하고(예:
quantity < 0 || amount < 0) 유효하지 않으면 IllegalArgumentException을 던지거나 적절히 무시하도록
처리한 뒤 유효할 때만 this.orderCount, this.orderTotalAmount, this.updatedAt를 갱신하도록
변경하세요; 메서드 이름 addOrder와 필드 orderCount, orderTotalAmount, updatedAt를 참조하여 수정하세요.
apps/commerce-collector/src/main/java/com/loopers/application/consumer/MetricsEventConsumer.java-108-126 (1)

108-126: ⚠️ Potential issue | 🟡 Minor

JSON 페이로드 파싱 시 NPE 가능성

payload.get("productId"), payload.get("quantity"), payload.get("totalAmount") 호출 시 필드가 없으면 null이 반환되고, 이후 asLong(), asInt() 호출 시 NullPointerException이 발생한다. 이는 JsonProcessingException과 다르게 catch되지 않아 예외 메시지가 불명확해진다.

🛡️ null 체크 추가
         try {
             JsonNode payload = objectMapper.readTree(envelope.payload());
-            Long productId = payload.get("productId").asLong();
-            int quantity = payload.get("quantity").asInt();
-            long totalAmount = payload.get("totalAmount").asLong();
+            JsonNode productIdNode = payload.get("productId");
+            JsonNode quantityNode = payload.get("quantity");
+            JsonNode totalAmountNode = payload.get("totalAmount");
+
+            if (productIdNode == null || quantityNode == null || totalAmountNode == null) {
+                throw new IllegalArgumentException("Missing required fields in ORDER_COMPLETED payload");
+            }
+
+            Long productId = productIdNode.asLong();
+            int quantity = quantityNode.asInt();
+            long totalAmount = totalAmountNode.asLong();
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@apps/commerce-collector/src/main/java/com/loopers/application/consumer/MetricsEventConsumer.java`
around lines 108 - 126, In handleOrderCompleted, avoid potential NPEs when
parsing payload by validating the JSON fields before calling asLong()/asInt():
after JsonNode payload = objectMapper.readTree(...), check that
payload.get("productId"), payload.get("quantity"), and
payload.get("totalAmount") are non-null (or use payload.has/nonMissingNode/path
and isNumber checks) and produce a clear error (log with
envelope.eventId()/timestamp and throw a descriptive exception) if any are
missing or of the wrong type; keep the rest of the flow (validateEventOrder,
metrics.addOrder, updateLastEventTimestamp, productMetricsRepository.save)
unchanged so downstream logic still executes when inputs are valid.
apps/commerce-collector/src/main/java/com/loopers/application/consumer/MetricsEventConsumer.java-74-75 (1)

74-75: ⚠️ Potential issue | 🟡 Minor

aggregateId 파싱 실패 시 무한 재처리로 인한 컨슈머 장애 위험

Long.parseLong(envelope.aggregateId())NumberFormatException을 던질 수 있으며, 이는 라인 74-75, 87, 98의 여러 메서드에서 발생한다. 예외가 발생하면 consume() 메서드의 catch 블록에서 잡혀 로그만 기록되고, ack.acknowledge()가 호출되지 않는다. KafkaConfig에서 MANUAL ack 모드를 설정했으므로 메시지는 Kafka 큐에 남아 무한 재처리된다. DLQ나 에러 핸들러가 구성되지 않았으므로 잘못된 데이터가 계속 재시도되어 로그 폭증과 컨슈머 지연을 초래한다.

개선 방안:

  • 파싱 실패한 이벤트는 명시적으로 catch하여 로깅 후 acknowledge하거나 별도 로직으로 스킵
  • 또는 컨슈머 설정에 DLQ(Dead Letter Queue) 또는 DefaultErrorHandler를 구성하여 실패한 메시지를 별도 토픽으로 이동

추가 점검:
handleOrderCompleted() 메서드는 JsonProcessingException을 catch하여 RuntimeException으로 재던지지만, 다른 메서드들은 파싱 실패를 처리하지 않아 에러 핸들링 패턴이 일관성이 없다.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@apps/commerce-collector/src/main/java/com/loopers/application/consumer/MetricsEventConsumer.java`
around lines 74 - 75, The code currently calls
Long.parseLong(envelope.aggregateId()) in methods like handleLikeCreated (and
similar lines in other handlers) which can throw NumberFormatException and leave
the Kafka message unacknowledged causing infinite re-delivery; catch
NumberFormatException (and any other parse errors) inside each handler (e.g.,
handleLikeCreated, handleOrderCompleted, other handlers that call
envelope.aggregateId()), log the error with the envelope details, and
immediately acknowledge the record via ack.acknowledge() or route it to a
DLQ/error handler so the message is not retried forever; also make the error
handling consistent with handleOrderCompleted (which rethrows on
JsonProcessingException) by either converting parse failures to a deliberate
skip+ack or integrating with a DefaultErrorHandler/DLQ strategy.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: baa93c8c-9da0-4b25-9d9d-f27a109cbec6

📥 Commits

Reviewing files that changed from the base of the PR and between d877967 and 27d67d5.

📒 Files selected for processing (110)
  • apps/commerce-api/build.gradle.kts
  • apps/commerce-api/src/main/java/com/loopers/CommerceApiApplication.java
  • apps/commerce-api/src/main/java/com/loopers/application/event/CouponIssueRequestCreatedEvent.java
  • apps/commerce-api/src/main/java/com/loopers/application/event/LikeCanceledEvent.java
  • apps/commerce-api/src/main/java/com/loopers/application/event/LikeCreatedEvent.java
  • apps/commerce-api/src/main/java/com/loopers/application/event/OrderCompletedEvent.java
  • apps/commerce-api/src/main/java/com/loopers/application/event/ProductViewedEvent.java
  • apps/commerce-api/src/main/java/com/loopers/application/event/UserActionEvent.java
  • apps/commerce-api/src/main/java/com/loopers/application/event/listener/LikeCountEventListener.java
  • apps/commerce-api/src/main/java/com/loopers/application/event/listener/OrderNotificationListener.java
  • apps/commerce-api/src/main/java/com/loopers/application/event/listener/OutboxEventListener.java
  • apps/commerce-api/src/main/java/com/loopers/application/event/listener/UserActionLogListener.java
  • apps/commerce-api/src/main/java/com/loopers/application/fcfs/CouponIssueApplicationService.java
  • apps/commerce-api/src/main/java/com/loopers/application/fcfs/CouponIssueRequestResult.java
  • apps/commerce-api/src/main/java/com/loopers/application/fcfs/CouponIssueResultResponse.java
  • apps/commerce-api/src/main/java/com/loopers/application/like/LikeApplicationService.java
  • apps/commerce-api/src/main/java/com/loopers/application/order/OrderApplicationService.java
  • apps/commerce-api/src/main/java/com/loopers/application/product/ProductService.java
  • apps/commerce-api/src/main/java/com/loopers/config/AsyncConfig.java
  • apps/commerce-api/src/main/java/com/loopers/config/KafkaTopicConfig.java
  • apps/commerce-api/src/main/java/com/loopers/domain/fcfs/CouponIssueRequest.java
  • apps/commerce-api/src/main/java/com/loopers/domain/fcfs/CouponIssueRequestRepository.java
  • apps/commerce-api/src/main/java/com/loopers/domain/fcfs/CouponIssueRequestStatus.java
  • apps/commerce-api/src/main/java/com/loopers/domain/fcfs/FcfsCoupon.java
  • apps/commerce-api/src/main/java/com/loopers/domain/fcfs/FcfsCouponRepository.java
  • apps/commerce-api/src/main/java/com/loopers/domain/fcfs/FcfsIssuedCoupon.java
  • apps/commerce-api/src/main/java/com/loopers/domain/fcfs/FcfsIssuedCouponRepository.java
  • apps/commerce-api/src/main/java/com/loopers/domain/outbox/OutboxEvent.java
  • apps/commerce-api/src/main/java/com/loopers/domain/outbox/OutboxEventRepository.java
  • apps/commerce-api/src/main/java/com/loopers/domain/outbox/OutboxStatus.java
  • apps/commerce-api/src/main/java/com/loopers/domain/useraction/ActionType.java
  • apps/commerce-api/src/main/java/com/loopers/domain/useraction/UserActionLog.java
  • apps/commerce-api/src/main/java/com/loopers/domain/useraction/UserActionLogRepository.java
  • apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxPublisher.java
  • apps/commerce-api/src/main/java/com/loopers/infrastructure/persistence/jpa/fcfs/CouponIssueRequestJpaEntity.java
  • apps/commerce-api/src/main/java/com/loopers/infrastructure/persistence/jpa/fcfs/CouponIssueRequestJpaRepository.java
  • apps/commerce-api/src/main/java/com/loopers/infrastructure/persistence/jpa/fcfs/CouponIssueRequestMapper.java
  • apps/commerce-api/src/main/java/com/loopers/infrastructure/persistence/jpa/fcfs/CouponIssueRequestRepositoryImpl.java
  • apps/commerce-api/src/main/java/com/loopers/infrastructure/persistence/jpa/fcfs/FcfsCouponJpaEntity.java
  • apps/commerce-api/src/main/java/com/loopers/infrastructure/persistence/jpa/fcfs/FcfsCouponJpaRepository.java
  • apps/commerce-api/src/main/java/com/loopers/infrastructure/persistence/jpa/fcfs/FcfsCouponMapper.java
  • apps/commerce-api/src/main/java/com/loopers/infrastructure/persistence/jpa/fcfs/FcfsCouponRepositoryImpl.java
  • apps/commerce-api/src/main/java/com/loopers/infrastructure/persistence/jpa/fcfs/FcfsIssuedCouponJpaEntity.java
  • apps/commerce-api/src/main/java/com/loopers/infrastructure/persistence/jpa/fcfs/FcfsIssuedCouponJpaRepository.java
  • apps/commerce-api/src/main/java/com/loopers/infrastructure/persistence/jpa/fcfs/FcfsIssuedCouponMapper.java
  • apps/commerce-api/src/main/java/com/loopers/infrastructure/persistence/jpa/fcfs/FcfsIssuedCouponRepositoryImpl.java
  • apps/commerce-api/src/main/java/com/loopers/infrastructure/persistence/jpa/outbox/OutboxEventJpaEntity.java
  • apps/commerce-api/src/main/java/com/loopers/infrastructure/persistence/jpa/outbox/OutboxEventJpaRepository.java
  • apps/commerce-api/src/main/java/com/loopers/infrastructure/persistence/jpa/outbox/OutboxEventMapper.java
  • apps/commerce-api/src/main/java/com/loopers/infrastructure/persistence/jpa/outbox/OutboxEventRepositoryImpl.java
  • apps/commerce-api/src/main/java/com/loopers/infrastructure/persistence/jpa/useraction/UserActionLogJpaEntity.java
  • apps/commerce-api/src/main/java/com/loopers/infrastructure/persistence/jpa/useraction/UserActionLogJpaRepository.java
  • apps/commerce-api/src/main/java/com/loopers/infrastructure/persistence/jpa/useraction/UserActionLogMapper.java
  • apps/commerce-api/src/main/java/com/loopers/infrastructure/persistence/jpa/useraction/UserActionLogRepositoryImpl.java
  • apps/commerce-api/src/main/java/com/loopers/interfaces/api/fcfs/FcfsCouponV1Api.java
  • apps/commerce-api/src/main/java/com/loopers/interfaces/api/fcfs/FcfsCouponV1Dto.java
  • apps/commerce-api/src/main/resources/application.yml
  • apps/commerce-api/src/test/java/com/loopers/application/event/EventListenerIntegrationTest.java
  • apps/commerce-api/src/test/java/com/loopers/application/event/listener/OutboxEventListenerTest.java
  • apps/commerce-api/src/test/java/com/loopers/application/fcfs/CouponIssueApplicationServiceTest.java
  • apps/commerce-api/src/test/java/com/loopers/application/like/LikeApplicationServiceTest.java
  • apps/commerce-api/src/test/java/com/loopers/application/order/OrderApplicationServiceTest.java
  • apps/commerce-api/src/test/java/com/loopers/concurrency/LikeConcurrencyTest.java
  • apps/commerce-api/src/test/java/com/loopers/config/TestKafkaAutoConfiguration.java
  • apps/commerce-api/src/test/java/com/loopers/fake/FakeApplicationEventPublisher.java
  • apps/commerce-api/src/test/java/com/loopers/fake/FakeCouponIssueRequestRepository.java
  • apps/commerce-api/src/test/java/com/loopers/fake/FakeFcfsCouponRepository.java
  • apps/commerce-api/src/test/java/com/loopers/fake/FakeOutboxEventRepository.java
  • apps/commerce-api/src/test/java/com/loopers/fake/FakeUserActionLogRepository.java
  • apps/commerce-api/src/test/java/com/loopers/infrastructure/outbox/OutboxIntegrationTest.java
  • apps/commerce-api/src/test/java/com/loopers/infrastructure/outbox/OutboxPublisherTest.java
  • apps/commerce-api/src/test/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
  • apps/commerce-collector/build.gradle.kts
  • apps/commerce-collector/src/main/java/com/loopers/CollectorApplication.java
  • apps/commerce-collector/src/main/java/com/loopers/application/consumer/CouponIssueConsumer.java
  • apps/commerce-collector/src/main/java/com/loopers/application/consumer/MetricsEventConsumer.java
  • apps/commerce-collector/src/main/java/com/loopers/domain/eventhandled/EventHandled.java
  • apps/commerce-collector/src/main/java/com/loopers/domain/eventhandled/EventHandledRepository.java
  • apps/commerce-collector/src/main/java/com/loopers/domain/fcfs/CouponIssueRequestStatus.java
  • apps/commerce-collector/src/main/java/com/loopers/domain/metrics/ProductMetrics.java
  • apps/commerce-collector/src/main/java/com/loopers/domain/metrics/ProductMetricsRepository.java
  • apps/commerce-collector/src/main/java/com/loopers/infrastructure/persistence/jpa/eventhandled/EventHandledJpaEntity.java
  • apps/commerce-collector/src/main/java/com/loopers/infrastructure/persistence/jpa/eventhandled/EventHandledJpaRepository.java
  • apps/commerce-collector/src/main/java/com/loopers/infrastructure/persistence/jpa/eventhandled/EventHandledMapper.java
  • apps/commerce-collector/src/main/java/com/loopers/infrastructure/persistence/jpa/eventhandled/EventHandledRepositoryImpl.java
  • apps/commerce-collector/src/main/java/com/loopers/infrastructure/persistence/jpa/fcfs/CouponIssueRequestJpaEntity.java
  • apps/commerce-collector/src/main/java/com/loopers/infrastructure/persistence/jpa/fcfs/CouponIssueRequestJpaRepository.java
  • apps/commerce-collector/src/main/java/com/loopers/infrastructure/persistence/jpa/fcfs/FcfsCouponJpaEntity.java
  • apps/commerce-collector/src/main/java/com/loopers/infrastructure/persistence/jpa/fcfs/FcfsCouponJpaRepository.java
  • apps/commerce-collector/src/main/java/com/loopers/infrastructure/persistence/jpa/fcfs/FcfsIssuedCouponJpaEntity.java
  • apps/commerce-collector/src/main/java/com/loopers/infrastructure/persistence/jpa/fcfs/FcfsIssuedCouponJpaRepository.java
  • apps/commerce-collector/src/main/java/com/loopers/infrastructure/persistence/jpa/metrics/ProductMetricsJpaEntity.java
  • apps/commerce-collector/src/main/java/com/loopers/infrastructure/persistence/jpa/metrics/ProductMetricsJpaRepository.java
  • apps/commerce-collector/src/main/java/com/loopers/infrastructure/persistence/jpa/metrics/ProductMetricsMapper.java
  • apps/commerce-collector/src/main/java/com/loopers/infrastructure/persistence/jpa/metrics/ProductMetricsRepositoryImpl.java
  • apps/commerce-collector/src/main/resources/application.yml
  • apps/commerce-collector/src/test/java/com/loopers/application/consumer/MetricsEventConsumerTest.java
  • apps/commerce-collector/src/test/java/com/loopers/concurrency/FcfsCouponIssueConcurrencyTest.java
  • apps/commerce-collector/src/test/java/com/loopers/fake/FakeEventHandledRepository.java
  • apps/commerce-collector/src/test/java/com/loopers/fake/FakeProductMetricsRepository.java
  • apps/commerce-collector/src/test/resources/application-test.yml
  • build.gradle.kts
  • modules/kafka/src/main/resources/kafka.yml
  • settings.gradle.kts
  • supports/kafka-events/build.gradle.kts
  • supports/kafka-events/src/main/java/com/loopers/event/AggregateType.java
  • supports/kafka-events/src/main/java/com/loopers/event/CouponIssueRequestedEvent.java
  • supports/kafka-events/src/main/java/com/loopers/event/EventEnvelope.java
  • supports/kafka-events/src/main/java/com/loopers/event/EventType.java
  • supports/kafka-events/src/main/java/com/loopers/event/KafkaTopics.java

Comment on lines +27 to +28
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleLikeCreated(LikeCreatedEvent event) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# 1) 전역 비동기 이벤트 멀티캐스터 설정 확인
rg -n -C4 --type=java 'ApplicationEventMulticaster|SimpleApplicationEventMulticaster|setTaskExecutor|@EnableAsync|TaskExecutor'

# 2) LikeCountEventListener의 `@Async` 적용 여부 및 관련 설정 확인
fd LikeCountEventListener.java --exec sed -n '1,220p' {}
rg -n -C3 --type=java '@Async|LikeCountEventListener'

Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java

Length of output: 8871


리스너에 @Async를 명시하지 않아 Redis I/O가 동기 실행되는 문제

애플리케이션에는 @EnableAsynceventExecutor 빈이 설정되어 있지만, LikeCountEventListener의 핸들러가 @Async 어노테이션을 누락했다. 따라서 @TransactionalEventListener(AFTER_COMMIT)만으로는 비동기 실행이 보장되지 않으며, Redis I/O가 동기로 실행되어 트랜잭션 커밋 직후에도 API 응답 지연과 스레드 점유가 증가한다. 같은 이벤트 리스너 구조를 사용하는 UserActionLogListener@Async("eventExecutor")를 명시하고 있으므로 이를 따르도록 수정해야 한다.

수정안: 두 핸들러 메서드(handleLikeCreated, handleLikeCanceled)에 @Async("eventExecutor") 어노테이션을 추가한다. 추가 테스트는 Redis 응답 지연을 주입했을 때 API 응답 시간이 리스너 처리 시간에 묶이지 않는지 검증하면 된다.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@apps/commerce-api/src/main/java/com/loopers/application/event/listener/LikeCountEventListener.java`
around lines 27 - 28, The event listener methods in LikeCountEventListener are
missing asynchronous execution, causing Redis I/O to run synchronously; add the
`@Async`("eventExecutor") annotation to both handler methods (handleLikeCreated
and handleLikeCanceled) so they run on the configured eventExecutor thread pool
instead of blocking the transaction/HTTP thread, then run tests injecting Redis
latency to confirm API response times are no longer tied to listener processing.

Comment on lines +32 to +34
} catch (Exception e) {
log.warn("좋아요 카운터 증가 실패 (무시): productId={}", event.productId(), e);
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

예외를 무시만 하면 캐시-원본 불일치가 장기화될 수 있다

운영 관점에서 현재처럼 예외를 경고 로그만 남기고 종료하면 Redis 일시 장애 시 좋아요 카운트가 지속적으로 틀어질 수 있고, 자동 복구 경로가 없어 수동 정합화 비용이 커진다. 수정안은 실패 이벤트를 재시도 큐(또는 outbox 재발행)로 보내거나, 주기적 리컨실리에이션 작업을 추가해 최종 정합성을 보장하는 방식이다. 추가 테스트는 Redis 예외를 강제로 발생시킨 뒤 재시도/리컨실리에이션으로 카운트가 회복되는지 검증하면 된다.

Also applies to: 45-47

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@apps/commerce-api/src/main/java/com/loopers/application/event/listener/LikeCountEventListener.java`
around lines 32 - 34, The catch block in LikeCountEventListener merely logs and
swallows exceptions causing long-term cache-source drift; modify the exception
handler in the method that processes the like event (LikeCountEventListener) to
publish the failed event to a retry/outbox mechanism (e.g., call a
RetryQueue.enqueue(event) or OutboxService.saveFailedEvent(event)) and log the
publish result, and also add or register a periodic reconciliation job (e.g.,
LikeCountReconciler.reconcileLikes()) to repair mismatches; add a
unit/integration test that forces a Redis exception when handling the event and
verifies the event is enqueued for retry/outbox and that the reconcilier can
recover the correct count.

Comment on lines +95 to +108
try {
String payload = objectMapper.writeValueAsString(event);
OutboxEvent outboxEvent = OutboxEvent.create(
eventType.name(),
aggregateType.name(),
aggregateId,
topic,
payload
);
outboxEventRepository.save(outboxEvent);
log.debug("Outbox event saved: type={}, aggregateId={}", eventType, aggregateId);
} catch (JsonProcessingException e) {
log.error("Failed to serialize event: {}", event, e);
throw new RuntimeException("Failed to serialize event for outbox", e);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

직렬화 실패를 일반 RuntimeException과 전체 이벤트 로깅으로 처리하면 응답과 로그가 불안정하다

Line 95-108은 요청 트랜잭션 안에서 실행되므로, 여기서 RuntimeException을 그대로 던지면 API 계층의 표준 오류 포맷을 우회한 500 응답으로 번질 수 있다. 또한 event 전체를 로그에 남기면 주문/사용자 식별자 등 payload가 장애 로그로 확산될 수 있다. 내부적으로는 cause를 보존한 CoreException 또는 전용 애플리케이션 예외를 던지고, 로그는 eventType, aggregateId, eventId 같은 최소 메타데이터만 남기도록 바꾸는 편이 안전하다. 추가 테스트로는 ObjectMapper 실패를 주입했을 때 표준 오류 흐름이 유지되고 로그나 예외 메시지에 payload 전체가 포함되지 않는지 검증해야 한다. Based on learnings, 'enforce unified error handling by routing errors through CoreException to ApiControllerAdvice to ensure a consistent response format' and as per coding guidelines, '로깅 시 민감정보 노출 가능성을 점검한다.'

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@apps/commerce-api/src/main/java/com/loopers/application/event/listener/OutboxEventListener.java`
around lines 95 - 108, Replace the current error handling in OutboxEventListener
where objectMapper.writeValueAsString(event) is caught: instead of logging the
entire event and throwing a plain RuntimeException, catch
JsonProcessingException, log only minimal metadata (eventType, aggregateId,
eventId if available) and the fact serialization failed, and rethrow a
domain-aware exception (e.g., CoreException or a new
OutboxSerializationException) that preserves the original cause so
ApiControllerAdvice/central error handling can produce the unified API error
response; update any tests to inject a failing ObjectMapper and assert the
unified error flow and that logs do not contain the full event payload (verify
only metadata logged).

Comment on lines +50 to +61
@Transactional(readOnly = true)
public ProductResult findById(Long id, Long userId) {
Product product = productDomainService.findById(id);
long likeCount = likeCountQueryService.getLikeCount(id);

if (userId != null) {
eventPublisher.publishEvent(ProductViewedEvent.of(userId, id));
eventPublisher.publishEvent(UserActionEvent.of(userId, ActionType.VIEW, id));
}

return ProductResult.from(product, likeCount);
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

readOnly = true 트랜잭션에서 Outbox 이벤트 발행 시 데이터 손실 위험

ProductViewedEventOutboxEventListener에서 BEFORE_COMMIT 단계에 outbox 레코드를 저장한다. 그러나 현재 메서드는 @Transactional(readOnly = true)로 선언되어 있어, outbox 레코드 저장 시 문제가 발생할 수 있다.

운영 관점에서 readOnly 트랜잭션 내 쓰기 작업은 일부 DB/드라이버에서 무시되거나 예외를 발생시킬 수 있다. 이로 인해 이벤트가 Kafka로 전달되지 않아 메트릭 수집 누락이 발생한다.

🔧 수정안: 이벤트 발행을 별도 트랜잭션으로 분리하거나 readOnly 제거

방안 1: 이벤트 발행 전용 메서드 분리

 `@Transactional`(readOnly = true)
 public ProductResult findById(Long id, Long userId) {
     Product product = productDomainService.findById(id);
     long likeCount = likeCountQueryService.getLikeCount(id);
 
     if (userId != null) {
-        eventPublisher.publishEvent(ProductViewedEvent.of(userId, id));
-        eventPublisher.publishEvent(UserActionEvent.of(userId, ActionType.VIEW, id));
+        publishViewEvents(userId, id);
     }
 
     return ProductResult.from(product, likeCount);
 }
+
+@Transactional
+public void publishViewEvents(Long userId, Long productId) {
+    eventPublisher.publishEvent(ProductViewedEvent.of(userId, productId));
+    eventPublisher.publishEvent(UserActionEvent.of(userId, ActionType.VIEW, productId));
+}

방안 2: OutboxEventListener가 AFTER_COMMIT + REQUIRES_NEW 사용

추가 테스트: readOnly 트랜잭션에서 outbox 저장이 정상 동작하는지 통합 테스트 추가 필요.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@apps/commerce-api/src/main/java/com/loopers/application/product/ProductService.java`
around lines 50 - 61, The findById method is marked `@Transactional`(readOnly =
true) but publishes ProductViewedEvent and UserActionEvent which trigger
OutboxEventListener writes (BEFORE_COMMIT), risking lost outbox records; fix by
removing readOnly on ProductService.findById or move event publication into a
separate method that runs in its own write transaction (e.g., create a new
method publishViewEvents(userId, id) annotated `@Transactional`(propagation =
REQUIRES_NEW) and call it from findById after loading data) so that
ProductViewedEvent/UserActionEvent are persisted reliably by
OutboxEventListener.

Comment on lines +17 to +33
private FcfsCoupon(Long id, String name, int totalQuantity, int issuedCount, Instant createdAt) {
this.id = id;
this.name = Objects.requireNonNull(name, "name must not be null");
this.totalQuantity = totalQuantity;
this.issuedCount = issuedCount;
this.createdAt = createdAt;
}

public static FcfsCoupon create(String name, int totalQuantity) {
if (totalQuantity <= 0) {
throw new IllegalArgumentException("totalQuantity must be positive");
}
return new FcfsCoupon(null, name, totalQuantity, 0, Instant.now());
}

public static FcfsCoupon reconstitute(Long id, String name, int totalQuantity, int issuedCount, Instant createdAt) {
return new FcfsCoupon(id, name, totalQuantity, issuedCount, createdAt);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

reconstitute()가 잘못된 영속 상태를 그대로 허용한다

Line 17-33에서는 create()에서만 totalQuantity > 0를 검증하고, reconstitute()issuedCount < 0, issuedCount > totalQuantity, createdAt == null 같은 상태를 그대로 받아들인다. 운영 중 잘못된 row나 매퍼 버그가 들어오면 remainingQuantity()가 음수가 되거나 canIssue() 판단이 왜곡되어 과발급 또는 조기 품절 오판으로 이어질 수 있다. 생성과 복원 경로가 동일한 불변식 검증을 공유하도록 생성자 또는 전용 검증 메서드에 검증을 모아야 한다. 추가 테스트로는 reconstitute()에 음수 발급 수량, 총수량 초과 발급 수량, null createdAt을 넣었을 때 즉시 예외가 발생하는지 확인해야 한다. As per coding guidelines, '**/domain/**/*.java: 엔티티/값 객체/DTO 경계를 명확히 하고, 불변성과 캡슐화를 점검한다.'

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/commerce-api/src/main/java/com/loopers/domain/fcfs/FcfsCoupon.java`
around lines 17 - 33, The reconstitute() path currently accepts invalid state
(negative issuedCount, issuedCount > totalQuantity, null createdAt) while
create() validates totalQuantity; centralize and apply the invariant checks
(totalQuantity>0, 0<=issuedCount<=totalQuantity, createdAt != null) into the
private FcfsCoupon constructor or a single validate method and call it from both
create() and reconstitute(); update/reuse the same validation for
remainingQuantity() and canIssue() invariants and add unit tests that call
reconstitute() with negative issuedCount, issuedCount > totalQuantity, and null
createdAt to ensure exceptions are thrown immediately.

Comment on lines +17 to +19
@Lock(LockModeType.PESSIMISTIC_WRITE)
@Query("SELECT c FROM FcfsCouponJpaEntity c WHERE c.id = :id")
Optional<FcfsCouponJpaEntity> findByIdWithLock(@Param("id") Long id);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

락 타임아웃 힌트 누락으로 인한 무한 대기 위험이 있다.

PESSIMISTIC_WRITE 락을 사용하면서 jakarta.persistence.lock.timeout 힌트가 없으면, 동시 요청이 몰리는 FCFS 시나리오에서 락 획득 대기 스레드가 무한정 블로킹되어 스레드 풀 고갈 및 서비스 장애로 이어질 수 있다.

🔧 락 타임아웃 힌트 추가 제안
+import org.springframework.data.jpa.repository.QueryHints;
+import jakarta.persistence.QueryHint;

     `@Lock`(LockModeType.PESSIMISTIC_WRITE)
+    `@QueryHints`({`@QueryHint`(name = "jakarta.persistence.lock.timeout", value = "3000")})
     `@Query`("SELECT c FROM FcfsCouponJpaEntity c WHERE c.id = :id")
     Optional<FcfsCouponJpaEntity> findByIdWithLock(`@Param`("id") Long id);

추가 테스트: 락 타임아웃 초과 시 PessimisticLockException 발생을 검증하는 동시성 테스트를 추가하여, 타임아웃 동작이 정상적으로 작동하는지 확인해야 한다. Based on learnings, 이 프로젝트의 다른 WithLock 메서드들은 모두 QueryHints로 3000ms 타임아웃을 적용하고 있다.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@apps/commerce-collector/src/main/java/com/loopers/infrastructure/persistence/jpa/fcfs/FcfsCouponJpaRepository.java`
around lines 17 - 19, The findByIdWithLock method in FcfsCouponJpaRepository
uses PESSIMISTIC_WRITE without a lock timeout hint, risking indefinite blocking;
update the repository method findByIdWithLock to add QueryHints setting
"jakarta.persistence.lock.timeout" (use 3000 for consistency with other WithLock
methods) so the DB throws on timeout, and add a concurrency test that triggers
the lock and asserts a PessimisticLockException is thrown when the timeout
elapses.

Comment on lines +25 to +51
public class ProductMetricsJpaEntity {

@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;

@Column(name = "product_id", nullable = false, unique = true)
private Long productId;

@Column(name = "like_count", nullable = false)
private long likeCount;

@Column(name = "view_count", nullable = false)
private long viewCount;

@Column(name = "order_count", nullable = false)
private long orderCount;

@Column(name = "order_total_amount", nullable = false)
private long orderTotalAmount;

@Column(name = "updated_at", nullable = false)
private Instant updatedAt;

@Column(name = "last_event_timestamp")
private Instant lastEventTimestamp;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# MetricsEventConsumer의 동시성 처리 방식 확인
rg -n -B5 -A15 'class MetricsEventConsumer' --type=java

# 트랜잭션 및 락 설정 확인
rg -n '@Transactional|@Lock|FOR UPDATE' --type=java -g '*Metrics*'

Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java

Length of output: 6018


🏁 Script executed:

# ProductMetricsRepository 구현 확인 (업데이트 패턴)
find . -name "ProductMetricsRepository.java" -type f | head -5 | xargs -I {} sh -c 'echo "=== {} ===" && cat -n {}'

# ProductMetricsJpaEntity 전체 코드 확인
find . -name "ProductMetricsJpaEntity.java" -type f | head -5 | xargs -I {} sh -c 'echo "=== {} ===" && cat -n {}'

# 메트릭 관련 `@Version`, `@Lock` 검색
rg -n '@Version|@Lock' --type=java -g '*Metrics*'

Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java

Length of output: 4367


🏁 Script executed:

# ProductMetricsRepository 구현체 찾기 (JPA 구현체)
find . -name "*ProductMetrics*Repository*.java" -type f | xargs -I {} sh -c 'echo "=== {} ===" && cat -n {} | head -80'

# MetricsEventConsumer의 processEvent 메서드 전체 확인
rg -n -A50 'void processEvent\(' --type=java -g '*MetricsEventConsumer*'

# 원자적 증감 쿼리 패턴 검색
rg -n 'like_count\s*=\s*like_count|view_count\s*=\s*view_count|order_count\s*=\s*order_count' --type=java

Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java

Length of output: 12520


🏁 Script executed:

# 커스텀 UPDATE 쿼리 (원자적 증감) 검색
rg -n '@Query|@Modifying' --type=java -g '*Metrics*'

# 동시성 테스트 검색 (concurrent, parallel, thread)
rg -n 'concurrent|parallel|Thread|ExecutorService' --type=java -g '*MetricsEventConsumer*Test*'

# ProductMetricsMapper.updateEntity 확인 (어떻게 매핑하는지)
find . -name "ProductMetricsMapper.java" -type f | xargs cat -n

Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java

Length of output: 1456


@Version 필드 추가 또는 원자적 증감 연산으로 동시 업데이트 문제 해결이 필요하다.

ProductMetricsRepositoryImpl이 Load-Modify-Save 패턴을 사용하고 있으며, 원자적 증감 쿼리(UPDATE ... SET field = field + 1)가 존재하지 않는다. Kafka의 여러 파티션에서 동시에 같은 productId에 대한 이벤트가 처리되면, 후행 스레드가 선행 스레드의 업데이트를 덮어쓸 수 있다(Lost Update).

예: 스레드 A와 B가 동시에 productId=1을 로드(likeCount=10)하면, A가 +1을 저장한 후 B가 자신의 오래된 상태(likeCount=10)를 저장하여 A의 업데이트가 손실된다. EventHandledRepository의 멱등성 체크는 중복 이벤트 처리를 방지할 뿐, 동시 업데이트는 방지하지 못한다.

수정안: @Version 필드를 추가하여 낙관적 락을 활성화하거나, ProductMetricsJpaRepository에 원자적 증감 쿼리를 구현하고 MetricsEventConsumer에서 사용하도록 변경해야 한다.

♻️ `@Version` 추가 제안
 `@Id`
 `@GeneratedValue`(strategy = GenerationType.IDENTITY)
 private Long id;

+@Version
+private Long version;
+
 `@Column`(name = "product_id", nullable = false, unique = true)
 private Long productId;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
public class ProductMetricsJpaEntity {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "product_id", nullable = false, unique = true)
private Long productId;
@Column(name = "like_count", nullable = false)
private long likeCount;
@Column(name = "view_count", nullable = false)
private long viewCount;
@Column(name = "order_count", nullable = false)
private long orderCount;
@Column(name = "order_total_amount", nullable = false)
private long orderTotalAmount;
@Column(name = "updated_at", nullable = false)
private Instant updatedAt;
@Column(name = "last_event_timestamp")
private Instant lastEventTimestamp;
public class ProductMetricsJpaEntity {
`@Id`
`@GeneratedValue`(strategy = GenerationType.IDENTITY)
private Long id;
`@Version`
private Long version;
`@Column`(name = "product_id", nullable = false, unique = true)
private Long productId;
`@Column`(name = "like_count", nullable = false)
private long likeCount;
`@Column`(name = "view_count", nullable = false)
private long viewCount;
`@Column`(name = "order_count", nullable = false)
private long orderCount;
`@Column`(name = "order_total_amount", nullable = false)
private long orderTotalAmount;
`@Column`(name = "updated_at", nullable = false)
private Instant updatedAt;
`@Column`(name = "last_event_timestamp")
private Instant lastEventTimestamp;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@apps/commerce-collector/src/main/java/com/loopers/infrastructure/persistence/jpa/metrics/ProductMetricsJpaEntity.java`
around lines 25 - 51, ProductMetricsJpaEntity currently allows lost-updates
under concurrent Kafka processing; add an optimistic locking `@Version` field
(e.g., private Long version annotated with `@Version`) to ProductMetricsJpaEntity
to enable optimistic locking, or alternatively implement atomic increment
methods on ProductMetricsJpaRepository (e.g., incrementLikeCountByProductId,
incrementViewCountByProductId, addOrderTotalAmountByProductId) and call those
from MetricsEventConsumer instead of the current load-modify-save in
ProductMetricsRepositoryImpl; update the consumer/service to use the chosen
approach so concurrent updates on productId are handled atomically.

Comment on lines +35 to +44
@Override
public ProductMetrics getOrCreate(Long productId) {
return jpaRepository.findByProductId(productId)
.map(ProductMetricsMapper::toDomain)
.orElseGet(() -> {
ProductMetricsJpaEntity entity = new ProductMetricsJpaEntity(productId);
ProductMetricsJpaEntity saved = jpaRepository.save(entity);
return ProductMetricsMapper.toDomain(saved);
});
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# MetricsEventConsumer에서 getOrCreate 호출 시 트랜잭션 경계 확인
ast-grep --pattern $'@Transactional
$$$
getOrCreate($$$)
$$$'

# Consumer에서 어떻게 호출되는지 확인
rg -n -A5 'getOrCreate' --type=java

Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java

Length of output: 5619


🏁 Script executed:

# MetricsEventConsumer 전체 구조 확인
rg -B5 -A15 'class MetricsEventConsumer' --type=java

# ProductMetricsJpaEntity의 제약 조건 확인
rg -B5 -A15 'class ProductMetricsJpaEntity' --type=java

# Kafka consumer 설정 확인
fd -e properties -e yml -e yaml | xargs grep -l 'kafka\|consumer' | head -5

Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java

Length of output: 8976


🏁 Script executed:

# ProductMetrics Repository interface and unique constraint check
rg -B2 -A8 'findByProductId' --type=java apps/commerce-collector

Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java

Length of output: 14923


🏁 Script executed:

# MetricsEventConsumer의 processEvent() 전체 구현 확인
rg -B2 -A20 'private void processEvent' --type=java apps/commerce-collector

# consume() 메서드의 exception handling 전체 구현
rg -B2 -A25 'public void consume' --type=java apps/commerce-collector

# Kafka consumer 설정 확인
cat apps/commerce-collector/src/main/resources/application.yml

Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java

Length of output: 7984


getOrCreate() 메서드에서 동시성 레이스 컨디션 발생 가능하다.

MetricsEventConsumer가 같은 productId에 대해 서로 다른 이벤트를 동시에 처리할 때 레이스 컨디션이 발생한다. 예를 들어 LIKE_CREATED와 PRODUCT_VIEWED 이벤트가 같은 product ID에 대해 동시 도착하면, 두 스레드가 모두 findByProductId에서 empty를 받고 각각 새 엔티티를 생성하려다 DataIntegrityViolationException이 발생한다. 현재는 Kafka 레벨에서 재시도되지만 불필요한 트랜잭션 롤백을 유발한다.

CouponIssueConsumer처럼 getOrCreate() 메서드 내에서 발생한 DataIntegrityViolationException을 명시적으로 처리하고 재조회하는 것이 더 효율적이다.

♻️ 수정 제안
 `@Override`
 public ProductMetrics getOrCreate(Long productId) {
     return jpaRepository.findByProductId(productId)
         .map(ProductMetricsMapper::toDomain)
         .orElseGet(() -> {
-            ProductMetricsJpaEntity entity = new ProductMetricsJpaEntity(productId);
-            ProductMetricsJpaEntity saved = jpaRepository.save(entity);
-            return ProductMetricsMapper.toDomain(saved);
+            try {
+                ProductMetricsJpaEntity entity = new ProductMetricsJpaEntity(productId);
+                ProductMetricsJpaEntity saved = jpaRepository.save(entity);
+                return ProductMetricsMapper.toDomain(saved);
+            } catch (org.springframework.dao.DataIntegrityViolationException e) {
+                // 동시 insert로 인한 중복 - 재조회
+                return jpaRepository.findByProductId(productId)
+                    .map(ProductMetricsMapper::toDomain)
+                    .orElseThrow(() -> new IllegalStateException("Unexpected state: metrics not found after conflict", e));
+            }
         });
 }

동시성 레이스 컨디션 시나리오를 포함한 통합 테스트를 추가하여 두 스레드가 동시에 같은 productId로 getOrCreate를 호출할 때 정확히 하나의 메트릭스만 생성되는지 검증해야 한다.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@Override
public ProductMetrics getOrCreate(Long productId) {
return jpaRepository.findByProductId(productId)
.map(ProductMetricsMapper::toDomain)
.orElseGet(() -> {
ProductMetricsJpaEntity entity = new ProductMetricsJpaEntity(productId);
ProductMetricsJpaEntity saved = jpaRepository.save(entity);
return ProductMetricsMapper.toDomain(saved);
});
}
`@Override`
public ProductMetrics getOrCreate(Long productId) {
return jpaRepository.findByProductId(productId)
.map(ProductMetricsMapper::toDomain)
.orElseGet(() -> {
try {
ProductMetricsJpaEntity entity = new ProductMetricsJpaEntity(productId);
ProductMetricsJpaEntity saved = jpaRepository.save(entity);
return ProductMetricsMapper.toDomain(saved);
} catch (org.springframework.dao.DataIntegrityViolationException e) {
// 동시 insert로 인한 중복 - 재조회
return jpaRepository.findByProductId(productId)
.map(ProductMetricsMapper::toDomain)
.orElseThrow(() -> new IllegalStateException("Unexpected state: metrics not found after conflict", e));
}
});
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@apps/commerce-collector/src/main/java/com/loopers/infrastructure/persistence/jpa/metrics/ProductMetricsRepositoryImpl.java`
around lines 35 - 44, getOrCreate currently races when two threads both miss
findByProductId and try to save a new ProductMetricsJpaEntity; modify
getOrCreate (the method using jpaRepository, ProductMetricsJpaEntity and
ProductMetricsMapper) to catch DataIntegrityViolationException around the save,
and on catch re-query jpaRepository.findByProductId and return the found entity
mapped by ProductMetricsMapper::toDomain (so only one row is created without
bubbling the exception); mirror the pattern used in CouponIssueConsumer
(catch-and-relookup) and add an integration test that concurrently calls
getOrCreate for the same productId and asserts exactly one metrics row is
created.

Comment on lines +19 to +20
profiles:
active: local
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

프로덕션 기본 프로파일이 local로 고정되어 운영 오동작 위험이 크다.

운영에서 SPRING_PROFILES_ACTIVE가 누락되면 로컬 프로파일로 기동되어 잘못된 DB/카프카/로깅 설정을 사용할 수 있어 장애 전파 위험이 있다. spring.profiles.active는 제거하고 spring.profiles.default로 전환해 미지정 시에만 로컬 기본값이 적용되도록 수정하는 것이 안전하다. 추가로 프로파일 미지정 기동과 prd 지정 기동 각각에서 활성 프로파일을 검증하는 설정 테스트를 추가해야 한다.

🔧 제안 수정안
 spring:
   main:
     web-application-type: servlet
   application:
     name: commerce-collector
   profiles:
-    active: local
+    default: local

As per coding guidelines: "**/application*.yml: 환경별 분리(프로파일)와 기본값 적절성을 점검하고 ... 운영에 영향을 주는 설정 변경은 근거와 영향 범위를 요구한다."

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
profiles:
active: local
spring:
main:
web-application-type: servlet
application:
name: commerce-collector
profiles:
default: local
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/commerce-collector/src/main/resources/application.yml` around lines 19 -
20, application.yml currently hardcodes spring.profiles.active: local which
risks starting production with local settings; remove the spring.profiles.active
entry and instead set spring.profiles.default: local so local is only used when
no SPRING_PROFILES_ACTIVE is provided, and add tests that assert startup
behavior for both (a) no SPRING_PROFILES_ACTIVE -> effective profile equals
"local" and (b) SPRING_PROFILES_ACTIVE=prd -> effective profile includes "prd";
also add a startup validation check (e.g., in an ApplicationRunner or test) that
fails fast if an unexpected profile combination is detected in production to
prevent silent misconfiguration.

Comment on lines +17 to +20
String eventId,
String eventType,
String aggregateType,
String aggregateId,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

이벤트 라우팅 핵심 필드를 문자열 대신 타입으로 고정해야 한다

운영 관점에서 eventType/aggregateType를 문자열로 두면 오탈자 1건으로 소비자 라우팅 실패, 재처리 증가, 관측 지표 분산이 발생한다. 수정안은 EventType, AggregateType 같은 명시 타입으로 필드를 교체해 컴파일 단계에서 오류를 차단하는 방식이다. 추가 테스트는 잘못된 문자열 주입 시나리오 대신, 각 enum 값이 직렬화/역직렬화 후 동일하게 유지되는 통합 테스트로 검증하면 된다.

제안 diff
 public record EventEnvelope(
     String eventId,
-    String eventType,
-    String aggregateType,
+    EventType eventType,
+    AggregateType aggregateType,
     String aggregateId,
     Instant timestamp,
     String payload
 ) {
@@
     public static EventEnvelope of(
         String eventId,
-        String eventType,
-        String aggregateType,
+        EventType eventType,
+        AggregateType aggregateType,
         String aggregateId,
         String payload
     ) {

Also applies to: 25-29

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@supports/kafka-events/src/main/java/com/loopers/event/EventEnvelope.java`
around lines 17 - 20, EventEnvelope currently uses raw strings for eventType and
aggregateType which risks runtime routing errors; change the EventEnvelope
fields named eventType and aggregateType to use strongly-typed enums (e.g.,
EventType and AggregateType) instead of String, update all constructors,
builders, serializers/deserializers and usages to accept and persist those enum
types, and adjust any JSON/mapping configuration to serialize enums correctly;
additionally add an integration test that serializes and then deserializes each
enum value to assert equality (verifying that EventType and AggregateType values
survive round-trip) and update any code referencing the old String fields
accordingly.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant