diff --git a/sensor/src/ezo/driver/error.rs b/sensor/src/ezo/driver/error.rs index adb8648..7c73fcf 100644 --- a/sensor/src/ezo/driver/error.rs +++ b/sensor/src/ezo/driver/error.rs @@ -1,6 +1,5 @@ use std::fmt; -#[allow(unused)] #[derive(Debug)] pub enum DriverError { Connection(String), diff --git a/sensor/src/ezo/driver/i2c.rs b/sensor/src/ezo/driver/i2c.rs index 3ba845c..edfe0a5 100644 --- a/sensor/src/ezo/driver/i2c.rs +++ b/sensor/src/ezo/driver/i2c.rs @@ -1,6 +1,5 @@ use crate::i2c_bus::I2cConnection; -#[allow(unused)] pub struct I2cDriver { pub connection: I2cConnection, } diff --git a/sensor/src/ezo/driver/mod.rs b/sensor/src/ezo/driver/mod.rs index 4dc80c5..603a36a 100644 --- a/sensor/src/ezo/driver/mod.rs +++ b/sensor/src/ezo/driver/mod.rs @@ -28,7 +28,6 @@ pub struct DeviceInfo { pub firmware_version: f64, } -#[allow(unused)] #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum Status { PoweredOn, @@ -52,6 +51,5 @@ pub trait CommandTransport { pub trait Driver: CommandTransport { fn connection_info(&self) -> SensorConnection; fn device_info(&mut self) -> Result; - #[allow(unused)] fn status(&mut self) -> Result; } diff --git a/sensor/src/ezo/rtd.rs b/sensor/src/ezo/rtd.rs index 6f37c9a..6ebe59a 100644 --- a/sensor/src/ezo/rtd.rs +++ b/sensor/src/ezo/rtd.rs @@ -1,5 +1,4 @@ use chrono::Utc; -use std::fmt; use std::sync::Mutex; use crate::ezo::driver::{uart::UartDriver, Driver}; @@ -22,6 +21,7 @@ impl EzoSensor for Rtd { &self.driver } + // TODO: set the TemperatureOutput solution here fn data_range(&self) -> (f32, f32) { (-126.0, 1254.0) } diff --git a/sensor/src/lib.rs b/sensor/src/lib.rs index c5cef94..08965d8 100644 --- a/sensor/src/lib.rs +++ b/sensor/src/lib.rs @@ -4,3 +4,4 @@ pub mod ezo; pub mod i2c_bus; pub mod sensor; pub mod serial_port; +pub mod services; diff --git a/sensor/src/main.rs b/sensor/src/main.rs index 0d22451..7ee8377 100644 --- a/sensor/src/main.rs +++ b/sensor/src/main.rs @@ -1,299 +1,7 @@ -mod error; -mod ezo; -mod i2c_bus; -mod sensor; -mod serial_port; - -use ezo::driver::uart::UartDriver; -use ezo::driver::{DeviceType, Driver}; -use ezo::rtd::Rtd; -use sensor::Sensor; -use std::collections::{HashMap, HashSet}; -use std::sync::Arc; -use tokio::sync::mpsc::Sender; -use tokio::sync::{mpsc, oneshot}; -use tokio::task::JoinHandle; -use tokio::time::{interval, Duration as TokioDuration}; - -use crate::sensor::SensorConnection; -use crate::serial_port::SerialPortMetadata; - -/// A sensor list compatible with both UART and I2C protocols. -pub type SensorList = HashMap>; - -#[allow(unused)] -enum ServiceCommand { - /// Add sensors in the registry (no replacement) - AddSensors { - sensors: Vec<(String, Arc)>, - }, - /// Remove sensors from the registry - RemoveSensors { uuids: Vec }, - /// Get a specific sensor by serial number - FindSensor { - serial_number: String, - respond_to: oneshot::Sender>>, - }, - /// Get all sensors (snapshot) - AllSensors { - respond_to: oneshot::Sender>, - }, -} - -pub struct CommandChannel { - tx: mpsc::Sender, - rx: mpsc::Receiver, -} - -/// Supervisor service that maintains the list of sensors -pub struct UartService { - sensors: SensorList, - sensor_tasks: HashMap>, - cmd_channel: CommandChannel, -} - -impl Default for UartService { - fn default() -> Self { - Self::new() - } -} - -impl UartService { - pub fn new() -> Self { - let (tx, rx) = mpsc::channel(100); - - Self { - sensors: HashMap::new(), - sensor_tasks: HashMap::new(), - cmd_channel: CommandChannel { tx, rx }, - } - } - - /// Main supervisor loop - maintains sensor registry - pub async fn run(mut self) { - let cmd_tx = self.cmd_channel.tx.clone(); - - // Spawn tasks to detect (un)plugged sensors and update the registry - self.detect_plugged_sensors(cmd_tx.clone()); - self.detect_unplugged_sensors(cmd_tx.clone()); - - println!("UartService started - maintaining sensor registry"); - - // Main event loop - just manages the HashMap - loop { - tokio::select! { - Some(cmd) = self.cmd_channel.rx.recv() => { - self.handle_cmd(cmd); - } - - _ = tokio::signal::ctrl_c() => { - println!("Shutting down sensor registry..."); - self.abort_all_sensor_tasks(); - break; - } - } - } - } - - /// Handle commands to maintain sensor list - fn handle_cmd(&mut self, cmd: ServiceCommand) { - match cmd { - ServiceCommand::AddSensors { sensors } => { - println!("Registry: Adding up to {} sensors", sensors.len()); - for (uuid, sensor) in sensors { - if self.sensors.contains_key(&uuid) { - println!("Registry: Sensor {uuid} already exists, skipping add"); - continue; - } - - let task = Arc::clone(&sensor).run(); - self.sensor_tasks.insert(uuid.clone(), task); - self.sensors.insert(uuid, sensor); - } - println!("Registry: Total sensors = {}", self.sensors.len()); - } - - ServiceCommand::RemoveSensors { uuids } => { - println!("Registry: Removing {} sensors", uuids.len()); - for uuid in &uuids { - if let Some(task) = self.sensor_tasks.remove(uuid) { - task.abort(); - } - self.sensors.remove(uuid); - } - println!("Registry: Total sensors = {}", self.sensors.len()); - } - - ServiceCommand::FindSensor { - serial_number, - respond_to, - } => { - let sensor = self.sensors.get(&serial_number).cloned(); - let _ = respond_to.send(sensor); - } - - ServiceCommand::AllSensors { respond_to } => { - println!( - "Registry: Providing snapshot of all sensors ({} total)", - self.sensors.len() - ); - let _ = respond_to.send(Arc::new(self.sensors.clone())); - } - } - } - - fn abort_all_sensor_tasks(&mut self) { - for (_, task) in self.sensor_tasks.drain() { - task.abort(); - } - } - - /// Listen for plugged sensors. - /// - /// Finds new USB sensors and adds them to registry. - fn detect_plugged_sensors(&self, cmd_tx: Sender) { - tokio::spawn(async move { - let mut interval = interval(TokioDuration::from_secs(2)); - - loop { - interval.tick().await; - - println!("Detector: Scanning for sensors..."); - let asc_ports = serial_port::find_asc_port(); - - if !asc_ports.is_empty() { - println!("Detector: Found {} ASC ports", asc_ports.len()); - } - - // Get current sensor list - let (respond_to, rx) = oneshot::channel(); - let _ = cmd_tx.send(ServiceCommand::AllSensors { respond_to }).await; - let current_sensors = rx.await; - - if let Ok(current_sensors) = current_sensors { - let mut new_sensors: Vec<(String, Arc)> = Vec::new(); - - for port in asc_ports.iter() { - if !current_sensors.contains_key(&port.serial_number) { - let sensor = create_sensor_from_port(port); - println!( - "Detector: Created sensor {}: {:#?}", - port.serial_number, - sensor.as_ref().map(|s| s.info()) - ); - - match sensor { - Ok(sensor) => { - let data = sensor.info(); - println!( - "Detector: Created sensor - firmware v{}", - data.firmware - ); - new_sensors.push((port.serial_number.clone(), sensor)); - } - Err(e) => { - eprintln!( - "Detector: Failed to create sensor {}: {}", - port.serial_number, e - ); - } - } - } - } - - if !new_sensors.is_empty() { - let _ = cmd_tx - .send(ServiceCommand::AddSensors { - sensors: new_sensors, - }) - .await; - } - } - } - }); - } - - /// Listen for unplugged sensors. - /// - /// Compare the list of sensors'connection with OS connections for both - /// UART and I2C and remove stale sensor from the list. - fn detect_unplugged_sensors(&self, cmd_tx: Sender) { - tokio::spawn(async move { - let mut interval = interval(TokioDuration::from_secs(2)); - - loop { - interval.tick().await; - - let available_asc_ports = serial_port::find_asc_port(); - let available_port_serials: HashSet<_> = available_asc_ports - .iter() - .map(|port| &port.serial_number) - .collect(); - - // Get current sensor list - let (respond_to, rx) = oneshot::channel(); - let _ = cmd_tx.send(ServiceCommand::AllSensors { respond_to }).await; - let current_sensors = rx.await; - - if let Ok(sensors) = current_sensors { - for sensor in sensors.values() { - let connection_info = &sensor.info().connection; - - match connection_info { - SensorConnection::Uart(port_metadata) => { - if !available_port_serials.contains(&port_metadata.serial_number) { - println!( - "Detector: Sensor {} is unplugged, removing from registry", - port_metadata.serial_number - ); - let _ = cmd_tx - .send(ServiceCommand::RemoveSensors { - uuids: vec![port_metadata.serial_number.clone()], - }) - .await; - } - } - SensorConnection::I2c(_) => { - unimplemented!("No I2C sensor handling yet"); - } - } - } - } - } - }); - } -} - -/// Factory function to create a sensor from a serial port -/// -/// This function: -/// 1. Creates a temporary UART driver -/// 2. Queries device info to determine sensor type -/// 3. Creates the appropriate sensor with the correct driver -/// 4. Returns it as Arc -fn create_sensor_from_port( - port: &SerialPortMetadata, -) -> Result, Box> { - // Create temporary driver to query device type - let mut uart_driver = UartDriver::new(port)?; - let device_info = uart_driver.device_info()?; - - println!( - "Factory: Detected {:?} sensor v{}", - device_info.device_type, device_info.firmware_version - ); - - // Create appropriate sensor based on device type - match device_info.device_type { - DeviceType::Rtd => { - let rtd = Rtd::::from_uart(uart_driver, device_info.firmware_version); - Ok(Arc::new(rtd) as Arc) - } - } -} +use arksync_sensor::services::SensorService; #[tokio::main] async fn main() { println!("Starting ArkSync Sensor Service..."); - UartService::new().run().await; + SensorService::new().run().await; } diff --git a/sensor/src/services/calibration/calibration_service.rs b/sensor/src/services/calibration/calibration_service.rs new file mode 100644 index 0000000..4c3fff8 --- /dev/null +++ b/sensor/src/services/calibration/calibration_service.rs @@ -0,0 +1 @@ +// to keep diff --git a/sensor/src/services/calibration/mod.rs b/sensor/src/services/calibration/mod.rs new file mode 100644 index 0000000..da521ab --- /dev/null +++ b/sensor/src/services/calibration/mod.rs @@ -0,0 +1 @@ +mod calibration_service; diff --git a/sensor/src/services/mod.rs b/sensor/src/services/mod.rs new file mode 100644 index 0000000..ecf9e0c --- /dev/null +++ b/sensor/src/services/mod.rs @@ -0,0 +1,4 @@ +mod calibration; +mod sensor; + +pub use sensor::SensorService; diff --git a/sensor/src/services/sensor/mod.rs b/sensor/src/services/sensor/mod.rs new file mode 100644 index 0000000..e7014d5 --- /dev/null +++ b/sensor/src/services/sensor/mod.rs @@ -0,0 +1,7 @@ +mod plugged_sensors; +mod sensor_service; +mod unplugged_sensors; + +pub use plugged_sensors::detect_plugged_sensors_task; +pub use sensor_service::*; +pub use unplugged_sensors::detect_unplugged_sensors; diff --git a/sensor/src/services/sensor/plugged_sensors.rs b/sensor/src/services/sensor/plugged_sensors.rs new file mode 100644 index 0000000..4f466c7 --- /dev/null +++ b/sensor/src/services/sensor/plugged_sensors.rs @@ -0,0 +1,101 @@ +use crate::ezo::driver::uart::UartDriver; +use crate::ezo::driver::{DeviceType, Driver}; +use crate::ezo::rtd::Rtd; +use crate::sensor::Sensor; +use crate::services::sensor::SensorServiceCmd; +use std::sync::Arc; +use tokio::sync::mpsc::Sender; +use tokio::sync::oneshot; +use tokio::time::{interval, Duration as TokioDuration}; + +use crate::serial_port::{self, SerialPortMetadata}; + +/// Listen for plugged sensors. +/// +/// Finds new USB sensors and adds them to registry. +pub async fn detect_plugged_sensors_task(cmd_tx: Sender) { + let mut interval = interval(TokioDuration::from_secs(2)); + + loop { + interval.tick().await; + + println!("Detector: Scanning for sensors..."); + let asc_ports = serial_port::find_asc_port(); + + if !asc_ports.is_empty() { + println!("Detector: Found {} ASC ports", asc_ports.len()); + } + + // Get current sensor list + let (respond_to, rx) = oneshot::channel(); + let _ = cmd_tx + .send(SensorServiceCmd::AllSensors { respond_to }) + .await; + let current_sensors = rx.await; + + if let Ok(current_sensors) = current_sensors { + let mut new_sensors: Vec<(String, Arc)> = Vec::new(); + + for port in asc_ports.iter() { + if !current_sensors.contains_key(&port.serial_number) { + let sensor = create_sensor_from_port(port); + println!( + "Detector: Created sensor {}: {:#?}", + port.serial_number, + sensor.as_ref().map(|s| s.info()) + ); + + match sensor { + Ok(sensor) => { + let data = sensor.info(); + println!("Detector: Created sensor - firmware v{}", data.firmware); + new_sensors.push((port.serial_number.clone(), sensor)); + } + Err(e) => { + eprintln!( + "Detector: Failed to create sensor {}: {}", + port.serial_number, e + ); + } + } + } + } + + if !new_sensors.is_empty() { + let _ = cmd_tx + .send(SensorServiceCmd::AddSensors { + sensors: new_sensors, + }) + .await; + } + } + } +} + +/// Factory function to create a sensor from a serial port +/// +/// This function: +/// 1. Creates a temporary UART driver +/// 2. Queries device info to determine sensor type +/// 3. Creates the appropriate sensor with the correct driver +/// 4. Returns it as Arc +fn create_sensor_from_port( + port: &SerialPortMetadata, +) -> Result, Box> { + // Create temporary driver to query device type + let mut uart_driver = UartDriver::new(port)?; + let device_info = uart_driver.device_info()?; + + println!( + "Factory: Detected {:?} sensor v{}", + device_info.device_type, device_info.firmware_version + ); + + // Create appropriate sensor based on device type + match device_info.device_type { + DeviceType::Rtd => { + let rtd = Rtd::::from_uart(uart_driver, device_info.firmware_version); + Ok(Arc::new(rtd) as Arc) + } + } +} diff --git a/sensor/src/services/sensor/sensor_service.rs b/sensor/src/services/sensor/sensor_service.rs new file mode 100644 index 0000000..9b395bb --- /dev/null +++ b/sensor/src/services/sensor/sensor_service.rs @@ -0,0 +1,144 @@ +use super::detect_plugged_sensors_task; +use crate::sensor::Sensor; +use crate::services::sensor::detect_unplugged_sensors; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::{mpsc, oneshot}; +use tokio::task::JoinHandle; + +/// A sensor list compatible with both UART and I2C protocols. +pub type SensorList = HashMap>; + +pub enum SensorServiceCmd { + /// Add sensors in the registry (no replacement) + AddSensors { + sensors: Vec<(String, Arc)>, + }, + /// Remove sensors from the registry + RemoveSensors { uuids: Vec }, + #[expect(unused)] + /// Get a specific sensor by serial number + FindSensor { + serial_number: String, + respond_to: oneshot::Sender>>, + }, + /// Get all sensors (snapshot) + AllSensors { + respond_to: oneshot::Sender>, + }, +} + +pub struct CommandChannel { + tx: mpsc::Sender, + rx: mpsc::Receiver, +} + +/// Supervisor service that maintains the list of sensors +pub struct SensorService { + sensors: SensorList, + sensor_tasks: HashMap>, + cmd_channel: CommandChannel, +} + +impl Default for SensorService { + fn default() -> Self { + Self::new() + } +} + +impl SensorService { + pub fn new() -> Self { + let (tx, rx) = mpsc::channel(100); + + Self { + sensors: HashMap::new(), + sensor_tasks: HashMap::new(), + cmd_channel: CommandChannel { tx, rx }, + } + } + + /// Main supervisor loop - maintains sensor registry + pub async fn run(mut self) { + let cmd_tx = self.cmd_channel.tx.clone(); + + println!("UartService started - maintaining sensor registry"); + + let main_loop = async move { + loop { + tokio::select! { + Some(cmd) = self.cmd_channel.rx.recv() => { + self.handle_cmd(cmd); + } + + // healthcheck => if this one dies then we stop the process + + _ = tokio::signal::ctrl_c() => { + println!("Shutting down sensor registry..."); + self.abort_all_sensor_tasks(); + break; + } + } + } + }; + + // TODO: check for mutex contention across awaits + tokio::join!( + main_loop, + detect_plugged_sensors_task(cmd_tx.clone()), + detect_unplugged_sensors(cmd_tx.clone()) + ); + } + + /// Handle commands to maintain sensor list + fn handle_cmd(&mut self, cmd: SensorServiceCmd) { + match cmd { + SensorServiceCmd::AddSensors { sensors } => { + println!("Registry: Adding up to {} sensors", sensors.len()); + for (uuid, sensor) in sensors { + if self.sensors.contains_key(&uuid) { + println!("Registry: Sensor {uuid} already exists, skipping add"); + continue; + } + + let task = Arc::clone(&sensor).run(); + self.sensor_tasks.insert(uuid.clone(), task); + self.sensors.insert(uuid, sensor); + } + println!("Registry: Total sensors = {}", self.sensors.len()); + } + + SensorServiceCmd::RemoveSensors { uuids } => { + println!("Registry: Removing {} sensors", uuids.len()); + for uuid in &uuids { + if let Some(task) = self.sensor_tasks.remove(uuid) { + task.abort(); + } + self.sensors.remove(uuid); + } + println!("Registry: Total sensors = {}", self.sensors.len()); + } + + SensorServiceCmd::FindSensor { + serial_number, + respond_to, + } => { + let sensor = self.sensors.get(&serial_number).cloned(); + let _ = respond_to.send(sensor); + } + + SensorServiceCmd::AllSensors { respond_to } => { + println!( + "Registry: Providing snapshot of all sensors ({} total)", + self.sensors.len() + ); + let _ = respond_to.send(Arc::new(self.sensors.clone())); + } + } + } + + fn abort_all_sensor_tasks(&mut self) { + for (_, task) in self.sensor_tasks.drain() { + task.abort(); + } + } +} diff --git a/sensor/src/services/sensor/unplugged_sensors.rs b/sensor/src/services/sensor/unplugged_sensors.rs new file mode 100644 index 0000000..cf98df9 --- /dev/null +++ b/sensor/src/services/sensor/unplugged_sensors.rs @@ -0,0 +1,58 @@ +use crate::sensor::SensorConnection; +use crate::services::sensor::SensorServiceCmd; +use std::collections::HashSet; +use tokio::sync::mpsc::Sender; +use tokio::sync::oneshot; +use tokio::time::{interval, Duration as TokioDuration}; + +use crate::serial_port::{self}; + +/// Listen for unplugged sensors. +/// +/// Compare the list of sensors'connection with OS connections for both +/// UART and I2C and remove stale sensor from the list. +pub async fn detect_unplugged_sensors(cmd_tx: Sender) { + let mut interval = interval(TokioDuration::from_secs(2)); + + loop { + interval.tick().await; + + let available_asc_ports = serial_port::find_asc_port(); + let available_port_serials: HashSet<_> = available_asc_ports + .iter() + .map(|port| &port.serial_number) + .collect(); + + // Get current sensor list + let (respond_to, rx) = oneshot::channel(); + let _ = cmd_tx + .send(SensorServiceCmd::AllSensors { respond_to }) + .await; + let current_sensors = rx.await; + + if let Ok(sensors) = current_sensors { + for sensor in sensors.values() { + let connection_info = &sensor.info().connection; + + match connection_info { + SensorConnection::Uart(port_metadata) => { + if !available_port_serials.contains(&port_metadata.serial_number) { + println!( + "Detector: Sensor {} is unplugged, removing from registry", + port_metadata.serial_number + ); + let _ = cmd_tx + .send(SensorServiceCmd::RemoveSensors { + uuids: vec![port_metadata.serial_number.clone()], + }) + .await; + } + } + SensorConnection::I2c(_) => { + unimplemented!("No I2C sensor handling yet"); + } + } + } + } + } +}