diff --git a/.gitignore b/.gitignore index 5f404b3..1b7a31d 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,5 @@ __pycache__ /ospro/__pycache__ /ospro/archive /pcb-board/archive -/simulation/archive \ No newline at end of file +/simulation/archive +testing.py \ No newline at end of file diff --git a/dashboard.py b/dashboard.py index c3e9e9c..43efb13 100644 --- a/dashboard.py +++ b/dashboard.py @@ -1,15 +1,5 @@ -""" -Information ---------------------------------------------------------------------- -Name : dashboard.py -Location : ~/ - -Description ---------------------------------------------------------------------- -Runs the GUI application. -""" - -# Import modules +""" Dashboard UI """ + import os import sys import re @@ -25,9 +15,11 @@ from scipy.stats import pearsonr from PIL import Image +from ospro import exceptions, errors +from ospro.platform import factory +from ospro.sensors import temp as ts +from ospro.sensors import pressure as ps import ospro.utils.utils as utils -import ospro.sensors.temp as temp -import ospro.sensors.pressure as pressure # Initialize global variables idling = True @@ -44,41 +36,25 @@ ) ) -# Configure environment -if not config['session']['dev']: - - # Import GPIO module - import RPi.GPIO as GPIO - - # Define board mode - if not GPIO.getmode(): - GPIO.setmode(GPIO.BCM) - elif GPIO.getmode() == 10: - print('ERROR: Invalid GPIO mode (BOARD).') - sys.exit() - else: - pass - - # Suppress GPIO warnings - # GPIO.setwarnings(False) - - # Setup GPIO pins - GPIO.setup( - config['extraction']['pin'], - GPIO.OUT +# Load the platform interface +try: + GPIO = factory.load_interface( + dev=config['session']['dev'] ) +except exceptions.InvalidPlatformError: + sys.exit(errors.PLATFORM_ERRNO) -# Initialize temperatore sensor -tSensor = temp.Sensor( - outputPin=config['tPID']['pin'] +# Setup the interface +GPIO.setup( + config['extraction']['pin'], + GPIO.OUT ) -tSensor.initialize(config) + +# Initialize temperatore sensor +tSensor = ts.Sensor(dev=config['session']['dev']) # Initialize pressure sensor -pSensor = pressure.Sensor( - outputPin=config['pPID']['pin'] -) -pSensor.initialize(config) +pSensor = ps.Sensor(dev=config['session']['dev']) # Assign extraction ID if os.path.isdir(config['session']['diagnosticsLoc']): @@ -377,10 +353,16 @@ def save_extraction( # Update application components tkCounter.set(float(round(counter / 10, 1))) if config['settings']['scale'] == 'F': - tkTempValue.set(temp.convert_to_f(tSensor.read_temp(config))) + tkTempValue.set( + ts.convert_to_f( + tSensor.read(set_point=config['tPID']['setPoint']) + ) + ) else: - tkTempValue.set(tSensor.read_temp(config)) - tkPresValue.set(pSensor.read_pressure(config)) + tkTempValue.set( + tSensor.read(set_point=config['tPID']['setPoint']) + ) + tkPresValue.set(pSensor.read(set_point=config['pPID']['setPoint'])) labelCounter.configure(text_color=theme['CTkLabel']['text_color']) buttonStart.configure(state='normal') @@ -951,10 +933,16 @@ def idle(): # Read sensor values if config['settings']['scale'] == 'F': - tkTempValue.set(temp.convert_to_f(tSensor.read_temp(config))) + tkTempValue.set( + ts.convert_to_f( + tSensor.read(set_point=config['tPID']['setPoint']) + ) + ) else: - tkTempValue.set(tSensor.read_temp(config)) - tkPresValue.set(pSensor.read_pressure(config)) + tkTempValue.set( + tSensor.read(set_point=config['tPID']['setPoint']) + ) + tkPresValue.set(pSensor.read(set_point=config['pPID']['setPoint'])) # Continue idling root.after(100, idle) @@ -997,11 +985,11 @@ def start( extracting = True flashing = False - if not config['session']['dev']: - GPIO.output( - config['extraction']['pin'], - GPIO.HIGH - ) + # Setup the interface + GPIO.output( + config['extraction']['pin'], + GPIO.HIGH + ) # Update application omponents labelCounter.configure(text_color=theme['CTkLabel']['text_color']) @@ -1097,10 +1085,16 @@ def count( # Append sensor values if config['settings']['scale'] == 'F': - temperatureLst.append(temp.convert_to_f(tSensor.read_temp(config))) + temperatureLst.append( + ts.convert_to_f( + tSensor.read(set_point=config['tPID']['setPoint']) + ) + ) else: - temperatureLst.append(tSensor.read_temp(config)) - pressureLst.append(pSensor.read_pressure(config)) + temperatureLst.append( + tSensor.read(set_point=config['tPID']['setPoint']) + ) + pressureLst.append(pSensor.read(set_point=config['pPID']['setPoint'])) # Update label text tkCounter.set(float(round(counter / 10, 1))) @@ -1210,11 +1204,11 @@ def stop( extracting = False flashing = True - if not config['session']['dev']: - GPIO.output( - config['extraction']['pin'], - GPIO.LOW - ) + # Setup the interface + GPIO.output( + config['extraction']['pin'], + GPIO.LOW + ) # Update application omponents buttonStart.configure(state='normal') @@ -1321,10 +1315,16 @@ def reset( # Update application components tkCounter.set(float(round(counter / 10, 1))) if config['settings']['scale'] == 'F': - tkTempValue.set(temp.convert_to_f(tSensor.read_temp(config))) + tkTempValue.set( + ts.convert_to_f( + tSensor.read(set_point=config['tPID']['setPoint']) + ) + ) else: - tkTempValue.set(tSensor.read_temp(config)) - tkPresValue.set(pSensor.read_pressure(config)) + tkTempValue.set( + tSensor.read(set_point=config['tPID']['setPoint']) + ) + tkPresValue.set(pSensor.read(set_point=config['pPID']['setPoint'])) labelCounter.configure(text_color=theme['CTkLabel']['text_color']) buttonStart.configure(state='normal') @@ -1582,7 +1582,7 @@ def delete_callback( tkProfile.set( textwrap.shorten( str(config['settings']['profile']), - width=30, + width=25, break_long_words=True ) ) @@ -1629,7 +1629,7 @@ def scale_callback( ) # Update tkinter variables - tkSetPoint.set(temp.convert_to_f(config['tPID']['setPoint'])) + tkSetPoint.set(ts.convert_to_f(config['tPID']['setPoint'])) else: @@ -1693,7 +1693,7 @@ def setpoint_callback( # Update settings if config['settings']['scale'] == 'F': - config['tPID']['setPoint'] = int(temp.convert_to_c(value)) + config['tPID']['setPoint'] = int(ts.convert_to_c(value)) else: config['tPID']['setPoint'] = int(float(value)) @@ -1737,7 +1737,7 @@ def profile_callback( tkProfile.set( textwrap.shorten( str(value).strip(), - width=30, + width=25, break_long_words=True ) ) @@ -4178,7 +4178,7 @@ def create_profile( # Assign temperature range based on scale value if config['settings']['scale'] == 'F': setPointLst = list(np.arange(200, 220, 1)) - tkSetPoint.set(temp.convert_to_f(config['tPID']['setPoint'])) + tkSetPoint.set(ts.convert_to_f(config['tPID']['setPoint'])) else: setPointLst = [ round(item, 1) for item in list(np.arange(93.5, 105, 0.5)) @@ -4223,7 +4223,7 @@ def create_profile( tkProfile.set( textwrap.shorten( str(config['settings']['profile']), - width=30, + width=25, break_long_words=True ) ) @@ -4245,12 +4245,18 @@ def create_profile( # Set initial sensor values tkTempValue = tk.IntVar(root) if config['settings']['scale'] == 'F': - tkTempValue.set(temp.convert_to_f(tSensor.read_temp(config))) + tkTempValue.set( + ts.convert_to_f( + tSensor.read(set_point=config['tPID']['setPoint']) + ) + ) else: - tkTempValue.set(tSensor.read_temp(config)) + tkTempValue.set( + tSensor.read(set_point=config['tPID']['setPoint']) + ) tkPresValue = tk.DoubleVar(root) - tkPresValue.set(pSensor.read_pressure(config)) + tkPresValue.set(pSensor.read(set_point=config['pPID']['setPoint'])) # Declare main frame components labelLogo = customtkinter.CTkLabel( @@ -4494,8 +4500,7 @@ def create_profile( root.quit() # Cleanup - if not config['session']['dev']: - GPIO.cleanup() + GPIO.cleanup() # Exit sys.exit() @@ -4504,5 +4509,4 @@ def create_profile( root.quit() # Cleanup - if not config['session']['dev']: - GPIO.cleanup() + GPIO.cleanup() diff --git a/main.py b/main.py index b9324db..1d284b3 100644 --- a/main.py +++ b/main.py @@ -1,202 +1,203 @@ -""" -Information ---------------------------------------------------------------------- -Name : main.py -Location : ~/ - -Description ---------------------------------------------------------------------- -Contains initialization functions and runs the Ospro application. -""" - -# Import modules -import subprocess -import time +""" Ospro """ + +from typing import Union import os import sys -import ospro.utils.utils as utils +import time +import subprocess +from ospro import logging, errors +from pytensils import config # Initialize global variables -running = True -dirLoc = os.path.abspath( +RUNNING: bool = True +DIR_NAME: Union[str, os.PathLike] = os.path.abspath( os.path.dirname(__file__) ) -execLoc = os.path.abspath( - sys.executable -) - -# Initialize session dictionary -session = { - 'running': running, +EXECUTABLE: Union[str, os.PathLike] = os.path.abspath(sys.executable) +SESSION: dict = { + 'running': RUNNING, 'assetsLoc': os.path.join( - dirLoc, 'assets' + DIR_NAME, 'assets' ), 'configLoc': os.path.join( - dirLoc, 'config' + DIR_NAME, 'config' ), 'diagnosticsLoc': os.path.join( - dirLoc, 'diagnostics' + DIR_NAME, 'diagnostics' ), - 'modulesLoc': dirLoc, - 'controllersLoc': dirLoc, + 'modulesLoc': DIR_NAME, + 'controllersLoc': DIR_NAME, 'sensorsLoc': os.path.join( - dirLoc, 'ospro', 'sensors' + DIR_NAME, 'ospro', 'sensors' ), 'utilsLoc': os.path.join( - dirLoc, 'ospro', 'utils' + DIR_NAME, 'ospro', 'utils' ) } -# Define functions +# Define app-orchestration functions def initialize( - session -): - """ - Variables - --------------------------------------------------------------------- - session = Dictionary object containing the - parameters essential to the session. - - Description - --------------------------------------------------------------------- - Checks for the dashboard, temp_pid and pressure_pid modules. Reads, - validates and updates ~/config.json and returns the config dictionary - object. + session_object: dict, + Logging: logging.Logger +) -> config.Handler: + """ Checks for the dashboard, temp_pid and pressure_pid modules. Reads, + validates and updates ~/config.json and returns a + `pytensils.config.Handler` object. + + Parameters + ---------- + session_object: `dict` + Dictionary object containing the parameters essential to the session. + Logging: `logging.Logger` + Logging object for debug console logging. """ # Check dashboard if os.path.isfile( os.path.join( - session['modulesLoc'], 'dashboard.py' + session_object['modulesLoc'], 'dashboard.py' ) ): - session['dashboard'] = True + session_object['dashboard'] = True # Check temperature PID if os.path.isfile( os.path.join( - session['controllersLoc'], 'temp_pid.py' + session_object['controllersLoc'], 'temp_pid.py' ) - ) and (session['dashboard']): - session['tempPID'] = True + ) and (session_object['dashboard']): + session_object['tempPID'] = True # Check pressure PID if os.path.isfile( os.path.join( - session['controllersLoc'], 'pressure_pid.py' + session_object['controllersLoc'], 'pressure_pid.py' ) - ) and (session['dashboard']): - session['pressurePID'] = True + ) and (session_object['dashboard']): + session_object['pressurePID'] = True # Read config - config = utils.read_config( - configLoc=os.path.join( - session['configLoc'], 'config.json' - ) - ) - config['session'] = {**config['session'], **session} + Config = config.Handler(path=os.path.join(session_object['configLoc'])) + Config.data['session'] = {**Config.data['session'], **session_object} # Validate config - utils.validate_config( - config, - utils.read_config( - configLoc=os.path.join( - session['configLoc'], 'dtypes.json' - ) - ) - ) + if Config.validate( + dtypes=config.Handler( + path=os.path.join(session_object['configLoc']), + file_name='dtypes.json' + ).to_dict() + ): - # Update config with session parameters - utils.write_config( - configLoc=os.path.join( - session['configLoc'], 'config.json' - ), - config=config - ) + # Logging + Logging.debug('NOTE: Config validation completed successfully.') + + # Update config with session parameters + Config.write() - return config + return Config def poll( - app, - execLoc, - appLoc + app: subprocess.Popen, + executable_path: Union[str, os.PathLike], + app_path: Union[str, os.PathLike] ): - """ - Variables - --------------------------------------------------------------------- - app = Subprocess object for an - application. - - Description - --------------------------------------------------------------------- - Checks the status of the application and re-starts it when not + """ Checks the status of the application and re-starts it when not running. + + Parameters + ---------- + app: `subprocess.Popen` + Class instance of the application. + executable_path: `Union[str, os.PathLike]` + The local file-path of the Python executable. + app_path: `Union[str, os.PathLike]` + The local file-path of the application entrypoint. """ if app.poll() is None: pass + + elif app.returncode in errors.FATAL: + app = False + + # Logging + Logging.error( + ' '.join([ + '%sERROR: Invalid platform.' % (logging.COLORS['red']), + 'The {raspberry-pi} platform is only available', + 'on ARM-architecture.%s' % (logging.RESET) + ]) + ) + elif app.returncode > 0: - app = subprocess.Popen( - [execLoc, appLoc] + + # Restart + app = subprocess.Popen([executable_path, app_path]) + + # Logging + Logging.debug( + 'NOTE: {%s} restarted successfully.' % (os.path.basename(app_path)) ) + else: app = False return app -# Main if __name__ == '__main__': + # Setup logging + Logging = logging.get_ospro_logger() + # Initialize the application - config = initialize(session) + Config = initialize(session_object=SESSION, Logging=Logging) - # Initialize dashboard - if config['session']['dashboard']: + # Initialize the dashboard-UI + if Config.data['session']['dashboard']: dashboard_app = subprocess.Popen( [ - execLoc, + EXECUTABLE, os.path.join( - config['session']['modulesLoc'], + Config.data['session']['modulesLoc'], 'dashboard.py' ) ] ) - # Initialize temperature controller - if config['session']['tempPID']: + # Initialize the temperature controller + if Config.data['session']['tempPID']: temp_pid_app = subprocess.Popen( [ - execLoc, + EXECUTABLE, os.path.join( - config['session']['controllersLoc'], + Config.data['session']['controllersLoc'], 'temp_pid.py' ) ] ) - # Initialize pressure controller - if config['session']['pressurePID']: + # Initialize the pressure controller + if Config.data['session']['pressurePID']: pass # pressure_pid_app = subprocess.Popen( # [ - # execLoc, + # EXECUTABLE, # os.path.join( - # config['session']['controllersLoc'], + # Config.data['session']['controllersLoc'], # 'pressure_pid.py' # ) # ] # ) - while config['session']['running']: + while Config.data['session']['running']: # Read config - config = utils.read_config( - configLoc=os.path.join( - config['session']['configLoc'], - 'config.json' + Config = config.Handler( + path=os.path.join( + Config.data['session']['configLoc'] ) ) @@ -204,18 +205,18 @@ def poll( if dashboard_app: dashboard_app = poll( app=dashboard_app, - execLoc=execLoc, - appLoc=os.path.join( - config['session']['modulesLoc'], + executable_path=EXECUTABLE, + app_path=os.path.join( + Config.data['session']['modulesLoc'], 'dashboard.py' ) ) temp_pid_app = poll( app=temp_pid_app, - execLoc=execLoc, - appLoc=os.path.join( - config['session']['controllersLoc'], + executable_path=EXECUTABLE, + app_path=os.path.join( + Config.data['session']['controllersLoc'], 'temp_pid.py' ) ) @@ -223,23 +224,13 @@ def poll( else: # Terminate applications - config['session']['running'] = False - utils.write_config( - configLoc=os.path.join( - session['configLoc'], 'config.json' - ), - config=config - ) + Config.data['session']['running'] = False + Config.write() except KeyboardInterrupt: # Terminate applications - config['session']['running'] = False - utils.write_config( - configLoc=os.path.join( - session['configLoc'], 'config.json' - ), - config=config - ) + Config.data['session']['running'] = False + Config.write() time.sleep(5) diff --git a/ospro/__init__.py b/ospro/__init__.py index e69de29..31ed233 100644 --- a/ospro/__init__.py +++ b/ospro/__init__.py @@ -0,0 +1,9 @@ +""" Ospro """ + +from ospro import exceptions +from ospro import logging +from ospro import algorithms +from ospro import platform +from ospro import sensors + +__all__ = ['exceptions', 'logging', 'algorithms', 'platform', 'sensors'] diff --git a/ospro/controllers/__init__.py b/ospro/controllers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ospro/controllers/_controller.py b/ospro/controllers/_controller.py new file mode 100644 index 0000000..b13f0d4 --- /dev/null +++ b/ospro/controllers/_controller.py @@ -0,0 +1,77 @@ +""" Generic controller """ + +from ospro.platform import factory + + +class GenericController(): + """ A `class` that represents a generic controller. + + Parameters + ---------- + dev: `bool` + `True` or `False`, whether dev-mode is enabled. + output_pin: `int` + Pin number that identifies the pulse width modulation pin. + """ + + def __init__( + self, + dev: bool, + output_pin: int + ): + """ Creates an instance of the controller. + + Parameters + ---------- + dev: `bool` + `True` or `False`, whether dev-mode is enabled. + output_pin: `int` + The general-purpose input/output pin that sets the + pulse-width power output for the controller. + """ + + # Assign class variables + self.output_pin: int = output_pin + + # Load the platform interface + GPIO = factory.load_interface(dev=dev) + + # Setup the platform interface for the controller + GPIO.setup( + self.output_pin, + GPIO.OUT + ) + + # Create a pulse-width modulation controller + self.controller = GPIO.PWM( + self.output_pin, + 600 + ) + + # Start the controller + self.controller.start(0) + + def start( + self + ): + """ Starts the duty cycle of the controller. """ + self.controller.start(0) + + def stop( + self + ): + """ Stops the duty cycle of the controller. """ + self.controller.stop() + + def update_duty_cycle( + self, + output: float + ): + """ Changes the duty cycle of the controller. + + Parameters + ---------- + output: `float` + The pulse-width modulation output duty cycle. + """ + self.controller.ChangeDutyCycle(output) diff --git a/ospro/controllers/pressure.py b/ospro/controllers/pressure.py new file mode 100644 index 0000000..9ef58e5 --- /dev/null +++ b/ospro/controllers/pressure.py @@ -0,0 +1,16 @@ +""" Pressure controller-API """ + +from ospro.controllers import _controller + + +class Controller(_controller.GenericController): + """ A `class` that represents a pressure controller. + + Parameters + ---------- + dev: `bool` + `True` or `False`, whether dev-mode is enabled. + output_pin: `int` + Pin number that identifies the pulse width modulation pin. + """ + pass diff --git a/ospro/controllers/temp.py b/ospro/controllers/temp.py new file mode 100644 index 0000000..d14279d --- /dev/null +++ b/ospro/controllers/temp.py @@ -0,0 +1,16 @@ +""" Temperature controller-API """ + +from ospro.controllers import _controller + + +class Controller(_controller.GenericController): + """ A `class` that represents a temperature controller. + + Parameters + ---------- + dev: `bool` + `True` or `False`, whether dev-mode is enabled. + output_pin: `int` + Pin number that identifies the pulse width modulation pin. + """ + pass diff --git a/ospro/errors.py b/ospro/errors.py new file mode 100644 index 0000000..aaf64b2 --- /dev/null +++ b/ospro/errors.py @@ -0,0 +1,9 @@ +""" Fatal errors """ + +import errno + +# Error number constants +PLATFORM_ERRNO: int = errno.ENXIO # No such device or address +INTERFACE_ERRNO: int = errno.EIO # I / O error + +FATAL: list = [PLATFORM_ERRNO] diff --git a/ospro/exceptions.py b/ospro/exceptions.py new file mode 100644 index 0000000..fee3c41 --- /dev/null +++ b/ospro/exceptions.py @@ -0,0 +1,9 @@ +""" Exceptions """ + + +class InvalidPlatformError(Exception): + pass + + +class InvalidModeError(Exception): + pass diff --git a/ospro/logging.py b/ospro/logging.py new file mode 100644 index 0000000..d38b120 --- /dev/null +++ b/ospro/logging.py @@ -0,0 +1,80 @@ +""" Logging """ + +import logging + +# ANSI escape sequences for colors +COLORS = { + 'black': '\033[30m', + 'red': '\033[31m', + 'green': '\033[32m', + 'yellow': '\033[33m', + 'blue': '\033[34m', + 'magenta': '\033[35m', + 'cyan': '\033[36m', + 'white': '\033[37m', +} + +# Default color to reset formatting +RESET = '\033[0m' + + +# Alias logging.Logger for type hinting +class Logger(logging.Logger): + def __init__(self, args, kwargs): + super().__init__(*args, **kwargs) + + +# Create application-loggers +def get_ospro_logger() -> logging.Logger: + """ Get the Ospro-application console logger. """ + return logger(name='ospro').get() + + +def get_temperature_pid_logger() -> logging.Logger: + """ Get the temperature-PID controller console logger. """ + return logger(name='tPID ', text_color=COLORS['blue']).get() + + +def get_pressure_pid_logger() -> logging.Logger: + """ Get the pressure-PID controller console logger. """ + raise NotImplementedError + + +class logger(): + """ A `class` that represents a generic logging handler. """ + + def __init__( + self, + name: str = __name__, + level: int = logging.DEBUG, + text_color: str = RESET + ): + """ Creates an instance of the generic logger. + + Parameters + ---------- + name: `str` + The name of the logger. + level: `int` + The severity of the log messages. + text_color: `str` + The text-color of the console log messages. + """ + self.logger = logging.getLogger(name=name) + self.logger.setLevel(level=level) + + # Create and format a console handler + if not self.logger.handlers: + debugger = logging.StreamHandler() + debugger.setLevel(level=level) + debugger.setFormatter( + fmt=logging.Formatter( + f'{text_color}[{name}] %(message)s{RESET}' + ) + ) + self.logger.addHandler(hdlr=debugger) + + def get(self): + """ Returns the logger instance. + """ + return self.logger diff --git a/ospro/platform/__init__.py b/ospro/platform/__init__.py new file mode 100644 index 0000000..dc804d7 --- /dev/null +++ b/ospro/platform/__init__.py @@ -0,0 +1,5 @@ +""" Platform orchestration """ + +from ospro.platform import factory + +__all__ = ['factory'] diff --git a/ospro/platform/_interface.py b/ospro/platform/_interface.py new file mode 100644 index 0000000..fcc0efd --- /dev/null +++ b/ospro/platform/_interface.py @@ -0,0 +1,131 @@ +""" Abstract general-purpose input / output interface """ + +from typing import Union +from abc import ABC, abstractmethod +from dataclasses import dataclass + + +class AbstractInterface(ABC): + """ Abstract general-purpose input / output interface. """ + + # Interface constants + OUT: int = 0 + IN: int = 1 + HIGH: bool = True + LOW: bool = False + + RISING: int = 1 + FALLING: int = 2 + BOTH: int = 3 + + PUD_OFF: int = 0 + PUD_DOWN: int = 1 + PUD_UP: int = 2 + + @abstractmethod + def setup( + self, + channel: int, + mode: int, + pull_up_down: int = PUD_OFF + ): + """ Sets the input or output mode for a specified pin. Mode should be + either OUT or IN. + + Parameters + ---------- + channel: `int` + A general-purpose input/output pin. + mode: `int` + Either OUT or IN. + pull_up_down: `int` + Either PUD_OFF, PUD_DOWN or PUD_UP. + """ + raise NotImplementedError + + @abstractmethod + def output(self, channel: int, value: Union[int, bool]): + """ Sets the specified pin the provided high/low value. Value should + be either HIGH/LOW or a boolean (true = high). + + Parameters + ---------- + channel: `int` + A general-purpose input/output pin. + value: `Union[int, bool]` + Either HIGH or LOW or True (HIGH) or False (LOW). + """ + raise NotImplementedError + + @abstractmethod + def input(self, channel: int): + """ Reads the specified pin and return HIGH/true if the pin is pulled + high, or LOW/false if pulled low. + + Parameters + ---------- + channel: `int` + A general-purpose input/output pin that is enabled + during espresso extraction. + """ + raise NotImplementedError + + @abstractmethod + def cleanup(self, channel: Union[int, None] = None): + """ Cleans up GPIO event detection for specific pin, or all pins if + `None` is specified. + + Parameters + ---------- + channel: `Union[int, None]` + A general-purpose input/output pin. + """ + raise NotImplementedError + + class AbstractPWM(ABC): + """ Abstract pulse-width modulation controller. """ + + # Controller constants + OFF: int = 0 + ON: int = 1 + + @abstractmethod + def __init__(self, channel: int, frequency: float): + """ Creates an instance of the pulse-width modulation controller. + + Parameters + ---------- + channel: `int` + A general-purpose input/output pin. + frequency: `int` + The frequency of the pulse-width modulation. + """ + raise NotImplementedError + + @abstractmethod + def start(self): + """ Starts the controller. """ + raise NotImplementedError + + @abstractmethod + def stop(self): + """ Stops the controller. """ + raise NotImplementedError + + +@dataclass +class Pin(): + """ Represents a general-pupose input / output pin. """ + + mode: Union[int, None] = None + pull_up_down: Union[int, None] = None + value: Union[int, bool, None] = None + + +@dataclass +class Controller(): + """ Represents a general-pupose input / output pin controller. """ + + dutycycle: Union[int, bool] = False + pwm: Union[bool] = False + frequency: Union[float, None] = None diff --git a/ospro/platform/factory.py b/ospro/platform/factory.py new file mode 100644 index 0000000..6d35e3d --- /dev/null +++ b/ospro/platform/factory.py @@ -0,0 +1,73 @@ +""" Platform factory """ + +import platform +from ospro import exceptions + +TYPES = { + 'raspberry-pi': 'raspberry-pi', + 'other': 'other' +} + + +def load_interface( + dev: bool +): + """ Loads the platform interface. + + Parameters + ---------- + dev: `bool` + `True` or `False`, whether dev-mode is enabled. + """ + + # Parse platform + machine_platform = parse_platform(dev=dev) + + # Raise an exception if an invalid platform is assigned + if machine_platform.strip().lower() not in list(TYPES.values()): + raise exceptions.InvalidPlatformError( + 'Invalid platform. The available platforms are [%s]' % ( + ', '.join(list(TYPES.values())) + ) + ) + + # Raise an exception if the raspberry-pi platform is requested when + # running on non-ARM architecture + if ( + machine_platform.strip().lower() == TYPES['raspberry-pi'] + and ( + platform.system() != 'Linux' + or 'arm' not in platform.machine() + ) + ): + raise exceptions.InvalidPlatformError( + ' '.join([ + 'Invalid platform.', + 'The {raspberry-pi} platform is only available', + 'on ARM-architecture.' + ]) + ) + + # Configure the raspberry-pi platform + if machine_platform.strip().lower() == TYPES['raspberry-pi']: + from ospro.platform import raspberry_pi + return raspberry_pi.Interface() + + # Skip configuration for all other platforms + if machine_platform.strip().lower() == TYPES['other']: + from ospro.platform import other + return other.Interface() + + +def parse_platform(dev: bool): + """ Parses the platform from the execution mode. + + Parameters + ---------- + dev: `bool` + `True` or `False`, whether dev-mode is enabled. + """ + if dev: + return TYPES['other'] + else: + return TYPES['raspberry-pi'] diff --git a/ospro/platform/other.py b/ospro/platform/other.py new file mode 100644 index 0000000..ee2bc7d --- /dev/null +++ b/ospro/platform/other.py @@ -0,0 +1,133 @@ +""" Other (mocked) general-purpose input / output interface """ + +from typing import Union, Dict +from ospro.platform._interface import AbstractInterface +from ospro.platform._interface import Pin +from ospro.platform._interface import Controller + + +class Interface(AbstractInterface): + """ Other (mocked) general-purpose input / output interface """ + + def __init__(self): + self.channels: Dict[str, Pin] = {} + + def setup( + self, + channel: int, + mode: int, + pull_up_down: int = AbstractInterface.PUD_OFF + ): + """ Sets the input or output mode for a specified pin. Mode should be + either OUT or IN. + + Parameters + ---------- + channel: `int` + A general-purpose input/output pin. + mode: `int` + Either OUT or IN. + pull_up_down: `int` + Either PUD_OFF, PUD_DOWN or PUD_UP. + """ + self.channels[channel] = Pin( + mode=mode, + pull_up_down=pull_up_down + ) + + def output(self, channel: int, value: Union[int, bool]): + """ Sets the specified pin the provided high/low value. Value should + be either HIGH/LOW or a boolean (true = high). + + Parameters + ---------- + channel: `int` + A general-purpose input/output pin. + value: `Union[int, bool]` + Either HIGH or LOW or True (HIGH) or False (LOW). + """ + if channel in self.channels: + _pin = Pin( + mode=self.channels[channel].mode, + pull_up_down=self.channels[channel].pull_up_down, + value=value + ) + else: + _pin = Pin( + value=value + ) + self.channels[channel] = _pin + + def input(self, pin) -> Union[int, bool]: + """ Reads the specified pin and returns HIGH/true if the pin is pulled + high, or LOW/false if pulled low. + + Parameters + ---------- + channel: `int` + A general-purpose input/output pin that is enabled + during espresso extraction. + """ + return self.channels.get(pin, None) + + def cleanup(self, channel: Union[int, None] = None): + """ Cleans up GPIO event detection for specific pin, or all pins if + `None` is specified. + + Parameters + ---------- + channel: `Union[int, None]` + A general-purpose input/output pin. + """ + if channel: + del self.channels[channel] + else: + self.channels.clear() + + class PWM(AbstractInterface.AbstractPWM): + """ Represents pulse-width modulation. """ + + def __init__( + self, + channel: int, + frequency: float + ): + """ Creates an instance of the pulse-width modulation controller. + + Parameters + ---------- + channel: `int` + A general-purpose input/output pin. + frequency: `int` + The frequency of the pulse-width modulation. + """ + self.channel: int = channel + self.controller = Controller( + dutycycle=self.OFF, + pwm=True, + frequency=frequency + ) + + def start(self, dutycycle: float): + """ Starts the controller. + + Parameters + ---------- + dutycycle: `float` + The pulse width modulation output duty cycle. + """ + self.controller.dutycycle = dutycycle + + def ChangeDutyCycle(self, dutycycle: float): + """ Changes the duty cycle of the controller. + + Parameters + ---------- + dutycycle: `float` + The pulse width modulation output duty cycle. + """ + self.controller.dutycycle = dutycycle + + def stop(self): + """ Stops the controller. """ + self.controller.dutycycle = 0 diff --git a/ospro/platform/raspberry_pi.py b/ospro/platform/raspberry_pi.py new file mode 100644 index 0000000..02ea24d --- /dev/null +++ b/ospro/platform/raspberry_pi.py @@ -0,0 +1,32 @@ +""" Raspberry-Pi general-purpose input / output interface """ + +from ospro import exceptions + +# Import raspberry-pi general-purpose input / output interface. +try: + import RPi.GPIO as GPIO + + # Define the board mode + if not GPIO.getmode(): + GPIO.setmode(GPIO.BCM) + elif GPIO.getmode() == 10: + raise exceptions.InvalidModeError( + 'Invalid interface mode {BOARD}.' + ) + else: + pass + +except ImportError: + raise exceptions.InvalidPlatformError( + ' '.join([ + 'Invalid platform.', + 'The {raspberry-pi} platform is only available', + 'on ARM-architecture.' + ]) + ) + + +def Interface(): + """ Represents the Raspberry-Pi general-purpose input / output interface. + """ + return GPIO diff --git a/ospro/sensors/pressure.py b/ospro/sensors/pressure.py index 55c4a0a..6cae550 100644 --- a/ospro/sensors/pressure.py +++ b/ospro/sensors/pressure.py @@ -1,225 +1,102 @@ -""" -Information ---------------------------------------------------------------------- -Name : pressure.py -Location : ~/ospro/sensors - -Description ---------------------------------------------------------------------- -Contains the pressure sensor classes and functions. +""" Pressure sensor-API + +Sensor +------ +Pressure transducer 1/8 IN NPT thread stainless steel (500 PSI) +- Measures 0 to 500 psi output in 1 psi increments + - ±0.5% psi in accuracy +- 5v power supply and logic level compliant +- 0.5v - 4.5v linear voltage output, + - 0 psi outputs 0.5v + - 500 psi outputs 4.5v """ -# Import modules import random -import sys - - -# Define temperature sensor class -class Controller(): - - def __init__( - self, - outputPin - ): - """ - Variables - --------------------------------------------------------------------- - outputPin = Pin number that identifies the - pulse width modulation pin. - - Description - --------------------------------------------------------------------- - Creates an instance of the Controller class. - """ - - # Assign class variables - self.outputPin = outputPin - - def initialize( - self, - config - ): - """ - Variables - --------------------------------------------------------------------- - config = Dictionary object containing - the application settings - - Description - --------------------------------------------------------------------- - Initializes the pressure controller hardware. - """ - - # Import sensor modules - if not config['session']['dev']: - - import RPi.GPIO as GPIO - - # Define board mode - if not GPIO.getmode(): - GPIO.setmode(GPIO.BCM) - elif GPIO.getmode() == 10: - print('ERROR: Invalid GPIO mode (BOARD).') - sys.exit() - else: - pass - - # Suppress GPIO warnings - # GPIO.setwarnings(False) - - # Setup GPIO pins - self.controller = GPIO.PWM( - self.outputPin, - 600 - ) - - # Start the controller - self.controller.start(0) - - else: - self.controller = False +from ospro.platform import factory - def start( - self - ): - """ - Variables - --------------------------------------------------------------------- - - Description - --------------------------------------------------------------------- - Starts the duty cycle of the pressure controller class. - """ - - # Start the controller - self.controller.start(0) - - def stop( - self - ): - """ - Variables - --------------------------------------------------------------------- - - Description - --------------------------------------------------------------------- - Stops the duty cycle of the pressure controller class. - """ - - # Stop the controller - self.controller.stop() - - def update_duty_cycle( - self, - output - ): - """ - Variables - --------------------------------------------------------------------- - output = Pulse width modulation output duty - cycle. - - Description - --------------------------------------------------------------------- - Changes the duty cycle of the pressure controller class. - """ - - # Update duty cycle - self.controller.ChangeDutyCycle(output) +# Assign the minimum pressure sensor accuracy +# for the pressure sensor hardware +ACCURACY: float = 0.344738 +MIN: int = 0 +MAX: int = 500 +RANDOM_SEED: int = 9 class Sensor(): + """ A `class` that represents a pressure sensor. + + Parameters + ---------- + dev: `bool` + `True` or `False`, whether dev-mode is enabled. + """ def __init__( self, - outputPin + dev: bool ): - """ - Variables - --------------------------------------------------------------------- - outputPin = Pin number that identifies the - pulse width modulation pin. - - Description - --------------------------------------------------------------------- - Creates an instance of the Sensor class. + """ Creates an instance of the pressure sensor. + + Parameters + ---------- + dev: `bool` + `True` or `False`, whether dev-mode is enabled. """ # Assign class variables - self.outputPin = outputPin + self.dev: bool = dev - def initialize( - self, - config - ): - """ - Variables - --------------------------------------------------------------------- - config = Dictionary object containing - the application settings - - Description - --------------------------------------------------------------------- - Initializes the pressure sensor hardware. - """ + # Load the raspberry-pi platform interface + _ = factory.load_interface(dev=dev) # Import sensor modules - if not config['session']['dev']: + if not dev: - import RPi.GPIO as GPIO import Adafruit_ADS1x15 as adafruit - # Define board mode - if not GPIO.getmode(): - GPIO.setmode(GPIO.BCM) - elif GPIO.getmode() == 10: - print('ERROR: Invalid GPIO mode (BOARD).') - sys.exit() - else: - pass - - # Suppress GPIO warnings - # GPIO.setwarnings(False) - - # Setup GPIO pins - GPIO.setup( - self.outputPin, - GPIO.OUT - ) - # Initialize sensors self.sensor = adafruit.ADS1115( address=0x48, busnum=1 ) + else: - self.sensor = False + self.sensor = None - def read_pressure( + def read( self, - config - ): - """ - Variables - --------------------------------------------------------------------- - - Description - --------------------------------------------------------------------- - Returns the system pressure. - """ - - if config['session']['dev']: - pressure = float(random.randint(80, 90) / 10) + set_point: int = RANDOM_SEED + ) -> float: + """ Returns the pressure in bars. + + Parameters + ---------- + set_point: `int` + The set-point in bars of the system. + Used for generating random pressure values when {dev} = `True`. + """ + + if self.dev: + pressure = float( + random.randint( + set_point - 10 * 10, + set_point + 10 * 10 + ) / 10 + ) else: try: pressure = round( - (3.0 / 1750) * - (self.sensor.read_adc( - 0, - gain=2 / 3 - )) - (34.0 / 7.0), 1 + (3.0 / 1750) + * ( + self.sensor.read_adc( + 0, + gain=2 / 3 + ) + ) + - (34.0 / 7.0), + 1 ) - except RuntimeError: - pass + except RuntimeError as e: + raise RuntimeError(e) - return pressure + return abs(float(pressure)) diff --git a/ospro/sensors/temp.py b/ospro/sensors/temp.py index 6e6cf03..1a89845 100644 --- a/ospro/sensors/temp.py +++ b/ospro/sensors/temp.py @@ -1,278 +1,116 @@ +""" Temperature sensor-API + +Sensor +------ +Adafruit MAX31855 +- Compatible with K-type thermocouples +- Measures -200°C to +1350°C output in 0.25 degree increments + - K-type thermocouples typically range from ±2°C to ±6°C in accuracy +- 3.3 to 5v power supply and logic level compliant """ -Information ---------------------------------------------------------------------- -Name : temp.py -Location : ~/ospro/sensors -Description ---------------------------------------------------------------------- -Contains the temperature sensor classes and functions. -""" - -# Import modules import random -import sys - - -# Define temperature sensor class -class Controller(): - - def __init__( - self, - outputPin - ): - """ - Variables - --------------------------------------------------------------------- - outputPin = Pin number that identifies the - pulse width modulation pin. - - Description - --------------------------------------------------------------------- - Creates an instance of the Controller class. - """ - - # Assign class variables - self.outputPin = outputPin - - def initialize( - self, - config - ): - """ - Variables - --------------------------------------------------------------------- - config = Dictionary object containing - the application settings - - Description - --------------------------------------------------------------------- - Initializes the temperature controller hardware. - """ - - # Import sensor modules - if not config['session']['dev']: - - import RPi.GPIO as GPIO - - # Define board mode - if not GPIO.getmode(): - GPIO.setmode(GPIO.BCM) - elif GPIO.getmode() == 10: - print('ERROR: Invalid GPIO mode (BOARD).') - sys.exit() - else: - pass - - # Suppress GPIO warnings - # GPIO.setwarnings(False) - - # Setup GPIO pins - self.controller = GPIO.PWM( - self.outputPin, - 600 - ) - - # Start the controller - self.controller.start(0) - - else: - self.controller = False - - def start( - self - ): - """ - Variables - --------------------------------------------------------------------- - - Description - --------------------------------------------------------------------- - Starts the duty cycle of the temperature controller class. - """ - - # Start the controller - self.controller.start(0) - - def stop( - self - ): - """ - Variables - --------------------------------------------------------------------- - - Description - --------------------------------------------------------------------- - Stops the duty cycle of the temperature controller class. - """ - - # Stop the controller - self.controller.stop() - - def update_duty_cycle( - self, - output - ): - """ - Variables - --------------------------------------------------------------------- - output = Pulse width modulation output duty - cycle. +from ospro.platform import factory - Description - --------------------------------------------------------------------- - Changes the duty cycle of the temperature controller class. - """ - - # Update duty cycle - self.controller.ChangeDutyCycle(output) +# Assign the minimum temperature sensor accuracy +# for the temperature sensor hardware +ACCURACY: int = 3 +MIN: int = 0 +MAX: int = 600 +RANDOM_SEED: int = 93 class Sensor(): + """ A `class` that represents a temperature sensor. + + Parameters + ---------- + dev: `bool` + `True` or `False`, whether dev-mode is enabled. + """ def __init__( self, - outputPin + dev: bool ): - """ - Variables - --------------------------------------------------------------------- - outputPin = Pin number that identifies the - pulse width modulation pin. + """ Creates an instance of the temperature sensor. - Description - --------------------------------------------------------------------- - Creates an instance of the Sensor class. + Parameters + ---------- + dev: `bool` + `True` or `False`, whether dev-mode is enabled. """ # Assign class variables - self.outputPin = outputPin - self.previousTemperature = None - - def initialize( - self, - config - ): - """ - Variables - --------------------------------------------------------------------- - config = Dictionary object containing - the application settings + self.dev: bool = dev - Description - --------------------------------------------------------------------- - Initializes the temperature sensor hardware. - """ + # Load the raspberry-pi platform interface + _ = factory.load_interface(dev=dev) # Import sensor modules - if not config['session']['dev']: + if not dev: import board import digitalio - import RPi.GPIO as GPIO import adafruit_max31855 - # Define board mode - if not GPIO.getmode(): - GPIO.setmode(GPIO.BCM) - elif GPIO.getmode() == 10: - print('ERROR: Invalid GPIO mode (BOARD).') - sys.exit() - else: - pass - - # Suppress GPIO warnings - # GPIO.setwarnings(False) - - # Setup GPIO pins - GPIO.setup( - self.outputPin, - GPIO.OUT - ) - # Initialize sensors self.sensor = adafruit_max31855.MAX31855( board.SPI(), digitalio.DigitalInOut(board.D5) ) + else: - self.sensor = False + self.sensor = None - def read_temp( + def read( self, - config - ): - """ - Variables - --------------------------------------------------------------------- - config = Dictionary object containing - the application settings + set_point: int = RANDOM_SEED + ) -> int: + """ Returns the temperature in degrees Celsius. - Description - --------------------------------------------------------------------- - Returns the water temperature in degrees Celsius. + Parameters + ---------- + set_point: `int` + The set-point in degrees Celcius of the system. + Used for generating random temperatures when {dev} = `True`. """ - if config['session']['dev']: + if self.dev: temperature = random.randint( - int(config['tPID']['setPoint'] * 0.95), - int(config['tPID']['setPoint'] * 1.02) + int(set_point * 0.95), + int(set_point * 1.02) ) - else: try: - temperature = round(self.sensor.temperature, 2) - - except RuntimeError: - if self.previousTemperature is not None: - temperature = self.previousTemperature - else: - print('ERROR: Unable to read temperature sensor.') - sys.exit() - - # Evaluate error - if self.previousTemperature is not None: - if ( - ( - abs(temperature - self.previousTemperature) >= - config['tPID']['error'] - ) - ): - temperature = self.previousTemperature - - # Update previous temperature - self.previousTemperature = temperature + temperature = round(self.sensor.temperature, 0) + except RuntimeError as e: + raise RuntimeError(e) return int(temperature) def convert_to_c( - temperature -): - """ - Variables - --------------------------------------------------------------------- - temperature = Temperature value in Fahrenheit - - Description - --------------------------------------------------------------------- - Converts {temperature} from Fahrenheit to Celsius. + temperature: float +) -> int: + """ Converts {temperature} from Fahrenheit to Celsius. + + Parameters + ---------- + temperature: `float` + Temperature value in Fahrenheit. """ - return int((float(temperature) - 32) * 5 / 9) def convert_to_f( - temperature -): - """ - Variables - --------------------------------------------------------------------- - temperature = Temperature value in Celsius. - - Description - --------------------------------------------------------------------- - Converts {temperature} from Celsius to Fahrenheit. + temperature: float +) -> int: + """ Converts {temperature} from Celsius to Fahrenheit. + + Parameters + ---------- + temperature: `float` + Temperature value in Celsius. """ - return int((float(temperature) * 9 / 5) + 32) diff --git a/ospro/utils/utils.py b/ospro/utils/utils.py index 1b0f35b..e314212 100644 --- a/ospro/utils/utils.py +++ b/ospro/utils/utils.py @@ -1,19 +1,7 @@ -""" -Information ---------------------------------------------------------------------- -Name : utils.py -Location : ~/ospro/utils/ -Author : Tom Eleff -Published : 2023-07-11 -Revised on : 2023-09-03 - -Description ---------------------------------------------------------------------- -Contains the utility functions necessary for managing configuration, +""" Contains the utility functions necessary for managing configuration, validation and user-logging. """ -# Import Modules import json import os import time diff --git a/requirements.txt b/requirements.txt index 9799f67..ac3176d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,4 +3,5 @@ numpy==1.21.6 matplotlib==3.5.2 scipy==1.7.3 customtkinter==5.2.1 -pillow==9.5.0 \ No newline at end of file +pillow==9.5.0 +pytensils==1.2.0 \ No newline at end of file diff --git a/requirements_RPi.txt b/requirements_RPi.txt index 7df4349..0e13577 100644 --- a/requirements_RPi.txt +++ b/requirements_RPi.txt @@ -4,6 +4,7 @@ matplotlib==3.5.2 scipy==1.7.3 customtkinter==5.2.1 pillow==9.5.0 +pytensils==1.2.0 RPi.GPIO==0.7.1 adafruit-circuitpython-ads1x15==2.2.25 adafruit-circuitpython-max31855==3.2.21 diff --git a/simulate_pressure_profiles.py b/simulate_pressure_profiles.py index 53b16ca..ded5377 100644 --- a/simulate_pressure_profiles.py +++ b/simulate_pressure_profiles.py @@ -1,18 +1,7 @@ +""" Ad-hoc analysis that generates simulated pressure profiles from espresso +extraction data. """ -Information ---------------------------------------------------------------------- -Name : simulate_pressure_profiles.py -Location : ~/ -Author : Tom Eleff -Published : 2023-09-03 -Revised on : ~ -Description ---------------------------------------------------------------------- -Simulates espresso pressure profiles. -""" - -# Import modules import os import time import datetime as dt diff --git a/temp_pid.py b/temp_pid.py index d09ad06..eb31287 100644 --- a/temp_pid.py +++ b/temp_pid.py @@ -1,273 +1,266 @@ -""" -Information ---------------------------------------------------------------------- -Name : temp_pid.py -Location : ~/ -Author : Tom Eleff -Published : 2023-06-25 -Revised on : 2023-07-07 - -Description ---------------------------------------------------------------------- -Runs the temperature PID controller. -""" - -# Import modules +""" Temperature PID-controller """ + import os import sys import time import copy -import ospro.utils.utils as utils -import ospro.sensors.temp as temp - -# Initialize global variables -config = utils.read_config( - configLoc=os.path.join( - os.path.dirname(__file__), - 'config', - 'config.json' - ) -) +from pytensils import config +from ospro import logging, exceptions, errors +from ospro.platform import factory +from ospro.sensors import temp as ts +from ospro.controllers import temp as tc -# Configure environment -if not config['session']['dev']: +if __name__ == '__main__': - # Import GPIO module - import RPi.GPIO as GPIO + # Setup logging + Logging = logging.get_temperature_pid_logger() - # Define board mode - if not GPIO.getmode(): - GPIO.setmode(GPIO.BCM) - elif GPIO.getmode() == 10: - print('ERROR: Invalid GPIO mode (BOARD).') - sys.exit() - else: - pass + # Load configuration + Config = config.Handler( + path=os.path.join( + os.path.dirname(__file__), + 'config' + ) + ) - # Suppress GPIO warnings - # GPIO.setwarnings(False) + # Validate configuration + if Config.validate( + dtypes=config.Handler( + path=os.path.join( + os.path.dirname(__file__), + 'config' + ), + file_name='dtypes.json' + ).to_dict() + ): + + # Logging + Logging.debug('NOTE: Config validation completed successfully.') + + # Load the platform interface + try: + GPIO = factory.load_interface( + dev=Config.data['session']['dev'] + ) + except exceptions.InvalidPlatformError: + sys.exit(errors.PLATFORM_ERRNO) - # Setup GPIO pins + # Setup the interface GPIO.setup( - config['extraction']['pin'], + Config.data['extraction']['pin'], GPIO.IN, pull_up_down=GPIO.PUD_DOWN ) -# Main -if __name__ == '__main__': - # Initialize temperatore sensor - tSensor = temp.Sensor( - outputPin=config['tPID']['pin'] - ) - tSensor.initialize(config) + tSensor = ts.Sensor(dev=Config.data['session']['dev']) # Read temperature - previousTemperature = tSensor.read_temp(config) + previous_temperature = tSensor.read( + set_point=Config.data['tPID']['setPoint'] + ) # Set initial parameters - previousTime = time.time() + previous_time = time.time() integral = 0 - previousError = 0 + previous_error = 0 - # Set intitial pulse width modulation output - if not config['session']['dev']: - tController = temp.Controller( - outputPin=config['tPID']['pin'] - ) - tController.initialize(config) - tController.start() + # Set intitial pulse-width modulation output + tController = tc.Controller( + dev=Config.data['session']['dev'], + output_pin=Config.data['tPID']['pin'] + ) # Startup delay time.sleep(0.001) - # Run - while config['session']['running']: + # Catch keyboard interrupt + try: - # Read config - config = utils.read_config( - configLoc=os.path.join( - config['session']['configLoc'], - 'config.json' - ) - ) + # Run + while Config.data['session']['running']: - try: + # Read config + Config.data = Config.read() # Read temperature - temperature = tSensor.read_temp(config) + try: + temperature = tSensor.read( + set_point=Config.data['tPID']['setPoint'] + ) + except RuntimeError: + sys.exit(errors.INTERFACE_ERRNO) - # Avoid unstable temperatures + # Pass if temperature is unstable if ( ( - abs(temperature - previousTemperature) >= - config['tPID']['error'] + abs(temperature - previous_temperature) + >= Config.data['tPID']['error'] ) ): - # Reset parameters - currentTime = time.time() - print( - "{:<{len0}} {:<{len1}} {:<{len2}} {:<{len3}}".format( - 'Status: Reset', - 'Temp: (%.2f, %.2f)' % ( + # Pass + # previous_error = 0 + # integral = 0 + # output = 0 + current_time = time.time() + Logging.debug( + "{:<{len0}} {:<{len1}} {:<{len2}} {:<{len3}}".format( + 'Status: Pass', + 'Temp: (%3.0f, %3.0f)' % ( temperature, - config['tPID']['setPoint'] + Config.data['tPID']['setPoint'] ), 'PWM: N/A', 'PID: [-.--, -.--, -.--]', - len0=17, - len1=26, - len2=14, - len3=26 + len0=13, + len1=16, + len2=10, + len3=22 ) ) - # Calculate pulse width modulation - else: + # Set full output if temperature is below the dead-zone + elif temperature < int( + ( + Config.data['tPID']['setPoint'] + - Config.data['tPID']['deadZoneRange'] + ) + ): - # Set max output if temperature is below the dead-zone - if temperature < int( - ( - config['tPID']['setPoint'] - - config['tPID']['deadZoneRange'] - ) - ): - - # Reset parameters - previousError = 0 - integral = 0 - output = 100 - currentTime = time.time() - - # Output parameters - print( - "{:<{len0}} {:<{len1}} {:<{len2}} {:<{len3}}".format( - 'Status: Under', - 'Temp: (%.2f, %.2f)' % ( - temperature, - config['tPID']['setPoint'] - ), - 'PWM: %s %%' % (output), - 'PID: [%.2f, %.2f, %.2f]' % ( - 0, - 0, - 0 - ), - len0=17, - len1=26, - len2=14, - len3=26 - ) + # Reset parameters + previous_error = 0 + integral = 0 + output = 100 + current_time = time.time() + + # Logging + Logging.debug( + "{:<{len0}} {:<{len1}} {:<{len2}} {:<{len3}}".format( + 'Status: Under', + 'Temp: (%3.0f, %3.0f)' % ( + temperature, + Config.data['tPID']['setPoint'] + ), + 'PWM: %3.0f %%' % (output), + 'PID: [%.2f, %.2f, %.2f]' % ( + 0, + 0, + 0 + ), + len0=13, + len1=16, + len2=10, + len3=22 ) + ) - # Set zero output if temperature is above max temperature - elif temperature >= 115: - - # Reset parameters - previousError = 0 - integral = 0 - output = 0 - currentTime = time.time() - - # Output parameters - print( - "{:<{len0}} {:<{len1}} {:<{len2}} {:<{len3}}".format( - 'Status: Over', - 'Temp: (%.2f, %.2f)' % ( - temperature, - config['tPID']['setPoint'] - ), - 'PWM: %s %%' % (output), - 'PID: [%.2f, %.2f, %.2f]' % ( - 0, - 0, - 0 - ), - len0=17, - len1=26, - len2=14, - len3=26 - ) - ) + # Set zero output if temperature is above the set-point + elif temperature >= int( + Config.data['tPID']['setPoint'] + + ts.ACCURACY + ): - # Calculate output - else: - - # Calculate error - error = round(config['tPID']['setPoint'] - temperature, 2) - - # Calculate proportional output - pOut = config['tPID']['p'] * error - - # Calculate integral output - currentTime = time.time() - deltaTime = currentTime - previousTime - integral += (error * deltaTime) - iOut = (config['tPID']['i'] * integral) - - # Calculate derivative output - deltaError = error - previousError - derivative = (deltaError/deltaTime) - dOut = (config['tPID']['d'] * derivative) - - output = max(min(int(pOut + iOut + dOut), 100), 0) - - # Output parameters - print( - "{:<{len0}} {:<{len1}} {:<{len2}} {:<{len3}}".format( - 'Status: Valid', - 'Temp: (%.2f, %.2f)' % ( - temperature, - config['tPID']['setPoint'] - ), - 'PWM: %s %%' % (output), - 'PID: [%.2f, %.2f, %.2f]' % ( - abs(max(pOut, 0)), - abs(max(iOut, 0)), - abs(max(dOut, 0)) - ), - len0=17, - len1=26, - len2=14, - len3=26 - ) + # Reset parameters + previous_error = 0 + integral = 0 + output = 0 + current_time = time.time() + + # Logging + Logging.debug( + "{:<{len0}} {:<{len1}} {:<{len2}} {:<{len3}}".format( + 'Status: Over', + 'Temp: (%3.0f, %3.0f)' % ( + temperature, + Config.data['tPID']['setPoint'] + ), + 'PWM: %3.0f %%' % (output), + 'PID: [%.2f, %.2f, %.2f]' % ( + 0, + 0, + 0 + ), + len0=13, + len1=16, + len2=10, + len3=22 ) + ) - # Set pulse width modulation output - if not config['session']['dev']: + # Calculate output + else: + + # Calculate error + error = round( + Config.data['tPID']['setPoint'] - temperature, + 2 + ) - # Set default during extraction - if GPIO.input( - config['extraction']['pin'] - ): - output = 10 + # Calculate the proportional output + p_out = Config.data['tPID']['p'] * error - # Update duty cycle - tController.update_duty_cycle(output) + # Calculate the integral output + current_time = time.time() + delta_time = current_time - previous_time + integral += (error * delta_time) + i_out = (Config.data['tPID']['i'] * integral) + + # Calculate the derivative output + delta_error = error - previous_error + derivative = (delta_error / delta_time) + d_out = (Config.data['tPID']['d'] * derivative) + + # Calculate the total output + output = max(min(int(p_out + i_out + d_out), 100), 0) + + # Logging + Logging.debug( + "{:<{len0}} {:<{len1}} {:<{len2}} {:<{len3}}".format( + 'Status: Valid', + 'Temp: (%3.0f, %3.0f)' % ( + temperature, + Config.data['tPID']['setPoint'] + ), + 'PWM: %3.0f %%' % (output), + 'PID: [%.2f, %.2f, %.2f]' % ( + abs(max(p_out, 0)), + abs(max(i_out, 0)), + abs(max(d_out, 0)) + ), + len0=13, + len1=16, + len2=10, + len3=22 + ) + ) + + # Set default during extraction + if GPIO.input(Config.data['extraction']['pin']): + output = 10 + + # Update the duty-cycle + tController.update_duty_cycle(output) # Recalculate time delta for delay - deltaTime = (time.time() - previousTime) + delta_time = (time.time() - previous_time) # Update parameters - previousTemperature = copy.deepcopy(temperature) - previousTime = copy.deepcopy(currentTime) + previous_temperature = copy.deepcopy(temperature) + previous_time = copy.deepcopy(current_time) # Delay - time.sleep(config['tPID']['sampleRate']) - - except KeyboardInterrupt: - - # Terminate pulse width modulation & cleanup - if not config['session']['dev']: - tController.stop() - GPIO.cleanup() + time.sleep(Config.data['tPID']['sampleRate']) - # Exit - sys.exit() + except KeyboardInterrupt: - # Terminate pulse width modulation & cleanup - if not config['session']['dev']: + # Terminate pulse-width modulation & cleanup tController.stop() GPIO.cleanup() + + # Exit + sys.exit() + + # Terminate pulse-width modulation & cleanup + tController.stop() + GPIO.cleanup()