Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions framework/python/src/common/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,17 @@
import json
import logging
import os
from common.mqtt_topics import MQTTTopic


class TestrunLogger(logging.Logger):
def ui_info(self, msg, *args, **kwargs):
from common import mqtt # pylint: disable=import-outside-toplevel
with mqtt.MQTT() as client:
client.send_message(MQTTTopic.INFO, {'message': msg})
self.info(msg, *args, **kwargs)

logging.setLoggerClass(TestrunLogger)

LOGGERS = {}
_LOG_FORMAT = '%(asctime)s %(name)-8s %(levelname)-7s %(message)s'
Expand Down
11 changes: 10 additions & 1 deletion framework/python/src/common/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ class MQTT:
def __init__(self) -> None:
self._host = WEBSOCKETS_HOST
self._client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION2)
self._connect()

def __enter__(self):
self._connect()
return self

def __exit__(self, exc_type, exc_value, exc_traceback):
if exc_traceback:
LOGGER.error(exc_traceback)
self.disconnect()

def _connect(self):
"""Establish connection to MQTT broker"""
Expand All @@ -54,7 +64,6 @@ def send_message(self, topic: str, message: t.Union[str, dict]) -> None:
topic (str): mqtt topic
message (t.Union[str, dict]): message
"""
self._connect()
if isinstance(message, dict):
message = json.dumps(message)
self._client.publish(topic, str(message))
21 changes: 21 additions & 0 deletions framework/python/src/common/mqtt_topics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Enums for mqtt topics"""
from enum import Enum

class MQTTTopic(str, Enum):
INFO = "info"
INTERNET_CONNECTION_TOPIC = "events/internet"
NETWORK_ADAPTERS_TOPIC = "events/adapter"
STATUS_TOPIC = "status"
9 changes: 3 additions & 6 deletions framework/python/src/common/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@
from fastapi import FastAPI

from common import logger
from common.mqtt_topics import MQTTTopic

# Check adapters period seconds
# Check adapters period seconds
CHECK_NETWORK_ADAPTERS_PERIOD = 5
CHECK_INTERNET_PERIOD = 2
INTERNET_CONNECTION_TOPIC = 'events/internet'
NETWORK_ADAPTERS_TOPIC = 'events/adapter'

LOGGER = logger.get_logger('tasks')

