fix: prevent MQTT reconnection storm from stale interrupt callbacks#87
Merged
Conversation
Two race conditions caused a thundering-herd loop of simultaneous reconnection attempts (visible as dozens of 'Triggering quick reconnection...' lines within milliseconds in the logs). Bug 1 – stale interruption events fire after a resume clears _reconnect_task. The AWS SDK fires on_connection_interrupted from background threads via run_coroutine_threadsafe. When on_connection_resumed cancels the backoff task and sets _reconnect_task = None, queued _start_reconnect_task coroutines that haven't yet executed see _reconnect_task=None and pass the existing task-existence guard, spawning a new _reconnect_with_backoff loop against an already-healthy connection. Fix: both on_connection_interrupted and _start_reconnect_task now call is_connected_func() and bail out immediately if the client is connected. Bug 2 – closing the old connection inside _active_reconnect triggers a competing backoff loop. _active_reconnect calls connection_manager.close() on the old connection. The SDK fires _on_connection_interrupted_internal from a background thread, which schedules a _start_reconnect_task coroutine. That coroutine fires after the new connection succeeds and the existing task has been cancelled, tearing down the brand-new connection immediately. Fix: a boolean flag _actively_reconnecting is set to True for the duration of _active_reconnect and _deep_reconnect (cleared in a finally block so it is always reset). _on_connection_interrupted_internal checks the flag and skips the reconnection-handler delegation while it is set. Both methods also return immediately if called while the flag is already True, making them safe against concurrent invocations. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Contributor
There was a problem hiding this comment.
Pull request overview
Fixes an MQTT reconnection storm caused by two distinct races: (1) stale interrupt callbacks queued via run_coroutine_threadsafe that fired after the connection had already been restored, and (2) the intentional teardown of the old connection inside _active_reconnect / _deep_reconnect triggering a competing backoff loop via the SDK's interrupt callback.
Changes:
- Add
is_connected_func()guard inMqttReconnectionHandler.on_connection_interruptedand_start_reconnect_taskso stale interrupt events become no-ops when the client is healthy. - Add
_actively_reconnectingre-entrancy flag inNavienMqttClient, set/cleared (infinally) around_active_reconnectand_deep_reconnect, and checked in_on_connection_interrupted_internalto suppress the spurious delegation. - Add 14 unit tests covering both bugs and edge cases (success, exception paths, re-entrancy, concurrent interrupts).
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| src/nwp500/mqtt/reconnection.py | Adds is_connected_func() checks in on_connection_interrupted and _start_reconnect_task to ignore stale interrupt events after recovery. |
| src/nwp500/mqtt/client.py | Introduces _actively_reconnecting flag; sets/clears around _active_reconnect/_deep_reconnect and uses it to suppress forwarding in _on_connection_interrupted_internal; also guards both reconnect methods from concurrent invocation. |
| tests/test_mqtt_reconnection_storm.py | New tests verifying both fixes, including stale-interrupt-after-resume, multi-interrupt deduplication, flag set/clear on success and on exception, and re-entrancy safety. |
- Wrap long comment in _active_reconnect (E501) - Sort/format import block in test file (I001, fixed by ruff --fix) - Remove quoted type annotation in _make_mqtt_client (UP037, fixed by ruff --fix) - Shorten docstrings and assert messages over 80 chars (E501) - Rename test_on_connection_interrupted_internal_skips_handler_when_flag_set to test_interrupted_internal_skips_handler_when_flag_set (E501 w/ noqa) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
Running the Home Assistant integration against a real device produces a thundering-herd of simultaneous reconnection attempts visible in the logs as dozens of lines like:
all firing within milliseconds of each other. Reconnection attempt counters jump non-monotonically (e.g. 1 → 2 → 5 → 3 → …), confirming that multiple independent
_reconnect_with_backoffloops are running concurrently. Each loop tears down the connection the previous one just established, producing an endless cycle.Root causes
Bug 1 – stale interruption events fire after a resume clears
_reconnect_taskThe AWS SDK fires
on_connection_interruptedfrom background threads. These callbacks callrun_coroutine_threadsafe, which queues a_start_reconnect_taskcoroutine into the event loop.When the connection recovers,
on_connection_resumedcancels the backoff task and sets_reconnect_task = None. Any_start_reconnect_taskcoroutines that were already queued but hadn't yet executed now run, see_reconnect_task is None, pass the existing guard, and spawn a new backoff loop against an already-healthy connection.Fix: both
on_connection_interruptedand_start_reconnect_tasknow callis_connected_func()and return immediately if the client is connected.Bug 2 – closing the old connection inside
_active_reconnecttriggers a competing backoff loop_active_reconnectcallsconnection_manager.close()on the old connection. The SDK fires_on_connection_interrupted_internalfrom a background thread for that closure, which queues another_start_reconnect_taskcoroutine. That coroutine fires after the new connection is up and the existing backoff task has been cancelled — tearing down the brand-new connection immediately.Fix: a boolean flag
_actively_reconnectingis set toTruefor the duration of_active_reconnectand_deep_reconnect(cleared infinallyso it is always reset)._on_connection_interrupted_internalskips the reconnection-handler delegation while the flag is set. Both reconnect methods also return immediately if called while the flag is alreadyTrue, making them safe against concurrent invocations.Changes
mqtt/reconnection.pyis_connected_func()guard inon_connection_interruptedand_start_reconnect_taskmqtt/client.py_actively_reconnectingflag; set/clear in_active_reconnectand_deep_reconnect; check in_on_connection_interrupted_internaltests/test_mqtt_reconnection_storm.pyTests