Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
a3faf26
Add tiered send/receive recovery matching Go SDK parity
EldertGrootenboerMS Mar 18, 2026
5e28c33
fix(ci): build azure-core-amqp from source for tiered recovery PR
EldertGrootenboerMS Mar 18, 2026
f0c6e53
Merge branch 'main' into fix/servicebus-tiered-send-recovery
EldertGrootenboer Mar 18, 2026
0bd7282
Merge branch 'main' into fix/servicebus-tiered-send-recovery
EldertGrootenboer Mar 18, 2026
6116b7b
fix(review): address Copilot PR review feedback on tiered recovery
EldertGrootenboerMS Mar 18, 2026
f3a29a7
fix(review): address second Copilot review on tiered recovery
EldertGrootenboerMS Mar 18, 2026
79c52f9
fix(review): address third Copilot review on tiered recovery
EldertGrootenboerMS Mar 18, 2026
1fec9e5
fix(servicebus): make link/connection force-close non-blocking in rec…
EldertGrootenboerMS Mar 18, 2026
d4c64fd
fix(amqp): prevent NONE failure from consuming quick-retry flag
EldertGrootenboerMS Mar 18, 2026
52d4435
fix(amqp): classify disposed-link IllegalStateException as LINK recovery
EldertGrootenboerMS Mar 18, 2026
ce10aa2
fix(amqp): narrow disposed-ISE match to prevent tier misclassification
EldertGrootenboerMS Mar 18, 2026
5448c97
fix(amqp): clarify virtual-time reason in retry tests
EldertGrootenboerMS Mar 18, 2026
4bd93bb
fix(amqp): tie forceCloseConnection invalidation to specific connecti…
EldertGrootenboerMS Mar 18, 2026
2e80396
fix(amqp): rename test methods to camelCase for checkstyle compliance
EldertGrootenboerMS Mar 18, 2026
1860dd3
fix(amqp): use distinct log message for force-invalidation path
EldertGrootenboerMS Mar 18, 2026
c2d2086
Merge branch 'main' into fix/servicebus-tiered-send-recovery
EldertGrootenboer Mar 19, 2026
bf2b560
fix(amqp): guard backoff overflow and normalize RecoveryKind message …
EldertGrootenboerMS Mar 19, 2026
e76c2d8
fix(servicebus): scope send-link recovery to per-operation reference,…
EldertGrootenboerMS Mar 19, 2026
3eea3b2
fix(servicebus): include exception in connection-recovery warning log
EldertGrootenboerMS Mar 19, 2026
1f9bf92
fix(ci): move azure-core-amqp source comment to correct entry
EldertGrootenboerMS Mar 19, 2026
c0e2d8b
Merge branch 'main' into fix/servicebus-tiered-send-recovery
EldertGrootenboer Mar 19, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,15 @@ private void setAndClearChannel() {
close(oldChannel);
}

/**
* Force-closes the current cached channel so that the next subscriber receives a fresh one.
* This is used for connection-level recovery when the current connection is stale
* but the processor has not detected it (e.g., heartbeats echoed by intermediate infrastructure).
*/
public void forceCloseChannel() {
setAndClearChannel();
}

