Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
Unreleased
- Retry throttling errors on XML/Query services (SES, SQS, SNS)

v2.7.0 (2026-03-04)
- Upgrade hackney to 3.x
- Correctly handle pod identity authorization when using Req adapter
Expand Down
28 changes: 14 additions & 14 deletions lib/ex_aws/operation/query.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,23 @@ defimpl ExAws.Operation, for: ExAws.Operation.Query do
{"content-encoding", operation.content_encoding}
]

result =
ExAws.Request.request(:post, url, data, headers, config, operation.service)
|> ExAws.Request.default_aws_error()
parser = wrap_parser(operation.parser, operation.action, config)

parser = operation.parser
ExAws.Request.request(:post, url, data, headers, config, operation.service,
operation_parser: parser
)
|> ExAws.Request.default_aws_error()
|> parser.()
end

cond do
is_function(parser, 2) ->
parser.(result, operation.action)
def stream!(_, _), do: nil

is_function(parser, 3) ->
parser.(result, operation.action, config)
defp wrap_parser(parser, _action, _config) when is_function(parser, 1),
do: parser

true ->
result
end
end
defp wrap_parser(parser, action, _config) when is_function(parser, 2),
do: fn result -> parser.(result, action) end

def stream!(_, _), do: nil
defp wrap_parser(parser, action, config) when is_function(parser, 3),
do: fn result -> parser.(result, action, config) end
end
16 changes: 14 additions & 2 deletions lib/ex_aws/operation/rest_query.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,30 @@ defimpl ExAws.Operation, for: ExAws.Operation.RestQuery do
headers = config[:headers] || []
url = ExAws.Request.Url.build(operation, config)

parser = wrap_parser(operation.parser, operation.action, config)

ExAws.Request.request(
operation.http_method,
url,
operation.body,
headers,
config,
operation.service
operation.service,
operation_parser: parser
)
|> ExAws.Request.default_aws_error()
|> operation.parser.(operation.action)
|> parser.()
end

defp wrap_parser(parser, _action, _config) when is_function(parser, 1),
do: parser

defp wrap_parser(parser, action, _config) when is_function(parser, 2),
do: fn result -> parser.(result, action) end

defp wrap_parser(parser, action, config) when is_function(parser, 3),
do: fn result -> parser.(result, action, config) end

