fix: MQTT connection flapping + comprehensive bug fixes#86
Merged
When _active_reconnect() created a new MqttConnection, the old connection was never disconnected. The old SDK connection's built-in auto-reconnect would eventually succeed, creating two active connections with the same client ID. AWS IoT only allows one connection per client ID, so the broker kicks one off, triggering on_connection_interrupted and starting another reconnection, which causes an infinite connect/disconnect loop.
Changes:
- Add MqttConnection.close() method that unconditionally tears down the underlying SDK connection regardless of _connected state (unlike disconnect(), which skips when _connected is False after interruption)
- _active_reconnect(): close old connection before creating replacement
- _deep_reconnect(): use close() unconditionally instead of checking is_connected before calling disconnect()
- Add tests for close() method behavior
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
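The difference between disconnect() and close() described in this commit can be sketched roughly as follows. This is an illustration under stated assumptions, not the project's implementation: MqttConnectionSketch and FakeSdkConnection are hypothetical stand-ins, and the only behavior taken from this PR is that the SDK's disconnect() returns a concurrent.futures.Future.

```python
import asyncio
from concurrent.futures import Future as SdkFuture


class FakeSdkConnection:
    """Hypothetical stand-in for the SDK connection; not the real SDK API."""

    def __init__(self) -> None:
        self.disconnect_calls = 0

    def disconnect(self) -> SdkFuture:
        # Assumption: the SDK hands back a concurrent.futures.Future.
        self.disconnect_calls += 1
        fut: SdkFuture = SdkFuture()
        fut.set_result(None)
        return fut


class MqttConnectionSketch:
    """Simplified illustration of the two teardown paths."""

    def __init__(self, sdk_connection: FakeSdkConnection) -> None:
        self._connection = sdk_connection
        self._connected = False  # state right after an interruption

    async def disconnect(self) -> None:
        # Skips teardown when the wrapper believes it is already disconnected,
        # which leaves the SDK object free to auto-reconnect later.
        if not self._connected or self._connection is None:
            return
        await asyncio.wrap_future(self._connection.disconnect())
        self._connected = False

    async def close(self) -> None:
        # Tears down the SDK connection regardless of _connected.
        connection, self._connection = self._connection, None
        self._connected = False
        if connection is None:
            return  # already closed
        try:
            await asyncio.wrap_future(connection.disconnect())
        except Exception:
            pass  # best-effort teardown; the caller builds a new connection


async def demo() -> tuple:
    sdk = FakeSdkConnection()
    conn = MqttConnectionSketch(sdk)
    await conn.disconnect()
    after_disconnect = sdk.disconnect_calls
    await conn.close()
    after_close = sdk.disconnect_calls
    await conn.close()  # second close is a no-op
    return after_disconnect, after_close, sdk.disconnect_calls
```

In this sketch, disconnect() never reaches the SDK object after an interruption, while close() does so exactly once.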
Thread safety:
- ensure_device_info_cached: use loop.call_soon_threadsafe for Future
resolution from AWS SDK callback thread (prevents race/crash)
MQTT reconnection:
- Clamp deep_reconnect_threshold to minimum of 1 in config validation
to prevent ZeroDivisionError in reconnection backoff logic
Diagnostics:
- Increment total_reconnect_attempts counter on each connection drop
(was always 0 despite reconnections occurring)
- Replace float('inf') default for shortest_session_seconds with None
in to_dict() to prevent JSON serialization errors
Events:
- Use asyncio.get_running_loop().create_future() instead of bare
asyncio.Future() in wait_for() for proper loop binding
Encoding:
- Make build_reservation_entry temperature validation unit-aware:
defaults are now 35-65°C in metric mode, 95-150°F in US mode
(was hardcoded to Fahrenheit, breaking Celsius users)
- Log warning on malformed reservation hex data with trailing bytes
instead of silently dropping partial entries
Periodic requests:
- Log error and break on unknown PeriodicRequestType instead of
silently doing nothing
Cache:
- Purge expired entries from device_info_cache during get_all_cached()
instead of only filtering them from results (prevents memory leak)
All 486 tests pass (23 new tests added).
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
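As an illustration of the Diagnostics item above, a minimal sketch of mapping the float('inf') sentinel to None at serialization time. SessionMetricsSketch is a hypothetical container; only the field names total_reconnect_attempts and shortest_session_seconds and the to_dict() method name come from this commit message.

```python
import json
import math
from dataclasses import dataclass


@dataclass
class SessionMetricsSketch:
    # Hypothetical metrics holder; the real class layout is not shown in this PR page.
    total_reconnect_attempts: int = 0
    shortest_session_seconds: float = math.inf  # sentinel: no session recorded yet

    def to_dict(self) -> dict:
        shortest = self.shortest_session_seconds
        return {
            "total_reconnect_attempts": self.total_reconnect_attempts,
            # Infinity has no JSON representation, so expose the sentinel as None.
            "shortest_session_seconds": None if math.isinf(shortest) else shortest,
        }


# With allow_nan=False the json module rejects a raw infinity outright.
raw_inf_is_rejected = False
try:
    json.dumps({"shortest_session_seconds": math.inf}, allow_nan=False)
except ValueError:
    raw_inf_is_rejected = True

# The converted dict serializes cleanly even in strict mode.
payload = json.dumps(SessionMetricsSketch().to_dict(), allow_nan=False)
```

Note that Python's json.dumps emits the non-standard token Infinity by default (allow_nan=True), which strict JSON parsers reject, so the failure may only surface on the consuming side.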
- Remove unused imports from test_bug_fixes.py (ruff F401)
- Convert test_mqtt_reconnection.py from asyncio.get_event_loop() to @pytest.mark.asyncio (asyncio.get_event_loop() no longer implicitly creates an event loop in 3.14)
- Fix import sorting in test_mqtt_reconnection.py (ruff I001)
- Shorten docstring line to fit 80-char limit (ruff E501)
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Pull request overview
This PR addresses MQTT connection flapping in the MQTT client by ensuring old AWS IoT SDK connections are torn down before creating replacement connections, and bundles several additional audit-driven bug fixes across diagnostics, events, encoding, periodic requests, and caching.
Changes:
- Add an unconditional MqttConnection.close() teardown and use it during active/deep reconnect flows to prevent competing connections with the same client ID.
- Fix several correctness issues found during audit (thread-safe future resolution from SDK thread, JSON-safe diagnostics serialization, reconnection config validation, event-loop-bound futures).
- Add new tests covering reconnection cleanup and the additional bug fixes.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| tests/test_mqtt_reconnection.py | New tests for MqttConnection.close() and reconnection cleanup behavior. |
| tests/test_bug_fixes.py | New tests covering config clamping, diagnostics metrics serialization, event wait_for(), reservation encoding/decoding, and cache purge behavior. |
| src/nwp500/mqtt/utils.py | Clamp deep_reconnect_threshold to a minimum of 1 during config initialization. |
| src/nwp500/mqtt/periodic.py | Log and stop the periodic loop on unknown PeriodicRequestType. |
| src/nwp500/mqtt/diagnostics.py | Make metrics JSON-friendly (inf → None) and increment reconnect attempts on drops. |
| src/nwp500/mqtt/connection.py | Introduce close() that disconnects regardless of _connected state and clears internal state to prevent SDK auto-reconnect interference. |
| src/nwp500/mqtt/client.py | Close old connection manager before rebuilding during reconnection; make device-info caching future resolution thread-safe. |
| src/nwp500/events.py | Bind wait_for() futures to the running event loop via get_running_loop().create_future(). |
| src/nwp500/encoding.py | Add warning on trailing bytes in reservation hex decode; make reservation temp validation unit-system aware. |
| src/nwp500/device_info_cache.py | Purge expired entries during get_all_cached() to prevent cache growth. |
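The purge-on-read behavior in the last table row can be sketched as below. This is only an illustration: DeviceInfoCacheSketch, its TTL handling, and the explicit now parameter are assumptions made for the example; only the get_all_cached() name comes from this PR.

```python
from __future__ import annotations

import time
from typing import Optional


class DeviceInfoCacheSketch:
    """Hypothetical TTL cache; not the project's device_info_cache module."""

    def __init__(self, ttl_seconds: float) -> None:
        self._ttl = ttl_seconds
        self._entries: dict[str, tuple[float, dict]] = {}

    def set(self, key: str, info: dict, now: Optional[float] = None) -> None:
        stamp = time.monotonic() if now is None else now
        self._entries[key] = (stamp, info)

    def get_all_cached(self, now: Optional[float] = None) -> dict[str, dict]:
        current = time.monotonic() if now is None else now
        # Collect expired keys first; deleting while iterating would raise.
        expired = [
            key
            for key, (stamp, _) in self._entries.items()
            if current - stamp >= self._ttl
        ]
        for key in expired:
            # Actually remove the entry instead of only hiding it from results.
            del self._entries[key]
        return {key: info for key, (_, info) in self._entries.items()}


cache = DeviceInfoCacheSketch(ttl_seconds=10.0)
cache.set("device-a", {"model": "a"}, now=0.0)
cache.set("device-b", {"model": "b"}, now=8.0)
visible = cache.get_all_cached(now=12.0)  # device-a is 12s old and gets purged
```

Filtering alone would return the same visible result, but the expired entry would stay in _entries indefinitely.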
- client.py on_feature: move future.done() check inside call_soon_threadsafe callback to eliminate race between done-check and set_result across the SDK thread / event loop thread boundary
- tests/test_mqtt_reconnection.py: replace asyncio.Future with concurrent.futures.Future to match what the AWS IoT SDK actually returns, exercising the wrap_future() conversion path in close()
- encoding.py build_reservation_entry: read get_unit_system() once into a local variable instead of calling it twice
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
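The first item above, checking future.done() inside the call_soon_threadsafe callback, can be sketched as follows. The function names are hypothetical and a plain threading.Thread stands in for the SDK callback thread.

```python
import asyncio
import threading


async def wait_for_feature() -> str:
    loop = asyncio.get_running_loop()
    future = loop.create_future()  # bound to the running loop

    def resolve(value: str) -> None:
        # Runs on the event loop thread, so the done-check and set_result
        # cannot be interleaved with another resolver.
        if not future.done():
            future.set_result(value)

    def on_feature(value: str) -> None:
        # Runs on the worker thread (stand-in for the SDK callback thread).
        # It never touches the future directly; it only schedules resolve().
        loop.call_soon_threadsafe(resolve, value)

    def sdk_thread() -> None:
        # Two deliveries: the second must be ignored rather than raise
        # InvalidStateError.
        on_feature("first")
        on_feature("second")

    worker = threading.Thread(target=sdk_thread)
    worker.start()
    try:
        return await asyncio.wait_for(future, timeout=1.0)
    finally:
        worker.join()
```

Checking future.done() on the worker thread and then scheduling set_result leaves a window in which the future can complete between the two steps; moving the check into the scheduled callback closes that window.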
Summary
Comprehensive bug fix PR addressing the MQTT connection flapping issue plus 9 additional bugs found during a full code audit.
MQTT Connection Flapping (Original Fix)
Problem
MQTT connections flap endlessly: connect → disconnect after 1-7s → reconnect → repeat. All entities become unavailable in Home Assistant.
Root Cause
When _active_reconnect() creates a new MqttConnection, the old connection is never closed. The old AWS IoT SDK connection's built-in auto-reconnect eventually succeeds, creating two active connections with the same client ID. AWS IoT kicks one off → on_connection_interrupted fires → another reconnection → infinite loop.
Fix
- Add MqttConnection.close() method that unconditionally tears down the SDK connection regardless of _connected state
- _active_reconnect(): close old connection before creating replacement
- _deep_reconnect(): use close() unconditionally instead of checking is_connected
Additional Bug Fixes (Code Audit)
Thread Safety
- ensure_device_info_cached: Use loop.call_soon_threadsafe() for Future resolution from AWS SDK callback thread (was calling future.set_result() directly from a non-asyncio thread, a race condition and potential crash)
MQTT Reconnection
- Clamp deep_reconnect_threshold to minimum of 1 to prevent ZeroDivisionError in reconnection backoff modulo operation
Diagnostics
- Increment total_reconnect_attempts on each connection drop (was always 0)
- Replace float('inf') default for shortest_session_seconds with None in to_dict() (was breaking JSON serialization)
Events
- Use asyncio.get_running_loop().create_future() instead of bare asyncio.Future() in wait_for()
Encoding
- build_reservation_entry() now uses correct defaults per unit system (35-65°C metric, 95-150°F US). Was hardcoded to Fahrenheit values, causing valid Celsius inputs to fail validation.
- decode_reservation_hex() logs a warning on trailing bytes instead of silently discarding them
Periodic Requests
- Log error and break on unknown PeriodicRequestType instead of silently doing nothing
Cache
- Purge expired entries from device_info_cache during get_all_cached() instead of only filtering results
Testing
- test_mqtt_reconnection.py, test_bug_fixes.py
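For reference, the deep_reconnect_threshold clamp listed under MQTT Reconnection can be sketched like this. ReconnectConfigSketch, needs_deep_reconnect(), and the default of 10 are assumptions for illustration; the PR only states that the threshold is clamped to a minimum of 1 because it is used in a modulo operation.

```python
from dataclasses import dataclass


@dataclass
class ReconnectConfigSketch:
    # Hypothetical config object; the default value is an assumption.
    deep_reconnect_threshold: int = 10

    def __post_init__(self) -> None:
        # A threshold of 0 (or below) would make `attempt % threshold` raise
        # ZeroDivisionError, so clamp it during validation.
        self.deep_reconnect_threshold = max(1, self.deep_reconnect_threshold)


def needs_deep_reconnect(attempt: int, config: ReconnectConfigSketch) -> bool:
    # Hypothetical helper: escalate on every Nth failed attempt.
    return attempt > 0 and attempt % config.deep_reconnect_threshold == 0


clamped = ReconnectConfigSketch(deep_reconnect_threshold=0)
every_other = ReconnectConfigSketch(deep_reconnect_threshold=2)
```

With the clamp in place, a misconfigured threshold of 0 degrades to a deep reconnect on every attempt instead of crashing the reconnection loop.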