diff --git a/.gitignore b/.gitignore index bae68ee..d580c5e 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,12 @@ example.* home.* img .coverage +cmqttd_config/*.cbz +cmqttd_config/*.pem +cmqttd_config/*.key¸ +cmqttd_config/*.xml +cmqttd_config/auth +certificates +.vscode +project.xml +venv diff --git a/Dockerfile b/Dockerfile index 093ac44..86022f6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -8,12 +8,13 @@ # $ docker build -t cmqttd . # $ docker run --device /dev/ttyUSB0 -e "SERIAL_PORT=/dev/ttyUSB0" \ # -e "MQTT_SERVER=192.2.0.1" -e "TZ=Australia/Adelaide" -it cmqttd -FROM alpine:3.11 as base +FROM alpine:edge as base +# python 3.10 required, at date this file is created only available in alpine:edge # Install most Python deps here, because that way we don't need to include build tools in the # final image. -RUN apk add --no-cache python3 py3-cffi py3-paho-mqtt py3-six tzdata && \ - pip3 install 'pyserial==3.4' 'pyserial_asyncio==0.4' +RUN apk add --no-cache python3 py-pip py3-cffi py3-paho-mqtt py3-six tzdata && \ + pip3 install 'pyserial==3.5' 'pyserial_asyncio==0.6' # Runs tests and builds a distribution tarball FROM base as builder @@ -27,8 +28,10 @@ RUN pip3 install 'parameterized' && \ # cmqttd runner image FROM base as cmqttd COPY COPYING COPYING.LESSER Dockerfile README.md entrypoint-cmqttd.sh / +RUN sed -i 's/\r$//' entrypoint-cmqttd.sh COPY --from=builder /cbus/dist/cbus-0.2.generic.tar.gz / RUN tar zxf /cbus-0.2.generic.tar.gz && rm /cbus-0.2.generic.tar.gz +COPY cmqttd_config/ /etc/cmqttd/ # Runs cmqttd itself CMD /entrypoint-cmqttd.sh diff --git a/cbus/common.py b/cbus/common.py index ca56a3b..93b89e9 100644 --- a/cbus/common.py +++ b/cbus/common.py @@ -116,11 +116,16 @@ class Application(IntEnum): LIGHTING_5d = 0x5d LIGHTING_5e = 0x5e LIGHTING_LAST = LIGHTING_5f = 0x5f + HVACACTUATOR_73 = 0x73 + HVACACTUATOR_74 = 0x74 + HVAC = 0xCA CLOCK = 0xDF + TRIGGER = 0xCA ENABLE = 0xCB MASTER_APPLICATION = STATUS_REQUEST = 0xff + class CAL(IntEnum): RESET = 0x08 RECALL = 0x1a @@ -429,3 +434,4 @@ def check_ga(group_addr: int) -> None: raise ValueError( 'Group Address out of range ({}..{}), got {}'.format( MIN_GROUP_ADDR, MAX_GROUP_ADDR, group_addr)) + diff --git a/cbus/daemon/cmqttd.py b/cbus/daemon/cmqttd.py index c647207..215a1ba 100644 --- a/cbus/daemon/cmqttd.py +++ b/cbus/daemon/cmqttd.py @@ -15,13 +15,21 @@ # You should have received a copy of the GNU Lesser General Public License # along with this library. If not, see . -from asyncio import get_event_loop, run +from asyncio import get_event_loop, run, sleep from argparse import ArgumentParser, FileType import json import logging +from marshal import load from typing import Any, BinaryIO, Dict, Optional, Text, TextIO import paho.mqtt.client as mqtt +import sys + +from cbus.protocol import application + +if sys.platform == 'win32': + from asyncio import set_event_loop_policy, WindowsSelectorEventLoopPolicy + set_event_loop_policy(WindowsSelectorEventLoopPolicy()) try: from serial_asyncio import create_serial_connection @@ -33,6 +41,9 @@ async def create_serial_connection(*_, **__): from cbus.paho_asyncio import AsyncioHelper from cbus.protocol.pciprotocol import PCIProtocol from cbus.toolkit.cbz import CBZ +from cbus.toolkit.periodic import Periodic +from cbus.protocol.application import LightingApplication +from cbus.protocol.cal.report import LevelStatusReport,BinaryStatusReport logger = logging.getLogger(__name__) @@ -43,45 +54,63 @@ async def create_serial_connection(*_, **__): _TOPIC_CONF_SUFFIX = '/config' _TOPIC_STATE_SUFFIX = '/state' _META_TOPIC = 'homeassistant/binary_sensor/cbus_cmqttd' +_APPLICATION_GROUP_SEPARATOR = "_" +def check_aa_lighting(app): + if not app in LightingApplication.supported_applications(): + raise ValueError( + 'Application ${aa} is not a valid lighting application'.format( + app)) def ga_range(): return range(MIN_GROUP_ADDR, MAX_GROUP_ADDR + 1) +def ga_string(group_addr: int, app_addr: int | Application, zeros=True) -> Text: + #note: the logic is necessary to ensure backward compatibility with previous version + if(app_addr==Application.LIGHTING): + return f'{group_addr:03d}' if zeros else f'{group_addr}' + else: + return f'{app_addr:03d}{_APPLICATION_GROUP_SEPARATOR}{group_addr:03d}' + +def default_light_name(group_addr,app_addr): + return f'C-Bus Light {ga_string(group_addr,app_addr,True)}' -def get_topic_group_address(topic: Text) -> int: +def get_topic_group_address(topic: Text) -> tuple[int,int | Application]: """Gets the group address for the given topic.""" if not topic.startswith(_LIGHT_TOPIC_PREFIX): raise ValueError( f'Invalid topic {topic}, must start with {_LIGHT_TOPIC_PREFIX}') - ga = int(topic[len(_LIGHT_TOPIC_PREFIX):].split('/', maxsplit=1)[0]) + a1,*a2 = topic[len(_LIGHT_TOPIC_PREFIX):].split('/', maxsplit=1)[0].split(_APPLICATION_GROUP_SEPARATOR) + aa,ga = (a1,a2[0]) if a2 else (Application.LIGHTING,a1) + aa,ga = (int(aa),int(ga)) check_ga(ga) - return ga - + + return ga,aa -def set_topic(group_addr: int) -> Text: +def set_topic(group_addr: int, app_addr: int | Application) -> Text: """Gets the Set topic for a group address.""" - return _LIGHT_TOPIC_PREFIX + str(group_addr) + _TOPIC_SET_SUFFIX + return (_LIGHT_TOPIC_PREFIX + ga_string(group_addr,app_addr,False) + _TOPIC_SET_SUFFIX ) -def state_topic(group_addr: int) -> Text: +def state_topic(group_addr: int, app_addr: int | Application) -> Text: """Gets the State topic for a group address.""" - return _LIGHT_TOPIC_PREFIX + str(group_addr) + _TOPIC_STATE_SUFFIX + return _LIGHT_TOPIC_PREFIX + ga_string(group_addr,app_addr,False) + _TOPIC_STATE_SUFFIX -def conf_topic(group_addr: int) -> Text: +def conf_topic(group_addr: int, app_addr: int | Application) -> Text: """Gets the Config topic for a group address.""" - return _LIGHT_TOPIC_PREFIX + str(group_addr) + _TOPIC_CONF_SUFFIX + return _LIGHT_TOPIC_PREFIX + ga_string(group_addr,app_addr,False) + _TOPIC_CONF_SUFFIX -def bin_sensor_state_topic(group_addr: int) -> Text: +def bin_sensor_state_topic(group_addr: int, app_addr: int | Application) -> Text: """Gets the Binary Sensor State topic for a group address.""" - return _BINSENSOR_TOPIC_PREFIX + str(group_addr) + _TOPIC_STATE_SUFFIX + return _BINSENSOR_TOPIC_PREFIX + ga_string(group_addr,app_addr,False) + _TOPIC_STATE_SUFFIX -def bin_sensor_conf_topic(group_addr: int) -> Text: +def bin_sensor_conf_topic(group_addr: int, app_addr: int | Application) -> Text: """Gets the Binary Sensor Config topic for a group address.""" - return _BINSENSOR_TOPIC_PREFIX + str(group_addr) + _TOPIC_CONF_SUFFIX + return _BINSENSOR_TOPIC_PREFIX + ga_string(group_addr,app_addr,False) + _TOPIC_CONF_SUFFIX + class CBusHandler(PCIProtocol): @@ -90,27 +119,41 @@ class CBusHandler(PCIProtocol): """ mqtt_api = None - def __init__(self, labels: Optional[Dict[int, Text]], *args, **kwargs): + def __init__(self, labels: Optional[Dict[int, Dict]], *args, **kwargs): super().__init__(*args, **kwargs) self.labels = ( - labels if labels is not None else {}) # type: Dict[int, Text] + labels if labels is not None else {56:{}}) # type: Dict[int, Text] - def on_lighting_group_ramp(self, source_addr, group_addr, duration, level): + def on_lighting_group_ramp(self, source_addr, group_addr, app_addr,duration, level): if not self.mqtt_api: return self.mqtt_api.lighting_group_ramp( - source_addr, group_addr, duration, level) + source_addr, group_addr, app_addr,duration, level) - def on_lighting_group_on(self, source_addr, group_addr): + def on_lighting_group_on(self, source_addr, group_addr,app_addr): if not self.mqtt_api: return - self.mqtt_api.lighting_group_on(source_addr, group_addr) + self.mqtt_api.lighting_group_on(source_addr, group_addr,app_addr) - def on_lighting_group_off(self, source_addr, group_addr): + def on_lighting_group_off(self, source_addr, group_addr,app_addr): if not self.mqtt_api: return - self.mqtt_api.lighting_group_off(source_addr, group_addr) + self.mqtt_api.lighting_group_off(source_addr, group_addr,app_addr) + + def on_level_report(self, app_addr, start, report: LevelStatusReport): + groups = self.mqtt_api.groupDB.setdefault(app_addr,{}) + for val in report: + if groups[start]: + if val==None: + pass + elif val==0: + self.on_lighting_group_off(0,start,app_addr) + elif val == 255: + self.on_lighting_group_on(0,start,app_addr) + else: + self.on_lighting_group_ramp(0,start,app_addr,0,val) + start+=1 # TODO: on_lighting_group_terminate_ramp @@ -123,8 +166,12 @@ class MqttClient(mqtt.Client): def on_connect(self, client, userdata: CBusHandler, flags, rc): logger.info('Connected to MQTT broker') userdata.mqtt_api = self - self.subscribe([(set_topic(ga), 2) for ga in ga_range()]) + self.groupDB = {} self.publish_all_lights(userdata.labels) + for app_addr in self.groupDB.keys(): + for block in range(0,256,32): + pass + Periodic.throttler.enqueue(lambda b= block,a= app_addr:userdata.request_status(b,a)) def on_message(self, client, userdata: CBusHandler, msg: mqtt.MQTTMessage): """Handle a message from an MQTT subscription.""" @@ -133,7 +180,7 @@ def on_message(self, client, userdata: CBusHandler, msg: mqtt.MQTTMessage): return try: - ga = get_topic_group_address(msg.topic) + group_addr, app_addr = get_topic_group_address(msg.topic) except ValueError: # Invalid group address logging.error(f'Invalid group address in topic {msg.topic}') @@ -159,23 +206,69 @@ def on_message(self, client, userdata: CBusHandler, msg: mqtt.MQTTMessage): if light_on: if brightness == 255 and transition_time == 0: # lighting on - userdata.lighting_group_on(ga) - self.lighting_group_on(None, ga) + userdata.lighting_group_on(group_addr,app_addr) + self.lighting_group_on(None, group_addr,app_addr) else: # ramp - userdata.lighting_group_ramp(ga, transition_time, brightness) - self.lighting_group_ramp(None, ga, transition_time, brightness) + userdata.lighting_group_ramp(group_addr, app_addr, transition_time, brightness) + self.lighting_group_ramp(None, group_addr, app_addr, transition_time, brightness) else: # lighting off - userdata.lighting_group_off(ga) - self.lighting_group_off(None, ga) + userdata.lighting_group_off(group_addr,app_addr) + self.lighting_group_off(None, group_addr,app_addr) def publish(self, topic: Text, payload: Dict[Text, Any]): """Publishes a payload as JSON.""" payload = json.dumps(payload) return super().publish(topic, payload, 1, True) - def publish_all_lights(self, labels: Dict[int, Text]): + def publish_light(self, group_addr : int, app_addr :int | Application, app_labels:dict = None): + default_name = default_light_name(group_addr,app_addr) + UID = f'cbus_light_{ga_string(group_addr,app_addr,False)}' + name = default_name + if app_labels: + _,labels = app_labels.get(app_addr,(None,{})) + if labels: + name = labels.get(group_addr,name) + UID = f'cbus_light_{ga_string(group_addr,app_addr,False)}' + self.subscribe(set_topic(group_addr,app_addr), 2 ) + self.publish(conf_topic(group_addr,app_addr), { + 'name': name, + 'unique_id': UID, + 'cmd_t': set_topic(group_addr,app_addr), + 'stat_t': state_topic(group_addr,app_addr), + 'schema': 'json', + 'brightness': True, + 'device': { + 'identifiers': [UID], + 'connections': [['cbus_group_address', str(group_addr)],['cbus_application_address', str(app_addr)]], + 'sw_version': 'cmqttd https://github.com/micolous/cbus', + 'name': default_name, + 'manufacturer': 'Clipsal', + 'model': 'C-Bus Lighting Application', + 'via_device': 'cmqttd', + }, + }) + Sensor_UID = f'cbus_bin_sensor_{ga_string(group_addr,app_addr,False)}' + self.publish(bin_sensor_conf_topic(group_addr,app_addr), { + 'name': f'{name} (as binary sensor)', + 'unique_id': Sensor_UID, + 'stat_t': bin_sensor_state_topic(group_addr,app_addr), + 'device': { + 'identifiers': [Sensor_UID], + 'connections': [['cbus_group_address', str(group_addr)],['cbus_application_address', str(app_addr)]], + 'sw_version': 'cmqttd https://github.com/micolous/cbus', + 'name': default_name, + 'manufacturer': 'Clipsal', + 'model': 'C-Bus Lighting Application', + 'via_device': 'cmqttd', + }, + }) + self.groupDB.setdefault(app_addr,{})[group_addr] = True + + + + def publish_all_lights(self, app_labels): """Publishes a configuration topic for all lights.""" # Meta-device which holds all the C-Bus group addresses self.publish(_META_TOPIC + _TOPIC_CONF_SUFFIX, { @@ -192,76 +285,53 @@ def publish_all_lights(self, labels: Dict[int, Text]): }, }) - for ga in ga_range(): - name = labels.get(ga, f'C-Bus Light {ga:03d}') - self.publish(conf_topic(ga), { - 'name': name, - 'unique_id': f'cbus_light_{ga}', - 'cmd_t': set_topic(ga), - 'stat_t': state_topic(ga), - 'schema': 'json', - 'brightness': True, - 'device': { - 'identifiers': [f'cbus_light_{ga}'], - 'connections': [['cbus_group_address', str(ga)]], - 'sw_version': 'cmqttd https://github.com/micolous/cbus', - 'name': f'C-Bus Light {ga:03d}', - 'manufacturer': 'Clipsal', - 'model': 'C-Bus Lighting Application', - 'via_device': 'cmqttd', - }, - }) - - self.publish(bin_sensor_conf_topic(ga), { - 'name': f'{name} (as binary sensor)', - 'unique_id': f'cbus_bin_sensor_{ga}', - 'stat_t': bin_sensor_state_topic(ga), - 'device': { - 'identifiers': [f'cbus_bin_sensor_{ga}'], - 'connections': [['cbus_group_address', str(ga)]], - 'sw_version': 'cmqttd https://github.com/micolous/cbus', - 'name': f'C-Bus Light {ga:03d}', - 'manufacturer': 'Clipsal', - 'model': 'C-Bus Lighting Application', - 'via_device': 'cmqttd', - }, - }) - - def publish_binary_sensor(self, group_addr: int, state: bool): + for app_addr,(_,labels) in app_labels.items(): + for group_addr in labels.keys(): + self.publish_light(group_addr,app_addr,app_labels) + + def check_published(self,group_addr: int, app_addr: int | Application): + if not self.groupDB.setdefault(app_addr,{}).get(group_addr,False): + self.publish_light(group_addr,app_addr) + + + def publish_binary_sensor(self, group_addr: int, app_addr: int | Application, state: Optional[bool]): payload = 'ON' if state else 'OFF' return super().publish( - bin_sensor_state_topic(group_addr), payload, 1, True) + bin_sensor_state_topic(group_addr,app_addr), payload, 1, True) - def lighting_group_on(self, source_addr: Optional[int], group_addr: int): + def lighting_group_on(self, source_addr: Optional[int], group_addr: int, app_addr: int | Application): """Relays a lighting-on event from CBus to MQTT.""" - self.publish(state_topic(group_addr), { + self.check_published(group_addr,app_addr) + self.publish(state_topic(group_addr,app_addr), { 'state': 'ON', 'brightness': 255, 'transition': 0, 'cbus_source_addr': source_addr, }) - self.publish_binary_sensor(group_addr, True) + self.publish_binary_sensor(group_addr,app_addr, True) - def lighting_group_off(self, source_addr: Optional[int], group_addr: int): + def lighting_group_off(self, source_addr: Optional[int], group_addr: int, app_addr: int | Application): """Relays a lighting-off event from CBus to MQTT.""" - self.publish(state_topic(group_addr), { + self.check_published(group_addr,app_addr) + self.publish(state_topic(group_addr,app_addr), { 'state': 'OFF', 'brightness': 0, 'transition': 0, 'cbus_source_addr': source_addr, }) - self.publish_binary_sensor(group_addr, False) + self.publish_binary_sensor(group_addr,app_addr, False) - def lighting_group_ramp(self, source_addr: Optional[int], group_addr: int, + def lighting_group_ramp(self, source_addr: Optional[int], group_addr: int, app_addr:int|Application, duration: int, level: int): """Relays a lighting-ramp event from CBus to MQTT.""" - self.publish(state_topic(group_addr), { + self.check_published(group_addr,app_addr) + self.publish(state_topic(group_addr,app_addr), { 'state': 'ON', 'brightness': level, 'transition': duration, 'cbus_source_addr': source_addr, }) - self.publish_binary_sensor(group_addr, level > 0) + self.publish_binary_sensor(group_addr, app_addr, level > 0 if isinstance(level,int) else None) def read_auth(client: mqtt.Client, auth_file: TextIO): @@ -271,43 +341,68 @@ def read_auth(client: mqtt.Client, auth_file: TextIO): client.username_pw_set(username, password) -def read_cbz_labels(cbz_file: BinaryIO) -> Dict[int, Text]: +def read_cbz_labels(cbz_file: BinaryIO, network_name = None) -> Dict[int, Text]: """Reads group address names from a given Toolkit CBZ file.""" - labels = {} # type: Dict[int, Text] + class obj(): + pass + + labels = {56:{}} # type: Dict[int,Dict[int, Text]] + cbz = CBZ(cbz_file) - # TODO: support multiple networks/applications - # Look for 1 direct network networks = [n for n in cbz.installation.project.network if n.interface.interface_type != 'bridge'] - if len(networks) != 1: - logger.warning('Expected exactly 1 non-bridge network in project file, ' - 'got %d instead! Labels will be unavailable.', - len(networks)) - return labels - - # Look for - applications = [a for a in networks[0].applications - if a.address == Application.LIGHTING] - if len(applications) != 1: - logger.warning('Could not find lighting application %x in project ' - 'file. Labels will be unavailable.', - Application.LIGHTING) - return labels - - for group in applications[0].groups: - name = group.tag_name.strip() - - # Ignore default names - if not name or name in ('', f'Group {group.address}'): - continue - - labels[group.address] = name + + if network_name: + networks = [n for n in networks if n.tag_name == network_name] + if len(networks) != 1: + if network_name: + logger.warning('Could not find a non-bridge network with name "%s" in project file.',network_name) + else: + logger.warning('Expected one non-bridge network in project file, found %d instead',len(networks)) + app = obj() + app.address = 56 + app.tag_name = "Lighting" + app.groups = [] + net = obj() + net.applications = [app] + networks = [net] + + applications = [a for a in networks[0].applications if a.address in LightingApplication.supported_applications()] + + if len(applications) == 0: + logger.warning('Could not find any lighting application in project file.') + app = obj() + app.address = 56 + app.tag_name = "Lighting" + app.groups = [] + applications = [app] + + for a in applications: + l = {} + if a.groups: + for group in a.groups: + name = group.tag_name.strip() + if not name or name in ('', f'Group {group.address}'): + name = default_light_name(group.address,a.address) + l[group.address] = name + else: + logger.warning('No label available in project file for application %d.',a.address) + logger.warning('Will use default labels.') + for ga in ga_range(): + l[ga] = default_light_name(ga,a.address) + labels[a.address]=(a.tag_name,l) return labels +# Wait time between commands emitted by throtller +_PERIOD = 0.97 + async def _main(): + + #throttler is queue used used to stagger commmands + Periodic.throttler = Periodic(_PERIOD) parser = ArgumentParser() group = parser.add_argument_group('Logging options') @@ -416,8 +511,17 @@ async def _main(): 'generated names like "C-Bus Light 001" will be used instead.' ) + group.add_argument( + '-N', '--cbus-network', + nargs='*', + help='Name of the C-Bus network to be used in case a project file is provided' + 'and the project contains multiple networks.' + ) + option = parser.parse_args() + option.cbus_network = " ".join(option.cbus_network) + if bool(option.broker_client_cert) != bool(option.broker_client_key): return parser.error( 'To use client certificates, both -k and -K must be specified.') @@ -428,7 +532,7 @@ async def _main(): loop = get_event_loop() connection_lost_future = loop.create_future() - labels = (read_cbz_labels(option.project_file) + labels = (read_cbz_labels(option.project_file,option.cbus_network) if option.project_file else None) def factory(): diff --git a/cbus/protocol/application/clock.py b/cbus/protocol/application/clock.py index 70809b1..b0f207b 100644 --- a/cbus/protocol/application/clock.py +++ b/cbus/protocol/application/clock.py @@ -50,7 +50,7 @@ def application(self) -> Application: return Application.CLOCK @staticmethod - def decode_sals(data: bytes) -> Sequence[ClockSAL]: + def decode_sals(data: bytes,_=None) -> Sequence[ClockSAL]: """ Decodes a clock broadcast application packet and returns it's SAL(s). @@ -251,7 +251,7 @@ def supported_applications() -> Set[Application]: return {Application.CLOCK} @staticmethod - def decode_sals(data: bytes) -> Sequence[SAL]: + def decode_sals(data: bytes,_=None) -> Sequence[SAL]: """ Decodes a clock and timekeeping application packet and returns its SAL(s). diff --git a/cbus/protocol/application/enable.py b/cbus/protocol/application/enable.py index cdafffb..9d6f545 100644 --- a/cbus/protocol/application/enable.py +++ b/cbus/protocol/application/enable.py @@ -43,7 +43,7 @@ def application(self) -> Application: return Application.ENABLE @staticmethod - def decode_sals(data: bytes) -> List[EnableSAL]: + def decode_sals(data: bytes,_=None) -> List[EnableSAL]: """ Decodes a enable control application packet and returns it's SAL(s). @@ -150,7 +150,7 @@ def supported_applications() -> Set[Application]: return {Application.ENABLE} @classmethod - def decode_sals(cls, data: bytes) -> List[EnableSAL]: + def decode_sals(cls, data: bytes, _ = None) -> List[EnableSAL]: """ Decodes a enable broadcast application packet and returns its SAL(s). """ diff --git a/cbus/protocol/application/lighting.py b/cbus/protocol/application/lighting.py index 2dcdfb6..80fbd46 100644 --- a/cbus/protocol/application/lighting.py +++ b/cbus/protocol/application/lighting.py @@ -49,23 +49,26 @@ class LightingSAL(SAL, abc.ABC): Base type for lighting application SALs. """ - def __init__(self, group_address: int): + def __init__(self, group_address: int, application_address: Union[Application,int]): """ This should not be called directly by your code! Use one of the subclasses of cbus.protocol.lighting.LightingSAL instead. """ + #TODO: modify to avoid redundancy + if not application_address in _SUPPORTED_APPLICATIONS: + raise ValueError('Expected light Application address, got {}'.format(application_address)) check_ga(group_address) + self.application_address = application_address self.group_address = group_address @property def application(self) -> Union[int, Application]: - # TODO: Support other application IDs - return Application.LIGHTING + return self.application_address @staticmethod - def decode_sals(data: bytes) -> List[LightingSAL]: + def decode_sals(data: bytes, application: int | Application ) -> List[LightingSAL]: """ Decodes a lighting application packet and returns it's SAL(s). @@ -98,7 +101,7 @@ def decode_sals(data: bytes) -> List[LightingSAL]: break sal, data = _SAL_HANDLERS[command_code].decode( - data, command_code, group_address) + data, command_code, group_address,application) if sal: output.append(sal) @@ -128,7 +131,7 @@ class LightingRampSAL(LightingSAL): """ - def __init__(self, group_address: int, duration: int, level: int): + def __init__(self, group_address: int,application_address: Union[int,Application] , duration: int, level: int ): """ Creates a new SAL Lighting Ramp message. @@ -142,14 +145,14 @@ def __init__(self, group_address: int, duration: int, level: int): indicating full brightness. :type level: int """ - super().__init__(group_address=group_address) + super().__init__(group_address,application_address ) self.duration = duration self.level = level @staticmethod def decode(data: bytes, command_code: int, - group_address: int) -> Tuple[Optional[LightingSAL], bytes]: + group_address: int, application_address: int | Application) -> Tuple[Optional[LightingSAL], bytes]: """ Do not call this method directly -- use LightingSAL.decode """ @@ -165,7 +168,7 @@ def decode(data: bytes, command_code: int, data = data[1:] return LightingRampSAL( - group_address=group_address, duration=duration, level=level), data + group_address=group_address, application_address=application_address,duration=duration, level=level), data def encode(self) -> bytes: if self.level < 0 or self.level > 255: @@ -192,11 +195,11 @@ class LightingOnSAL(LightingSAL): @staticmethod def decode(data: bytes, command_code: int, - group_address: int) -> Tuple[LightingOnSAL, bytes]: + group_address: int, application_address: int | Application) -> Tuple[LightingOnSAL, bytes]: """ Do not call this method directly -- use LightingSAL.decode """ - return LightingOnSAL(group_address), data + return LightingOnSAL(group_address,application_address), data def encode(self): return super().encode() + bytes([LightCommand.ON, self.group_address]) @@ -214,11 +217,11 @@ class LightingOffSAL(LightingSAL): @staticmethod def decode(data: bytes, command_code: int, - group_address: int) -> Tuple[LightingOffSAL, bytes]: + group_address: int, application_address: int | Application) -> Tuple[LightingOffSAL, bytes]: """ Do not call this method directly -- use LightingSAL.decode """ - return LightingOffSAL(group_address), data + return LightingOffSAL(group_address,application_address), data def encode(self): return super().encode() + bytes([LightCommand.OFF, self.group_address]) @@ -237,11 +240,11 @@ class LightingTerminateRampSAL(LightingSAL): @staticmethod def decode(data: bytes, command_code: int, - group_address: int) -> Tuple[LightingTerminateRampSAL, bytes]: + group_address: int, application_address: int | Application) -> Tuple[LightingTerminateRampSAL, bytes]: """ Do not call this method directly -- use LightingSAL.decode """ - return LightingTerminateRampSAL(group_address), data + return LightingTerminateRampSAL(group_address,application_address), data def encode(self): return super().encode() + bytes([ @@ -276,8 +279,8 @@ class LightingApplication(BaseApplication): """ @staticmethod - def decode_sals(data: bytes) -> List[SAL]: - return LightingSAL.decode_sals(data) + def decode_sals(data: bytes, application: int | Application ) -> List[SAL]: + return LightingSAL.decode_sals(data,application) @staticmethod def supported_applications() -> FrozenSet[int]: diff --git a/cbus/protocol/application/sal.py b/cbus/protocol/application/sal.py index 152e28d..719a54d 100644 --- a/cbus/protocol/application/sal.py +++ b/cbus/protocol/application/sal.py @@ -57,7 +57,7 @@ def supported_applications() -> FrozenSet[Union[int, Application]]: @staticmethod @abc.abstractmethod - def decode_sals(data: bytes) -> Sequence[SAL]: + def decode_sals(data: bytes,application: int| Application=None) -> Sequence[SAL]: """ Decodes a SAL message diff --git a/cbus/protocol/application/status_request.py b/cbus/protocol/application/status_request.py index 153a356..80c1f51 100644 --- a/cbus/protocol/application/status_request.py +++ b/cbus/protocol/application/status_request.py @@ -36,7 +36,7 @@ def supported_applications() -> FrozenSet[int]: return _SUPPORTED_APPLICATIONS @staticmethod - def decode_sals(data: bytes) -> Sequence[SAL]: + def decode_sals(data: bytes,_=None) -> Sequence[SAL]: return StatusRequestSAL.decode_sals(data) @@ -47,7 +47,7 @@ class StatusRequestSAL(SAL): child_application: int @classmethod - def decode_sals(cls, data: bytes) -> Sequence[StatusRequestSAL]: + def decode_sals(cls, data: bytes,_=None) -> Sequence[StatusRequestSAL]: output = [] while data: diff --git a/cbus/protocol/application/temperature.py b/cbus/protocol/application/temperature.py index 176b1fe..753835a 100644 --- a/cbus/protocol/application/temperature.py +++ b/cbus/protocol/application/temperature.py @@ -53,7 +53,7 @@ def application(self) -> Union[int, Application]: return Application.TEMPERATURE @staticmethod - def decode_sals(data: bytes) -> Sequence[TemperatureSAL]: + def decode_sals(data: bytes, _ = None) -> Sequence[TemperatureSAL]: """ Decodes a temperature broadcast application packet and returns its SAL(s). @@ -169,7 +169,7 @@ def supported_applications() -> Set[Application]: return {Application.TEMPERATURE} @staticmethod - def decode_sals(data: bytes) -> Sequence[TemperatureSAL]: + def decode_sals(data: bytes, _ = None) -> Sequence[TemperatureSAL]: """ Decodes a temperature broadcast application packet and returns it's SAL(s). diff --git a/cbus/protocol/buffered_protocol.py b/cbus/protocol/buffered_protocol.py index e8f03df..b651159 100644 --- a/cbus/protocol/buffered_protocol.py +++ b/cbus/protocol/buffered_protocol.py @@ -67,23 +67,27 @@ def data_received(self, data: bytes) -> None: :param data: new data to add to the buffer :return: None """ - data_size = len(data) - if data_size > self._size_limit: - raise ValueError('Received data exceeds size limit ' + try: + data_size = len(data) + if data_size > self._size_limit: + raise ValueError('Received data exceeds size limit ' '({} bytes)'.format(self._size_limit)) - # Add the data to the buffer - with self._buf_lock: - if len(self._buf) + data_size > self._size_limit: - self._buf = bytearray() - raise ValueError( - 'Received data would make the buffer exceed the maximum ' - 'limit, buffer dropped!') + # Add the data to the buffer + with self._buf_lock: + if len(self._buf) + data_size > self._size_limit: + self._buf = bytearray() + raise ValueError( + 'Received data would make the buffer exceed the maximum ' + 'limit, buffer dropped!') - self._buf.extend(data) + self._buf.extend(data) - # Handle any messages that may be in the buffer. - self._process_buffer() + # Handle any messages that may be in the buffer. + self._process_buffer() + except: + # Clear buffer. + self._buf = bytearray() def _process_buffer(self): with self._buf_lock: diff --git a/cbus/protocol/pciprotocol.py b/cbus/protocol/pciprotocol.py index 08df01d..1d6ce22 100644 --- a/cbus/protocol/pciprotocol.py +++ b/cbus/protocol/pciprotocol.py @@ -27,6 +27,8 @@ from six import int2byte +from cbus.protocol.cal.report import BinaryStatusReport, LevelStatusReport + try: from serial_asyncio import create_serial_connection except ImportError: @@ -52,6 +54,7 @@ async def create_serial_connection(*_, **__): from cbus.protocol.pm_packet import PointToMultipointPacket from cbus.protocol.pp_packet import PointToPointPacket from cbus.protocol.reset_packet import ResetPacket +from cbus.protocol.cal.extended import ExtendedCAL logger = logging.getLogger(__name__) @@ -114,17 +117,17 @@ def handle_cbus_packet(self, p: BasePacket) -> None: # lighting application if isinstance(s, LightingRampSAL): self.on_lighting_group_ramp(p.source_address, - s.group_address, + s.group_address,s.application_address, s.duration, s.level) elif isinstance(s, LightingOnSAL): self.on_lighting_group_on(p.source_address, - s.group_address) + s.group_address,s.application_address) elif isinstance(s, LightingOffSAL): self.on_lighting_group_off(p.source_address, - s.group_address) + s.group_address,s.application_address) elif isinstance(s, LightingTerminateRampSAL): self.on_lighting_group_terminate_ramp( - p.source_address, s.group_address) + p.source_address, s.group_address,s.application_address) else: logger.debug(f'hcp: unhandled lighting SAL type: {s!r}') elif isinstance(s, ClockSAL): @@ -134,6 +137,18 @@ def handle_cbus_packet(self, p: BasePacket) -> None: self.on_clock_update(p.source_address, s.val) else: logger.debug(f'hcp: unhandled SAL type: {s!r}') + elif isinstance(p,PointToPointPacket): + for s in p: + if isinstance(s,ExtendedCAL): + if isinstance(s.report,BinaryStatusReport): + pass + elif isinstance(s.report,LevelStatusReport): + self.on_level_report(s.child_application, s.block_start, s.report) + else: + pass + else: + pass + else: logger.debug(f'hcp: unhandled other packet: {p!r}') @@ -326,7 +341,7 @@ def _get_confirmation_code(self): return int2byte(o) def _send(self, - cmd: Union[BasePacket], + cmd: BasePacket, confirmation: bool = True, basic_mode: bool = False): """ @@ -384,10 +399,17 @@ def pci_reset(self): self._send(ResetPacket()) # serial user interface guide sect 10.2 - # Set application address 1 to 38 (lighting) - # self._send('A3210038', encode=False, checksum=False) + # Set application address 1 to ALL applications + # self._send('A32100FF', encode=False, checksum=False) self._send(DeviceManagementPacket( - checksum=False, parameter=0x21, value=0x38), + checksum=False, parameter=0x21, value=0xFF), + basic_mode=True) + + # serial user interface guide sect 10.2 + # Set application address 2 to USED applications + # self._send('A32200FF', encode=False, checksum=False) + self._send(DeviceManagementPacket( + checksum=False, parameter=0x22, value=0xFF), basic_mode=True) # Interface options #3 @@ -409,7 +431,7 @@ def pci_reset(self): # 6: IDMON # self._send('A3300059', encode=False, checksum=False) self._send(DeviceManagementPacket( - checksum=False, parameter=0x30, value=0x59), + checksum=False, parameter=0x30, value=0x79), basic_mode=True) def identify(self, unit_address, attribute): @@ -430,7 +452,7 @@ def identify(self, unit_address, attribute): unit_address=unit_address, cals=[IdentifyCAL(attribute)]) return self._send(p) - def lighting_group_on(self, group_addr: Union[int, Iterable[int]]): + def lighting_group_on(self, group_addr: Union[int, Iterable[int]],application_addr: Union[int,Application] ): """ Turns on the lights for the given group_id. @@ -453,10 +475,17 @@ def lighting_group_on(self, group_addr: Union[int, Iterable[int]]): f'group_addr iterable length is > 9 ({group_addr_count})') p = PointToMultipointPacket( - sals=[LightingOnSAL(ga) for ga in group_addr]) + sals=[LightingOnSAL(ga,application_addr) for ga in group_addr]) return self._send(p) - def lighting_group_off(self, group_addr: Union[int, Iterable[int]]): + def request_status(self,group_addr: Union[int, Iterable[int]],application_addr: Union[int,Application] ): + p = PointToMultipointPacket(sals=[ + StatusRequestSAL(level_request=True, group_address=group_addr,child_application=application_addr) + ]) + return self._send(p) + + + def lighting_group_off(self, group_addr: Union[int, Iterable[int]],application_addr: Union[int,Application] ): """ Turns off the lights for the given group_id. @@ -480,11 +509,11 @@ def lighting_group_off(self, group_addr: Union[int, Iterable[int]]): f'group_addr iterable length is > 9 ({group_addr_count})') p = PointToMultipointPacket( - sals=[LightingOffSAL(ga) for ga in group_addr]) + sals=[LightingOffSAL(ga,application_addr) for ga in group_addr]) return self._send(p) def lighting_group_ramp( - self, group_addr: int, duration: int, level: int = 255): + self, group_addr: int, application_addr: Union[int,Application], duration: int, level: int = 255 ): """ Ramps (fades) a group address to a specified lighting level. @@ -506,11 +535,11 @@ def lighting_group_ramp( """ p = PointToMultipointPacket( - sals=LightingRampSAL(group_addr, duration, level)) + sals=LightingRampSAL(group_addr, application_addr, duration, level)) return self._send(p) def lighting_group_terminate_ramp( - self, group_addr: Union[int, Iterable[int]]): + self, group_addr: Union[int, Iterable[int]], application_addr: Union[int,Application]): """ Stops ramping a group address at the current point. @@ -533,7 +562,7 @@ def lighting_group_terminate_ramp( f'group_addr iterable length is > 9 ({group_addr_count})') p = PointToMultipointPacket( - sals=[LightingTerminateRampSAL(ga) for ga in group_addr]) + sals=[LightingTerminateRampSAL(ga,application_addr) for ga in group_addr]) return self._send(p) def clock_datetime(self, when: Optional[datetime] = None): diff --git a/cbus/protocol/pm_packet.py b/cbus/protocol/pm_packet.py index 8aafd6a..747f9fa 100644 --- a/cbus/protocol/pm_packet.py +++ b/cbus/protocol/pm_packet.py @@ -107,7 +107,7 @@ def decode_packet( # find an application handler handler = get_application(application) data = data[2:] - sals = handler.decode_sals(data) + sals = handler.decode_sals(data,application) return cls( checksum=checksum, priority_class=priority_class, diff --git a/cbus/toolkit/cbz.py b/cbus/toolkit/cbz.py index 187e883..376e7d6 100644 --- a/cbus/toolkit/cbz.py +++ b/cbus/toolkit/cbz.py @@ -219,7 +219,7 @@ def __init__(self, fh: BinaryIO): zip_h = ZipFile(fh, 'r') except BadZipFile: # Try to load as XML instead. - xml_fh = fh + xml_fh = fh.name if zip_h: files = zip_h.namelist() diff --git a/cbus/toolkit/periodic.py b/cbus/toolkit/periodic.py new file mode 100644 index 0000000..fe620d5 --- /dev/null +++ b/cbus/toolkit/periodic.py @@ -0,0 +1,29 @@ +import asyncio +import queue + +class Periodic: + """ + class that manages a queue of functions to be called while + leaving a interval between two successsive calls + """ + queue = [] + + def __init__(self,period = 1): + self.queue = queue.Queue() + self.period = period + loop = asyncio.get_event_loop() + self.task = loop.create_task(self._work()) + + async def _work(self): + while True: + try: + action = self.queue.get(block=False) + action() + except: + pass + finally: + await asyncio.sleep(self.period) + + def enqueue(self,task): + #talks is a lambda or the name of a function with no argument + self.queue.put(task) \ No newline at end of file diff --git a/cmqttd_config/README.md b/cmqttd_config/README.md new file mode 100644 index 0000000..2acfdf4 --- /dev/null +++ b/cmqttd_config/README.md @@ -0,0 +1,16 @@ +# This folder should contain optional cmqttd configuration files. + +## project.cbz (file) +This file provides cbus group labels. Create a backup of the cbus project with the cbus setup tool, copy the back up file in this folder and rename it: project.cbz + +## auth (file) +Username and password to use to connect to an MQTT broker, separated by a newline character. +If this file is not present, then cmqttd will try to use the MQTT broker without authentication. + +## certificates (directory) +A directory of CA certificates to trust when connecting with TLS. +If this directory is not present, the default (Python) CA store will be used instead. + +## client.pem client.key (files) +Client certificate (pem) and private key (key) to use to connect to the MQTT broker. + diff --git a/entrypoint-cmqttd.sh b/entrypoint-cmqttd.sh index e1d719b..33ecff9 100755 --- a/entrypoint-cmqttd.sh +++ b/entrypoint-cmqttd.sh @@ -82,6 +82,13 @@ else echo "${CMQTTD_PROJECT_FILE} not found; using generated labels." fi +echo ">${CMQTTD_CBUS_NETWORK}<" + +if [ -n "${CMQTTD_CBUS_NETWORK}" ]; then + echo "Loading C-Bus network ${CMQTTD_CBUS_NETWORK}" + CMQTTD_ARGS="${CMQTTD_ARGS} --cbus-network ${CMQTTD_CBUS_NETWORK}" +fi + echo "" # Announce what we think local time is on start-up. This will be sent to the C-Bus network. diff --git a/requirements-cmqttd.txt b/requirements-cmqttd.txt index a8f9910..e67a24a 100644 --- a/requirements-cmqttd.txt +++ b/requirements-cmqttd.txt @@ -1,4 +1,4 @@ pyserial==3.4 -pyserial_asyncio (==0.4) +pyserial_asyncio (==0.6) six -paho-mqtt==1.5.0 +paho-mqtt==1.6.1 diff --git a/requirements.txt b/requirements.txt index 2394302..9beddbb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ -pyserial==3.4 -pyserial_asyncio==0.4 +pyserial==3.5 +pyserial_asyncio==0.6 six # official protocol documentation downloading @@ -9,7 +9,7 @@ requests pydot # required for cmqttd -paho-mqtt==1.5.0 +paho-mqtt==1.6.1 # required for cbz lxml diff --git a/setup.cfg b/setup.cfg index fcb3456..d468769 100644 --- a/setup.cfg +++ b/setup.cfg @@ -3,5 +3,5 @@ based_on_style = google [pytype] inputs = cbus -version = 3.7 +version = 3.10 exclude = cbus/toolkit/cbz.py diff --git a/setup.py b/setup.py index bb6a456..ba69795 100755 --- a/setup.py +++ b/setup.py @@ -3,12 +3,12 @@ from setuptools import setup, find_packages deps = [ - 'pyserial (==3.4)', - 'pyserial_asyncio (==0.4)', + 'pyserial (==3.5)', + 'pyserial_asyncio (==0.6)', 'lxml (>=2.3.2)', 'six', 'pydot', - 'paho_mqtt (==1.5.0)' + 'paho_mqtt (==1.6.1)' ] tests_require = [ diff --git a/tests/test_cmqttd.py b/tests/test_cmqttd.py index d24fe7e..3e295e0 100644 --- a/tests/test_cmqttd.py +++ b/tests/test_cmqttd.py @@ -23,7 +23,7 @@ from typing import Optional, Text, cast import unittest -from cbus.common import check_ga +from cbus.common import Application, check_ga from cbus.daemon import cmqttd @@ -59,22 +59,22 @@ def test_valid_topic_group_address(self, ga): bin_topic_len = len(bin_topic) # base topic path -> ga - self.assertEqual(ga, cmqttd.get_topic_group_address(light_topic)) + self.assertEqual((ga,Application.LIGHTING), cmqttd.get_topic_group_address(light_topic)) # Generating a set topic - set_topic = cmqttd.set_topic(ga) + set_topic = cmqttd.set_topic(ga,Application.LIGHTING) self.assertEqual(light_topic, set_topic[:light_topic_len]) - self.assertEqual(ga, cmqttd.get_topic_group_address(set_topic)) + self.assertEqual((ga,Application.LIGHTING), cmqttd.get_topic_group_address(set_topic)) # Generating a state topic - state_topic = cmqttd.state_topic(ga) + state_topic = cmqttd.state_topic(ga,Application.LIGHTING) self.assertEqual(light_topic, state_topic[:light_topic_len]) - self.assertEqual(ga, cmqttd.get_topic_group_address(state_topic)) + self.assertEqual((ga,Application.LIGHTING), cmqttd.get_topic_group_address(state_topic)) # Generating a conf topic - conf_topic = cmqttd.conf_topic(ga) + conf_topic = cmqttd.conf_topic(ga,Application.LIGHTING) self.assertEqual(light_topic, conf_topic[:light_topic_len]) - self.assertEqual(ga, cmqttd.get_topic_group_address(conf_topic)) + self.assertEqual((ga,Application.LIGHTING), cmqttd.get_topic_group_address(conf_topic)) # Ensure all the topics are unique self.assertNotEqual(set_topic, state_topic) @@ -83,10 +83,10 @@ def test_valid_topic_group_address(self, ga): # Binary sensors are read only, so get_topic_group_address doesn't # support them. - bin_state_topic = cmqttd.bin_sensor_state_topic(ga) + bin_state_topic = cmqttd.bin_sensor_state_topic(ga,Application.LIGHTING) self.assertTrue(bin_topic, bin_state_topic[:bin_topic_len]) - bin_conf_topic = cmqttd.bin_sensor_conf_topic(ga) + bin_conf_topic = cmqttd.bin_sensor_conf_topic(ga,Application.LIGHTING) self.assertTrue(bin_topic, bin_conf_topic[:bin_topic_len]) # Uniqueness check diff --git a/tests/test_lighting.py b/tests/test_lighting.py index f4804e8..e3f62ec 100644 --- a/tests/test_lighting.py +++ b/tests/test_lighting.py @@ -16,8 +16,10 @@ # along with this library. If not, see . from __future__ import absolute_import +from email.mime import application import unittest +from cbus.common import Application from cbus.protocol.pm_packet import PointToMultipointPacket from cbus.protocol.application.lighting import ( @@ -127,7 +129,7 @@ class InternalLightingTest(CBusTestCase): def test_lighting_encode_decode(self): """test of encode then decode""" - orig = PointToMultipointPacket(sals=LightingOnSAL(27)) + orig = PointToMultipointPacket(sals=LightingOnSAL(27,Application.LIGHTING)) orig.source_address = 5 data = orig.encode_packet() + b'\r\n' @@ -142,7 +144,7 @@ def test_lighting_encode_decode(self): def test_lighting_encode_decode_client(self): """test of encode then decode, with packets from a client""" - orig = PointToMultipointPacket(sals=LightingOnSAL(27)) + orig = PointToMultipointPacket(sals=LightingOnSAL(27,Application.LIGHTING)) data = b'\\' + orig.encode_packet() + b'\r' @@ -155,23 +157,23 @@ def test_lighting_encode_decode_client(self): def test_invalid_ga(self): """test argument validation""" with self.assertRaises(ValueError): - PointToMultipointPacket(sals=LightingOnSAL(999)) + PointToMultipointPacket(sals=LightingOnSAL(999,Application.LIGHTING)) with self.assertRaises(ValueError): - PointToMultipointPacket(sals=LightingOffSAL(-1)) + PointToMultipointPacket(sals=LightingOffSAL(-1,Application.LIGHTING)) def test_slow_ramp(self): """test very slow ramps""" p1 = PointToMultipointPacket( - sals=LightingRampSAL(1, 18*60, 255)).encode_packet() + sals=LightingRampSAL(1,Application.LIGHTING, 18*60, 255)).encode_packet() p2 = PointToMultipointPacket( - sals=LightingRampSAL(1, 17*60, 255)).encode_packet() + sals=LightingRampSAL(1,Application.LIGHTING, 17*60, 255)).encode_packet() self.assertEqual(p1, p2) def test_brightness_bounds(self): with self.assertRaisesRegex(ValueError, r'Ramp level .+ bounds'): - LightingRampSAL(1, 10, -1).encode() + LightingRampSAL(1,Application.LIGHTING, 10, -1).encode() with self.assertRaisesRegex(ValueError, r'Ramp level .+ bounds'): - LightingRampSAL(1, 10, 256).encode() + LightingRampSAL(1,Application.LIGHTING, 10, 256).encode() class LightingRegressionTest(CBusTestCase): diff --git a/tests/test_pm_packet.py b/tests/test_pm_packet.py index 1c7b5a0..4372776 100644 --- a/tests/test_pm_packet.py +++ b/tests/test_pm_packet.py @@ -64,14 +64,14 @@ def test_invalid_multiple_application_sal(self): with self.assertRaisesRegex( ValueError, r'SAL .+ of application ff, .+ has application 38'): PointToMultipointPacket(sals=[ - LightingOffSAL(1), + LightingOffSAL(1,Application.LIGHTING), StatusRequestSAL(level_request=True, group_address=1, child_application=Application.LIGHTING), ]) def test_remove_sals(self): # create a packet - p = PointToMultipointPacket(sals=LightingOffSAL(1)) + p = PointToMultipointPacket(sals=LightingOffSAL(1,Application.LIGHTING)) self.assertEqual(1, len(p)) p.clear_sal() @@ -84,7 +84,7 @@ def test_remove_sals(self): # Adding another lighting SAL should fail with self.assertRaisesRegex(ValueError, r'has application ff$'): - p.append_sal(LightingOffSAL(1)) + p.append_sal(LightingOffSAL(1,Application.LIGHTING)) self.assertEqual(1, len(p)) def test_invalid_sal(self):