def stream!(%ExAws.Operation.RestQuery{stream_builder: nil}, _) do
raise ArgumentError, """
This operation does not support streaming!
Expand Down
86 changes: 74 additions & 12 deletions lib/ex_aws/request.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,50 @@ defmodule ExAws.Request do
@type error_t :: {:error, {:http_error, http_status, binary}}
@type response_t :: success_t | error_t

def request(http_method, url, data, headers, config, service) do
def request(http_method, url, data, headers, config, service, opts \\ []) do
body =
case data do
[] -> "{}"
d when is_binary(d) -> d
_ -> config[:json_codec].encode!(data)
end

request_and_retry(http_method, url, service, config, headers, body, {:attempt, 1})
request_and_retry(http_method, url, service, config, headers, body, {:attempt, 1}, opts)
end

def request_and_retry(_method, _url, _service, _config, _headers, _req_body, {:error, reason}),
do: {:error, reason}

def request_and_retry(method, url, service, config, headers, req_body, {:attempt, attempt}) do
def request_and_retry(
method,
url,
service,
config,
headers,
req_body,
attempt_or_error,
opts \\ []
)

def request_and_retry(
_method,
_url,
_service,
_config,
_headers,
_req_body,
{:error, reason},
_opts
),
do: {:error, reason}

def request_and_retry(
method,
url,
service,
config,
headers,
req_body,
{:attempt, attempt},
opts
) do
full_headers = ExAws.Auth.headers(method, url, service, config, headers, req_body)

with {:ok, full_headers} <- full_headers do
Expand All @@ -45,7 +74,7 @@ defmodule ExAws.Request do
{:error, {:http_error, status, "redirected"}}

{:ok, %{status_code: status} = resp} when status in 400..499 ->
case client_error(resp, config[:json_codec]) do
case client_error(resp, config[:json_codec], opts) do
{:retry, reason} ->
request_and_retry(
method,
Expand All @@ -54,7 +83,8 @@ defmodule ExAws.Request do
config,
headers,
req_body,
attempt_again?(attempt, reason, :client, config)
attempt_again?(attempt, reason, :client, config),
opts
)

{:error, reason} ->
Expand All @@ -72,7 +102,8 @@ defmodule ExAws.Request do
config,
headers,
req_body,
attempt_again?(attempt, reason, :server, config)
attempt_again?(attempt, reason, :server, config),
opts
)

{:error, reason_struct} ->
Expand All @@ -93,7 +124,8 @@ defmodule ExAws.Request do
config,
headers,
req_body,
attempt_again?(attempt, reason, :other, config)
attempt_again?(attempt, reason, :other, config),
opts
)
end
end
Expand Down Expand Up @@ -146,7 +178,9 @@ defmodule ExAws.Request do
defp extract_error({:error, error}), do: error
defp extract_error(error), do: error

def client_error(%{status_code: status, body: body} = error, json_codec) do
def client_error(resp, json_codec, opts \\ [])

def client_error(%{status_code: status, body: body} = error, json_codec, opts) do
case json_codec.decode(body) do
{:ok, %{"__type" => error_type, "message" => message} = err} ->
handle_error(error_type, message, status, err)
Expand All @@ -159,12 +193,40 @@ defmodule ExAws.Request do
_ ->
{:error, {:http_error, status, error}}
end
|> try_operation_parser_for_retries(opts[:operation_parser])
end

def client_error(%{status_code: status} = error, _) do
def client_error(%{status_code: status} = error, _, _) do
{:error, {:http_error, status, error}}
end

@retryable_error_codes [
"RequestThrottled",
"Throttling"
]

# Some operation parsers are able to detect retryable states even when the
# JSON codec based detection above does not work. Running the repsonse
# through the parser to check if we can detect a retryable response this way.
# The original response is returned in all other cases to avoid affecting the
# flow unless a retry opportunity is detected.
defp try_operation_parser_for_retries({:error, {:http_error, status, error}} = response, parser)
when is_function(parser, 1) do
case parser.(response) do
{:error, {:http_error, ^status, %{code: code}}} when code in @retryable_error_codes ->
{:retry, {:http_error, status, error}}

_ ->
response
end
rescue
_ -> response
catch
_, _ -> response
end

defp try_operation_parser_for_retries(response, _), do: response

def handle_aws_error({"ProvisionedThroughputExceededException" = type, message, _}) do
{:retry, {type, message}}
end
Expand Down
128 changes: 128 additions & 0 deletions test/ex_aws/operation/query_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
defmodule ExAws.Operation.QueryTest do
use ExUnit.Case, async: false
alias ExAws.JSON.JSX
import Mox

defmodule TestParser do
use ExAws.Operation.Query.Parser

def parse({:ok, _} = result, _action), do: result
end

setup do
{:ok,
config: %{
http_client: ExAws.Request.HttpMock,
json_codec: JSX,
access_key_id: "AKIAIOSFODNN7EXAMPLE",
secret_access_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
region: "us-east-1",
host: "email.us-east-1.amazonaws.com",
port: 443,
scheme: "https://",
normalize_path: true,
retries: [
max_attempts: 5,
base_backoff_in_ms: 1,
max_backoff_in_ms: 20
]
}}
end

test "Query.perform retries XML throttling errors using the operation parser", context do
throttled = """
<ErrorResponse>
<Error>
<Type>Sender</Type>
<Code>Throttling</Code>
<Message>Rate exceeded</Message>
</Error>
</ErrorResponse>
"""

success = """
<SendCustomVerificationEmailResponse>
<SendCustomVerificationEmailResult>
<MessageId>abc123</MessageId>
</SendCustomVerificationEmailResult>
</SendCustomVerificationEmailResponse>
"""

ExAws.Request.HttpMock
|> expect(:request, 2, fn _method, _url, _body, _headers, _opts ->
{:ok, %{status_code: 400, body: throttled}}
end)
|> expect(:request, fn _method, _url, _body, _headers, _opts ->
{:ok, %{status_code: 200, body: success, headers: []}}
end)

operation = %ExAws.Operation.Query{
path: "/",
params: %{"Action" => "SendCustomVerificationEmail"},
service: :ses,
action: :send_custom_verification_email,
parser: &TestParser.parse/2
}

assert {:ok, %{body: ^success}} = ExAws.Operation.perform(operation, context[:config])
end

test "Query.perform retries RequestThrottled errors using the operation parser", context do
throttled = """
<ErrorResponse>
<Error>
<Type>Sender</Type>
<Code>RequestThrottled</Code>
<Message>Request is throttled.</Message>
</Error>
</ErrorResponse>
"""

success = """
<SendMessageResponse>
<SendMessageResult>
<MessageId>msg123</MessageId>
</SendMessageResult>
</SendMessageResponse>
"""

ExAws.Request.HttpMock
|> expect(:request, 2, fn _method, _url, _body, _headers, _opts ->
{:ok, %{status_code: 400, body: throttled}}
end)
|> expect(:request, fn _method, _url, _body, _headers, _opts ->
{:ok, %{status_code: 200, body: success, headers: []}}
end)

operation = %ExAws.Operation.Query{
path: "/",
params: %{"Action" => "SendMessage"},
service: :sqs,
action: :send_message,
parser: &TestParser.parse/2
}

assert {:ok, %{body: ^success}} = ExAws.Operation.perform(operation, context[:config])
end

test "Query.perform surfaces non-retryable XML errors via the operation parser", context do
invalid =
"<ErrorResponse><Error><Type>Sender</Type><Code>InvalidParameterValue</Code><Message>bad</Message></Error></ErrorResponse>"

ExAws.Request.HttpMock
|> expect(:request, 1, fn _method, _url, _body, _headers, _opts ->
{:ok, %{status_code: 400, body: invalid}}
end)

operation = %ExAws.Operation.Query{
path: "/",
params: %{"Action" => "SendCustomVerificationEmail"},
service: :ses,
action: :send_custom_verification_email,
parser: &TestParser.parse/2
}

assert {:error, {:http_error, 400, %{code: "InvalidParameterValue"}}} =
ExAws.Operation.perform(operation, context[:config])
end
end
Loading