diff --git a/CHANGELOG.md b/CHANGELOG.md index d8446755..8f5549a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +Unreleased +- Retry throttling errors on XML/Query services (SES, SQS, SNS) + v2.7.0 (2026-03-04) - Upgrade hackney to 3.x - Correctly handle pod identity authorization when using Req adapter diff --git a/lib/ex_aws/operation/query.ex b/lib/ex_aws/operation/query.ex index 804fd4b7..f5ebd3e1 100644 --- a/lib/ex_aws/operation/query.ex +++ b/lib/ex_aws/operation/query.ex @@ -38,23 +38,23 @@ defimpl ExAws.Operation, for: ExAws.Operation.Query do {"content-encoding", operation.content_encoding} ] - result = - ExAws.Request.request(:post, url, data, headers, config, operation.service) - |> ExAws.Request.default_aws_error() + parser = wrap_parser(operation.parser, operation.action, config) - parser = operation.parser + ExAws.Request.request(:post, url, data, headers, config, operation.service, + operation_parser: parser + ) + |> ExAws.Request.default_aws_error() + |> parser.() + end - cond do - is_function(parser, 2) -> - parser.(result, operation.action) + def stream!(_, _), do: nil - is_function(parser, 3) -> - parser.(result, operation.action, config) + defp wrap_parser(parser, _action, _config) when is_function(parser, 1), + do: parser - true -> - result - end - end + defp wrap_parser(parser, action, _config) when is_function(parser, 2), + do: fn result -> parser.(result, action) end - def stream!(_, _), do: nil + defp wrap_parser(parser, action, config) when is_function(parser, 3), + do: fn result -> parser.(result, action, config) end end diff --git a/lib/ex_aws/operation/rest_query.ex b/lib/ex_aws/operation/rest_query.ex index eb2cd850..fe803b58 100644 --- a/lib/ex_aws/operation/rest_query.ex +++ b/lib/ex_aws/operation/rest_query.ex @@ -18,18 +18,30 @@ defimpl ExAws.Operation, for: ExAws.Operation.RestQuery do headers = config[:headers] || [] url = ExAws.Request.Url.build(operation, config) + parser = wrap_parser(operation.parser, operation.action, config) + ExAws.Request.request( operation.http_method, url, operation.body, headers, config, - operation.service + operation.service, + operation_parser: parser ) |> ExAws.Request.default_aws_error() - |> operation.parser.(operation.action) + |> parser.() end + defp wrap_parser(parser, _action, _config) when is_function(parser, 1), + do: parser + + defp wrap_parser(parser, action, _config) when is_function(parser, 2), + do: fn result -> parser.(result, action) end + + defp wrap_parser(parser, action, config) when is_function(parser, 3), + do: fn result -> parser.(result, action, config) end + def stream!(%ExAws.Operation.RestQuery{stream_builder: nil}, _) do raise ArgumentError, """ This operation does not support streaming! diff --git a/lib/ex_aws/request.ex b/lib/ex_aws/request.ex index b783f2e2..41ff48de 100644 --- a/lib/ex_aws/request.ex +++ b/lib/ex_aws/request.ex @@ -9,7 +9,7 @@ defmodule ExAws.Request do @type error_t :: {:error, {:http_error, http_status, binary}} @type response_t :: success_t | error_t - def request(http_method, url, data, headers, config, service) do + def request(http_method, url, data, headers, config, service, opts \\ []) do body = case data do [] -> "{}" @@ -17,13 +17,42 @@ defmodule ExAws.Request do _ -> config[:json_codec].encode!(data) end - request_and_retry(http_method, url, service, config, headers, body, {:attempt, 1}) + request_and_retry(http_method, url, service, config, headers, body, {:attempt, 1}, opts) end - def request_and_retry(_method, _url, _service, _config, _headers, _req_body, {:error, reason}), - do: {:error, reason} - - def request_and_retry(method, url, service, config, headers, req_body, {:attempt, attempt}) do + def request_and_retry( + method, + url, + service, + config, + headers, + req_body, + attempt_or_error, + opts \\ [] + ) + + def request_and_retry( + _method, + _url, + _service, + _config, + _headers, + _req_body, + {:error, reason}, + _opts + ), + do: {:error, reason} + + def request_and_retry( + method, + url, + service, + config, + headers, + req_body, + {:attempt, attempt}, + opts + ) do full_headers = ExAws.Auth.headers(method, url, service, config, headers, req_body) with {:ok, full_headers} <- full_headers do @@ -45,7 +74,7 @@ defmodule ExAws.Request do {:error, {:http_error, status, "redirected"}} {:ok, %{status_code: status} = resp} when status in 400..499 -> - case client_error(resp, config[:json_codec]) do + case client_error(resp, config[:json_codec], opts) do {:retry, reason} -> request_and_retry( method, @@ -54,7 +83,8 @@ defmodule ExAws.Request do config, headers, req_body, - attempt_again?(attempt, reason, :client, config) + attempt_again?(attempt, reason, :client, config), + opts ) {:error, reason} -> @@ -72,7 +102,8 @@ defmodule ExAws.Request do config, headers, req_body, - attempt_again?(attempt, reason, :server, config) + attempt_again?(attempt, reason, :server, config), + opts ) {:error, reason_struct} -> @@ -93,7 +124,8 @@ defmodule ExAws.Request do config, headers, req_body, - attempt_again?(attempt, reason, :other, config) + attempt_again?(attempt, reason, :other, config), + opts ) end end @@ -146,7 +178,9 @@ defmodule ExAws.Request do defp extract_error({:error, error}), do: error defp extract_error(error), do: error - def client_error(%{status_code: status, body: body} = error, json_codec) do + def client_error(resp, json_codec, opts \\ []) + + def client_error(%{status_code: status, body: body} = error, json_codec, opts) do case json_codec.decode(body) do {:ok, %{"__type" => error_type, "message" => message} = err} -> handle_error(error_type, message, status, err) @@ -159,12 +193,40 @@ defmodule ExAws.Request do _ -> {:error, {:http_error, status, error}} end + |> try_operation_parser_for_retries(opts[:operation_parser]) end - def client_error(%{status_code: status} = error, _) do + def client_error(%{status_code: status} = error, _, _) do {:error, {:http_error, status, error}} end + @retryable_error_codes [ + "RequestThrottled", + "Throttling" + ] + + # Some operation parsers are able to detect retryable states even when the + # JSON codec based detection above does not work. Running the repsonse + # through the parser to check if we can detect a retryable response this way. + # The original response is returned in all other cases to avoid affecting the + # flow unless a retry opportunity is detected. + defp try_operation_parser_for_retries({:error, {:http_error, status, error}} = response, parser) + when is_function(parser, 1) do + case parser.(response) do + {:error, {:http_error, ^status, %{code: code}}} when code in @retryable_error_codes -> + {:retry, {:http_error, status, error}} + + _ -> + response + end + rescue + _ -> response + catch + _, _ -> response + end + + defp try_operation_parser_for_retries(response, _), do: response + def handle_aws_error({"ProvisionedThroughputExceededException" = type, message, _}) do {:retry, {type, message}} end diff --git a/test/ex_aws/operation/query_test.exs b/test/ex_aws/operation/query_test.exs new file mode 100644 index 00000000..b4104456 --- /dev/null +++ b/test/ex_aws/operation/query_test.exs @@ -0,0 +1,128 @@ +defmodule ExAws.Operation.QueryTest do + use ExUnit.Case, async: false + alias ExAws.JSON.JSX + import Mox + + defmodule TestParser do + use ExAws.Operation.Query.Parser + + def parse({:ok, _} = result, _action), do: result + end + + setup do + {:ok, + config: %{ + http_client: ExAws.Request.HttpMock, + json_codec: JSX, + access_key_id: "AKIAIOSFODNN7EXAMPLE", + secret_access_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + region: "us-east-1", + host: "email.us-east-1.amazonaws.com", + port: 443, + scheme: "https://", + normalize_path: true, + retries: [ + max_attempts: 5, + base_backoff_in_ms: 1, + max_backoff_in_ms: 20 + ] + }} + end + + test "Query.perform retries XML throttling errors using the operation parser", context do + throttled = """ + + + Sender + Throttling + Rate exceeded + + + """ + + success = """ + + + abc123 + + + """ + + ExAws.Request.HttpMock + |> expect(:request, 2, fn _method, _url, _body, _headers, _opts -> + {:ok, %{status_code: 400, body: throttled}} + end) + |> expect(:request, fn _method, _url, _body, _headers, _opts -> + {:ok, %{status_code: 200, body: success, headers: []}} + end) + + operation = %ExAws.Operation.Query{ + path: "/", + params: %{"Action" => "SendCustomVerificationEmail"}, + service: :ses, + action: :send_custom_verification_email, + parser: &TestParser.parse/2 + } + + assert {:ok, %{body: ^success}} = ExAws.Operation.perform(operation, context[:config]) + end + + test "Query.perform retries RequestThrottled errors using the operation parser", context do + throttled = """ + + + Sender + RequestThrottled + Request is throttled. + + + """ + + success = """ + + + msg123 + + + """ + + ExAws.Request.HttpMock + |> expect(:request, 2, fn _method, _url, _body, _headers, _opts -> + {:ok, %{status_code: 400, body: throttled}} + end) + |> expect(:request, fn _method, _url, _body, _headers, _opts -> + {:ok, %{status_code: 200, body: success, headers: []}} + end) + + operation = %ExAws.Operation.Query{ + path: "/", + params: %{"Action" => "SendMessage"}, + service: :sqs, + action: :send_message, + parser: &TestParser.parse/2 + } + + assert {:ok, %{body: ^success}} = ExAws.Operation.perform(operation, context[:config]) + end + + test "Query.perform surfaces non-retryable XML errors via the operation parser", context do + invalid = + "SenderInvalidParameterValuebad" + + ExAws.Request.HttpMock + |> expect(:request, 1, fn _method, _url, _body, _headers, _opts -> + {:ok, %{status_code: 400, body: invalid}} + end) + + operation = %ExAws.Operation.Query{ + path: "/", + params: %{"Action" => "SendCustomVerificationEmail"}, + service: :ses, + action: :send_custom_verification_email, + parser: &TestParser.parse/2 + } + + assert {:error, {:http_error, 400, %{code: "InvalidParameterValue"}}} = + ExAws.Operation.perform(operation, context[:config]) + end +end diff --git a/test/ex_aws/request_test.exs b/test/ex_aws/request_test.exs index d0168736..13779719 100644 --- a/test/ex_aws/request_test.exs +++ b/test/ex_aws/request_test.exs @@ -268,6 +268,97 @@ defmodule ExAws.RequestTest do ) end + test "operation_parser triggers retry when it returns a retryable XML code", context do + TelemetryHelper.attach_telemetry([:ex_aws, :request]) + success = mock_xml_throttling_response(2) + + parser = fn + {:error, {:http_error, status, %{body: body}}} -> + code = + if String.contains?(body, "Throttling"), + do: "Throttling", + else: "Unknown" + + {:error, {:http_error, status, %{code: code, message: "parsed"}}} + + other -> + other + end + + assert {:ok, %{body: ^success, status_code: 200}} = + ExAws.Request.request_and_retry( + :post, + "https://email.us-east-1.amazonaws.com/", + :ses, + context[:config], + context[:headers], + "", + {:attempt, 1}, + operation_parser: parser + ) + + assert_receive {[:ex_aws, :request, :start], %{system_time: _}, %{attempt: 1}} + assert_receive {[:ex_aws, :request, :stop], %{duration: _}, %{attempt: 1, result: :error}} + assert_receive {[:ex_aws, :request, :start], %{system_time: _}, %{attempt: 2}} + assert_receive {[:ex_aws, :request, :stop], %{duration: _}, %{attempt: 2, result: :error}} + assert_receive {[:ex_aws, :request, :start], %{system_time: _}, %{attempt: 3}} + assert_receive {[:ex_aws, :request, :stop], %{duration: _}, %{attempt: 3, result: :ok}} + end + + test "operation_parser does not trigger retry for non-retryable XML codes", context do + TelemetryHelper.attach_telemetry([:ex_aws, :request]) + + xml = + "SenderInvalidParameterValuebad" + + ExAws.Request.HttpMock + |> expect(:request, 1, fn _method, _url, _body, _headers, _opts -> + {:ok, %{status_code: 400, body: xml}} + end) + + parser = fn {:error, {:http_error, status, _}} -> + {:error, {:http_error, status, %{code: "InvalidParameterValue", message: "bad"}}} + end + + assert {:error, {:http_error, 400, %{body: ^xml, status_code: 400}}} = + ExAws.Request.request_and_retry( + :post, + "https://email.us-east-1.amazonaws.com/", + :ses, + context[:config], + context[:headers], + "", + {:attempt, 1}, + operation_parser: parser + ) + + assert_receive {[:ex_aws, :request, :start], %{system_time: _}, %{attempt: 1}} + refute_receive {[:ex_aws, :request, :start], %{system_time: _}, %{attempt: 2}} + end + + test "operation_parser that crashes falls back to a non-retry error", context do + xml = "" + + ExAws.Request.HttpMock + |> expect(:request, 1, fn _method, _url, _body, _headers, _opts -> + {:ok, %{status_code: 400, body: xml}} + end) + + parser = fn _ -> raise "boom" end + + assert {:error, {:http_error, 400, %{body: ^xml, status_code: 400}}} = + ExAws.Request.request_and_retry( + :post, + "https://email.us-east-1.amazonaws.com/", + :ses, + context[:config], + context[:headers], + "", + {:attempt, 1}, + operation_parser: parser + ) + end + test "TooManyRequestsException is retried", context do TelemetryHelper.attach_telemetry([:ex_aws, :request]) success = mock_too_many_requests_exception(3) @@ -318,7 +409,37 @@ defmodule ExAws.RequestTest do success end - def mock_too_many_requests_exception(success_after_retries) do + defp mock_xml_throttling_response(success_after_retries) do + exception = """ + + + Sender + Throttling + Rate exceeded + + + """ + + success = """ + + + abc123 + + + """ + + ExAws.Request.HttpMock + |> expect(:request, success_after_retries, fn _method, _url, _body, _headers, _opts -> + {:ok, %{status_code: 400, body: exception}} + end) + |> expect(:request, fn _method, _url, _body, _headers, _opts -> + {:ok, %{status_code: 200, body: success}} + end) + + success + end + + defp mock_too_many_requests_exception(success_after_retries) do exception = "{\"__type\":\"TooManyRequestsException\",\"message\":\"Too many requests\"}" success =