diff --git a/.DS_Store b/.DS_Store
new file mode 100644
index 0000000000..9364f445d6
Binary files /dev/null and b/.DS_Store differ
diff --git a/README.md b/README.md
index b067a71026..9d7bad64ac 100644
--- a/README.md
+++ b/README.md
@@ -80,3 +80,53 @@ You can use Graphql;
When you finish your challenge, after forking a repository, you **must** open a pull request to our repository. There are no limitations to the implementation, you can follow the programming paradigm, modularization, and style that you feel is the most appropriate solution.
If you have any questions, please let us know.
+
+# Solucion
+Para manejar el requerimiento se ha creado dos microservicios (transaction-service, status-service).
+Ambos se comunicar de manera asincrona a travez de dos Topicos Kafka (transaction-creation-topic, anti-fraud-validation-topic).
+
+El primer microservicio expone dos endpoint, uno para la creacion de la transacion y segundo para listar las transacciones creadas.
+
+El segundo microservicio para evaluar la transaccion apartir del monto (Si el monto es mayor de 1000 el monto es rejectado y si es menor es aceptado).
+
+## Stack Tecnico
+- Java 17
+- Maven 3.2.0
+- Apache Kafka 5.5.3
+- Postgres 14
+- Spring WebFlux
+- R2DBC Connection
+- Lombok
+
+## Instalacion
+1. Clonar el repositorio y entrar en la carpeta del proyecto
+2. Para iniciar los servicios de postgres, zookeper, kafka corremos el siguiente docker comand
+```sh
+docker-compose up -d
+```
+3. Crear el nombre de la base de datos 'transactions_db'
+4. Iniciar el microservicio transaction-service (Se iniciara en el puerto 8080)
+5. Iniciar el microservicio status-service (se iniciara en el puerto 8081)
+
+## Pruebas
+1. Crear una transaccion (La transacion se creara con el estado PENDING, y se enviara al segundo microservicio a travez del topico de kafka para su evaluacion).
+```curl
+curl --location 'http://localhost:8080/api/v1/transactions' \
+--header 'Content-Type: application/json' \
+--data '{
+ "accountExternalIdDebit":"DEBIT_123_1",
+ "accountExternalIdCredit":"CREDIT_123_1",
+ "transferTypeId":1,
+ "value": 100
+}'
+```
+
+2. El segundo Microservicio recibe el mensaje y evalua el monto es mayor a 1000 para devolver el estado REJECTED y si el monto es menor a 1000 devolvera APPROVED.
+
+3. Para verificar si se cambio los estados segun la evaluacion, podemos usar el siguiete endpoint (Lista las transacciones creadas)
+```curl
+curl --location 'http://localhost:8080/api/v1/transactions
+```
+
+
+
diff --git a/docker-compose.yml b/docker-compose.yml
index 0e8807f21c..d6231996e6 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -7,19 +7,64 @@ services:
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
+ networks:
+ - microservices-network
+
zookeeper:
image: confluentinc/cp-zookeeper:5.5.3
environment:
ZOOKEEPER_CLIENT_PORT: 2181
+ ZOOKEEPER_TICK_TIME: 2000
+ networks:
+ - microservices-network
+ ports:
+ - "2181:2181"
+
kafka:
image: confluentinc/cp-enterprise-kafka:5.5.3
- depends_on: [zookeeper]
+ depends_on:
+ - zookeeper
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+ KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9991
+ KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
+ KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
+ KAFKA_NUM_PARTITIONS: 3
+ KAFKA_DEFAULT_REPLICATION_FACTOR: 1
ports:
- - 9092:9092
+ - "9092:9092"
+ - "29092:29092"
+ networks:
+ - microservices-network
+
+ kafka-init:
+ image: confluentinc/cp-kafka:7.5.0
+ container_name: kafka-init
+ depends_on:
+ - kafka
+ entrypoint: ['/bin/sh', '-c']
+ command: |
+ "
+ echo 'Waiting for Kafka to be ready...'
+ sleep 10
+
+ echo 'Creating topics...'
+ kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic transaction-creation-topic --partitions 3 --replication-factor 1
+ kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic anti-fraud-validation-topic --partitions 3 --replication-factor 1
+
+ echo 'Listing topics...'
+ kafka-topics --bootstrap-server kafka:9092 --list
+
+ echo 'Topics created successfully!'
+ "
+ networks:
+ - microservices-network
+
+networks:
+ microservices-network:
+ driver: bridge
\ No newline at end of file
diff --git a/status-service/.idea/.gitignore b/status-service/.idea/.gitignore
new file mode 100644
index 0000000000..a0ccf77bc5
--- /dev/null
+++ b/status-service/.idea/.gitignore
@@ -0,0 +1,5 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Environment-dependent path to Maven home directory
+/mavenHomeManager.xml
diff --git a/status-service/.idea/compiler.xml b/status-service/.idea/compiler.xml
new file mode 100644
index 0000000000..4151a54296
--- /dev/null
+++ b/status-service/.idea/compiler.xml
@@ -0,0 +1,19 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/status-service/.idea/encodings.xml b/status-service/.idea/encodings.xml
new file mode 100644
index 0000000000..63e9001932
--- /dev/null
+++ b/status-service/.idea/encodings.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/status-service/.idea/jarRepositories.xml b/status-service/.idea/jarRepositories.xml
new file mode 100644
index 0000000000..712ab9d985
--- /dev/null
+++ b/status-service/.idea/jarRepositories.xml
@@ -0,0 +1,20 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/status-service/.idea/misc.xml b/status-service/.idea/misc.xml
new file mode 100644
index 0000000000..67e1e6113b
--- /dev/null
+++ b/status-service/.idea/misc.xml
@@ -0,0 +1,12 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/status-service/.idea/vcs.xml b/status-service/.idea/vcs.xml
new file mode 100644
index 0000000000..6c0b863585
--- /dev/null
+++ b/status-service/.idea/vcs.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/status-service/Dockerfile b/status-service/Dockerfile
new file mode 100644
index 0000000000..0151e6552f
--- /dev/null
+++ b/status-service/Dockerfile
@@ -0,0 +1,25 @@
+FROM maven:3.9-eclipse-temurin-17-alpine AS build
+
+WORKDIR /app
+
+# Copiar archivos de Maven
+COPY pom.xml .
+RUN mvn dependency:go-offline -B
+
+# Copiar código fuente y compilar
+COPY src ./src
+RUN mvn clean package -DskipTests
+
+# Imagen final
+FROM eclipse-temurin:17-jre-alpine
+
+WORKDIR /app
+
+# Copiar el JAR compilado
+COPY --from=build /app/target/*.jar app.jar
+
+# Exponer puerto
+EXPOSE 8081
+
+# Punto de entrada
+ENTRYPOINT ["java", "-jar", "app.jar"]
diff --git a/status-service/pom.xml b/status-service/pom.xml
new file mode 100644
index 0000000000..1d6c6ac9a6
--- /dev/null
+++ b/status-service/pom.xml
@@ -0,0 +1,98 @@
+
+
+ 4.0.0
+
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 3.2.0
+
+
+
+ com.example
+ status-service
+ 1.0.0
+ status-service
+ Microservicio de actualización de estados con WebFlux y Kafka
+
+
+ 17
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-webflux
+
+
+
+
+ io.projectreactor.kafka
+ reactor-kafka
+
+
+
+
+ org.projectlombok
+ lombok
+ true
+
+
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+
+ com.fasterxml.jackson.datatype
+ jackson-datatype-jsr310
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-validation
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+
+ io.projectreactor
+ reactor-test
+ test
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+ org.projectlombok
+ lombok
+
+
+
+
+
+
+
diff --git a/status-service/src/main/java/com/example/statusservice/StatusServiceApplication.java b/status-service/src/main/java/com/example/statusservice/StatusServiceApplication.java
new file mode 100644
index 0000000000..739da6d9eb
--- /dev/null
+++ b/status-service/src/main/java/com/example/statusservice/StatusServiceApplication.java
@@ -0,0 +1,12 @@
+package com.example.statusservice;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class StatusServiceApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(StatusServiceApplication.class, args);
+ }
+}
diff --git a/status-service/src/main/java/com/example/statusservice/config/JacksonConfig.java b/status-service/src/main/java/com/example/statusservice/config/JacksonConfig.java
new file mode 100644
index 0000000000..c8770ab78d
--- /dev/null
+++ b/status-service/src/main/java/com/example/statusservice/config/JacksonConfig.java
@@ -0,0 +1,19 @@
+package com.example.statusservice.config;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class JacksonConfig {
+
+ @Bean
+ public ObjectMapper objectMapper() {
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.registerModule(new JavaTimeModule());
+ mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+ return mapper;
+ }
+}
diff --git a/status-service/src/main/java/com/example/statusservice/config/KafkaConfig.java b/status-service/src/main/java/com/example/statusservice/config/KafkaConfig.java
new file mode 100644
index 0000000000..84ff7556bc
--- /dev/null
+++ b/status-service/src/main/java/com/example/statusservice/config/KafkaConfig.java
@@ -0,0 +1,82 @@
+package com.example.statusservice.config;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import reactor.kafka.receiver.KafkaReceiver;
+import reactor.kafka.receiver.ReceiverOptions;
+import reactor.kafka.sender.KafkaSender;
+import reactor.kafka.sender.SenderOptions;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+@Configuration
+@ConfigurationProperties(prefix = "spring.kafka")
+@Getter
+@Setter
+public class KafkaConfig {
+
+ private String bootstrapServers;
+ private Producer producer;
+ private Consumer consumer;
+
+ @Getter
+ @Setter
+ public static class Producer {
+ private String keySerializer;
+ private String valueSerializer;
+ private String acks;
+ private Integer retries;
+ }
+
+ @Getter
+ @Setter
+ public static class Consumer {
+ private String groupId;
+ private String keyDeserializer;
+ private String valueDeserializer;
+ private String autoOffsetReset;
+ private Boolean enableAutoCommit;
+ }
+
+ @Bean
+ public KafkaSender kafkaSender() {
+ Map props = new HashMap<>();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.ACKS_CONFIG, producer.getAcks());
+ props.put(ProducerConfig.RETRIES_CONFIG, producer.getRetries());
+ props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+ props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
+
+ SenderOptions senderOptions = SenderOptions.create(props);
+ return KafkaSender.create(senderOptions);
+ }
+
+ @Bean
+ public KafkaReceiver kafkaReceiver(KafkaTopicConfig topicConfig) {
+ Map props = new HashMap<>();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, consumer.getGroupId());
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumer.getAutoOffsetReset());
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumer.getEnableAutoCommit());
+ props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
+
+ ReceiverOptions receiverOptions = ReceiverOptions
+ .create(props)
+ .subscription(Collections.singleton(topicConfig.getTransactionCreation()));
+
+ return KafkaReceiver.create(receiverOptions);
+ }
+}
diff --git a/status-service/src/main/java/com/example/statusservice/config/KafkaTopicConfig.java b/status-service/src/main/java/com/example/statusservice/config/KafkaTopicConfig.java
new file mode 100644
index 0000000000..fb38df1c00
--- /dev/null
+++ b/status-service/src/main/java/com/example/statusservice/config/KafkaTopicConfig.java
@@ -0,0 +1,16 @@
+package com.example.statusservice.config;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+@ConfigurationProperties(prefix = "kafka.topics")
+@Getter
+@Setter
+public class KafkaTopicConfig {
+
+ private String transactionCreation;
+ private String antiFraudValidation;
+}
diff --git a/status-service/src/main/java/com/example/statusservice/controller/StatusController.java b/status-service/src/main/java/com/example/statusservice/controller/StatusController.java
new file mode 100644
index 0000000000..779af7120f
--- /dev/null
+++ b/status-service/src/main/java/com/example/statusservice/controller/StatusController.java
@@ -0,0 +1,19 @@
+package com.example.statusservice.controller;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.*;
+import reactor.core.publisher.Mono;
+
+@RestController
+@RequestMapping("/api/v1/status")
+@RequiredArgsConstructor
+@Slf4j
+public class StatusController {
+
+ @GetMapping("/health")
+ public Mono> health() {
+ return Mono.just(ResponseEntity.ok("Status Service is running"));
+ }
+}
diff --git a/status-service/src/main/java/com/example/statusservice/dto/TransactionCreatedEvent.java b/status-service/src/main/java/com/example/statusservice/dto/TransactionCreatedEvent.java
new file mode 100644
index 0000000000..fd2e0bcb67
--- /dev/null
+++ b/status-service/src/main/java/com/example/statusservice/dto/TransactionCreatedEvent.java
@@ -0,0 +1,30 @@
+package com.example.statusservice.dto;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class TransactionCreatedEvent {
+
+ private String transactionId;
+ private TransactionType transactionType;
+ private TransactionStatus transactionStatus;
+ private BigDecimal value;
+
+ @JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ss")
+ private LocalDateTime createdAt;
+
+ private String eventId;
+
+ @JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ss")
+ private LocalDateTime eventTimestamp;
+}
diff --git a/status-service/src/main/java/com/example/statusservice/dto/TransactionStatus.java b/status-service/src/main/java/com/example/statusservice/dto/TransactionStatus.java
new file mode 100644
index 0000000000..9427950b2a
--- /dev/null
+++ b/status-service/src/main/java/com/example/statusservice/dto/TransactionStatus.java
@@ -0,0 +1,12 @@
+package com.example.statusservice.dto;
+
+import lombok.*;
+
+@Getter
+@Setter
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class TransactionStatus {
+ private String name;
+}
diff --git a/status-service/src/main/java/com/example/statusservice/dto/TransactionStatusUpdatedEvent.java b/status-service/src/main/java/com/example/statusservice/dto/TransactionStatusUpdatedEvent.java
new file mode 100644
index 0000000000..a21baf4bd2
--- /dev/null
+++ b/status-service/src/main/java/com/example/statusservice/dto/TransactionStatusUpdatedEvent.java
@@ -0,0 +1,30 @@
+package com.example.statusservice.dto;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.LocalDateTime;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class TransactionStatusUpdatedEvent {
+
+ private String transactionId;
+ private String oldStatus;
+ private String newStatus;
+ private String reason;
+ private String updatedBy;
+
+ @JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ss")
+ private LocalDateTime updatedAt;
+
+ private String eventId;
+
+ @JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ss")
+ private LocalDateTime eventTimestamp;
+}
diff --git a/status-service/src/main/java/com/example/statusservice/dto/TransactionType.java b/status-service/src/main/java/com/example/statusservice/dto/TransactionType.java
new file mode 100644
index 0000000000..4a3ef5ac33
--- /dev/null
+++ b/status-service/src/main/java/com/example/statusservice/dto/TransactionType.java
@@ -0,0 +1,12 @@
+package com.example.statusservice.dto;
+
+import lombok.*;
+
+@Getter
+@Setter
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class TransactionType {
+ private String name;
+}
diff --git a/status-service/src/main/java/com/example/statusservice/service/KafkaConsumerService.java b/status-service/src/main/java/com/example/statusservice/service/KafkaConsumerService.java
new file mode 100644
index 0000000000..e1abd2f6cd
--- /dev/null
+++ b/status-service/src/main/java/com/example/statusservice/service/KafkaConsumerService.java
@@ -0,0 +1,98 @@
+package com.example.statusservice.service;
+
+import com.example.statusservice.dto.TransactionCreatedEvent;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+import reactor.core.Disposable;
+import reactor.core.publisher.Mono;
+import reactor.kafka.receiver.KafkaReceiver;
+import reactor.kafka.receiver.ReceiverRecord;
+
+import java.time.Duration;
+import java.util.Random;
+
+@Service
+@RequiredArgsConstructor
+@Slf4j
+public class KafkaConsumerService {
+
+ private final KafkaReceiver kafkaReceiver;
+ private final ObjectMapper objectMapper;
+ private final StatusService statusService;
+ private final Random random = new Random();
+ private Disposable disposable;
+
+ @PostConstruct
+ public void startConsuming() {
+ log.info("Starting Kafka consumer for transaction created events...");
+
+ disposable = kafkaReceiver.receive()
+ .flatMap(this::processMessage)
+ .subscribe();
+ }
+
+ @PreDestroy
+ public void stopConsuming() {
+ if (disposable != null && !disposable.isDisposed()) {
+ log.info("Stopping Kafka consumer...");
+ disposable.dispose();
+ }
+ }
+
+ private Mono processMessage(ReceiverRecord record) {
+ return Mono.fromCallable(() -> {
+ log.info("Received transaction created event: key={}, partition={}, offset={}",
+ record.key(), record.partition(), record.offset());
+
+ try {
+ return objectMapper.readValue(
+ record.value(),
+ TransactionCreatedEvent.class
+ );
+ } catch (Exception e) {
+ log.error("Error deserializing message: {}", e.getMessage(), e);
+ throw new RuntimeException("Failed to deserialize message", e);
+ }
+ })
+ .flatMap(event -> {
+ log.info("Processing transaction: transactionId={}, amount={}",
+ event.getTransactionId(), event.getValue());
+
+ // Simular procesamiento de la transacción
+ return processTransaction(event);
+ })
+ .doOnSuccess(result -> {
+ log.info("Successfully processed transaction: {}", record.key());
+ record.receiverOffset().acknowledge();
+ })
+ .doOnError(error -> {
+ log.error("Error processing message for transaction: {}, error: {}",
+ record.key(), error.getMessage(), error);
+ record.receiverOffset().acknowledge();
+ })
+ .onErrorResume(error -> Mono.empty())
+ .then();
+ }
+
+ private Mono processTransaction(TransactionCreatedEvent event) {
+ // Simular el procesamiento asíncrono de la transacción
+ return Mono.delay(Duration.ofSeconds(2))
+ .flatMap(delay -> {
+
+ log.info("Transaction processed: transactionId={}",
+ event.getTransactionId());
+
+ // Actualizar el estado
+ return statusService.validateTransactionStatus(
+ event.getTransactionId(),
+ event.getTransactionStatus().getName(),
+ "status-service",
+ event.getValue()
+ );
+ });
+ }
+}
diff --git a/status-service/src/main/java/com/example/statusservice/service/KafkaProducerService.java b/status-service/src/main/java/com/example/statusservice/service/KafkaProducerService.java
new file mode 100644
index 0000000000..501921b0f2
--- /dev/null
+++ b/status-service/src/main/java/com/example/statusservice/service/KafkaProducerService.java
@@ -0,0 +1,74 @@
+package com.example.statusservice.service;
+
+import com.example.statusservice.config.KafkaTopicConfig;
+import com.example.statusservice.dto.TransactionStatusUpdatedEvent;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.stereotype.Service;
+import reactor.core.publisher.Mono;
+import reactor.kafka.sender.KafkaSender;
+import reactor.kafka.sender.SenderRecord;
+
+import java.time.LocalDateTime;
+import java.util.UUID;
+
+@Service
+@RequiredArgsConstructor
+@Slf4j
+public class KafkaProducerService {
+
+ private final KafkaSender kafkaSender;
+ private final KafkaTopicConfig topicConfig;
+ private final ObjectMapper objectMapper;
+
+ public Mono publishStatusUpdated(TransactionStatusUpdatedEvent event) {
+ return Mono.fromCallable(() -> {
+ // Generar IDs de evento si no existen
+ if (event.getEventId() == null) {
+ event.setEventId(UUID.randomUUID().toString());
+ }
+ if (event.getEventTimestamp() == null) {
+ event.setEventTimestamp(LocalDateTime.now());
+ }
+
+ try {
+ return objectMapper.writeValueAsString(event);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException("Error serializing event", e);
+ }
+ })
+ .flatMap(eventJson -> {
+ ProducerRecord producerRecord = new ProducerRecord<>(
+ topicConfig.getAntiFraudValidation(),
+ event.getTransactionId(),
+ eventJson
+ );
+
+ SenderRecord senderRecord = SenderRecord.create(
+ producerRecord,
+ event.getTransactionId()
+ );
+
+ return kafkaSender.send(Mono.just(senderRecord))
+ .next()
+ .doOnSuccess(result -> {
+ log.info("Successfully published status update event: transactionId={}, partition={}, offset={}",
+ event.getTransactionId(),
+ result.recordMetadata().partition(),
+ result.recordMetadata().offset());
+ })
+ .doOnError(error -> {
+ log.error("Error publishing status update event: transactionId={}, error={}",
+ event.getTransactionId(), error.getMessage(), error);
+ })
+ .then();
+ })
+ .onErrorResume(error -> {
+ log.error("Failed to publish status update event", error);
+ return Mono.error(new RuntimeException("Failed to publish event", error));
+ });
+ }
+}
diff --git a/status-service/src/main/java/com/example/statusservice/service/StatusService.java b/status-service/src/main/java/com/example/statusservice/service/StatusService.java
new file mode 100644
index 0000000000..be19d07e7f
--- /dev/null
+++ b/status-service/src/main/java/com/example/statusservice/service/StatusService.java
@@ -0,0 +1,39 @@
+package com.example.statusservice.service;
+
+import com.example.statusservice.dto.TransactionStatusUpdatedEvent;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+import reactor.core.publisher.Mono;
+
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+
+@Service
+@RequiredArgsConstructor
+@Slf4j
+public class StatusService {
+
+ private final KafkaProducerService kafkaProducerService;
+
+ public Mono validateTransactionStatus(
+ String transactionId,
+ String oldStatus,
+ String updatedBy,
+ BigDecimal amount) {
+
+
+ // Crear evento
+ TransactionStatusUpdatedEvent event = TransactionStatusUpdatedEvent.builder()
+ .transactionId(transactionId)
+ .oldStatus(oldStatus)
+ .newStatus(amount.compareTo(BigDecimal.valueOf(999))>0 ? "REJECTED":"ACCEPTED")
+ .updatedBy(updatedBy)
+ .updatedAt(LocalDateTime.now())
+ .build();
+
+ // Publicar evento
+ return kafkaProducerService.publishStatusUpdated(event)
+ .doOnSuccess(v -> log.info("Status update event published for transaction: {}", transactionId));
+ }
+}
diff --git a/status-service/src/main/resources/application.yml b/status-service/src/main/resources/application.yml
new file mode 100644
index 0000000000..51d61e3fec
--- /dev/null
+++ b/status-service/src/main/resources/application.yml
@@ -0,0 +1,65 @@
+server:
+ port: 8081
+
+spring:
+ application:
+ name: status-service
+
+ kafka:
+ bootstrap-servers: localhost:9092
+ producer:
+ key-serializer: org.apache.kafka.common.serialization.StringSerializer
+ value-serializer: org.apache.kafka.common.serialization.StringSerializer
+ acks: all
+ retries: 3
+ properties:
+ max.in.flight.requests.per.connection: 5
+ enable.idempotence: true
+ consumer:
+ group-id: status-service-group
+ key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ auto-offset-reset: earliest
+ enable-auto-commit: false
+ properties:
+ max.poll.records: 100
+ request.timeout.ms: 60000
+ connections.max.idle.ms: 600000
+ metadata.max.age.ms: 180000
+ session.timeout.ms: 60000
+ heartbeat.interval.ms: 3000
+
+# Configuración de tópicos de Kafka
+kafka:
+ topics:
+ transaction-creation: transaction-creation-topic
+ anti-fraud-validation: anti-fraud-validation-topic
+ consumer:
+ concurrency: 3
+ max-poll-records: 100
+
+# Logging
+logging:
+ level:
+ root: INFO
+ com.example.statusservice: DEBUG
+ reactor.kafka: DEBUG
+
+management:
+ endpoints:
+ web:
+ exposure:
+ include: health,info,metrics,prometheus
+ metrics:
+ export:
+ prometheus:
+ enabled: true
+
+---
+spring:
+ config:
+ activate:
+ on-profile: docker
+
+ kafka:
+ bootstrap-servers: ${SPRING_KAFKA_BOOTSTRAP_SERVERS}
diff --git a/status-service/target/classes/application.yml b/status-service/target/classes/application.yml
new file mode 100644
index 0000000000..51d61e3fec
--- /dev/null
+++ b/status-service/target/classes/application.yml
@@ -0,0 +1,65 @@
+server:
+ port: 8081
+
+spring:
+ application:
+ name: status-service
+
+ kafka:
+ bootstrap-servers: localhost:9092
+ producer:
+ key-serializer: org.apache.kafka.common.serialization.StringSerializer
+ value-serializer: org.apache.kafka.common.serialization.StringSerializer
+ acks: all
+ retries: 3
+ properties:
+ max.in.flight.requests.per.connection: 5
+ enable.idempotence: true
+ consumer:
+ group-id: status-service-group
+ key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ auto-offset-reset: earliest
+ enable-auto-commit: false
+ properties:
+ max.poll.records: 100
+ request.timeout.ms: 60000
+ connections.max.idle.ms: 600000
+ metadata.max.age.ms: 180000
+ session.timeout.ms: 60000
+ heartbeat.interval.ms: 3000
+
+# Configuración de tópicos de Kafka
+kafka:
+ topics:
+ transaction-creation: transaction-creation-topic
+ anti-fraud-validation: anti-fraud-validation-topic
+ consumer:
+ concurrency: 3
+ max-poll-records: 100
+
+# Logging
+logging:
+ level:
+ root: INFO
+ com.example.statusservice: DEBUG
+ reactor.kafka: DEBUG
+
+management:
+ endpoints:
+ web:
+ exposure:
+ include: health,info,metrics,prometheus
+ metrics:
+ export:
+ prometheus:
+ enabled: true
+
+---
+spring:
+ config:
+ activate:
+ on-profile: docker
+
+ kafka:
+ bootstrap-servers: ${SPRING_KAFKA_BOOTSTRAP_SERVERS}
diff --git a/status-service/target/classes/com/example/statusservice/StatusServiceApplication.class b/status-service/target/classes/com/example/statusservice/StatusServiceApplication.class
new file mode 100644
index 0000000000..a3db4c76eb
Binary files /dev/null and b/status-service/target/classes/com/example/statusservice/StatusServiceApplication.class differ
diff --git a/status-service/target/classes/com/example/statusservice/config/JacksonConfig.class b/status-service/target/classes/com/example/statusservice/config/JacksonConfig.class
new file mode 100644
index 0000000000..5d1158a946
Binary files /dev/null and b/status-service/target/classes/com/example/statusservice/config/JacksonConfig.class differ
diff --git a/status-service/target/classes/com/example/statusservice/config/KafkaConfig$Consumer.class b/status-service/target/classes/com/example/statusservice/config/KafkaConfig$Consumer.class
new file mode 100644
index 0000000000..d4ef5e533c
Binary files /dev/null and b/status-service/target/classes/com/example/statusservice/config/KafkaConfig$Consumer.class differ
diff --git a/status-service/target/classes/com/example/statusservice/config/KafkaConfig$Producer.class b/status-service/target/classes/com/example/statusservice/config/KafkaConfig$Producer.class
new file mode 100644
index 0000000000..b242443f86
Binary files /dev/null and b/status-service/target/classes/com/example/statusservice/config/KafkaConfig$Producer.class differ
diff --git a/status-service/target/classes/com/example/statusservice/config/KafkaConfig.class b/status-service/target/classes/com/example/statusservice/config/KafkaConfig.class
new file mode 100644
index 0000000000..3a7089180a
Binary files /dev/null and b/status-service/target/classes/com/example/statusservice/config/KafkaConfig.class differ
diff --git a/status-service/target/classes/com/example/statusservice/config/KafkaTopicConfig.class b/status-service/target/classes/com/example/statusservice/config/KafkaTopicConfig.class
new file mode 100644
index 0000000000..e8ddbbfc1d
Binary files /dev/null and b/status-service/target/classes/com/example/statusservice/config/KafkaTopicConfig.class differ
diff --git a/status-service/target/classes/com/example/statusservice/controller/StatusController.class b/status-service/target/classes/com/example/statusservice/controller/StatusController.class
new file mode 100644
index 0000000000..9a7804a969
Binary files /dev/null and b/status-service/target/classes/com/example/statusservice/controller/StatusController.class differ
diff --git a/status-service/target/classes/com/example/statusservice/dto/TransactionCreatedEvent$TransactionCreatedEventBuilder.class b/status-service/target/classes/com/example/statusservice/dto/TransactionCreatedEvent$TransactionCreatedEventBuilder.class
new file mode 100644
index 0000000000..001039f6ae
Binary files /dev/null and b/status-service/target/classes/com/example/statusservice/dto/TransactionCreatedEvent$TransactionCreatedEventBuilder.class differ
diff --git a/status-service/target/classes/com/example/statusservice/dto/TransactionCreatedEvent.class b/status-service/target/classes/com/example/statusservice/dto/TransactionCreatedEvent.class
new file mode 100644
index 0000000000..5ebad6b3d0
Binary files /dev/null and b/status-service/target/classes/com/example/statusservice/dto/TransactionCreatedEvent.class differ
diff --git a/status-service/target/classes/com/example/statusservice/dto/TransactionStatus$TransactionStatusBuilder.class b/status-service/target/classes/com/example/statusservice/dto/TransactionStatus$TransactionStatusBuilder.class
new file mode 100644
index 0000000000..29fd9ce7e5
Binary files /dev/null and b/status-service/target/classes/com/example/statusservice/dto/TransactionStatus$TransactionStatusBuilder.class differ
diff --git a/status-service/target/classes/com/example/statusservice/dto/TransactionStatus.class b/status-service/target/classes/com/example/statusservice/dto/TransactionStatus.class
new file mode 100644
index 0000000000..e8c5c8c775
Binary files /dev/null and b/status-service/target/classes/com/example/statusservice/dto/TransactionStatus.class differ
diff --git a/status-service/target/classes/com/example/statusservice/dto/TransactionStatusUpdatedEvent$TransactionStatusUpdatedEventBuilder.class b/status-service/target/classes/com/example/statusservice/dto/TransactionStatusUpdatedEvent$TransactionStatusUpdatedEventBuilder.class
new file mode 100644
index 0000000000..f0316130c4
Binary files /dev/null and b/status-service/target/classes/com/example/statusservice/dto/TransactionStatusUpdatedEvent$TransactionStatusUpdatedEventBuilder.class differ
diff --git a/status-service/target/classes/com/example/statusservice/dto/TransactionStatusUpdatedEvent.class b/status-service/target/classes/com/example/statusservice/dto/TransactionStatusUpdatedEvent.class
new file mode 100644
index 0000000000..15ea409183
Binary files /dev/null and b/status-service/target/classes/com/example/statusservice/dto/TransactionStatusUpdatedEvent.class differ
diff --git a/status-service/target/classes/com/example/statusservice/dto/TransactionType$TransactionTypeBuilder.class b/status-service/target/classes/com/example/statusservice/dto/TransactionType$TransactionTypeBuilder.class
new file mode 100644
index 0000000000..d05ccc46ec
Binary files /dev/null and b/status-service/target/classes/com/example/statusservice/dto/TransactionType$TransactionTypeBuilder.class differ
diff --git a/status-service/target/classes/com/example/statusservice/dto/TransactionType.class b/status-service/target/classes/com/example/statusservice/dto/TransactionType.class
new file mode 100644
index 0000000000..bcb8399e65
Binary files /dev/null and b/status-service/target/classes/com/example/statusservice/dto/TransactionType.class differ
diff --git a/status-service/target/classes/com/example/statusservice/service/KafkaConsumerService.class b/status-service/target/classes/com/example/statusservice/service/KafkaConsumerService.class
new file mode 100644
index 0000000000..61d0ea2e49
Binary files /dev/null and b/status-service/target/classes/com/example/statusservice/service/KafkaConsumerService.class differ
diff --git a/status-service/target/classes/com/example/statusservice/service/KafkaProducerService.class b/status-service/target/classes/com/example/statusservice/service/KafkaProducerService.class
new file mode 100644
index 0000000000..27a727acdc
Binary files /dev/null and b/status-service/target/classes/com/example/statusservice/service/KafkaProducerService.class differ
diff --git a/status-service/target/classes/com/example/statusservice/service/StatusService.class b/status-service/target/classes/com/example/statusservice/service/StatusService.class
new file mode 100644
index 0000000000..0f08c86fac
Binary files /dev/null and b/status-service/target/classes/com/example/statusservice/service/StatusService.class differ
diff --git a/transaction-service/.idea/.gitignore b/transaction-service/.idea/.gitignore
new file mode 100644
index 0000000000..a0ccf77bc5
--- /dev/null
+++ b/transaction-service/.idea/.gitignore
@@ -0,0 +1,5 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Environment-dependent path to Maven home directory
+/mavenHomeManager.xml
diff --git a/transaction-service/.idea/compiler.xml b/transaction-service/.idea/compiler.xml
new file mode 100644
index 0000000000..61be318787
--- /dev/null
+++ b/transaction-service/.idea/compiler.xml
@@ -0,0 +1,19 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/transaction-service/.idea/encodings.xml b/transaction-service/.idea/encodings.xml
new file mode 100644
index 0000000000..63e9001932
--- /dev/null
+++ b/transaction-service/.idea/encodings.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/transaction-service/.idea/jarRepositories.xml b/transaction-service/.idea/jarRepositories.xml
new file mode 100644
index 0000000000..712ab9d985
--- /dev/null
+++ b/transaction-service/.idea/jarRepositories.xml
@@ -0,0 +1,20 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/transaction-service/.idea/misc.xml b/transaction-service/.idea/misc.xml
new file mode 100644
index 0000000000..67e1e6113b
--- /dev/null
+++ b/transaction-service/.idea/misc.xml
@@ -0,0 +1,12 @@
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/transaction-service/.idea/vcs.xml b/transaction-service/.idea/vcs.xml
new file mode 100644
index 0000000000..6c0b863585
--- /dev/null
+++ b/transaction-service/.idea/vcs.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/transaction-service/Dockerfile b/transaction-service/Dockerfile
new file mode 100644
index 0000000000..9093338176
--- /dev/null
+++ b/transaction-service/Dockerfile
@@ -0,0 +1,25 @@
+FROM maven:3.9-eclipse-temurin-17-alpine AS build
+
+WORKDIR /app
+
+# Copiar archivos de Maven
+COPY pom.xml .
+RUN mvn dependency:go-offline -B
+
+# Copiar código fuente y compilar
+COPY src ./src
+RUN mvn clean package -DskipTests
+
+# Imagen final
+FROM eclipse-temurin:17-jre-alpine
+
+WORKDIR /app
+
+# Copiar el JAR compilado
+COPY --from=build /app/target/*.jar app.jar
+
+# Exponer puerto
+EXPOSE 8080
+
+# Punto de entrada
+ENTRYPOINT ["java", "-jar", "app.jar"]
diff --git a/transaction-service/pom.xml b/transaction-service/pom.xml
new file mode 100644
index 0000000000..275f3848e1
--- /dev/null
+++ b/transaction-service/pom.xml
@@ -0,0 +1,119 @@
+
+
+ 4.0.0
+
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 3.2.0
+
+
+
+ com.example
+ transaction-service
+ 1.0.0
+ transaction-service
+ Microservicio de transacciones con WebFlux y Kafka
+
+
+ 17
+ 2023.0.0
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-webflux
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-data-r2dbc
+
+
+
+
+ org.postgresql
+ r2dbc-postgresql
+ runtime
+
+
+
+
+ org.postgresql
+ postgresql
+ runtime
+
+
+
+
+ io.projectreactor.kafka
+ reactor-kafka
+
+
+
+
+ org.projectlombok
+ lombok
+ true
+
+
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+
+ com.fasterxml.jackson.datatype
+ jackson-datatype-jsr310
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-validation
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+
+ io.projectreactor
+ reactor-test
+ test
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+ org.projectlombok
+ lombok
+
+
+
+
+
+
+
diff --git a/transaction-service/src/main/java/com/example/transactionservice/TransactionServiceApplication.java b/transaction-service/src/main/java/com/example/transactionservice/TransactionServiceApplication.java
new file mode 100644
index 0000000000..90b4245b78
--- /dev/null
+++ b/transaction-service/src/main/java/com/example/transactionservice/TransactionServiceApplication.java
@@ -0,0 +1,16 @@
+package com.example.transactionservice;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.data.r2dbc.config.EnableR2dbcAuditing;
+import org.springframework.data.r2dbc.repository.config.EnableR2dbcRepositories;
+
+@SpringBootApplication
+@EnableR2dbcRepositories
+@EnableR2dbcAuditing
+public class TransactionServiceApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(TransactionServiceApplication.class, args);
+ }
+}
diff --git a/transaction-service/src/main/java/com/example/transactionservice/config/JacksonConfig.java b/transaction-service/src/main/java/com/example/transactionservice/config/JacksonConfig.java
new file mode 100644
index 0000000000..754f3769a8
--- /dev/null
+++ b/transaction-service/src/main/java/com/example/transactionservice/config/JacksonConfig.java
@@ -0,0 +1,19 @@
+package com.example.transactionservice.config;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class JacksonConfig {
+
+ @Bean
+ public ObjectMapper objectMapper() {
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.registerModule(new JavaTimeModule());
+ mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+ return mapper;
+ }
+}
diff --git a/transaction-service/src/main/java/com/example/transactionservice/config/KafkaConfig.java b/transaction-service/src/main/java/com/example/transactionservice/config/KafkaConfig.java
new file mode 100644
index 0000000000..d8d9bc79f9
--- /dev/null
+++ b/transaction-service/src/main/java/com/example/transactionservice/config/KafkaConfig.java
@@ -0,0 +1,82 @@
+package com.example.transactionservice.config;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import reactor.kafka.receiver.KafkaReceiver;
+import reactor.kafka.receiver.ReceiverOptions;
+import reactor.kafka.sender.KafkaSender;
+import reactor.kafka.sender.SenderOptions;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+@Configuration
+@ConfigurationProperties(prefix = "spring.kafka")
+@Getter
+@Setter
+public class KafkaConfig {
+
+ private String bootstrapServers;
+ private Producer producer;
+ private Consumer consumer;
+
+ @Getter
+ @Setter
+ public static class Producer {
+ private String keySerializer;
+ private String valueSerializer;
+ private String acks;
+ private Integer retries;
+ }
+
+ @Getter
+ @Setter
+ public static class Consumer {
+ private String groupId;
+ private String keyDeserializer;
+ private String valueDeserializer;
+ private String autoOffsetReset;
+ private Boolean enableAutoCommit;
+ }
+
+ @Bean
+ public KafkaSender kafkaSender() {
+ Map props = new HashMap<>();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.ACKS_CONFIG, producer.getAcks());
+ props.put(ProducerConfig.RETRIES_CONFIG, producer.getRetries());
+ props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+ props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
+
+ SenderOptions senderOptions = SenderOptions.create(props);
+ return KafkaSender.create(senderOptions);
+ }
+
+ @Bean
+ public KafkaReceiver kafkaReceiver(KafkaTopicConfig topicConfig) {
+ Map props = new HashMap<>();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, consumer.getGroupId());
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumer.getAutoOffsetReset());
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumer.getEnableAutoCommit());
+ props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
+
+ ReceiverOptions receiverOptions = ReceiverOptions
+ .create(props)
+ .subscription(Collections.singleton(topicConfig.getAntiFraudValidation()));
+
+ return KafkaReceiver.create(receiverOptions);
+ }
+}
diff --git a/transaction-service/src/main/java/com/example/transactionservice/config/KafkaTopicConfig.java b/transaction-service/src/main/java/com/example/transactionservice/config/KafkaTopicConfig.java
new file mode 100644
index 0000000000..6d022c33c4
--- /dev/null
+++ b/transaction-service/src/main/java/com/example/transactionservice/config/KafkaTopicConfig.java
@@ -0,0 +1,16 @@
+package com.example.transactionservice.config;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+@ConfigurationProperties(prefix = "kafka.topics")
+@Getter
+@Setter
+public class KafkaTopicConfig {
+
+ private String transactionCreation;
+ private String antiFraudValidation;
+}
diff --git a/transaction-service/src/main/java/com/example/transactionservice/controller/TransactionController.java b/transaction-service/src/main/java/com/example/transactionservice/controller/TransactionController.java
new file mode 100644
index 0000000000..1136095c38
--- /dev/null
+++ b/transaction-service/src/main/java/com/example/transactionservice/controller/TransactionController.java
@@ -0,0 +1,54 @@
+package com.example.transactionservice.controller;
+
+import com.example.transactionservice.domain.Transaction;
+import com.example.transactionservice.dto.CreateTransactionRequest;
+import com.example.transactionservice.service.TransactionService;
+import jakarta.validation.Valid;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.*;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@RestController
+@RequestMapping("/api/v1/transactions")
+@RequiredArgsConstructor
+@Slf4j
+public class TransactionController {
+
+ private final TransactionService transactionService;
+
+ @PostMapping
+ public Mono> createTransaction(
+ @Valid @RequestBody CreateTransactionRequest request) {
+ log.info("POST /api/v1/transactions - Creating transaction");
+
+ return transactionService.createTransaction(request)
+ .map(transaction -> ResponseEntity.status(HttpStatus.CREATED).body(transaction))
+ .onErrorResume(error -> {
+ log.error("Error creating transaction: {}", error.getMessage());
+ return Mono.just(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build());
+ });
+ }
+
+ @GetMapping("")
+ public ResponseEntity> getTransactions() {
+ log.info("GET /api/v1/transactions - Retrieving all transactions");
+
+ Flux transactions = transactionService.getAllTransactions();
+ HttpHeaders headers = new HttpHeaders();
+ headers.setContentType(MediaType.APPLICATION_JSON);
+
+ return new ResponseEntity<>(transactions, headers, HttpStatus.OK);
+ }
+
+
+ @GetMapping("/health")
+ public Mono> health() {
+ return Mono.just(ResponseEntity.ok("Transaction Service is running"));
+ }
+}
diff --git a/transaction-service/src/main/java/com/example/transactionservice/domain/Transaction.java b/transaction-service/src/main/java/com/example/transactionservice/domain/Transaction.java
new file mode 100644
index 0000000000..913682127e
--- /dev/null
+++ b/transaction-service/src/main/java/com/example/transactionservice/domain/Transaction.java
@@ -0,0 +1,30 @@
+package com.example.transactionservice.domain;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.relational.core.mapping.Table;
+
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@Table("transactions")
+public class Transaction {
+
+ @Id
+ private Long id;
+ private String transactionId;
+ private String accountExternalIdDebit;
+ private String accountExternalIdCredit;
+ private int transferTypeId;
+ private BigDecimal value;
+ private String status;
+ private LocalDateTime createdAt;
+ private LocalDateTime updatedAt;
+}
diff --git a/transaction-service/src/main/java/com/example/transactionservice/dto/CreateTransactionRequest.java b/transaction-service/src/main/java/com/example/transactionservice/dto/CreateTransactionRequest.java
new file mode 100644
index 0000000000..93f31f7c0c
--- /dev/null
+++ b/transaction-service/src/main/java/com/example/transactionservice/dto/CreateTransactionRequest.java
@@ -0,0 +1,28 @@
+package com.example.transactionservice.dto;
+
+import jakarta.validation.constraints.NotBlank;
+import jakarta.validation.constraints.NotNull;
+import jakarta.validation.constraints.Positive;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.math.BigDecimal;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class CreateTransactionRequest {
+
+ @NotBlank(message = "Account External Id Debit is required")
+ private String accountExternalIdDebit;
+ @NotBlank(message = "Account External Id Credit is required")
+ private String accountExternalIdCredit;
+ @NotNull(message = "transfer Type Id is required")
+ private Integer transferTypeId;
+ @NotNull(message = "Amount is required")
+ @Positive(message = "Amount must be positive")
+ private BigDecimal value;
+}
diff --git a/transaction-service/src/main/java/com/example/transactionservice/dto/TransactionCreatedEvent.java b/transaction-service/src/main/java/com/example/transactionservice/dto/TransactionCreatedEvent.java
new file mode 100644
index 0000000000..1b547dff26
--- /dev/null
+++ b/transaction-service/src/main/java/com/example/transactionservice/dto/TransactionCreatedEvent.java
@@ -0,0 +1,30 @@
+package com.example.transactionservice.dto;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class TransactionCreatedEvent {
+
+ private String transactionId;
+ private TransactionType transactionType;
+ private TransactionStatus transactionStatus;
+ private BigDecimal value;
+
+ @JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ss")
+ private LocalDateTime createdAt;
+
+ private String eventId;
+
+ @JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ss")
+ private LocalDateTime eventTimestamp;
+}
diff --git a/transaction-service/src/main/java/com/example/transactionservice/dto/TransactionStatus.java b/transaction-service/src/main/java/com/example/transactionservice/dto/TransactionStatus.java
new file mode 100644
index 0000000000..0d6d1829eb
--- /dev/null
+++ b/transaction-service/src/main/java/com/example/transactionservice/dto/TransactionStatus.java
@@ -0,0 +1,12 @@
+package com.example.transactionservice.dto;
+
+import lombok.*;
+
+@Getter
+@Setter
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class TransactionStatus {
+ private String name;
+}
diff --git a/transaction-service/src/main/java/com/example/transactionservice/dto/TransactionStatusUpdatedEvent.java b/transaction-service/src/main/java/com/example/transactionservice/dto/TransactionStatusUpdatedEvent.java
new file mode 100644
index 0000000000..cae30ed79b
--- /dev/null
+++ b/transaction-service/src/main/java/com/example/transactionservice/dto/TransactionStatusUpdatedEvent.java
@@ -0,0 +1,30 @@
+package com.example.transactionservice.dto;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.LocalDateTime;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class TransactionStatusUpdatedEvent {
+
+ private String transactionId;
+ private String oldStatus;
+ private String newStatus;
+ private String reason;
+ private String updatedBy;
+
+ @JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ss")
+ private LocalDateTime updatedAt;
+
+ private String eventId;
+
+ @JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ss")
+ private LocalDateTime eventTimestamp;
+}
diff --git a/transaction-service/src/main/java/com/example/transactionservice/dto/TransactionType.java b/transaction-service/src/main/java/com/example/transactionservice/dto/TransactionType.java
new file mode 100644
index 0000000000..7ad66351f3
--- /dev/null
+++ b/transaction-service/src/main/java/com/example/transactionservice/dto/TransactionType.java
@@ -0,0 +1,12 @@
+package com.example.transactionservice.dto;
+
+import lombok.*;
+
+@Getter
+@Setter
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class TransactionType {
+ private String name;
+}
diff --git a/transaction-service/src/main/java/com/example/transactionservice/repository/TransactionRepository.java b/transaction-service/src/main/java/com/example/transactionservice/repository/TransactionRepository.java
new file mode 100644
index 0000000000..263e63ec2f
--- /dev/null
+++ b/transaction-service/src/main/java/com/example/transactionservice/repository/TransactionRepository.java
@@ -0,0 +1,12 @@
+package com.example.transactionservice.repository;
+
+import com.example.transactionservice.domain.Transaction;
+import org.springframework.data.r2dbc.repository.R2dbcRepository;
+import org.springframework.stereotype.Repository;
+import reactor.core.publisher.Mono;
+
+@Repository
+public interface TransactionRepository extends R2dbcRepository {
+
+ Mono findByTransactionId(String transactionId);
+}
diff --git a/transaction-service/src/main/java/com/example/transactionservice/service/KafkaConsumerService.java b/transaction-service/src/main/java/com/example/transactionservice/service/KafkaConsumerService.java
new file mode 100644
index 0000000000..fbb57b5649
--- /dev/null
+++ b/transaction-service/src/main/java/com/example/transactionservice/service/KafkaConsumerService.java
@@ -0,0 +1,80 @@
+package com.example.transactionservice.service;
+
+import com.example.transactionservice.dto.TransactionStatusUpdatedEvent;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+import reactor.core.Disposable;
+import reactor.core.publisher.Mono;
+import reactor.kafka.receiver.KafkaReceiver;
+import reactor.kafka.receiver.ReceiverRecord;
+
+@Service
+@RequiredArgsConstructor
+@Slf4j
+public class KafkaConsumerService {
+
+ private final KafkaReceiver kafkaReceiver;
+ private final ObjectMapper objectMapper;
+ private final TransactionService transactionService;
+ private Disposable disposable;
+
+ @PostConstruct
+ public void startConsuming() {
+ log.info("Starting Kafka consumer for transaction status updates...");
+
+ disposable = kafkaReceiver.receive()
+ .flatMap(this::processMessage)
+ .subscribe();
+ }
+
+ @PreDestroy
+ public void stopConsuming() {
+ if (disposable != null && !disposable.isDisposed()) {
+ log.info("Stopping Kafka consumer...");
+ disposable.dispose();
+ }
+ }
+
+ private Mono processMessage(ReceiverRecord record) {
+ return Mono.fromCallable(() -> {
+ log.info("Received message from Kafka: key={}, partition={}, offset={}",
+ record.key(), record.partition(), record.offset());
+
+ try {
+ return objectMapper.readValue(
+ record.value(),
+ TransactionStatusUpdatedEvent.class
+ );
+ } catch (Exception e) {
+ log.error("Error deserializing message: {}", e.getMessage(), e);
+ throw new RuntimeException("Failed to deserialize message", e);
+ }
+ })
+ .flatMap(event -> {
+ log.info("Processing status update: transactionId={}, oldStatus={}, newStatus={}",
+ event.getTransactionId(), event.getOldStatus(), event.getNewStatus());
+
+ return transactionService.updateTransactionStatus(
+ event.getTransactionId(),
+ event.getNewStatus()
+ );
+ })
+ .doOnSuccess(transaction -> {
+ log.info("Successfully processed status update for transaction: {}",
+ record.key());
+ record.receiverOffset().acknowledge();
+ })
+ .doOnError(error -> {
+ log.error("Error processing message for transaction: {}, error: {}",
+ record.key(), error.getMessage(), error);
+ // En producción, aquí podrías implementar lógica de retry o dead letter queue
+ record.receiverOffset().acknowledge(); // Acknowledge para evitar reprocessamiento infinito
+ })
+ .onErrorResume(error -> Mono.empty())
+ .then();
+ }
+}
diff --git a/transaction-service/src/main/java/com/example/transactionservice/service/KafkaProducerService.java b/transaction-service/src/main/java/com/example/transactionservice/service/KafkaProducerService.java
new file mode 100644
index 0000000000..9f74aeabd6
--- /dev/null
+++ b/transaction-service/src/main/java/com/example/transactionservice/service/KafkaProducerService.java
@@ -0,0 +1,74 @@
+package com.example.transactionservice.service;
+
+import com.example.transactionservice.config.KafkaTopicConfig;
+import com.example.transactionservice.dto.TransactionCreatedEvent;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.stereotype.Service;
+import reactor.core.publisher.Mono;
+import reactor.kafka.sender.KafkaSender;
+import reactor.kafka.sender.SenderRecord;
+
+import java.time.LocalDateTime;
+import java.util.UUID;
+
+@Service
+@RequiredArgsConstructor
+@Slf4j
+public class KafkaProducerService {
+
+ private final KafkaSender kafkaSender;
+ private final KafkaTopicConfig topicConfig;
+ private final ObjectMapper objectMapper;
+
+ public Mono publishTransactionCreation(TransactionCreatedEvent event) {
+ return Mono.fromCallable(() -> {
+ // Generar IDs de evento si no existen
+ if (event.getEventId() == null) {
+ event.setEventId(UUID.randomUUID().toString());
+ }
+ if (event.getEventTimestamp() == null) {
+ event.setEventTimestamp(LocalDateTime.now());
+ }
+
+ try {
+ return objectMapper.writeValueAsString(event);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException("Error serializing event", e);
+ }
+ })
+ .flatMap(eventJson -> {
+ ProducerRecord producerRecord = new ProducerRecord<>(
+ topicConfig.getTransactionCreation(),
+ event.getTransactionId(),
+ eventJson
+ );
+
+ SenderRecord senderRecord = SenderRecord.create(
+ producerRecord,
+ event.getTransactionId()
+ );
+
+ return kafkaSender.send(Mono.just(senderRecord))
+ .next()
+ .doOnSuccess(result -> {
+ log.info("Successfully published transaction created event: transactionId={}, partition={}, offset={}",
+ event.getTransactionId(),
+ result.recordMetadata().partition(),
+ result.recordMetadata().offset());
+ })
+ .doOnError(error -> {
+ log.error("Error publishing transaction created event: transactionId={}, error={}",
+ event.getTransactionId(), error.getMessage(), error);
+ })
+ .then();
+ })
+ .onErrorResume(error -> {
+ log.error("Failed to publish transaction created event", error);
+ return Mono.error(new RuntimeException("Failed to publish event", error));
+ });
+ }
+}
diff --git a/transaction-service/src/main/java/com/example/transactionservice/service/TransactionService.java b/transaction-service/src/main/java/com/example/transactionservice/service/TransactionService.java
new file mode 100644
index 0000000000..35f8f5e0ab
--- /dev/null
+++ b/transaction-service/src/main/java/com/example/transactionservice/service/TransactionService.java
@@ -0,0 +1,92 @@
+package com.example.transactionservice.service;
+
+import com.example.transactionservice.domain.Transaction;
+import com.example.transactionservice.dto.CreateTransactionRequest;
+import com.example.transactionservice.dto.TransactionCreatedEvent;
+import com.example.transactionservice.dto.TransactionStatus;
+import com.example.transactionservice.dto.TransactionType;
+import com.example.transactionservice.repository.TransactionRepository;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.LocalDateTime;
+import java.util.UUID;
+
+@Service
+@RequiredArgsConstructor
+@Slf4j
+public class TransactionService {
+
+ private final TransactionRepository transactionRepository;
+ private final KafkaProducerService kafkaProducerService;
+
+ @Transactional
+ public Mono createTransaction(CreateTransactionRequest request) {
+ log.info("Creating new transaction: {}", request.getTransferTypeId());
+
+ String transactionId = UUID.randomUUID().toString();
+
+ Transaction transaction = Transaction.builder()
+ .transactionId(transactionId)
+ .accountExternalIdDebit(request.getAccountExternalIdDebit())
+ .accountExternalIdCredit(request.getAccountExternalIdCredit())
+ .transferTypeId(request.getTransferTypeId())
+ .value(request.getValue())
+ .status("PENDING")
+ .createdAt(LocalDateTime.now())
+ .updatedAt(LocalDateTime.now())
+ .build();
+
+ return transactionRepository.save(transaction)
+ .flatMap(savedTransaction -> {
+ log.info("Transaction saved: {}", savedTransaction.getTransactionId());
+
+ // Crear evento
+ TransactionCreatedEvent event = TransactionCreatedEvent.builder()
+ .transactionId(savedTransaction.getTransactionId())
+ .transactionType(new TransactionType(String.valueOf(savedTransaction.getTransferTypeId())))
+ .transactionStatus(new TransactionStatus(savedTransaction.getStatus()))
+ .value(savedTransaction.getValue())
+ .createdAt(savedTransaction.getCreatedAt())
+ .build();
+
+ // Publicar evento
+ return kafkaProducerService.publishTransactionCreation(event)
+ .thenReturn(savedTransaction);
+ })
+ .doOnSuccess(tx -> log.info("Transaction created and event published: {}", tx.getTransactionId()))
+ .doOnError(error -> log.error("Error creating transaction: {}", error.getMessage(), error));
+ }
+
+ public Mono getTransactionById(String transactionId) {
+ log.info("Retrieving transaction: {}", transactionId);
+ return transactionRepository.findByTransactionId(transactionId)
+ .switchIfEmpty(Mono.error(new RuntimeException("Transaction not found: " + transactionId)));
+ }
+
+ public Flux getAllTransactions() {
+ log.info("Retrieving all transactions");
+ return transactionRepository.findAll()
+ .switchIfEmpty(Flux.just());
+ }
+
+ @Transactional
+ public Mono updateTransactionStatus(String transactionId, String newStatus) {
+ log.info("Updating transaction status: transactionId={}, newStatus={}", transactionId, newStatus);
+
+ return transactionRepository.findByTransactionId(transactionId)
+ .switchIfEmpty(Mono.error(new RuntimeException("Transaction not found: " + transactionId)))
+ .flatMap(transaction -> {
+ transaction.setStatus(newStatus);
+ transaction.setUpdatedAt(LocalDateTime.now());
+ return transactionRepository.save(transaction);
+ })
+ .doOnSuccess(tx -> log.info("Transaction status updated: transactionId={}, status={}",
+ tx.getTransactionId(), tx.getStatus()))
+ .doOnError(error -> log.error("Error updating transaction status: {}", error.getMessage(), error));
+ }
+}
diff --git a/transaction-service/src/main/resources/application.yml b/transaction-service/src/main/resources/application.yml
new file mode 100644
index 0000000000..4fb66c52de
--- /dev/null
+++ b/transaction-service/src/main/resources/application.yml
@@ -0,0 +1,83 @@
+server:
+ port: 8080
+
+spring:
+ application:
+ name: transaction-service
+ sql:
+ init:
+ mode: always
+ r2dbc:
+ url: r2dbc:postgresql://localhost:5432/transactions_db
+ username: postgres
+ password: postgres
+ initialization-mode: always
+ pool:
+ initial-size: 10
+ max-size: 50
+ max-idle-time: 30m
+ max-acquire-time: 3s
+
+ kafka:
+ bootstrap-servers: localhost:9092
+ producer:
+ key-serializer: org.apache.kafka.common.serialization.StringSerializer
+ value-serializer: org.apache.kafka.common.serialization.StringSerializer
+ acks: all
+ retries: 3
+ properties:
+ max.in.flight.requests.per.connection: 5
+ enable.idempotence: true
+ request.timeout.ms: 60000
+ connections.max.idle.ms: 600000
+ metadata.max.age.ms: 180000
+ consumer:
+ group-id: transaction-service-group
+ key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ auto-offset-reset: earliest
+ enable-auto-commit: false
+ properties:
+ max.poll.records: 100
+ session.timeout.ms: 30000
+
+# Configuración de tópicos de Kafka
+kafka:
+ topics:
+ transaction-creation: transaction-creation-topic
+ anti-fraud-validation: anti-fraud-validation-topic
+ consumer:
+ concurrency: 3
+ max-poll-records: 100
+
+# Logging
+logging:
+ level:
+ root: INFO
+ com.example.transactionservice: DEBUG
+ org.springframework.r2dbc: DEBUG
+ reactor.kafka: DEBUG
+
+management:
+ endpoints:
+ web:
+ exposure:
+ include: health,info,metrics,prometheus
+ metrics:
+ export:
+ prometheus:
+ enabled: true
+
+---
+spring:
+ config:
+ activate:
+ on-profile: docker
+
+ r2dbc:
+ url: ${SPRING_R2DBC_URL}
+ username: ${SPRING_R2DBC_USERNAME}
+ password: ${SPRING_R2DBC_PASSWORD}
+
+ kafka:
+ bootstrap-servers: ${SPRING_KAFKA_BOOTSTRAP_SERVERS}
diff --git a/transaction-service/src/main/resources/schema.sql b/transaction-service/src/main/resources/schema.sql
new file mode 100644
index 0000000000..06fed6e4b1
--- /dev/null
+++ b/transaction-service/src/main/resources/schema.sql
@@ -0,0 +1,22 @@
+-- Schema for Transaction Service
+CREATE TABLE IF NOT EXISTS transactions (
+ id BIGSERIAL PRIMARY KEY,
+ transaction_id VARCHAR(255) UNIQUE NOT NULL,
+ account_external_id_debit VARCHAR(255) NOT NULL,
+ account_external_id_credit VARCHAR(255) NOT NULL,
+ transfer_type_id VARCHAR(255) NOT NULL,
+ value DECIMAL(19, 4) NOT NULL,
+ status VARCHAR(50) NOT NULL,
+ created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
+);
+
+-- Indexes
+CREATE INDEX IF NOT EXISTS idx_transaction_id ON transactions(transaction_id);
+CREATE INDEX IF NOT EXISTS idx_status ON transactions(status);
+CREATE INDEX IF NOT EXISTS idx_created_at ON transactions(created_at);
+
+-- Comments
+COMMENT ON TABLE transactions IS 'Tabla de transacciones del sistema';
+COMMENT ON COLUMN transactions.transaction_id IS 'ID único de la transacción (UUID)';
+COMMENT ON COLUMN transactions.status IS 'Estado de la transacción: PENDING, PROCESSING, COMPLETED, FAILED';
diff --git a/transaction-service/target/classes/application.yml b/transaction-service/target/classes/application.yml
new file mode 100644
index 0000000000..4fb66c52de
--- /dev/null
+++ b/transaction-service/target/classes/application.yml
@@ -0,0 +1,83 @@
+server:
+ port: 8080
+
+spring:
+ application:
+ name: transaction-service
+ sql:
+ init:
+ mode: always
+ r2dbc:
+ url: r2dbc:postgresql://localhost:5432/transactions_db
+ username: postgres
+ password: postgres
+ initialization-mode: always
+ pool:
+ initial-size: 10
+ max-size: 50
+ max-idle-time: 30m
+ max-acquire-time: 3s
+
+ kafka:
+ bootstrap-servers: localhost:9092
+ producer:
+ key-serializer: org.apache.kafka.common.serialization.StringSerializer
+ value-serializer: org.apache.kafka.common.serialization.StringSerializer
+ acks: all
+ retries: 3
+ properties:
+ max.in.flight.requests.per.connection: 5
+ enable.idempotence: true
+ request.timeout.ms: 60000
+ connections.max.idle.ms: 600000
+ metadata.max.age.ms: 180000
+ consumer:
+ group-id: transaction-service-group
+ key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ auto-offset-reset: earliest
+ enable-auto-commit: false
+ properties:
+ max.poll.records: 100
+ session.timeout.ms: 30000
+
+# Configuración de tópicos de Kafka
+kafka:
+ topics:
+ transaction-creation: transaction-creation-topic
+ anti-fraud-validation: anti-fraud-validation-topic
+ consumer:
+ concurrency: 3
+ max-poll-records: 100
+
+# Logging
+logging:
+ level:
+ root: INFO
+ com.example.transactionservice: DEBUG
+ org.springframework.r2dbc: DEBUG
+ reactor.kafka: DEBUG
+
+management:
+ endpoints:
+ web:
+ exposure:
+ include: health,info,metrics,prometheus
+ metrics:
+ export:
+ prometheus:
+ enabled: true
+
+---
+spring:
+ config:
+ activate:
+ on-profile: docker
+
+ r2dbc:
+ url: ${SPRING_R2DBC_URL}
+ username: ${SPRING_R2DBC_USERNAME}
+ password: ${SPRING_R2DBC_PASSWORD}
+
+ kafka:
+ bootstrap-servers: ${SPRING_KAFKA_BOOTSTRAP_SERVERS}
diff --git a/transaction-service/target/classes/com/example/transactionservice/TransactionServiceApplication.class b/transaction-service/target/classes/com/example/transactionservice/TransactionServiceApplication.class
new file mode 100644
index 0000000000..a74d62e1e3
Binary files /dev/null and b/transaction-service/target/classes/com/example/transactionservice/TransactionServiceApplication.class differ
diff --git a/transaction-service/target/classes/com/example/transactionservice/config/JacksonConfig.class b/transaction-service/target/classes/com/example/transactionservice/config/JacksonConfig.class
new file mode 100644
index 0000000000..0a6609a8fe
Binary files /dev/null and b/transaction-service/target/classes/com/example/transactionservice/config/JacksonConfig.class differ
diff --git a/transaction-service/target/classes/com/example/transactionservice/config/KafkaConfig$Consumer.class b/transaction-service/target/classes/com/example/transactionservice/config/KafkaConfig$Consumer.class
new file mode 100644
index 0000000000..52937f3eb7
Binary files /dev/null and b/transaction-service/target/classes/com/example/transactionservice/config/KafkaConfig$Consumer.class differ
diff --git a/transaction-service/target/classes/com/example/transactionservice/config/KafkaConfig$Producer.class b/transaction-service/target/classes/com/example/transactionservice/config/KafkaConfig$Producer.class
new file mode 100644
index 0000000000..740f269b4f
Binary files /dev/null and b/transaction-service/target/classes/com/example/transactionservice/config/KafkaConfig$Producer.class differ
diff --git a/transaction-service/target/classes/com/example/transactionservice/config/KafkaConfig.class b/transaction-service/target/classes/com/example/transactionservice/config/KafkaConfig.class
new file mode 100644
index 0000000000..ca29396b64
Binary files /dev/null and b/transaction-service/target/classes/com/example/transactionservice/config/KafkaConfig.class differ
diff --git a/transaction-service/target/classes/com/example/transactionservice/config/KafkaTopicConfig.class b/transaction-service/target/classes/com/example/transactionservice/config/KafkaTopicConfig.class
new file mode 100644
index 0000000000..3f9769b2c9
Binary files /dev/null and b/transaction-service/target/classes/com/example/transactionservice/config/KafkaTopicConfig.class differ
diff --git a/transaction-service/target/classes/com/example/transactionservice/controller/TransactionController.class b/transaction-service/target/classes/com/example/transactionservice/controller/TransactionController.class
new file mode 100644
index 0000000000..a9e79fd1ea
Binary files /dev/null and b/transaction-service/target/classes/com/example/transactionservice/controller/TransactionController.class differ
diff --git a/transaction-service/target/classes/com/example/transactionservice/domain/Transaction$TransactionBuilder.class b/transaction-service/target/classes/com/example/transactionservice/domain/Transaction$TransactionBuilder.class
new file mode 100644
index 0000000000..ff2feecad1
Binary files /dev/null and b/transaction-service/target/classes/com/example/transactionservice/domain/Transaction$TransactionBuilder.class differ
diff --git a/transaction-service/target/classes/com/example/transactionservice/domain/Transaction.class b/transaction-service/target/classes/com/example/transactionservice/domain/Transaction.class
new file mode 100644
index 0000000000..940991cd9f
Binary files /dev/null and b/transaction-service/target/classes/com/example/transactionservice/domain/Transaction.class differ
diff --git a/transaction-service/target/classes/com/example/transactionservice/dto/CreateTransactionRequest$CreateTransactionRequestBuilder.class b/transaction-service/target/classes/com/example/transactionservice/dto/CreateTransactionRequest$CreateTransactionRequestBuilder.class
new file mode 100644
index 0000000000..2d2e561176
Binary files /dev/null and b/transaction-service/target/classes/com/example/transactionservice/dto/CreateTransactionRequest$CreateTransactionRequestBuilder.class differ
diff --git a/transaction-service/target/classes/com/example/transactionservice/dto/CreateTransactionRequest.class b/transaction-service/target/classes/com/example/transactionservice/dto/CreateTransactionRequest.class
new file mode 100644
index 0000000000..0fa73ab949
Binary files /dev/null and b/transaction-service/target/classes/com/example/transactionservice/dto/CreateTransactionRequest.class differ
diff --git a/transaction-service/target/classes/com/example/transactionservice/dto/TransactionCreatedEvent$TransactionCreatedEventBuilder.class b/transaction-service/target/classes/com/example/transactionservice/dto/TransactionCreatedEvent$TransactionCreatedEventBuilder.class
new file mode 100644
index 0000000000..9e35c3b01f
Binary files /dev/null and b/transaction-service/target/classes/com/example/transactionservice/dto/TransactionCreatedEvent$TransactionCreatedEventBuilder.class differ
diff --git a/transaction-service/target/classes/com/example/transactionservice/dto/TransactionCreatedEvent.class b/transaction-service/target/classes/com/example/transactionservice/dto/TransactionCreatedEvent.class
new file mode 100644
index 0000000000..d98c3e1c35
Binary files /dev/null and b/transaction-service/target/classes/com/example/transactionservice/dto/TransactionCreatedEvent.class differ
diff --git a/transaction-service/target/classes/com/example/transactionservice/dto/TransactionStatus$TransactionStatusBuilder.class b/transaction-service/target/classes/com/example/transactionservice/dto/TransactionStatus$TransactionStatusBuilder.class
new file mode 100644
index 0000000000..90e5e2884e
Binary files /dev/null and b/transaction-service/target/classes/com/example/transactionservice/dto/TransactionStatus$TransactionStatusBuilder.class differ
diff --git a/transaction-service/target/classes/com/example/transactionservice/dto/TransactionStatus.class b/transaction-service/target/classes/com/example/transactionservice/dto/TransactionStatus.class
new file mode 100644
index 0000000000..94c6137af2
Binary files /dev/null and b/transaction-service/target/classes/com/example/transactionservice/dto/TransactionStatus.class differ
diff --git a/transaction-service/target/classes/com/example/transactionservice/dto/TransactionStatusUpdatedEvent$TransactionStatusUpdatedEventBuilder.class b/transaction-service/target/classes/com/example/transactionservice/dto/TransactionStatusUpdatedEvent$TransactionStatusUpdatedEventBuilder.class
new file mode 100644
index 0000000000..b24f098028
Binary files /dev/null and b/transaction-service/target/classes/com/example/transactionservice/dto/TransactionStatusUpdatedEvent$TransactionStatusUpdatedEventBuilder.class differ
diff --git a/transaction-service/target/classes/com/example/transactionservice/dto/TransactionStatusUpdatedEvent.class b/transaction-service/target/classes/com/example/transactionservice/dto/TransactionStatusUpdatedEvent.class
new file mode 100644
index 0000000000..b09cb63409
Binary files /dev/null and b/transaction-service/target/classes/com/example/transactionservice/dto/TransactionStatusUpdatedEvent.class differ
diff --git a/transaction-service/target/classes/com/example/transactionservice/dto/TransactionType$TransactionTypeBuilder.class b/transaction-service/target/classes/com/example/transactionservice/dto/TransactionType$TransactionTypeBuilder.class
new file mode 100644
index 0000000000..91649b1bb7
Binary files /dev/null and b/transaction-service/target/classes/com/example/transactionservice/dto/TransactionType$TransactionTypeBuilder.class differ
diff --git a/transaction-service/target/classes/com/example/transactionservice/dto/TransactionType.class b/transaction-service/target/classes/com/example/transactionservice/dto/TransactionType.class
new file mode 100644
index 0000000000..4e35446593
Binary files /dev/null and b/transaction-service/target/classes/com/example/transactionservice/dto/TransactionType.class differ
diff --git a/transaction-service/target/classes/com/example/transactionservice/repository/TransactionRepository.class b/transaction-service/target/classes/com/example/transactionservice/repository/TransactionRepository.class
new file mode 100644
index 0000000000..e3e5fffb14
Binary files /dev/null and b/transaction-service/target/classes/com/example/transactionservice/repository/TransactionRepository.class differ
diff --git a/transaction-service/target/classes/com/example/transactionservice/service/KafkaConsumerService.class b/transaction-service/target/classes/com/example/transactionservice/service/KafkaConsumerService.class
new file mode 100644
index 0000000000..2e46aadb5f
Binary files /dev/null and b/transaction-service/target/classes/com/example/transactionservice/service/KafkaConsumerService.class differ
diff --git a/transaction-service/target/classes/com/example/transactionservice/service/KafkaProducerService.class b/transaction-service/target/classes/com/example/transactionservice/service/KafkaProducerService.class
new file mode 100644
index 0000000000..7827b2f443
Binary files /dev/null and b/transaction-service/target/classes/com/example/transactionservice/service/KafkaProducerService.class differ
diff --git a/transaction-service/target/classes/com/example/transactionservice/service/TransactionService.class b/transaction-service/target/classes/com/example/transactionservice/service/TransactionService.class
new file mode 100644
index 0000000000..ce3008a754
Binary files /dev/null and b/transaction-service/target/classes/com/example/transactionservice/service/TransactionService.class differ
diff --git a/transaction-service/target/classes/schema.sql b/transaction-service/target/classes/schema.sql
new file mode 100644
index 0000000000..06fed6e4b1
--- /dev/null
+++ b/transaction-service/target/classes/schema.sql
@@ -0,0 +1,22 @@
+-- Schema for Transaction Service
+CREATE TABLE IF NOT EXISTS transactions (
+ id BIGSERIAL PRIMARY KEY,
+ transaction_id VARCHAR(255) UNIQUE NOT NULL,
+ account_external_id_debit VARCHAR(255) NOT NULL,
+ account_external_id_credit VARCHAR(255) NOT NULL,
+ transfer_type_id VARCHAR(255) NOT NULL,
+ value DECIMAL(19, 4) NOT NULL,
+ status VARCHAR(50) NOT NULL,
+ created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
+);
+
+-- Indexes
+CREATE INDEX IF NOT EXISTS idx_transaction_id ON transactions(transaction_id);
+CREATE INDEX IF NOT EXISTS idx_status ON transactions(status);
+CREATE INDEX IF NOT EXISTS idx_created_at ON transactions(created_at);
+
+-- Comments
+COMMENT ON TABLE transactions IS 'Tabla de transacciones del sistema';
+COMMENT ON COLUMN transactions.transaction_id IS 'ID único de la transacción (UUID)';
+COMMENT ON COLUMN transactions.status IS 'Estado de la transacción: PENDING, PROCESSING, COMPLETED, FAILED';
diff --git a/transaction-service/target/maven-archiver/pom.properties b/transaction-service/target/maven-archiver/pom.properties
new file mode 100644
index 0000000000..2b890b2419
--- /dev/null
+++ b/transaction-service/target/maven-archiver/pom.properties
@@ -0,0 +1,3 @@
+artifactId=transaction-service
+groupId=com.example
+version=1.0.0
diff --git a/transaction-service/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst b/transaction-service/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst
new file mode 100644
index 0000000000..bd1611652a
--- /dev/null
+++ b/transaction-service/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst
@@ -0,0 +1,19 @@
+com/example/transactionservice/config/KafkaConfig$Consumer.class
+com/example/transactionservice/dto/TransactionCreatedEvent.class
+com/example/transactionservice/config/JacksonConfig.class
+com/example/transactionservice/service/KafkaConsumerService.class
+com/example/transactionservice/dto/TransactionCreatedEvent$TransactionCreatedEventBuilder.class
+com/example/transactionservice/dto/CreateTransactionRequest.class
+com/example/transactionservice/dto/CreateTransactionRequest$CreateTransactionRequestBuilder.class
+com/example/transactionservice/config/KafkaTopicConfig.class
+com/example/transactionservice/config/KafkaConfig$Producer.class
+com/example/transactionservice/dto/TransactionStatusUpdatedEvent$TransactionStatusUpdatedEventBuilder.class
+com/example/transactionservice/controller/TransactionController.class
+com/example/transactionservice/dto/TransactionStatusUpdatedEvent.class
+com/example/transactionservice/repository/TransactionRepository.class
+com/example/transactionservice/config/KafkaConfig.class
+com/example/transactionservice/domain/Transaction$TransactionBuilder.class
+com/example/transactionservice/domain/Transaction.class
+com/example/transactionservice/TransactionServiceApplication.class
+com/example/transactionservice/service/TransactionService.class
+com/example/transactionservice/service/KafkaProducerService.class
diff --git a/transaction-service/target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst b/transaction-service/target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst
new file mode 100644
index 0000000000..37458d0711
--- /dev/null
+++ b/transaction-service/target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst
@@ -0,0 +1,13 @@
+/Users/jimmy/Documents/workspace-intellij/yape-challange/transaction-service/src/main/java/com/example/transactionservice/repository/TransactionRepository.java
+/Users/jimmy/Documents/workspace-intellij/yape-challange/transaction-service/src/main/java/com/example/transactionservice/domain/Transaction.java
+/Users/jimmy/Documents/workspace-intellij/yape-challange/transaction-service/src/main/java/com/example/transactionservice/service/KafkaProducerService.java
+/Users/jimmy/Documents/workspace-intellij/yape-challange/transaction-service/src/main/java/com/example/transactionservice/dto/TransactionStatusUpdatedEvent.java
+/Users/jimmy/Documents/workspace-intellij/yape-challange/transaction-service/src/main/java/com/example/transactionservice/dto/CreateTransactionRequest.java
+/Users/jimmy/Documents/workspace-intellij/yape-challange/transaction-service/src/main/java/com/example/transactionservice/TransactionServiceApplication.java
+/Users/jimmy/Documents/workspace-intellij/yape-challange/transaction-service/src/main/java/com/example/transactionservice/service/KafkaConsumerService.java
+/Users/jimmy/Documents/workspace-intellij/yape-challange/transaction-service/src/main/java/com/example/transactionservice/dto/TransactionCreatedEvent.java
+/Users/jimmy/Documents/workspace-intellij/yape-challange/transaction-service/src/main/java/com/example/transactionservice/config/KafkaConfig.java
+/Users/jimmy/Documents/workspace-intellij/yape-challange/transaction-service/src/main/java/com/example/transactionservice/controller/TransactionController.java
+/Users/jimmy/Documents/workspace-intellij/yape-challange/transaction-service/src/main/java/com/example/transactionservice/service/TransactionService.java
+/Users/jimmy/Documents/workspace-intellij/yape-challange/transaction-service/src/main/java/com/example/transactionservice/config/KafkaTopicConfig.java
+/Users/jimmy/Documents/workspace-intellij/yape-challange/transaction-service/src/main/java/com/example/transactionservice/config/JacksonConfig.java
diff --git a/transaction-service/target/transaction-service-1.0.0.jar b/transaction-service/target/transaction-service-1.0.0.jar
new file mode 100644
index 0000000000..bbcb448f2b
Binary files /dev/null and b/transaction-service/target/transaction-service-1.0.0.jar differ
diff --git a/transaction-service/target/transaction-service-1.0.0.jar.original b/transaction-service/target/transaction-service-1.0.0.jar.original
new file mode 100644
index 0000000000..7d164e709a
Binary files /dev/null and b/transaction-service/target/transaction-service-1.0.0.jar.original differ