/**
* Checks the current state of the channel for this channel and returns true if the channel is null or if this
* processor is disposed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public final class ReactorConnectionCache<T extends ReactorConnection> implement
// any dependent type; instead, the dependent type must acquire Connection only through the cache route,
// i.e., by subscribing to 'createOrGetCachedConnection' via 'get()' getter.
private volatile T currentConnection;
// Holds the ID of the connection that forceCloseConnection() asked to force-invalidate.
// Only the connection whose getId() matches this value will be invalidated by cacheInvalidateIf;
// a freshly created connection with a different ID is never accidentally invalidated.
private final AtomicReference<String> forceInvalidateConnectionId = new AtomicReference<>(null);
private final State state = new State();

/**
Expand Down Expand Up @@ -113,12 +117,23 @@ public ReactorConnectionCache(Supplier<T> connectionSupplier, String fullyQualif
}
}).cacheInvalidateIf(c -> {
if (c.isDisposed()) {
// Connection disposed for any reason. Clean up the force-invalidate marker if it
// was targeting this connection so it is not accidentally consumed by a future
// connection that happens to have the same ID.
forceInvalidateConnectionId.compareAndSet(c.getId(), null);
withConnectionId(logger, c.getId()).log("The connection is closed, requesting a new connection.");
return true;
} else {
// Emit cached connection.
return false;
}
final String targetId = forceInvalidateConnectionId.get();
if (targetId != null
&& targetId.equals(c.getId())
&& forceInvalidateConnectionId.compareAndSet(targetId, null)) {
// forceCloseConnection() asked to invalidate exactly this connection.
withConnectionId(logger, c.getId()).log("Forcing connection close, requesting a new connection.");
return true;
}
// No forced invalidation targeted this connection — emit it from cache.
return false;
});
}

Expand Down Expand Up @@ -172,6 +187,37 @@ public boolean isCurrentConnectionClosed() {
return (currentConnection != null && currentConnection.isDisposed()) || terminated;
}

/**
* Closes the current cached connection (if any) so that the next {@link #get()} call creates
* a fresh connection. This is used for connection-level recovery when the current connection
* is in a stale state that the cache's normal error detection (via endpoint state signals)
* has not detected — for example, when intermediate infrastructure (load balancers, NAT gateways)
* is echoing AMQP heartbeats on behalf of a dead connection.
*
* <p>This is modeled after the Go SDK's {@code Namespace.Recover()} which explicitly closes
* the old connection and increments the connection revision.</p>
*
* <p>This method is safe to call concurrently. If the connection is already closed or being
* closed, this is a no-op.</p>
*/
public void forceCloseConnection() {
final T connection = currentConnection;
if (connection != null && !connection.isDisposed()) {
withConnectionId(logger, connection.getId())
.log("Force-closing connection for recovery. Next get() will create a fresh connection.");
// Set forceInvalidate before starting async close so that cacheInvalidateIf immediately
// invalidates this connection on the next get() call, without blocking the caller
// while the AMQP close handshake completes. ReactorConnection.dispose() calls
// closeAsync().block(), which is illegal on a non-blocking Reactor thread.
forceInvalidateConnectionId.set(connection.getId());
connection.closeAsync()
.subscribe(null,
error -> logger.atVerbose()
.addKeyValue(CONNECTION_ID_KEY, connection.getId())
.log("Error during async connection force-close.", error));
}
}

