[Volume 7] 이벤트 기반 아키텍처 및 Kafka 파이프라인 구현 - 양권모#298
Open
Praesentia-YKM wants to merge 10 commits intoLoopers-dev-lab:Praesentia-YKMfrom
Open
[Volume 7] 이벤트 기반 아키텍처 및 Kafka 파이프라인 구현 - 양권모#298Praesentia-YKM wants to merge 10 commits intoLoopers-dev-lab:Praesentia-YKMfrom
Praesentia-YKM wants to merge 10 commits intoLoopers-dev-lab:Praesentia-YKMfrom
Conversation
- ProductService: getById() → getProduct() 리네임, getProductForAdmin() 추가 - ProductSortType: LATEST 값 추가, ProductRepositoryImpl에 LATEST 케이스 처리 - BrandFacade: deleteAllByBrandId() → softDeleteByBrandId() 호출 수정 - ProductRepositoryImpl: 중복 save() 메서드 제거 - 테스트 5개 파일: 메서드명 불일치 일괄 수정 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- ProductFacade에 @Cacheable/@CacheEvict 적용 (상품 상세 캐시) - LikeTransactionService에 좋아요 변경 시 캐시 무효화 추가 - RedisCacheConfig, CustomCacheErrorHandler 추가 - 고아 클래스 ProductListCacheService 삭제 - 중복 admin/ 패키지 및 interfaces/auth/ 패키지 정리 - import 경로 수정 및 getter 이름 불일치 해결 - 캐시 통합 테스트 추가 (ProductCacheIntegrationTest, LikeTransactionCacheTest) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- QueryDSL BooleanBuilder로 brandId 동적 필터 + 정렬(좋아요순 등) 구현 - 복합 인덱스 4개 추가 (brand+deleted+like, deleted+like, deleted+created, deleted+price) - N+1 문제 해결: 목록 조회 시 배치 쿼리 패턴(Map<Long, Model>) 적용 - likes 테이블 유니크 제약조건 추가 (uk_likes_user_product) - StockService 메서드 참조 오류 수정 (productId → getProductId) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
# Conflicts: # apps/commerce-api/src/main/java/com/loopers/application/brand/BrandService.java # apps/commerce-api/src/main/java/com/loopers/application/product/ProductFacade.java # apps/commerce-api/src/main/java/com/loopers/infrastructure/product/ProductRepositoryImpl.java # apps/commerce-api/src/test/java/com/loopers/application/like/LikeFacadeTest.java # apps/commerce-api/src/test/java/com/loopers/application/order/OrderFacadeTest.java
- Payment 도메인 전체 레이어 구현 (Model, Repository, Service, Facade, Controller) - PG 클라이언트 인프라 구성 (PgClient, PgProperties, PgClientConfig) - Order 도메인에 결제 관련 상태 및 로직 추가 - ErrorType에 결제 관련 에러 코드 추가 - 설계 문서 업데이트 (클래스 다이어그램, ERD, 시퀀스, 상태 다이어그램) - 단위/통합 테스트 작성 (PaymentModelTest, PaymentFacadeTest) - API 테스트용 .http 파일 추가 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Resilience4j 적용 (CircuitBreaker + Retry + Bulkhead) - TX 분리 패턴으로 PG 호출 동안 DB 커넥션 미점유 - Polling 스케줄러로 PENDING 결제 자동 복구 (60초 주기, fail-fast) - 고아 Payment 복구 (orderId 기반 Phase C) - WireMock 기반 PG 연동 통합 테스트 9종 - resilience4j-lab 분석 기반 설정 함정 수정 🤖 Generated with Claude Code
- 좋아요/주문/결제 도메인에 이벤트 record 정의 (LikeToggledEvent, OrderPlacedEvent, PaymentCompletedEvent) - LikeTransactionService에서 직접 호출 대신 이벤트 발행으로 전환 - LikeMetricsEventListener: AFTER_COMMIT + REQUIRES_NEW로 좋아요 집계 처리 - OrderFacade.placeOrder(), PaymentFacade.handleCallback()에서 이벤트 발행 - UserActivityEventListener: 모든 도메인 이벤트를 구독하여 유저 행동 로깅 - 캐시 evict 실패 시에도 DB 업데이트가 롤백되지 않도록 try-catch 처리 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Step 2: Transactional Outbox Pattern + Kafka Producer/Consumer - OutboxEvent 엔티티 (2-Phase: PENDING → PROCESSING → PUBLISHED/FAILED) - OutboxEventListener (BEFORE_COMMIT으로 도메인 TX 원자성 보장) - OutboxRelayService (전용 ExecutorService + @scheduled 폴링 + Recovery/Cleanup) - ProductViewedEvent 추가 및 ProductFacade 연동 - Kafka 설정: acks=all, idempotence=true - CatalogEventConsumer (LIKED/UNLIKED/PRODUCT_VIEWED → ProductMetrics 집계) - OrderEventConsumer (ORDER_PLACED/PAYMENT_COMPLETED 로깅) - EventHandled 테이블 기반 Consumer 멱등 처리 - @Version 낙관적 락 충돌 시 재시도 + DLQ 전송 Step 3: Kafka 기반 선착순 쿠폰 발급 - CouponModel에 maxQuantity/issuedCount 추가 - CouponFacade.requestCouponIssue() → Kafka 비동기 위임 - CouponIssueConsumer (비관적 락 + 수량 제한 + UK 중복 방지) - 발급 결과 확인 Polling API (/issue-status) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
Important Review skippedToo many files! This PR contains 154 files, which is 4 over the limit of 150. ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: ⛔ Files ignored due to path filters (22)
📒 Files selected for processing (154)
You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
📌 Summary
🧭 Context & Decision
문제 정의
LikeTransactionService에서 좋아요 저장 + 집계 카운트 + 캐시 evict를 하나의 트랜잭션에서 처리 → Redis 장애 시 전체 롤백선택지와 결정
1. Outbox 이벤트 저장 시점
BEFORE_COMMIT(같은 TX)AFTER_COMMIT+ 별도 TXBEFORE_COMMIT)2. Outbox Relay 전략
3. Kafka 발행 병렬 처리
parallelStream()ExecutorService4. 선착순 쿠폰 발급 방식
/issue-status)로 보완. 순간 트래픽 버퍼링 + 순서 보장이라는 이점이 더 큼5. Consumer 멱등 처리
event_handled테이블추후 개선 여지:
🏗️ Design Overview
변경 범위
domain/outbox/— OutboxEvent, OutboxStatus, OutboxRepository, OutboxEventEnvelopeapplication/outbox/— OutboxEventListener (BEFORE_COMMIT)infrastructure/outbox/— OutboxJpaRepository, OutboxRelayService, OutboxRepositoryImpldomain/*/event/— LikeToggledEvent, OrderPlacedEvent, PaymentCompletedEvent, ProductViewedEvent, CouponIssueRequestedEventapplication/like/— LikeMetricsEventListener (AFTER_COMMIT + REQUIRES_NEW)application/logging/— UserActivityEventListener (AFTER_COMMIT)application/coupon/— CouponFacade (선착순 비동기 발급)config/— KafkaTopicConfig, OutboxRelayConfigLikeTransactionService에서 ProductService/CacheManager 직접 의존 제거 → ApplicationEventPublisher로 대체주요 컴포넌트 책임
commerce-api (Producer 측)
OutboxEventListenerOutboxRelayServiceLikeMetricsEventListenerUserActivityEventListenerCouponFacadeKafkaTopicConfigcommerce-streamer (Consumer 측)
CatalogEventConsumerOrderEventConsumerCouponIssueConsumerProductMetricsEventHandled🔁 Flow Diagram
Main Flow — Transactional Outbox + Kafka Pipeline
sequenceDiagram autonumber participant Client participant API as commerce-api participant DB as MySQL participant Relay as OutboxRelayService participant Kafka participant Streamer as commerce-streamer Client->>API: POST /api/v1/likes (좋아요 토글) API->>DB: INSERT like + INSERT outbox_event (같은 TX, BEFORE_COMMIT) DB-->>API: commit API-->>Client: 200 OK Note over API: AFTER_COMMIT → LikeMetricsEventListener<br/>(REQUIRES_NEW TX로 집계) loop @Scheduled(fixedDelay=1000) Relay->>DB: SELECT outbox WHERE status=PENDING<br/>FOR UPDATE SKIP LOCKED Relay->>DB: UPDATE status=PROCESSING (Phase 1) Relay->>Kafka: send(topic, key, payload, headers) (Phase 2) alt 발행 성공 Relay->>DB: UPDATE status=PUBLISHED else 발행 실패 Relay->>DB: UPDATE status=FAILED, retryCount++ end end Kafka->>Streamer: consume(catalog-events) Streamer->>DB: EventHandled 멱등 체크 alt 미처리 이벤트 Streamer->>DB: ProductMetrics upsert (좋아요/조회수/판매량) Streamer->>DB: INSERT event_handled else 이미 처리됨 Note over Streamer: skip (멱등) endMain Flow — 선착순 쿠폰 발급
sequenceDiagram autonumber participant Client participant API as commerce-api participant DB as MySQL participant Relay as OutboxRelayService participant Kafka participant Consumer as CouponIssueConsumer Client->>API: POST /coupons/{id}/issue-async API->>API: 만료/수량 기본 검증 API->>DB: INSERT outbox_event (BEFORE_COMMIT) API-->>Client: 200 OK (요청 접수) Relay->>Kafka: send(coupon-issue-requests, couponId) Kafka->>Consumer: consume Consumer->>DB: SELECT coupon FOR UPDATE (비관적 락) Consumer->>DB: 만료/수량/중복 체크 alt 발급 가능 Consumer->>DB: INSERT coupon_issue + UPDATE issued_count Consumer-->>DB: event_handled 저장 else 수량 초과 or 중복 Consumer-->>DB: event_handled 저장 (skip) end Client->>API: GET /coupons/{id}/issue-status (Polling) API->>DB: SELECT coupon_issue WHERE user+coupon alt 발급 완료 API-->>Client: status ISSUED, issue details else 아직 처리 안 됨 API-->>Client: status PENDING endException Flow — DLQ + Recovery
flowchart TB subgraph Relay Recovery A[PROCESSING > 5분] -->|recoverStalledEvents| B[PENDING으로 복구] C[FAILED + retryCount < 5] -->|retryFailedEvents| B D[PUBLISHED > 1시간] -->|cleanupPublishedEvents| E[DELETE] end subgraph Consumer DLQ F[처리 실패] --> G{재시도 가능?} G -->|Version 충돌| H[최대 3회 재시도] H -->|3회 초과| I[DLQ 토픽 전송] G -->|기타 에러| I I --> J[catalog-events.DLT / order-events.DLT] endKafka Header 구조
X-Event-Id550e8400-e29b-41d4-a716-446655440000X-Event-TypeLIKED/ORDER_PLACED/COUPON_ISSUE_REQUESTEDX-Aggregate-TypeProduct/Order/Payment/Coupon"100"(productId / orderId / couponId)