diff --git a/CHANGELOG.md b/CHANGELOG.md
index d8446755..8f5549a7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,6 @@
+Unreleased
+- Retry throttling errors on XML/Query services (SES, SQS, SNS)
+
v2.7.0 (2026-03-04)
- Upgrade hackney to 3.x
- Correctly handle pod identity authorization when using Req adapter
diff --git a/lib/ex_aws/operation/query.ex b/lib/ex_aws/operation/query.ex
index 804fd4b7..f5ebd3e1 100644
--- a/lib/ex_aws/operation/query.ex
+++ b/lib/ex_aws/operation/query.ex
@@ -38,23 +38,23 @@ defimpl ExAws.Operation, for: ExAws.Operation.Query do
{"content-encoding", operation.content_encoding}
]
- result =
- ExAws.Request.request(:post, url, data, headers, config, operation.service)
- |> ExAws.Request.default_aws_error()
+ parser = wrap_parser(operation.parser, operation.action, config)
- parser = operation.parser
+ ExAws.Request.request(:post, url, data, headers, config, operation.service,
+ operation_parser: parser
+ )
+ |> ExAws.Request.default_aws_error()
+ |> parser.()
+ end
- cond do
- is_function(parser, 2) ->
- parser.(result, operation.action)
+ def stream!(_, _), do: nil
- is_function(parser, 3) ->
- parser.(result, operation.action, config)
+ defp wrap_parser(parser, _action, _config) when is_function(parser, 1),
+ do: parser
- true ->
- result
- end
- end
+ defp wrap_parser(parser, action, _config) when is_function(parser, 2),
+ do: fn result -> parser.(result, action) end
- def stream!(_, _), do: nil
+ defp wrap_parser(parser, action, config) when is_function(parser, 3),
+ do: fn result -> parser.(result, action, config) end
end
diff --git a/lib/ex_aws/operation/rest_query.ex b/lib/ex_aws/operation/rest_query.ex
index eb2cd850..fe803b58 100644
--- a/lib/ex_aws/operation/rest_query.ex
+++ b/lib/ex_aws/operation/rest_query.ex
@@ -18,18 +18,30 @@ defimpl ExAws.Operation, for: ExAws.Operation.RestQuery do
headers = config[:headers] || []
url = ExAws.Request.Url.build(operation, config)
+ parser = wrap_parser(operation.parser, operation.action, config)
+
ExAws.Request.request(
operation.http_method,
url,
operation.body,
headers,
config,
- operation.service
+ operation.service,
+ operation_parser: parser
)
|> ExAws.Request.default_aws_error()
- |> operation.parser.(operation.action)
+ |> parser.()
end
+ defp wrap_parser(parser, _action, _config) when is_function(parser, 1),
+ do: parser
+
+ defp wrap_parser(parser, action, _config) when is_function(parser, 2),
+ do: fn result -> parser.(result, action) end
+
+ defp wrap_parser(parser, action, config) when is_function(parser, 3),
+ do: fn result -> parser.(result, action, config) end
+
def stream!(%ExAws.Operation.RestQuery{stream_builder: nil}, _) do
raise ArgumentError, """
This operation does not support streaming!
diff --git a/lib/ex_aws/request.ex b/lib/ex_aws/request.ex
index b783f2e2..41ff48de 100644
--- a/lib/ex_aws/request.ex
+++ b/lib/ex_aws/request.ex
@@ -9,7 +9,7 @@ defmodule ExAws.Request do
@type error_t :: {:error, {:http_error, http_status, binary}}
@type response_t :: success_t | error_t
- def request(http_method, url, data, headers, config, service) do
+ def request(http_method, url, data, headers, config, service, opts \\ []) do
body =
case data do
[] -> "{}"
@@ -17,13 +17,42 @@ defmodule ExAws.Request do
_ -> config[:json_codec].encode!(data)
end
- request_and_retry(http_method, url, service, config, headers, body, {:attempt, 1})
+ request_and_retry(http_method, url, service, config, headers, body, {:attempt, 1}, opts)
end
- def request_and_retry(_method, _url, _service, _config, _headers, _req_body, {:error, reason}),
- do: {:error, reason}
-
- def request_and_retry(method, url, service, config, headers, req_body, {:attempt, attempt}) do
+ def request_and_retry(
+ method,
+ url,
+ service,
+ config,
+ headers,
+ req_body,
+ attempt_or_error,
+ opts \\ []
+ )
+
+ def request_and_retry(
+ _method,
+ _url,
+ _service,
+ _config,
+ _headers,
+ _req_body,
+ {:error, reason},
+ _opts
+ ),
+ do: {:error, reason}
+
+ def request_and_retry(
+ method,
+ url,
+ service,
+ config,
+ headers,
+ req_body,
+ {:attempt, attempt},
+ opts
+ ) do
full_headers = ExAws.Auth.headers(method, url, service, config, headers, req_body)
with {:ok, full_headers} <- full_headers do
@@ -45,7 +74,7 @@ defmodule ExAws.Request do
{:error, {:http_error, status, "redirected"}}
{:ok, %{status_code: status} = resp} when status in 400..499 ->
- case client_error(resp, config[:json_codec]) do
+ case client_error(resp, config[:json_codec], opts) do
{:retry, reason} ->
request_and_retry(
method,
@@ -54,7 +83,8 @@ defmodule ExAws.Request do
config,
headers,
req_body,
- attempt_again?(attempt, reason, :client, config)
+ attempt_again?(attempt, reason, :client, config),
+ opts
)
{:error, reason} ->
@@ -72,7 +102,8 @@ defmodule ExAws.Request do
config,
headers,
req_body,
- attempt_again?(attempt, reason, :server, config)
+ attempt_again?(attempt, reason, :server, config),
+ opts
)
{:error, reason_struct} ->
@@ -93,7 +124,8 @@ defmodule ExAws.Request do
config,
headers,
req_body,
- attempt_again?(attempt, reason, :other, config)
+ attempt_again?(attempt, reason, :other, config),
+ opts
)
end
end
@@ -146,7 +178,9 @@ defmodule ExAws.Request do
defp extract_error({:error, error}), do: error
defp extract_error(error), do: error
- def client_error(%{status_code: status, body: body} = error, json_codec) do
+ def client_error(resp, json_codec, opts \\ [])
+
+ def client_error(%{status_code: status, body: body} = error, json_codec, opts) do
case json_codec.decode(body) do
{:ok, %{"__type" => error_type, "message" => message} = err} ->
handle_error(error_type, message, status, err)
@@ -159,12 +193,40 @@ defmodule ExAws.Request do
_ ->
{:error, {:http_error, status, error}}
end
+ |> try_operation_parser_for_retries(opts[:operation_parser])
end
- def client_error(%{status_code: status} = error, _) do
+ def client_error(%{status_code: status} = error, _, _) do
{:error, {:http_error, status, error}}
end
+ @retryable_error_codes [
+ "RequestThrottled",
+ "Throttling"
+ ]
+
+ # Some operation parsers are able to detect retryable states even when the
+ # JSON codec based detection above does not work. Running the repsonse
+ # through the parser to check if we can detect a retryable response this way.
+ # The original response is returned in all other cases to avoid affecting the
+ # flow unless a retry opportunity is detected.
+ defp try_operation_parser_for_retries({:error, {:http_error, status, error}} = response, parser)
+ when is_function(parser, 1) do
+ case parser.(response) do
+ {:error, {:http_error, ^status, %{code: code}}} when code in @retryable_error_codes ->
+ {:retry, {:http_error, status, error}}
+
+ _ ->
+ response
+ end
+ rescue
+ _ -> response
+ catch
+ _, _ -> response
+ end
+
+ defp try_operation_parser_for_retries(response, _), do: response
+
def handle_aws_error({"ProvisionedThroughputExceededException" = type, message, _}) do
{:retry, {type, message}}
end
diff --git a/test/ex_aws/operation/query_test.exs b/test/ex_aws/operation/query_test.exs
new file mode 100644
index 00000000..b4104456
--- /dev/null
+++ b/test/ex_aws/operation/query_test.exs
@@ -0,0 +1,128 @@
+defmodule ExAws.Operation.QueryTest do
+ use ExUnit.Case, async: false
+ alias ExAws.JSON.JSX
+ import Mox
+
+ defmodule TestParser do
+ use ExAws.Operation.Query.Parser
+
+ def parse({:ok, _} = result, _action), do: result
+ end
+
+ setup do
+ {:ok,
+ config: %{
+ http_client: ExAws.Request.HttpMock,
+ json_codec: JSX,
+ access_key_id: "AKIAIOSFODNN7EXAMPLE",
+ secret_access_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
+ region: "us-east-1",
+ host: "email.us-east-1.amazonaws.com",
+ port: 443,
+ scheme: "https://",
+ normalize_path: true,
+ retries: [
+ max_attempts: 5,
+ base_backoff_in_ms: 1,
+ max_backoff_in_ms: 20
+ ]
+ }}
+ end
+
+ test "Query.perform retries XML throttling errors using the operation parser", context do
+ throttled = """
+
+
+ Sender
+ Throttling
+ Rate exceeded
+
+
+ """
+
+ success = """
+
+
+ abc123
+
+
+ """
+
+ ExAws.Request.HttpMock
+ |> expect(:request, 2, fn _method, _url, _body, _headers, _opts ->
+ {:ok, %{status_code: 400, body: throttled}}
+ end)
+ |> expect(:request, fn _method, _url, _body, _headers, _opts ->
+ {:ok, %{status_code: 200, body: success, headers: []}}
+ end)
+
+ operation = %ExAws.Operation.Query{
+ path: "/",
+ params: %{"Action" => "SendCustomVerificationEmail"},
+ service: :ses,
+ action: :send_custom_verification_email,
+ parser: &TestParser.parse/2
+ }
+
+ assert {:ok, %{body: ^success}} = ExAws.Operation.perform(operation, context[:config])
+ end
+
+ test "Query.perform retries RequestThrottled errors using the operation parser", context do
+ throttled = """
+
+
+ Sender
+ RequestThrottled
+ Request is throttled.
+
+
+ """
+
+ success = """
+
+
+ msg123
+
+
+ """
+
+ ExAws.Request.HttpMock
+ |> expect(:request, 2, fn _method, _url, _body, _headers, _opts ->
+ {:ok, %{status_code: 400, body: throttled}}
+ end)
+ |> expect(:request, fn _method, _url, _body, _headers, _opts ->
+ {:ok, %{status_code: 200, body: success, headers: []}}
+ end)
+
+ operation = %ExAws.Operation.Query{
+ path: "/",
+ params: %{"Action" => "SendMessage"},
+ service: :sqs,
+ action: :send_message,
+ parser: &TestParser.parse/2
+ }
+
+ assert {:ok, %{body: ^success}} = ExAws.Operation.perform(operation, context[:config])
+ end
+
+ test "Query.perform surfaces non-retryable XML errors via the operation parser", context do
+ invalid =
+ "SenderInvalidParameterValuebad"
+
+ ExAws.Request.HttpMock
+ |> expect(:request, 1, fn _method, _url, _body, _headers, _opts ->
+ {:ok, %{status_code: 400, body: invalid}}
+ end)
+
+ operation = %ExAws.Operation.Query{
+ path: "/",
+ params: %{"Action" => "SendCustomVerificationEmail"},
+ service: :ses,
+ action: :send_custom_verification_email,
+ parser: &TestParser.parse/2
+ }
+
+ assert {:error, {:http_error, 400, %{code: "InvalidParameterValue"}}} =
+ ExAws.Operation.perform(operation, context[:config])
+ end
+end
diff --git a/test/ex_aws/request_test.exs b/test/ex_aws/request_test.exs
index d0168736..13779719 100644
--- a/test/ex_aws/request_test.exs
+++ b/test/ex_aws/request_test.exs
@@ -268,6 +268,97 @@ defmodule ExAws.RequestTest do
)
end
+ test "operation_parser triggers retry when it returns a retryable XML code", context do
+ TelemetryHelper.attach_telemetry([:ex_aws, :request])
+ success = mock_xml_throttling_response(2)
+
+ parser = fn
+ {:error, {:http_error, status, %{body: body}}} ->
+ code =
+ if String.contains?(body, "Throttling"),
+ do: "Throttling",
+ else: "Unknown"
+
+ {:error, {:http_error, status, %{code: code, message: "parsed"}}}
+
+ other ->
+ other
+ end
+
+ assert {:ok, %{body: ^success, status_code: 200}} =
+ ExAws.Request.request_and_retry(
+ :post,
+ "https://email.us-east-1.amazonaws.com/",
+ :ses,
+ context[:config],
+ context[:headers],
+ "",
+ {:attempt, 1},
+ operation_parser: parser
+ )
+
+ assert_receive {[:ex_aws, :request, :start], %{system_time: _}, %{attempt: 1}}
+ assert_receive {[:ex_aws, :request, :stop], %{duration: _}, %{attempt: 1, result: :error}}
+ assert_receive {[:ex_aws, :request, :start], %{system_time: _}, %{attempt: 2}}
+ assert_receive {[:ex_aws, :request, :stop], %{duration: _}, %{attempt: 2, result: :error}}
+ assert_receive {[:ex_aws, :request, :start], %{system_time: _}, %{attempt: 3}}
+ assert_receive {[:ex_aws, :request, :stop], %{duration: _}, %{attempt: 3, result: :ok}}
+ end
+
+ test "operation_parser does not trigger retry for non-retryable XML codes", context do
+ TelemetryHelper.attach_telemetry([:ex_aws, :request])
+
+ xml =
+ "SenderInvalidParameterValuebad"
+
+ ExAws.Request.HttpMock
+ |> expect(:request, 1, fn _method, _url, _body, _headers, _opts ->
+ {:ok, %{status_code: 400, body: xml}}
+ end)
+
+ parser = fn {:error, {:http_error, status, _}} ->
+ {:error, {:http_error, status, %{code: "InvalidParameterValue", message: "bad"}}}
+ end
+
+ assert {:error, {:http_error, 400, %{body: ^xml, status_code: 400}}} =
+ ExAws.Request.request_and_retry(
+ :post,
+ "https://email.us-east-1.amazonaws.com/",
+ :ses,
+ context[:config],
+ context[:headers],
+ "",
+ {:attempt, 1},
+ operation_parser: parser
+ )
+
+ assert_receive {[:ex_aws, :request, :start], %{system_time: _}, %{attempt: 1}}
+ refute_receive {[:ex_aws, :request, :start], %{system_time: _}, %{attempt: 2}}
+ end
+
+ test "operation_parser that crashes falls back to a non-retry error", context do
+ xml = ""
+
+ ExAws.Request.HttpMock
+ |> expect(:request, 1, fn _method, _url, _body, _headers, _opts ->
+ {:ok, %{status_code: 400, body: xml}}
+ end)
+
+ parser = fn _ -> raise "boom" end
+
+ assert {:error, {:http_error, 400, %{body: ^xml, status_code: 400}}} =
+ ExAws.Request.request_and_retry(
+ :post,
+ "https://email.us-east-1.amazonaws.com/",
+ :ses,
+ context[:config],
+ context[:headers],
+ "",
+ {:attempt, 1},
+ operation_parser: parser
+ )
+ end
+
test "TooManyRequestsException is retried", context do
TelemetryHelper.attach_telemetry([:ex_aws, :request])
success = mock_too_many_requests_exception(3)
@@ -318,7 +409,37 @@ defmodule ExAws.RequestTest do
success
end
- def mock_too_many_requests_exception(success_after_retries) do
+ defp mock_xml_throttling_response(success_after_retries) do
+ exception = """
+
+
+ Sender
+ Throttling
+ Rate exceeded
+
+
+ """
+
+ success = """
+
+
+ abc123
+
+
+ """
+
+ ExAws.Request.HttpMock
+ |> expect(:request, success_after_retries, fn _method, _url, _body, _headers, _opts ->
+ {:ok, %{status_code: 400, body: exception}}
+ end)
+ |> expect(:request, fn _method, _url, _body, _headers, _opts ->
+ {:ok, %{status_code: 200, body: success}}
+ end)
+
+ success
+ end
+
+ defp mock_too_many_requests_exception(success_after_retries) do
exception = "{\"__type\":\"TooManyRequestsException\",\"message\":\"Too many requests\"}"
success =