From a0995ed828f55d3a3c823c694f00f4b83fd0ac06 Mon Sep 17 00:00:00 2001 From: bj00rn Date: Mon, 14 Jul 2025 20:27:46 +0200 Subject: [PATCH 01/29] replace gmqtt with aiomqtt --- poetry.lock | 47 +++--- pyproject.toml | 2 +- src/configuration/__init__.py | 7 +- src/configuration/parser.py | 2 +- src/log_config.py | 2 +- src/main.py | 1 + src/publisher/mqtt_publisher.py | 243 +++++++++++++++++++------------- tests/test_mqtt_publisher.py | 2 +- 8 files changed, 184 insertions(+), 122 deletions(-) diff --git a/poetry.lock b/poetry.lock index 04bd68a0..669dfa2f 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,5 +1,20 @@ # This file is automatically @generated by Poetry 2.4.1 and should not be changed by hand. +[[package]] +name = "aiomqtt" +version = "2.5.1" +description = "The idiomatic asyncio MQTT client" +optional = false +python-versions = "<4.0,>=3.8" +groups = ["main"] +files = [ + {file = "aiomqtt-2.5.1-py3-none-any.whl", hash = "sha256:fd58c3593160e4d475d90ce911cdfc4239cd64de96b0ba22edf6c86bd7afa278"}, + {file = "aiomqtt-2.5.1.tar.gz", hash = "sha256:25a0a47d157e8f158d2da1110ea4786c0615518751e94f7b04976c977a8ff20d"}, +] + +[package.dependencies] +paho-mqtt = ">=2.1.0,<3.0.0" + [[package]] name = "anyio" version = "4.12.1" @@ -270,21 +285,6 @@ files = [ {file = "filelock-3.29.0.tar.gz", hash = "sha256:69974355e960702e789734cb4871f884ea6fe50bd8404051a3530bc07809cf90"}, ] -[[package]] -name = "gmqtt" -version = "0.7.0" -description = "Client for MQTT protocol" -optional = false -python-versions = ">=3.5" -groups = ["main"] -files = [ - {file = "gmqtt-0.7.0-py3-none-any.whl", hash = "sha256:3e5571a20e9c115d83d600caa228b06f716087653e241035e29cec73277b52cc"}, - {file = "gmqtt-0.7.0.tar.gz", hash = "sha256:bedfec7bac26b6b4ce1f0c4c32cff3d663526a54c882d323d41560fc3b9b44a2"}, -] - -[package.extras] -test = ["atomicwrites (>=1.3.0)", "attrs (>=19.1.0)", "codecov (>=2.0.15)", "coverage (>=4.5.3)", "more-itertools (>=7.0.0)", "pluggy (>=0.11.0)", "py (>=1.8.0)", "pytest (>=5.4.0)", "pytest-asyncio (>=0.12.0)", "pytest-cov (>=2.7.1)", "six (>=1.12.0)", "uvloop (>=0.14.0)"] - [[package]] name = "h11" version = "0.16.0" @@ -641,6 +641,21 @@ files = [ {file = "packaging-26.0.tar.gz", hash = "sha256:00243ae351a257117b6a241061796684b084ed1c516a08c48a3f7e147a9d80b4"}, ] +[[package]] +name = "paho-mqtt" +version = "2.1.0" +description = "MQTT version 5.0/3.1.1 client class" +optional = false +python-versions = ">=3.7" +groups = ["main"] +files = [ + {file = "paho_mqtt-2.1.0-py3-none-any.whl", hash = "sha256:6db9ba9b34ed5bc6b6e3812718c7e06e2fd7444540df2455d2c51bd58808feee"}, + {file = "paho_mqtt-2.1.0.tar.gz", hash = "sha256:12d6e7511d4137555a3f6ea167ae846af2c7357b10bc6fa4f7c3968fc1723834"}, +] + +[package.extras] +proxy = ["pysocks"] + [[package]] name = "pathspec" version = "1.0.4" @@ -1134,4 +1149,4 @@ python-discovery = ">=1.2.2" [metadata] lock-version = "2.1" python-versions = ">=3.12,<4.0" -content-hash = "ba4f444f330a6cb58a06926a1c2701c544ffeaeda02107641361b7abaa61e4af" +content-hash = "2f857bf23052a67c88f8f78ebf3698a9327bfc3333cc22984bf8ed1bf127c4bf" diff --git a/pyproject.toml b/pyproject.toml index afda254d..8f35e6a1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,10 +17,10 @@ requires-python = '>=3.12,<4.0' dependencies = [ "saic-ismart-client-ng (>=0.9.3,<0.10.0)", 'httpx (>=0.28.1,<0.29.0)', - 'gmqtt (>=0.7.0,<0.8.0)', 'inflection (>=0.5.1,<0.6.0)', 'apscheduler (>=3.11.0,<4.0.0)', 'python-dotenv (>=1.1.1,<2.0.0)', + "aiomqtt (>=2.4.0,<3.0.0)", ] [project.urls] diff --git a/src/configuration/__init__.py b/src/configuration/__init__.py index 7173f372..63ca6937 100644 --- a/src/configuration/__init__.py +++ b/src/configuration/__init__.py @@ -1,7 +1,7 @@ from __future__ import annotations from enum import Enum -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Literal if TYPE_CHECKING: from zoneinfo import ZoneInfo @@ -9,8 +9,11 @@ from integrations.openwb.charging_station import ChargingStation +Transport = Literal["tcp", "websockets"] + + class TransportProtocol(Enum): - def __init__(self, transport_mechanism: str, with_tls: bool) -> None: + def __init__(self, transport_mechanism: Transport, with_tls: bool) -> None: self.transport_mechanism = transport_mechanism self.with_tls = with_tls diff --git a/src/configuration/parser.py b/src/configuration/parser.py index 99456ab5..284d0225 100644 --- a/src/configuration/parser.py +++ b/src/configuration/parser.py @@ -105,7 +105,7 @@ def __parse_mqtt_transport(args: Namespace, config: Configuration) -> None: args.tls_server_cert_check_hostname ) else: - msg = f"Invalid MQTT URI scheme: {parse_result.scheme}, use tcp or ws" + msg = f"Invalid MQTT URI scheme: {parse_result.scheme}, use tls, tcp or ws" raise SystemExit(msg) if parse_result.port: diff --git a/src/log_config.py b/src/log_config.py index 9c96caed..2a49e3a4 100644 --- a/src/log_config.py +++ b/src/log_config.py @@ -7,7 +7,7 @@ MODULES_DEFAULT_LOG_LEVEL = { "asyncio": "WARNING", - "gmqtt": "WARNING", + "aiomqtt": "WARNING", "httpcore": "WARNING", "httpx": "WARNING", "saic_ismart_client_ng": "WARNING", diff --git a/src/main.py b/src/main.py index dc47e092..e2d90d96 100644 --- a/src/main.py +++ b/src/main.py @@ -27,4 +27,5 @@ configuration = process_command_line() mqtt_gateway = MqttGateway(configuration) + asyncio.run(mqtt_gateway.run(), debug=debug_log_enabled()) diff --git a/src/publisher/mqtt_publisher.py b/src/publisher/mqtt_publisher.py index ed535d48..18e50cd8 100644 --- a/src/publisher/mqtt_publisher.py +++ b/src/publisher/mqtt_publisher.py @@ -1,11 +1,12 @@ from __future__ import annotations +import asyncio import logging import math import ssl -from typing import TYPE_CHECKING, Any, Final, cast, override +from typing import TYPE_CHECKING, Any, override -import gmqtt +import aiomqtt import mqtt_topics from publisher.core import Publisher @@ -19,7 +20,10 @@ class MqttPublisher(Publisher): - def __init__(self, configuration: Configuration) -> None: + def __init__( + self, + configuration: Configuration, + ) -> None: super().__init__(configuration) self.publisher_id = configuration.mqtt_client_id self.host = self.configuration.mqtt_host @@ -30,121 +34,140 @@ def __init__(self, configuration: Configuration) -> None: self.vin_by_charger_connected_topic: dict[str, str] = {} self.vin_by_imported_energy_topic: dict[str, str] = {} self.first_connection = True + self.client: None | aiomqtt.Client = None + self.__running: asyncio.Task[None] | None = None + self.__connected = asyncio.Event() - mqtt_client = gmqtt.Client( - client_id=str(self.publisher_id), - transport=self.transport_protocol.transport_mechanism, - will_message=gmqtt.Message( - topic=self.get_topic(mqtt_topics.INTERNAL_LWT, False), - payload="offline", - retain=True, - ), - ) - mqtt_client.on_connect = self.__on_connect - mqtt_client.on_message = self.__on_message - self.client: Final[gmqtt.Client] = mqtt_client - - @override - async def connect(self) -> None: - if self.configuration.mqtt_user is not None: - if self.configuration.mqtt_password is not None: - self.client.set_auth_credentials( - username=self.configuration.mqtt_user, - password=self.configuration.mqtt_password, - ) - else: - self.client.set_auth_credentials(username=self.configuration.mqtt_user) - + async def __run_loop(self) -> None: + if not self.host: + LOG.info("MQTT host is not configured") + return + ssl_context: ssl.SSLContext | None = None if self.transport_protocol.with_tls: ssl_context = ssl.create_default_context() - cert_uri = self.configuration.tls_server_cert_path - if cert_uri: - LOG.debug(f"Using custom CA file {cert_uri}") - ssl_context.load_verify_locations(cafile=cert_uri) + if self.configuration.tls_server_cert_path: + LOG.debug( + f"Using custom CA file {self.configuration.tls_server_cert_path}" + ) + ssl_context.load_verify_locations( + cafile=self.configuration.tls_server_cert_path + ) if not self.configuration.tls_server_cert_check_hostname: LOG.warning( f"Skipping hostname check for TLS connection to {self.host}" ) - ssl_context.check_hostname = False - else: - ssl_context = None - await self.client.connect( - host=self.host, + + client = aiomqtt.Client( + hostname=self.host, port=self.port, - version=gmqtt.constants.MQTTv311, - ssl=ssl_context, + identifier=str(self.publisher_id) + "a", + transport=self.transport_protocol.transport_mechanism, + username=self.configuration.mqtt_user or None, + password=self.configuration.mqtt_password or None, + clean_session=True, + tls_context=ssl_context, + tls_insecure=bool( + ssl_context and not self.configuration.tls_server_cert_check_hostname + ), + will=aiomqtt.Will( + topic=self.get_topic(mqtt_topics.INTERNAL_LWT, False), + payload="offline", + retain=True, + ), ) - - def __on_connect( - self, _client: Any, _flags: Any, rc: int, _properties: Any - ) -> None: - if rc == gmqtt.constants.CONNACK_ACCEPTED: - LOG.info("Connected to MQTT broker") - if not self.first_connection: - self.enable_commands() - if self.command_listener is not None: - self.command_listener.on_mqtt_reconnected() - self.first_connection = False - self.keepalive() - else: - if rc == gmqtt.constants.CONNACK_REFUSED_BAD_USERNAME_PASSWORD: - LOG.error( - f"MQTT connection error: bad username or password. Return code {rc}" - ) - elif rc == gmqtt.constants.CONNACK_REFUSED_PROTOCOL_VERSION: + client.pending_calls_threshold = 150 + self.client = client + reconnect_interval = 5 + while True: + try: + async with client: + self.__connected.set() + await self.__on_connect() + async for message in client.messages: + await self._on_message( + client, + str(message.topic), + message.payload, + message.qos, + message.properties, + ) + except aiomqtt.MqttError as e: LOG.error( - f"MQTT connection error: refused protocol version. Return code {rc}" + f"Connection to MQTT broker lost; Reconnecting in {reconnect_interval} seconds ...: {e}" ) - else: - LOG.error(f"MQTT connection error.Return code {rc}") - msg = f"Unable to connect to MQTT broker. Return code: {rc}" - raise SystemExit(msg) + await asyncio.sleep(reconnect_interval) + except asyncio.exceptions.CancelledError: + LOG.debug("MQTT publisher loop cancelled") + raise + finally: + self.__connected.clear() + self.client = None + LOG.info("MQTT client disconnected") + + @override + async def connect(self) -> None: + if self.__running and not self.__running.done(): + LOG.warning("MQTT client is already running") + return + self.__running = asyncio.create_task(self.__run_loop()) + await self.__connected.wait() + + async def __on_connect(self) -> None: + LOG.info("Connected to MQTT broker") + if not self.first_connection: + await self.__enable_commands() + self.first_connection = False @override def enable_commands(self) -> None: - LOG.info("Subscribing to MQTT command topics") - mqtt_account_prefix = self.get_mqtt_account_prefix() - self.client.subscribe( - f"{mqtt_account_prefix}/{mqtt_topics.VEHICLES}/+/+/+/{mqtt_topics.SET_SUFFIX}" - ) - self.client.subscribe( - f"{mqtt_account_prefix}/{mqtt_topics.VEHICLES}/+/+/+/+/{mqtt_topics.SET_SUFFIX}" - ) - self.client.subscribe( - f"{mqtt_account_prefix}/{mqtt_topics.VEHICLES}/+/{mqtt_topics.REFRESH_MODE}/{mqtt_topics.SET_SUFFIX}" - ) - self.client.subscribe( - f"{mqtt_account_prefix}/{mqtt_topics.VEHICLES}/+/{mqtt_topics.REFRESH_PERIOD}/+/{mqtt_topics.SET_SUFFIX}" - ) - for charging_station in self.configuration.charging_stations_by_vin.values(): - LOG.debug( - f"Subscribing to MQTT topic {charging_station.charge_state_topic}" + loop = asyncio.get_running_loop() + asyncio.run_coroutine_threadsafe(self.__enable_commands(), loop) + + async def __enable_commands(self) -> None: + if not self.__connected.is_set() or not self.client: + LOG.error("MQTT client is not connected") + return + try: + LOG.info("Subscribing to MQTT command topics") + mqtt_account_prefix = self.get_mqtt_account_prefix() + await self.client.subscribe( + f"{mqtt_account_prefix}/{mqtt_topics.VEHICLES}/+/+/+/{mqtt_topics.SET_SUFFIX}" + ) + await self.client.subscribe( + f"{mqtt_account_prefix}/{mqtt_topics.VEHICLES}/+/+/+/+/{mqtt_topics.SET_SUFFIX}" ) - self.vin_by_charge_state_topic[charging_station.charge_state_topic] = ( - charging_station.vin + await self.client.subscribe( + f"{mqtt_account_prefix}/{mqtt_topics.VEHICLES}/+/{mqtt_topics.REFRESH_MODE}/{mqtt_topics.SET_SUFFIX}" ) - self.client.subscribe(charging_station.charge_state_topic) - if charging_station.connected_topic: + await self.client.subscribe( + f"{mqtt_account_prefix}/{mqtt_topics.VEHICLES}/+/{mqtt_topics.REFRESH_PERIOD}/+/{mqtt_topics.SET_SUFFIX}" + ) + for ( + charging_station + ) in self.configuration.charging_stations_by_vin.values(): LOG.debug( - f"Subscribing to MQTT topic {charging_station.connected_topic}" + f"Subscribing to MQTT topic {charging_station.charge_state_topic}" ) - self.vin_by_charger_connected_topic[ - charging_station.connected_topic - ] = charging_station.vin - self.client.subscribe(charging_station.connected_topic) - if charging_station.imported_energy_topic: - LOG.debug( - f"Subscribing to MQTT topic {charging_station.imported_energy_topic}" + self.vin_by_charge_state_topic[charging_station.charge_state_topic] = ( + charging_station.vin ) - self.vin_by_imported_energy_topic[ - charging_station.imported_energy_topic - ] = charging_station.vin - self.client.subscribe(charging_station.imported_energy_topic) - if self.configuration.ha_discovery_enabled: - # enable dynamic discovery pushing in case ha reconnects - self.client.subscribe(self.configuration.ha_lwt_topic) + await self.client.subscribe(charging_station.charge_state_topic) + if charging_station.connected_topic: + LOG.debug( + f"Subscribing to MQTT topic {charging_station.connected_topic}" + ) + self.vin_by_charger_connected_topic[ + charging_station.connected_topic + ] = charging_station.vin + await self.client.subscribe(charging_station.connected_topic) + if self.configuration.ha_discovery_enabled: + # enable dynamic discovery pushing in case ha reconnects + await self.client.subscribe(self.configuration.ha_lwt_topic) + except aiomqtt.MqttError as e: + LOG.error("Failed to subscribe to MQTT command topics: {e}") + raise e - async def __on_message( + async def _on_message( self, _client: Any, topic: str, payload: Any, _qos: Any, _properties: Any ) -> None: try: @@ -230,11 +253,31 @@ async def __handle_imported_energy(self, topic: str, payload: str) -> None: def __publish( self, topic: str, payload: WirePayload | None, *, retain: bool = True ) -> None: - self.client.publish(topic, payload, retain=retain) + if not (self.client and self.is_connected()): + LOG.error("MQTT client is not connected") + return + loop = asyncio.get_running_loop() + asyncio.run_coroutine_threadsafe( + self.__async_publish(topic, payload, retain=retain), loop + ) + LOG.debug(f"Publishing to MQTT topic {topic} with payload {payload}") + + async def __async_publish( + self, topic: str, payload: Any, retain: bool + ) -> None: + if not (self.client and self.is_connected()): + LOG.error("MQTT client is not connected") + return + try: + await self.client.publish(topic, payload, retain) + except aiomqtt.MqttError as e: + LOG.error( + f"Failed to publish to MQTT topic {topic} with payload {payload}: {e}" + ) @override def is_connected(self) -> bool: - return cast("bool", self.client.is_connected) + return self.__connected.is_set() @override def publish_json( diff --git a/tests/test_mqtt_publisher.py b/tests/test_mqtt_publisher.py index 1fdeba34..429ca4e5 100644 --- a/tests/test_mqtt_publisher.py +++ b/tests/test_mqtt_publisher.py @@ -71,7 +71,7 @@ async def test_update_rear_window_heat_state(self) -> None: assert self.received_payload == REAR_WINDOW_HEAT_STATE async def send_message(self, topic: str, payload: Any) -> None: - await self.mqtt_client.client.on_message("client", topic, payload, 0, {}) + await self.mqtt_client._on_message("client", topic, payload, 0, {}) async def test_get_vin_from_sanitized_topic(self) -> None: """Topics arrive with the sanitized prefix, not the raw username.""" From 7ce9d6ccb5fd689a0e429faa461a56a8cb08c8dd Mon Sep 17 00:00:00 2001 From: bj00rn Date: Mon, 14 Jul 2025 22:41:54 +0200 Subject: [PATCH 02/29] Set default port for TLS to 8883, fallback to 1883 if no port and TCP transport --- src/configuration/parser.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/configuration/parser.py b/src/configuration/parser.py index 284d0225..304c295a 100644 --- a/src/configuration/parser.py +++ b/src/configuration/parser.py @@ -110,10 +110,13 @@ def __parse_mqtt_transport(args: Namespace, config: Configuration) -> None: if parse_result.port: config.mqtt_port = parse_result.port - elif config.mqtt_transport_protocol == TransportProtocol.TCP: - config.mqtt_port = 1883 - else: + elif config.mqtt_transport_protocol == TransportProtocol.TLS: + config.mqtt_port = 8883 + elif config.mqtt_transport_protocol == TransportProtocol.WS: config.mqtt_port = 9001 + else: + # fallback to default mqtt port + config.mqtt_port = 1883 config.mqtt_host = str(parse_result.hostname) From 131bb22c5a81720ed5c54cd353ad54db0a957a12 Mon Sep 17 00:00:00 2001 From: bj00rn Date: Tue, 7 Oct 2025 10:35:05 +0200 Subject: [PATCH 03/29] fix keepalive --- src/publisher/mqtt_publisher.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/publisher/mqtt_publisher.py b/src/publisher/mqtt_publisher.py index 18e50cd8..f72b6092 100644 --- a/src/publisher/mqtt_publisher.py +++ b/src/publisher/mqtt_publisher.py @@ -117,6 +117,7 @@ async def __on_connect(self) -> None: if not self.first_connection: await self.__enable_commands() self.first_connection = False + self.keepalive() @override def enable_commands(self) -> None: From 619c4d684907e0f0a41804ef45935bdbef4d301d Mon Sep 17 00:00:00 2001 From: bj00rn Date: Tue, 18 Nov 2025 07:36:54 +0000 Subject: [PATCH 04/29] fix client not reconnecting --- src/publisher/mqtt_publisher.py | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/src/publisher/mqtt_publisher.py b/src/publisher/mqtt_publisher.py index f72b6092..ffd8fa3e 100644 --- a/src/publisher/mqtt_publisher.py +++ b/src/publisher/mqtt_publisher.py @@ -76,24 +76,33 @@ async def __run_loop(self) -> None: ), ) client.pending_calls_threshold = 150 - self.client = client reconnect_interval = 5 while True: try: - async with client: + LOG.debug( + "Connecting to %s:%s as %s", + self.host, + self.port, + self.publisher_id, + ) + async with client as client_context: + self.client = client_context self.__connected.set() await self.__on_connect() - async for message in client.messages: + async for message in client_context.messages: await self._on_message( - client, + client_context, str(message.topic), message.payload, message.qos, message.properties, ) - except aiomqtt.MqttError as e: - LOG.error( - f"Connection to MQTT broker lost; Reconnecting in {reconnect_interval} seconds ...: {e}" + except aiomqtt.MqttError: + LOG.warning( + "Connection to %s:%s lost; Reconnecting in %d seconds ...", + self.host, + self.port, + reconnect_interval, ) await asyncio.sleep(reconnect_interval) except asyncio.exceptions.CancelledError: @@ -101,7 +110,6 @@ async def __run_loop(self) -> None: raise finally: self.__connected.clear() - self.client = None LOG.info("MQTT client disconnected") @override @@ -126,7 +134,7 @@ def enable_commands(self) -> None: async def __enable_commands(self) -> None: if not self.__connected.is_set() or not self.client: - LOG.error("MQTT client is not connected") + LOG.error("Failed to enable commands: MQTT client is not connected") return try: LOG.info("Subscribing to MQTT command topics") @@ -254,20 +262,17 @@ async def __handle_imported_energy(self, topic: str, payload: str) -> None: def __publish( self, topic: str, payload: WirePayload | None, *, retain: bool = True ) -> None: - if not (self.client and self.is_connected()): - LOG.error("MQTT client is not connected") - return + LOG.debug("Publishing to MQTT topic %s with payload %s", topic, payload) loop = asyncio.get_running_loop() asyncio.run_coroutine_threadsafe( self.__async_publish(topic, payload, retain=retain), loop ) - LOG.debug(f"Publishing to MQTT topic {topic} with payload {payload}") async def __async_publish( self, topic: str, payload: Any, retain: bool ) -> None: if not (self.client and self.is_connected()): - LOG.error("MQTT client is not connected") + LOG.error("Failed to publish: MQTT client is not connected") return try: await self.client.publish(topic, payload, retain) From b033b55df00e3f2f42b9af97ff8b7aa09e4c56c6 Mon Sep 17 00:00:00 2001 From: bj00rn Date: Tue, 25 Nov 2025 07:28:33 +0000 Subject: [PATCH 05/29] retain birth/keepalive message --- src/publisher/core.py | 38 ++++++++++++++++----- src/publisher/log_publisher.py | 42 +++++++++++++++++------- src/publisher/mqtt_publisher.py | 58 +++++++++++++++++++++++---------- 3 files changed, 102 insertions(+), 36 deletions(-) diff --git a/src/publisher/core.py b/src/publisher/core.py index 6e7e9265..4813db21 100644 --- a/src/publisher/core.py +++ b/src/publisher/core.py @@ -100,32 +100,52 @@ def publish_json( key: str, data: dict[str, Any], no_prefix: bool = False, - *, - retain: bool = True, + retain: bool = False, + qos: int = 0, ) -> None: raise NotImplementedError @abstractmethod def publish_str( - self, key: str, value: str, no_prefix: bool = False, *, retain: bool = True + self, + key: str, + value: str, + no_prefix: bool = False, + retain: bool = False, + qos: int = 0, ) -> None: raise NotImplementedError @abstractmethod def publish_int( - self, key: str, value: int, no_prefix: bool = False, *, retain: bool = True + self, + key: str, + value: int, + no_prefix: bool = False, + retain: bool = False, + qos: int = 0, ) -> None: raise NotImplementedError @abstractmethod def publish_bool( - self, key: str, value: bool, no_prefix: bool = False, *, retain: bool = True + self, + key: str, + value: bool, + no_prefix: bool = False, + retain: bool = False, + qos: int = 0, ) -> None: raise NotImplementedError @abstractmethod def publish_float( - self, key: str, value: float, no_prefix: bool = False, *, retain: bool = True + self, + key: str, + value: float, + no_prefix: bool = False, + retain: bool = False, + qos: int = 0, ) -> None: raise NotImplementedError @@ -173,7 +193,7 @@ def publish( raise TypeError(msg) @abstractmethod - def clear_topic(self, key: str, no_prefix: bool = False) -> None: + def clear_topic(self, key: str, no_prefix: bool = False, qos: int = 0) -> None: raise NotImplementedError def get_mqtt_account_prefix(self) -> str: @@ -249,7 +269,9 @@ def __anonymize(self, data: T) -> T: return data def keepalive(self) -> None: - self.publish_str(mqtt_topics.INTERNAL_LWT, "online", False) + self.publish_str( + mqtt_topics.INTERNAL_LWT, "online", no_prefix=False, retain=True, qos=1 + ) @staticmethod def anonymize_str(value: str) -> str: diff --git a/src/publisher/log_publisher.py b/src/publisher/log_publisher.py index 7969c61e..fc1160aa 100644 --- a/src/publisher/log_publisher.py +++ b/src/publisher/log_publisher.py @@ -28,38 +28,58 @@ def publish_json( key: str, data: dict[str, Any], no_prefix: bool = False, - *, - retain: bool = True, + retain: bool = False, + qos: int = 0, ) -> None: anonymized_json = self.dict_to_anonymized_json(data) self.internal_publish(key, anonymized_json, retain=retain) @override def publish_str( - self, key: str, value: str, no_prefix: bool = False, *, retain: bool = True + self, + key: str, + value: str, + no_prefix: bool = False, + retain: bool = False, + qos: int = 0, ) -> None: - self.internal_publish(key, value, retain=retain) + self.internal_publish(key, value) @override def publish_int( - self, key: str, value: int, no_prefix: bool = False, *, retain: bool = True + self, + key: str, + value: int, + no_prefix: bool = False, + retain: bool = False, + qos: int = 0, ) -> None: - self.internal_publish(key, value, retain=retain) + self.internal_publish(key, value) @override def publish_bool( - self, key: str, value: bool, no_prefix: bool = False, *, retain: bool = True + self, + key: str, + value: bool, + no_prefix: bool = False, + retain: bool = False, + qos: int = 0, ) -> None: - self.internal_publish(key, value, retain=retain) + self.internal_publish(key, value) @override def publish_float( - self, key: str, value: float, no_prefix: bool = False, *, retain: bool = True + self, + key: str, + value: float, + no_prefix: bool = False, + retain: bool = False, + qos: int = 0, ) -> None: - self.internal_publish(key, value, retain=retain) + self.internal_publish(key, value) @override - def clear_topic(self, key: str, no_prefix: bool = False) -> None: + def clear_topic(self, key: str, no_prefix: bool = False, qos: int = 0) -> None: self.internal_publish(key, None) def internal_publish( diff --git a/src/publisher/mqtt_publisher.py b/src/publisher/mqtt_publisher.py index ffd8fa3e..f38fa247 100644 --- a/src/publisher/mqtt_publisher.py +++ b/src/publisher/mqtt_publisher.py @@ -73,6 +73,7 @@ async def __run_loop(self) -> None: topic=self.get_topic(mqtt_topics.INTERNAL_LWT, False), payload="offline", retain=True, + qos=1, ), ) client.pending_calls_threshold = 150 @@ -260,22 +261,22 @@ async def __handle_imported_energy(self, topic: str, payload: str) -> None: ) def __publish( - self, topic: str, payload: WirePayload | None, *, retain: bool = True + self, topic: str, payload: WirePayload | None, retain: bool = False, qos: int = 0 ) -> None: LOG.debug("Publishing to MQTT topic %s with payload %s", topic, payload) loop = asyncio.get_running_loop() asyncio.run_coroutine_threadsafe( - self.__async_publish(topic, payload, retain=retain), loop + self.__async_publish(topic, payload, retain=retain, qos=qos), loop ) async def __async_publish( - self, topic: str, payload: Any, retain: bool + self, topic: str, payload: Any, retain: bool, qos: int ) -> None: if not (self.client and self.is_connected()): LOG.error("Failed to publish: MQTT client is not connected") return try: - await self.client.publish(topic, payload, retain) + await self.client.publish(topic, payload, retain=retain, qos=qos) except aiomqtt.MqttError as e: LOG.error( f"Failed to publish to MQTT topic {topic} with payload {payload}: {e}" @@ -291,49 +292,72 @@ def publish_json( key: str, data: dict[str, Any], no_prefix: bool = False, - *, - retain: bool = True, + retain: bool = False, + qos: int = 0, ) -> None: payload = self.dict_to_anonymized_json(data) self.__publish( - topic=self.get_topic(key, no_prefix), payload=payload, retain=retain + topic=self.get_topic(key, no_prefix), + payload=payload, + retain=retain, + qos=qos, ) @override def publish_str( - self, key: str, value: str, no_prefix: bool = False, *, retain: bool = True + self, + key: str, + value: str, + no_prefix: bool = False, + retain: bool = False, + qos: int = 0, ) -> None: self.__publish( - topic=self.get_topic(key, no_prefix), payload=value, retain=retain + topic=self.get_topic(key, no_prefix), payload=value, retain=retain, qos=qos ) @override def publish_int( - self, key: str, value: int, no_prefix: bool = False, *, retain: bool = True + self, + key: str, + value: int, + no_prefix: bool = False, + retain: bool = False, + qos: int = 0, ) -> None: self.__publish( - topic=self.get_topic(key, no_prefix), payload=value, retain=retain + topic=self.get_topic(key, no_prefix), payload=value, retain=retain, qos=qos ) @override def publish_bool( - self, key: str, value: bool, no_prefix: bool = False, *, retain: bool = True + self, + key: str, + value: bool, + no_prefix: bool = False, + retain: bool = False, + qos: int = 0, ) -> None: self.__publish( - topic=self.get_topic(key, no_prefix), payload=value, retain=retain + topic=self.get_topic(key, no_prefix), payload=value, retain=retain, qos=qos ) @override def publish_float( - self, key: str, value: float, no_prefix: bool = False, *, retain: bool = True + self, + key: str, + value: float, + no_prefix: bool = False, + retain: bool = False, + qos: int = 0, ) -> None: self.__publish( - topic=self.get_topic(key, no_prefix), payload=value, retain=retain + topic=self.get_topic(key, no_prefix), payload=value, retain=retain, qos=qos ) @override - def clear_topic(self, key: str, no_prefix: bool = False) -> None: - self.__publish(topic=self.get_topic(key, no_prefix), payload=None) + def clear_topic(self, key: str, no_prefix: bool = False, qos: int = 0) -> None: + self.__publish(topic=self.get_topic(key, no_prefix), payload=None, qos=qos) def get_vin_from_topic(self, topic: str) -> str: global_topic_removed = topic[len(self.configuration.mqtt_topic) + 1 :] From 5243c6c5770b214cbc11147e376448e3d96dd5fc Mon Sep 17 00:00:00 2001 From: Giovanni Condello Date: Wed, 24 Jun 2026 21:10:36 +0200 Subject: [PATCH 06/29] fix(tests): update publish mock signatures for aiomqtt migration Update test stubs and publish-behavior tests to match the new abstract method signatures (positional retain=False, qos=0) and rewrite the publisher tests to use AsyncMock against __async_publish instead of patching the synchronous gmqtt client.publish. --- tests/publisher/test_publish_dispatch.py | 34 +++-- tests/test_mqtt_publisher.py | 153 ++++++++++++++--------- 2 files changed, 119 insertions(+), 68 deletions(-) diff --git a/tests/publisher/test_publish_dispatch.py b/tests/publisher/test_publish_dispatch.py index 7902771a..5b6bfd8b 100644 --- a/tests/publisher/test_publish_dispatch.py +++ b/tests/publisher/test_publish_dispatch.py @@ -297,37 +297,57 @@ def publish_json( key: str, data: dict[str, Any], no_prefix: bool = False, - *, - retain: bool = True, + retain: bool = False, + qos: int = 0, ) -> None: pass @override def publish_str( - self, key: str, value: str, no_prefix: bool = False, *, retain: bool = True + self, + key: str, + value: str, + no_prefix: bool = False, + retain: bool = False, + qos: int = 0, ) -> None: pass @override def publish_int( - self, key: str, value: int, no_prefix: bool = False, *, retain: bool = True + self, + key: str, + value: int, + no_prefix: bool = False, + retain: bool = False, + qos: int = 0, ) -> None: pass @override def publish_bool( - self, key: str, value: bool, no_prefix: bool = False, *, retain: bool = True + self, + key: str, + value: bool, + no_prefix: bool = False, + retain: bool = False, + qos: int = 0, ) -> None: pass @override def publish_float( - self, key: str, value: float, no_prefix: bool = False, *, retain: bool = True + self, + key: str, + value: float, + no_prefix: bool = False, + retain: bool = False, + qos: int = 0, ) -> None: pass @override - def clear_topic(self, key: str, no_prefix: bool = False) -> None: + def clear_topic(self, key: str, no_prefix: bool = False, qos: int = 0) -> None: pass diff --git a/tests/test_mqtt_publisher.py b/tests/test_mqtt_publisher.py index 429ca4e5..a066dd6c 100644 --- a/tests/test_mqtt_publisher.py +++ b/tests/test_mqtt_publisher.py @@ -1,8 +1,10 @@ from __future__ import annotations +import asyncio +import json from typing import Any, override import unittest -from unittest.mock import patch +from unittest.mock import AsyncMock, patch from configuration import Configuration, TransportProtocol from publisher.core import MqttCommandListener @@ -100,63 +102,92 @@ async def on_charger_connection_state_changed( ) -> None: pass - def test_publish_str_default_is_retained(self) -> None: - with patch.object(self.mqtt_client.client, "publish") as m_pub: - self.mqtt_client.publish_str("foo", "bar") - m_pub.assert_called_once_with("saic/foo", "bar", retain=True) - - def test_publish_str_forwards_retain_false(self) -> None: - with patch.object(self.mqtt_client.client, "publish") as m_pub: - self.mqtt_client.publish_str("foo", "bar", retain=False) - m_pub.assert_called_once_with("saic/foo", "bar", retain=False) - - def test_publish_int_default_is_retained(self) -> None: - with patch.object(self.mqtt_client.client, "publish") as m_pub: - self.mqtt_client.publish_int("foo", 42) - m_pub.assert_called_once_with("saic/foo", 42, retain=True) - - def test_publish_int_forwards_retain_false(self) -> None: - with patch.object(self.mqtt_client.client, "publish") as m_pub: - self.mqtt_client.publish_int("foo", 42, retain=False) - m_pub.assert_called_once_with("saic/foo", 42, retain=False) - - def test_publish_bool_default_is_retained(self) -> None: - with patch.object(self.mqtt_client.client, "publish") as m_pub: - self.mqtt_client.publish_bool("foo", True) - m_pub.assert_called_once_with("saic/foo", True, retain=True) - - def test_publish_bool_forwards_retain_false(self) -> None: - with patch.object(self.mqtt_client.client, "publish") as m_pub: - self.mqtt_client.publish_bool("foo", True, retain=False) - m_pub.assert_called_once_with("saic/foo", True, retain=False) - - def test_publish_float_default_is_retained(self) -> None: - with patch.object(self.mqtt_client.client, "publish") as m_pub: - self.mqtt_client.publish_float("foo", 1.5) - m_pub.assert_called_once_with("saic/foo", 1.5, retain=True) - - def test_publish_float_forwards_retain_false(self) -> None: - with patch.object(self.mqtt_client.client, "publish") as m_pub: - self.mqtt_client.publish_float("foo", 1.5, retain=False) - m_pub.assert_called_once_with("saic/foo", 1.5, retain=False) - - def test_publish_json_default_is_retained(self) -> None: - with patch.object(self.mqtt_client.client, "publish") as m_pub: - self.mqtt_client.publish_json("foo", {"a": 1}) - m_pub.assert_called_once() - args, kwargs = m_pub.call_args - assert args[0] == "saic/foo" - assert kwargs == {"retain": True} - - def test_publish_json_forwards_retain_false(self) -> None: - with patch.object(self.mqtt_client.client, "publish") as m_pub: - self.mqtt_client.publish_json("foo", {"a": 1}, retain=False) - m_pub.assert_called_once() - args, kwargs = m_pub.call_args - assert args[0] == "saic/foo" - assert kwargs == {"retain": False} - - def test_clear_topic_publishes_none_retained(self) -> None: - with patch.object(self.mqtt_client.client, "publish") as m_pub: - self.mqtt_client.clear_topic("foo") - m_pub.assert_called_once_with("saic/foo", None, retain=True) + async def _call_and_flush(self, fn: Any, *args: Any, **kwargs: Any) -> AsyncMock: + mock = AsyncMock() + with patch.object( + self.mqtt_client, "_MqttPublisher__async_publish", mock + ): + self.mqtt_client._MqttPublisher__connected.set() + fn(*args, **kwargs) + await asyncio.sleep(0) + return mock + + async def test_publish_str_default_is_not_retained(self) -> None: + m = await self._call_and_flush(self.mqtt_client.publish_str, "foo", "bar") + m.assert_called_once_with("saic/foo", "bar", retain=False, qos=0) + + async def test_publish_str_forwards_retain_true(self) -> None: + m = await self._call_and_flush( + self.mqtt_client.publish_str, "foo", "bar", retain=True + ) + m.assert_called_once_with("saic/foo", "bar", retain=True, qos=0) + + async def test_publish_str_forwards_retain_false(self) -> None: + m = await self._call_and_flush( + self.mqtt_client.publish_str, "foo", "bar", retain=False + ) + m.assert_called_once_with("saic/foo", "bar", retain=False, qos=0) + + async def test_publish_int_default_is_not_retained(self) -> None: + m = await self._call_and_flush(self.mqtt_client.publish_int, "foo", 42) + m.assert_called_once_with("saic/foo", 42, retain=False, qos=0) + + async def test_publish_int_forwards_retain_false(self) -> None: + m = await self._call_and_flush( + self.mqtt_client.publish_int, "foo", 42, retain=False + ) + m.assert_called_once_with("saic/foo", 42, retain=False, qos=0) + + async def test_publish_bool_default_is_not_retained(self) -> None: + m = await self._call_and_flush(self.mqtt_client.publish_bool, "foo", True) + m.assert_called_once_with("saic/foo", True, retain=False, qos=0) + + async def test_publish_bool_forwards_retain_false(self) -> None: + m = await self._call_and_flush( + self.mqtt_client.publish_bool, "foo", True, retain=False + ) + m.assert_called_once_with("saic/foo", True, retain=False, qos=0) + + async def test_publish_float_default_is_not_retained(self) -> None: + m = await self._call_and_flush(self.mqtt_client.publish_float, "foo", 1.5) + m.assert_called_once_with("saic/foo", 1.5, retain=False, qos=0) + + async def test_publish_float_forwards_retain_false(self) -> None: + m = await self._call_and_flush( + self.mqtt_client.publish_float, "foo", 1.5, retain=False + ) + m.assert_called_once_with("saic/foo", 1.5, retain=False, qos=0) + + async def test_publish_json_default_is_not_retained(self) -> None: + m = await self._call_and_flush( + self.mqtt_client.publish_json, "foo", {"a": 1} + ) + m.assert_called_once() + args, kwargs = m.call_args + assert args[0] == "saic/foo" + assert json.loads(args[1]) == {"a": 1} + assert kwargs == {"retain": False, "qos": 0} + + async def test_publish_json_forwards_retain_true(self) -> None: + m = await self._call_and_flush( + self.mqtt_client.publish_json, "foo", {"a": 1}, retain=True + ) + m.assert_called_once() + args, kwargs = m.call_args + assert args[0] == "saic/foo" + assert json.loads(args[1]) == {"a": 1} + assert kwargs == {"retain": True, "qos": 0} + + async def test_publish_json_forwards_retain_false(self) -> None: + m = await self._call_and_flush( + self.mqtt_client.publish_json, "foo", {"a": 1}, retain=False + ) + m.assert_called_once() + args, kwargs = m.call_args + assert args[0] == "saic/foo" + assert json.loads(args[1]) == {"a": 1} + assert kwargs == {"retain": False, "qos": 0} + + async def test_clear_topic_publishes_none_not_retained(self) -> None: + m = await self._call_and_flush(self.mqtt_client.clear_topic, "foo") + m.assert_called_once_with("saic/foo", None, retain=False, qos=0) From 85e609147a13884a50f4304c56e87b85f84ebdce Mon Sep 17 00:00:00 2001 From: Giovanni Condello Date: Wed, 24 Jun 2026 21:21:04 +0200 Subject: [PATCH 07/29] fix: restore *, retain=True defaults on all publish_* methods - Re-add keyword-only * separator before retain and qos params so callers cannot accidentally pass them positionally - Restore retain=True as the default; flipping to False would silently stop retaining messages for all existing callers - Fix __publish private helper which was also missing the True default - Update tests to match corrected defaults and use keyword args --- src/publisher/core.py | 15 ++++++---- src/publisher/log_publisher.py | 15 ++++++---- src/publisher/mqtt_publisher.py | 17 +++++++---- tests/publisher/test_publish_dispatch.py | 15 ++++++---- tests/test_mqtt_publisher.py | 36 +++++++----------------- 5 files changed, 51 insertions(+), 47 deletions(-) diff --git a/src/publisher/core.py b/src/publisher/core.py index 4813db21..2ee23650 100644 --- a/src/publisher/core.py +++ b/src/publisher/core.py @@ -100,7 +100,8 @@ def publish_json( key: str, data: dict[str, Any], no_prefix: bool = False, - retain: bool = False, + *, + retain: bool = True, qos: int = 0, ) -> None: raise NotImplementedError @@ -111,7 +112,8 @@ def publish_str( key: str, value: str, no_prefix: bool = False, - retain: bool = False, + *, + retain: bool = True, qos: int = 0, ) -> None: raise NotImplementedError @@ -122,7 +124,8 @@ def publish_int( key: str, value: int, no_prefix: bool = False, - retain: bool = False, + *, + retain: bool = True, qos: int = 0, ) -> None: raise NotImplementedError @@ -133,7 +136,8 @@ def publish_bool( key: str, value: bool, no_prefix: bool = False, - retain: bool = False, + *, + retain: bool = True, qos: int = 0, ) -> None: raise NotImplementedError @@ -144,7 +148,8 @@ def publish_float( key: str, value: float, no_prefix: bool = False, - retain: bool = False, + *, + retain: bool = True, qos: int = 0, ) -> None: raise NotImplementedError diff --git a/src/publisher/log_publisher.py b/src/publisher/log_publisher.py index fc1160aa..ae315704 100644 --- a/src/publisher/log_publisher.py +++ b/src/publisher/log_publisher.py @@ -28,7 +28,8 @@ def publish_json( key: str, data: dict[str, Any], no_prefix: bool = False, - retain: bool = False, + *, + retain: bool = True, qos: int = 0, ) -> None: anonymized_json = self.dict_to_anonymized_json(data) @@ -40,7 +41,8 @@ def publish_str( key: str, value: str, no_prefix: bool = False, - retain: bool = False, + *, + retain: bool = True, qos: int = 0, ) -> None: self.internal_publish(key, value) @@ -51,7 +53,8 @@ def publish_int( key: str, value: int, no_prefix: bool = False, - retain: bool = False, + *, + retain: bool = True, qos: int = 0, ) -> None: self.internal_publish(key, value) @@ -62,7 +65,8 @@ def publish_bool( key: str, value: bool, no_prefix: bool = False, - retain: bool = False, + *, + retain: bool = True, qos: int = 0, ) -> None: self.internal_publish(key, value) @@ -73,7 +77,8 @@ def publish_float( key: str, value: float, no_prefix: bool = False, - retain: bool = False, + *, + retain: bool = True, qos: int = 0, ) -> None: self.internal_publish(key, value) diff --git a/src/publisher/mqtt_publisher.py b/src/publisher/mqtt_publisher.py index f38fa247..979cca98 100644 --- a/src/publisher/mqtt_publisher.py +++ b/src/publisher/mqtt_publisher.py @@ -261,7 +261,7 @@ async def __handle_imported_energy(self, topic: str, payload: str) -> None: ) def __publish( - self, topic: str, payload: WirePayload | None, retain: bool = False, qos: int = 0 + self, topic: str, payload: WirePayload | None, *, retain: bool = True, qos: int = 0 ) -> None: LOG.debug("Publishing to MQTT topic %s with payload %s", topic, payload) loop = asyncio.get_running_loop() @@ -292,7 +292,8 @@ def publish_json( key: str, data: dict[str, Any], no_prefix: bool = False, - retain: bool = False, + *, + retain: bool = True, qos: int = 0, ) -> None: payload = self.dict_to_anonymized_json(data) @@ -309,7 +310,8 @@ def publish_str( key: str, value: str, no_prefix: bool = False, - retain: bool = False, + *, + retain: bool = True, qos: int = 0, ) -> None: self.__publish( @@ -322,7 +324,8 @@ def publish_int( key: str, value: int, no_prefix: bool = False, - retain: bool = False, + *, + retain: bool = True, qos: int = 0, ) -> None: self.__publish( @@ -335,7 +338,8 @@ def publish_bool( key: str, value: bool, no_prefix: bool = False, - retain: bool = False, + *, + retain: bool = True, qos: int = 0, ) -> None: self.__publish( @@ -348,7 +352,8 @@ def publish_float( key: str, value: float, no_prefix: bool = False, - retain: bool = False, + *, + retain: bool = True, qos: int = 0, ) -> None: self.__publish( diff --git a/tests/publisher/test_publish_dispatch.py b/tests/publisher/test_publish_dispatch.py index 5b6bfd8b..192a574d 100644 --- a/tests/publisher/test_publish_dispatch.py +++ b/tests/publisher/test_publish_dispatch.py @@ -297,7 +297,8 @@ def publish_json( key: str, data: dict[str, Any], no_prefix: bool = False, - retain: bool = False, + *, + retain: bool = True, qos: int = 0, ) -> None: pass @@ -308,7 +309,8 @@ def publish_str( key: str, value: str, no_prefix: bool = False, - retain: bool = False, + *, + retain: bool = True, qos: int = 0, ) -> None: pass @@ -319,7 +321,8 @@ def publish_int( key: str, value: int, no_prefix: bool = False, - retain: bool = False, + *, + retain: bool = True, qos: int = 0, ) -> None: pass @@ -330,7 +333,8 @@ def publish_bool( key: str, value: bool, no_prefix: bool = False, - retain: bool = False, + *, + retain: bool = True, qos: int = 0, ) -> None: pass @@ -341,7 +345,8 @@ def publish_float( key: str, value: float, no_prefix: bool = False, - retain: bool = False, + *, + retain: bool = True, qos: int = 0, ) -> None: pass diff --git a/tests/test_mqtt_publisher.py b/tests/test_mqtt_publisher.py index a066dd6c..3d6ad7a7 100644 --- a/tests/test_mqtt_publisher.py +++ b/tests/test_mqtt_publisher.py @@ -107,19 +107,13 @@ async def _call_and_flush(self, fn: Any, *args: Any, **kwargs: Any) -> AsyncMock with patch.object( self.mqtt_client, "_MqttPublisher__async_publish", mock ): - self.mqtt_client._MqttPublisher__connected.set() + self.mqtt_client._MqttPublisher__connected.set() # type: ignore[attr-defined] fn(*args, **kwargs) await asyncio.sleep(0) return mock - async def test_publish_str_default_is_not_retained(self) -> None: + async def test_publish_str_default_is_retained(self) -> None: m = await self._call_and_flush(self.mqtt_client.publish_str, "foo", "bar") - m.assert_called_once_with("saic/foo", "bar", retain=False, qos=0) - - async def test_publish_str_forwards_retain_true(self) -> None: - m = await self._call_and_flush( - self.mqtt_client.publish_str, "foo", "bar", retain=True - ) m.assert_called_once_with("saic/foo", "bar", retain=True, qos=0) async def test_publish_str_forwards_retain_false(self) -> None: @@ -128,9 +122,9 @@ async def test_publish_str_forwards_retain_false(self) -> None: ) m.assert_called_once_with("saic/foo", "bar", retain=False, qos=0) - async def test_publish_int_default_is_not_retained(self) -> None: + async def test_publish_int_default_is_retained(self) -> None: m = await self._call_and_flush(self.mqtt_client.publish_int, "foo", 42) - m.assert_called_once_with("saic/foo", 42, retain=False, qos=0) + m.assert_called_once_with("saic/foo", 42, retain=True, qos=0) async def test_publish_int_forwards_retain_false(self) -> None: m = await self._call_and_flush( @@ -138,9 +132,9 @@ async def test_publish_int_forwards_retain_false(self) -> None: ) m.assert_called_once_with("saic/foo", 42, retain=False, qos=0) - async def test_publish_bool_default_is_not_retained(self) -> None: + async def test_publish_bool_default_is_retained(self) -> None: m = await self._call_and_flush(self.mqtt_client.publish_bool, "foo", True) - m.assert_called_once_with("saic/foo", True, retain=False, qos=0) + m.assert_called_once_with("saic/foo", True, retain=True, qos=0) async def test_publish_bool_forwards_retain_false(self) -> None: m = await self._call_and_flush( @@ -148,9 +142,9 @@ async def test_publish_bool_forwards_retain_false(self) -> None: ) m.assert_called_once_with("saic/foo", True, retain=False, qos=0) - async def test_publish_float_default_is_not_retained(self) -> None: + async def test_publish_float_default_is_retained(self) -> None: m = await self._call_and_flush(self.mqtt_client.publish_float, "foo", 1.5) - m.assert_called_once_with("saic/foo", 1.5, retain=False, qos=0) + m.assert_called_once_with("saic/foo", 1.5, retain=True, qos=0) async def test_publish_float_forwards_retain_false(self) -> None: m = await self._call_and_flush( @@ -158,7 +152,7 @@ async def test_publish_float_forwards_retain_false(self) -> None: ) m.assert_called_once_with("saic/foo", 1.5, retain=False, qos=0) - async def test_publish_json_default_is_not_retained(self) -> None: + async def test_publish_json_default_is_retained(self) -> None: m = await self._call_and_flush( self.mqtt_client.publish_json, "foo", {"a": 1} ) @@ -166,16 +160,6 @@ async def test_publish_json_default_is_not_retained(self) -> None: args, kwargs = m.call_args assert args[0] == "saic/foo" assert json.loads(args[1]) == {"a": 1} - assert kwargs == {"retain": False, "qos": 0} - - async def test_publish_json_forwards_retain_true(self) -> None: - m = await self._call_and_flush( - self.mqtt_client.publish_json, "foo", {"a": 1}, retain=True - ) - m.assert_called_once() - args, kwargs = m.call_args - assert args[0] == "saic/foo" - assert json.loads(args[1]) == {"a": 1} assert kwargs == {"retain": True, "qos": 0} async def test_publish_json_forwards_retain_false(self) -> None: @@ -190,4 +174,4 @@ async def test_publish_json_forwards_retain_false(self) -> None: async def test_clear_topic_publishes_none_not_retained(self) -> None: m = await self._call_and_flush(self.mqtt_client.clear_topic, "foo") - m.assert_called_once_with("saic/foo", None, retain=False, qos=0) + m.assert_called_once_with("saic/foo", None, retain=True, qos=0) From 2a87077e51f08ba53044eb4b0612c408e11b0054 Mon Sep 17 00:00:00 2001 From: Giovanni Condello Date: Wed, 24 Jun 2026 21:22:01 +0200 Subject: [PATCH 08/29] style: apply ruff formatting --- src/publisher/mqtt_publisher.py | 7 ++++++- src/status_publisher/charge/chrg_mgmt_data.py | 18 +++++++++--------- tests/test_mqtt_publisher.py | 8 ++------ 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/src/publisher/mqtt_publisher.py b/src/publisher/mqtt_publisher.py index 979cca98..6173860b 100644 --- a/src/publisher/mqtt_publisher.py +++ b/src/publisher/mqtt_publisher.py @@ -261,7 +261,12 @@ async def __handle_imported_energy(self, topic: str, payload: str) -> None: ) def __publish( - self, topic: str, payload: WirePayload | None, *, retain: bool = True, qos: int = 0 + self, + topic: str, + payload: WirePayload | None, + *, + retain: bool = True, + qos: int = 0, ) -> None: LOG.debug("Publishing to MQTT topic %s with payload %s", topic, payload) loop = asyncio.get_running_loop() diff --git a/src/status_publisher/charge/chrg_mgmt_data.py b/src/status_publisher/charge/chrg_mgmt_data.py index b0444fc9..5825590e 100644 --- a/src/status_publisher/charge/chrg_mgmt_data.py +++ b/src/status_publisher/charge/chrg_mgmt_data.py @@ -104,18 +104,18 @@ def publish(self, charge_mgmt_data: ChrgMgmtData) -> ChrgMgmtDataProcessingResul self._transform_and_publish( topic=mqtt_topics.BMS_CHARGE_STATUS, value=charge_mgmt_data.bms_charging_status, - transform=lambda x: f"UNKNOWN {charge_mgmt_data.bmsChrgSts}" - if x is None - else x.name, + transform=lambda x: ( + f"UNKNOWN {charge_mgmt_data.bmsChrgSts}" if x is None else x.name + ), ) self._transform_and_publish( topic=mqtt_topics.DRIVETRAIN_CHARGING_STOP_REASON, value=charge_mgmt_data.charging_stop_reason, validator=lambda x: x != ChargingStopReason.NO_REASON, - transform=lambda x: f"UNKNOWN {charge_mgmt_data.bmsChrgSpRsn}" - if x is None - else x.name, + transform=lambda x: ( + f"UNKNOWN {charge_mgmt_data.bmsChrgSpRsn}" if x is None else x.name + ), ) self._publish( @@ -153,9 +153,9 @@ def publish(self, charge_mgmt_data: ChrgMgmtData) -> ChrgMgmtDataProcessingResul topic=mqtt_topics.DRIVETRAIN_BATTERY_HEATING_STOP_REASON, value=charge_mgmt_data.heating_stop_reason, validator=lambda x: x != HeatingStopReason.NO_REASON, - transform=lambda x: f"UNKNOWN ({charge_mgmt_data.bmsPTCHeatResp})" - if x is None - else x.name, + transform=lambda x: ( + f"UNKNOWN ({charge_mgmt_data.bmsPTCHeatResp})" if x is None else x.name + ), ) self._transform_and_publish( diff --git a/tests/test_mqtt_publisher.py b/tests/test_mqtt_publisher.py index 3d6ad7a7..909f31e0 100644 --- a/tests/test_mqtt_publisher.py +++ b/tests/test_mqtt_publisher.py @@ -104,9 +104,7 @@ async def on_charger_connection_state_changed( async def _call_and_flush(self, fn: Any, *args: Any, **kwargs: Any) -> AsyncMock: mock = AsyncMock() - with patch.object( - self.mqtt_client, "_MqttPublisher__async_publish", mock - ): + with patch.object(self.mqtt_client, "_MqttPublisher__async_publish", mock): self.mqtt_client._MqttPublisher__connected.set() # type: ignore[attr-defined] fn(*args, **kwargs) await asyncio.sleep(0) @@ -153,9 +151,7 @@ async def test_publish_float_forwards_retain_false(self) -> None: m.assert_called_once_with("saic/foo", 1.5, retain=False, qos=0) async def test_publish_json_default_is_retained(self) -> None: - m = await self._call_and_flush( - self.mqtt_client.publish_json, "foo", {"a": 1} - ) + m = await self._call_and_flush(self.mqtt_client.publish_json, "foo", {"a": 1}) m.assert_called_once() args, kwargs = m.call_args assert args[0] == "saic/foo" From 9ee28e6fca63a4ac5f1ce2d208c577cf4578ff87 Mon Sep 17 00:00:00 2001 From: Giovanni Condello Date: Wed, 24 Jun 2026 21:34:38 +0200 Subject: [PATCH 09/29] fix(log_publisher): forward retain flag to internal_publish publish_str/int/bool/float were silently dropping the retain parameter, causing the debug log to always show retain=True regardless of what the caller passed. --- src/publisher/log_publisher.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/publisher/log_publisher.py b/src/publisher/log_publisher.py index ae315704..f05866a0 100644 --- a/src/publisher/log_publisher.py +++ b/src/publisher/log_publisher.py @@ -45,7 +45,7 @@ def publish_str( retain: bool = True, qos: int = 0, ) -> None: - self.internal_publish(key, value) + self.internal_publish(key, value, retain=retain) @override def publish_int( @@ -57,7 +57,7 @@ def publish_int( retain: bool = True, qos: int = 0, ) -> None: - self.internal_publish(key, value) + self.internal_publish(key, value, retain=retain) @override def publish_bool( @@ -69,7 +69,7 @@ def publish_bool( retain: bool = True, qos: int = 0, ) -> None: - self.internal_publish(key, value) + self.internal_publish(key, value, retain=retain) @override def publish_float( @@ -81,7 +81,7 @@ def publish_float( retain: bool = True, qos: int = 0, ) -> None: - self.internal_publish(key, value) + self.internal_publish(key, value, retain=retain) @override def clear_topic(self, key: str, no_prefix: bool = False, qos: int = 0) -> None: From 9113cfd9e09f2f1dfd46c398feca1415bc42c3f9 Mon Sep 17 00:00:00 2001 From: Giovanni Condello Date: Wed, 24 Jun 2026 22:11:48 +0200 Subject: [PATCH 10/29] fix(mqtt_publisher): read message.retain directly in _on_message _properties.get("retain", 0) called message.properties which is paho.mqtt.properties.Properties|None, not a dict. On MQTT v3.1.1 (common default) this is None, raising AttributeError on every incoming message and silently dropping all /set commands. Change _on_message to accept retained: bool directly, and pass message.retain (a bool field on aiomqtt.Message) at the call site. --- src/publisher/mqtt_publisher.py | 5 ++--- tests/test_mqtt_publisher.py | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/publisher/mqtt_publisher.py b/src/publisher/mqtt_publisher.py index 6173860b..c729b706 100644 --- a/src/publisher/mqtt_publisher.py +++ b/src/publisher/mqtt_publisher.py @@ -96,7 +96,7 @@ async def __run_loop(self) -> None: str(message.topic), message.payload, message.qos, - message.properties, + message.retain, ) except aiomqtt.MqttError: LOG.warning( @@ -178,14 +178,13 @@ async def __enable_commands(self) -> None: raise e async def _on_message( - self, _client: Any, topic: str, payload: Any, _qos: Any, _properties: Any + self, _client: Any, topic: str, payload: Any, _qos: Any, retained: bool ) -> None: try: if isinstance(payload, bytes): payload = payload.decode("utf-8") else: payload = str(payload) - retained = bool(_properties.get("retain", 0)) await self.__on_message_real( topic=topic, payload=payload, retained=retained ) diff --git a/tests/test_mqtt_publisher.py b/tests/test_mqtt_publisher.py index 909f31e0..f610d4a2 100644 --- a/tests/test_mqtt_publisher.py +++ b/tests/test_mqtt_publisher.py @@ -73,7 +73,7 @@ async def test_update_rear_window_heat_state(self) -> None: assert self.received_payload == REAR_WINDOW_HEAT_STATE async def send_message(self, topic: str, payload: Any) -> None: - await self.mqtt_client._on_message("client", topic, payload, 0, {}) + await self.mqtt_client._on_message("client", topic, payload, 0, False) async def test_get_vin_from_sanitized_topic(self) -> None: """Topics arrive with the sanitized prefix, not the raw username.""" From 30f7bc227892d163713d820576f4fb6e8fd722a4 Mon Sep 17 00:00:00 2001 From: Giovanni Condello Date: Wed, 24 Jun 2026 22:12:00 +0200 Subject: [PATCH 11/29] fix(mqtt_publisher): call on_mqtt_reconnected after broker reconnect The aiomqtt migration dropped the on_mqtt_reconnected() call that the old gmqtt __on_connect callback made. MqttGateway.on_mqtt_reconnected() resets HomeAssistantGatewayDiscovery and calls reset_ha_discovery() on every vehicle handler, so without this call HA entities would not be re-announced after a broker disconnect/reconnect cycle. --- src/publisher/mqtt_publisher.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/publisher/mqtt_publisher.py b/src/publisher/mqtt_publisher.py index c729b706..21cc3cde 100644 --- a/src/publisher/mqtt_publisher.py +++ b/src/publisher/mqtt_publisher.py @@ -125,6 +125,8 @@ async def __on_connect(self) -> None: LOG.info("Connected to MQTT broker") if not self.first_connection: await self.__enable_commands() + if self.command_listener is not None: + self.command_listener.on_mqtt_reconnected() self.first_connection = False self.keepalive() From 6310e7f8a232cf80cd954f20ba7db2aaea548042 Mon Sep 17 00:00:00 2001 From: Giovanni Condello Date: Wed, 24 Jun 2026 22:12:11 +0200 Subject: [PATCH 12/29] fix(mqtt_publisher): restore imported_energy_topic subscription The aiomqtt migration dropped the imported_energy_topic subscription block from __enable_commands. vin_by_imported_energy_topic was initialised in __init__ but never populated, making __handle_imported_energy dead code and silently breaking OpenWB energy-import tracking. --- src/publisher/mqtt_publisher.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/publisher/mqtt_publisher.py b/src/publisher/mqtt_publisher.py index 21cc3cde..597fb8e9 100644 --- a/src/publisher/mqtt_publisher.py +++ b/src/publisher/mqtt_publisher.py @@ -172,6 +172,14 @@ async def __enable_commands(self) -> None: charging_station.connected_topic ] = charging_station.vin await self.client.subscribe(charging_station.connected_topic) + if charging_station.imported_energy_topic: + LOG.debug( + f"Subscribing to MQTT topic {charging_station.imported_energy_topic}" + ) + self.vin_by_imported_energy_topic[ + charging_station.imported_energy_topic + ] = charging_station.vin + await self.client.subscribe(charging_station.imported_energy_topic) if self.configuration.ha_discovery_enabled: # enable dynamic discovery pushing in case ha reconnects await self.client.subscribe(self.configuration.ha_lwt_topic) From ec833b35ad335c1e203cf735db1d420851d58345 Mon Sep 17 00:00:00 2001 From: Giovanni Condello Date: Wed, 24 Jun 2026 22:12:20 +0200 Subject: [PATCH 13/29] fix(mqtt_publisher): add missing f-prefix to subscribe error log LOG.error("...{e}") was a plain string, always logging the literal "{e}" instead of the exception value, making subscribe failures impossible to diagnose from logs. --- src/publisher/mqtt_publisher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/publisher/mqtt_publisher.py b/src/publisher/mqtt_publisher.py index 597fb8e9..13d25eab 100644 --- a/src/publisher/mqtt_publisher.py +++ b/src/publisher/mqtt_publisher.py @@ -184,7 +184,7 @@ async def __enable_commands(self) -> None: # enable dynamic discovery pushing in case ha reconnects await self.client.subscribe(self.configuration.ha_lwt_topic) except aiomqtt.MqttError as e: - LOG.error("Failed to subscribe to MQTT command topics: {e}") + LOG.error(f"Failed to subscribe to MQTT command topics: {e}") raise e async def _on_message( From 3fd5a96a4e5c0a5708b1f86988776fd93c4efe0a Mon Sep 17 00:00:00 2001 From: Giovanni Condello Date: Wed, 24 Jun 2026 22:12:31 +0200 Subject: [PATCH 14/29] fix(mqtt_publisher): guard connect() against unconfigured host When self.host is empty __run_loop returned immediately without setting __connected, leaving connect() blocked forever on await self.__connected.wait(). Move the empty-host check into connect() so it returns before creating the task. --- src/publisher/mqtt_publisher.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/publisher/mqtt_publisher.py b/src/publisher/mqtt_publisher.py index 13d25eab..79d34232 100644 --- a/src/publisher/mqtt_publisher.py +++ b/src/publisher/mqtt_publisher.py @@ -118,6 +118,9 @@ async def connect(self) -> None: if self.__running and not self.__running.done(): LOG.warning("MQTT client is already running") return + if not self.host: + LOG.info("MQTT host is not configured") + return self.__running = asyncio.create_task(self.__run_loop()) await self.__connected.wait() From 3f6266635482fc794b0095669619adc954370f16 Mon Sep 17 00:00:00 2001 From: Giovanni Condello Date: Wed, 24 Jun 2026 22:12:40 +0200 Subject: [PATCH 15/29] fix(tests): correct misleading test name for clear_topic test_clear_topic_publishes_none_not_retained asserted retain=True, contradicting its own name. Clearing a retained MQTT topic requires publishing a null payload with retain=True; rename to match. --- tests/test_mqtt_publisher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_mqtt_publisher.py b/tests/test_mqtt_publisher.py index f610d4a2..d4bf19e9 100644 --- a/tests/test_mqtt_publisher.py +++ b/tests/test_mqtt_publisher.py @@ -168,6 +168,6 @@ async def test_publish_json_forwards_retain_false(self) -> None: assert json.loads(args[1]) == {"a": 1} assert kwargs == {"retain": False, "qos": 0} - async def test_clear_topic_publishes_none_not_retained(self) -> None: + async def test_clear_topic_publishes_none_retained(self) -> None: m = await self._call_and_flush(self.mqtt_client.clear_topic, "foo") m.assert_called_once_with("saic/foo", None, retain=True, qos=0) From 105627db79a3400e53e4f28c6e5176efa4267cbb Mon Sep 17 00:00:00 2001 From: Giovanni Condello Date: Wed, 24 Jun 2026 22:28:18 +0200 Subject: [PATCH 16/29] fix(mqtt_publisher): remove spurious "a" suffix from client identifier The suffix was a debug leftover from the initial aiomqtt migration. There is only one MqttPublisher instance so no client ID conflict exists. --- src/publisher/mqtt_publisher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/publisher/mqtt_publisher.py b/src/publisher/mqtt_publisher.py index 79d34232..a77377c5 100644 --- a/src/publisher/mqtt_publisher.py +++ b/src/publisher/mqtt_publisher.py @@ -60,7 +60,7 @@ async def __run_loop(self) -> None: client = aiomqtt.Client( hostname=self.host, port=self.port, - identifier=str(self.publisher_id) + "a", + identifier=str(self.publisher_id), transport=self.transport_protocol.transport_mechanism, username=self.configuration.mqtt_user or None, password=self.configuration.mqtt_password or None, From 5cc59b0e912f3a1ae73052a644a03fc8efd0b693 Mon Sep 17 00:00:00 2001 From: Giovanni Condello Date: Wed, 24 Jun 2026 22:31:50 +0200 Subject: [PATCH 17/29] fix(mqtt_publisher): exit immediately on permanent MQTT connect failures MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bad credentials, invalid client ID, and auth refusals (MQTT 3.1.1 rc 1, 2, 4, 5) are permanent — retrying every 5 seconds forever gives no useful signal. Raise SystemExit on these, matching the old gmqtt behaviour. rc 3 (server unavailable) is transient and still reconnects. --- src/publisher/mqtt_publisher.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/publisher/mqtt_publisher.py b/src/publisher/mqtt_publisher.py index a77377c5..debd6e4b 100644 --- a/src/publisher/mqtt_publisher.py +++ b/src/publisher/mqtt_publisher.py @@ -7,6 +7,7 @@ from typing import TYPE_CHECKING, Any, override import aiomqtt +from aiomqtt.exceptions import MqttConnectError import mqtt_topics from publisher.core import Publisher @@ -98,6 +99,24 @@ async def __run_loop(self) -> None: message.qos, message.retain, ) + except MqttConnectError as e: + # rc values from MQTT 3.1.1 spec that are permanent — retrying won't help: + # 1 = incorrect protocol version + # 2 = invalid client identifier + # 4 = bad username or password + # 5 = not authorised + # rc 3 (server unavailable) is transient and falls through to reconnect. + _FATAL_CONNECT_RC = {1, 2, 4, 5} + if isinstance(e.rc, int) and e.rc in _FATAL_CONNECT_RC: + msg = f"MQTT connection permanently refused: {e}" + raise SystemExit(msg) from e + LOG.warning( + "Connection to %s:%s refused (transient); Reconnecting in %d seconds ...", + self.host, + self.port, + reconnect_interval, + ) + await asyncio.sleep(reconnect_interval) except aiomqtt.MqttError: LOG.warning( "Connection to %s:%s lost; Reconnecting in %d seconds ...", From dcddcca8ec36ced10f61a6f31850e5b3a6138b9e Mon Sep 17 00:00:00 2001 From: Giovanni Condello Date: Wed, 24 Jun 2026 22:32:32 +0200 Subject: [PATCH 18/29] refactor(mqtt_publisher): use paho CONNACK constants instead of magic numbers --- src/publisher/mqtt_publisher.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/publisher/mqtt_publisher.py b/src/publisher/mqtt_publisher.py index debd6e4b..fdc1886d 100644 --- a/src/publisher/mqtt_publisher.py +++ b/src/publisher/mqtt_publisher.py @@ -7,6 +7,7 @@ from typing import TYPE_CHECKING, Any, override import aiomqtt +import paho.mqtt.client as paho_mqtt from aiomqtt.exceptions import MqttConnectError import mqtt_topics @@ -100,13 +101,14 @@ async def __run_loop(self) -> None: message.retain, ) except MqttConnectError as e: - # rc values from MQTT 3.1.1 spec that are permanent — retrying won't help: - # 1 = incorrect protocol version - # 2 = invalid client identifier - # 4 = bad username or password - # 5 = not authorised - # rc 3 (server unavailable) is transient and falls through to reconnect. - _FATAL_CONNECT_RC = {1, 2, 4, 5} + # Permanent rejections — retrying won't help. + # rc 3 (CONNACK_REFUSED_SERVER_UNAVAILABLE) is transient and falls through. + _FATAL_CONNECT_RC = { + paho_mqtt.CONNACK_REFUSED_PROTOCOL_VERSION, + paho_mqtt.CONNACK_REFUSED_IDENTIFIER_REJECTED, + paho_mqtt.CONNACK_REFUSED_BAD_USERNAME_PASSWORD, + paho_mqtt.CONNACK_REFUSED_NOT_AUTHORIZED, + } if isinstance(e.rc, int) and e.rc in _FATAL_CONNECT_RC: msg = f"MQTT connection permanently refused: {e}" raise SystemExit(msg) from e From a8bc3c693d026b46857a8f5a6c6bba6ab2f000a3 Mon Sep 17 00:00:00 2001 From: Giovanni Condello Date: Wed, 24 Jun 2026 22:34:19 +0200 Subject: [PATCH 19/29] refactor(mqtt_publisher): replace paho constants with local MQTT 3.1.1 spec constants MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Avoids depending on paho-mqtt (a transitive dep) for named CONNACK refusal codes. Values are stable — they are defined by the MQTT 3.1.1 specification (section 3.2.2.3) and will not change. --- src/publisher/mqtt_publisher.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/src/publisher/mqtt_publisher.py b/src/publisher/mqtt_publisher.py index fdc1886d..ba87906c 100644 --- a/src/publisher/mqtt_publisher.py +++ b/src/publisher/mqtt_publisher.py @@ -7,7 +7,6 @@ from typing import TYPE_CHECKING, Any, override import aiomqtt -import paho.mqtt.client as paho_mqtt from aiomqtt.exceptions import MqttConnectError import mqtt_topics @@ -20,6 +19,18 @@ LOG = logging.getLogger(__name__) +# MQTT 3.1.1 spec section 3.2.2.3 — permanent connection refusal codes +_CONNACK_REFUSED_PROTOCOL_VERSION = 1 +_CONNACK_REFUSED_IDENTIFIER_REJECTED = 2 +_CONNACK_REFUSED_BAD_CREDENTIALS = 4 +_CONNACK_REFUSED_NOT_AUTHORIZED = 5 +_FATAL_CONNECT_RC = { + _CONNACK_REFUSED_PROTOCOL_VERSION, + _CONNACK_REFUSED_IDENTIFIER_REJECTED, + _CONNACK_REFUSED_BAD_CREDENTIALS, + _CONNACK_REFUSED_NOT_AUTHORIZED, +} + class MqttPublisher(Publisher): def __init__( @@ -102,13 +113,7 @@ async def __run_loop(self) -> None: ) except MqttConnectError as e: # Permanent rejections — retrying won't help. - # rc 3 (CONNACK_REFUSED_SERVER_UNAVAILABLE) is transient and falls through. - _FATAL_CONNECT_RC = { - paho_mqtt.CONNACK_REFUSED_PROTOCOL_VERSION, - paho_mqtt.CONNACK_REFUSED_IDENTIFIER_REJECTED, - paho_mqtt.CONNACK_REFUSED_BAD_USERNAME_PASSWORD, - paho_mqtt.CONNACK_REFUSED_NOT_AUTHORIZED, - } + # rc 3 (server unavailable) is transient and falls through to reconnect. if isinstance(e.rc, int) and e.rc in _FATAL_CONNECT_RC: msg = f"MQTT connection permanently refused: {e}" raise SystemExit(msg) from e From 7e55aeee794d4541c7c828ae8b3b122feb117f5e Mon Sep 17 00:00:00 2001 From: Giovanni Condello Date: Wed, 24 Jun 2026 22:35:08 +0200 Subject: [PATCH 20/29] fix(mqtt_publisher): exponential backoff on reconnect with 5 min cap MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixed 5-second interval hammered a DNS lookup + TCP attempt every 5 s indefinitely against a down broker. Now doubles on each failure (5 s → 10 → 20 → … → 300 s) and resets to the minimum on a successful connection. --- src/publisher/mqtt_publisher.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/publisher/mqtt_publisher.py b/src/publisher/mqtt_publisher.py index ba87906c..bea49dfe 100644 --- a/src/publisher/mqtt_publisher.py +++ b/src/publisher/mqtt_publisher.py @@ -19,6 +19,10 @@ LOG = logging.getLogger(__name__) +# Reconnect backoff: starts at 5 s, doubles on each failure, caps at 5 min +_RECONNECT_INTERVAL_MIN = 5 +_RECONNECT_INTERVAL_MAX = 300 + # MQTT 3.1.1 spec section 3.2.2.3 — permanent connection refusal codes _CONNACK_REFUSED_PROTOCOL_VERSION = 1 _CONNACK_REFUSED_IDENTIFIER_REJECTED = 2 @@ -90,7 +94,7 @@ async def __run_loop(self) -> None: ), ) client.pending_calls_threshold = 150 - reconnect_interval = 5 + reconnect_interval = _RECONNECT_INTERVAL_MIN while True: try: LOG.debug( @@ -103,6 +107,7 @@ async def __run_loop(self) -> None: self.client = client_context self.__connected.set() await self.__on_connect() + reconnect_interval = _RECONNECT_INTERVAL_MIN async for message in client_context.messages: await self._on_message( client_context, @@ -124,6 +129,7 @@ async def __run_loop(self) -> None: reconnect_interval, ) await asyncio.sleep(reconnect_interval) + reconnect_interval = min(reconnect_interval * 2, _RECONNECT_INTERVAL_MAX) except aiomqtt.MqttError: LOG.warning( "Connection to %s:%s lost; Reconnecting in %d seconds ...", @@ -132,6 +138,7 @@ async def __run_loop(self) -> None: reconnect_interval, ) await asyncio.sleep(reconnect_interval) + reconnect_interval = min(reconnect_interval * 2, _RECONNECT_INTERVAL_MAX) except asyncio.exceptions.CancelledError: LOG.debug("MQTT publisher loop cancelled") raise From 27677a6a2d0bcb0b0aa2b194678e38e2a3a2aa3c Mon Sep 17 00:00:00 2001 From: Giovanni Condello Date: Wed, 24 Jun 2026 22:36:47 +0200 Subject: [PATCH 21/29] fix(mqtt_publisher): use create_task instead of run_coroutine_threadsafe MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit run_coroutine_threadsafe is the cross-thread API — calling it from the same event loop thread allocates an unnecessary concurrent.futures.Future and discards it, silently swallowing any unexpected exceptions. Replace with loop.create_task() which is the correct same-loop primitive and whose unhandled exceptions are surfaced by the asyncio runtime. --- src/publisher/mqtt_publisher.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/publisher/mqtt_publisher.py b/src/publisher/mqtt_publisher.py index bea49dfe..ca615fe9 100644 --- a/src/publisher/mqtt_publisher.py +++ b/src/publisher/mqtt_publisher.py @@ -168,8 +168,7 @@ async def __on_connect(self) -> None: @override def enable_commands(self) -> None: - loop = asyncio.get_running_loop() - asyncio.run_coroutine_threadsafe(self.__enable_commands(), loop) + asyncio.get_running_loop().create_task(self.__enable_commands()) async def __enable_commands(self) -> None: if not self.__connected.is_set() or not self.client: @@ -314,9 +313,8 @@ def __publish( qos: int = 0, ) -> None: LOG.debug("Publishing to MQTT topic %s with payload %s", topic, payload) - loop = asyncio.get_running_loop() - asyncio.run_coroutine_threadsafe( - self.__async_publish(topic, payload, retain=retain, qos=qos), loop + asyncio.get_running_loop().create_task( + self.__async_publish(topic, payload, retain=retain, qos=qos) ) async def __async_publish( From 4fecfcdc1c0b2044526d4c6169d276c346f205b8 Mon Sep 17 00:00:00 2001 From: Giovanni Condello Date: Wed, 24 Jun 2026 22:58:02 +0200 Subject: [PATCH 22/29] fix(mqtt_publisher): fix fatal CONNACK detection and connect() hang MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two bugs in the permanent-refusal handling: 1. isinstance(e.rc, int) is always False — aiomqtt passes a ReasonCode object, not a plain int. ReasonCode.__eq__ supports int comparison, so the guard is redundant and was silently making the entire branch dead code. Remove the isinstance check. 2. raise SystemExit inside an asyncio Task is caught by the event loop and stored on the task, never propagated. connect() was left hanging on await __connected.wait() forever. Fix: store the error and set __connected so connect() unblocks, then re-raise there. --- src/publisher/mqtt_publisher.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/publisher/mqtt_publisher.py b/src/publisher/mqtt_publisher.py index ca615fe9..7f4b71a3 100644 --- a/src/publisher/mqtt_publisher.py +++ b/src/publisher/mqtt_publisher.py @@ -54,6 +54,7 @@ def __init__( self.client: None | aiomqtt.Client = None self.__running: asyncio.Task[None] | None = None self.__connected = asyncio.Event() + self.__fatal_connect_error: SystemExit | None = None async def __run_loop(self) -> None: if not self.host: @@ -119,9 +120,12 @@ async def __run_loop(self) -> None: except MqttConnectError as e: # Permanent rejections — retrying won't help. # rc 3 (server unavailable) is transient and falls through to reconnect. - if isinstance(e.rc, int) and e.rc in _FATAL_CONNECT_RC: - msg = f"MQTT connection permanently refused: {e}" - raise SystemExit(msg) from e + # ReasonCode.__eq__ supports int comparison, so no isinstance guard needed. + if e.rc in _FATAL_CONNECT_RC: + LOG.error("MQTT connection permanently refused: %s", e) + self.__fatal_connect_error = SystemExit(str(e)) + self.__connected.set() + return LOG.warning( "Connection to %s:%s refused (transient); Reconnecting in %d seconds ...", self.host, @@ -156,6 +160,8 @@ async def connect(self) -> None: return self.__running = asyncio.create_task(self.__run_loop()) await self.__connected.wait() + if self.__fatal_connect_error is not None: + raise self.__fatal_connect_error async def __on_connect(self) -> None: LOG.info("Connected to MQTT broker") @@ -168,7 +174,12 @@ async def __on_connect(self) -> None: @override def enable_commands(self) -> None: - asyncio.get_running_loop().create_task(self.__enable_commands()) + task = asyncio.get_running_loop().create_task(self.__enable_commands()) + task.add_done_callback(self.__on_enable_commands_done) + + def __on_enable_commands_done(self, task: asyncio.Task[None]) -> None: + if not task.cancelled() and (exc := task.exception()): + LOG.error("Failed to enable MQTT command subscriptions: %s", exc) async def __enable_commands(self) -> None: if not self.__connected.is_set() or not self.client: From 17bd181efd936d02f6aab28920f933c2736cadde Mon Sep 17 00:00:00 2001 From: Giovanni Condello Date: Wed, 24 Jun 2026 23:01:01 +0200 Subject: [PATCH 23/29] fix(ha): emit retain as JSON boolean in discovery payloads str(retain).lower() produced the Python strings "true"/"false" which json.dumps serialized as JSON strings. Home Assistant's MQTT discovery schema expects a boolean; the string caused schema validation to reject the retain flag, so command topics for number/select entities were not retained on the broker and gateway settings were lost after an HA restart. --- src/integrations/home_assistant/base.py | 6 +++--- tests/integrations/home_assistant/test_discovery_retain.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/integrations/home_assistant/base.py b/src/integrations/home_assistant/base.py index 322fa60a..8f12dbe4 100644 --- a/src/integrations/home_assistant/base.py +++ b/src/integrations/home_assistant/base.py @@ -55,7 +55,7 @@ def _publish_select( "command_topic": self._get_command_topic(topic), "value_template": value_template, "command_template": command_template, - "retain": str(retain).lower(), + "retain": retain, "options": options, "enabled_by_default": enabled, } @@ -87,7 +87,7 @@ def _publish_text( "command_topic": self._get_command_topic(topic), "value_template": value_template, "command_template": command_template, - "retain": str(retain).lower(), + "retain": retain, "enabled_by_default": enabled, } if min_value is not None: @@ -153,7 +153,7 @@ def _publish_number( "state_topic": self._get_state_topic(topic), "command_topic": self._get_command_topic(topic), "value_template": value_template, - "retain": str(retain).lower(), + "retain": retain, "mode": mode, "min": min_value, "max": max_value, diff --git a/tests/integrations/home_assistant/test_discovery_retain.py b/tests/integrations/home_assistant/test_discovery_retain.py index 45fa7898..f56731fb 100644 --- a/tests/integrations/home_assistant/test_discovery_retain.py +++ b/tests/integrations/home_assistant/test_discovery_retain.py @@ -104,7 +104,7 @@ def test_required_entities_have_retain_true(self) -> None: assert payload is not None, ( f"No writable HA discovery payload found for topic {topic}" ) - assert payload.get("retain") == "true", ( + assert payload.get("retain") is True, ( f"Expected retain=true for {topic}, got {payload.get('retain')!r}" ) @@ -117,6 +117,6 @@ def test_non_retained_entities_keep_retain_false(self) -> None: payload = _payload_for_state_topic_suffix(payloads, topic) if payload is None: continue # entity not published for this vehicle config - assert payload.get("retain") in ("false", None), ( + assert payload.get("retain") in (False, None), ( f"Expected retain!=true for {topic}, got {payload.get('retain')!r}" ) From 11af44847856d2c5825cf0673232a6451a191fc4 Mon Sep 17 00:00:00 2001 From: Giovanni Condello Date: Wed, 24 Jun 2026 23:01:16 +0200 Subject: [PATCH 24/29] style: apply ruff formatting --- src/publisher/mqtt_publisher.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/publisher/mqtt_publisher.py b/src/publisher/mqtt_publisher.py index 7f4b71a3..b6de2477 100644 --- a/src/publisher/mqtt_publisher.py +++ b/src/publisher/mqtt_publisher.py @@ -133,7 +133,9 @@ async def __run_loop(self) -> None: reconnect_interval, ) await asyncio.sleep(reconnect_interval) - reconnect_interval = min(reconnect_interval * 2, _RECONNECT_INTERVAL_MAX) + reconnect_interval = min( + reconnect_interval * 2, _RECONNECT_INTERVAL_MAX + ) except aiomqtt.MqttError: LOG.warning( "Connection to %s:%s lost; Reconnecting in %d seconds ...", @@ -142,7 +144,9 @@ async def __run_loop(self) -> None: reconnect_interval, ) await asyncio.sleep(reconnect_interval) - reconnect_interval = min(reconnect_interval * 2, _RECONNECT_INTERVAL_MAX) + reconnect_interval = min( + reconnect_interval * 2, _RECONNECT_INTERVAL_MAX + ) except asyncio.exceptions.CancelledError: LOG.debug("MQTT publisher loop cancelled") raise From 77c4581b3575ed9e026616bc84dd9a1d20f8bd41 Mon Sep 17 00:00:00 2001 From: Giovanni Condello Date: Wed, 24 Jun 2026 23:05:25 +0200 Subject: [PATCH 25/29] fix(extractors): suppress SoC kWh warning for non-EV vehicles extract_soc_kwh now returns None silently when charge_status is None, which is the expected state for non-EV vehicles on every poll cycle. The WARNING is preserved for EVs where charge_status is present but kWh derivation fails. --- src/extractors/__init__.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/extractors/__init__.py b/src/extractors/__init__.py index 2ebd343d..746a735a 100644 --- a/src/extractors/__init__.py +++ b/src/extractors/__init__.py @@ -43,17 +43,17 @@ def extract_soc_kwh( charge_status: ChrgMgmtDataRespProcessingResult | None, soc: float | None, ) -> float | None: - if ( - charge_status is not None - and (raw_soc_kwh := charge_status.soc_kwh) is not None - and (soc_kwh := __validate_and_convert_soc_kwh(raw_soc_kwh)) is not None - ): + if charge_status is None: + return None + + if (raw_soc_kwh := charge_status.soc_kwh) is not None and ( + soc_kwh := __validate_and_convert_soc_kwh(raw_soc_kwh) + ) is not None: LOG.debug("SoC kWh derived from realtimePower") return soc_kwh if ( soc is not None - and charge_status is not None and ( capacity := __validate_and_convert_soc_kwh( charge_status.real_total_battery_capacity From 5acfbd43f337033a3fe423a30a38aa5c142552f4 Mon Sep 17 00:00:00 2001 From: Giovanni Condello Date: Wed, 24 Jun 2026 23:08:04 +0200 Subject: [PATCH 26/29] fix(command): publish command results with retain=False MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Command result topics ("Success" / "Failed: ...") were published with the default retain=True, so a stale result from a previous run would persist on the broker and re-appear after a gateway restart. Results are transient responses — they should not survive a reconnect. --- src/handlers/vehicle_command.py | 6 +++--- tests/handlers/test_vehicle_command.py | 23 +++++++++++++++-------- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/src/handlers/vehicle_command.py b/src/handlers/vehicle_command.py index e7f994ca..e1840d1c 100644 --- a/src/handlers/vehicle_command.py +++ b/src/handlers/vehicle_command.py @@ -70,7 +70,7 @@ def __report_command_failure( else: LOG.error("Command %s failed: %s", command, detail) try: - self.publisher.publish_str(result_topic, f"Failed: {detail}") + self.publisher.publish_str(result_topic, f"Failed: {detail}", retain=False) except Exception: LOG.warning( "Failed to publish failure result for command %s", @@ -138,7 +138,7 @@ async def __execute_mqtt_command_handler( try: execution_result = await handler.handle(payload, retained=retained) - self.publisher.publish_str(result_topic, "Success") + self.publisher.publish_str(result_topic, "Success", retain=False) if execution_result.force_refresh: self.vehicle_state.set_refresh_mode( RefreshMode.FORCE, f"after command execution on topic {topic}" @@ -165,7 +165,7 @@ async def __execute_mqtt_command_handler( return try: execution_result = await handler.handle(payload, retained=retained) - self.publisher.publish_str(result_topic, "Success") + self.publisher.publish_str(result_topic, "Success", retain=False) if execution_result.force_refresh: self.vehicle_state.set_refresh_mode( RefreshMode.FORCE, diff --git a/tests/handlers/test_vehicle_command.py b/tests/handlers/test_vehicle_command.py index 4815b5ec..57cfaaf1 100644 --- a/tests/handlers/test_vehicle_command.py +++ b/tests/handlers/test_vehicle_command.py @@ -55,7 +55,7 @@ async def test_successful_command_publishes_success(self) -> None: await handler.handle_mqtt_command(topic=CHARGING_SET_TOPIC, payload="true") - pub.publish_str.assert_any_call(CHARGING_RESULT_TOPIC, "Success") + pub.publish_str.assert_any_call(CHARGING_RESULT_TOPIC, "Success", retain=False) pub.publish_json.assert_not_called() @@ -70,6 +70,7 @@ async def test_publishes_error_event(self) -> None: pub.publish_str.assert_any_call( result_topic, "Failed: No handler found for command topic nonexistent/topic/set", + retain=False, ) pub.publish_json.assert_called_once() event = pub.publish_json.call_args[0][1] @@ -99,6 +100,7 @@ async def test_publishes_error_event(self) -> None: CHARGING_RESULT_TOPIC, "Failed: Unsupported payload not_a_boolean for command " "DrivetrainChargingCommand", + retain=False, ) pub.publish_json.assert_called_once() event = pub.publish_json.call_args[0][1] @@ -119,6 +121,7 @@ async def test_publishes_error_event(self) -> None: pub.publish_str.assert_any_call( CHARGING_RESULT_TOPIC, "Failed: return code: 8, message: operation too frequent", + retain=False, ) pub.publish_json.assert_called_once() event = pub.publish_json.call_args[0][1] @@ -135,7 +138,7 @@ async def test_uses_safe_detail(self) -> None: await handler.handle_mqtt_command(topic=CHARGING_SET_TOPIC, payload="true") pub.publish_str.assert_any_call( - CHARGING_RESULT_TOPIC, "Failed: unexpected error" + CHARGING_RESULT_TOPIC, "Failed: unexpected error", retain=False ) event = pub.publish_json.call_args[0][1] assert event["detail"] == "unexpected error" @@ -157,7 +160,7 @@ async def test_relogin_success_retries_command(self) -> None: assert isinstance(relogin, AsyncMock) relogin.force_login.assert_awaited_once() assert saic_api.control_charging.await_count == 2 - pub.publish_str.assert_any_call(CHARGING_RESULT_TOPIC, "Success") + pub.publish_str.assert_any_call(CHARGING_RESULT_TOPIC, "Success", retain=False) pub.publish_json.assert_not_called() async def test_relogin_failure_publishes_error_event(self) -> None: @@ -170,7 +173,7 @@ async def test_relogin_failure_publishes_error_event(self) -> None: await handler.handle_mqtt_command(topic=CHARGING_SET_TOPIC, payload="true") pub.publish_str.assert_any_call( - CHARGING_RESULT_TOPIC, "Failed: relogin failed (login failed)" + CHARGING_RESULT_TOPIC, "Failed: relogin failed (login failed)", retain=False ) pub.publish_json.assert_called_once() event = pub.publish_json.call_args[0][1] @@ -186,7 +189,9 @@ async def test_retry_failure_publishes_error_event(self) -> None: await handler.handle_mqtt_command(topic=CHARGING_SET_TOPIC, payload="true") - pub.publish_str.assert_any_call(CHARGING_RESULT_TOPIC, "Failed: retry boom") + pub.publish_str.assert_any_call( + CHARGING_RESULT_TOPIC, "Failed: retry boom", retain=False + ) pub.publish_json.assert_called_once() event = pub.publish_json.call_args[0][1] assert event["detail"] == "retry boom" @@ -277,7 +282,7 @@ async def test_retained_force_refresh_mode_dropped(self) -> None: ) vehicle_state.set_refresh_mode.assert_not_called() - pub.publish_str.assert_any_call(REFRESH_MODE_RESULT_TOPIC, "Success") + pub.publish_str.assert_any_call(REFRESH_MODE_RESULT_TOPIC, "Success", retain=False) async def test_retained_charging_detection_refresh_mode_dropped(self) -> None: handler, pub = _build() @@ -290,7 +295,7 @@ async def test_retained_charging_detection_refresh_mode_dropped(self) -> None: ) vehicle_state.set_refresh_mode.assert_not_called() - pub.publish_str.assert_any_call(REFRESH_MODE_RESULT_TOPIC, "Success") + pub.publish_str.assert_any_call(REFRESH_MODE_RESULT_TOPIC, "Success", retain=False) async def test_retained_periodic_refresh_mode_applied(self) -> None: handler, _pub = _build() @@ -339,7 +344,9 @@ async def test_retained_battery_capacity_replays_to_vehicle_info(self) -> None: vehicle_state.update_battery_capacity.assert_called_once_with(50.0) pub.publish_float.assert_any_call(TOTAL_BATTERY_CAPACITY_STATE_TOPIC, 50.0) - pub.publish_str.assert_any_call(TOTAL_BATTERY_CAPACITY_RESULT_TOPIC, "Success") + pub.publish_str.assert_any_call( + TOTAL_BATTERY_CAPACITY_RESULT_TOPIC, "Success", retain=False + ) async def test_battery_capacity_zero_payload_publishes_model_default(self) -> None: """Payload `0` clears the override; the per-model default is republished.""" From 2070279e075629d7605e150f260705b262b93be5 Mon Sep 17 00:00:00 2001 From: Giovanni Condello Date: Wed, 24 Jun 2026 23:09:48 +0200 Subject: [PATCH 27/29] fix(tls): only apply tls_insecure when a custom CA cert is configured tls_insecure was set for any TLS connection where hostname checking was disabled, not just the self-signed/custom-CA scenario it was intended for. With a public CA cert (the default case), hostname verification should never be bypassed. The warning log already required tls_server_cert_path; align the tls_insecure condition to match. --- src/publisher/mqtt_publisher.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/publisher/mqtt_publisher.py b/src/publisher/mqtt_publisher.py index b6de2477..6513bd01 100644 --- a/src/publisher/mqtt_publisher.py +++ b/src/publisher/mqtt_publisher.py @@ -85,7 +85,9 @@ async def __run_loop(self) -> None: clean_session=True, tls_context=ssl_context, tls_insecure=bool( - ssl_context and not self.configuration.tls_server_cert_check_hostname + ssl_context + and self.configuration.tls_server_cert_path + and not self.configuration.tls_server_cert_check_hostname ), will=aiomqtt.Will( topic=self.get_topic(mqtt_topics.INTERNAL_LWT, False), From c16515e216eea3fccc432ed9950909a6f778b294 Mon Sep 17 00:00:00 2001 From: Giovanni Condello Date: Wed, 24 Jun 2026 23:11:10 +0200 Subject: [PATCH 28/29] fix(tls): warn on hostname-check bypass regardless of custom CA cert The warning "Skipping hostname check" was only emitted when a custom CA cert path was also configured. Users connecting with self-signed certs (no CA file) or by IP address would disable hostname verification silently. Move the warning outside the tls_server_cert_path block so it fires for any TLS connection where tls_server_cert_check_hostname=False, and drop the spurious tls_server_cert_path guard on tls_insecure. --- src/publisher/mqtt_publisher.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/publisher/mqtt_publisher.py b/src/publisher/mqtt_publisher.py index 6513bd01..d7cc9b61 100644 --- a/src/publisher/mqtt_publisher.py +++ b/src/publisher/mqtt_publisher.py @@ -70,10 +70,10 @@ async def __run_loop(self) -> None: ssl_context.load_verify_locations( cafile=self.configuration.tls_server_cert_path ) - if not self.configuration.tls_server_cert_check_hostname: - LOG.warning( - f"Skipping hostname check for TLS connection to {self.host}" - ) + if not self.configuration.tls_server_cert_check_hostname: + LOG.warning( + f"Skipping hostname check for TLS connection to {self.host}" + ) client = aiomqtt.Client( hostname=self.host, @@ -85,9 +85,7 @@ async def __run_loop(self) -> None: clean_session=True, tls_context=ssl_context, tls_insecure=bool( - ssl_context - and self.configuration.tls_server_cert_path - and not self.configuration.tls_server_cert_check_hostname + ssl_context and not self.configuration.tls_server_cert_check_hostname ), will=aiomqtt.Will( topic=self.get_topic(mqtt_topics.INTERNAL_LWT, False), From 953a24b41ee796dbb3ea2ad61c15aade70f62c2f Mon Sep 17 00:00:00 2001 From: Giovanni Condello Date: Wed, 24 Jun 2026 23:16:01 +0200 Subject: [PATCH 29/29] chore: add 0.13.0 release notes to CHANGELOG --- CHANGELOG.md | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 57439d11..cff18fc5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,41 @@ # Change Log +## 0.13.0 + +### Changed + +* Replace gmqtt with aiomqtt for MQTT connectivity. The new client uses a + pure-asyncio architecture — no more `run_coroutine_threadsafe` cross-thread + calls. Reconnect now uses exponential backoff (5 s → 10 s → … → 5 min) + instead of a fixed interval (#457). + +### Fixed + +* Exit immediately on permanent MQTT connection failures (bad credentials, + unknown protocol version, identifier rejected, not authorised) instead of + looping forever (#457). + +* Restore MQTT subscriptions and trigger HA re-discovery on broker reconnect + after a transient disconnect (#457). + +* Suppress spurious `WARNING: Could not extract a valid SoC kWh` log on every + poll for non-EV vehicles. The warning is preserved for EVs where the BMS + returns invalid data (#460). + +* Publish command result topics (`Success` / `Failed: …`) with `retain=False` + so a stale result from a previous session does not persist on the broker + across restarts (#461). + +* Emit `retain` as a JSON boolean (`true`/`false`) in Home Assistant MQTT + discovery payloads instead of the string `"true"`/`"false"` that the HA + schema requires (#459). + +* Warn when TLS hostname verification is disabled regardless of whether a + custom CA certificate is configured. Self-signed cert users (who typically + have no CA file) now see the warning too (#462). + +**Full Changelog**: https://github.com/SAIC-iSmart-API/saic-python-mqtt-gateway/compare/0.12.0...0.13.0 + ## 0.12.0 ### Added