/**
* Terminate so that consumers will no longer be able to request connection. If there is a current (cached)
* connection then it will be closed.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp.implementation;

import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;

import java.util.Locale;
import java.util.concurrent.TimeoutException;

/**
* Classifies errors into recovery tiers, determining what resources should be closed
* between retry attempts. This follows the tiered recovery pattern used by the Go, .NET,
* Python, and JS Azure SDKs.
*
* <ul>
* <li>{@link #NONE} — Retry on the same link (server-busy, timeouts).</li>
* <li>{@link #LINK} — Close the send/receive link; next retry creates a fresh link on the same connection.</li>
* <li>{@link #CONNECTION} — Close the entire connection; next retry creates a fresh connection and link.</li>
* <li>{@link #FATAL} — Do not retry (unauthorized, not-found, message too large).</li>
* </ul>
*/
public enum RecoveryKind {
/**
* No recovery needed — retry on the same link and connection.
* Applies to: server-busy, timeouts, resource-limit-exceeded.
*/
NONE,

/**
* Close the link (and its session) before retrying. The next retry creates a fresh link
* on the same connection.
* Applies to: link:detach-forced, link:stolen, transient AMQP errors on the link.
*/
LINK,

/**
* Close the entire connection before retrying. The next retry creates a fresh connection,
* session, and link.
* Applies to: connection:forced, connection:framing-error, proton:io, internal-error.
*/
CONNECTION,

/**
* Do not retry — the error is permanent.
* Applies to: unauthorized-access, not-found, message-size-exceeded.
*/
FATAL;

/**
* Classifies the given error into a {@link RecoveryKind} that determines what resources
* should be invalidated between retry attempts.
*
* @param error The error to classify.
* @return The recovery kind for the given error.
*/
public static RecoveryKind classify(Throwable error) {
if (error == null) {
return NONE;
}

// Timeouts — retry on same link, the link may still be healthy.
if (error instanceof TimeoutException) {
return NONE;
}

if (error instanceof AmqpException) {
final AmqpException amqpError = (AmqpException) error;
final AmqpErrorCondition condition = amqpError.getErrorCondition();

if (condition != null) {
switch (condition) {
// Connection-level errors — close the entire connection.
case CONNECTION_FORCED:
case CONNECTION_FRAMING_ERROR:
case CONNECTION_REDIRECT:
case PROTON_IO:
case INTERNAL_ERROR:
return CONNECTION;

// Link-level errors — close the link, keep the connection.
case LINK_DETACH_FORCED:
case LINK_STOLEN:
case LINK_REDIRECT:
case PARTITION_NOT_OWNED_ERROR:
case TRANSFER_LIMIT_EXCEEDED:
// operation-cancelled can signal "AMQP layer unexpectedly aborted or disconnected"
// (e.g. ReceiverUnsettledDeliveries remote Released outcome), requiring link recovery.
case OPERATION_CANCELLED:
return LINK;

// Fatal errors — do not retry.
case NOT_FOUND:
case UNAUTHORIZED_ACCESS:
case LINK_PAYLOAD_SIZE_EXCEEDED:
case NOT_ALLOWED:
case NOT_IMPLEMENTED:
case ENTITY_DISABLED_ERROR:
case ENTITY_ALREADY_EXISTS:
case PUBLISHER_REVOKED_ERROR:
case ARGUMENT_ERROR:
case ARGUMENT_OUT_OF_RANGE_ERROR:
case ILLEGAL_STATE:
case MESSAGE_LOCK_LOST:
case STORE_LOCK_LOST_ERROR:
return FATAL;

// Server-busy, timeouts, and resource-limit errors — retry on same link.
// RESOURCE_LIMIT_EXCEEDED is treated as transient here because ReactorSender
// groups it alongside SERVER_BUSY and TIMEOUT in its send-error retry logic.
case SERVER_BUSY_ERROR:
case TIMEOUT_ERROR:
case RESOURCE_LIMIT_EXCEEDED:
return NONE;

// Session/lock errors — link-level recovery.
// Session lock loss means the session link is invalid and
// a fresh link must be acquired for a new session.
case SESSION_LOCK_LOST:
case SESSION_CANNOT_BE_LOCKED:
case SESSION_NOT_FOUND:
case MESSAGE_NOT_FOUND:
return LINK;

default:
break;
}
}

// Transient AMQP errors without a specific condition — link recovery.
if (amqpError.isTransient()) {
return LINK;
}

// Non-transient AMQP errors without a recognized condition — fatal.
return FATAL;
}

// RequestResponseChannelClosedException — link-level (parent connection disposing).
if (error instanceof RequestResponseChannelClosedException) {
return LINK;
}

// IllegalStateException thrown by a disposed ReactorSender (e.g., "Cannot publish
// message when disposed." or "Cannot publish data batch when disposed."). This is
// a link-staleness signal: the link was closed (possibly by a concurrent recovery
// path) before the in-flight send could complete. LINK recovery creates a fresh
// link on the next retry.
// Match both "Cannot publish" and "disposed" to avoid misclassifying unrelated
// disposal signals (e.g., "Connection is disposed. Cannot get management instance.").
if (error instanceof IllegalStateException) {
final String msg = error.getMessage();
if (msg != null) {
final String normalizedMsg = msg.toLowerCase(Locale.ROOT);
if (normalizedMsg.contains("cannot publish") && normalizedMsg.contains("disposed")) {
return LINK;
}
}
}

// Unknown non-AMQP errors — treat as fatal (don't retry application or SDK bugs).
// The Go SDK defaults to CONNECTION for unknown errors, but those are AMQP-layer
// errors (io.EOF, net.Error). Java's non-AMQP exceptions (e.g., AzureException,
// RuntimeException) should fail fast rather than trigger connection recovery.
return FATAL;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

import java.time.Duration;
import java.util.Locale;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/**
* Helper class to help with retry policies.
Expand Down Expand Up @@ -106,6 +109,44 @@ public static <T> Mono<T> withRetry(Mono<T> source, AmqpRetryOptions retryOption
return withRetry(source, retryOptions, timeoutMessage, false);
}

/**
* Applies the retry policy with tiered recovery between attempts. Before each retry,
* the error is classified via {@link RecoveryKind#classify(Throwable)} and the recovery
* callback is invoked so the caller can close the appropriate resources (link or connection).
*
* <p>This matches the tiered recovery pattern used by the Go, .NET, Python, and JS SDKs.</p>
*
* @param <T> Type of value in the {@link Mono}.
* @param source The publisher to apply the retry policy to.
* @param retryOptions A {@link AmqpRetryOptions}.
* @param errorMessage Text added to error logs.
* @param recoveryAction Called between retry attempts with the classified {@link RecoveryKind}.
* The caller should close the link (for {@link RecoveryKind#LINK}) or connection
* (for {@link RecoveryKind#CONNECTION}) so the next retry creates fresh resources.
*
* @return A publisher that returns the results of the {@link Mono} if any of the retry attempts
* are successful. Otherwise, propagates the last error.
*/
public static <T> Mono<T> withRetryAndRecovery(Mono<T> source, AmqpRetryOptions retryOptions, String errorMessage,
Consumer<RecoveryKind> recoveryAction) {
return withRetryAndRecovery(source, retryOptions, errorMessage, false, recoveryAction);
}

/**
* Like {@link #withRetryAndRecovery(Mono, AmqpRetryOptions, String, Consumer)} but with an option to allow
* long-running operations that should not be subject to the per-attempt timeout.
*
* @param allowsLongOperation If true, the source Mono will not be wrapped with a per-attempt timeout.
*/
public static <T> Mono<T> withRetryAndRecovery(Mono<T> source, AmqpRetryOptions retryOptions, String errorMessage,
boolean allowsLongOperation, Consumer<RecoveryKind> recoveryAction) {
if (!allowsLongOperation) {
source = source.timeout(retryOptions.getTryTimeout());
}
return source.retryWhen(createRetryWithRecovery(retryOptions, recoveryAction))
.doOnError(error -> LOGGER.error(errorMessage, error));
}

static Retry createRetry(AmqpRetryOptions options) {
final Duration delay = options.getDelay().plus(SERVER_BUSY_WAIT_TIME);
final RetryBackoffSpec retrySpec;
Expand All @@ -129,4 +170,78 @@ static Retry createRetry(AmqpRetryOptions options) {
.filter(error -> error instanceof TimeoutException
|| (error instanceof AmqpException && ((AmqpException) error).isTransient()));
}

/**
* Creates a Reactor {@link Retry} spec that performs tiered recovery between retry attempts.
* Before each retry, the error is classified and the recovery callback is invoked.
*
* <p>Includes a quick-retry optimization matching the Go SDK: on the first LINK or CONNECTION
* error, the retry fires immediately (no backoff) since the error may come from a previously
* stale link and recovery has just created a fresh one.</p>
*/
static Retry createRetryWithRecovery(AmqpRetryOptions options, Consumer<RecoveryKind> recoveryAction) {
final int maxRetries = options.getMaxRetries();
final Duration baseDelay = options.getDelay().plus(SERVER_BUSY_WAIT_TIME);
final Duration maxDelay = options.getMaxDelay();
final boolean isFixed = options.getMode() == com.azure.core.amqp.AmqpRetryMode.FIXED;
final AtomicBoolean didQuickRetry = new AtomicBoolean(false);

return Retry.from(retrySignals -> retrySignals.flatMap(signal -> {
final Throwable failure = signal.failure();
final long attempt = signal.totalRetriesInARow();
final RecoveryKind kind = RecoveryKind.classify(failure);

// FATAL errors — do not retry.
if (kind == RecoveryKind.FATAL) {
return Mono.<Long>error(failure);
}

// Check retry budget.
if (attempt >= maxRetries) {
return Mono.<Long>error(failure);
}

// Perform recovery before retry.
if (kind != RecoveryKind.NONE && recoveryAction != null) {
try {
recoveryAction.accept(kind);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This worries me because these should be handled by downstream subscribers rather than injecting this in the middle of the reactor operation pipeline. Thoughts, @anuchandy ?

Example: https://github.com/Azure/azure-sdk-for-java/pull/48460/changes#diff-42c5fc794930edf5858987b570b871727693454acb27130ac5a9748e55d2de95R1742

} catch (Exception e) {
LOGGER.atWarning().addKeyValue("recoveryKind", kind).log("Recovery action failed.", e);
}
}

// Quick retry: on the FIRST LINK/CONNECTION error, retry immediately (no backoff).
// Uses didQuickRetry flag to prevent repeated immediate retries under persistent
// errors — similar to the Go SDK's didQuickRetry pattern. Unlike Go's ResetAttempts(),
// the attempt counter is not reset here; subsequent retries continue with standard
// exponential backoff from the current attempt count.
// The kind check must come first: short-circuit evaluation prevents consuming the
// flag on NONE/FATAL failures where no quick-retry should be issued.
if ((kind == RecoveryKind.LINK || kind == RecoveryKind.CONNECTION) && !didQuickRetry.getAndSet(true)) {
LOGGER.atInfo().log("Quick retry after {} recovery (first occurrence).", kind);
return Mono.just(attempt);
}

// Standard backoff delay.
final Duration delay;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like we have this delay logic in another location so we should consolidate it so users don't inadvertently end up with two different retry durations if they decide to use the normal retry rather than this method

if (isFixed) {
// Cap baseDelay to maxDelay so FIXED mode respects retryOptions.getMaxDelay().
delay = baseDelay.compareTo(maxDelay) > 0 ? maxDelay : baseDelay;
} else {
final long multiplier = 1L << Math.min(attempt, 30);
final long baseMillis = baseDelay.toMillis();
// Guard against overflow: if baseMillis * multiplier would exceed Long.MAX_VALUE,
// saturate to maxDelay (the clamp below would cap it there anyway).
final long millis
= baseMillis > Long.MAX_VALUE / multiplier ? maxDelay.toMillis() : baseMillis * multiplier;
delay = Duration.ofMillis(Math.min(millis, maxDelay.toMillis()));
}
final double jitter = 1.0 + (ThreadLocalRandom.current().nextDouble() * 2 - 1) * JITTER_FACTOR;
// Clamp the final jittered delay to maxDelay so retryOptions are consistently respected.
final Duration jitteredDelay
= Duration.ofMillis(Math.min((long) (delay.toMillis() * jitter), maxDelay.toMillis()));

return Mono.delay(jitteredDelay).thenReturn(attempt);
}));
}
}
Loading