From 6cb3047afbb7d40e6839e4b7aac5beb3dda93974 Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Thu, 16 Oct 2025 21:04:44 +0300 Subject: [PATCH 1/8] feature: degraded rps limiter value by one for test on remote --- src/main/kotlin/ru/quipy/config/RpcControlConfig.kt | 3 +-- src/main/resources/application.properties | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt b/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt index 805399a17..e7a4d2900 100644 --- a/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt +++ b/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt @@ -13,8 +13,7 @@ class RpcControlConfig { @Bean fun getSlidingWindowRateLimiter(accountProperties: PaymentAccountProperties) = - SlidingWindowRateLimiter( - accountProperties.rateLimitPerSec.toLong(), + SlidingWindowRateLimiter(accountProperties.rateLimitPerSec.toLong()-1, Duration.ofSeconds(1) ) diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 06f5f41a1..0abc8486e 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -23,5 +23,5 @@ payment.service-name=${PAYMENT_SERVICE_NAME} payment.token=${PAYMENT_TOKEN} # payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20} # payment.accounts=${PAYMENT_ACCOUNTS:acc-3} -payment.accounts=${PAYMENT_ACCOUNTS:acc-5} +payment.accounts=${PAYMENT_ACCOUNTS:acc-23} payment.hostPort=${PAYMENT_HOST:localhost}:${PAYMENT_PORT:1234} \ No newline at end of file From 4a7f689336323555f8969a6daa80dce008402adc Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Thu, 16 Oct 2025 21:44:05 +0300 Subject: [PATCH 2/8] feature: added ordered queue for payment service --- .../kotlin/ru/quipy/payments/dto/Transaction.kt | 5 +++++ .../kotlin/ru/quipy/payments/logic/OrderPayer.kt | 13 ++++++++----- 2 files changed, 13 insertions(+), 5 deletions(-) create mode 100644 src/main/kotlin/ru/quipy/payments/dto/Transaction.kt diff --git a/src/main/kotlin/ru/quipy/payments/dto/Transaction.kt b/src/main/kotlin/ru/quipy/payments/dto/Transaction.kt new file mode 100644 index 000000000..ae3685708 --- /dev/null +++ b/src/main/kotlin/ru/quipy/payments/dto/Transaction.kt @@ -0,0 +1,5 @@ +package ru.quipy.payments.dto + +import java.util.UUID + +data class Transaction(val orderId: UUID, val amount: Int, val paymentId: UUID, val deadline: Long) diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index c30a5d160..5abfc4c9b 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -11,8 +11,10 @@ import ru.quipy.common.utils.NamedThreadFactory import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.core.EventSourcingService import ru.quipy.payments.api.PaymentAggregate +import ru.quipy.payments.dto.Transaction import java.time.Duration import java.util.* +import java.util.concurrent.ConcurrentLinkedDeque import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.Semaphore import java.util.concurrent.ThreadPoolExecutor @@ -28,6 +30,7 @@ class OrderPayer( private val parallelLimiter: Semaphore, ) { + private val taskQueue: Queue = ConcurrentLinkedDeque() private val paymentProcessingPlannedCounter: Counter = Metrics.counter("payment.processing.planned", "accountName", accountProperties.accountName) private val paymentProcessingStartedCounter: Counter = Metrics.counter("payment.processing.started", "accountName", accountProperties.accountName) private val paymentProcessingCompletedCounter: Counter = Metrics.counter("payment.processing.completed", "accountName", accountProperties.accountName) @@ -57,10 +60,10 @@ class OrderPayer( fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { val createdAt = System.currentTimeMillis() - paymentProcessingPlannedCounter.increment() + taskQueue.add(Transaction(orderId, amount, paymentId, deadline)) parallelLimiter.acquire() - + val taskParam = taskQueue.remove() return try { while (!rateLimit.tick()) { Thread.sleep(Random.nextLong(0, 100)) @@ -70,10 +73,10 @@ class OrderPayer( paymentProcessingStartedCounter.increment() try { val createdEvent = paymentESService.create { - it.create(paymentId, orderId, amount) + it.create(taskParam.paymentId, taskParam.orderId, taskParam.amount) } - logger.trace("Payment {} for order {} created.", createdEvent.paymentId, orderId) - paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline) + logger.trace("Payment {} for order {} created.", createdEvent.paymentId, taskParam.orderId) + paymentService.submitPaymentRequest(taskParam.paymentId, taskParam.amount, createdAt, taskParam.deadline) } finally { parallelLimiter.release() paymentProcessingCompletedCounter.increment() From eca925ec00b35d49ad0e89073a13363f4a163d0f Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Thu, 16 Oct 2025 22:27:05 +0300 Subject: [PATCH 3/8] feature: added priority queue --- .../kotlin/ru/quipy/config/RpcControlConfig.kt | 2 +- .../kotlin/ru/quipy/payments/logic/OrderPayer.kt | 14 +++++++++----- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt b/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt index e7a4d2900..bd37c79e2 100644 --- a/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt +++ b/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt @@ -13,7 +13,7 @@ class RpcControlConfig { @Bean fun getSlidingWindowRateLimiter(accountProperties: PaymentAccountProperties) = - SlidingWindowRateLimiter(accountProperties.rateLimitPerSec.toLong()-1, + SlidingWindowRateLimiter(accountProperties.rateLimitPerSec.toLong(), Duration.ofSeconds(1) ) diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index 5abfc4c9b..c05504755 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -14,8 +14,10 @@ import ru.quipy.payments.api.PaymentAggregate import ru.quipy.payments.dto.Transaction import java.time.Duration import java.util.* +import java.util.concurrent.BlockingQueue import java.util.concurrent.ConcurrentLinkedDeque import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.PriorityBlockingQueue import java.util.concurrent.Semaphore import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit @@ -30,7 +32,7 @@ class OrderPayer( private val parallelLimiter: Semaphore, ) { - private val taskQueue: Queue = ConcurrentLinkedDeque() + private val taskQueue: BlockingQueue = PriorityBlockingQueue(11, compareBy { transaction -> transaction.orderId} ) private val paymentProcessingPlannedCounter: Counter = Metrics.counter("payment.processing.planned", "accountName", accountProperties.accountName) private val paymentProcessingStartedCounter: Counter = Metrics.counter("payment.processing.started", "accountName", accountProperties.accountName) private val paymentProcessingCompletedCounter: Counter = Metrics.counter("payment.processing.completed", "accountName", accountProperties.accountName) @@ -61,13 +63,13 @@ class OrderPayer( fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { val createdAt = System.currentTimeMillis() paymentProcessingPlannedCounter.increment() - taskQueue.add(Transaction(orderId, amount, paymentId, deadline)) + taskQueue.offer(Transaction(orderId, amount, paymentId, deadline)) parallelLimiter.acquire() - val taskParam = taskQueue.remove() return try { while (!rateLimit.tick()) { - Thread.sleep(Random.nextLong(0, 100)) + Thread.sleep(10) } + val taskParam = taskQueue.poll() paymentExecutor.submit { paymentProcessingStartedCounter.increment() @@ -76,7 +78,9 @@ class OrderPayer( it.create(taskParam.paymentId, taskParam.orderId, taskParam.amount) } logger.trace("Payment {} for order {} created.", createdEvent.paymentId, taskParam.orderId) - paymentService.submitPaymentRequest(taskParam.paymentId, taskParam.amount, createdAt, taskParam.deadline) + if (deadline > System.currentTimeMillis()) { + paymentService.submitPaymentRequest(taskParam.paymentId, taskParam.amount, createdAt, taskParam.deadline) + } } finally { parallelLimiter.release() paymentProcessingCompletedCounter.increment() From 2629c3374bca0d168b0a4f1ba8ff4e5152502d06 Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Thu, 16 Oct 2025 23:17:21 +0300 Subject: [PATCH 4/8] fix: fixed priority queue --- .../ru/quipy/payments/logic/OrderPayer.kt | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index c05504755..3f8f187ec 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -14,14 +14,7 @@ import ru.quipy.payments.api.PaymentAggregate import ru.quipy.payments.dto.Transaction import java.time.Duration import java.util.* -import java.util.concurrent.BlockingQueue -import java.util.concurrent.ConcurrentLinkedDeque -import java.util.concurrent.LinkedBlockingQueue -import java.util.concurrent.PriorityBlockingQueue -import java.util.concurrent.Semaphore -import java.util.concurrent.ThreadPoolExecutor -import java.util.concurrent.TimeUnit -import kotlin.random.Random +import java.util.concurrent.* @Service class OrderPayer( @@ -32,10 +25,14 @@ class OrderPayer( private val parallelLimiter: Semaphore, ) { - private val taskQueue: BlockingQueue = PriorityBlockingQueue(11, compareBy { transaction -> transaction.orderId} ) - private val paymentProcessingPlannedCounter: Counter = Metrics.counter("payment.processing.planned", "accountName", accountProperties.accountName) - private val paymentProcessingStartedCounter: Counter = Metrics.counter("payment.processing.started", "accountName", accountProperties.accountName) - private val paymentProcessingCompletedCounter: Counter = Metrics.counter("payment.processing.completed", "accountName", accountProperties.accountName) + private val taskQueue: BlockingQueue = + PriorityBlockingQueue(1800, compareBy { it.deadline }) + private val paymentProcessingPlannedCounter: Counter = + Metrics.counter("payment.processing.planned", "accountName", accountProperties.accountName) + private val paymentProcessingStartedCounter: Counter = + Metrics.counter("payment.processing.started", "accountName", accountProperties.accountName) + private val paymentProcessingCompletedCounter: Counter = + Metrics.counter("payment.processing.completed", "accountName", accountProperties.accountName) companion object { val logger: Logger = LoggerFactory.getLogger(OrderPayer::class.java) @@ -63,11 +60,11 @@ class OrderPayer( fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { val createdAt = System.currentTimeMillis() paymentProcessingPlannedCounter.increment() - taskQueue.offer(Transaction(orderId, amount, paymentId, deadline)) + taskQueue.put(Transaction(orderId, amount, paymentId, deadline)) parallelLimiter.acquire() return try { while (!rateLimit.tick()) { - Thread.sleep(10) + Thread.sleep(Random().nextInt(0,10).toLong()) } val taskParam = taskQueue.poll() @@ -78,9 +75,12 @@ class OrderPayer( it.create(taskParam.paymentId, taskParam.orderId, taskParam.amount) } logger.trace("Payment {} for order {} created.", createdEvent.paymentId, taskParam.orderId) - if (deadline > System.currentTimeMillis()) { - paymentService.submitPaymentRequest(taskParam.paymentId, taskParam.amount, createdAt, taskParam.deadline) - } + paymentService.submitPaymentRequest( + taskParam.paymentId, + taskParam.amount, + createdAt, + taskParam.deadline + ) } finally { parallelLimiter.release() paymentProcessingCompletedCounter.increment() From 5af096ef2530e564b264867222414a37137c0304 Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Fri, 17 Oct 2025 16:41:48 +0300 Subject: [PATCH 5/8] fix: fixed queue by using paymentExecutor --- .../ru/quipy/payments/logic/OrderPayer.kt | 26 +++++++------------ 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index 3f8f187ec..f395502e6 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -11,7 +11,6 @@ import ru.quipy.common.utils.NamedThreadFactory import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.core.EventSourcingService import ru.quipy.payments.api.PaymentAggregate -import ru.quipy.payments.dto.Transaction import java.time.Duration import java.util.* import java.util.concurrent.* @@ -24,9 +23,6 @@ class OrderPayer( @field:Qualifier("parallelLimiter") private val parallelLimiter: Semaphore, ) { - - private val taskQueue: BlockingQueue = - PriorityBlockingQueue(1800, compareBy { it.deadline }) private val paymentProcessingPlannedCounter: Counter = Metrics.counter("payment.processing.planned", "accountName", accountProperties.accountName) private val paymentProcessingStartedCounter: Counter = @@ -60,26 +56,24 @@ class OrderPayer( fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { val createdAt = System.currentTimeMillis() paymentProcessingPlannedCounter.increment() - taskQueue.put(Transaction(orderId, amount, paymentId, deadline)) - parallelLimiter.acquire() - return try { - while (!rateLimit.tick()) { - Thread.sleep(Random().nextInt(0,10).toLong()) - } - val taskParam = taskQueue.poll() + return try { paymentExecutor.submit { + parallelLimiter.acquire() + while (!rateLimit.tick()) { + Thread.sleep(Random().nextInt(0, 10).toLong()) + } paymentProcessingStartedCounter.increment() try { val createdEvent = paymentESService.create { - it.create(taskParam.paymentId, taskParam.orderId, taskParam.amount) + it.create(paymentId, orderId, amount) } - logger.trace("Payment {} for order {} created.", createdEvent.paymentId, taskParam.orderId) + logger.trace("Payment {} for order {} created.", createdEvent.paymentId, orderId) paymentService.submitPaymentRequest( - taskParam.paymentId, - taskParam.amount, + paymentId, + amount, createdAt, - taskParam.deadline + deadline ) } finally { parallelLimiter.release() From 600bec6e7f5ebacaf02e00bfd67c5e53c2595034 Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Fri, 17 Oct 2025 17:39:09 +0300 Subject: [PATCH 6/8] fix: fixed queue by using paymentExecutor and using priority queue --- .../ru/quipy/payments/dto/Transaction.kt | 5 +- .../ru/quipy/payments/logic/OrderPayer.kt | 64 ++++++++++--------- 2 files changed, 38 insertions(+), 31 deletions(-) diff --git a/src/main/kotlin/ru/quipy/payments/dto/Transaction.kt b/src/main/kotlin/ru/quipy/payments/dto/Transaction.kt index ae3685708..ce491998a 100644 --- a/src/main/kotlin/ru/quipy/payments/dto/Transaction.kt +++ b/src/main/kotlin/ru/quipy/payments/dto/Transaction.kt @@ -1,5 +1,8 @@ package ru.quipy.payments.dto +import kotlinx.coroutines.Runnable import java.util.UUID -data class Transaction(val orderId: UUID, val amount: Int, val paymentId: UUID, val deadline: Long) +data class Transaction(val orderId: UUID, val amount: Int, val paymentId: UUID, val deadline: Long, val task : Runnable) : Runnable { + override fun run() = task.run() +} \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index f395502e6..91f705c45 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -11,6 +11,7 @@ import ru.quipy.common.utils.NamedThreadFactory import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.core.EventSourcingService import ru.quipy.payments.api.PaymentAggregate +import ru.quipy.payments.dto.Transaction import java.time.Duration import java.util.* import java.util.concurrent.* @@ -40,7 +41,7 @@ class OrderPayer( accountProperties.parallelRequests, 0L, TimeUnit.MILLISECONDS, - LinkedBlockingQueue(accountProperties.parallelRequests * 10), + PriorityBlockingQueue(accountProperties.parallelRequests, compareBy { it.deadline }) as BlockingQueue, NamedThreadFactory("payment-submission-executor"), CallerBlockingRejectedExecutionHandler() ) @@ -57,33 +58,36 @@ class OrderPayer( val createdAt = System.currentTimeMillis() paymentProcessingPlannedCounter.increment() - return try { - paymentExecutor.submit { - parallelLimiter.acquire() - while (!rateLimit.tick()) { - Thread.sleep(Random().nextInt(0, 10).toLong()) - } - paymentProcessingStartedCounter.increment() - try { - val createdEvent = paymentESService.create { - it.create(paymentId, orderId, amount) - } - logger.trace("Payment {} for order {} created.", createdEvent.paymentId, orderId) - paymentService.submitPaymentRequest( - paymentId, - amount, - createdAt, - deadline - ) - } finally { - parallelLimiter.release() - paymentProcessingCompletedCounter.increment() - } - } - createdAt - } catch (e: Exception) { - parallelLimiter.release() - throw e - } - } + return try { + val task = Runnable { + while (!rateLimit.tick()) { + Thread.sleep(Random().nextInt(0, 10).toLong()) + } + parallelLimiter.acquire() + paymentProcessingStartedCounter.increment() + try { + val createdEvent = paymentESService.create { + it.create(paymentId, orderId, amount) + } + logger.trace("Payment {} for order {} created.", createdEvent.paymentId, orderId) + paymentService.submitPaymentRequest( + paymentId, + amount, + createdAt, + deadline + ) + } finally { + parallelLimiter.release() + paymentProcessingCompletedCounter.increment() + } + } + val transaction = Transaction(orderId, amount, paymentId, deadline, task) + paymentExecutor.execute( transaction) + createdAt + } catch (e: Exception) { + parallelLimiter.release() + throw e + } + +} } \ No newline at end of file From fb3b5c9584239df3cbca8bebdc6832d7588c9f5a Mon Sep 17 00:00:00 2001 From: 21092004Goda Date: Wed, 22 Oct 2025 22:20:53 +0300 Subject: [PATCH 7/8] -- fix: Add response 429. --- .../common/utils/GlobalExceptionHandler.kt | 24 +++++++ .../common/utils/TooManyRequestsException.kt | 3 + .../ru/quipy/payments/logic/OrderPayer.kt | 68 +++++++++---------- src/main/resources/application.properties | 1 + test-local-run.http | 6 +- 5 files changed, 65 insertions(+), 37 deletions(-) create mode 100644 src/main/kotlin/ru/quipy/common/utils/GlobalExceptionHandler.kt create mode 100644 src/main/kotlin/ru/quipy/common/utils/TooManyRequestsException.kt diff --git a/src/main/kotlin/ru/quipy/common/utils/GlobalExceptionHandler.kt b/src/main/kotlin/ru/quipy/common/utils/GlobalExceptionHandler.kt new file mode 100644 index 000000000..acdc494fe --- /dev/null +++ b/src/main/kotlin/ru/quipy/common/utils/GlobalExceptionHandler.kt @@ -0,0 +1,24 @@ +package ru.quipy.common.utils + +import org.springframework.http.HttpStatus +import org.springframework.http.ResponseEntity +import org.slf4j.LoggerFactory +import org.springframework.web.bind.annotation.ExceptionHandler +import org.springframework.web.bind.annotation.RestControllerAdvice + +@RestControllerAdvice +class GlobalExceptionHandler( + private val maxWait: String = "3", +) { + companion object { + val logger = LoggerFactory.getLogger(GlobalExceptionHandler::class.java) + } + + @ExceptionHandler(TooManyRequestsException::class) + fun handleTooManyRequests(ex: TooManyRequestsException): ResponseEntity { + return ResponseEntity + .status(HttpStatus.TOO_MANY_REQUESTS) + .header("Retry-After", maxWait) + .body(ex.message ?: "Повторите позже") + } +} \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/common/utils/TooManyRequestsException.kt b/src/main/kotlin/ru/quipy/common/utils/TooManyRequestsException.kt new file mode 100644 index 000000000..d3865680c --- /dev/null +++ b/src/main/kotlin/ru/quipy/common/utils/TooManyRequestsException.kt @@ -0,0 +1,3 @@ +package ru.quipy.common.utils + +class TooManyRequestsException(msg: String) : RuntimeException(msg) \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index 91f705c45..f7c286ab8 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -6,9 +6,9 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Qualifier import org.springframework.stereotype.Service -import ru.quipy.common.utils.CallerBlockingRejectedExecutionHandler import ru.quipy.common.utils.NamedThreadFactory import ru.quipy.common.utils.SlidingWindowRateLimiter +import ru.quipy.common.utils.TooManyRequestsException import ru.quipy.core.EventSourcingService import ru.quipy.payments.api.PaymentAggregate import ru.quipy.payments.dto.Transaction @@ -41,9 +41,9 @@ class OrderPayer( accountProperties.parallelRequests, 0L, TimeUnit.MILLISECONDS, - PriorityBlockingQueue(accountProperties.parallelRequests, compareBy { it.deadline }) as BlockingQueue, + ArrayBlockingQueue(accountProperties.parallelRequests), NamedThreadFactory("payment-submission-executor"), - CallerBlockingRejectedExecutionHandler() + ThreadPoolExecutor.AbortPolicy() ) } @@ -58,36 +58,36 @@ class OrderPayer( val createdAt = System.currentTimeMillis() paymentProcessingPlannedCounter.increment() - return try { - val task = Runnable { - while (!rateLimit.tick()) { - Thread.sleep(Random().nextInt(0, 10).toLong()) - } - parallelLimiter.acquire() - paymentProcessingStartedCounter.increment() - try { - val createdEvent = paymentESService.create { - it.create(paymentId, orderId, amount) - } - logger.trace("Payment {} for order {} created.", createdEvent.paymentId, orderId) - paymentService.submitPaymentRequest( - paymentId, - amount, - createdAt, - deadline - ) - } finally { - parallelLimiter.release() - paymentProcessingCompletedCounter.increment() - } - } - val transaction = Transaction(orderId, amount, paymentId, deadline, task) - paymentExecutor.execute( transaction) - createdAt - } catch (e: Exception) { - parallelLimiter.release() - throw e - } + val task = Runnable { + while (!rateLimit.tick()) { + Thread.sleep(Random().nextInt(0, 10).toLong()) + } + parallelLimiter.acquire() + paymentProcessingStartedCounter.increment() + try { + val createdEvent = paymentESService.create { + it.create(paymentId, orderId, amount) + } + logger.trace("Payment {} for order {} created.", createdEvent.paymentId, orderId) + paymentService.submitPaymentRequest( + paymentId, + amount, + createdAt, + deadline + ) + } finally { + parallelLimiter.release() + paymentProcessingCompletedCounter.increment() + } + } -} + val transaction = Transaction(orderId, amount, paymentId, deadline, task) + + try { + paymentExecutor.execute(transaction) + return createdAt + } catch (_: RejectedExecutionException) { + throw TooManyRequestsException("Сервер перегружен. Повторите позже.") + } + } } \ No newline at end of file diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 0abc8486e..bb6025436 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -24,4 +24,5 @@ payment.token=${PAYMENT_TOKEN} # payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20} # payment.accounts=${PAYMENT_ACCOUNTS:acc-3} payment.accounts=${PAYMENT_ACCOUNTS:acc-23} +# payment.accounts=${PAYMENT_ACCOUNTS:acc-18} payment.hostPort=${PAYMENT_HOST:localhost}:${PAYMENT_PORT:1234} \ No newline at end of file diff --git a/test-local-run.http b/test-local-run.http index cabd61099..743f0dabd 100644 --- a/test-local-run.http +++ b/test-local-run.http @@ -5,9 +5,9 @@ Content-Type: application/json { "serviceName": "{{serviceName}}", "token": "{{token}}", - "ratePerSecond": 2, - "testCount": 100, - "processingTimeMillis": 60000 + "ratePerSecond": 16, + "testCount": 16000, + "processingTimeMillis": 30000 } ### Stop running test to save time and resources From d10d90bc4e4eaafa4d703b675f2f82252dbdebb9 Mon Sep 17 00:00:00 2001 From: vaycheslav Date: Thu, 23 Oct 2025 12:19:19 +0300 Subject: [PATCH 8/8] fix: fixed semaphore and rate limiter positions and did some refactoring --- .../utils => apigateway}/GlobalExceptionHandler.kt | 9 +++++---- .../ru/quipy/common/utils/TooManyRequestsException.kt | 3 --- .../ru/quipy/exceptions/TooManyRequestsException.kt | 3 +++ src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt | 6 +++--- 4 files changed, 11 insertions(+), 10 deletions(-) rename src/main/kotlin/ru/quipy/{common/utils => apigateway}/GlobalExceptionHandler.kt (78%) delete mode 100644 src/main/kotlin/ru/quipy/common/utils/TooManyRequestsException.kt create mode 100644 src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt diff --git a/src/main/kotlin/ru/quipy/common/utils/GlobalExceptionHandler.kt b/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt similarity index 78% rename from src/main/kotlin/ru/quipy/common/utils/GlobalExceptionHandler.kt rename to src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt index acdc494fe..e32911ade 100644 --- a/src/main/kotlin/ru/quipy/common/utils/GlobalExceptionHandler.kt +++ b/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt @@ -1,10 +1,11 @@ -package ru.quipy.common.utils +package ru.quipy.apigateway +import org.slf4j.LoggerFactory import org.springframework.http.HttpStatus import org.springframework.http.ResponseEntity -import org.slf4j.LoggerFactory import org.springframework.web.bind.annotation.ExceptionHandler import org.springframework.web.bind.annotation.RestControllerAdvice +import ru.quipy.exceptions.TooManyRequestsException @RestControllerAdvice class GlobalExceptionHandler( @@ -15,10 +16,10 @@ class GlobalExceptionHandler( } @ExceptionHandler(TooManyRequestsException::class) - fun handleTooManyRequests(ex: TooManyRequestsException): ResponseEntity { + fun handleTooManyRequests(): ResponseEntity { return ResponseEntity .status(HttpStatus.TOO_MANY_REQUESTS) .header("Retry-After", maxWait) - .body(ex.message ?: "Повторите позже") + .build() } } \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/common/utils/TooManyRequestsException.kt b/src/main/kotlin/ru/quipy/common/utils/TooManyRequestsException.kt deleted file mode 100644 index d3865680c..000000000 --- a/src/main/kotlin/ru/quipy/common/utils/TooManyRequestsException.kt +++ /dev/null @@ -1,3 +0,0 @@ -package ru.quipy.common.utils - -class TooManyRequestsException(msg: String) : RuntimeException(msg) \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt b/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt new file mode 100644 index 000000000..8d887d9f7 --- /dev/null +++ b/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt @@ -0,0 +1,3 @@ +package ru.quipy.exceptions + +class TooManyRequestsException() : RuntimeException() \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index f7c286ab8..2e8320429 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -8,8 +8,8 @@ import org.springframework.beans.factory.annotation.Qualifier import org.springframework.stereotype.Service import ru.quipy.common.utils.NamedThreadFactory import ru.quipy.common.utils.SlidingWindowRateLimiter -import ru.quipy.common.utils.TooManyRequestsException import ru.quipy.core.EventSourcingService +import ru.quipy.exceptions.TooManyRequestsException import ru.quipy.payments.api.PaymentAggregate import ru.quipy.payments.dto.Transaction import java.time.Duration @@ -59,10 +59,10 @@ class OrderPayer( paymentProcessingPlannedCounter.increment() val task = Runnable { + parallelLimiter.acquire() while (!rateLimit.tick()) { Thread.sleep(Random().nextInt(0, 10).toLong()) } - parallelLimiter.acquire() paymentProcessingStartedCounter.increment() try { val createdEvent = paymentESService.create { @@ -87,7 +87,7 @@ class OrderPayer( paymentExecutor.execute(transaction) return createdAt } catch (_: RejectedExecutionException) { - throw TooManyRequestsException("Сервер перегружен. Повторите позже.") + throw TooManyRequestsException() } } } \ No newline at end of file