Expand All @@ -48,8 +47,7 @@ def __init__(
self.adapters_checker_job = self._scheduler.add_job(
func=self._testrun.get_net_orc().network_adapters_checker,
kwargs={
'mqtt_client': self._mqtt_client,
'topic': NETWORK_ADAPTERS_TOPIC
'topic': MQTTTopic.NETWORK_ADAPTERS_TOPIC
},
trigger='interval',
seconds=CHECK_NETWORK_ADAPTERS_PERIOD,
Expand All @@ -59,8 +57,7 @@ def __init__(
self.internet_shecker = self._scheduler.add_job(
func=self._testrun.get_net_orc().internet_conn_checker,
kwargs={
'mqtt_client': self._mqtt_client,
'topic': INTERNET_CONNECTION_TOPIC
'topic': MQTTTopic.INTERNET_CONNECTION_TOPIC
},
trigger='interval',
seconds=CHECK_INTERNET_PERIOD,
Expand Down
17 changes: 6 additions & 11 deletions framework/python/src/core/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import os
from fastapi.encoders import jsonable_encoder
from common import util, logger, mqtt
from common.mqtt_topics import MQTTTopic
from common.risk_profile import RiskProfile
from common.statuses import TestrunStatus, TestResult, TestrunResult
from net_orc.ip_control import IPControl
Expand All @@ -42,7 +43,6 @@
ALLOW_DISCONNECT_KEY='allow_disconnect'
CERTS_PATH = 'local/root_certs'
CONFIG_FILE_PATH = 'local/system.json'
STATUS_TOPIC = 'status'

MAKE_CONTROL_DIR = 'make/DEBIAN/control'

Expand All @@ -65,10 +65,11 @@ def wrapper(self, *args, **kwargs):
result = method(self, *args, **kwargs)

if self.get_status() != TestrunStatus.IDLE and not self.pause_message:
self.get_mqtt_client().send_message(
STATUS_TOPIC,
jsonable_encoder(self.to_json())
)
with mqtt.MQTT() as client:
client.send_message(
MQTTTopic.STATUS_TOPIC,
jsonable_encoder(self.to_json())
)
if self.get_status() in STATUSES_COMPLETE:
self.pause_message = True

Expand Down Expand Up @@ -168,9 +169,6 @@ def __init__(self, root_dir):
self._timezone = tz[0]
LOGGER.debug(f'System timezone is {self._timezone}')

# MQTT client
self._mqtt_client = mqtt.MQTT()

def start(self):
self.reset()
self._status = TestrunStatus.STARTING
Expand Down Expand Up @@ -1061,8 +1059,5 @@ def detect_network_adapters_change(self) -> dict:
self._ifaces = ifaces_new
return adapters

def get_mqtt_client(self):
return self._mqtt_client

def get_ifaces(self):
return self._ifaces
2 changes: 0 additions & 2 deletions framework/python/src/core/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ def __init__(
self.adapters_checker_job = self._scheduler.add_job(
func=self._testrun.get_net_orc().network_adapters_checker,
kwargs={
'mqtt_client': self._mqtt_client,
'topic': NETWORK_ADAPTERS_TOPIC
},
trigger='interval',
Expand All @@ -59,7 +58,6 @@ def __init__(
self.internet_shecker = self._scheduler.add_job(
func=self._testrun.get_net_orc().internet_conn_checker,
kwargs={
'mqtt_client': self._mqtt_client,
'topic': INTERNET_CONNECTION_TOPIC
},
trigger='interval',
Expand Down
5 changes: 1 addition & 4 deletions framework/python/src/core/testrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,9 +398,6 @@ async def stop(self):

self.get_session().set_status(TestrunStatus.CANCELLED)

# Disconnect before WS server stops to prevent error
self._mqtt_client.disconnect()

self._stop_network(kill=True)

def _register_exits(self):
Expand Down Expand Up @@ -468,7 +465,7 @@ def _device_discovered(self, mac_addr):
if device is not None:
if mac_addr != device.mac_addr:
LOGGER.info(f'Found device with mac addr: {mac_addr} but was ignored')
LOGGER.info(f'Expected device mac address is {device.mac_addr}')
LOGGER.ui_info(f'Expected device mac address is {device.mac_addr}')
# Ignore discovered device because it is not the target device
return
else:
Expand Down
12 changes: 7 additions & 5 deletions framework/python/src/net_orc/network_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -697,14 +697,15 @@ def restore_net(self):
def get_session(self):
return self._session

def network_adapters_checker(self, mqtt_client: mqtt.MQTT, topic: str):
def network_adapters_checker(self, topic: str):
"""Checks for changes in network adapters
and sends a message to the frontend
"""
try:
adapters = self._session.detect_network_adapters_change()
if adapters:
mqtt_client.send_message(topic, adapters)
with mqtt.MQTT() as client:
client.send_message(topic, adapters)
except Exception: # pylint: disable=W0703
LOGGER.error(traceback.format_exc())

Expand All @@ -713,7 +714,7 @@ def is_device_connected(self):
return self._ip_ctrl.check_interface_status(
self._session.get_device_interface())

def internet_conn_checker(self, mqtt_client: mqtt.MQTT, topic: str):
def internet_conn_checker(self, topic: str):
"""Checks internet connection and sends a status to frontend"""

# Default message
Expand All @@ -739,8 +740,9 @@ def internet_conn_checker(self, mqtt_client: mqtt.MQTT, topic: str):
if internet_connection:
message['connection'] = True

# Broadcast via MQTT client
mqtt_client.send_message(topic, message)
with mqtt.MQTT() as client:
# Broadcast via MQTT client
client.send_message(topic, message)


class NetworkConfig:
Expand Down
Loading