diff --git a/poetry.lock b/poetry.lock index 3e6be2d0..5e74c391 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,5 +1,20 @@ # This file is automatically @generated by Poetry 2.3.2 and should not be changed by hand. +[[package]] +name = "aiomqtt" +version = "2.5.1" +description = "The idiomatic asyncio MQTT client" +optional = false +python-versions = "<4.0,>=3.8" +groups = ["main"] +files = [ + {file = "aiomqtt-2.5.1-py3-none-any.whl", hash = "sha256:fd58c3593160e4d475d90ce911cdfc4239cd64de96b0ba22edf6c86bd7afa278"}, + {file = "aiomqtt-2.5.1.tar.gz", hash = "sha256:25a0a47d157e8f158d2da1110ea4786c0615518751e94f7b04976c977a8ff20d"}, +] + +[package.dependencies] +paho-mqtt = ">=2.1.0,<3.0.0" + [[package]] name = "anyio" version = "4.12.1" @@ -234,21 +249,6 @@ files = [ graph = ["objgraph (>=1.7.2)"] profile = ["gprof2dot (>=2022.7.29)"] -[[package]] -name = "gmqtt" -version = "0.7.0" -description = "Client for MQTT protocol" -optional = false -python-versions = ">=3.5" -groups = ["main"] -files = [ - {file = "gmqtt-0.7.0-py3-none-any.whl", hash = "sha256:3e5571a20e9c115d83d600caa228b06f716087653e241035e29cec73277b52cc"}, - {file = "gmqtt-0.7.0.tar.gz", hash = "sha256:bedfec7bac26b6b4ce1f0c4c32cff3d663526a54c882d323d41560fc3b9b44a2"}, -] - -[package.extras] -test = ["atomicwrites (>=1.3.0)", "attrs (>=19.1.0)", "codecov (>=2.0.15)", "coverage (>=4.5.3)", "more-itertools (>=7.0.0)", "pluggy (>=0.11.0)", "py (>=1.8.0)", "pytest (>=5.4.0)", "pytest-asyncio (>=0.12.0)", "pytest-cov (>=2.7.1)", "six (>=1.12.0)", "uvloop (>=0.14.0)"] - [[package]] name = "h11" version = "0.16.0" @@ -578,6 +578,21 @@ files = [ {file = "packaging-26.0.tar.gz", hash = "sha256:00243ae351a257117b6a241061796684b084ed1c516a08c48a3f7e147a9d80b4"}, ] +[[package]] +name = "paho-mqtt" +version = "2.1.0" +description = "MQTT version 5.0/3.1.1 client class" +optional = false +python-versions = ">=3.7" +groups = ["main"] +files = [ + {file = "paho_mqtt-2.1.0-py3-none-any.whl", hash = "sha256:6db9ba9b34ed5bc6b6e3812718c7e06e2fd7444540df2455d2c51bd58808feee"}, + {file = "paho_mqtt-2.1.0.tar.gz", hash = "sha256:12d6e7511d4137555a3f6ea167ae846af2c7357b10bc6fa4f7c3968fc1723834"}, +] + +[package.extras] +proxy = ["pysocks"] + [[package]] name = "pathspec" version = "1.0.4" @@ -931,4 +946,4 @@ devenv = ["check-manifest", "pytest (>=4.3)", "pytest-cov", "pytest-mock (>=3.3) [metadata] lock-version = "2.1" python-versions = ">=3.12,<4.0" -content-hash = "895b2fe59d35a1326b819dc68c6a88aac237d81fd60c16076a13c4801e17dee2" +content-hash = "1e5abdfdc66a6b7dc2fb6a5ba61087f51c6b93a78f31b180b9006a089632815e" diff --git a/pyproject.toml b/pyproject.toml index 459f26d6..f8638a40 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,10 +17,10 @@ requires-python = '>=3.12,<4.0' dependencies = [ "saic-ismart-client-ng (>=0.9.3,<0.10.0)", 'httpx (>=0.28.1,<0.29.0)', - 'gmqtt (>=0.7.0,<0.8.0)', 'inflection (>=0.5.1,<0.6.0)', 'apscheduler (>=3.11.0,<4.0.0)', 'python-dotenv (>=1.1.1,<2.0.0)', + "aiomqtt (>=2.4.0,<3.0.0)", ] [project.urls] diff --git a/src/configuration/__init__.py b/src/configuration/__init__.py index 3816ae32..5cb23dba 100644 --- a/src/configuration/__init__.py +++ b/src/configuration/__init__.py @@ -1,14 +1,18 @@ from __future__ import annotations from enum import Enum -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Literal if TYPE_CHECKING: from integrations.openwb.charging_station import ChargingStation +Transport = Literal["tcp", "websockets"] +QoS = Literal[0, 1, 2] + + class TransportProtocol(Enum): - def __init__(self, transport_mechanism: str, with_tls: bool) -> None: + def __init__(self, transport_mechanism: Transport, with_tls: bool) -> None: self.transport_mechanism = transport_mechanism self.with_tls = with_tls diff --git a/src/configuration/parser.py b/src/configuration/parser.py index d97b3dfe..707ede1e 100644 --- a/src/configuration/parser.py +++ b/src/configuration/parser.py @@ -104,15 +104,18 @@ def __parse_mqtt_transport(args: Namespace, config: Configuration) -> None: args.tls_server_cert_check_hostname ) else: - msg = f"Invalid MQTT URI scheme: {parse_result.scheme}, use tcp or ws" + msg = f"Invalid MQTT URI scheme: {parse_result.scheme}, use tls, tcp or ws" raise SystemExit(msg) if parse_result.port: config.mqtt_port = parse_result.port - elif config.mqtt_transport_protocol == TransportProtocol.TCP: - config.mqtt_port = 1883 - else: + elif config.mqtt_transport_protocol == TransportProtocol.TLS: + config.mqtt_port = 8883 + elif config.mqtt_transport_protocol == TransportProtocol.WS: config.mqtt_port = 9001 + else: + # fallback to default mqtt port + config.mqtt_port = 1883 config.mqtt_host = str(parse_result.hostname) diff --git a/src/log_config.py b/src/log_config.py index 9c96caed..2a49e3a4 100644 --- a/src/log_config.py +++ b/src/log_config.py @@ -7,7 +7,7 @@ MODULES_DEFAULT_LOG_LEVEL = { "asyncio": "WARNING", - "gmqtt": "WARNING", + "aiomqtt": "WARNING", "httpcore": "WARNING", "httpx": "WARNING", "saic_ismart_client_ng": "WARNING", diff --git a/src/main.py b/src/main.py index dc47e092..e2d90d96 100644 --- a/src/main.py +++ b/src/main.py @@ -27,4 +27,5 @@ configuration = process_command_line() mqtt_gateway = MqttGateway(configuration) + asyncio.run(mqtt_gateway.run(), debug=debug_log_enabled()) diff --git a/src/publisher/core.py b/src/publisher/core.py index 80b93dbe..26d645f7 100644 --- a/src/publisher/core.py +++ b/src/publisher/core.py @@ -8,7 +8,7 @@ import mqtt_topics if TYPE_CHECKING: - from configuration import Configuration + from configuration import Configuration, QoS T = TypeVar("T") @@ -63,28 +63,61 @@ def is_connected(self) -> bool: @abstractmethod def publish_json( - self, key: str, data: dict[str, Any], no_prefix: bool = False + self, + key: str, + data: dict[str, Any], + no_prefix: bool = False, + retain: bool = True, + qos: QoS = 0, ) -> None: raise NotImplementedError @abstractmethod - def publish_str(self, key: str, value: str, no_prefix: bool = False) -> None: + def publish_str( + self, + key: str, + value: str, + no_prefix: bool = False, + retain: bool = True, + qos: QoS = 0, + ) -> None: raise NotImplementedError @abstractmethod - def publish_int(self, key: str, value: int, no_prefix: bool = False) -> None: + def publish_int( + self, + key: str, + value: int, + no_prefix: bool = False, + retain: bool = True, + qos: QoS = 0, + ) -> None: raise NotImplementedError @abstractmethod - def publish_bool(self, key: str, value: bool, no_prefix: bool = False) -> None: + def publish_bool( + self, + key: str, + value: bool, + no_prefix: bool = False, + retain: bool = True, + qos: QoS = 0, + ) -> None: raise NotImplementedError @abstractmethod - def publish_float(self, key: str, value: float, no_prefix: bool = False) -> None: + def publish_float( + self, + key: str, + value: float, + no_prefix: bool = False, + retain: bool = True, + qos: QoS = 0, + ) -> None: raise NotImplementedError @abstractmethod - def clear_topic(self, key: str, no_prefix: bool = False) -> None: + def clear_topic(self, key: str, no_prefix: bool = False, qos: QoS = 0) -> None: raise NotImplementedError def get_mqtt_account_prefix(self) -> str: @@ -162,7 +195,9 @@ def __anonymize(self, data: T) -> T: return data def keepalive(self) -> None: - self.publish_str(mqtt_topics.INTERNAL_LWT, "online", False) + self.publish_str( + mqtt_topics.INTERNAL_LWT, "online", no_prefix=False, retain=True, qos=1 + ) @staticmethod def anonymize_str(value: str) -> str: diff --git a/src/publisher/log_publisher.py b/src/publisher/log_publisher.py index efb6b249..7f126b55 100644 --- a/src/publisher/log_publisher.py +++ b/src/publisher/log_publisher.py @@ -1,10 +1,13 @@ from __future__ import annotations import logging -from typing import Any, override +from typing import TYPE_CHECKING, Any, override from publisher.core import Publisher +if TYPE_CHECKING: + from configuration import QoS + LOG = logging.getLogger(__name__) LOG.setLevel(level="DEBUG") @@ -24,29 +27,62 @@ def enable_commands(self) -> None: @override def publish_json( - self, key: str, data: dict[str, Any], no_prefix: bool = False + self, + key: str, + data: dict[str, Any], + no_prefix: bool = False, + retain: bool = True, + qos: QoS = 0, ) -> None: anonymized_json = self.dict_to_anonymized_json(data) self.internal_publish(key, anonymized_json) @override - def publish_str(self, key: str, value: str, no_prefix: bool = False) -> None: + def publish_str( + self, + key: str, + value: str, + no_prefix: bool = False, + retain: bool = True, + qos: QoS = 0, + ) -> None: self.internal_publish(key, value) @override - def publish_int(self, key: str, value: int, no_prefix: bool = False) -> None: + def publish_int( + self, + key: str, + value: int, + no_prefix: bool = False, + retain: bool = True, + qos: QoS = 0, + ) -> None: self.internal_publish(key, value) @override - def publish_bool(self, key: str, value: bool, no_prefix: bool = False) -> None: + def publish_bool( + self, + key: str, + value: bool, + no_prefix: bool = False, + retain: bool = True, + qos: QoS = 0, + ) -> None: self.internal_publish(key, value) @override - def publish_float(self, key: str, value: float, no_prefix: bool = False) -> None: + def publish_float( + self, + key: str, + value: float, + no_prefix: bool = False, + retain: bool = True, + qos: QoS = 0, + ) -> None: self.internal_publish(key, value) @override - def clear_topic(self, key: str, no_prefix: bool = False) -> None: + def clear_topic(self, key: str, no_prefix: bool = False, qos: QoS = 0) -> None: self.internal_publish(key, None) def internal_publish(self, key: str, value: Any) -> None: diff --git a/src/publisher/mqtt_publisher.py b/src/publisher/mqtt_publisher.py index 3b3a4df5..1c2cf14b 100644 --- a/src/publisher/mqtt_publisher.py +++ b/src/publisher/mqtt_publisher.py @@ -1,23 +1,43 @@ from __future__ import annotations +import asyncio import logging import ssl -from typing import TYPE_CHECKING, Any, Final, cast, override +from typing import TYPE_CHECKING, Any, override -import gmqtt +import aiomqtt +from aiomqtt.exceptions import MqttConnectError +from paho.mqtt.reasoncodes import ReasonCode import mqtt_topics from publisher.core import Publisher if TYPE_CHECKING: - from configuration import Configuration + from configuration import Configuration, QoS from integrations.openwb.charging_station import ChargingStation LOG = logging.getLogger(__name__) +# MQTT CONNACK "Server unavailable" return codes — the only transient +# connection refusal that warrants a retry. All other CONNACK refusal +# codes (bad credentials, protocol mismatch, etc.) are fatal. +_CONNACK_RC_SERVER_UNAVAILABLE_V311 = 3 # MQTT v3.1.1 +_CONNACK_RC_SERVER_UNAVAILABLE_V5 = 0x88 # MQTT v5 + + +def _is_transient_connect_error(rc: int | ReasonCode | None) -> bool: + if rc is None: + return False + if isinstance(rc, ReasonCode): + return rc.value == _CONNACK_RC_SERVER_UNAVAILABLE_V5 + return rc == _CONNACK_RC_SERVER_UNAVAILABLE_V311 + class MqttPublisher(Publisher): - def __init__(self, configuration: Configuration) -> None: + def __init__( + self, + configuration: Configuration, + ) -> None: super().__init__(configuration) self.publisher_id = configuration.mqtt_client_id self.host = self.configuration.mqtt_host @@ -27,113 +47,178 @@ def __init__(self, configuration: Configuration) -> None: self.last_charge_state_by_vin: dict[str, str] = {} self.vin_by_charger_connected_topic: dict[str, str] = {} self.first_connection = True + self.client: None | aiomqtt.Client = None + self.__running: asyncio.Task[None] | None = None + self.__connected = asyncio.Event() - mqtt_client = gmqtt.Client( - client_id=str(self.publisher_id), + async def __run_loop(self) -> None: + if not self.host: + msg = "MQTT host is not configured" + LOG.error(msg) + raise SystemExit(msg) + ssl_context: ssl.SSLContext | None = None + if self.transport_protocol.with_tls: + ssl_context = ssl.create_default_context() + if self.configuration.tls_server_cert_path: + LOG.debug( + f"Using custom CA file {self.configuration.tls_server_cert_path}" + ) + ssl_context.load_verify_locations( + cafile=self.configuration.tls_server_cert_path + ) + if not self.configuration.tls_server_cert_check_hostname: + LOG.warning( + f"Skipping hostname check for TLS connection to {self.host}" + ) + + client = aiomqtt.Client( + hostname=self.host, + port=self.port, + identifier=str(self.publisher_id), transport=self.transport_protocol.transport_mechanism, - will_message=gmqtt.Message( + username=self.configuration.mqtt_user or None, + password=self.configuration.mqtt_password or None, + clean_session=True, + tls_context=ssl_context, + tls_insecure=bool( + ssl_context and not self.configuration.tls_server_cert_check_hostname + ), + will=aiomqtt.Will( topic=self.get_topic(mqtt_topics.INTERNAL_LWT, False), payload="offline", retain=True, + qos=1, ), ) - mqtt_client.on_connect = self.__on_connect - mqtt_client.on_message = self.__on_message - self.client: Final[gmqtt.Client] = mqtt_client + client.pending_calls_threshold = 150 + reconnect_interval = 5 + while True: + try: + LOG.debug( + "Connecting to %s:%s as %s", + self.host, + self.port, + self.publisher_id, + ) + async with client as client_context: + self.client = client_context + await self.__on_connect() + self.__connected.set() + async for message in client_context.messages: + await self._on_message( + client_context, + str(message.topic), + message.payload, + message.qos, + message.properties, + ) + except MqttConnectError as e: + if not _is_transient_connect_error(e.rc): + LOG.error("Fatal MQTT connection error: %s", e) + msg = f"Unable to connect to MQTT broker: {e}" + raise SystemExit(msg) from e + LOG.warning( + "Connection to %s:%s refused: %s; Reconnecting in %d seconds ...", + self.host, + self.port, + e, + reconnect_interval, + ) + await asyncio.sleep(reconnect_interval) + except aiomqtt.MqttError as e: + LOG.warning( + "Connection to %s:%s lost: %s; Reconnecting in %d seconds ...", + self.host, + self.port, + e, + reconnect_interval, + ) + await asyncio.sleep(reconnect_interval) + except asyncio.CancelledError: + LOG.debug("MQTT publisher loop cancelled") + raise + finally: + self.__connected.clear() + LOG.info("MQTT client disconnected") @override async def connect(self) -> None: - if self.configuration.mqtt_user is not None: - if self.configuration.mqtt_password is not None: - self.client.set_auth_credentials( - username=self.configuration.mqtt_user, - password=self.configuration.mqtt_password, - ) - else: - self.client.set_auth_credentials(username=self.configuration.mqtt_user) - - if self.transport_protocol.with_tls: - ssl_context = ssl.create_default_context() - cert_uri = self.configuration.tls_server_cert_path - if cert_uri: - LOG.debug(f"Using custom CA file {cert_uri}") - ssl_context.load_verify_locations(cafile=cert_uri) - if not self.configuration.tls_server_cert_check_hostname: - LOG.warning( - f"Skipping hostname check for TLS connection to {self.host}" - ) - ssl_context.check_hostname = False - else: - ssl_context = None - await self.client.connect( - host=self.host, - port=self.port, - version=gmqtt.constants.MQTTv311, - ssl=ssl_context, + if self.__running and not self.__running.done(): + LOG.warning("MQTT client is already running") + return + self.__running = asyncio.create_task(self.__run_loop()) + # Wait for either a successful connection or the run loop to exit + # (e.g. due to missing host, fatal error, or unexpected exception) + connected_waiter = asyncio.create_task(self.__connected.wait()) + done, _pending = await asyncio.wait( + {self.__running, connected_waiter}, + return_when=asyncio.FIRST_COMPLETED, ) + if connected_waiter not in done: + connected_waiter.cancel() + if self.__running in done: + # The run loop exited before connecting — propagate the error + self.__running.result() - def __on_connect( - self, _client: Any, _flags: Any, rc: int, _properties: Any - ) -> None: - if rc == gmqtt.constants.CONNACK_ACCEPTED: - LOG.info("Connected to MQTT broker") - if not self.first_connection: - self.enable_commands() - if self.command_listener is not None: - self.command_listener.on_mqtt_reconnected() - self.first_connection = False - self.keepalive() - else: - if rc == gmqtt.constants.CONNACK_REFUSED_BAD_USERNAME_PASSWORD: - LOG.error( - f"MQTT connection error: bad username or password. Return code {rc}" - ) - elif rc == gmqtt.constants.CONNACK_REFUSED_PROTOCOL_VERSION: - LOG.error( - f"MQTT connection error: refused protocol version. Return code {rc}" - ) - else: - LOG.error(f"MQTT connection error.Return code {rc}") - msg = f"Unable to connect to MQTT broker. Return code: {rc}" - raise SystemExit(msg) + async def __on_connect(self) -> None: + LOG.info("Connected to MQTT broker") + if not self.first_connection: + await self.__enable_commands() + if self.command_listener is not None: + self.command_listener.on_mqtt_reconnected() + self.first_connection = False + self.keepalive() @override def enable_commands(self) -> None: - LOG.info("Subscribing to MQTT command topics") - mqtt_account_prefix = self.get_mqtt_account_prefix() - self.client.subscribe( - f"{mqtt_account_prefix}/{mqtt_topics.VEHICLES}/+/+/+/{mqtt_topics.SET_SUFFIX}" - ) - self.client.subscribe( - f"{mqtt_account_prefix}/{mqtt_topics.VEHICLES}/+/+/+/+/{mqtt_topics.SET_SUFFIX}" - ) - self.client.subscribe( - f"{mqtt_account_prefix}/{mqtt_topics.VEHICLES}/+/{mqtt_topics.REFRESH_MODE}/{mqtt_topics.SET_SUFFIX}" - ) - self.client.subscribe( - f"{mqtt_account_prefix}/{mqtt_topics.VEHICLES}/+/{mqtt_topics.REFRESH_PERIOD}/+/{mqtt_topics.SET_SUFFIX}" - ) - for charging_station in self.configuration.charging_stations_by_vin.values(): - LOG.debug( - f"Subscribing to MQTT topic {charging_station.charge_state_topic}" + task = asyncio.create_task(self.__enable_commands()) + task.add_done_callback(self.__handle_task_exception) + + async def __enable_commands(self) -> None: + if not self.client: + LOG.error("Failed to enable commands: MQTT client is not connected") + return + try: + LOG.info("Subscribing to MQTT command topics") + mqtt_account_prefix = self.get_mqtt_account_prefix() + await self.client.subscribe( + f"{mqtt_account_prefix}/{mqtt_topics.VEHICLES}/+/+/+/{mqtt_topics.SET_SUFFIX}" + ) + await self.client.subscribe( + f"{mqtt_account_prefix}/{mqtt_topics.VEHICLES}/+/+/+/+/{mqtt_topics.SET_SUFFIX}" ) - self.vin_by_charge_state_topic[charging_station.charge_state_topic] = ( - charging_station.vin + await self.client.subscribe( + f"{mqtt_account_prefix}/{mqtt_topics.VEHICLES}/+/{mqtt_topics.REFRESH_MODE}/{mqtt_topics.SET_SUFFIX}" ) - self.client.subscribe(charging_station.charge_state_topic) - if charging_station.connected_topic: + await self.client.subscribe( + f"{mqtt_account_prefix}/{mqtt_topics.VEHICLES}/+/{mqtt_topics.REFRESH_PERIOD}/+/{mqtt_topics.SET_SUFFIX}" + ) + for ( + charging_station + ) in self.configuration.charging_stations_by_vin.values(): LOG.debug( - f"Subscribing to MQTT topic {charging_station.connected_topic}" + f"Subscribing to MQTT topic {charging_station.charge_state_topic}" + ) + self.vin_by_charge_state_topic[charging_station.charge_state_topic] = ( + charging_station.vin ) - self.vin_by_charger_connected_topic[ - charging_station.connected_topic - ] = charging_station.vin - self.client.subscribe(charging_station.connected_topic) - if self.configuration.ha_discovery_enabled: - # enable dynamic discovery pushing in case ha reconnects - self.client.subscribe(self.configuration.ha_lwt_topic) - - async def __on_message( + await self.client.subscribe(charging_station.charge_state_topic) + if charging_station.connected_topic: + LOG.debug( + f"Subscribing to MQTT topic {charging_station.connected_topic}" + ) + self.vin_by_charger_connected_topic[ + charging_station.connected_topic + ] = charging_station.vin + await self.client.subscribe(charging_station.connected_topic) + if self.configuration.ha_discovery_enabled: + # enable dynamic discovery pushing in case ha reconnects + await self.client.subscribe(self.configuration.ha_lwt_topic) + except aiomqtt.MqttError as e: + LOG.error("Failed to subscribe to MQTT command topics: %s", e) + raise + + async def _on_message( self, _client: Any, topic: str, payload: Any, _qos: Any, _properties: Any ) -> None: try: @@ -180,39 +265,112 @@ async def __on_message_real(self, *, topic: str, payload: str) -> None: vin=vin, topic=topic, payload=payload ) - def __publish(self, topic: str, payload: Any) -> None: - self.client.publish(topic, payload, retain=True) + def __publish( + self, topic: str, payload: Any, retain: bool = True, qos: QoS = 0 + ) -> None: + LOG.debug("Publishing to MQTT topic %s with payload %s", topic, payload) + task = asyncio.create_task( + self.__async_publish(topic, payload, retain=retain, qos=qos) + ) + task.add_done_callback(self.__handle_task_exception) + + async def __async_publish( + self, topic: str, payload: Any, retain: bool, qos: QoS + ) -> None: + if not (self.client and self.is_connected()): + LOG.error("Failed to publish: MQTT client is not connected") + return + try: + await self.client.publish(topic, payload, retain=retain, qos=qos) + except aiomqtt.MqttError as e: + LOG.error( + f"Failed to publish to MQTT topic {topic} with payload {payload}: {e}" + ) + + @staticmethod + def __handle_task_exception(task: asyncio.Task[None]) -> None: + if task.cancelled(): + return + exc = task.exception() + if exc is not None: + LOG.error("Background MQTT task failed: %s", exc) @override def is_connected(self) -> bool: - return cast("bool", self.client.is_connected) + return self.__connected.is_set() @override def publish_json( - self, key: str, data: dict[str, Any], no_prefix: bool = False + self, + key: str, + data: dict[str, Any], + no_prefix: bool = False, + retain: bool = True, + qos: QoS = 0, ) -> None: payload = self.dict_to_anonymized_json(data) - self.__publish(topic=self.get_topic(key, no_prefix), payload=payload) + self.__publish( + topic=self.get_topic(key, no_prefix), + payload=payload, + retain=retain, + qos=qos, + ) @override - def publish_str(self, key: str, value: str, no_prefix: bool = False) -> None: - self.__publish(topic=self.get_topic(key, no_prefix), payload=value) + def publish_str( + self, + key: str, + value: str, + no_prefix: bool = False, + retain: bool = True, + qos: QoS = 0, + ) -> None: + self.__publish( + topic=self.get_topic(key, no_prefix), payload=value, retain=retain, qos=qos + ) @override - def publish_int(self, key: str, value: int, no_prefix: bool = False) -> None: - self.__publish(topic=self.get_topic(key, no_prefix), payload=value) + def publish_int( + self, + key: str, + value: int, + no_prefix: bool = False, + retain: bool = True, + qos: QoS = 0, + ) -> None: + self.__publish( + topic=self.get_topic(key, no_prefix), payload=value, retain=retain, qos=qos + ) @override - def publish_bool(self, key: str, value: bool, no_prefix: bool = False) -> None: - self.__publish(topic=self.get_topic(key, no_prefix), payload=value) + def publish_bool( + self, + key: str, + value: bool, + no_prefix: bool = False, + retain: bool = True, + qos: QoS = 0, + ) -> None: + self.__publish( + topic=self.get_topic(key, no_prefix), payload=value, retain=retain, qos=qos + ) @override - def publish_float(self, key: str, value: float, no_prefix: bool = False) -> None: - self.__publish(topic=self.get_topic(key, no_prefix), payload=value) + def publish_float( + self, + key: str, + value: float, + no_prefix: bool = False, + retain: bool = True, + qos: QoS = 0, + ) -> None: + self.__publish( + topic=self.get_topic(key, no_prefix), payload=value, retain=retain, qos=qos + ) @override - def clear_topic(self, key: str, no_prefix: bool = False) -> None: - self.__publish(topic=self.get_topic(key, no_prefix), payload=None) + def clear_topic(self, key: str, no_prefix: bool = False, qos: QoS = 0) -> None: + self.__publish(topic=self.get_topic(key, no_prefix), payload=None, qos=qos) def get_vin_from_topic(self, topic: str) -> str: global_topic_removed = topic[len(self.configuration.mqtt_topic) + 1 :] diff --git a/tests/test_mqtt_publisher.py b/tests/test_mqtt_publisher.py index f8ac5953..61022aa7 100644 --- a/tests/test_mqtt_publisher.py +++ b/tests/test_mqtt_publisher.py @@ -68,7 +68,7 @@ async def test_update_rear_window_heat_state(self) -> None: assert self.received_payload == REAR_WINDOW_HEAT_STATE async def send_message(self, topic: str, payload: Any) -> None: - await self.mqtt_client.client.on_message("client", topic, payload, 0, {}) + await self.mqtt_client._on_message("client", topic, payload, 0, {}) async def on_charging_detected(self, vin: str) -> None: pass