diff --git a/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt b/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt new file mode 100644 index 000000000..e32911ade --- /dev/null +++ b/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt @@ -0,0 +1,25 @@ +package ru.quipy.apigateway + +import org.slf4j.LoggerFactory +import org.springframework.http.HttpStatus +import org.springframework.http.ResponseEntity +import org.springframework.web.bind.annotation.ExceptionHandler +import org.springframework.web.bind.annotation.RestControllerAdvice +import ru.quipy.exceptions.TooManyRequestsException + +@RestControllerAdvice +class GlobalExceptionHandler( + private val maxWait: String = "3", +) { + companion object { + val logger = LoggerFactory.getLogger(GlobalExceptionHandler::class.java) + } + + @ExceptionHandler(TooManyRequestsException::class) + fun handleTooManyRequests(): ResponseEntity { + return ResponseEntity + .status(HttpStatus.TOO_MANY_REQUESTS) + .header("Retry-After", maxWait) + .build() + } +} \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt b/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt index 805399a17..bd37c79e2 100644 --- a/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt +++ b/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt @@ -13,8 +13,7 @@ class RpcControlConfig { @Bean fun getSlidingWindowRateLimiter(accountProperties: PaymentAccountProperties) = - SlidingWindowRateLimiter( - accountProperties.rateLimitPerSec.toLong(), + SlidingWindowRateLimiter(accountProperties.rateLimitPerSec.toLong(), Duration.ofSeconds(1) ) diff --git a/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt b/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt new file mode 100644 index 000000000..8d887d9f7 --- /dev/null +++ b/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt @@ -0,0 +1,3 @@ +package ru.quipy.exceptions + +class TooManyRequestsException() : RuntimeException() \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/dto/Transaction.kt b/src/main/kotlin/ru/quipy/payments/dto/Transaction.kt new file mode 100644 index 000000000..ce491998a --- /dev/null +++ b/src/main/kotlin/ru/quipy/payments/dto/Transaction.kt @@ -0,0 +1,8 @@ +package ru.quipy.payments.dto + +import kotlinx.coroutines.Runnable +import java.util.UUID + +data class Transaction(val orderId: UUID, val amount: Int, val paymentId: UUID, val deadline: Long, val task : Runnable) : Runnable { + override fun run() = task.run() +} \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index c30a5d160..2e8320429 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -6,18 +6,15 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Qualifier import org.springframework.stereotype.Service -import ru.quipy.common.utils.CallerBlockingRejectedExecutionHandler import ru.quipy.common.utils.NamedThreadFactory import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.core.EventSourcingService +import ru.quipy.exceptions.TooManyRequestsException import ru.quipy.payments.api.PaymentAggregate +import ru.quipy.payments.dto.Transaction import java.time.Duration import java.util.* -import java.util.concurrent.LinkedBlockingQueue -import java.util.concurrent.Semaphore -import java.util.concurrent.ThreadPoolExecutor -import java.util.concurrent.TimeUnit -import kotlin.random.Random +import java.util.concurrent.* @Service class OrderPayer( @@ -27,10 +24,12 @@ class OrderPayer( @field:Qualifier("parallelLimiter") private val parallelLimiter: Semaphore, ) { - - private val paymentProcessingPlannedCounter: Counter = Metrics.counter("payment.processing.planned", "accountName", accountProperties.accountName) - private val paymentProcessingStartedCounter: Counter = Metrics.counter("payment.processing.started", "accountName", accountProperties.accountName) - private val paymentProcessingCompletedCounter: Counter = Metrics.counter("payment.processing.completed", "accountName", accountProperties.accountName) + private val paymentProcessingPlannedCounter: Counter = + Metrics.counter("payment.processing.planned", "accountName", accountProperties.accountName) + private val paymentProcessingStartedCounter: Counter = + Metrics.counter("payment.processing.started", "accountName", accountProperties.accountName) + private val paymentProcessingCompletedCounter: Counter = + Metrics.counter("payment.processing.completed", "accountName", accountProperties.accountName) companion object { val logger: Logger = LoggerFactory.getLogger(OrderPayer::class.java) @@ -42,9 +41,9 @@ class OrderPayer( accountProperties.parallelRequests, 0L, TimeUnit.MILLISECONDS, - LinkedBlockingQueue(accountProperties.parallelRequests * 10), + ArrayBlockingQueue(accountProperties.parallelRequests), NamedThreadFactory("payment-submission-executor"), - CallerBlockingRejectedExecutionHandler() + ThreadPoolExecutor.AbortPolicy() ) } @@ -57,32 +56,38 @@ class OrderPayer( fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { val createdAt = System.currentTimeMillis() - paymentProcessingPlannedCounter.increment() - parallelLimiter.acquire() - return try { + val task = Runnable { + parallelLimiter.acquire() while (!rateLimit.tick()) { - Thread.sleep(Random.nextLong(0, 100)) + Thread.sleep(Random().nextInt(0, 10).toLong()) } - - paymentExecutor.submit { - paymentProcessingStartedCounter.increment() - try { - val createdEvent = paymentESService.create { - it.create(paymentId, orderId, amount) - } - logger.trace("Payment {} for order {} created.", createdEvent.paymentId, orderId) - paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline) - } finally { - parallelLimiter.release() - paymentProcessingCompletedCounter.increment() + paymentProcessingStartedCounter.increment() + try { + val createdEvent = paymentESService.create { + it.create(paymentId, orderId, amount) } + logger.trace("Payment {} for order {} created.", createdEvent.paymentId, orderId) + paymentService.submitPaymentRequest( + paymentId, + amount, + createdAt, + deadline + ) + } finally { + parallelLimiter.release() + paymentProcessingCompletedCounter.increment() } - createdAt - } catch (e: Exception) { - parallelLimiter.release() - throw e + } + + val transaction = Transaction(orderId, amount, paymentId, deadline, task) + + try { + paymentExecutor.execute(transaction) + return createdAt + } catch (_: RejectedExecutionException) { + throw TooManyRequestsException() } } } \ No newline at end of file diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 06f5f41a1..bb6025436 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -23,5 +23,6 @@ payment.service-name=${PAYMENT_SERVICE_NAME} payment.token=${PAYMENT_TOKEN} # payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20} # payment.accounts=${PAYMENT_ACCOUNTS:acc-3} -payment.accounts=${PAYMENT_ACCOUNTS:acc-5} +payment.accounts=${PAYMENT_ACCOUNTS:acc-23} +# payment.accounts=${PAYMENT_ACCOUNTS:acc-18} payment.hostPort=${PAYMENT_HOST:localhost}:${PAYMENT_PORT:1234} \ No newline at end of file diff --git a/test-local-run.http b/test-local-run.http index cabd61099..743f0dabd 100644 --- a/test-local-run.http +++ b/test-local-run.http @@ -5,9 +5,9 @@ Content-Type: application/json { "serviceName": "{{serviceName}}", "token": "{{token}}", - "ratePerSecond": 2, - "testCount": 100, - "processingTimeMillis": 60000 + "ratePerSecond": 16, + "testCount": 16000, + "processingTimeMillis": 30000 } ### Stop running test to save time and resources