diff --git a/.gitignore b/.gitignore
index bae68ee..d580c5e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,3 +16,12 @@ example.*
home.*
img
.coverage
+cmqttd_config/*.cbz
+cmqttd_config/*.pem
+cmqttd_config/*.key¸
+cmqttd_config/*.xml
+cmqttd_config/auth
+certificates
+.vscode
+project.xml
+venv
diff --git a/Dockerfile b/Dockerfile
index 093ac44..86022f6 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -8,12 +8,13 @@
# $ docker build -t cmqttd .
# $ docker run --device /dev/ttyUSB0 -e "SERIAL_PORT=/dev/ttyUSB0" \
# -e "MQTT_SERVER=192.2.0.1" -e "TZ=Australia/Adelaide" -it cmqttd
-FROM alpine:3.11 as base
+FROM alpine:edge as base
+# python 3.10 required, at date this file is created only available in alpine:edge
# Install most Python deps here, because that way we don't need to include build tools in the
# final image.
-RUN apk add --no-cache python3 py3-cffi py3-paho-mqtt py3-six tzdata && \
- pip3 install 'pyserial==3.4' 'pyserial_asyncio==0.4'
+RUN apk add --no-cache python3 py-pip py3-cffi py3-paho-mqtt py3-six tzdata && \
+ pip3 install 'pyserial==3.5' 'pyserial_asyncio==0.6'
# Runs tests and builds a distribution tarball
FROM base as builder
@@ -27,8 +28,10 @@ RUN pip3 install 'parameterized' && \
# cmqttd runner image
FROM base as cmqttd
COPY COPYING COPYING.LESSER Dockerfile README.md entrypoint-cmqttd.sh /
+RUN sed -i 's/\r$//' entrypoint-cmqttd.sh
COPY --from=builder /cbus/dist/cbus-0.2.generic.tar.gz /
RUN tar zxf /cbus-0.2.generic.tar.gz && rm /cbus-0.2.generic.tar.gz
+COPY cmqttd_config/ /etc/cmqttd/
# Runs cmqttd itself
CMD /entrypoint-cmqttd.sh
diff --git a/cbus/common.py b/cbus/common.py
index ca56a3b..93b89e9 100644
--- a/cbus/common.py
+++ b/cbus/common.py
@@ -116,11 +116,16 @@ class Application(IntEnum):
LIGHTING_5d = 0x5d
LIGHTING_5e = 0x5e
LIGHTING_LAST = LIGHTING_5f = 0x5f
+ HVACACTUATOR_73 = 0x73
+ HVACACTUATOR_74 = 0x74
+ HVAC = 0xCA
CLOCK = 0xDF
+ TRIGGER = 0xCA
ENABLE = 0xCB
MASTER_APPLICATION = STATUS_REQUEST = 0xff
+
class CAL(IntEnum):
RESET = 0x08
RECALL = 0x1a
@@ -429,3 +434,4 @@ def check_ga(group_addr: int) -> None:
raise ValueError(
'Group Address out of range ({}..{}), got {}'.format(
MIN_GROUP_ADDR, MAX_GROUP_ADDR, group_addr))
+
diff --git a/cbus/daemon/cmqttd.py b/cbus/daemon/cmqttd.py
index c647207..215a1ba 100644
--- a/cbus/daemon/cmqttd.py
+++ b/cbus/daemon/cmqttd.py
@@ -15,13 +15,21 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this library. If not, see .
-from asyncio import get_event_loop, run
+from asyncio import get_event_loop, run, sleep
from argparse import ArgumentParser, FileType
import json
import logging
+from marshal import load
from typing import Any, BinaryIO, Dict, Optional, Text, TextIO
import paho.mqtt.client as mqtt
+import sys
+
+from cbus.protocol import application
+
+if sys.platform == 'win32':
+ from asyncio import set_event_loop_policy, WindowsSelectorEventLoopPolicy
+ set_event_loop_policy(WindowsSelectorEventLoopPolicy())
try:
from serial_asyncio import create_serial_connection
@@ -33,6 +41,9 @@ async def create_serial_connection(*_, **__):
from cbus.paho_asyncio import AsyncioHelper
from cbus.protocol.pciprotocol import PCIProtocol
from cbus.toolkit.cbz import CBZ
+from cbus.toolkit.periodic import Periodic
+from cbus.protocol.application import LightingApplication
+from cbus.protocol.cal.report import LevelStatusReport,BinaryStatusReport
logger = logging.getLogger(__name__)
@@ -43,45 +54,63 @@ async def create_serial_connection(*_, **__):
_TOPIC_CONF_SUFFIX = '/config'
_TOPIC_STATE_SUFFIX = '/state'
_META_TOPIC = 'homeassistant/binary_sensor/cbus_cmqttd'
+_APPLICATION_GROUP_SEPARATOR = "_"
+def check_aa_lighting(app):
+ if not app in LightingApplication.supported_applications():
+ raise ValueError(
+ 'Application ${aa} is not a valid lighting application'.format(
+ app))
def ga_range():
return range(MIN_GROUP_ADDR, MAX_GROUP_ADDR + 1)
+def ga_string(group_addr: int, app_addr: int | Application, zeros=True) -> Text:
+ #note: the logic is necessary to ensure backward compatibility with previous version
+ if(app_addr==Application.LIGHTING):
+ return f'{group_addr:03d}' if zeros else f'{group_addr}'
+ else:
+ return f'{app_addr:03d}{_APPLICATION_GROUP_SEPARATOR}{group_addr:03d}'
+
+def default_light_name(group_addr,app_addr):
+ return f'C-Bus Light {ga_string(group_addr,app_addr,True)}'
-def get_topic_group_address(topic: Text) -> int:
+def get_topic_group_address(topic: Text) -> tuple[int,int | Application]:
"""Gets the group address for the given topic."""
if not topic.startswith(_LIGHT_TOPIC_PREFIX):
raise ValueError(
f'Invalid topic {topic}, must start with {_LIGHT_TOPIC_PREFIX}')
- ga = int(topic[len(_LIGHT_TOPIC_PREFIX):].split('/', maxsplit=1)[0])
+ a1,*a2 = topic[len(_LIGHT_TOPIC_PREFIX):].split('/', maxsplit=1)[0].split(_APPLICATION_GROUP_SEPARATOR)
+ aa,ga = (a1,a2[0]) if a2 else (Application.LIGHTING,a1)
+ aa,ga = (int(aa),int(ga))
check_ga(ga)
- return ga
-
+
+ return ga,aa
-def set_topic(group_addr: int) -> Text:
+def set_topic(group_addr: int, app_addr: int | Application) -> Text:
"""Gets the Set topic for a group address."""
- return _LIGHT_TOPIC_PREFIX + str(group_addr) + _TOPIC_SET_SUFFIX
+ return (_LIGHT_TOPIC_PREFIX + ga_string(group_addr,app_addr,False) + _TOPIC_SET_SUFFIX )
-def state_topic(group_addr: int) -> Text:
+def state_topic(group_addr: int, app_addr: int | Application) -> Text:
"""Gets the State topic for a group address."""
- return _LIGHT_TOPIC_PREFIX + str(group_addr) + _TOPIC_STATE_SUFFIX
+ return _LIGHT_TOPIC_PREFIX + ga_string(group_addr,app_addr,False) + _TOPIC_STATE_SUFFIX
-def conf_topic(group_addr: int) -> Text:
+def conf_topic(group_addr: int, app_addr: int | Application) -> Text:
"""Gets the Config topic for a group address."""
- return _LIGHT_TOPIC_PREFIX + str(group_addr) + _TOPIC_CONF_SUFFIX
+ return _LIGHT_TOPIC_PREFIX + ga_string(group_addr,app_addr,False) + _TOPIC_CONF_SUFFIX
-def bin_sensor_state_topic(group_addr: int) -> Text:
+def bin_sensor_state_topic(group_addr: int, app_addr: int | Application) -> Text:
"""Gets the Binary Sensor State topic for a group address."""
- return _BINSENSOR_TOPIC_PREFIX + str(group_addr) + _TOPIC_STATE_SUFFIX
+ return _BINSENSOR_TOPIC_PREFIX + ga_string(group_addr,app_addr,False) + _TOPIC_STATE_SUFFIX
-def bin_sensor_conf_topic(group_addr: int) -> Text:
+def bin_sensor_conf_topic(group_addr: int, app_addr: int | Application) -> Text:
"""Gets the Binary Sensor Config topic for a group address."""
- return _BINSENSOR_TOPIC_PREFIX + str(group_addr) + _TOPIC_CONF_SUFFIX
+ return _BINSENSOR_TOPIC_PREFIX + ga_string(group_addr,app_addr,False) + _TOPIC_CONF_SUFFIX
+
class CBusHandler(PCIProtocol):
@@ -90,27 +119,41 @@ class CBusHandler(PCIProtocol):
"""
mqtt_api = None
- def __init__(self, labels: Optional[Dict[int, Text]], *args, **kwargs):
+ def __init__(self, labels: Optional[Dict[int, Dict]], *args, **kwargs):
super().__init__(*args, **kwargs)
self.labels = (
- labels if labels is not None else {}) # type: Dict[int, Text]
+ labels if labels is not None else {56:{}}) # type: Dict[int, Text]
- def on_lighting_group_ramp(self, source_addr, group_addr, duration, level):
+ def on_lighting_group_ramp(self, source_addr, group_addr, app_addr,duration, level):
if not self.mqtt_api:
return
self.mqtt_api.lighting_group_ramp(
- source_addr, group_addr, duration, level)
+ source_addr, group_addr, app_addr,duration, level)
- def on_lighting_group_on(self, source_addr, group_addr):
+ def on_lighting_group_on(self, source_addr, group_addr,app_addr):
if not self.mqtt_api:
return
- self.mqtt_api.lighting_group_on(source_addr, group_addr)
+ self.mqtt_api.lighting_group_on(source_addr, group_addr,app_addr)
- def on_lighting_group_off(self, source_addr, group_addr):
+ def on_lighting_group_off(self, source_addr, group_addr,app_addr):
if not self.mqtt_api:
return
- self.mqtt_api.lighting_group_off(source_addr, group_addr)
+ self.mqtt_api.lighting_group_off(source_addr, group_addr,app_addr)
+
+ def on_level_report(self, app_addr, start, report: LevelStatusReport):
+ groups = self.mqtt_api.groupDB.setdefault(app_addr,{})
+ for val in report:
+ if groups[start]:
+ if val==None:
+ pass
+ elif val==0:
+ self.on_lighting_group_off(0,start,app_addr)
+ elif val == 255:
+ self.on_lighting_group_on(0,start,app_addr)
+ else:
+ self.on_lighting_group_ramp(0,start,app_addr,0,val)
+ start+=1
# TODO: on_lighting_group_terminate_ramp
@@ -123,8 +166,12 @@ class MqttClient(mqtt.Client):
def on_connect(self, client, userdata: CBusHandler, flags, rc):
logger.info('Connected to MQTT broker')
userdata.mqtt_api = self
- self.subscribe([(set_topic(ga), 2) for ga in ga_range()])
+ self.groupDB = {}
self.publish_all_lights(userdata.labels)
+ for app_addr in self.groupDB.keys():
+ for block in range(0,256,32):
+ pass
+ Periodic.throttler.enqueue(lambda b= block,a= app_addr:userdata.request_status(b,a))
def on_message(self, client, userdata: CBusHandler, msg: mqtt.MQTTMessage):
"""Handle a message from an MQTT subscription."""
@@ -133,7 +180,7 @@ def on_message(self, client, userdata: CBusHandler, msg: mqtt.MQTTMessage):
return
try:
- ga = get_topic_group_address(msg.topic)
+ group_addr, app_addr = get_topic_group_address(msg.topic)
except ValueError:
# Invalid group address
logging.error(f'Invalid group address in topic {msg.topic}')
@@ -159,23 +206,69 @@ def on_message(self, client, userdata: CBusHandler, msg: mqtt.MQTTMessage):
if light_on:
if brightness == 255 and transition_time == 0:
# lighting on
- userdata.lighting_group_on(ga)
- self.lighting_group_on(None, ga)
+ userdata.lighting_group_on(group_addr,app_addr)
+ self.lighting_group_on(None, group_addr,app_addr)
else:
# ramp
- userdata.lighting_group_ramp(ga, transition_time, brightness)
- self.lighting_group_ramp(None, ga, transition_time, brightness)
+ userdata.lighting_group_ramp(group_addr, app_addr, transition_time, brightness)
+ self.lighting_group_ramp(None, group_addr, app_addr, transition_time, brightness)
else:
# lighting off
- userdata.lighting_group_off(ga)
- self.lighting_group_off(None, ga)
+ userdata.lighting_group_off(group_addr,app_addr)
+ self.lighting_group_off(None, group_addr,app_addr)
def publish(self, topic: Text, payload: Dict[Text, Any]):
"""Publishes a payload as JSON."""
payload = json.dumps(payload)
return super().publish(topic, payload, 1, True)
- def publish_all_lights(self, labels: Dict[int, Text]):
+ def publish_light(self, group_addr : int, app_addr :int | Application, app_labels:dict = None):
+ default_name = default_light_name(group_addr,app_addr)
+ UID = f'cbus_light_{ga_string(group_addr,app_addr,False)}'
+ name = default_name
+ if app_labels:
+ _,labels = app_labels.get(app_addr,(None,{}))
+ if labels:
+ name = labels.get(group_addr,name)
+ UID = f'cbus_light_{ga_string(group_addr,app_addr,False)}'
+ self.subscribe(set_topic(group_addr,app_addr), 2 )
+ self.publish(conf_topic(group_addr,app_addr), {
+ 'name': name,
+ 'unique_id': UID,
+ 'cmd_t': set_topic(group_addr,app_addr),
+ 'stat_t': state_topic(group_addr,app_addr),
+ 'schema': 'json',
+ 'brightness': True,
+ 'device': {
+ 'identifiers': [UID],
+ 'connections': [['cbus_group_address', str(group_addr)],['cbus_application_address', str(app_addr)]],
+ 'sw_version': 'cmqttd https://github.com/micolous/cbus',
+ 'name': default_name,
+ 'manufacturer': 'Clipsal',
+ 'model': 'C-Bus Lighting Application',
+ 'via_device': 'cmqttd',
+ },
+ })
+ Sensor_UID = f'cbus_bin_sensor_{ga_string(group_addr,app_addr,False)}'
+ self.publish(bin_sensor_conf_topic(group_addr,app_addr), {
+ 'name': f'{name} (as binary sensor)',
+ 'unique_id': Sensor_UID,
+ 'stat_t': bin_sensor_state_topic(group_addr,app_addr),
+ 'device': {
+ 'identifiers': [Sensor_UID],
+ 'connections': [['cbus_group_address', str(group_addr)],['cbus_application_address', str(app_addr)]],
+ 'sw_version': 'cmqttd https://github.com/micolous/cbus',
+ 'name': default_name,
+ 'manufacturer': 'Clipsal',
+ 'model': 'C-Bus Lighting Application',
+ 'via_device': 'cmqttd',
+ },
+ })
+ self.groupDB.setdefault(app_addr,{})[group_addr] = True
+
+
+
+ def publish_all_lights(self, app_labels):
"""Publishes a configuration topic for all lights."""
# Meta-device which holds all the C-Bus group addresses
self.publish(_META_TOPIC + _TOPIC_CONF_SUFFIX, {
@@ -192,76 +285,53 @@ def publish_all_lights(self, labels: Dict[int, Text]):
},
})
- for ga in ga_range():
- name = labels.get(ga, f'C-Bus Light {ga:03d}')
- self.publish(conf_topic(ga), {
- 'name': name,
- 'unique_id': f'cbus_light_{ga}',
- 'cmd_t': set_topic(ga),
- 'stat_t': state_topic(ga),
- 'schema': 'json',
- 'brightness': True,
- 'device': {
- 'identifiers': [f'cbus_light_{ga}'],
- 'connections': [['cbus_group_address', str(ga)]],
- 'sw_version': 'cmqttd https://github.com/micolous/cbus',
- 'name': f'C-Bus Light {ga:03d}',
- 'manufacturer': 'Clipsal',
- 'model': 'C-Bus Lighting Application',
- 'via_device': 'cmqttd',
- },
- })
-
- self.publish(bin_sensor_conf_topic(ga), {
- 'name': f'{name} (as binary sensor)',
- 'unique_id': f'cbus_bin_sensor_{ga}',
- 'stat_t': bin_sensor_state_topic(ga),
- 'device': {
- 'identifiers': [f'cbus_bin_sensor_{ga}'],
- 'connections': [['cbus_group_address', str(ga)]],
- 'sw_version': 'cmqttd https://github.com/micolous/cbus',
- 'name': f'C-Bus Light {ga:03d}',
- 'manufacturer': 'Clipsal',
- 'model': 'C-Bus Lighting Application',
- 'via_device': 'cmqttd',
- },
- })
-
- def publish_binary_sensor(self, group_addr: int, state: bool):
+ for app_addr,(_,labels) in app_labels.items():
+ for group_addr in labels.keys():
+ self.publish_light(group_addr,app_addr,app_labels)
+
+ def check_published(self,group_addr: int, app_addr: int | Application):
+ if not self.groupDB.setdefault(app_addr,{}).get(group_addr,False):
+ self.publish_light(group_addr,app_addr)
+
+
+ def publish_binary_sensor(self, group_addr: int, app_addr: int | Application, state: Optional[bool]):
payload = 'ON' if state else 'OFF'
return super().publish(
- bin_sensor_state_topic(group_addr), payload, 1, True)
+ bin_sensor_state_topic(group_addr,app_addr), payload, 1, True)
- def lighting_group_on(self, source_addr: Optional[int], group_addr: int):
+ def lighting_group_on(self, source_addr: Optional[int], group_addr: int, app_addr: int | Application):
"""Relays a lighting-on event from CBus to MQTT."""
- self.publish(state_topic(group_addr), {
+ self.check_published(group_addr,app_addr)
+ self.publish(state_topic(group_addr,app_addr), {
'state': 'ON',
'brightness': 255,
'transition': 0,
'cbus_source_addr': source_addr,
})
- self.publish_binary_sensor(group_addr, True)
+ self.publish_binary_sensor(group_addr,app_addr, True)
- def lighting_group_off(self, source_addr: Optional[int], group_addr: int):
+ def lighting_group_off(self, source_addr: Optional[int], group_addr: int, app_addr: int | Application):
"""Relays a lighting-off event from CBus to MQTT."""
- self.publish(state_topic(group_addr), {
+ self.check_published(group_addr,app_addr)
+ self.publish(state_topic(group_addr,app_addr), {
'state': 'OFF',
'brightness': 0,
'transition': 0,
'cbus_source_addr': source_addr,
})
- self.publish_binary_sensor(group_addr, False)
+ self.publish_binary_sensor(group_addr,app_addr, False)
- def lighting_group_ramp(self, source_addr: Optional[int], group_addr: int,
+ def lighting_group_ramp(self, source_addr: Optional[int], group_addr: int, app_addr:int|Application,
duration: int, level: int):
"""Relays a lighting-ramp event from CBus to MQTT."""
- self.publish(state_topic(group_addr), {
+ self.check_published(group_addr,app_addr)
+ self.publish(state_topic(group_addr,app_addr), {
'state': 'ON',
'brightness': level,
'transition': duration,
'cbus_source_addr': source_addr,
})
- self.publish_binary_sensor(group_addr, level > 0)
+ self.publish_binary_sensor(group_addr, app_addr, level > 0 if isinstance(level,int) else None)
def read_auth(client: mqtt.Client, auth_file: TextIO):
@@ -271,43 +341,68 @@ def read_auth(client: mqtt.Client, auth_file: TextIO):
client.username_pw_set(username, password)
-def read_cbz_labels(cbz_file: BinaryIO) -> Dict[int, Text]:
+def read_cbz_labels(cbz_file: BinaryIO, network_name = None) -> Dict[int, Text]:
"""Reads group address names from a given Toolkit CBZ file."""
- labels = {} # type: Dict[int, Text]
+ class obj():
+ pass
+
+ labels = {56:{}} # type: Dict[int,Dict[int, Text]]
+
cbz = CBZ(cbz_file)
- # TODO: support multiple networks/applications
- # Look for 1 direct network
networks = [n for n in cbz.installation.project.network
if n.interface.interface_type != 'bridge']
- if len(networks) != 1:
- logger.warning('Expected exactly 1 non-bridge network in project file, '
- 'got %d instead! Labels will be unavailable.',
- len(networks))
- return labels
-
- # Look for
- applications = [a for a in networks[0].applications
- if a.address == Application.LIGHTING]
- if len(applications) != 1:
- logger.warning('Could not find lighting application %x in project '
- 'file. Labels will be unavailable.',
- Application.LIGHTING)
- return labels
-
- for group in applications[0].groups:
- name = group.tag_name.strip()
-
- # Ignore default names
- if not name or name in ('', f'Group {group.address}'):
- continue
-
- labels[group.address] = name
+
+ if network_name:
+ networks = [n for n in networks if n.tag_name == network_name]
+ if len(networks) != 1:
+ if network_name:
+ logger.warning('Could not find a non-bridge network with name "%s" in project file.',network_name)
+ else:
+ logger.warning('Expected one non-bridge network in project file, found %d instead',len(networks))
+ app = obj()
+ app.address = 56
+ app.tag_name = "Lighting"
+ app.groups = []
+ net = obj()
+ net.applications = [app]
+ networks = [net]
+
+ applications = [a for a in networks[0].applications if a.address in LightingApplication.supported_applications()]
+
+ if len(applications) == 0:
+ logger.warning('Could not find any lighting application in project file.')
+ app = obj()
+ app.address = 56
+ app.tag_name = "Lighting"
+ app.groups = []
+ applications = [app]
+
+ for a in applications:
+ l = {}
+ if a.groups:
+ for group in a.groups:
+ name = group.tag_name.strip()
+ if not name or name in ('', f'Group {group.address}'):
+ name = default_light_name(group.address,a.address)
+ l[group.address] = name
+ else:
+ logger.warning('No label available in project file for application %d.',a.address)
+ logger.warning('Will use default labels.')
+ for ga in ga_range():
+ l[ga] = default_light_name(ga,a.address)
+ labels[a.address]=(a.tag_name,l)
return labels
+# Wait time between commands emitted by throtller
+_PERIOD = 0.97
+
async def _main():
+
+ #throttler is queue used used to stagger commmands
+ Periodic.throttler = Periodic(_PERIOD)
parser = ArgumentParser()
group = parser.add_argument_group('Logging options')
@@ -416,8 +511,17 @@ async def _main():
'generated names like "C-Bus Light 001" will be used instead.'
)
+ group.add_argument(
+ '-N', '--cbus-network',
+ nargs='*',
+ help='Name of the C-Bus network to be used in case a project file is provided'
+ 'and the project contains multiple networks.'
+ )
+
option = parser.parse_args()
+ option.cbus_network = " ".join(option.cbus_network)
+
if bool(option.broker_client_cert) != bool(option.broker_client_key):
return parser.error(
'To use client certificates, both -k and -K must be specified.')
@@ -428,7 +532,7 @@ async def _main():
loop = get_event_loop()
connection_lost_future = loop.create_future()
- labels = (read_cbz_labels(option.project_file)
+ labels = (read_cbz_labels(option.project_file,option.cbus_network)
if option.project_file else None)
def factory():
diff --git a/cbus/protocol/application/clock.py b/cbus/protocol/application/clock.py
index 70809b1..b0f207b 100644
--- a/cbus/protocol/application/clock.py
+++ b/cbus/protocol/application/clock.py
@@ -50,7 +50,7 @@ def application(self) -> Application:
return Application.CLOCK
@staticmethod
- def decode_sals(data: bytes) -> Sequence[ClockSAL]:
+ def decode_sals(data: bytes,_=None) -> Sequence[ClockSAL]:
"""
Decodes a clock broadcast application packet and returns it's SAL(s).
@@ -251,7 +251,7 @@ def supported_applications() -> Set[Application]:
return {Application.CLOCK}
@staticmethod
- def decode_sals(data: bytes) -> Sequence[SAL]:
+ def decode_sals(data: bytes,_=None) -> Sequence[SAL]:
"""
Decodes a clock and timekeeping application packet and returns its
SAL(s).
diff --git a/cbus/protocol/application/enable.py b/cbus/protocol/application/enable.py
index cdafffb..9d6f545 100644
--- a/cbus/protocol/application/enable.py
+++ b/cbus/protocol/application/enable.py
@@ -43,7 +43,7 @@ def application(self) -> Application:
return Application.ENABLE
@staticmethod
- def decode_sals(data: bytes) -> List[EnableSAL]:
+ def decode_sals(data: bytes,_=None) -> List[EnableSAL]:
"""
Decodes a enable control application packet and returns it's SAL(s).
@@ -150,7 +150,7 @@ def supported_applications() -> Set[Application]:
return {Application.ENABLE}
@classmethod
- def decode_sals(cls, data: bytes) -> List[EnableSAL]:
+ def decode_sals(cls, data: bytes, _ = None) -> List[EnableSAL]:
"""
Decodes a enable broadcast application packet and returns its SAL(s).
"""
diff --git a/cbus/protocol/application/lighting.py b/cbus/protocol/application/lighting.py
index 2dcdfb6..80fbd46 100644
--- a/cbus/protocol/application/lighting.py
+++ b/cbus/protocol/application/lighting.py
@@ -49,23 +49,26 @@ class LightingSAL(SAL, abc.ABC):
Base type for lighting application SALs.
"""
- def __init__(self, group_address: int):
+ def __init__(self, group_address: int, application_address: Union[Application,int]):
"""
This should not be called directly by your code!
Use one of the subclasses of cbus.protocol.lighting.LightingSAL
instead.
"""
+ #TODO: modify to avoid redundancy
+ if not application_address in _SUPPORTED_APPLICATIONS:
+ raise ValueError('Expected light Application address, got {}'.format(application_address))
check_ga(group_address)
+ self.application_address = application_address
self.group_address = group_address
@property
def application(self) -> Union[int, Application]:
- # TODO: Support other application IDs
- return Application.LIGHTING
+ return self.application_address
@staticmethod
- def decode_sals(data: bytes) -> List[LightingSAL]:
+ def decode_sals(data: bytes, application: int | Application ) -> List[LightingSAL]:
"""
Decodes a lighting application packet and returns it's SAL(s).
@@ -98,7 +101,7 @@ def decode_sals(data: bytes) -> List[LightingSAL]:
break
sal, data = _SAL_HANDLERS[command_code].decode(
- data, command_code, group_address)
+ data, command_code, group_address,application)
if sal:
output.append(sal)
@@ -128,7 +131,7 @@ class LightingRampSAL(LightingSAL):
"""
- def __init__(self, group_address: int, duration: int, level: int):
+ def __init__(self, group_address: int,application_address: Union[int,Application] , duration: int, level: int ):
"""
Creates a new SAL Lighting Ramp message.
@@ -142,14 +145,14 @@ def __init__(self, group_address: int, duration: int, level: int):
indicating full brightness.
:type level: int
"""
- super().__init__(group_address=group_address)
+ super().__init__(group_address,application_address )
self.duration = duration
self.level = level
@staticmethod
def decode(data: bytes, command_code: int,
- group_address: int) -> Tuple[Optional[LightingSAL], bytes]:
+ group_address: int, application_address: int | Application) -> Tuple[Optional[LightingSAL], bytes]:
"""
Do not call this method directly -- use LightingSAL.decode
"""
@@ -165,7 +168,7 @@ def decode(data: bytes, command_code: int,
data = data[1:]
return LightingRampSAL(
- group_address=group_address, duration=duration, level=level), data
+ group_address=group_address, application_address=application_address,duration=duration, level=level), data
def encode(self) -> bytes:
if self.level < 0 or self.level > 255:
@@ -192,11 +195,11 @@ class LightingOnSAL(LightingSAL):
@staticmethod
def decode(data: bytes, command_code: int,
- group_address: int) -> Tuple[LightingOnSAL, bytes]:
+ group_address: int, application_address: int | Application) -> Tuple[LightingOnSAL, bytes]:
"""
Do not call this method directly -- use LightingSAL.decode
"""
- return LightingOnSAL(group_address), data
+ return LightingOnSAL(group_address,application_address), data
def encode(self):
return super().encode() + bytes([LightCommand.ON, self.group_address])
@@ -214,11 +217,11 @@ class LightingOffSAL(LightingSAL):
@staticmethod
def decode(data: bytes, command_code: int,
- group_address: int) -> Tuple[LightingOffSAL, bytes]:
+ group_address: int, application_address: int | Application) -> Tuple[LightingOffSAL, bytes]:
"""
Do not call this method directly -- use LightingSAL.decode
"""
- return LightingOffSAL(group_address), data
+ return LightingOffSAL(group_address,application_address), data
def encode(self):
return super().encode() + bytes([LightCommand.OFF, self.group_address])
@@ -237,11 +240,11 @@ class LightingTerminateRampSAL(LightingSAL):
@staticmethod
def decode(data: bytes, command_code: int,
- group_address: int) -> Tuple[LightingTerminateRampSAL, bytes]:
+ group_address: int, application_address: int | Application) -> Tuple[LightingTerminateRampSAL, bytes]:
"""
Do not call this method directly -- use LightingSAL.decode
"""
- return LightingTerminateRampSAL(group_address), data
+ return LightingTerminateRampSAL(group_address,application_address), data
def encode(self):
return super().encode() + bytes([
@@ -276,8 +279,8 @@ class LightingApplication(BaseApplication):
"""
@staticmethod
- def decode_sals(data: bytes) -> List[SAL]:
- return LightingSAL.decode_sals(data)
+ def decode_sals(data: bytes, application: int | Application ) -> List[SAL]:
+ return LightingSAL.decode_sals(data,application)
@staticmethod
def supported_applications() -> FrozenSet[int]:
diff --git a/cbus/protocol/application/sal.py b/cbus/protocol/application/sal.py
index 152e28d..719a54d 100644
--- a/cbus/protocol/application/sal.py
+++ b/cbus/protocol/application/sal.py
@@ -57,7 +57,7 @@ def supported_applications() -> FrozenSet[Union[int, Application]]:
@staticmethod
@abc.abstractmethod
- def decode_sals(data: bytes) -> Sequence[SAL]:
+ def decode_sals(data: bytes,application: int| Application=None) -> Sequence[SAL]:
"""
Decodes a SAL message
diff --git a/cbus/protocol/application/status_request.py b/cbus/protocol/application/status_request.py
index 153a356..80c1f51 100644
--- a/cbus/protocol/application/status_request.py
+++ b/cbus/protocol/application/status_request.py
@@ -36,7 +36,7 @@ def supported_applications() -> FrozenSet[int]:
return _SUPPORTED_APPLICATIONS
@staticmethod
- def decode_sals(data: bytes) -> Sequence[SAL]:
+ def decode_sals(data: bytes,_=None) -> Sequence[SAL]:
return StatusRequestSAL.decode_sals(data)
@@ -47,7 +47,7 @@ class StatusRequestSAL(SAL):
child_application: int
@classmethod
- def decode_sals(cls, data: bytes) -> Sequence[StatusRequestSAL]:
+ def decode_sals(cls, data: bytes,_=None) -> Sequence[StatusRequestSAL]:
output = []
while data:
diff --git a/cbus/protocol/application/temperature.py b/cbus/protocol/application/temperature.py
index 176b1fe..753835a 100644
--- a/cbus/protocol/application/temperature.py
+++ b/cbus/protocol/application/temperature.py
@@ -53,7 +53,7 @@ def application(self) -> Union[int, Application]:
return Application.TEMPERATURE
@staticmethod
- def decode_sals(data: bytes) -> Sequence[TemperatureSAL]:
+ def decode_sals(data: bytes, _ = None) -> Sequence[TemperatureSAL]:
"""
Decodes a temperature broadcast application packet and returns its
SAL(s).
@@ -169,7 +169,7 @@ def supported_applications() -> Set[Application]:
return {Application.TEMPERATURE}
@staticmethod
- def decode_sals(data: bytes) -> Sequence[TemperatureSAL]:
+ def decode_sals(data: bytes, _ = None) -> Sequence[TemperatureSAL]:
"""
Decodes a temperature broadcast application packet and returns it's
SAL(s).
diff --git a/cbus/protocol/buffered_protocol.py b/cbus/protocol/buffered_protocol.py
index e8f03df..b651159 100644
--- a/cbus/protocol/buffered_protocol.py
+++ b/cbus/protocol/buffered_protocol.py
@@ -67,23 +67,27 @@ def data_received(self, data: bytes) -> None:
:param data: new data to add to the buffer
:return: None
"""
- data_size = len(data)
- if data_size > self._size_limit:
- raise ValueError('Received data exceeds size limit '
+ try:
+ data_size = len(data)
+ if data_size > self._size_limit:
+ raise ValueError('Received data exceeds size limit '
'({} bytes)'.format(self._size_limit))
- # Add the data to the buffer
- with self._buf_lock:
- if len(self._buf) + data_size > self._size_limit:
- self._buf = bytearray()
- raise ValueError(
- 'Received data would make the buffer exceed the maximum '
- 'limit, buffer dropped!')
+ # Add the data to the buffer
+ with self._buf_lock:
+ if len(self._buf) + data_size > self._size_limit:
+ self._buf = bytearray()
+ raise ValueError(
+ 'Received data would make the buffer exceed the maximum '
+ 'limit, buffer dropped!')
- self._buf.extend(data)
+ self._buf.extend(data)
- # Handle any messages that may be in the buffer.
- self._process_buffer()
+ # Handle any messages that may be in the buffer.
+ self._process_buffer()
+ except:
+ # Clear buffer.
+ self._buf = bytearray()
def _process_buffer(self):
with self._buf_lock:
diff --git a/cbus/protocol/pciprotocol.py b/cbus/protocol/pciprotocol.py
index 08df01d..1d6ce22 100644
--- a/cbus/protocol/pciprotocol.py
+++ b/cbus/protocol/pciprotocol.py
@@ -27,6 +27,8 @@
from six import int2byte
+from cbus.protocol.cal.report import BinaryStatusReport, LevelStatusReport
+
try:
from serial_asyncio import create_serial_connection
except ImportError:
@@ -52,6 +54,7 @@ async def create_serial_connection(*_, **__):
from cbus.protocol.pm_packet import PointToMultipointPacket
from cbus.protocol.pp_packet import PointToPointPacket
from cbus.protocol.reset_packet import ResetPacket
+from cbus.protocol.cal.extended import ExtendedCAL
logger = logging.getLogger(__name__)
@@ -114,17 +117,17 @@ def handle_cbus_packet(self, p: BasePacket) -> None:
# lighting application
if isinstance(s, LightingRampSAL):
self.on_lighting_group_ramp(p.source_address,
- s.group_address,
+ s.group_address,s.application_address,
s.duration, s.level)
elif isinstance(s, LightingOnSAL):
self.on_lighting_group_on(p.source_address,
- s.group_address)
+ s.group_address,s.application_address)
elif isinstance(s, LightingOffSAL):
self.on_lighting_group_off(p.source_address,
- s.group_address)
+ s.group_address,s.application_address)
elif isinstance(s, LightingTerminateRampSAL):
self.on_lighting_group_terminate_ramp(
- p.source_address, s.group_address)
+ p.source_address, s.group_address,s.application_address)
else:
logger.debug(f'hcp: unhandled lighting SAL type: {s!r}')
elif isinstance(s, ClockSAL):
@@ -134,6 +137,18 @@ def handle_cbus_packet(self, p: BasePacket) -> None:
self.on_clock_update(p.source_address, s.val)
else:
logger.debug(f'hcp: unhandled SAL type: {s!r}')
+ elif isinstance(p,PointToPointPacket):
+ for s in p:
+ if isinstance(s,ExtendedCAL):
+ if isinstance(s.report,BinaryStatusReport):
+ pass
+ elif isinstance(s.report,LevelStatusReport):
+ self.on_level_report(s.child_application, s.block_start, s.report)
+ else:
+ pass
+ else:
+ pass
+
else:
logger.debug(f'hcp: unhandled other packet: {p!r}')
@@ -326,7 +341,7 @@ def _get_confirmation_code(self):
return int2byte(o)
def _send(self,
- cmd: Union[BasePacket],
+ cmd: BasePacket,
confirmation: bool = True,
basic_mode: bool = False):
"""
@@ -384,10 +399,17 @@ def pci_reset(self):
self._send(ResetPacket())
# serial user interface guide sect 10.2
- # Set application address 1 to 38 (lighting)
- # self._send('A3210038', encode=False, checksum=False)
+ # Set application address 1 to ALL applications
+ # self._send('A32100FF', encode=False, checksum=False)
self._send(DeviceManagementPacket(
- checksum=False, parameter=0x21, value=0x38),
+ checksum=False, parameter=0x21, value=0xFF),
+ basic_mode=True)
+
+ # serial user interface guide sect 10.2
+ # Set application address 2 to USED applications
+ # self._send('A32200FF', encode=False, checksum=False)
+ self._send(DeviceManagementPacket(
+ checksum=False, parameter=0x22, value=0xFF),
basic_mode=True)
# Interface options #3
@@ -409,7 +431,7 @@ def pci_reset(self):
# 6: IDMON
# self._send('A3300059', encode=False, checksum=False)
self._send(DeviceManagementPacket(
- checksum=False, parameter=0x30, value=0x59),
+ checksum=False, parameter=0x30, value=0x79),
basic_mode=True)
def identify(self, unit_address, attribute):
@@ -430,7 +452,7 @@ def identify(self, unit_address, attribute):
unit_address=unit_address, cals=[IdentifyCAL(attribute)])
return self._send(p)
- def lighting_group_on(self, group_addr: Union[int, Iterable[int]]):
+ def lighting_group_on(self, group_addr: Union[int, Iterable[int]],application_addr: Union[int,Application] ):
"""
Turns on the lights for the given group_id.
@@ -453,10 +475,17 @@ def lighting_group_on(self, group_addr: Union[int, Iterable[int]]):
f'group_addr iterable length is > 9 ({group_addr_count})')
p = PointToMultipointPacket(
- sals=[LightingOnSAL(ga) for ga in group_addr])
+ sals=[LightingOnSAL(ga,application_addr) for ga in group_addr])
return self._send(p)
- def lighting_group_off(self, group_addr: Union[int, Iterable[int]]):
+ def request_status(self,group_addr: Union[int, Iterable[int]],application_addr: Union[int,Application] ):
+ p = PointToMultipointPacket(sals=[
+ StatusRequestSAL(level_request=True, group_address=group_addr,child_application=application_addr)
+ ])
+ return self._send(p)
+
+
+ def lighting_group_off(self, group_addr: Union[int, Iterable[int]],application_addr: Union[int,Application] ):
"""
Turns off the lights for the given group_id.
@@ -480,11 +509,11 @@ def lighting_group_off(self, group_addr: Union[int, Iterable[int]]):
f'group_addr iterable length is > 9 ({group_addr_count})')
p = PointToMultipointPacket(
- sals=[LightingOffSAL(ga) for ga in group_addr])
+ sals=[LightingOffSAL(ga,application_addr) for ga in group_addr])
return self._send(p)
def lighting_group_ramp(
- self, group_addr: int, duration: int, level: int = 255):
+ self, group_addr: int, application_addr: Union[int,Application], duration: int, level: int = 255 ):
"""
Ramps (fades) a group address to a specified lighting level.
@@ -506,11 +535,11 @@ def lighting_group_ramp(
"""
p = PointToMultipointPacket(
- sals=LightingRampSAL(group_addr, duration, level))
+ sals=LightingRampSAL(group_addr, application_addr, duration, level))
return self._send(p)
def lighting_group_terminate_ramp(
- self, group_addr: Union[int, Iterable[int]]):
+ self, group_addr: Union[int, Iterable[int]], application_addr: Union[int,Application]):
"""
Stops ramping a group address at the current point.
@@ -533,7 +562,7 @@ def lighting_group_terminate_ramp(
f'group_addr iterable length is > 9 ({group_addr_count})')
p = PointToMultipointPacket(
- sals=[LightingTerminateRampSAL(ga) for ga in group_addr])
+ sals=[LightingTerminateRampSAL(ga,application_addr) for ga in group_addr])
return self._send(p)
def clock_datetime(self, when: Optional[datetime] = None):
diff --git a/cbus/protocol/pm_packet.py b/cbus/protocol/pm_packet.py
index 8aafd6a..747f9fa 100644
--- a/cbus/protocol/pm_packet.py
+++ b/cbus/protocol/pm_packet.py
@@ -107,7 +107,7 @@ def decode_packet(
# find an application handler
handler = get_application(application)
data = data[2:]
- sals = handler.decode_sals(data)
+ sals = handler.decode_sals(data,application)
return cls(
checksum=checksum, priority_class=priority_class,
diff --git a/cbus/toolkit/cbz.py b/cbus/toolkit/cbz.py
index 187e883..376e7d6 100644
--- a/cbus/toolkit/cbz.py
+++ b/cbus/toolkit/cbz.py
@@ -219,7 +219,7 @@ def __init__(self, fh: BinaryIO):
zip_h = ZipFile(fh, 'r')
except BadZipFile:
# Try to load as XML instead.
- xml_fh = fh
+ xml_fh = fh.name
if zip_h:
files = zip_h.namelist()
diff --git a/cbus/toolkit/periodic.py b/cbus/toolkit/periodic.py
new file mode 100644
index 0000000..fe620d5
--- /dev/null
+++ b/cbus/toolkit/periodic.py
@@ -0,0 +1,29 @@
+import asyncio
+import queue
+
+class Periodic:
+ """
+ class that manages a queue of functions to be called while
+ leaving a interval between two successsive calls
+ """
+ queue = []
+
+ def __init__(self,period = 1):
+ self.queue = queue.Queue()
+ self.period = period
+ loop = asyncio.get_event_loop()
+ self.task = loop.create_task(self._work())
+
+ async def _work(self):
+ while True:
+ try:
+ action = self.queue.get(block=False)
+ action()
+ except:
+ pass
+ finally:
+ await asyncio.sleep(self.period)
+
+ def enqueue(self,task):
+ #talks is a lambda or the name of a function with no argument
+ self.queue.put(task)
\ No newline at end of file
diff --git a/cmqttd_config/README.md b/cmqttd_config/README.md
new file mode 100644
index 0000000..2acfdf4
--- /dev/null
+++ b/cmqttd_config/README.md
@@ -0,0 +1,16 @@
+# This folder should contain optional cmqttd configuration files.
+
+## project.cbz (file)
+This file provides cbus group labels. Create a backup of the cbus project with the cbus setup tool, copy the back up file in this folder and rename it: project.cbz
+
+## auth (file)
+Username and password to use to connect to an MQTT broker, separated by a newline character.
+If this file is not present, then cmqttd will try to use the MQTT broker without authentication.
+
+## certificates (directory)
+A directory of CA certificates to trust when connecting with TLS.
+If this directory is not present, the default (Python) CA store will be used instead.
+
+## client.pem client.key (files)
+Client certificate (pem) and private key (key) to use to connect to the MQTT broker.
+
diff --git a/entrypoint-cmqttd.sh b/entrypoint-cmqttd.sh
index e1d719b..33ecff9 100755
--- a/entrypoint-cmqttd.sh
+++ b/entrypoint-cmqttd.sh
@@ -82,6 +82,13 @@ else
echo "${CMQTTD_PROJECT_FILE} not found; using generated labels."
fi
+echo ">${CMQTTD_CBUS_NETWORK}<"
+
+if [ -n "${CMQTTD_CBUS_NETWORK}" ]; then
+ echo "Loading C-Bus network ${CMQTTD_CBUS_NETWORK}"
+ CMQTTD_ARGS="${CMQTTD_ARGS} --cbus-network ${CMQTTD_CBUS_NETWORK}"
+fi
+
echo ""
# Announce what we think local time is on start-up. This will be sent to the C-Bus network.
diff --git a/requirements-cmqttd.txt b/requirements-cmqttd.txt
index a8f9910..e67a24a 100644
--- a/requirements-cmqttd.txt
+++ b/requirements-cmqttd.txt
@@ -1,4 +1,4 @@
pyserial==3.4
-pyserial_asyncio (==0.4)
+pyserial_asyncio (==0.6)
six
-paho-mqtt==1.5.0
+paho-mqtt==1.6.1
diff --git a/requirements.txt b/requirements.txt
index 2394302..9beddbb 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,5 @@
-pyserial==3.4
-pyserial_asyncio==0.4
+pyserial==3.5
+pyserial_asyncio==0.6
six
# official protocol documentation downloading
@@ -9,7 +9,7 @@ requests
pydot
# required for cmqttd
-paho-mqtt==1.5.0
+paho-mqtt==1.6.1
# required for cbz
lxml
diff --git a/setup.cfg b/setup.cfg
index fcb3456..d468769 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -3,5 +3,5 @@ based_on_style = google
[pytype]
inputs = cbus
-version = 3.7
+version = 3.10
exclude = cbus/toolkit/cbz.py
diff --git a/setup.py b/setup.py
index bb6a456..ba69795 100755
--- a/setup.py
+++ b/setup.py
@@ -3,12 +3,12 @@
from setuptools import setup, find_packages
deps = [
- 'pyserial (==3.4)',
- 'pyserial_asyncio (==0.4)',
+ 'pyserial (==3.5)',
+ 'pyserial_asyncio (==0.6)',
'lxml (>=2.3.2)',
'six',
'pydot',
- 'paho_mqtt (==1.5.0)'
+ 'paho_mqtt (==1.6.1)'
]
tests_require = [
diff --git a/tests/test_cmqttd.py b/tests/test_cmqttd.py
index d24fe7e..3e295e0 100644
--- a/tests/test_cmqttd.py
+++ b/tests/test_cmqttd.py
@@ -23,7 +23,7 @@
from typing import Optional, Text, cast
import unittest
-from cbus.common import check_ga
+from cbus.common import Application, check_ga
from cbus.daemon import cmqttd
@@ -59,22 +59,22 @@ def test_valid_topic_group_address(self, ga):
bin_topic_len = len(bin_topic)
# base topic path -> ga
- self.assertEqual(ga, cmqttd.get_topic_group_address(light_topic))
+ self.assertEqual((ga,Application.LIGHTING), cmqttd.get_topic_group_address(light_topic))
# Generating a set topic
- set_topic = cmqttd.set_topic(ga)
+ set_topic = cmqttd.set_topic(ga,Application.LIGHTING)
self.assertEqual(light_topic, set_topic[:light_topic_len])
- self.assertEqual(ga, cmqttd.get_topic_group_address(set_topic))
+ self.assertEqual((ga,Application.LIGHTING), cmqttd.get_topic_group_address(set_topic))
# Generating a state topic
- state_topic = cmqttd.state_topic(ga)
+ state_topic = cmqttd.state_topic(ga,Application.LIGHTING)
self.assertEqual(light_topic, state_topic[:light_topic_len])
- self.assertEqual(ga, cmqttd.get_topic_group_address(state_topic))
+ self.assertEqual((ga,Application.LIGHTING), cmqttd.get_topic_group_address(state_topic))
# Generating a conf topic
- conf_topic = cmqttd.conf_topic(ga)
+ conf_topic = cmqttd.conf_topic(ga,Application.LIGHTING)
self.assertEqual(light_topic, conf_topic[:light_topic_len])
- self.assertEqual(ga, cmqttd.get_topic_group_address(conf_topic))
+ self.assertEqual((ga,Application.LIGHTING), cmqttd.get_topic_group_address(conf_topic))
# Ensure all the topics are unique
self.assertNotEqual(set_topic, state_topic)
@@ -83,10 +83,10 @@ def test_valid_topic_group_address(self, ga):
# Binary sensors are read only, so get_topic_group_address doesn't
# support them.
- bin_state_topic = cmqttd.bin_sensor_state_topic(ga)
+ bin_state_topic = cmqttd.bin_sensor_state_topic(ga,Application.LIGHTING)
self.assertTrue(bin_topic, bin_state_topic[:bin_topic_len])
- bin_conf_topic = cmqttd.bin_sensor_conf_topic(ga)
+ bin_conf_topic = cmqttd.bin_sensor_conf_topic(ga,Application.LIGHTING)
self.assertTrue(bin_topic, bin_conf_topic[:bin_topic_len])
# Uniqueness check
diff --git a/tests/test_lighting.py b/tests/test_lighting.py
index f4804e8..e3f62ec 100644
--- a/tests/test_lighting.py
+++ b/tests/test_lighting.py
@@ -16,8 +16,10 @@
# along with this library. If not, see .
from __future__ import absolute_import
+from email.mime import application
import unittest
+from cbus.common import Application
from cbus.protocol.pm_packet import PointToMultipointPacket
from cbus.protocol.application.lighting import (
@@ -127,7 +129,7 @@ class InternalLightingTest(CBusTestCase):
def test_lighting_encode_decode(self):
"""test of encode then decode"""
- orig = PointToMultipointPacket(sals=LightingOnSAL(27))
+ orig = PointToMultipointPacket(sals=LightingOnSAL(27,Application.LIGHTING))
orig.source_address = 5
data = orig.encode_packet() + b'\r\n'
@@ -142,7 +144,7 @@ def test_lighting_encode_decode(self):
def test_lighting_encode_decode_client(self):
"""test of encode then decode, with packets from a client"""
- orig = PointToMultipointPacket(sals=LightingOnSAL(27))
+ orig = PointToMultipointPacket(sals=LightingOnSAL(27,Application.LIGHTING))
data = b'\\' + orig.encode_packet() + b'\r'
@@ -155,23 +157,23 @@ def test_lighting_encode_decode_client(self):
def test_invalid_ga(self):
"""test argument validation"""
with self.assertRaises(ValueError):
- PointToMultipointPacket(sals=LightingOnSAL(999))
+ PointToMultipointPacket(sals=LightingOnSAL(999,Application.LIGHTING))
with self.assertRaises(ValueError):
- PointToMultipointPacket(sals=LightingOffSAL(-1))
+ PointToMultipointPacket(sals=LightingOffSAL(-1,Application.LIGHTING))
def test_slow_ramp(self):
"""test very slow ramps"""
p1 = PointToMultipointPacket(
- sals=LightingRampSAL(1, 18*60, 255)).encode_packet()
+ sals=LightingRampSAL(1,Application.LIGHTING, 18*60, 255)).encode_packet()
p2 = PointToMultipointPacket(
- sals=LightingRampSAL(1, 17*60, 255)).encode_packet()
+ sals=LightingRampSAL(1,Application.LIGHTING, 17*60, 255)).encode_packet()
self.assertEqual(p1, p2)
def test_brightness_bounds(self):
with self.assertRaisesRegex(ValueError, r'Ramp level .+ bounds'):
- LightingRampSAL(1, 10, -1).encode()
+ LightingRampSAL(1,Application.LIGHTING, 10, -1).encode()
with self.assertRaisesRegex(ValueError, r'Ramp level .+ bounds'):
- LightingRampSAL(1, 10, 256).encode()
+ LightingRampSAL(1,Application.LIGHTING, 10, 256).encode()
class LightingRegressionTest(CBusTestCase):
diff --git a/tests/test_pm_packet.py b/tests/test_pm_packet.py
index 1c7b5a0..4372776 100644
--- a/tests/test_pm_packet.py
+++ b/tests/test_pm_packet.py
@@ -64,14 +64,14 @@ def test_invalid_multiple_application_sal(self):
with self.assertRaisesRegex(
ValueError, r'SAL .+ of application ff, .+ has application 38'):
PointToMultipointPacket(sals=[
- LightingOffSAL(1),
+ LightingOffSAL(1,Application.LIGHTING),
StatusRequestSAL(level_request=True, group_address=1,
child_application=Application.LIGHTING),
])
def test_remove_sals(self):
# create a packet
- p = PointToMultipointPacket(sals=LightingOffSAL(1))
+ p = PointToMultipointPacket(sals=LightingOffSAL(1,Application.LIGHTING))
self.assertEqual(1, len(p))
p.clear_sal()
@@ -84,7 +84,7 @@ def test_remove_sals(self):
# Adding another lighting SAL should fail
with self.assertRaisesRegex(ValueError, r'has application ff$'):
- p.append_sal(LightingOffSAL(1))
+ p.append_sal(LightingOffSAL(1,Application.LIGHTING))
self.assertEqual(1, len(p))
def test_invalid_sal(self):