-
Notifications
You must be signed in to change notification settings - Fork 2.2k
Add tiered send/receive recovery to azure-core-amqp and azure-messaging-servicebus #48460
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
a3faf26
5e28c33
f0c6e53
0bd7282
6116b7b
f3a29a7
79c52f9
1fec9e5
d4c64fd
52d4435
ce10aa2
5448c97
4bd93bb
2e80396
1860dd3
c2d2086
bf2b560
e76c2d8
3eea3b2
1f9bf92
c0e2d8b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,168 @@ | ||
| // Copyright (c) Microsoft Corporation. All rights reserved. | ||
| // Licensed under the MIT License. | ||
|
|
||
| package com.azure.core.amqp.implementation; | ||
|
|
||
| import com.azure.core.amqp.exception.AmqpErrorCondition; | ||
| import com.azure.core.amqp.exception.AmqpException; | ||
|
|
||
| import java.util.Locale; | ||
| import java.util.concurrent.TimeoutException; | ||
|
|
||
| /** | ||
| * Classifies errors into recovery tiers, determining what resources should be closed | ||
| * between retry attempts. This follows the tiered recovery pattern used by the Go, .NET, | ||
| * Python, and JS Azure SDKs. | ||
| * | ||
| * <ul> | ||
| * <li>{@link #NONE} — Retry on the same link (server-busy, timeouts).</li> | ||
| * <li>{@link #LINK} — Close the send/receive link; next retry creates a fresh link on the same connection.</li> | ||
| * <li>{@link #CONNECTION} — Close the entire connection; next retry creates a fresh connection and link.</li> | ||
| * <li>{@link #FATAL} — Do not retry (unauthorized, not-found, message too large).</li> | ||
| * </ul> | ||
| */ | ||
| public enum RecoveryKind { | ||
| /** | ||
| * No recovery needed — retry on the same link and connection. | ||
| * Applies to: server-busy, timeouts, resource-limit-exceeded. | ||
| */ | ||
| NONE, | ||
|
|
||
| /** | ||
| * Close the link (and its session) before retrying. The next retry creates a fresh link | ||
| * on the same connection. | ||
| * Applies to: link:detach-forced, link:stolen, transient AMQP errors on the link. | ||
| */ | ||
| LINK, | ||
|
|
||
| /** | ||
| * Close the entire connection before retrying. The next retry creates a fresh connection, | ||
| * session, and link. | ||
| * Applies to: connection:forced, connection:framing-error, proton:io, internal-error. | ||
| */ | ||
| CONNECTION, | ||
|
|
||
| /** | ||
| * Do not retry — the error is permanent. | ||
| * Applies to: unauthorized-access, not-found, message-size-exceeded. | ||
| */ | ||
| FATAL; | ||
|
|
||
| /** | ||
| * Classifies the given error into a {@link RecoveryKind} that determines what resources | ||
| * should be invalidated between retry attempts. | ||
| * | ||
| * @param error The error to classify. | ||
| * @return The recovery kind for the given error. | ||
| */ | ||
| public static RecoveryKind classify(Throwable error) { | ||
| if (error == null) { | ||
| return NONE; | ||
| } | ||
|
|
||
| // Timeouts — retry on same link, the link may still be healthy. | ||
| if (error instanceof TimeoutException) { | ||
| return NONE; | ||
| } | ||
|
|
||
| if (error instanceof AmqpException) { | ||
| final AmqpException amqpError = (AmqpException) error; | ||
| final AmqpErrorCondition condition = amqpError.getErrorCondition(); | ||
|
|
||
| if (condition != null) { | ||
| switch (condition) { | ||
| // Connection-level errors — close the entire connection. | ||
| case CONNECTION_FORCED: | ||
| case CONNECTION_FRAMING_ERROR: | ||
| case CONNECTION_REDIRECT: | ||
| case PROTON_IO: | ||
| case INTERNAL_ERROR: | ||
| return CONNECTION; | ||
|
|
||
| // Link-level errors — close the link, keep the connection. | ||
| case LINK_DETACH_FORCED: | ||
| case LINK_STOLEN: | ||
| case LINK_REDIRECT: | ||
| case PARTITION_NOT_OWNED_ERROR: | ||
| case TRANSFER_LIMIT_EXCEEDED: | ||
| // operation-cancelled can signal "AMQP layer unexpectedly aborted or disconnected" | ||
| // (e.g. ReceiverUnsettledDeliveries remote Released outcome), requiring link recovery. | ||
| case OPERATION_CANCELLED: | ||
| return LINK; | ||
|
|
||
| // Fatal errors — do not retry. | ||
| case NOT_FOUND: | ||
| case UNAUTHORIZED_ACCESS: | ||
| case LINK_PAYLOAD_SIZE_EXCEEDED: | ||
| case NOT_ALLOWED: | ||
| case NOT_IMPLEMENTED: | ||
| case ENTITY_DISABLED_ERROR: | ||
| case ENTITY_ALREADY_EXISTS: | ||
| case PUBLISHER_REVOKED_ERROR: | ||
| case ARGUMENT_ERROR: | ||
| case ARGUMENT_OUT_OF_RANGE_ERROR: | ||
| case ILLEGAL_STATE: | ||
| case MESSAGE_LOCK_LOST: | ||
| case STORE_LOCK_LOST_ERROR: | ||
| return FATAL; | ||
|
|
||
| // Server-busy, timeouts, and resource-limit errors — retry on same link. | ||
| // RESOURCE_LIMIT_EXCEEDED is treated as transient here because ReactorSender | ||
| // groups it alongside SERVER_BUSY and TIMEOUT in its send-error retry logic. | ||
| case SERVER_BUSY_ERROR: | ||
| case TIMEOUT_ERROR: | ||
| case RESOURCE_LIMIT_EXCEEDED: | ||
| return NONE; | ||
|
|
||
| // Session/lock errors — link-level recovery. | ||
| // Session lock loss means the session link is invalid and | ||
| // a fresh link must be acquired for a new session. | ||
| case SESSION_LOCK_LOST: | ||
| case SESSION_CANNOT_BE_LOCKED: | ||
| case SESSION_NOT_FOUND: | ||
| case MESSAGE_NOT_FOUND: | ||
| return LINK; | ||
|
|
||
| default: | ||
| break; | ||
| } | ||
| } | ||
|
|
||
| // Transient AMQP errors without a specific condition — link recovery. | ||
| if (amqpError.isTransient()) { | ||
| return LINK; | ||
| } | ||
|
|
||
| // Non-transient AMQP errors without a recognized condition — fatal. | ||
| return FATAL; | ||
| } | ||
|
|
||
| // RequestResponseChannelClosedException — link-level (parent connection disposing). | ||
| if (error instanceof RequestResponseChannelClosedException) { | ||
| return LINK; | ||
| } | ||
|
|
||
| // IllegalStateException thrown by a disposed ReactorSender (e.g., "Cannot publish | ||
| // message when disposed." or "Cannot publish data batch when disposed."). This is | ||
| // a link-staleness signal: the link was closed (possibly by a concurrent recovery | ||
| // path) before the in-flight send could complete. LINK recovery creates a fresh | ||
| // link on the next retry. | ||
| // Match both "Cannot publish" and "disposed" to avoid misclassifying unrelated | ||
| // disposal signals (e.g., "Connection is disposed. Cannot get management instance."). | ||
| if (error instanceof IllegalStateException) { | ||
| final String msg = error.getMessage(); | ||
| if (msg != null) { | ||
| final String normalizedMsg = msg.toLowerCase(Locale.ROOT); | ||
| if (normalizedMsg.contains("cannot publish") && normalizedMsg.contains("disposed")) { | ||
| return LINK; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Unknown non-AMQP errors — treat as fatal (don't retry application or SDK bugs). | ||
| // The Go SDK defaults to CONNECTION for unknown errors, but those are AMQP-layer | ||
| // errors (io.EOF, net.Error). Java's non-AMQP exceptions (e.g., AzureException, | ||
| // RuntimeException) should fail fast rather than trigger connection recovery. | ||
| return FATAL; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,7 +16,10 @@ | |
|
|
||
| import java.time.Duration; | ||
| import java.util.Locale; | ||
| import java.util.concurrent.ThreadLocalRandom; | ||
| import java.util.concurrent.TimeoutException; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.function.Consumer; | ||
|
|
||
| /** | ||
| * Helper class to help with retry policies. | ||
|
|
@@ -106,6 +109,44 @@ public static <T> Mono<T> withRetry(Mono<T> source, AmqpRetryOptions retryOption | |
| return withRetry(source, retryOptions, timeoutMessage, false); | ||
| } | ||
|
|
||
| /** | ||
| * Applies the retry policy with tiered recovery between attempts. Before each retry, | ||
| * the error is classified via {@link RecoveryKind#classify(Throwable)} and the recovery | ||
| * callback is invoked so the caller can close the appropriate resources (link or connection). | ||
| * | ||
| * <p>This matches the tiered recovery pattern used by the Go, .NET, Python, and JS SDKs.</p> | ||
| * | ||
| * @param <T> Type of value in the {@link Mono}. | ||
| * @param source The publisher to apply the retry policy to. | ||
| * @param retryOptions A {@link AmqpRetryOptions}. | ||
| * @param errorMessage Text added to error logs. | ||
| * @param recoveryAction Called between retry attempts with the classified {@link RecoveryKind}. | ||
| * The caller should close the link (for {@link RecoveryKind#LINK}) or connection | ||
| * (for {@link RecoveryKind#CONNECTION}) so the next retry creates fresh resources. | ||
| * | ||
| * @return A publisher that returns the results of the {@link Mono} if any of the retry attempts | ||
| * are successful. Otherwise, propagates the last error. | ||
| */ | ||
| public static <T> Mono<T> withRetryAndRecovery(Mono<T> source, AmqpRetryOptions retryOptions, String errorMessage, | ||
| Consumer<RecoveryKind> recoveryAction) { | ||
| return withRetryAndRecovery(source, retryOptions, errorMessage, false, recoveryAction); | ||
| } | ||
|
|
||
| /** | ||
| * Like {@link #withRetryAndRecovery(Mono, AmqpRetryOptions, String, Consumer)} but with an option to allow | ||
| * long-running operations that should not be subject to the per-attempt timeout. | ||
| * | ||
| * @param allowsLongOperation If true, the source Mono will not be wrapped with a per-attempt timeout. | ||
| */ | ||
| public static <T> Mono<T> withRetryAndRecovery(Mono<T> source, AmqpRetryOptions retryOptions, String errorMessage, | ||
| boolean allowsLongOperation, Consumer<RecoveryKind> recoveryAction) { | ||
| if (!allowsLongOperation) { | ||
| source = source.timeout(retryOptions.getTryTimeout()); | ||
| } | ||
| return source.retryWhen(createRetryWithRecovery(retryOptions, recoveryAction)) | ||
| .doOnError(error -> LOGGER.error(errorMessage, error)); | ||
| } | ||
|
|
||
| static Retry createRetry(AmqpRetryOptions options) { | ||
| final Duration delay = options.getDelay().plus(SERVER_BUSY_WAIT_TIME); | ||
| final RetryBackoffSpec retrySpec; | ||
|
|
@@ -129,4 +170,78 @@ static Retry createRetry(AmqpRetryOptions options) { | |
| .filter(error -> error instanceof TimeoutException | ||
| || (error instanceof AmqpException && ((AmqpException) error).isTransient())); | ||
| } | ||
|
|
||
| /** | ||
| * Creates a Reactor {@link Retry} spec that performs tiered recovery between retry attempts. | ||
| * Before each retry, the error is classified and the recovery callback is invoked. | ||
| * | ||
| * <p>Includes a quick-retry optimization matching the Go SDK: on the first LINK or CONNECTION | ||
| * error, the retry fires immediately (no backoff) since the error may come from a previously | ||
| * stale link and recovery has just created a fresh one.</p> | ||
| */ | ||
| static Retry createRetryWithRecovery(AmqpRetryOptions options, Consumer<RecoveryKind> recoveryAction) { | ||
| final int maxRetries = options.getMaxRetries(); | ||
| final Duration baseDelay = options.getDelay().plus(SERVER_BUSY_WAIT_TIME); | ||
| final Duration maxDelay = options.getMaxDelay(); | ||
| final boolean isFixed = options.getMode() == com.azure.core.amqp.AmqpRetryMode.FIXED; | ||
| final AtomicBoolean didQuickRetry = new AtomicBoolean(false); | ||
|
|
||
EldertGrootenboer marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| return Retry.from(retrySignals -> retrySignals.flatMap(signal -> { | ||
| final Throwable failure = signal.failure(); | ||
| final long attempt = signal.totalRetriesInARow(); | ||
| final RecoveryKind kind = RecoveryKind.classify(failure); | ||
|
|
||
| // FATAL errors — do not retry. | ||
| if (kind == RecoveryKind.FATAL) { | ||
| return Mono.<Long>error(failure); | ||
| } | ||
|
|
||
| // Check retry budget. | ||
| if (attempt >= maxRetries) { | ||
| return Mono.<Long>error(failure); | ||
| } | ||
|
|
||
| // Perform recovery before retry. | ||
| if (kind != RecoveryKind.NONE && recoveryAction != null) { | ||
| try { | ||
| recoveryAction.accept(kind); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This worries me because these should be handled by downstream subscribers rather than injecting this in the middle of the reactor operation pipeline. Thoughts, @anuchandy ? |
||
| } catch (Exception e) { | ||
| LOGGER.atWarning().addKeyValue("recoveryKind", kind).log("Recovery action failed.", e); | ||
| } | ||
| } | ||
|
|
||
| // Quick retry: on the FIRST LINK/CONNECTION error, retry immediately (no backoff). | ||
| // Uses didQuickRetry flag to prevent repeated immediate retries under persistent | ||
| // errors — similar to the Go SDK's didQuickRetry pattern. Unlike Go's ResetAttempts(), | ||
| // the attempt counter is not reset here; subsequent retries continue with standard | ||
| // exponential backoff from the current attempt count. | ||
| // The kind check must come first: short-circuit evaluation prevents consuming the | ||
| // flag on NONE/FATAL failures where no quick-retry should be issued. | ||
| if ((kind == RecoveryKind.LINK || kind == RecoveryKind.CONNECTION) && !didQuickRetry.getAndSet(true)) { | ||
| LOGGER.atInfo().log("Quick retry after {} recovery (first occurrence).", kind); | ||
| return Mono.just(attempt); | ||
| } | ||
|
|
||
| // Standard backoff delay. | ||
| final Duration delay; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I feel like we have this delay logic in another location so we should consolidate it so users don't inadvertently end up with two different retry durations if they decide to use the normal retry rather than this method |
||
| if (isFixed) { | ||
| // Cap baseDelay to maxDelay so FIXED mode respects retryOptions.getMaxDelay(). | ||
| delay = baseDelay.compareTo(maxDelay) > 0 ? maxDelay : baseDelay; | ||
| } else { | ||
| final long multiplier = 1L << Math.min(attempt, 30); | ||
| final long baseMillis = baseDelay.toMillis(); | ||
| // Guard against overflow: if baseMillis * multiplier would exceed Long.MAX_VALUE, | ||
| // saturate to maxDelay (the clamp below would cap it there anyway). | ||
| final long millis | ||
| = baseMillis > Long.MAX_VALUE / multiplier ? maxDelay.toMillis() : baseMillis * multiplier; | ||
| delay = Duration.ofMillis(Math.min(millis, maxDelay.toMillis())); | ||
| } | ||
| final double jitter = 1.0 + (ThreadLocalRandom.current().nextDouble() * 2 - 1) * JITTER_FACTOR; | ||
| // Clamp the final jittered delay to maxDelay so retryOptions are consistently respected. | ||
| final Duration jitteredDelay | ||
| = Duration.ofMillis(Math.min((long) (delay.toMillis() * jitter), maxDelay.toMillis())); | ||
|
|
||
| return Mono.delay(jitteredDelay).thenReturn(attempt); | ||
| })); | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.