From 9de602698d02fb18ad46dca41bb11237fcc64243 Mon Sep 17 00:00:00 2001 From: Deiwin Sarjas Date: Tue, 5 May 2026 13:47:24 +0300 Subject: [PATCH] Retry throttling errors on XML/Query services via service parsers Threads the service-specific operation parser into the request logic so XML throttling errors (SES, SQS, SNS) can now trigger retries. Previously the JSON-only `client_error` fallthrough returned `{:error, ...}` immediately for XML bodies, skipping retries. An alternative would be to implement XML parsing within the request logic, but that would a) duplicate the logic that's already there and b) set a (potentially optional) requirement on an XML library directly in the main library here. The old cond block in `Query.perform` that silently passed through unrecognized parser arities is removed and arity 0 or 4+ now crashes loudly. This is not strictly necessary but seemed a more sensible approach. Addresses an issue similar to that raised in #300. --- CHANGELOG.md | 3 + lib/ex_aws/operation/query.ex | 28 +++--- lib/ex_aws/operation/rest_query.ex | 16 +++- lib/ex_aws/request.ex | 86 +++++++++++++++--- test/ex_aws/operation/query_test.exs | 128 +++++++++++++++++++++++++++ test/ex_aws/request_test.exs | 123 ++++++++++++++++++++++++- 6 files changed, 355 insertions(+), 29 deletions(-) create mode 100644 test/ex_aws/operation/query_test.exs diff --git a/CHANGELOG.md b/CHANGELOG.md index d84467555..8f5549a7d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +Unreleased +- Retry throttling errors on XML/Query services (SES, SQS, SNS) + v2.7.0 (2026-03-04) - Upgrade hackney to 3.x - Correctly handle pod identity authorization when using Req adapter diff --git a/lib/ex_aws/operation/query.ex b/lib/ex_aws/operation/query.ex index 804fd4b7b..f5ebd3e1b 100644 --- a/lib/ex_aws/operation/query.ex +++ b/lib/ex_aws/operation/query.ex @@ -38,23 +38,23 @@ defimpl ExAws.Operation, for: ExAws.Operation.Query do {"content-encoding", operation.content_encoding} ] - result = - ExAws.Request.request(:post, url, data, headers, config, operation.service) - |> ExAws.Request.default_aws_error() + parser = wrap_parser(operation.parser, operation.action, config) - parser = operation.parser + ExAws.Request.request(:post, url, data, headers, config, operation.service, + operation_parser: parser + ) + |> ExAws.Request.default_aws_error() + |> parser.() + end - cond do - is_function(parser, 2) -> - parser.(result, operation.action) + def stream!(_, _), do: nil - is_function(parser, 3) -> - parser.(result, operation.action, config) + defp wrap_parser(parser, _action, _config) when is_function(parser, 1), + do: parser - true -> - result - end - end + defp wrap_parser(parser, action, _config) when is_function(parser, 2), + do: fn result -> parser.(result, action) end - def stream!(_, _), do: nil + defp wrap_parser(parser, action, config) when is_function(parser, 3), + do: fn result -> parser.(result, action, config) end end diff --git a/lib/ex_aws/operation/rest_query.ex b/lib/ex_aws/operation/rest_query.ex index eb2cd850c..fe803b582 100644 --- a/lib/ex_aws/operation/rest_query.ex +++ b/lib/ex_aws/operation/rest_query.ex @@ -18,18 +18,30 @@ defimpl ExAws.Operation, for: ExAws.Operation.RestQuery do headers = config[:headers] || [] url = ExAws.Request.Url.build(operation, config) + parser = wrap_parser(operation.parser, operation.action, config) + ExAws.Request.request( operation.http_method, url, operation.body, headers, config, - operation.service + operation.service, + operation_parser: parser ) |> ExAws.Request.default_aws_error() - |> operation.parser.(operation.action) + |> parser.() end + defp wrap_parser(parser, _action, _config) when is_function(parser, 1), + do: parser + + defp wrap_parser(parser, action, _config) when is_function(parser, 2), + do: fn result -> parser.(result, action) end + + defp wrap_parser(parser, action, config) when is_function(parser, 3), + do: fn result -> parser.(result, action, config) end + def stream!(%ExAws.Operation.RestQuery{stream_builder: nil}, _) do raise ArgumentError, """ This operation does not support streaming! diff --git a/lib/ex_aws/request.ex b/lib/ex_aws/request.ex index b783f2e2c..41ff48ded 100644 --- a/lib/ex_aws/request.ex +++ b/lib/ex_aws/request.ex @@ -9,7 +9,7 @@ defmodule ExAws.Request do @type error_t :: {:error, {:http_error, http_status, binary}} @type response_t :: success_t | error_t - def request(http_method, url, data, headers, config, service) do + def request(http_method, url, data, headers, config, service, opts \\ []) do body = case data do [] -> "{}" @@ -17,13 +17,42 @@ defmodule ExAws.Request do _ -> config[:json_codec].encode!(data) end - request_and_retry(http_method, url, service, config, headers, body, {:attempt, 1}) + request_and_retry(http_method, url, service, config, headers, body, {:attempt, 1}, opts) end - def request_and_retry(_method, _url, _service, _config, _headers, _req_body, {:error, reason}), - do: {:error, reason} - - def request_and_retry(method, url, service, config, headers, req_body, {:attempt, attempt}) do + def request_and_retry( + method, + url, + service, + config, + headers, + req_body, + attempt_or_error, + opts \\ [] + ) + + def request_and_retry( + _method, + _url, + _service, + _config, + _headers, + _req_body, + {:error, reason}, + _opts + ), + do: {:error, reason} + + def request_and_retry( + method, + url, + service, + config, + headers, + req_body, + {:attempt, attempt}, + opts + ) do full_headers = ExAws.Auth.headers(method, url, service, config, headers, req_body) with {:ok, full_headers} <- full_headers do @@ -45,7 +74,7 @@ defmodule ExAws.Request do {:error, {:http_error, status, "redirected"}} {:ok, %{status_code: status} = resp} when status in 400..499 -> - case client_error(resp, config[:json_codec]) do + case client_error(resp, config[:json_codec], opts) do {:retry, reason} -> request_and_retry( method, @@ -54,7 +83,8 @@ defmodule ExAws.Request do config, headers, req_body, - attempt_again?(attempt, reason, :client, config) + attempt_again?(attempt, reason, :client, config), + opts ) {:error, reason} -> @@ -72,7 +102,8 @@ defmodule ExAws.Request do config, headers, req_body, - attempt_again?(attempt, reason, :server, config) + attempt_again?(attempt, reason, :server, config), + opts ) {:error, reason_struct} -> @@ -93,7 +124,8 @@ defmodule ExAws.Request do config, headers, req_body, - attempt_again?(attempt, reason, :other, config) + attempt_again?(attempt, reason, :other, config), + opts ) end end @@ -146,7 +178,9 @@ defmodule ExAws.Request do defp extract_error({:error, error}), do: error defp extract_error(error), do: error - def client_error(%{status_code: status, body: body} = error, json_codec) do + def client_error(resp, json_codec, opts \\ []) + + def client_error(%{status_code: status, body: body} = error, json_codec, opts) do case json_codec.decode(body) do {:ok, %{"__type" => error_type, "message" => message} = err} -> handle_error(error_type, message, status, err) @@ -159,12 +193,40 @@ defmodule ExAws.Request do _ -> {:error, {:http_error, status, error}} end + |> try_operation_parser_for_retries(opts[:operation_parser]) end - def client_error(%{status_code: status} = error, _) do + def client_error(%{status_code: status} = error, _, _) do {:error, {:http_error, status, error}} end + @retryable_error_codes [ + "RequestThrottled", + "Throttling" + ] + + # Some operation parsers are able to detect retryable states even when the + # JSON codec based detection above does not work. Running the repsonse + # through the parser to check if we can detect a retryable response this way. + # The original response is returned in all other cases to avoid affecting the + # flow unless a retry opportunity is detected. + defp try_operation_parser_for_retries({:error, {:http_error, status, error}} = response, parser) + when is_function(parser, 1) do + case parser.(response) do + {:error, {:http_error, ^status, %{code: code}}} when code in @retryable_error_codes -> + {:retry, {:http_error, status, error}} + + _ -> + response + end + rescue + _ -> response + catch + _, _ -> response + end + + defp try_operation_parser_for_retries(response, _), do: response + def handle_aws_error({"ProvisionedThroughputExceededException" = type, message, _}) do {:retry, {type, message}} end diff --git a/test/ex_aws/operation/query_test.exs b/test/ex_aws/operation/query_test.exs new file mode 100644 index 000000000..b41044564 --- /dev/null +++ b/test/ex_aws/operation/query_test.exs @@ -0,0 +1,128 @@ +defmodule ExAws.Operation.QueryTest do + use ExUnit.Case, async: false + alias ExAws.JSON.JSX + import Mox + + defmodule TestParser do + use ExAws.Operation.Query.Parser + + def parse({:ok, _} = result, _action), do: result + end + + setup do + {:ok, + config: %{ + http_client: ExAws.Request.HttpMock, + json_codec: JSX, + access_key_id: "AKIAIOSFODNN7EXAMPLE", + secret_access_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + region: "us-east-1", + host: "email.us-east-1.amazonaws.com", + port: 443, + scheme: "https://", + normalize_path: true, + retries: [ + max_attempts: 5, + base_backoff_in_ms: 1, + max_backoff_in_ms: 20 + ] + }} + end + + test "Query.perform retries XML throttling errors using the operation parser", context do + throttled = """ + + + Sender + Throttling + Rate exceeded + + + """ + + success = """ + + + abc123 + + + """ + + ExAws.Request.HttpMock + |> expect(:request, 2, fn _method, _url, _body, _headers, _opts -> + {:ok, %{status_code: 400, body: throttled}} + end) + |> expect(:request, fn _method, _url, _body, _headers, _opts -> + {:ok, %{status_code: 200, body: success, headers: []}} + end) + + operation = %ExAws.Operation.Query{ + path: "/", + params: %{"Action" => "SendCustomVerificationEmail"}, + service: :ses, + action: :send_custom_verification_email, + parser: &TestParser.parse/2 + } + + assert {:ok, %{body: ^success}} = ExAws.Operation.perform(operation, context[:config]) + end + + test "Query.perform retries RequestThrottled errors using the operation parser", context do + throttled = """ + + + Sender + RequestThrottled + Request is throttled. + + + """ + + success = """ + + + msg123 + + + """ + + ExAws.Request.HttpMock + |> expect(:request, 2, fn _method, _url, _body, _headers, _opts -> + {:ok, %{status_code: 400, body: throttled}} + end) + |> expect(:request, fn _method, _url, _body, _headers, _opts -> + {:ok, %{status_code: 200, body: success, headers: []}} + end) + + operation = %ExAws.Operation.Query{ + path: "/", + params: %{"Action" => "SendMessage"}, + service: :sqs, + action: :send_message, + parser: &TestParser.parse/2 + } + + assert {:ok, %{body: ^success}} = ExAws.Operation.perform(operation, context[:config]) + end + + test "Query.perform surfaces non-retryable XML errors via the operation parser", context do + invalid = + "SenderInvalidParameterValuebad" + + ExAws.Request.HttpMock + |> expect(:request, 1, fn _method, _url, _body, _headers, _opts -> + {:ok, %{status_code: 400, body: invalid}} + end) + + operation = %ExAws.Operation.Query{ + path: "/", + params: %{"Action" => "SendCustomVerificationEmail"}, + service: :ses, + action: :send_custom_verification_email, + parser: &TestParser.parse/2 + } + + assert {:error, {:http_error, 400, %{code: "InvalidParameterValue"}}} = + ExAws.Operation.perform(operation, context[:config]) + end +end diff --git a/test/ex_aws/request_test.exs b/test/ex_aws/request_test.exs index d01687362..137797190 100644 --- a/test/ex_aws/request_test.exs +++ b/test/ex_aws/request_test.exs @@ -268,6 +268,97 @@ defmodule ExAws.RequestTest do ) end + test "operation_parser triggers retry when it returns a retryable XML code", context do + TelemetryHelper.attach_telemetry([:ex_aws, :request]) + success = mock_xml_throttling_response(2) + + parser = fn + {:error, {:http_error, status, %{body: body}}} -> + code = + if String.contains?(body, "Throttling"), + do: "Throttling", + else: "Unknown" + + {:error, {:http_error, status, %{code: code, message: "parsed"}}} + + other -> + other + end + + assert {:ok, %{body: ^success, status_code: 200}} = + ExAws.Request.request_and_retry( + :post, + "https://email.us-east-1.amazonaws.com/", + :ses, + context[:config], + context[:headers], + "", + {:attempt, 1}, + operation_parser: parser + ) + + assert_receive {[:ex_aws, :request, :start], %{system_time: _}, %{attempt: 1}} + assert_receive {[:ex_aws, :request, :stop], %{duration: _}, %{attempt: 1, result: :error}} + assert_receive {[:ex_aws, :request, :start], %{system_time: _}, %{attempt: 2}} + assert_receive {[:ex_aws, :request, :stop], %{duration: _}, %{attempt: 2, result: :error}} + assert_receive {[:ex_aws, :request, :start], %{system_time: _}, %{attempt: 3}} + assert_receive {[:ex_aws, :request, :stop], %{duration: _}, %{attempt: 3, result: :ok}} + end + + test "operation_parser does not trigger retry for non-retryable XML codes", context do + TelemetryHelper.attach_telemetry([:ex_aws, :request]) + + xml = + "SenderInvalidParameterValuebad" + + ExAws.Request.HttpMock + |> expect(:request, 1, fn _method, _url, _body, _headers, _opts -> + {:ok, %{status_code: 400, body: xml}} + end) + + parser = fn {:error, {:http_error, status, _}} -> + {:error, {:http_error, status, %{code: "InvalidParameterValue", message: "bad"}}} + end + + assert {:error, {:http_error, 400, %{body: ^xml, status_code: 400}}} = + ExAws.Request.request_and_retry( + :post, + "https://email.us-east-1.amazonaws.com/", + :ses, + context[:config], + context[:headers], + "", + {:attempt, 1}, + operation_parser: parser + ) + + assert_receive {[:ex_aws, :request, :start], %{system_time: _}, %{attempt: 1}} + refute_receive {[:ex_aws, :request, :start], %{system_time: _}, %{attempt: 2}} + end + + test "operation_parser that crashes falls back to a non-retry error", context do + xml = "" + + ExAws.Request.HttpMock + |> expect(:request, 1, fn _method, _url, _body, _headers, _opts -> + {:ok, %{status_code: 400, body: xml}} + end) + + parser = fn _ -> raise "boom" end + + assert {:error, {:http_error, 400, %{body: ^xml, status_code: 400}}} = + ExAws.Request.request_and_retry( + :post, + "https://email.us-east-1.amazonaws.com/", + :ses, + context[:config], + context[:headers], + "", + {:attempt, 1}, + operation_parser: parser + ) + end + test "TooManyRequestsException is retried", context do TelemetryHelper.attach_telemetry([:ex_aws, :request]) success = mock_too_many_requests_exception(3) @@ -318,7 +409,37 @@ defmodule ExAws.RequestTest do success end - def mock_too_many_requests_exception(success_after_retries) do + defp mock_xml_throttling_response(success_after_retries) do + exception = """ + + + Sender + Throttling + Rate exceeded + + + """ + + success = """ + + + abc123 + + + """ + + ExAws.Request.HttpMock + |> expect(:request, success_after_retries, fn _method, _url, _body, _headers, _opts -> + {:ok, %{status_code: 400, body: exception}} + end) + |> expect(:request, fn _method, _url, _body, _headers, _opts -> + {:ok, %{status_code: 200, body: success}} + end) + + success + end + + defp mock_too_many_requests_exception(success_after_retries) do exception = "{\"__type\":\"TooManyRequestsException\",\"message\":\"Too many requests\"}" success =