Skip to content

Add tiered send/receive recovery to azure-core-amqp and azure-messaging-servicebus#48460

Open
EldertGrootenboer wants to merge 21 commits intoAzure:mainfrom
EldertGrootenboer:fix/servicebus-tiered-send-recovery
Open

Add tiered send/receive recovery to azure-core-amqp and azure-messaging-servicebus#48460
EldertGrootenboer wants to merge 21 commits intoAzure:mainfrom
EldertGrootenboer:fix/servicebus-tiered-send-recovery

Conversation

@EldertGrootenboer
Copy link
Contributor

What this PR does

Adds tiered send/receive recovery to azure-core-amqp and azure-messaging-servicebus, matching the recovery pattern used by the Go, .NET, Python, and JS SDKs. When an AMQP operation fails, the error is classified into a recovery tier — NONE, LINK, CONNECTION, or FATAL — and the appropriate resources are closed before retrying.

This resolves a long-standing issue where a single stale AMQP connection or link could cause a Service Bus sender or receiver to become permanently stuck, requiring a JVM restart.

Changes

azure-core-amqp (shared — benefits Event Hubs too)

  • RecoveryKind (new): Error classification enum with classify(Throwable) that maps every AmqpErrorCondition to NONE/LINK/CONNECTION/FATAL
  • RetryUtil.withRetryAndRecovery() (new): Retry wrapper that invokes a recovery callback between attempts. Includes quick-retry optimization for the first LINK/CONNECTION error (matching Go SDK's didQuickRetry)
  • ReactorConnectionCache.forceCloseConnection() (new): Closes the cached connection so the next get() creates a fresh one — for stale connections where AMQP heartbeats are echoed by intermediate infrastructure
  • AmqpChannelProcessor.forceCloseChannel() (new): Same for the v1 connection cache

azure-messaging-servicebus

  • Sender: All send paths (sendBatchInternal, sendFluxInternal, scheduleMessageInternal, getSendLinkWithRetry) now use withRetryAndRecovery with active link disposal + forceCloseConnection on CONNECTION errors
  • Receiver: Receive link creation uses withRetryAndRecovery with session removal on LINK errors + forceCloseConnection on CONNECTION errors
  • Session acquirer: Tiered error classification with connection recovery
  • Session manager: Tiered error classification with link retry + connection recovery
  • ConnectionCacheWrapper.forceCloseConnection(): Delegates to v1/v2 cache

Tests

  • RecoveryKindTest: 26 tests covering all error condition classifications
  • ServiceBusSenderAsyncClientTest: Updated failedSendMessageReportsMetrics to use FATAL error (matching broadened retry filter)
  • All existing tests pass: 638 (core-amqp) + 941 (servicebus) = 1579 tests, 0 failures

Cross-SDK parity

Feature Go .NET Python JS Java (this PR)
Error classification GetRecoveryKind() HasLinkCommunicationError shutdown_handler Event-driven RecoveryKind.classify()
Link recovery Close link + session FaultTolerantAmqpObject Close handler closeLink() link.dispose() + removeSession()
Connection recovery Namespace.Recover() ActiveConnection.GetOrCreateAsync Close handler + reopen refreshConnection() forceCloseConnection()
Quick retry didQuickRetry + ResetAttempts N/A N/A N/A AtomicBoolean didQuickRetry
Recovery timing Before retry attempt Passive (lazy detect) Between retries Event-driven Before retry (doBeforeRetry)

Related issues

Directly fixes or would have prevented:

Add RecoveryKind error classification and recovery-aware retry to azure-core-amqp.
Apply tiered recovery to all Service Bus sender, receiver, and session paths.
On LINK errors: dispose stale link/session, retry with fresh resources.
On CONNECTION errors: force-close the cached connection, retry with fresh connection.
Includes quick-retry optimization and didQuickRetry deduplication.

Fixes Azure#44688
Copilot AI review requested due to automatic review settings March 18, 2026 03:10
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

Add azure-core-amqp to AdditionalModules in ci.yml and trigger paths so CI
builds it from source alongside servicebus. Update pom.xml to reference
2.12.0-beta.1 with current tag (cross-module PR -- to be revisited after
azure-core-amqp is released to Maven Central).

Note: uses current tag temporarily; reviewer to confirm release sequencing.
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 13 out of 13 changed files in this pull request and generated 6 comments.


You can also share your feedback on Copilot code review. Take the survey.

Address 6 comments from Copilot PR review on Azure#48460:

- sendFluxInternal: wrap only batchList (link acquisition) with
  withRetryAndRecovery, not the full sendOperation. Wrapping the
  outer operation caused the user-provided Flux to be re-subscribed
  on each retry and nested retries with sendBatchInternal.

- scheduleMessageInternal: change getSendLink to getSendLinkWithRetry
  so schedule operations get the same tiered recovery as other send paths.

- ReactorConnectionCache.forceCloseConnection: use connection.dispose()
  instead of closeAsync() so isDisposed() returns true synchronously.
  This ensures cacheInvalidateIf invalidates the cached reference
  immediately on the next get() call.

- RetryUtil: replace Math.random() with ThreadLocalRandom.current()
  to eliminate shared RNG contention and improve testability.

- performRecovery comment: remove contradictory 'happens organically'
  comment that conflicted with the explicit forceCloseConnection() call.

- ServiceBusReceiverAsyncClient: add error handler to the subscribe()
  call inside the LINK recovery callback so failures are logged and
  do not silently leak.
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 13 out of 13 changed files in this pull request and generated 7 comments.


You can also share your feedback on Copilot code review. Take the survey.

- T1: ServiceBusSessionAcquirer - return delay after forceCloseConnection for
  RecoveryKind.CONNECTION so session acquisition retries after connection recovery
- T2: ServiceBusSessionManager - merge LINK and CONNECTION into a single delay
  branch so CONNECTION errors retry instead of falling through to publishError
- T3: ServiceBusSenderAsyncClient - narrow withRetryAndRecovery to wrap only link
  acquisition (getSendLink); messages.collect() moved outside retry boundary to
  avoid re-subscribing user Flux on retry
- T4: RecoveryKind - reclassify OPERATION_CANCELLED from NONE to LINK because
  core-amqp raises it when AMQP layer unexpectedly aborts or disconnects, which
  requires link recovery (e.g. ReceiverUnsettledDeliveries remote Released outcome)
- T5: RecoveryKind - reclassify RESOURCE_LIMIT_EXCEEDED from FATAL to NONE to
  match ReactorSender.isGeneralSendError() which treats it as retriable alongside
  SERVER_BUSY and TIMEOUT
- T6: RetryUtilTest - add four tests for createRetryWithRecovery covering FATAL
  no-retry, LINK recovery callback, CONNECTION recovery callback, and retry
  budget exhaustion; use virtual time for backoff-delay scenarios
- T7: RecoveryKindTest - rename operationCancelled test to expect LINK result,
  add new test asserting RESOURCE_LIMIT_EXCEEDED classifies as NONE
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 14 out of 14 changed files in this pull request and generated 2 comments.


You can also share your feedback on Copilot code review. Take the survey.

- T8: RetryUtil - clamp final jittered delay to maxDelay so retryOptions are
  consistently respected. Previously jitter was applied after the pre-jitter cap
  which could produce a delay exceeding retryOptions.getMaxDelay(). Also cap
  baseDelay to maxDelay in FIXED mode (FIXED previously used baseDelay without
  checking against maxDelay, unlike the EXPONENTIAL path).
- T9: ServiceBusReceiverAsyncClient - fix misleading log message in the LINK/
  CONNECTION recovery callback. The error handler on connectionProcessor.subscribe
  fires only when obtaining the connection fails (not when removeSession fails,
  since it returns a boolean). Renamed to "Error obtaining connection during {}
  recovery." Also log the boolean result of removeSession at VERBOSE level to
  confirm whether a stale session was actually evicted.
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 14 out of 14 changed files in this pull request and generated 3 comments.


You can also share your feedback on Copilot code review. Take the survey.

…overy

T10: In ServiceBusSenderAsyncClient.performRecovery(), replace link.dispose()
with link.closeAsync().subscribe(...). ReactorSender.dispose() calls
closeAsync().block(tryTimeout), which blocks the Reactor thread when invoked
from a recovery callback on a non-blocking scheduler.

T11: In ReactorConnectionCache.forceCloseConnection(), replace connection.dispose()
with a non-blocking equivalent: set a new forceInvalidate AtomicBoolean flag before
starting connection.closeAsync().subscribe(...). The cacheInvalidateIf predicate
now checks forceInvalidate in addition to isDisposed(), ensuring the cache is
invalidated synchronously (by the flag) while the close handshake completes
asynchronously. ReactorConnection.dispose() has the same blocking pattern.

T12: Update comment in RetryUtil.createRetryWithRecovery() to remove the
misleading claim that the quick-retry path matches Go's ResetAttempts(). The
Java implementation uses a didQuickRetry flag only (no attempt counter reset);
subsequent retries continue standard exponential backoff from the running count.
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 14 out of 14 changed files in this pull request and generated 3 comments.


You can also share your feedback on Copilot code review. Take the survey.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 14 out of 14 changed files in this pull request and generated 9 comments.


You can also share your feedback on Copilot code review. Take the survey.

… guard session disposal, improve recovery log context
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 14 out of 14 changed files in this pull request and generated 2 comments.


You can also share your feedback on Copilot code review. Take the survey.

@azure-pipelines
Copy link

Azure Pipelines successfully started running 1 pipeline(s).

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 14 out of 14 changed files in this pull request and generated 2 comments.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 14 out of 14 changed files in this pull request and generated no new comments.

// Perform recovery before retry.
if (kind != RecoveryKind.NONE && recoveryAction != null) {
try {
recoveryAction.accept(kind);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This worries me because these should be handled by downstream subscribers rather than injecting this in the middle of the reactor operation pipeline. Thoughts, @anuchandy ?

Example: https://github.com/Azure/azure-sdk-for-java/pull/48460/changes#diff-42c5fc794930edf5858987b570b871727693454acb27130ac5a9748e55d2de95R1742

}

// Standard backoff delay.
final Duration delay;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like we have this delay logic in another location so we should consolidate it so users don't inadvertently end up with two different retry durations if they decide to use the normal retry rather than this method

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants