diff --git a/.gitignore b/.gitignore index 259113f73..cf4961ef7 100644 --- a/.gitignore +++ b/.gitignore @@ -1,35 +1,47 @@ -HELP.md -target/ -!.mvn/wrapper/maven-wrapper.jar -!**/src/main/**/target/ -!**/src/test/**/target/ - -### STS ### -.apt_generated -.classpath -.factorypath -.project -.settings -.springBeans -.sts4-cache - -### IntelliJ IDEA ### -.idea -*.iws -*.iml -*.ipr -./http-client.env.json - -### NetBeans ### -/nbproject/private/ - -/nbbuild/ -/dist/ -/nbdist/ -/.nb-gradle/ -build/ -!**/src/main/**/build/ -!**/src/test/**/build/ - -### VS Code ### -.vscode/ +HELP.md +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### grafana ### +/grafana/ + +### prometheus ### + +/prometheus/ + +### confidential data ### + +http-client.env.json + + + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ + +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index cc6f2e042..3d9587cd0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,7 +6,7 @@ RUN mvn dependency:go-offline COPY src src RUN mvn package -FROM openjdk:17-jdk-slim +FROM eclipse-temurin:17-alpine-3.22 COPY --from=build /app/target/*.jar /high-load-course.jar diff --git a/README.md b/README.md index 56d36970f..bc19c4280 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,9 @@ # Template for the HighLoad course + This project is based on [Tiny Event Sourcing library](https://github.com/andrsuh/tiny-event-sourcing) ### Run PostgreSql + This example uses Postgres as an implementation of the Event store. You can see it in `pom.xml`: ``` @@ -12,7 +14,8 @@ This example uses Postgres as an implementation of the Event store. You can see ``` -Thus, you have to run Postgres in order to test this example. Postgres service is included in `docker-compose` file that we have in the root of the project. +Thus, you have to run Postgres in order to test this example. Postgres service is included in `docker-compose` file +that we have in the root of the project. # More comprehensive information about the course, project, how to run tests is here: @@ -21,19 +24,25 @@ https://andrsuh.notion.site/2595d535059281d8a815c2cb3875c376?source=copy_link https://andrsuh.notion.site/2625d5350592801aaf88c7c95302d10c?source=copy_link ### Run the infrastructure + Set of the services you need to start developing and testing process is following: -- Bombardier - service that is in charge of emulation the store's clients activity (creates the incoming load). Also serves as a third-party payment system. + +- Bombardier - service that is in charge of emulation the store's clients activity (creates the incoming load). Also + serves as a third-party payment system. - Postgres DBMS - Prometheus + Grafana - metrics collection and visualization services You can run all beforementioned services by the following command: + ``` docker compose -f docker-compose.yml up ``` ### Run the application -To make the application run you can start the main class `OnlineShopApplication`. It is not being launched as a docker contained to simplify and speed up the devevopment process as it is easier for you to refactor the application and re-run it immediately in the IDE. +To make the application run you can start the main class `OnlineShopApplication`. It is not being launched as a docker +contained to simplify and speed up the devevopment process as it is easier for you to refactor the application and +re-run it immediately in the IDE. ### If you want to pull changes from the main repository into your fork diff --git a/grafana/provisioning/dashboards/ServicesStatistic.json b/grafana/provisioning/dashboards/ServicesStatistic.json index 684b97269..74e9740f0 100644 --- a/grafana/provisioning/dashboards/ServicesStatistic.json +++ b/grafana/provisioning/dashboards/ServicesStatistic.json @@ -3468,6 +3468,358 @@ ], "title": "Jetty Statistics", "type": "row" + }, + { + "collapsed": true, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 7 + }, + "id": 101, + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "История изменения количества задач в очереди", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "Задачи в очереди", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 30, + "gradientMode": "opacity", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 16, + "x": 8, + "y": 8 + }, + "id": 103, + "options": { + "legend": { + "calcs": [ + "mean", + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "9.0.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "payment_processing_planned_total - payment_processing_started_total", + "legendFormat": "{{accountName}}", + "refId": "A" + } + ], + "title": "История задач в очереди", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "Скорость входа в ожидание и выхода из ожидания (requests/sec)", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "Запросов/сек", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "reqps" + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "Скорость увеличения очереди" + }, + "properties": [ + { + "id": "color", + "value": { + "fixedColor": "green", + "mode": "fixed" + } + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "Скорость числа задач в обработке" + }, + "properties": [ + { + "id": "color", + "value": { + "fixedColor": "blue", + "mode": "fixed" + } + } + ] + } + ] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 16 + }, + "id": 104, + "options": { + "legend": { + "calcs": [ + "mean", + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "9.0.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "rate(payment_processing_planned_total[1m])", + "legendFormat": "Попало в очередь: {{accountName}}", + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "rate(payment_processing_started_total[1m])", + "legendFormat": "Приняты в обработку (вышли из очереди): {{accountName}}", + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "rate(payment_processing_completed_total[1m])", + "legendFormat": "Обработаны: {{accountName}}", + "refId": "C" + } + ], + "title": "Throughput (в очереди vs в обработке vs обработаны)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "Если значение > 0, задачи накапливаются быстрее, чем обрабатываются", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": true, + "axisColorMode": "text", + "axisLabel": "Δ запросов/сек", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 30, + "gradientMode": "scheme", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "area" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 0.1 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 16 + }, + "id": 105, + "options": { + "legend": { + "calcs": [ + "mean", + "lastNotNull" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "9.0.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "rate(payment_processing_planned_total[1m]) - rate(payment_processing_started_total[1m])", + "legendFormat": "{{accountName}}", + "refId": "A" + } + ], + "title": "Скорость накопления задач (увеличения очереди)", + "type": "timeseries" + } + ], + "title": "Payment Processing - Active Tasks", + "type": "row" } ], "preload": false, diff --git a/http-client.env.json b/http-client.env.json new file mode 100644 index 000000000..7373b4a7e --- /dev/null +++ b/http-client.env.json @@ -0,0 +1,6 @@ +{ + "vars": { + "serviceName": "cas-m3404-05", + "token": "7baFVCuQJk1b8qC1yN" + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 5724b2568..e244cb6aa 100644 --- a/pom.xml +++ b/pom.xml @@ -1,5 +1,5 @@ - 4.0.0 @@ -15,7 +15,7 @@ OnlineShop Application for resilience and highly-loaded applications course - + 2.2.0 1.9.0 4.12.0 @@ -25,7 +25,7 @@ 3.1.8 - + io.github.resilience4j resilience4j-ratelimiter @@ -67,15 +67,15 @@ ${jetty.version} - - - - + + + + - - - - + + + + com.fasterxml.jackson.module diff --git a/prometheus/prometheus.yml b/prometheus/prometheus.yml index 936c7edd9..ce045546f 100644 --- a/prometheus/prometheus.yml +++ b/prometheus/prometheus.yml @@ -6,16 +6,16 @@ scrape_configs: - job_name: 'bombardier-docker-network-job' metrics_path: '/actuator/prometheus' static_configs: - - targets: ['bombardier:1234'] + - targets: [ 'bombardier:1234' ] - job_name: 'bombardier-host-job' metrics_path: '/actuator/prometheus' static_configs: - - targets: ['host.docker.internal:1234'] + - targets: [ 'host.docker.internal:1234' ] - job_name: 'online-store-job' metrics_path: '/actuator/prometheus' static_configs: - - targets: ['host.docker.internal:8081'] + - targets: [ 'host.docker.internal:8081' ] - job_name: 'online-shop-job' metrics_path: '/actuator/prometheus' static_configs: - - targets: ['host.docker.internal:18081'] \ No newline at end of file + - targets: [ 'host.docker.internal:18081' ] \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/OnlineShopApplication.kt b/src/main/kotlin/ru/quipy/OnlineShopApplication.kt index 9ac22e094..de0c6b31d 100644 --- a/src/main/kotlin/ru/quipy/OnlineShopApplication.kt +++ b/src/main/kotlin/ru/quipy/OnlineShopApplication.kt @@ -13,7 +13,7 @@ class OnlineShopApplication { val log: Logger = LoggerFactory.getLogger(OnlineShopApplication::class.java) companion object { - val appExecutor = Executors.newFixedThreadPool(64, NamedThreadFactory("main-app-executor")) + val appExecutor = Executors.newFixedThreadPool(110, NamedThreadFactory("main-app-executor")) } } diff --git a/src/main/kotlin/ru/quipy/apigateway/APIController.kt b/src/main/kotlin/ru/quipy/apigateway/APIController.kt index 6f23fa18d..3da4dfbf3 100644 --- a/src/main/kotlin/ru/quipy/apigateway/APIController.kt +++ b/src/main/kotlin/ru/quipy/apigateway/APIController.kt @@ -2,23 +2,20 @@ package ru.quipy.apigateway import org.slf4j.Logger import org.slf4j.LoggerFactory -import org.springframework.beans.factory.annotation.Autowired import org.springframework.web.bind.annotation.* +import ru.quipy.common.utils.TokenBucketRateLimiter +import ru.quipy.exceptions.DeadlineExceededException +import ru.quipy.exceptions.TooManyRequestsException import ru.quipy.orders.repository.OrderRepository import ru.quipy.payments.logic.OrderPayer import java.util.* +import java.util.concurrent.TimeUnit @RestController -class APIController { +class APIController(private val orderRepository: OrderRepository, private val orderPayer: OrderPayer) { val logger: Logger = LoggerFactory.getLogger(APIController::class.java) - @Autowired - private lateinit var orderRepository: OrderRepository - - @Autowired - private lateinit var orderPayer: OrderPayer - @PostMapping("/users") fun createUser(@RequestBody req: CreateUserRequest): User { return User(UUID.randomUUID(), req.name) @@ -54,15 +51,30 @@ class APIController { PAID, } + private val tokenBucketRateLimiter: TokenBucketRateLimiter by lazy { + TokenBucketRateLimiter( + rate = 100, + bucketMaxCapacity = 2_000, + startBucket = 2_000, + window = 1000, + timeUnit = TimeUnit.MILLISECONDS, + ) + } + @PostMapping("/orders/{orderId}/payment") fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto { + + if (!tokenBucketRateLimiter.tick()) { + throw TooManyRequestsException(retryAfterMillisecond = 30) + } + + logger.info("Trying to pay order $orderId : $deadline") val paymentId = UUID.randomUUID() val order = orderRepository.findById(orderId)?.let { orderRepository.save(it.copy(status = OrderStatus.PAYMENT_IN_PROGRESS)) it } ?: throw IllegalArgumentException("No such order $orderId") - val createdAt = orderPayer.processPayment(orderId, order.price, paymentId, deadline) return PaymentSubmissionDto(createdAt, paymentId) } diff --git a/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt b/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt new file mode 100644 index 000000000..665e21773 --- /dev/null +++ b/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt @@ -0,0 +1,41 @@ +package ru.quipy.apigateway + +import org.slf4j.LoggerFactory +import org.springframework.http.HttpStatus +import org.springframework.http.ResponseEntity +import org.springframework.web.bind.annotation.ExceptionHandler +import org.springframework.web.bind.annotation.RestControllerAdvice +import ru.quipy.core.EventSourcingService +import ru.quipy.exceptions.DeadlineExceededException +import ru.quipy.exceptions.TooManyRequestsException +import ru.quipy.payments.api.PaymentAggregate +import ru.quipy.payments.logic.PaymentAggregateState +import ru.quipy.payments.logic.logProcessing +import ru.quipy.payments.logic.now +import java.util.UUID + +@RestControllerAdvice +class GlobalExceptionHandler( + private val paymentESService: EventSourcingService +) { + companion object { + val logger = LoggerFactory.getLogger(GlobalExceptionHandler::class.java) + } + + @ExceptionHandler(TooManyRequestsException::class) + fun handleTooManyRequests(ex: TooManyRequestsException): ResponseEntity { + val wait = ex.retryAfterMillisecond + return ResponseEntity + .status(HttpStatus.TOO_MANY_REQUESTS) + .header("Retry-After", wait.toString()) + .build() + } + + @ExceptionHandler(DeadlineExceededException::class) + fun handleUnprocessableEntity(): ResponseEntity { + + return ResponseEntity + .status(HttpStatus.OK) + .build() + } +} \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/common/utils/FixedWindowRateLimiter.kt b/src/main/kotlin/ru/quipy/common/utils/FixedWindowRateLimiter.kt index 15920bd9c..768c71367 100644 --- a/src/main/kotlin/ru/quipy/common/utils/FixedWindowRateLimiter.kt +++ b/src/main/kotlin/ru/quipy/common/utils/FixedWindowRateLimiter.kt @@ -19,7 +19,7 @@ class FixedWindowRateLimiter( private val rate: Int, private val window: Long, private val timeUnit: TimeUnit = TimeUnit.MINUTES, -): RateLimiter { +) : RateLimiter { companion object { private val logger: Logger = LoggerFactory.getLogger(FixedWindowRateLimiter::class.java) private val counter = AtomicInteger(0) @@ -59,7 +59,7 @@ class SlowStartRateLimiter( private val targetRate: Int, private val timeUnit: TimeUnit = TimeUnit.MINUTES, private val slowStartOn: Boolean = true, -): RateLimiter { +) : RateLimiter { companion object { private val logger: Logger = LoggerFactory.getLogger(SlowStartRateLimiter::class.java) private val counter = AtomicInteger(0) @@ -108,7 +108,7 @@ class CountingRateLimiter( private val rate: Int, private val window: Long, private val timeUnit: TimeUnit = TimeUnit.SECONDS -): RateLimiter { +) : RateLimiter { companion object { private val logger: Logger = LoggerFactory.getLogger(CountingRateLimiter::class.java) } @@ -137,7 +137,11 @@ class CountingRateLimiter( ) } -fun makeRateLimiter(accountName: String, rate: Int, timeUnit: TimeUnit = TimeUnit.SECONDS): io.github.resilience4j.ratelimiter.RateLimiter { +fun makeRateLimiter( + accountName: String, + rate: Int, + timeUnit: TimeUnit = TimeUnit.SECONDS +): io.github.resilience4j.ratelimiter.RateLimiter { val config = RateLimiterConfig.custom() .limitRefreshPeriod(if (timeUnit == TimeUnit.SECONDS) Duration.ofSeconds(1) else Duration.ofMinutes(1)) .limitForPeriod(rate) diff --git a/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt b/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt index 6ff3092ab..04c6abc79 100644 --- a/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt +++ b/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt @@ -10,8 +10,6 @@ import java.time.Duration import java.util.concurrent.Executors import java.util.concurrent.PriorityBlockingQueue import java.util.concurrent.atomic.AtomicLong -import java.util.concurrent.locks.ReentrantLock -import kotlin.concurrent.withLock class SlidingWindowRateLimiter( private val rate: Long, @@ -39,6 +37,23 @@ class SlidingWindowRateLimiter( } } + fun tickBlocking(timeout: Duration): Boolean { + val deadline = System.currentTimeMillis() + timeout.toMillis() + + while (System.currentTimeMillis() < deadline) { + if (tick()) { + return true + } + + val remainingTime = deadline - System.currentTimeMillis() + if (remainingTime > 0) { + Thread.sleep(minOf(10, remainingTime)) + } + } + + return false + } + data class Measure( val value: Long, val timestamp: Long @@ -64,6 +79,7 @@ class SlidingWindowRateLimiter( queue.take() } }.invokeOnCompletion { th -> if (th != null) logger.error("Rate limiter release job completed", th) } + companion object { private val logger: Logger = LoggerFactory.getLogger(SlidingWindowRateLimiter::class.java) } diff --git a/src/main/kotlin/ru/quipy/common/utils/TokenBucketRateLimiter.kt b/src/main/kotlin/ru/quipy/common/utils/TokenBucketRateLimiter.kt index a7c9547d7..2df8ec5a5 100644 --- a/src/main/kotlin/ru/quipy/common/utils/TokenBucketRateLimiter.kt +++ b/src/main/kotlin/ru/quipy/common/utils/TokenBucketRateLimiter.kt @@ -13,16 +13,17 @@ import java.util.concurrent.atomic.AtomicInteger class TokenBucketRateLimiter( private val rate: Int, private val bucketMaxCapacity: Int, + private val startBucket: Int, private val window: Long, private val timeUnit: TimeUnit = TimeUnit.MINUTES, -): RateLimiter { +) : RateLimiter { companion object { private val logger: Logger = LoggerFactory.getLogger(TokenBucketRateLimiter::class.java) } private val rateLimiterScope = CoroutineScope(Executors.newSingleThreadExecutor().asCoroutineDispatcher()) - private var bucket: AtomicInteger = AtomicInteger(0) + private var bucket: AtomicInteger = AtomicInteger(startBucket) private var start = System.currentTimeMillis() private var nextExpectedWakeUp = start + timeUnit.toMillis(window) diff --git a/src/main/kotlin/ru/quipy/config/EventSourcingLibConfiguration.kt b/src/main/kotlin/ru/quipy/config/EventSourcingLibConfiguration.kt index 9bcb80d07..be9861dc8 100644 --- a/src/main/kotlin/ru/quipy/config/EventSourcingLibConfiguration.kt +++ b/src/main/kotlin/ru/quipy/config/EventSourcingLibConfiguration.kt @@ -71,7 +71,8 @@ class EventSourcingLibConfiguration { val jettyServletWebServerFactory = JettyServletWebServerFactory() val c = JettyServerCustomizer { - (it.connectors[0].getConnectionFactory("h2c") as HTTP2CServerConnectionFactory).maxConcurrentStreams = 10_000_000 + (it.connectors[0].getConnectionFactory("h2c") as HTTP2CServerConnectionFactory).maxConcurrentStreams = + 10_000_000 } jettyServletWebServerFactory.serverCustomizers.add(c) diff --git a/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt b/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt new file mode 100644 index 000000000..bd37c79e2 --- /dev/null +++ b/src/main/kotlin/ru/quipy/config/RpcControlConfig.kt @@ -0,0 +1,25 @@ +package ru.quipy.config + +import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import ru.quipy.common.utils.SlidingWindowRateLimiter +import ru.quipy.payments.logic.PaymentAccountProperties +import java.time.Duration +import java.util.concurrent.Semaphore + +@Configuration +class RpcControlConfig { + + @Bean + fun getSlidingWindowRateLimiter(accountProperties: PaymentAccountProperties) = + SlidingWindowRateLimiter(accountProperties.rateLimitPerSec.toLong(), + Duration.ofSeconds(1) + ) + + @Bean + @Qualifier("parallelLimiter") + fun parallelLimiter(accountProperties: PaymentAccountProperties): Semaphore { + return Semaphore(accountProperties.parallelRequests) + } +} diff --git a/src/main/kotlin/ru/quipy/exceptions/DeadlineExceededException.kt b/src/main/kotlin/ru/quipy/exceptions/DeadlineExceededException.kt new file mode 100644 index 000000000..8bd50330d --- /dev/null +++ b/src/main/kotlin/ru/quipy/exceptions/DeadlineExceededException.kt @@ -0,0 +1,5 @@ +package ru.quipy.exceptions + +import java.util.UUID + +class DeadlineExceededException() : RuntimeException() \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt b/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt new file mode 100644 index 000000000..6d1756c92 --- /dev/null +++ b/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt @@ -0,0 +1,3 @@ +package ru.quipy.exceptions + +class TooManyRequestsException(val retryAfterMillisecond: Long) : RuntimeException() \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt b/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt index 767f23b3e..d5d35985d 100644 --- a/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt +++ b/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt @@ -1,5 +1,6 @@ package ru.quipy.orders.subscribers +import io.micrometer.core.instrument.Metrics import jakarta.annotation.PostConstruct import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -19,6 +20,7 @@ class PaymentSubscriber { val logger: Logger = LoggerFactory.getLogger(PaymentSubscriber::class.java) + val paymentSucceededCounter = Metrics.counter("succeeded.payments", "account", "acc-5") @Autowired lateinit var subscriptionsManager: AggregateSubscriptionsManager @@ -42,6 +44,7 @@ class PaymentSubscriber { ).toSeconds() }, spent in queue: ${event.spentInQueueDuration.toSeconds()}" ) + paymentSucceededCounter.increment() } } } diff --git a/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt b/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt index eceb90cff..4d3eda6da 100644 --- a/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt +++ b/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt @@ -8,12 +8,16 @@ import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import ru.quipy.core.EventSourcingService import ru.quipy.payments.api.PaymentAggregate -import ru.quipy.payments.logic.* +import ru.quipy.payments.logic.PaymentAccountProperties +import ru.quipy.payments.logic.PaymentAggregateState +import ru.quipy.payments.logic.PaymentExternalSystemAdapter +import ru.quipy.payments.logic.PaymentExternalSystemAdapterImpl import java.net.URI import java.net.http.HttpClient import java.net.http.HttpRequest import java.net.http.HttpResponse import java.util.* +import java.util.concurrent.Semaphore @Configuration @@ -57,8 +61,15 @@ class PaymentAccountsConfig { it, paymentService, paymentProviderHostPort, - token + token, + Semaphore(it.parallelRequests) ) } } + + @Bean + fun getAccountProperties(accountAdapters: List): PaymentAccountProperties { + return accountAdapters.firstOrNull()?.getAccountProperties() + ?: throw IllegalStateException("No payment accounts configured") + } } \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/dto/Transaction.kt b/src/main/kotlin/ru/quipy/payments/dto/Transaction.kt new file mode 100644 index 000000000..799949987 --- /dev/null +++ b/src/main/kotlin/ru/quipy/payments/dto/Transaction.kt @@ -0,0 +1,14 @@ +package ru.quipy.payments.dto + +import kotlinx.coroutines.Runnable +import java.util.UUID + +data class Transaction( + val orderId: UUID, + val amount: Int, + val paymentId: UUID, + val deadline: Long, + val task: Runnable +) : Runnable { + override fun run() = task.run() +} \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index a5909b85b..9c9701d0f 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -1,40 +1,52 @@ package ru.quipy.payments.logic +import io.micrometer.core.instrument.Counter +import io.micrometer.core.instrument.Metrics import org.slf4j.Logger import org.slf4j.LoggerFactory -import org.springframework.beans.factory.annotation.Autowired +import org.springframework.beans.factory.annotation.Qualifier import org.springframework.stereotype.Service -import ru.quipy.common.utils.CallerBlockingRejectedExecutionHandler import ru.quipy.common.utils.NamedThreadFactory +import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.core.EventSourcingService +import ru.quipy.exceptions.DeadlineExceededException +import ru.quipy.exceptions.TooManyRequestsException import ru.quipy.payments.api.PaymentAggregate +import ru.quipy.payments.dto.Transaction +import java.time.Duration import java.util.* -import java.util.concurrent.LinkedBlockingQueue -import java.util.concurrent.ThreadPoolExecutor -import java.util.concurrent.TimeUnit +import java.util.concurrent.* @Service -class OrderPayer { +class OrderPayer( + private val paymentESService: EventSourcingService, + private val paymentService: PaymentService, + private val accountProperties: PaymentAccountProperties, + @field:Qualifier("parallelLimiter") + private val parallelLimiter: Semaphore, +) { + private val paymentProcessingPlannedCounter: Counter = + Metrics.counter("payment.processing.planned", "accountName", accountProperties.accountName) + private val paymentProcessingStartedCounter: Counter = + Metrics.counter("payment.processing.started", "accountName", accountProperties.accountName) + private val paymentProcessingCompletedCounter: Counter = + Metrics.counter("payment.processing.completed", "accountName", accountProperties.accountName) companion object { val logger: Logger = LoggerFactory.getLogger(OrderPayer::class.java) } - @Autowired - private lateinit var paymentESService: EventSourcingService - - @Autowired - private lateinit var paymentService: PaymentService - - private val paymentExecutor = ThreadPoolExecutor( - 16, - 16, - 0L, - TimeUnit.MILLISECONDS, - LinkedBlockingQueue(8_000), - NamedThreadFactory("payment-submission-executor"), - CallerBlockingRejectedExecutionHandler() - ) + private val paymentExecutor: ThreadPoolExecutor by lazy { + ThreadPoolExecutor( + accountProperties.parallelRequests, + accountProperties.parallelRequests, + 0L, + TimeUnit.MILLISECONDS, + ArrayBlockingQueue(8_000), + NamedThreadFactory("payment-submission-executor"), + ThreadPoolExecutor.AbortPolicy() + ) + } fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { val createdAt = System.currentTimeMillis() @@ -46,10 +58,12 @@ class OrderPayer { amount ) } - logger.trace("Payment ${createdEvent.paymentId} for order $orderId created.") + + logger.trace("Payment {} for order {} created.", createdEvent.paymentId, orderId) paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline) } + return createdAt } } \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentAggregateCommands.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentAggregateCommands.kt index 2389e3cb3..8f0d735fd 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentAggregateCommands.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentAggregateCommands.kt @@ -15,7 +15,12 @@ fun PaymentAggregateState.create(id: UUID, orderId: UUID, amount: Int): PaymentC ) } -fun PaymentAggregateState.logSubmission(success: Boolean, transactionId: UUID, startedAt: Long, spentInQueueDuration: Duration): PaymentSubmittedEvent { +fun PaymentAggregateState.logSubmission( + success: Boolean, + transactionId: UUID, + startedAt: Long, + spentInQueueDuration: Duration +): PaymentSubmittedEvent { return PaymentSubmittedEvent( this.getId(), success, this.orderId, transactionId, startedAt, spentInQueueDuration ) @@ -28,9 +33,18 @@ fun PaymentAggregateState.logProcessing( reason: String? = null ): PaymentProcessedEvent { val submittedAt = this.submissions[transactionId ?: UUID.randomUUID()]?.timeStarted ?: 0 - val spentInQueueDuration = this.submissions[transactionId ?: UUID.randomUUID()]?.spentInQueue ?: Duration.ofMillis(0) + val spentInQueueDuration = + this.submissions[transactionId ?: UUID.randomUUID()]?.spentInQueue ?: Duration.ofMillis(0) return PaymentProcessedEvent( - this.getId(), success, this.orderId, submittedAt, processedAt, this.amount!!, transactionId, reason, spentInQueueDuration + this.getId(), + success, + this.orderId, + submittedAt, + processedAt, + this.amount!!, + transactionId, + reason, + spentInQueueDuration ) } \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentAggregateState.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentAggregateState.kt index ec0dd3709..ab8fcefd9 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentAggregateState.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentAggregateState.kt @@ -34,7 +34,8 @@ class PaymentAggregateState : AggregateState { @StateTransitionFunc fun paymentSubmittedApply(event: PaymentSubmittedEvent) { - submissions[event.transactionId] = PaymentSubmission(event.startedAt, event.transactionId, event.success, event.spentInQueueDuration) + submissions[event.transactionId] = + PaymentSubmission(event.startedAt, event.transactionId, event.success, event.spentInQueueDuration) updatedAt = createdAt } diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 5cb12106a..681c7f048 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -6,19 +6,23 @@ import okhttp3.OkHttpClient import okhttp3.Request import okhttp3.RequestBody import org.slf4j.LoggerFactory +import org.springframework.beans.factory.annotation.Qualifier +import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.core.EventSourcingService import ru.quipy.payments.api.PaymentAggregate import java.net.SocketTimeoutException import java.time.Duration import java.util.* +import java.util.concurrent.Semaphore +import java.util.concurrent.TimeUnit - -// Advice: always treat time as a Duration class PaymentExternalSystemAdapterImpl( private val properties: PaymentAccountProperties, private val paymentESService: EventSourcingService, private val paymentProviderHostPort: String, private val token: String, + @field:Qualifier("parallelLimiter") + private val parallelLimiter: Semaphore, ) : PaymentExternalSystemAdapter { companion object { @@ -34,7 +38,16 @@ class PaymentExternalSystemAdapterImpl( private val rateLimitPerSec = properties.rateLimitPerSec private val parallelRequests = properties.parallelRequests - private val client = OkHttpClient.Builder().build() + private val slidingWindowRateLimiter = SlidingWindowRateLimiter( + rate = 120, + window = Duration.ofMillis(1_000) + ) + + private val httpClient = OkHttpClient.Builder().build() + + override fun getAccountProperties(): PaymentAccountProperties { + return properties + } override fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) { logger.warn("[$accountName] Submitting payment request for payment $paymentId") @@ -50,17 +63,40 @@ class PaymentExternalSystemAdapterImpl( logger.info("[$accountName] Submit: $paymentId , txId: $transactionId") try { + val parallelLimiterTimeout = calculateRemainingTime(deadline, requestAverageProcessingTime.toMillis()) + if (parallelLimiterTimeout <= 0 || !parallelLimiter.tryAcquire(parallelLimiterTimeout, TimeUnit.MILLISECONDS)) { + + logger.warn("[$accountName] Parallel limiter timeout for payment $paymentId") + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId) + } + return + } + + val rateLimiterTimeout = calculateRemainingTime(deadline, requestAverageProcessingTime.toMillis()) + if (rateLimiterTimeout <= 0 || !slidingWindowRateLimiter.tickBlocking(Duration.ofMillis(rateLimiterTimeout))) { + + logger.warn("[$accountName] Rate limiter timeout for payment $paymentId") + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId) + } + return + } + val request = Request.Builder().run { url("http://$paymentProviderHostPort/external/process?serviceName=$serviceName&token=$token&accountName=$accountName&transactionId=$transactionId&paymentId=$paymentId&amount=$amount") post(emptyBody) - }.build() + build() + } - client.newCall(request).execute().use { response -> + httpClient.newCall(request).execute().use { response -> val body = try { - mapper.readValue(response.body?.string(), ExternalSysResponse::class.java) + response.body?.string()?.let { + mapper.readValue(it, ExternalSysResponse::class.java) + } ?: ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, "Empty response body") } catch (e: Exception) { logger.error("[$accountName] [ERROR] Payment processed for txId: $transactionId, payment: $paymentId, result code: ${response.code}, reason: ${response.body?.string()}") - ExternalSysResponse(transactionId.toString(), paymentId.toString(),false, e.message) + ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, e.message ?: "Unknown error") } logger.warn("[$accountName] Payment processed for txId: $transactionId, payment: $paymentId, succeeded: ${body.result}, message: ${body.message}") @@ -76,18 +112,20 @@ class PaymentExternalSystemAdapterImpl( is SocketTimeoutException -> { logger.error("[$accountName] Payment timeout for txId: $transactionId, payment: $paymentId", e) paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, reason = "Request timeout.") + it.logProcessing(false, now(), transactionId, reason = "HTTP request timeout") } } - else -> { logger.error("[$accountName] Payment failed for txId: $transactionId, payment: $paymentId", e) - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, reason = e.message) + it.logProcessing(false, now(), transactionId, reason = e.message ?: "Unknown error") } } } + } finally { + if (parallelLimiter.availablePermits() < parallelRequests) { + parallelLimiter.release() + } } } @@ -97,6 +135,10 @@ class PaymentExternalSystemAdapterImpl( override fun name() = properties.accountName + private fun calculateRemainingTime(deadline: Long, requestAverageProcessingTime: Long): Long { + return deadline - now() - (requestAverageProcessingTime * 0.01).toLong() + } + } public fun now() = System.currentTimeMillis() \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt index 255db77dd..2907c7706 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt @@ -17,6 +17,8 @@ interface PaymentService { */ interface PaymentExternalSystemAdapter { + fun getAccountProperties(): PaymentAccountProperties + fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) fun name(): String @@ -32,13 +34,21 @@ interface PaymentExternalSystemAdapter { data class PaymentAccountProperties( val serviceName: String, val accountName: String, - val parallelRequests: Int, - val rateLimitPerSec: Int, - val price: Int, + val parallelRequests: Int, // 30 + val rateLimitPerSec: Int, // 10 + val price: Int, // 30 val averageProcessingTime: Duration = Duration.ofSeconds(11), val enabled: Boolean, ) +/* +#- parallelRequests=5 - означает, что провайдер разрешает вам в любой момент времени иметь не более 5 одновременных запросов от вас к нему для этого аккаунта +#- rateLimitPerSec=5 - означает, что провайдер разрешает вам каждую секунду отправлять к нему не более 5 запросов по этому аккаунту +#- price=30 - означает, что провайдер оплаты будет взымать за каждый успешный или неуспешный вызов 30 денежных единиц с вашего магазина. +#- averageProcessingTime=PTO.05S - провайдер оплаты сообщает вам, что в среднем время обработки одного запроса по этому аккаунту будет составлять около 50ms. + + */ + /** * Describes response from external service. */ diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt index 1c24e5a72..7aad9b847 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt @@ -1,16 +1,8 @@ package ru.quipy.payments.logic import org.slf4j.LoggerFactory -import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service -import ru.quipy.common.utils.NamedThreadFactory -import ru.quipy.core.EventSourcingService -import ru.quipy.payments.api.PaymentAggregate -import java.time.Duration import java.util.* -import java.util.concurrent.Executors -import java.util.concurrent.locks.ReentrantLock -import kotlin.concurrent.withLock @Service diff --git a/src/main/kotlin/ru/quipy/payments/subscribers/PaymentTransactionsSubscriber.kt b/src/main/kotlin/ru/quipy/payments/subscribers/PaymentTransactionsSubscriber.kt index 033b8a7b3..e4d8a4eaf 100644 --- a/src/main/kotlin/ru/quipy/payments/subscribers/PaymentTransactionsSubscriber.kt +++ b/src/main/kotlin/ru/quipy/payments/subscribers/PaymentTransactionsSubscriber.kt @@ -1,5 +1,6 @@ package ru.quipy.payments.subscribers +import jakarta.annotation.PostConstruct import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired @@ -12,7 +13,6 @@ import ru.quipy.streams.annotation.RetryFailedStrategy import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CopyOnWriteArrayList -import jakarta.annotation.PostConstruct @Service class PaymentTransactionsSubscriber { diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 33d51a58b..43f6ae0ed 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -2,29 +2,27 @@ server.address=0.0.0.0 server.port=8081 server.http2.enabled=true spring.main.allow-bean-definition-overriding=true - # MongoDB properties spring.data.mongodb.host=localhost spring.data.mongodb.port=27017 spring.data.mongodb.database=online-shop - # Tiny event sourcing library properties event.sourcing.auto-scan-enabled=true event.sourcing.scan-package=ru.quipy event.sourcing.snapshots-enabled=false event.sourcing.sagas-enabled=false - # Postgres event store properties spring.datasource.hikari.jdbc-url=jdbc:postgresql://${POSTGRES_ADDRESS:localhost}:${POSTGRES_PORT:65432}/postgres spring.datasource.hikari.username=tiny_es spring.datasource.hikari.password=tiny_es spring.datasource.hikari.leak-detection-threshold=2000 - management.metrics.web.server.request.autotime.percentiles=0.95 management.metrics.export.prometheus.enabled=true management.endpoints.web.exposure.include=info,health,prometheus,metrics - payment.service-name=${PAYMENT_SERVICE_NAME} payment.token=${PAYMENT_TOKEN} -payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20} +# payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20} +# payment.accounts=${PAYMENT_ACCOUNTS:acc-3} +payment.accounts=${PAYMENT_ACCOUNTS:acc-9} +# payment.accounts=${PAYMENT_ACCOUNTS:acc-18} payment.hostPort=${PAYMENT_HOST:localhost}:${PAYMENT_PORT:1234} \ No newline at end of file diff --git a/test-local-run.http b/test-local-run.http index 7be0e4f73..3fd5cff2e 100644 --- a/test-local-run.http +++ b/test-local-run.http @@ -1,15 +1,36 @@ -### Run test locally +### Test 8 POST http://localhost:1234/test/run Content-Type: application/json { "serviceName": "{{serviceName}}", "token": "{{token}}", - "ratePerSecond": 1, - "testCount": 100, - "processingTimeMillis": 80000 + "ratePerSecond": 100, + "testCount": 5000, + "processingTimeMillis": 20000 } ### Stop running test to save time and resources # @timeout 120 -POST http://localhost:1234/test/stop/{{serviceName}} \ No newline at end of file +POST http://localhost:1234/test/stop/{{serviceName}} + + + +# Аккаунт: "acc-9" +# +# Тест: +# { +# "ratePerSecond": 100, +# "testCount": 5000, +# "processingTimeMillis": 20000 +# } + +# PaymentAccountProperties( +# serviceName=cas-m3404-05, +# accountName=acc-9, +# parallelRequests=50, +# rateLimitPerSec=120, +# price=30, +# averageProcessingTime=PT0.5S, +# enabled=true +# ) diff --git a/test-on-prem-run.http b/test-on-prem-run.http index 584edc0b5..19f2bb9bb 100644 --- a/test-on-prem-run.http +++ b/test-on-prem-run.http @@ -6,11 +6,11 @@ Content-Type: application/json { "serviceName": "{{serviceName}}", "token": "{{token}}", - "branch": "main", - "accounts": "acc-3", + "branch": "feature/kuro-lab-5-2", + "accounts": "acc-23", "ratePerSecond": 11, - "testCount": 1200, - "processingTimeMillis": 80000, + "testCount": 2200, + "processingTimeMillis": 13000, "onPremises": true }