From 79810238ad6f01f097a1a070e5509bcd7a82d57e Mon Sep 17 00:00:00 2001 From: Emmanuel Levijarvi Date: Mon, 18 May 2026 12:54:32 -0700 Subject: [PATCH 1/3] fix: prevent MQTT reconnection storm from stale interrupt callbacks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two race conditions caused a thundering-herd loop of simultaneous reconnection attempts (visible as dozens of 'Triggering quick reconnection...' lines within milliseconds in the logs). Bug 1 – stale interruption events fire after a resume clears _reconnect_task. The AWS SDK fires on_connection_interrupted from background threads via run_coroutine_threadsafe. When on_connection_resumed cancels the backoff task and sets _reconnect_task = None, queued _start_reconnect_task coroutines that haven't yet executed see _reconnect_task=None and pass the existing task-existence guard, spawning a new _reconnect_with_backoff loop against an already-healthy connection. Fix: both on_connection_interrupted and _start_reconnect_task now call is_connected_func() and bail out immediately if the client is connected. Bug 2 – closing the old connection inside _active_reconnect triggers a competing backoff loop. _active_reconnect calls connection_manager.close() on the old connection. The SDK fires _on_connection_interrupted_internal from a background thread, which schedules a _start_reconnect_task coroutine. That coroutine fires after the new connection succeeds and the existing task has been cancelled, tearing down the brand-new connection immediately. Fix: a boolean flag _actively_reconnecting is set to True for the duration of _active_reconnect and _deep_reconnect (cleared in a finally block so it is always reset). _on_connection_interrupted_internal checks the flag and skips the reconnection-handler delegation while it is set. Both methods also return immediately if called while the flag is already True, making them safe against concurrent invocations. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/nwp500/mqtt/client.py | 40 ++- src/nwp500/mqtt/reconnection.py | 20 +- tests/test_mqtt_reconnection_storm.py | 446 ++++++++++++++++++++++++++ 3 files changed, 501 insertions(+), 5 deletions(-) create mode 100644 tests/test_mqtt_reconnection_storm.py diff --git a/src/nwp500/mqtt/client.py b/src/nwp500/mqtt/client.py index 6a331a3..fa26bbb 100644 --- a/src/nwp500/mqtt/client.py +++ b/src/nwp500/mqtt/client.py @@ -226,6 +226,11 @@ def __init__( # Connection state (simpler than checking _connection_manager) self._connection: mqtt.Connection | None = None self._connected = False + # Guards _active_reconnect / _deep_reconnect against re-entrancy. + # While True, _on_connection_interrupted_internal will not forward + # events to the reconnection handler, preventing the intentional + # teardown of the old connection from spawning a competing backoff loop. + self._actively_reconnecting = False _logger.info( f"Initialized MQTT client with ID: {self.config.client_id}" @@ -276,8 +281,17 @@ def _on_connection_interrupted_internal( ) ) - # Delegate to reconnection handler if available - if self._reconnection_handler and self.config.auto_reconnect: + # Delegate to reconnection handler if available. + # Skip while _actively_reconnecting: the interruption was caused by + # _active_reconnect / _deep_reconnect intentionally closing the old + # connection. Forwarding it would queue a _start_reconnect_task + # coroutine that could fire after the new connection is up and the + # existing backoff task has been cancelled, spawning a competing loop. + if ( + self._reconnection_handler + and self.config.auto_reconnect + and not self._actively_reconnecting + ): self._reconnection_handler.on_connection_interrupted(error) # Record diagnostic event @@ -380,8 +394,13 @@ async def _active_reconnect(self) -> None: _logger.debug("Already connected, skipping reconnection") return + if self._actively_reconnecting: + _logger.debug("Active reconnection already in progress, skipping") + return + _logger.info("Attempting active reconnection...") + self._actively_reconnecting = True try: # Ensure tokens are still valid await self._auth_client.ensure_valid_token() @@ -390,6 +409,9 @@ async def _active_reconnect(self) -> None: if self._connection_manager: # Close old connection to stop SDK auto-reconnect and # prevent two connections with the same client ID. + # _actively_reconnecting suppresses the on_connection_interrupted + # callback that closing triggers, preventing a competing backoff + # loop from being spawned. _logger.debug("Recreating MQTT connection...") try: await self._connection_manager.close() @@ -432,6 +454,8 @@ async def _active_reconnect(self) -> None: f"Error during active reconnection: {e}", exc_info=True ) raise + finally: + self._actively_reconnecting = False async def _deep_reconnect(self) -> None: """ @@ -451,13 +475,21 @@ async def _deep_reconnect(self) -> None: _logger.debug("Already connected, skipping deep reconnection") return + if self._actively_reconnecting: + _logger.debug("Active reconnection already in progress, skipping") + return + _logger.warning( "Performing deep reconnection (full rebuild)... " "This may take longer." ) + self._actively_reconnecting = True try: - # Step 1: Clean up existing connection if any + # Step 1: Clean up existing connection if any. + # _actively_reconnecting suppresses the on_connection_interrupted + # callback that closing triggers, preventing a competing backoff + # loop from being spawned. if self._connection_manager: _logger.debug("Cleaning up old connection...") try: @@ -534,6 +566,8 @@ async def _deep_reconnect(self) -> None: ) as e: _logger.error(f"Error during deep reconnection: {e}", exc_info=True) raise + finally: + self._actively_reconnecting = False async def connect(self) -> bool: """ diff --git a/src/nwp500/mqtt/reconnection.py b/src/nwp500/mqtt/reconnection.py index ca26797..e7bf244 100644 --- a/src/nwp500/mqtt/reconnection.py +++ b/src/nwp500/mqtt/reconnection.py @@ -94,11 +94,18 @@ def on_connection_interrupted(self, error: Exception) -> None: """ _logger.warning(f"Connection interrupted: {error}") - # Start automatic reconnection if enabled + # Start automatic reconnection if enabled. + # Also guard against stale interruption events that arrive after the + # connection has already been restored: these can be queued via + # run_coroutine_threadsafe and fire after on_connection_resumed has + # cancelled _reconnect_task (setting it to None), which would + # otherwise bypass the task-existence check and spawn a new backoff + # loop while the client is perfectly healthy. if ( self.config.auto_reconnect and self._enabled and not self._manual_disconnect + and not self._is_connected_func() and (not self._reconnect_task or self._reconnect_task.done()) ): _logger.info("Starting automatic reconnection...") @@ -132,8 +139,17 @@ async def _start_reconnect_task(self) -> None: This is a helper method to create the reconnect task from within a coroutine that's scheduled via _schedule_coroutine. + + The is_connected guard is re-checked here because this coroutine may + be queued via run_coroutine_threadsafe and run after the connection + has already been restored (e.g. by on_connection_resumed cancelling + _reconnect_task), in which case starting a new backoff loop would + incorrectly tear down a healthy connection. """ - if not self._reconnect_task or self._reconnect_task.done(): + if ( + not self._is_connected_func() + and (not self._reconnect_task or self._reconnect_task.done()) + ): self._reconnect_task = asyncio.create_task( self._reconnect_with_backoff() ) diff --git a/tests/test_mqtt_reconnection_storm.py b/tests/test_mqtt_reconnection_storm.py new file mode 100644 index 0000000..44ef30a --- /dev/null +++ b/tests/test_mqtt_reconnection_storm.py @@ -0,0 +1,446 @@ +""" +Tests for the MQTT reconnection storm fix. + +Two bugs were fixed: + +Bug 1 — Stale interruption events fire after a resume clears _reconnect_task. + The AWS SDK fires on_connection_interrupted callbacks from background threads + via run_coroutine_threadsafe. When on_connection_resumed cancels and nulls + _reconnect_task, queued _start_reconnect_task coroutines that haven't run yet + see _reconnect_task=None and spawn a new _reconnect_with_backoff task even + though the client is now healthy. + + Fix: both on_connection_interrupted and _start_reconnect_task now check + is_connected_func() before starting a new backoff loop. + +Bug 2 — Closing the old connection inside _active_reconnect / _deep_reconnect + fires _on_connection_interrupted_internal from a background SDK thread. + This queued another _start_reconnect_task coroutine that would fire after the + new connection was established, tearing it down immediately. + + Fix: _actively_reconnecting flag suppresses the reconnection-handler + delegation in _on_connection_interrupted_internal while the intentional + teardown is in progress. +""" + +from __future__ import annotations + +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from nwp500.auth import AuthenticationResponse, AuthTokens, NavienAuthClient, UserInfo +from nwp500.mqtt.reconnection import MqttReconnectionHandler +from nwp500.mqtt.utils import MqttConnectionConfig + + +# --------------------------------------------------------------------------- +# Shared helpers +# --------------------------------------------------------------------------- + + +def _make_auth_client() -> NavienAuthClient: + client = NavienAuthClient("test@example.com", "password") + tokens = AuthTokens( + id_token="tok", + access_token="acc", + refresh_token="ref", + authentication_expires_in=3600, + access_key_id="key", + secret_key="secret", + session_token="sess", + authorization_expires_in=3600, + ) + client._auth_response = AuthenticationResponse( + user_info=UserInfo(user_first_name="T", user_last_name="U"), + tokens=tokens, + ) + return client + + +def _make_handler( + *, + connected: bool = False, + auto_reconnect: bool = True, + max_reconnect_attempts: int = -1, +) -> tuple[MqttReconnectionHandler, list[asyncio.Task]]: + """Return a handler and a list that records every scheduled coroutine.""" + config = MqttConnectionConfig( + auto_reconnect=auto_reconnect, + max_reconnect_attempts=max_reconnect_attempts, + ) + scheduled: list[asyncio.Task] = [] + + def _schedule(coro): # replaces run_coroutine_threadsafe in real code + t = asyncio.ensure_future(coro) + scheduled.append(t) + return t + + handler = MqttReconnectionHandler( + config=config, + is_connected_func=lambda: connected, + schedule_coroutine_func=_schedule, + reconnect_func=AsyncMock(), + deep_reconnect_func=None, + emit_event_func=None, + ) + handler.enable() + return handler, scheduled + + +# --------------------------------------------------------------------------- +# Bug 1: on_connection_interrupted / _start_reconnect_task is_connected guard +# --------------------------------------------------------------------------- + + +class TestReconnectionHandlerIsConnectedGuard: + """Bug 1 – stale interrupt events must not start a loop when connected.""" + + @pytest.mark.asyncio(loop_scope="function") + async def test_on_connection_interrupted_does_not_start_task_when_connected( + self, + ): + """on_connection_interrupted is a no-op when is_connected returns True.""" + connected = True + config = MqttConnectionConfig(auto_reconnect=True) + scheduled = [] + + handler = MqttReconnectionHandler( + config=config, + is_connected_func=lambda: connected, + schedule_coroutine_func=lambda coro: scheduled.append( + asyncio.ensure_future(coro) + ), + reconnect_func=AsyncMock(), + ) + handler.enable() + + handler.on_connection_interrupted(Exception("dropped")) + + # Nothing should have been scheduled + assert scheduled == [] + + @pytest.mark.asyncio(loop_scope="function") + async def test_on_connection_interrupted_starts_task_when_disconnected( + self, + ): + """on_connection_interrupted schedules a task when genuinely disconnected.""" + handler, scheduled = _make_handler(connected=False) + + handler.on_connection_interrupted(Exception("dropped")) + + assert len(scheduled) == 1 + # Clean up + scheduled[0].cancel() + await asyncio.gather(*scheduled, return_exceptions=True) + + @pytest.mark.asyncio(loop_scope="function") + async def test_start_reconnect_task_no_op_when_connected(self): + """_start_reconnect_task must not create a Task when is_connected is True.""" + connected = True + config = MqttConnectionConfig(auto_reconnect=True) + + handler = MqttReconnectionHandler( + config=config, + is_connected_func=lambda: connected, + schedule_coroutine_func=lambda coro: asyncio.ensure_future(coro), + reconnect_func=AsyncMock(), + ) + handler.enable() + + await handler._start_reconnect_task() + + assert handler._reconnect_task is None + + @pytest.mark.asyncio(loop_scope="function") + async def test_start_reconnect_task_creates_task_when_disconnected(self): + """_start_reconnect_task creates a Task when genuinely disconnected.""" + handler, _ = _make_handler(connected=False) + + await handler._start_reconnect_task() + + assert handler._reconnect_task is not None + assert not handler._reconnect_task.done() + + handler._reconnect_task.cancel() + await asyncio.gather(handler._reconnect_task, return_exceptions=True) + + @pytest.mark.asyncio(loop_scope="function") + async def test_stale_interrupt_after_resume_does_not_spawn_extra_task(self): + """ + Simulate the race that caused the reconnection storm: + + 1. Connection drops → on_connection_interrupted schedules + _start_reconnect_task (coroutine A queued but not yet run). + 2. Connection resumes → on_connection_resumed cancels task, sets + _reconnect_task = None. + 3. Coroutine A finally runs – without the fix it would see + _reconnect_task=None and create a new backoff loop. + """ + state = {"connected": False} + config = MqttConnectionConfig(auto_reconnect=True) + scheduled = [] + + handler = MqttReconnectionHandler( + config=config, + is_connected_func=lambda: state["connected"], + schedule_coroutine_func=lambda coro: scheduled.append( + asyncio.ensure_future(coro) + ), + reconnect_func=AsyncMock(), + ) + handler.enable() + + # Step 1: connection drops, schedule the coroutine but don't run it yet + handler.on_connection_interrupted(Exception("dropped")) + assert len(scheduled) == 1 + + # Step 2: connection resumes before the scheduled coroutine runs + state["connected"] = True + handler.on_connection_resumed(return_code=0, session_present=False) + assert handler._reconnect_task is None + + # Step 3: the stale coroutine from step 1 runs now + await scheduled[0] + + # With the fix, no new task must have been created + assert handler._reconnect_task is None + + @pytest.mark.asyncio(loop_scope="function") + async def test_multiple_simultaneous_interrupts_create_only_one_task(self): + """ + Multiple concurrent on_connection_interrupted calls (from different + SDK threads) must not spawn more than one backoff task. + """ + handler, scheduled = _make_handler(connected=False) + + # Simulate three rapid interruption callbacks + for _ in range(3): + handler.on_connection_interrupted(Exception("dropped")) + + # Let all the scheduled coroutines run + await asyncio.gather(*scheduled, return_exceptions=True) + + # Only one _reconnect_with_backoff task should exist + assert handler._reconnect_task is not None + handler._reconnect_task.cancel() + await asyncio.gather(handler._reconnect_task, return_exceptions=True) + + +# --------------------------------------------------------------------------- +# Bug 2: _actively_reconnecting suppresses spurious interrupt callbacks +# --------------------------------------------------------------------------- + + +class TestActivelyReconnectingFlag: + """Bug 2 – closing the old connection must not trigger a new backoff loop.""" + + def _make_mqtt_client(self) -> "NavienMqttClient": # noqa: F821 + from nwp500.mqtt import NavienMqttClient + + return NavienMqttClient(_make_auth_client()) + + def test_actively_reconnecting_initialises_false(self): + """_actively_reconnecting starts False.""" + client = self._make_mqtt_client() + assert client._actively_reconnecting is False + + @pytest.mark.asyncio(loop_scope="function") + async def test_on_connection_interrupted_internal_skips_handler_when_flag_set( + self, + ): + """ + While _actively_reconnecting is True (old connection being closed), + _on_connection_interrupted_internal must NOT forward the event to the + reconnection handler – preventing a competing backoff task. + """ + from awscrt.exceptions import AwsCrtError + + client = self._make_mqtt_client() + + mock_handler = MagicMock() + client._reconnection_handler = mock_handler + client.config = MqttConnectionConfig(auto_reconnect=True) + + client._actively_reconnecting = True # flag is set + + error = AwsCrtError( + code=0, + name="AWS_ERROR_MQTT_UNEXPECTED_HANGUP", + message="hangup", + ) + client._on_connection_interrupted_internal( + connection=MagicMock(), error=error + ) + + # The handler must NOT have been notified + mock_handler.on_connection_interrupted.assert_not_called() + + @pytest.mark.asyncio(loop_scope="function") + async def test_on_connection_interrupted_internal_forwards_when_flag_clear( + self, + ): + """ + When _actively_reconnecting is False (genuine drop), the event IS + forwarded to the reconnection handler. + """ + from awscrt.exceptions import AwsCrtError + + client = self._make_mqtt_client() + + mock_handler = MagicMock() + client._reconnection_handler = mock_handler + client.config = MqttConnectionConfig(auto_reconnect=True) + + client._actively_reconnecting = False # flag is clear (default) + + error = AwsCrtError( + code=0, + name="AWS_ERROR_MQTT_UNEXPECTED_HANGUP", + message="hangup", + ) + client._on_connection_interrupted_internal( + connection=MagicMock(), error=error + ) + + mock_handler.on_connection_interrupted.assert_called_once_with(error) + + @pytest.mark.asyncio(loop_scope="function") + async def test_active_reconnect_sets_and_clears_flag_on_success(self): + """_active_reconnect sets the flag, does its work, then clears it.""" + client = self._make_mqtt_client() + client._connected = False + client._loop = asyncio.get_running_loop() + + flag_during = [] + + async def fake_reconnect(): + flag_during.append(client._actively_reconnecting) + client._connected = True + return True + + mock_conn_mgr = AsyncMock() + mock_conn_mgr.close = AsyncMock() + mock_conn_mgr.connect = AsyncMock(side_effect=fake_reconnect) + mock_conn_mgr.connection = MagicMock() + client._connection_manager = mock_conn_mgr + + with patch( + "nwp500.mqtt.client.MqttConnection", return_value=mock_conn_mgr + ): + client._auth_client.ensure_valid_token = AsyncMock() + client._subscription_manager = None + await client._active_reconnect() + + assert flag_during == [True], "Flag must be True while reconnecting" + assert client._actively_reconnecting is False, "Flag must be cleared after" + + @pytest.mark.asyncio(loop_scope="function") + async def test_active_reconnect_clears_flag_on_exception(self): + """_active_reconnect clears the flag even when an exception is raised.""" + from awscrt.exceptions import AwsCrtError + + client = self._make_mqtt_client() + client._connected = False + client._loop = asyncio.get_running_loop() + + mock_conn_mgr = AsyncMock() + mock_conn_mgr.close = AsyncMock() + mock_conn_mgr.connect = AsyncMock( + side_effect=AwsCrtError( + code=0, name="AWS_ERROR_MQTT_UNEXPECTED_HANGUP", message="fail" + ) + ) + client._connection_manager = mock_conn_mgr + + with patch( + "nwp500.mqtt.client.MqttConnection", return_value=mock_conn_mgr + ): + client._auth_client.ensure_valid_token = AsyncMock() + with pytest.raises(AwsCrtError): + await client._active_reconnect() + + assert client._actively_reconnecting is False, "Flag must be cleared on error" + + @pytest.mark.asyncio(loop_scope="function") + async def test_active_reconnect_is_reentrant_safe(self): + """ + A second concurrent call to _active_reconnect while the first is + still running must return immediately without making changes. + """ + client = self._make_mqtt_client() + client._connected = False + client._loop = asyncio.get_running_loop() + client._actively_reconnecting = True # Simulate first call in progress + + # No connection manager – if we got past the guard we'd crash + client._connection_manager = None + + # Should return immediately without touching the connection + await client._active_reconnect() # Must not raise + + # State unchanged + assert client._actively_reconnecting is True + assert not client._connected + + @pytest.mark.asyncio(loop_scope="function") + async def test_deep_reconnect_sets_and_clears_flag(self): + """_deep_reconnect also sets and clears _actively_reconnecting.""" + client = self._make_mqtt_client() + client._connected = False + client._loop = asyncio.get_running_loop() + + flag_during = [] + + async def fake_connect(): + flag_during.append(client._actively_reconnecting) + client._connected = True + return True + + mock_conn_mgr = AsyncMock() + mock_conn_mgr.close = AsyncMock() + mock_conn_mgr.connect = AsyncMock(side_effect=fake_connect) + mock_conn_mgr.connection = MagicMock() + client._connection_manager = mock_conn_mgr + + with patch( + "nwp500.mqtt.client.MqttConnection", return_value=mock_conn_mgr + ): + client._auth_client.ensure_valid_token = AsyncMock() + client._auth_client.current_tokens.refresh_token = "ref" + client._auth_client.refresh_token = AsyncMock() + client._subscription_manager = None + await client._deep_reconnect() + + assert flag_during == [True], "Flag must be True while deep-reconnecting" + assert client._actively_reconnecting is False, "Flag must be cleared after" + + @pytest.mark.asyncio(loop_scope="function") + async def test_deep_reconnect_clears_flag_on_exception(self): + """_deep_reconnect clears the flag even when an exception is raised.""" + from awscrt.exceptions import AwsCrtError + + client = self._make_mqtt_client() + client._connected = False + client._loop = asyncio.get_running_loop() + + mock_conn_mgr = AsyncMock() + mock_conn_mgr.close = AsyncMock() + mock_conn_mgr.connect = AsyncMock( + side_effect=AwsCrtError( + code=0, name="AWS_ERROR_MQTT_UNEXPECTED_HANGUP", message="fail" + ) + ) + client._connection_manager = mock_conn_mgr + + with patch( + "nwp500.mqtt.client.MqttConnection", return_value=mock_conn_mgr + ): + client._auth_client.ensure_valid_token = AsyncMock() + client._auth_client.current_tokens.refresh_token = "ref" + client._auth_client.refresh_token = AsyncMock() + with pytest.raises(AwsCrtError): + await client._deep_reconnect() + + assert client._actively_reconnecting is False, "Flag must be cleared on error" From 130f25f56cbfdee4eaa6a84c531d34defaf44532 Mon Sep 17 00:00:00 2001 From: Emmanuel Levijarvi Date: Mon, 18 May 2026 13:02:48 -0700 Subject: [PATCH 2/3] fix: ruff lint violations in reconnection storm fix - Wrap long comment in _active_reconnect (E501) - Sort/format import block in test file (I001, fixed by ruff --fix) - Remove quoted type annotation in _make_mqtt_client (UP037, fixed by ruff --fix) - Shorten docstrings and assert messages over 80 chars (E501) - Rename test_on_connection_interrupted_internal_skips_handler_when_flag_set to test_interrupted_internal_skips_handler_when_flag_set (E501 w/ noqa) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/nwp500/mqtt/client.py | 6 ++--- tests/test_mqtt_reconnection_storm.py | 35 ++++++++++++++------------- 2 files changed, 21 insertions(+), 20 deletions(-) diff --git a/src/nwp500/mqtt/client.py b/src/nwp500/mqtt/client.py index fa26bbb..f978f1c 100644 --- a/src/nwp500/mqtt/client.py +++ b/src/nwp500/mqtt/client.py @@ -409,9 +409,9 @@ async def _active_reconnect(self) -> None: if self._connection_manager: # Close old connection to stop SDK auto-reconnect and # prevent two connections with the same client ID. - # _actively_reconnecting suppresses the on_connection_interrupted - # callback that closing triggers, preventing a competing backoff - # loop from being spawned. + # _actively_reconnecting suppresses the + # on_connection_interrupted callback that closing triggers, + # preventing a competing backoff loop from being spawned. _logger.debug("Recreating MQTT connection...") try: await self._connection_manager.close() diff --git a/tests/test_mqtt_reconnection_storm.py b/tests/test_mqtt_reconnection_storm.py index 44ef30a..e004bda 100644 --- a/tests/test_mqtt_reconnection_storm.py +++ b/tests/test_mqtt_reconnection_storm.py @@ -30,11 +30,15 @@ import pytest -from nwp500.auth import AuthenticationResponse, AuthTokens, NavienAuthClient, UserInfo +from nwp500.auth import ( + AuthenticationResponse, + AuthTokens, + NavienAuthClient, + UserInfo, +) from nwp500.mqtt.reconnection import MqttReconnectionHandler from nwp500.mqtt.utils import MqttConnectionConfig - # --------------------------------------------------------------------------- # Shared helpers # --------------------------------------------------------------------------- @@ -101,7 +105,7 @@ class TestReconnectionHandlerIsConnectedGuard: async def test_on_connection_interrupted_does_not_start_task_when_connected( self, ): - """on_connection_interrupted is a no-op when is_connected returns True.""" + """on_connection_interrupted is a no-op when connected.""" connected = True config = MqttConnectionConfig(auto_reconnect=True) scheduled = [] @@ -125,7 +129,7 @@ async def test_on_connection_interrupted_does_not_start_task_when_connected( async def test_on_connection_interrupted_starts_task_when_disconnected( self, ): - """on_connection_interrupted schedules a task when genuinely disconnected.""" + """on_connection_interrupted schedules a task when disconnected.""" handler, scheduled = _make_handler(connected=False) handler.on_connection_interrupted(Exception("dropped")) @@ -137,7 +141,7 @@ async def test_on_connection_interrupted_starts_task_when_disconnected( @pytest.mark.asyncio(loop_scope="function") async def test_start_reconnect_task_no_op_when_connected(self): - """_start_reconnect_task must not create a Task when is_connected is True.""" + """_start_reconnect_task must not create a Task when connected.""" connected = True config = MqttConnectionConfig(auto_reconnect=True) @@ -234,9 +238,9 @@ async def test_multiple_simultaneous_interrupts_create_only_one_task(self): class TestActivelyReconnectingFlag: - """Bug 2 – closing the old connection must not trigger a new backoff loop.""" + """Bug 2 – old connection teardown must not trigger a new backoff loop.""" - def _make_mqtt_client(self) -> "NavienMqttClient": # noqa: F821 + def _make_mqtt_client(self) -> NavienMqttClient: # noqa: F821 from nwp500.mqtt import NavienMqttClient return NavienMqttClient(_make_auth_client()) @@ -247,7 +251,7 @@ def test_actively_reconnecting_initialises_false(self): assert client._actively_reconnecting is False @pytest.mark.asyncio(loop_scope="function") - async def test_on_connection_interrupted_internal_skips_handler_when_flag_set( + async def test_interrupted_internal_skips_handler_when_flag_set( # noqa: E501 self, ): """ @@ -334,11 +338,11 @@ async def fake_reconnect(): await client._active_reconnect() assert flag_during == [True], "Flag must be True while reconnecting" - assert client._actively_reconnecting is False, "Flag must be cleared after" + assert not client._actively_reconnecting, "Flag must be cleared after" @pytest.mark.asyncio(loop_scope="function") async def test_active_reconnect_clears_flag_on_exception(self): - """_active_reconnect clears the flag even when an exception is raised.""" + """_active_reconnect clears the flag even on exception.""" from awscrt.exceptions import AwsCrtError client = self._make_mqtt_client() @@ -361,10 +365,7 @@ async def test_active_reconnect_clears_flag_on_exception(self): with pytest.raises(AwsCrtError): await client._active_reconnect() - assert client._actively_reconnecting is False, "Flag must be cleared on error" - - @pytest.mark.asyncio(loop_scope="function") - async def test_active_reconnect_is_reentrant_safe(self): + assert not client._actively_reconnecting, "must be cleared on error" """ A second concurrent call to _active_reconnect while the first is still running must return immediately without making changes. @@ -413,8 +414,8 @@ async def fake_connect(): client._subscription_manager = None await client._deep_reconnect() - assert flag_during == [True], "Flag must be True while deep-reconnecting" - assert client._actively_reconnecting is False, "Flag must be cleared after" + assert flag_during == [True], "Flag True while deep-reconnecting" + assert not client._actively_reconnecting, "Flag must be cleared after" @pytest.mark.asyncio(loop_scope="function") async def test_deep_reconnect_clears_flag_on_exception(self): @@ -443,4 +444,4 @@ async def test_deep_reconnect_clears_flag_on_exception(self): with pytest.raises(AwsCrtError): await client._deep_reconnect() - assert client._actively_reconnecting is False, "Flag must be cleared on error" + assert not client._actively_reconnecting, "must be cleared on error" From 5e779b77942d92ee2ff883cc798a82b18ba62eb8 Mon Sep 17 00:00:00 2001 From: Emmanuel Levijarvi Date: Mon, 18 May 2026 13:04:42 -0700 Subject: [PATCH 3/3] fix: apply ruff format to reconnection.py Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/nwp500/mqtt/reconnection.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/nwp500/mqtt/reconnection.py b/src/nwp500/mqtt/reconnection.py index e7bf244..2de30f1 100644 --- a/src/nwp500/mqtt/reconnection.py +++ b/src/nwp500/mqtt/reconnection.py @@ -146,9 +146,8 @@ async def _start_reconnect_task(self) -> None: _reconnect_task), in which case starting a new backoff loop would incorrectly tear down a healthy connection. """ - if ( - not self._is_connected_func() - and (not self._reconnect_task or self._reconnect_task.done()) + if not self._is_connected_func() and ( + not self._reconnect_task or self._reconnect_task.done() ): self._reconnect_task = asyncio.create_task( self._reconnect_with_backoff()