From 9f05b1f74e3a5fab3c0849c80fd97795a15b2123 Mon Sep 17 00:00:00 2001 From: theredfish Date: Fri, 12 Dec 2025 16:47:10 +0100 Subject: [PATCH 01/12] Init sensor crate with Tokio --- Cargo.lock | 192 ++++++++++++++++++++--------- Cargo.toml | 2 +- bash_scripts/setup/99-ttyusb.rules | 1 + bash_scripts/setup/setup.sh | 12 ++ sensor/Cargo.toml | 13 ++ sensor/src/commands/mod.rs | 1 + sensor/src/commands/uart.rs | 3 + sensor/src/lib.rs | 3 + sensor/src/main.rs | 153 +++++++++++++++++++++++ sensor/src/sensor.rs | 16 +++ sensor/src/serial_port.rs | 22 ++++ src-tauri/Cargo.toml | 2 +- 12 files changed, 359 insertions(+), 61 deletions(-) create mode 100644 bash_scripts/setup/99-ttyusb.rules create mode 100644 bash_scripts/setup/setup.sh create mode 100644 sensor/Cargo.toml create mode 100644 sensor/src/commands/mod.rs create mode 100644 sensor/src/commands/uart.rs create mode 100644 sensor/src/lib.rs create mode 100644 sensor/src/main.rs create mode 100644 sensor/src/sensor.rs create mode 100644 sensor/src/serial_port.rs diff --git a/Cargo.lock b/Cargo.lock index ef453ac..0cf486d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,15 +2,6 @@ # It is not intended for manual editing. version = 4 -[[package]] -name = "addr2line" -version = "0.24.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1" -dependencies = [ - "gimli", -] - [[package]] name = "adler2" version = "2.0.0" @@ -127,6 +118,14 @@ dependencies = [ "tokio", ] +[[package]] +name = "arksync-sensor" +version = "0.1.0" +dependencies = [ + "serialport", + "tokio", +] + [[package]] name = "arksync-ui" version = "0.1.0" @@ -362,21 +361,6 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" -[[package]] -name = "backtrace" -version = "0.3.74" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a" -dependencies = [ - "addr2line", - "cfg-if", - "libc", - "miniz_oxide", - "object", - "rustc-demangle", - "windows-targets 0.52.6", -] - [[package]] name = "base64" version = "0.21.7" @@ -1149,7 +1133,7 @@ dependencies = [ "libc", "option-ext", "redox_users", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -1741,12 +1725,6 @@ dependencies = [ "windows-targets 0.52.6", ] -[[package]] -name = "gimli" -version = "0.31.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" - [[package]] name = "gio" version = "0.18.4" @@ -2141,7 +2119,7 @@ dependencies = [ "http-body", "hyper", "pin-project-lite", - "socket2", + "socket2 0.5.8", "tokio", "tower-service", "tracing", @@ -2368,6 +2346,16 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71dd52191aae121e8611f1e8dc3e324dd0dd1dee1e6dd91d10ee07a3cfb4d9d8" +[[package]] +name = "io-kit-sys" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "617ee6cf8e3f66f3b4ea67a4058564628cde41901316e19f559e14c7c72c5e7b" +dependencies = [ + "core-foundation-sys", + "mach2", +] + [[package]] name = "ipnet" version = "2.11.0" @@ -2735,9 +2723,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.170" +version = "0.2.180" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "875b3680cb2f8f71bdcf9a30f38d48282f5d3c95cbf9b3fa57269bb5d5c06828" +checksum = "bcc35a38544a891a5f7c865aca548a982ccb3b8650a5b06d0fd33a10283c56fc" [[package]] name = "libloading" @@ -2760,6 +2748,26 @@ dependencies = [ "redox_syscall", ] +[[package]] +name = "libudev" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78b324152da65df7bb95acfcaab55e3097ceaab02fb19b228a9eb74d55f135e0" +dependencies = [ + "libc", + "libudev-sys", +] + +[[package]] +name = "libudev-sys" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c8469b4a23b962c1396b9b451dda50ef5b283e8dd309d69033475fa9b334324" +dependencies = [ + "libc", + "pkg-config", +] + [[package]] name = "linear-map" version = "1.2.0" @@ -2809,6 +2817,15 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4" +[[package]] +name = "mach2" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d640282b302c0bb0a2a8e0233ead9035e3bed871f0b7e81fe4a1ec829765db44" +dependencies = [ + "libc", +] + [[package]] name = "manyhow" version = "0.11.4" @@ -2988,6 +3005,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60993920e071b0c9b66f14e2b32740a4e27ffc82854dcd72035887f336a09a28" +[[package]] +name = "nix" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b" +dependencies = [ + "bitflags 1.3.2", + "cfg-if", + "libc", +] + [[package]] name = "nix" version = "0.29.0" @@ -3319,15 +3347,6 @@ dependencies = [ "objc2-foundation 0.3.0", ] -[[package]] -name = "object" -version = "0.36.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87" -dependencies = [ - "memchr", -] - [[package]] name = "oco_ref" version = "0.2.0" @@ -3933,7 +3952,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls", - "socket2", + "socket2 0.5.8", "thiserror 2.0.12", "tokio", "tracing", @@ -3968,16 +3987,16 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2", + "socket2 0.5.8", "tracing", "windows-sys 0.59.0", ] [[package]] name = "quote" -version = "1.0.41" +version = "1.0.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce25767e7b499d1b604768e7cde645d14cc8584231ea6b295e9c9eb22c02e1d1" +checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d" dependencies = [ "proc-macro2", ] @@ -4362,12 +4381,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "rustc-demangle" -version = "0.1.24" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" - [[package]] name = "rustc-hash" version = "2.1.1" @@ -4710,6 +4723,26 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "serialport" +version = "4.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21f60a586160667241d7702c420fc223939fb3c0bb8d3fac84f78768e8970dee" +dependencies = [ + "bitflags 2.9.0", + "cfg-if", + "core-foundation", + "core-foundation-sys", + "io-kit-sys", + "libudev", + "mach2", + "nix 0.26.4", + "quote", + "scopeguard", + "unescaper", + "windows-sys 0.52.0", +] + [[package]] name = "server_fn" version = "0.8.2" @@ -4863,6 +4896,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "socket2" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17129e116933cf371d018bb80ae557e889637989d8638274fb25622827b03881" +dependencies = [ + "libc", + "windows-sys 0.60.2", +] + [[package]] name = "softbuffer" version = "0.4.6" @@ -5602,17 +5645,30 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.44.1" +version = "1.49.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f382da615b842244d4b8738c82ed1275e6c5dd90c459a30941cd07080b06c91a" +checksum = "72a2903cd7736441aac9df9d7688bd0ce48edccaadf181c3b90be801e81d3d86" dependencies = [ - "backtrace", "bytes", "libc", "mio", + "parking_lot", "pin-project-lite", - "socket2", - "windows-sys 0.52.0", + "signal-hook-registry", + "socket2 0.6.1", + "tokio-macros", + "windows-sys 0.61.2", +] + +[[package]] +name = "tokio-macros" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.108", ] [[package]] @@ -5829,6 +5885,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "unescaper" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4064ed685c487dbc25bd3f0e9548f2e34bab9d18cefc700f9ec2dba74ba1138e" +dependencies = [ + "thiserror 2.0.12", +] + [[package]] name = "unic-char-property" version = "0.9.0" @@ -6478,6 +6543,15 @@ dependencies = [ "windows-targets 0.53.5", ] +[[package]] +name = "windows-sys" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc" +dependencies = [ + "windows-link 0.2.1", +] + [[package]] name = "windows-targets" version = "0.42.2" @@ -6930,7 +7004,7 @@ dependencies = [ "futures-core", "futures-lite", "hex", - "nix", + "nix 0.29.0", "ordered-stream", "serde", "serde_repr", diff --git a/Cargo.toml b/Cargo.toml index 5eb8c54..3fa8ac7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ eyre = "0.6" ndarray = "0.17.1" [workspace] -members = ["src-tauri"] +members = ["src-tauri", "sensor"] # Waiting for a more stable version of the software, we allow some dead code and # unused variables. diff --git a/bash_scripts/setup/99-ttyusb.rules b/bash_scripts/setup/99-ttyusb.rules new file mode 100644 index 0000000..0833a48 --- /dev/null +++ b/bash_scripts/setup/99-ttyusb.rules @@ -0,0 +1 @@ +KERNEL=="ttyUSB0", GROUP="${USER}", MODE="0666" diff --git a/bash_scripts/setup/setup.sh b/bash_scripts/setup/setup.sh new file mode 100644 index 0000000..aa56442 --- /dev/null +++ b/bash_scripts/setup/setup.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +# sudo addgroup dialout +# sudo usermod -a -G dialout $USER +# +# GID 100 or 1000 depending on distro +# The others are reserved +# +# cp ./99-ttyusb.rules /etc/udev/rules.d + +sudo udevadm control --reload-rules +sudo udevadm trigger diff --git a/sensor/Cargo.toml b/sensor/Cargo.toml new file mode 100644 index 0000000..ebf1e0e --- /dev/null +++ b/sensor/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "arksync-sensor" +version = "0.1.0" +edition = "2021" +license = "AGPL-3.0-or-later" + +# [lib] +# name = "arksync_lib" +# crate-type = ["staticlib", "cdylib", "rlib"] + +[dependencies] +serialport = "4.8.1" +tokio = { version = "1.49.0", features = ["full"] } diff --git a/sensor/src/commands/mod.rs b/sensor/src/commands/mod.rs new file mode 100644 index 0000000..8c20360 --- /dev/null +++ b/sensor/src/commands/mod.rs @@ -0,0 +1 @@ +pub mod uart; diff --git a/sensor/src/commands/uart.rs b/sensor/src/commands/uart.rs new file mode 100644 index 0000000..b2ea7b8 --- /dev/null +++ b/sensor/src/commands/uart.rs @@ -0,0 +1,3 @@ +pub fn device_info() { + todo!() +} diff --git a/sensor/src/lib.rs b/sensor/src/lib.rs new file mode 100644 index 0000000..6319833 --- /dev/null +++ b/sensor/src/lib.rs @@ -0,0 +1,3 @@ +pub mod commands; +pub mod sensor; +pub mod serial_port; diff --git a/sensor/src/main.rs b/sensor/src/main.rs new file mode 100644 index 0000000..5b66eff --- /dev/null +++ b/sensor/src/main.rs @@ -0,0 +1,153 @@ +mod sensor; +mod serial_port; + +use sensor::Sensor; +use std::collections::HashMap; +use tokio::sync::mpsc::Sender; +use tokio::sync::{mpsc, oneshot}; +use tokio::time::{sleep, Duration}; + +use serial_port::*; + +// pub type SensorList = Arc>>; + +enum ServiceCommand { + AddSensor { + uuid: String, + sensor: Sensor, + }, + RemoveSensor { + uuid: String, + }, + GetSensor { + uuid: String, + respond_to: oneshot::Sender>, + }, + GetActiveSensors { + respond_to: oneshot::Sender>, + }, +} + +pub struct CommandChannel { + tx: mpsc::Sender, + rx: mpsc::Receiver, +} + +pub struct UartService { + sensors: HashMap, + cmd_channel: CommandChannel, +} + +impl UartService { + pub fn new() -> Self { + let (tx, rx) = mpsc::channel(100); + + Self { + sensors: HashMap::new(), + cmd_channel: CommandChannel { tx, rx }, + } + } + + /// Supervisor loop + pub async fn run(mut self) { + let cmd_tx = self.cmd_channel.tx.clone(); + + // Spawn background tasks + self.spawn_detector(cmd_tx.clone()); + self.spawn_healthcheck(cmd_tx.clone()); + self.spawn_reader_manager(cmd_tx.clone()); + + // Main supervisor loop + loop { + tokio::select! { + Some(cmd) = self.cmd_channel.rx.recv() => { + self.handle_cmd(cmd); + } + + // Graceful termination signal (Ctrl+C) + _ = tokio::signal::ctrl_c() => { + println!("Shutting down..."); + break; + } + } + } + } + + fn handle_cmd(&self, cmd: ServiceCommand) { + match cmd { + ServiceCommand::AddSensor { uuid, sensor } => { + println!("Adding sensor: {} - {:?}", uuid, sensor); + } + ServiceCommand::RemoveSensor { uuid } => { + println!("Removing sensor: {}", uuid); + } + ServiceCommand::GetSensor { uuid, respond_to } => { + println!("Getting sensor: {}", uuid); + let _ = respond_to.send(None); + } + ServiceCommand::GetActiveSensors { respond_to } => { + println!("Getting active sensors"); + let _ = respond_to.send(vec![]); + } + } + } + + fn spawn_detector(&self, cmd_tx: Sender) { + tokio::spawn(async move { + loop { + println!("Checking for sensors..."); + let atlas_sc_ports = serial_port::find_atlas_sc_port(); + println!("{atlas_sc_ports:#?}"); + sleep(Duration::from_secs(60)).await; + } + }); + } + + fn spawn_reader_manager(&self, cmd_tx: Sender) { + tokio::spawn(async move { + println!("Reading sensors..."); + sleep(Duration::from_secs(60)).await; + }); + } + + fn spawn_healthcheck(&self, cmd_tx: Sender) { + tokio::spawn(async move { + loop { + let (respond_to, rx) = oneshot::channel(); + + // TODO: warn on error + let _ = cmd_tx + .send(ServiceCommand::GetActiveSensors { respond_to }) + .await; + + if let Ok(sensors) = rx.await { + println!("Health check: {} sensors", sensors.len()); + } + + sleep(Duration::from_secs(20)).await; + } + }); + } +} + +// - A UartSensorService: this service is in charge of managing all kind of +// Atlas Scientific sensors. +// +// - A task to manage sensors among existing ones with filtering. When a new +// sensor is detected, it is added to the list. When the current USB ports don't +// include one of the sensors, it is removed from the list. This task only track +// some kind of unique information. +// +// - A task to healthcheck sensors and update their status. Healthy, Unhealthy, +// Lost. The status tracks the last status updated_at which helps to determine +// the state machine for passing from one state to another. +// +// - A task per healthy sensor to handle sensor values. A cancellation token +// should be used to terminate the task when a sensor status is Lost. +// +// - A task to handle commands sent to sensors and handle responses. + +#[tokio::main] +async fn main() { + UartService::new().run().await +} diff --git a/sensor/src/sensor.rs b/sensor/src/sensor.rs new file mode 100644 index 0000000..de54010 --- /dev/null +++ b/sensor/src/sensor.rs @@ -0,0 +1,16 @@ +#[derive(Debug, Clone, Copy)] +pub enum SensorKind { + Rtd, +} + +pub enum SensorName { + Unnamed, + Named(String), +} + +#[derive(Debug)] +pub struct Sensor { + pub kind: SensorKind, + pub firmware: f64, + pub name: Option, +} diff --git a/sensor/src/serial_port.rs b/sensor/src/serial_port.rs new file mode 100644 index 0000000..e2caa63 --- /dev/null +++ b/sensor/src/serial_port.rs @@ -0,0 +1,22 @@ +use serialport::{SerialPortInfo, SerialPortType}; + +pub fn find_atlas_sc_port() -> Vec { + serialport::available_ports() + .unwrap_or(Vec::new()) + .into_iter() + .filter(|port| is_atlas_sc_device(port)) + .collect::>() +} + +/// Checks if a port is an Atlas Scientific device +pub fn is_atlas_sc_device(port: &SerialPortInfo) -> bool { + match &port.port_type { + SerialPortType::UsbPort(usb_info) => { + // Atlas Scientific USB devices typically use FTDI chips + // FTDI Vendor ID: 0x0403 + // Common Product IDs: 0x6001 (FT232), 0x6015 (FT231X) + usb_info.vid == 0x0403 && (usb_info.pid == 0x6001 || usb_info.pid == 0x6015) + } + _ => false, + } +} diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml index 24ca344..d4d4aea 100644 --- a/src-tauri/Cargo.toml +++ b/src-tauri/Cargo.toml @@ -24,7 +24,7 @@ tauri-plugin-opener = "2" serde = { version = "1", features = ["derive"] } serde_json = "1" rand = "0.9" -tokio = { version = "1.44", features = ["time"] } +tokio = { version = "1.44.2", features = ["time"] } tauri-plugin-log = "2" log = "0.4" eyre = "0.6.12" From 0d2b6b60f46cf8bfccd746d1e1123ff210efe37d Mon Sep 17 00:00:00 2001 From: theredfish Date: Tue, 3 Feb 2026 00:20:02 +0100 Subject: [PATCH 02/12] Sensor detection Detect a sensor from UART mode and serial port. Maintain a list of connected sensors. This commit add the basic loop with a command-based pattern to maintain a sensor list. More efforts should then be spent in improving the API and extending the capabilities of the crate to manage sensors. --- Cargo.lock | 1 + sensor/Cargo.toml | 1 + sensor/src/commands/mod.rs | 2 + sensor/src/commands/uart.rs | 200 +++++++++++++++++++++++++++++++++++- sensor/src/main.rs | 163 ++++++++++++++++++++++------- sensor/src/sensor.rs | 96 ++++++++++++++++- sensor/src/serial_port.rs | 141 ++++++++++++++++++++++++- 7 files changed, 559 insertions(+), 45 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0cf486d..774764a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -122,6 +122,7 @@ dependencies = [ name = "arksync-sensor" version = "0.1.0" dependencies = [ + "chrono", "serialport", "tokio", ] diff --git a/sensor/Cargo.toml b/sensor/Cargo.toml index ebf1e0e..40937f2 100644 --- a/sensor/Cargo.toml +++ b/sensor/Cargo.toml @@ -11,3 +11,4 @@ license = "AGPL-3.0-or-later" [dependencies] serialport = "4.8.1" tokio = { version = "1.49.0", features = ["full"] } +chrono = "0.4" diff --git a/sensor/src/commands/mod.rs b/sensor/src/commands/mod.rs index 8c20360..27b206c 100644 --- a/sensor/src/commands/mod.rs +++ b/sensor/src/commands/mod.rs @@ -1 +1,3 @@ pub mod uart; + +pub use uart::{CalibrationStatus, DeviceInfo, StatusCode, UartCommand}; diff --git a/sensor/src/commands/uart.rs b/sensor/src/commands/uart.rs index b2ea7b8..a057e91 100644 --- a/sensor/src/commands/uart.rs +++ b/sensor/src/commands/uart.rs @@ -1,3 +1,199 @@ -pub fn device_info() { - todo!() +//! UART commands for Atlas Scientific sensors +//! +//! # Example Usage +//! +//! ```no_run +//! use arksync_sensor::commands::UartCommand; +//! use arksync_sensor::serial_port::SerialPort; +//! +//! # fn main() -> Result<(), Box> { +//! // Create port metadata +//! let port = SerialPort { +//! port_name: "/dev/ttyUSB0".to_string(), +//! serial_number: "DP065KS3".to_string(), +//! }; +//! +//! // Connect to the sensor +//! let mut uart = UartCommand::connect(port)?; +//! +//! // Get device information +//! let info = uart.device_info()?; +//! println!("Device: {} v{}", info.device_type, info.firmware_version); +//! +//! // Read temperature +//! let temp = uart.read_temperature()?; +//! println!("Temperature: {:.2}°C", temp); +//! +//! // Check status +//! let status = uart.check_status()?; +//! println!("Status: {:?}", status); +//! +//! // Get calibration info +//! let cal = uart.get_calibration()?; +//! println!("Calibration points: {}", cal.calibration_points); +//! # Ok(()) +//! # } +//! ``` + +use crate::serial_port::{SerialPort, SerialPortConnection}; +use std::io; + +#[derive(Debug)] +pub struct UartCommand { + connection: SerialPortConnection, +} + +impl UartCommand { + /// Connect to a sensor via UART + pub fn connect(port: SerialPort) -> Result { + let connection = SerialPortConnection::open(&port)?; + Ok(Self { connection }) + } + + /// Create a UartCommand from an already-open connection + pub fn connect_with_connection( + connection: SerialPortConnection, + ) -> Result { + Ok(Self { connection }) + } + + /// Get device information (firmware version, device type) + /// + /// Retries up to 3 times if we get unexpected data (like temperature readings) + pub fn device_info(&mut self) -> io::Result { + const MAX_RETRIES: usize = 3; + + for attempt in 1..=MAX_RETRIES { + // Send "i" command to get device information + let response = self.connection.send_command("i")?; + + // Atlas Scientific response format: ?I,RTD,1.0 + // Format: ?I,, + if response.starts_with("?I,") { + let parts: Vec<&str> = response.trim_start_matches("?I,").split(',').collect(); + + if parts.len() >= 2 { + return Ok(DeviceInfo { + device_type: parts[0].to_string(), + firmware_version: parts[1].parse().unwrap_or(0.0), + }); + } + } + + // Got unexpected response (possibly temperature reading or stale data) + eprintln!( + "Attempt {}/{}: Unexpected response to 'i' command: '{}' - retrying...", + attempt, MAX_RETRIES, response + ); + + // Small delay before retry + std::thread::sleep(std::time::Duration::from_millis(100)); + } + + Err(io::Error::new( + io::ErrorKind::InvalidData, + "Failed to get valid device info after multiple attempts", + )) + } + + /// Get current temperature reading + /// + /// Sends "R" command and waits for response (terminated by \r) + /// Sensor responds when measurement is complete (~1 second per reading max) + pub fn read_temperature(&mut self) -> io::Result { + let response = self.connection.send_command("R")?; + + response + .trim() + .parse() + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) + } + + /// Check if sensor is responsive (return status code) + pub fn check_status(&mut self) -> io::Result { + let response = self.connection.send_command("Status")?; + + // Atlas Scientific status response format: ?STATUS, + // Codes: P (powered off and restarted), S (software reset), B (brown out), W (watchdog), U (unknown) + if response.starts_with("?STATUS,") { + let code = response.chars().nth(8).unwrap_or('U'); + Ok(match code { + 'P' => StatusCode::PoweredOn, + 'S' => StatusCode::SoftwareReset, + 'B' => StatusCode::BrownOut, + 'W' => StatusCode::Watchdog, + _ => StatusCode::Unknown, + }) + } else { + Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("Invalid status response: {}", response), + )) + } + } + + /// Set sensor to sleep mode (low power) + pub fn sleep(&mut self) -> io::Result<()> { + self.connection.send_command("Sleep")?; + Ok(()) + } + + /// Get sensor calibration status + pub fn get_calibration(&mut self) -> io::Result { + let response = self.connection.send_command("Cal,?")?; + + // Response format: ?Cal, + if response.starts_with("?Cal,") { + let points: u8 = response + .trim_start_matches("?Cal,") + .trim() + .parse() + .unwrap_or(0); + + Ok(CalibrationStatus { + calibration_points: points, + }) + } else { + Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("Invalid calibration response: {}", response), + )) + } + } + + /// Send raw command (for testing/debugging) + pub fn send_raw(&mut self, command: &str) -> io::Result { + self.connection.send_command(command) + } +} + +#[derive(Debug, Clone)] +pub struct DeviceInfo { + pub device_type: String, + pub firmware_version: f64, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum StatusCode { + PoweredOn, + SoftwareReset, + BrownOut, + Watchdog, + Unknown, +} + +#[derive(Debug, Clone)] +pub struct CalibrationStatus { + pub calibration_points: u8, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_device_info_parsing() { + // This would require a mock connection for proper testing + // For now, this is a placeholder for future integration tests + } } diff --git a/sensor/src/main.rs b/sensor/src/main.rs index 5b66eff..1de6d2c 100644 --- a/sensor/src/main.rs +++ b/sensor/src/main.rs @@ -1,30 +1,30 @@ +mod commands; mod sensor; mod serial_port; -use sensor::Sensor; +use chrono::{Duration as ChronoDuration, Utc}; +use sensor::{Sensor, SensorState}; use std::collections::HashMap; +use std::sync::Arc; use tokio::sync::mpsc::Sender; use tokio::sync::{mpsc, oneshot}; -use tokio::time::{sleep, Duration}; +use tokio::time::{sleep, Duration as TokioDuration}; -use serial_port::*; - -// pub type SensorList = Arc>>; +pub type SensorList = HashMap; enum ServiceCommand { - AddSensor { - uuid: String, - sensor: Sensor, - }, - RemoveSensor { - uuid: String, + RemoveSensors { + uuids: Vec, }, - GetSensor { - uuid: String, + FindSensor { + serial_number: String, respond_to: oneshot::Sender>, }, - GetActiveSensors { - respond_to: oneshot::Sender>, + AllSensors { + respond_to: oneshot::Sender>, + }, + UpsertSensors { + sensors: Vec<(String, Sensor)>, }, } @@ -34,7 +34,7 @@ pub struct CommandChannel { } pub struct UartService { - sensors: HashMap, + sensors: SensorList, cmd_channel: CommandChannel, } @@ -73,21 +73,33 @@ impl UartService { } } - fn handle_cmd(&self, cmd: ServiceCommand) { + fn handle_cmd(&mut self, cmd: ServiceCommand) { match cmd { - ServiceCommand::AddSensor { uuid, sensor } => { - println!("Adding sensor: {} - {:?}", uuid, sensor); + ServiceCommand::RemoveSensors { uuids } => { + println!("RemoveSensors: {} sensors", uuids.len()); + for uuid in uuids { + self.sensors.remove(&uuid); + } } - ServiceCommand::RemoveSensor { uuid } => { - println!("Removing sensor: {}", uuid); + ServiceCommand::FindSensor { + serial_number, + respond_to, + } => { + println!("FindSensor: {}", serial_number); + let sensor = self.sensors.get(&serial_number); + let _ = respond_to.send(sensor.cloned()); } - ServiceCommand::GetSensor { uuid, respond_to } => { - println!("Getting sensor: {}", uuid); - let _ = respond_to.send(None); + ServiceCommand::AllSensors { respond_to } => { + println!("AllSensors: {:#?}", self.sensors); + let _ = respond_to.send(Arc::new(self.sensors.clone())); } - ServiceCommand::GetActiveSensors { respond_to } => { - println!("Getting active sensors"); - let _ = respond_to.send(vec![]); + ServiceCommand::UpsertSensors { sensors } => { + println!("UpsertSensors: {} sensors", sensors.len()); + for (uuid, mut sensor) in sensors { + sensor.last_activity = Utc::now(); + sensor.state = SensorState::Active; + self.sensors.insert(uuid, sensor); + } } } } @@ -96,17 +108,57 @@ impl UartService { tokio::spawn(async move { loop { println!("Checking for sensors..."); - let atlas_sc_ports = serial_port::find_atlas_sc_port(); - println!("{atlas_sc_ports:#?}"); - sleep(Duration::from_secs(60)).await; + let asc_ports = serial_port::find_asc_port(); + println!("Found {} ASC ports: {:#?}", asc_ports.len(), asc_ports); + + // Get current sensor list + let (respond_to, rx) = oneshot::channel(); + let _ = cmd_tx.send(ServiceCommand::AllSensors { respond_to }).await; + + if let Ok(current_sensors) = rx.await { + // Find new sensors (not in the current list) + let mut new_sensors: Vec<(String, Sensor)> = Vec::new(); + + for port in asc_ports.iter() { + if !current_sensors.contains_key(&port.serial_number) { + // Try to connect and query device info + match Sensor::from_device(port.clone()) { + Ok(sensor) => { + println!( + "Successfully connected to sensor: {:?} v{} ({})", + sensor.kind, sensor.firmware, port.serial_number + ); + new_sensors.push((port.serial_number.clone(), sensor)); + } + Err(e) => { + eprintln!( + "Failed to connect to sensor {}: {}", + port.serial_number, e + ); + } + } + } + } + + if !new_sensors.is_empty() { + println!("Adding {} new sensors", new_sensors.len()); + let _ = cmd_tx + .send(ServiceCommand::UpsertSensors { + sensors: new_sensors, + }) + .await; + } + } + + sleep(TokioDuration::from_secs(5)).await; } }); } - fn spawn_reader_manager(&self, cmd_tx: Sender) { + fn spawn_reader_manager(&self, _cmd_tx: Sender) { tokio::spawn(async move { println!("Reading sensors..."); - sleep(Duration::from_secs(60)).await; + sleep(TokioDuration::from_secs(60)).await; }); } @@ -115,16 +167,53 @@ impl UartService { loop { let (respond_to, rx) = oneshot::channel(); - // TODO: warn on error - let _ = cmd_tx - .send(ServiceCommand::GetActiveSensors { respond_to }) - .await; + // Request snapshot of all sensors + let _ = cmd_tx.send(ServiceCommand::AllSensors { respond_to }).await; if let Ok(sensors) = rx.await { println!("Health check: {} sensors", sensors.len()); + + let now = Utc::now(); + let timeout_duration = ChronoDuration::minutes(2); + + // Collect sensors to remove: Unreachable AND last_activity > 2 minutes + let sensors_to_remove: Vec = sensors + .iter() + .filter(|(_, sensor)| { + matches!(sensor.state, SensorState::Unreachable) + && (now - sensor.last_activity) > timeout_duration + }) + .map(|(uuid, _)| uuid.clone()) + .collect(); + + // Collect sensors to update: all others + let sensors_to_update: Vec<(String, Sensor)> = sensors + .iter() + .filter(|(uuid, _)| !sensors_to_remove.contains(uuid)) + .map(|(uuid, sensor)| (uuid.clone(), sensor.clone())) + .collect(); + + // Remove timed-out unreachable sensors + if !sensors_to_remove.is_empty() { + println!("Removing {} unreachable sensors", sensors_to_remove.len()); + let _ = cmd_tx + .send(ServiceCommand::RemoveSensors { + uuids: sensors_to_remove, + }) + .await; + } + + // Update active sensors with current timestamp and Active state + if !sensors_to_update.is_empty() { + let _ = cmd_tx + .send(ServiceCommand::UpsertSensors { + sensors: sensors_to_update, + }) + .await; + } } - sleep(Duration::from_secs(20)).await; + sleep(TokioDuration::from_secs(20)).await; } }); } diff --git a/sensor/src/sensor.rs b/sensor/src/sensor.rs index de54010..f374b3c 100644 --- a/sensor/src/sensor.rs +++ b/sensor/src/sensor.rs @@ -1,16 +1,108 @@ +use chrono::{DateTime, Utc}; +use std::sync::{Arc, Mutex}; + +use crate::commands::UartCommand; +use crate::serial_port::{SerialPort, SerialPortConnection}; + #[derive(Debug, Clone, Copy)] pub enum SensorKind { Rtd, } +#[derive(Debug, Clone, Default)] pub enum SensorName { + #[default] Unnamed, Named(String), } -#[derive(Debug)] +#[derive(Default, Debug, Clone, Copy)] +pub enum SensorState { + Active, + Degraded, + #[default] + Initializing, + Unreachable, +} + +#[derive(Debug, Clone)] +pub enum SensorType { + I2c { addr: String }, + Uart { port: SerialPort }, +} + +#[derive(Debug, Clone)] pub struct Sensor { pub kind: SensorKind, pub firmware: f64, - pub name: Option, + pub name: SensorName, + pub state: SensorState, + pub sensor_type: SensorType, + pub connection: Option>>, + pub last_activity: DateTime, } + +impl Sensor { + /// Create a new sensor by connecting to a serial port and querying device info + /// + /// This method: + /// 1. Opens a connection to the serial port + /// 2. Queries device information (type, firmware) + /// 3. Creates a Sensor with the retrieved information + /// + /// # Arguments + /// * `port` - Serial port metadata (port name and serial number) + /// + /// # Returns + /// * `Ok(Sensor)` - Successfully connected and queried sensor + /// * `Err` - Failed to connect or query device + pub fn from_device(port: SerialPort) -> Result> { + // Open connection + let connection = SerialPortConnection::open(&port)?; + let mut uart = UartCommand::connect_with_connection(connection)?; + + // Query device information + let device_info = uart.device_info()?; + + // Parse device type to SensorKind + let kind = match device_info.device_type.as_str() { + "RTD" => SensorKind::Rtd, + _ => SensorKind::Rtd, // Default to RTD for now + }; + + // Wrap connection for sharing + let shared_connection = Arc::new(Mutex::new(uart)); + + Ok(Sensor { + kind, + firmware: device_info.firmware_version, + name: SensorName::Unnamed, // Default to unnamed, can be set later + state: SensorState::Initializing, + sensor_type: SensorType::Uart { port }, + connection: Some(shared_connection), + last_activity: Utc::now(), + }) + } + + /// Check if sensor has an active connection + pub fn is_connected(&self) -> bool { + self.connection.is_some() + } + + /// Get a reference to the UART command interface + /// + /// Returns None if no connection is established + pub fn get_uart(&self) -> Option>> { + self.connection.clone() + } +} + +// TODO: update the sensor module to organize the lib per different sensors. +// Start with RTD sensor, and then expand for each type of sensor +// +// See to have Read and Write types +// command => u8 +// features: i2c, uart +// i2c => later see for embedded_hal compatibility +// Rename lib into arksync_ezo (since we capture from ezo circuit) +// See https://github.com/RougeEtoile/ezo_i2c_rs diff --git a/sensor/src/serial_port.rs b/sensor/src/serial_port.rs index e2caa63..ddb3fb6 100644 --- a/sensor/src/serial_port.rs +++ b/sensor/src/serial_port.rs @@ -1,15 +1,122 @@ use serialport::{SerialPortInfo, SerialPortType}; +use std::io::{self, Read, Write}; +use std::time::Duration; -pub fn find_atlas_sc_port() -> Vec { +// Atlas Scientific RTD Sensor Configuration +// Based on datasheet specifications +pub const RTD_BAUD_RATE: u32 = 9600; +pub const RTD_TIMEOUT_MS: u64 = 1000; // Timeout acts as safety net for response-based reading + +/// Metadata about a serial port (no active connection) +#[derive(Debug, Clone)] +pub struct SerialPort { + pub port_name: String, + pub serial_number: String, +} + +/// Active serial port connection for communication +pub struct SerialPortConnection { + port: Box, +} + +impl SerialPortConnection { + /// Open a serial port connection with Atlas Scientific RTD defaults + /// - Baud rate: 9600 + /// - Timeout: 1000ms (safety net for unresponsive sensors) + /// - Encoding: ASCII + /// - Terminator: Carriage return (\r) + /// - Decimal places: 3 + /// - Temperature unit: Celsius (default) + pub fn open(port_info: &SerialPort) -> Result { + let port = serialport::new(&port_info.port_name, RTD_BAUD_RATE) + .timeout(Duration::from_millis(RTD_TIMEOUT_MS)) + .open()?; + + let mut port = port; + + // Flush any stale data in the input buffer + // Atlas Scientific sensors might have leftover readings or responses + let _ = port.clear(serialport::ClearBuffer::Input); + + Ok(Self { port }) + } + + /// Write a command to the sensor + pub fn write_command(&mut self, command: &str) -> std::io::Result<()> { + self.port.write_all(command.as_bytes())?; + self.port.write_all(b"\r")?; // Atlas Scientific expects carriage return + self.port.flush()?; + Ok(()) + } + + /// Read response from the sensor until carriage return terminator + /// + /// Atlas Scientific sensors terminate responses with \r + /// This method blocks until \r is received or timeout occurs + pub fn read_response(&mut self) -> std::io::Result { + let mut buffer = Vec::new(); + let mut single_byte = [0u8; 1]; + + // Read byte-by-byte until carriage return + loop { + match self.port.read_exact(&mut single_byte) { + Ok(_) => { + if single_byte[0] == b'\r' { + break; + } + buffer.push(single_byte[0]); + } + Err(e) => return Err(e), + } + } + + // Convert to string and trim whitespace + let response = String::from_utf8_lossy(&buffer).trim().to_string(); + + Ok(response) + } + + /// Flush the input buffer to clear any stale data + pub fn flush_input(&mut self) -> std::io::Result<()> { + self.port + .clear(serialport::ClearBuffer::Input) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) + } + + /// Send a command and read the response + /// + /// Flushes input buffer first to clear stale data, then sends command and waits for response. + /// The serialport timeout (1000ms) acts as a safety net if the sensor doesn't respond. + /// + /// # Arguments + /// * `command` - The command string to send + pub fn send_command(&mut self, command: &str) -> io::Result { + // Flush any stale data before sending command + self.flush_input()?; + self.write_command(command)?; + self.read_response() + } +} + +impl std::fmt::Debug for SerialPortConnection { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SerialPortConnection") + .field("port", &"") + .finish() + } +} + +pub fn find_asc_port() -> Vec { serialport::available_ports() .unwrap_or(Vec::new()) .into_iter() - .filter(|port| is_atlas_sc_device(port)) - .collect::>() + .filter(filter_asc_device) + .filter_map(filter_map_usb_serial) + .collect::>() } /// Checks if a port is an Atlas Scientific device -pub fn is_atlas_sc_device(port: &SerialPortInfo) -> bool { +fn filter_asc_device(port: &SerialPortInfo) -> bool { match &port.port_type { SerialPortType::UsbPort(usb_info) => { // Atlas Scientific USB devices typically use FTDI chips @@ -20,3 +127,29 @@ pub fn is_atlas_sc_device(port: &SerialPortInfo) -> bool { _ => false, } } + +fn filter_map_usb_serial(port: SerialPortInfo) -> Option { + let SerialPortInfo { + port_name, + port_type, + } = port; + + let SerialPortType::UsbPort(usb_port) = port_type else { + return None; + }; + + usb_port.serial_number.map(|serial_number| SerialPort { + port_name, + serial_number, + }) +} + +// We need to maintain a list of opened serial ports +// +// DP065KS3 + serial port +// DK0HFBFB + serial port +// pub fn collect_atlas_sc_sensors(serial_ports: Vec) -> Vec { +// serial_ports.iter().map(|serial_port| { +// serial_port. +// }).collect() +// } From d553ea0e6901e71baeff677dc1e0290c910bf37e Mon Sep 17 00:00:00 2001 From: theredfish Date: Fri, 13 Feb 2026 01:45:30 +0100 Subject: [PATCH 03/12] Generic EZO driver implementation --- Cargo.lock | 1 + sensor/Cargo.toml | 1 + sensor/src/ezo/command.rs | 10 ++++ sensor/src/ezo/driver/error.rs | 9 ++++ sensor/src/ezo/driver/i2c.rs | 5 ++ sensor/src/ezo/driver/mod.rs | 47 +++++++++++++++++ sensor/src/ezo/driver/uart.rs | 94 ++++++++++++++++++++++++++++++++++ sensor/src/ezo/mod.rs | 5 ++ sensor/src/ezo/ph.rs | 0 sensor/src/ezo/rtd.rs | 52 +++++++++++++++++++ sensor/src/ezo/sensor.rs | 24 +++++++++ sensor/src/lib.rs | 1 + sensor/src/serial_port.rs | 67 +++++++++++++----------- 13 files changed, 285 insertions(+), 31 deletions(-) create mode 100644 sensor/src/ezo/command.rs create mode 100644 sensor/src/ezo/driver/error.rs create mode 100644 sensor/src/ezo/driver/i2c.rs create mode 100644 sensor/src/ezo/driver/mod.rs create mode 100644 sensor/src/ezo/driver/uart.rs create mode 100644 sensor/src/ezo/mod.rs create mode 100644 sensor/src/ezo/ph.rs create mode 100644 sensor/src/ezo/rtd.rs create mode 100644 sensor/src/ezo/sensor.rs diff --git a/Cargo.lock b/Cargo.lock index 774764a..15b8e6b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -123,6 +123,7 @@ name = "arksync-sensor" version = "0.1.0" dependencies = [ "chrono", + "eyre", "serialport", "tokio", ] diff --git a/sensor/Cargo.toml b/sensor/Cargo.toml index 40937f2..df1a3fd 100644 --- a/sensor/Cargo.toml +++ b/sensor/Cargo.toml @@ -12,3 +12,4 @@ license = "AGPL-3.0-or-later" serialport = "4.8.1" tokio = { version = "1.49.0", features = ["full"] } chrono = "0.4" +eyre = "0.6.12" diff --git a/sensor/src/ezo/command.rs b/sensor/src/ezo/command.rs new file mode 100644 index 0000000..277ac5e --- /dev/null +++ b/sensor/src/ezo/command.rs @@ -0,0 +1,10 @@ +pub trait BaseCmd { + fn baud(); + fn cal(); + fn i2c(); + fn information(); +} + +pub trait UartCmd: BaseCmd {} + +pub trait I2cCmd: BaseCmd {} diff --git a/sensor/src/ezo/driver/error.rs b/sensor/src/ezo/driver/error.rs new file mode 100644 index 0000000..50ca7b5 --- /dev/null +++ b/sensor/src/ezo/driver/error.rs @@ -0,0 +1,9 @@ +// To see... +pub enum DriverError { + Connection(String), + UnknownDevice(String), + Read(String), + Write(String), +} + +pub type Result = std::result::Result; diff --git a/sensor/src/ezo/driver/i2c.rs b/sensor/src/ezo/driver/i2c.rs new file mode 100644 index 0000000..39d39c7 --- /dev/null +++ b/sensor/src/ezo/driver/i2c.rs @@ -0,0 +1,5 @@ +pub struct I2c { + address: u8, + // Later replaced by I2c bus or similar from hal + bus: String, +} diff --git a/sensor/src/ezo/driver/mod.rs b/sensor/src/ezo/driver/mod.rs new file mode 100644 index 0000000..547f637 --- /dev/null +++ b/sensor/src/ezo/driver/mod.rs @@ -0,0 +1,47 @@ +mod error; +pub mod i2c; +pub mod uart; + +pub use self::error::*; + +#[derive(Debug, Clone, Copy)] +pub enum DeviceType { + Rtd, +} + +impl TryFrom<&str> for DeviceType { + type Error = DriverError; + + fn try_from(value: &str) -> std::result::Result { + match value { + "RTD" => Ok(DeviceType::Rtd), + other => Err(DriverError::UnknownDevice(other.to_string())), + } + } +} + +#[derive(Debug, Clone)] +pub struct DeviceInfo { + pub device_type: DeviceType, + pub firmware_version: f64, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Status { + PoweredOn, + SoftwareReset, + BrownOut, + Watchdog, + Unknown, +} + +pub trait ReadWriteCmd { + fn read(&mut self) -> Result; + fn write(&mut self, buf: &[u8]) -> Result<()>; +} + +/// Commands common to both UART and I2C drivers. +pub trait Driver: ReadWriteCmd { + fn device_info(&mut self) -> Result; + fn status(&mut self) -> Result; +} diff --git a/sensor/src/ezo/driver/uart.rs b/sensor/src/ezo/driver/uart.rs new file mode 100644 index 0000000..f6f01f0 --- /dev/null +++ b/sensor/src/ezo/driver/uart.rs @@ -0,0 +1,94 @@ +use super::{DeviceInfo, DeviceType, Driver, DriverError, ReadWriteCmd, Result}; +use crate::serial_port::SerialPort; +use crate::{ezo::driver::Status, serial_port::SerialPortConnection}; + +pub struct Uart { + connection: SerialPortConnection, +} + +impl Uart { + pub fn new(serial_port: SerialPort) -> Result { + let connection = SerialPortConnection::open(&serial_port) + .map_err(|err| DriverError::Connection(err.to_string()))?; + + Ok(Uart { connection }) + } +} + +impl ReadWriteCmd for Uart { + fn read(&mut self) -> Result { + self.connection + .read_until_carrier() + .map_err(|err| DriverError::Read(err.to_string())) + } + + fn write(&mut self, buf: &[u8]) -> Result<()> { + self.connection + .write_command(buf) + .map_err(|err| DriverError::Write(err.to_string())) + } +} + +impl Driver for Uart { + /// Get device information (firmware version, device type) + /// + /// Retries up to 3 times if we get unexpected data (like temperature readings) + fn device_info(&mut self) -> Result { + const MAX_RETRIES: usize = 3; + + for attempt in 1..=MAX_RETRIES { + // Send "i" command to get device information + self.write(b"i")?; + let response = self.read()?; + + // Atlas Scientific response format: ?I,RTD,1.0 + // Format: ?I,, + if response.starts_with("?I,") { + let parts: Vec<&str> = response.trim_start_matches("?I,").split(',').collect(); + + if parts.len() >= 2 { + return Ok(DeviceInfo { + device_type: DeviceType::try_from(parts[0])?, + firmware_version: parts[1].parse().unwrap_or(0.0), + }); + } + } + + // Got unexpected response (possibly temperature reading or stale data) + eprintln!( + "Attempt {}/{}: Unexpected response to 'i' command: '{}' - retrying...", + attempt, MAX_RETRIES, response + ); + + // Small delay before retry + std::thread::sleep(std::time::Duration::from_millis(100)); + } + + Err(DriverError::Read( + "Failed to get valid device info after multiple attempts".to_string(), + )) + } + + fn status(&mut self) -> Result { + self.write(b"Status")?; + let response = self.read()?; + + // Atlas Scientific status response format: ?STATUS, + // Codes: P (powered off and restarted), S (software reset), B (brown out), W (watchdog), U (unknown) + if response.starts_with("?STATUS,") { + let code = response.chars().nth(8).unwrap_or('U'); + Ok(match code { + 'P' => Status::PoweredOn, + 'S' => Status::SoftwareReset, + 'B' => Status::BrownOut, + 'W' => Status::Watchdog, + _ => Status::Unknown, + }) + } else { + Err(DriverError::Read(format!( + "Unexpected response to 'Status' command: '{}'", + response + ))) + } + } +} diff --git a/sensor/src/ezo/mod.rs b/sensor/src/ezo/mod.rs new file mode 100644 index 0000000..adc15a7 --- /dev/null +++ b/sensor/src/ezo/mod.rs @@ -0,0 +1,5 @@ +pub mod command; +pub mod driver; +pub mod ph; +pub mod rtd; +pub mod sensor; diff --git a/sensor/src/ezo/ph.rs b/sensor/src/ezo/ph.rs new file mode 100644 index 0000000..e69de29 diff --git a/sensor/src/ezo/rtd.rs b/sensor/src/ezo/rtd.rs new file mode 100644 index 0000000..510be3b --- /dev/null +++ b/sensor/src/ezo/rtd.rs @@ -0,0 +1,52 @@ +use crate::ezo::{driver::Driver, sensor::SensorData}; + +pub struct Rtd { + data: SensorData, + driver: D, +} + +// I'm trying to find a way to represent two different implementations but to organize +// nicely each sensor with a strongly typed representation. +// +// For example EZO RTD has common functions such as in BaseCmd. But also specific +// commands for uart (C: enable/disable continuous reading, *OK), +// and others for i2c. +// +// So we have some kind of same names for two modes, but some mode has something +// (uart) that the other (i2c) doesn't have. +// +// The same way is for pH EZO, but this sensors also had functions in uart +// that RTD doesn't have such has Slope (ph probe slope), pHext (extended ph scale) ... +// +// I was thinking of a system of trait to implement. But then how to implement both +// I2c and Uart: we can't. Or we would provide the behavior of one for the other. At +// first I thought about it because I could, by trait bound, inherit common commands. +// +// Then I was thinking that maybe I could use an enum Mode, and based on the mode +// passed to initialize; such as Rtd::uart() -> RtdUart or Rtd::i2c() -> RtdI2c +// that would be a factory pattern. Seems the best so far, I can find better. +// +// Then impl BaseUart for RtdUart; impl BaseI2c for RtdI2c +// Then impl RtdUart { fn continuous_reading(bool)... } for specific functions. +// But I find it a bit less great than one Rtd struct. +// +// Maybe another way, and since I want to make a library, would be to feature gate +// each module. Like one uart module and i2c module. For each sensor type? Then if you enable +// both features you could do uart::Rtd or i2c::Rtd. But could complexify name clashing issues. +// +// What's the best when it comes to polymorphism in Rust but with specifics based on the +// narrowed down type. + +// TODO: +// - Define Rtd uart/i2c based on current implemented commands +// - Try driver style first then quickly switch if too complex +// - Just create todo impl for i2c to prepare +// - Manage sensor connection variant Uart/I2c +// - Must have a working state at then of the iteration +// +// Later: +// +// - Define impl Write/Read for Uart +// - Implement uart driver +// - Implement i2c driver https://docs.rs/embedded-hal/latest/embedded_hal/i2c/index.html +// - See to avoid Mutex on sensor connection? diff --git a/sensor/src/ezo/sensor.rs b/sensor/src/ezo/sensor.rs new file mode 100644 index 0000000..a1b5ee0 --- /dev/null +++ b/sensor/src/ezo/sensor.rs @@ -0,0 +1,24 @@ +use chrono::{DateTime, Utc}; + +#[derive(Debug, Clone, Default)] +pub enum SensorName { + #[default] + Unnamed, + Named(String), +} + +#[derive(Default, Debug, Clone, Copy)] +pub enum SensorState { + Active, + Degraded, + #[default] + Initializing, + Unreachable, +} + +pub struct SensorData { + pub firmware: f64, + pub name: SensorName, + pub state: SensorState, + pub last_activity: DateTime, +} diff --git a/sensor/src/lib.rs b/sensor/src/lib.rs index 6319833..be755d4 100644 --- a/sensor/src/lib.rs +++ b/sensor/src/lib.rs @@ -1,3 +1,4 @@ pub mod commands; +mod ezo; pub mod sensor; pub mod serial_port; diff --git a/sensor/src/serial_port.rs b/sensor/src/serial_port.rs index ddb3fb6..039c450 100644 --- a/sensor/src/serial_port.rs +++ b/sensor/src/serial_port.rs @@ -4,19 +4,20 @@ use std::time::Duration; // Atlas Scientific RTD Sensor Configuration // Based on datasheet specifications -pub const RTD_BAUD_RATE: u32 = 9600; -pub const RTD_TIMEOUT_MS: u64 = 1000; // Timeout acts as safety net for response-based reading +pub const DEFAULT_BAUD_RATE: u32 = 9600; +pub const SERIAL_PORT_CONN_TIMEOUT: u64 = 1000; // Timeout acts as safety net for response-based reading /// Metadata about a serial port (no active connection) #[derive(Debug, Clone)] pub struct SerialPort { pub port_name: String, pub serial_number: String, + pub baud_rate: u32, } /// Active serial port connection for communication pub struct SerialPortConnection { - port: Box, + pub port: Box, } impl SerialPortConnection { @@ -27,13 +28,17 @@ impl SerialPortConnection { /// - Terminator: Carriage return (\r) /// - Decimal places: 3 /// - Temperature unit: Celsius (default) - pub fn open(port_info: &SerialPort) -> Result { - let port = serialport::new(&port_info.port_name, RTD_BAUD_RATE) - .timeout(Duration::from_millis(RTD_TIMEOUT_MS)) + pub fn open(serial_port: &SerialPort) -> Result { + let SerialPort { + port_name, + baud_rate, + .. + } = serial_port; + + let port = serialport::new(port_name, *baud_rate) + .timeout(Duration::from_millis(SERIAL_PORT_CONN_TIMEOUT)) .open()?; - let mut port = port; - // Flush any stale data in the input buffer // Atlas Scientific sensors might have leftover readings or responses let _ = port.clear(serialport::ClearBuffer::Input); @@ -42,9 +47,13 @@ impl SerialPortConnection { } /// Write a command to the sensor - pub fn write_command(&mut self, command: &str) -> std::io::Result<()> { - self.port.write_all(command.as_bytes())?; + pub fn write_command(&mut self, command: &[u8]) -> std::io::Result<()> { + // Flush any stale data before writing + self.flush_input()?; + + self.port.write_all(command)?; self.port.write_all(b"\r")?; // Atlas Scientific expects carriage return + self.port.flush()?; Ok(()) } @@ -53,19 +62,24 @@ impl SerialPortConnection { /// /// Atlas Scientific sensors terminate responses with \r /// This method blocks until \r is received or timeout occurs - pub fn read_response(&mut self) -> std::io::Result { + pub fn read_until_carrier(&mut self) -> std::io::Result { let mut buffer = Vec::new(); let mut single_byte = [0u8; 1]; // Read byte-by-byte until carriage return loop { + if self.port.read_carrier_detect()? { + break; + } + match self.port.read_exact(&mut single_byte) { - Ok(_) => { - if single_byte[0] == b'\r' { - break; - } - buffer.push(single_byte[0]); - } + Ok(_) => buffer.push(single_byte[0]), + // { + // if single_byte[0] == b'\r' { + // break; + // } + + // } Err(e) => return Err(e), } } @@ -90,11 +104,11 @@ impl SerialPortConnection { /// /// # Arguments /// * `command` - The command string to send + /// + /// TODO: deprecate this in favor of uart/i2c driver impl pub fn send_command(&mut self, command: &str) -> io::Result { - // Flush any stale data before sending command - self.flush_input()?; - self.write_command(command)?; - self.read_response() + self.write_command(command.as_bytes())?; + self.read_until_carrier() } } @@ -141,15 +155,6 @@ fn filter_map_usb_serial(port: SerialPortInfo) -> Option { usb_port.serial_number.map(|serial_number| SerialPort { port_name, serial_number, + baud_rate: DEFAULT_BAUD_RATE, }) } - -// We need to maintain a list of opened serial ports -// -// DP065KS3 + serial port -// DK0HFBFB + serial port -// pub fn collect_atlas_sc_sensors(serial_ports: Vec) -> Vec { -// serial_ports.iter().map(|serial_port| { -// serial_port. -// }).collect() -// } From f320109b7fce6618165aec7e79f29d6566d9d624 Mon Sep 17 00:00:00 2001 From: theredfish Date: Sat, 21 Feb 2026 01:16:50 +0100 Subject: [PATCH 04/12] Add driver-based logic with UART impl --- sensor/src/commands/mod.rs | 3 +- sensor/src/ezo/command.rs | 10 -- sensor/src/ezo/driver/error.rs | 17 ++- sensor/src/ezo/driver/i2c.rs | 2 +- sensor/src/ezo/driver/uart.rs | 12 +- sensor/src/ezo/error.rs | 31 +++++ sensor/src/ezo/mod.rs | 2 +- sensor/src/ezo/rtd.rs | 75 +++++------- sensor/src/ezo/sensor.rs | 5 + sensor/src/main.rs | 217 ++++++++++++++------------------- sensor/src/sensor.rs | 2 +- sensor/src/serial_port.rs | 17 +-- 12 files changed, 190 insertions(+), 203 deletions(-) delete mode 100644 sensor/src/ezo/command.rs create mode 100644 sensor/src/ezo/error.rs diff --git a/sensor/src/commands/mod.rs b/sensor/src/commands/mod.rs index 27b206c..511432a 100644 --- a/sensor/src/commands/mod.rs +++ b/sensor/src/commands/mod.rs @@ -1,3 +1,4 @@ pub mod uart; -pub use uart::{CalibrationStatus, DeviceInfo, StatusCode, UartCommand}; +// Re-export uart module items if needed by other modules +// pub use uart::{CalibrationStatus, DeviceInfo, StatusCode, UartCommand}; diff --git a/sensor/src/ezo/command.rs b/sensor/src/ezo/command.rs deleted file mode 100644 index 277ac5e..0000000 --- a/sensor/src/ezo/command.rs +++ /dev/null @@ -1,10 +0,0 @@ -pub trait BaseCmd { - fn baud(); - fn cal(); - fn i2c(); - fn information(); -} - -pub trait UartCmd: BaseCmd {} - -pub trait I2cCmd: BaseCmd {} diff --git a/sensor/src/ezo/driver/error.rs b/sensor/src/ezo/driver/error.rs index 50ca7b5..7c73fcf 100644 --- a/sensor/src/ezo/driver/error.rs +++ b/sensor/src/ezo/driver/error.rs @@ -1,4 +1,6 @@ -// To see... +use std::fmt; + +#[derive(Debug)] pub enum DriverError { Connection(String), UnknownDevice(String), @@ -6,4 +8,17 @@ pub enum DriverError { Write(String), } +impl fmt::Display for DriverError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + DriverError::Connection(msg) => write!(f, "Connection error: {}", msg), + DriverError::UnknownDevice(msg) => write!(f, "Unknown device: {}", msg), + DriverError::Read(msg) => write!(f, "Read error: {}", msg), + DriverError::Write(msg) => write!(f, "Write error: {}", msg), + } + } +} + +impl std::error::Error for DriverError {} + pub type Result = std::result::Result; diff --git a/sensor/src/ezo/driver/i2c.rs b/sensor/src/ezo/driver/i2c.rs index 39d39c7..75fd9cd 100644 --- a/sensor/src/ezo/driver/i2c.rs +++ b/sensor/src/ezo/driver/i2c.rs @@ -1,4 +1,4 @@ -pub struct I2c { +pub struct I2cDriver { address: u8, // Later replaced by I2c bus or similar from hal bus: String, diff --git a/sensor/src/ezo/driver/uart.rs b/sensor/src/ezo/driver/uart.rs index f6f01f0..429968d 100644 --- a/sensor/src/ezo/driver/uart.rs +++ b/sensor/src/ezo/driver/uart.rs @@ -2,20 +2,20 @@ use super::{DeviceInfo, DeviceType, Driver, DriverError, ReadWriteCmd, Result}; use crate::serial_port::SerialPort; use crate::{ezo::driver::Status, serial_port::SerialPortConnection}; -pub struct Uart { +pub struct UartDriver { connection: SerialPortConnection, } -impl Uart { - pub fn new(serial_port: SerialPort) -> Result { +impl UartDriver { + pub fn new(serial_port: &SerialPort) -> Result { let connection = SerialPortConnection::open(&serial_port) .map_err(|err| DriverError::Connection(err.to_string()))?; - Ok(Uart { connection }) + Ok(UartDriver { connection }) } } -impl ReadWriteCmd for Uart { +impl ReadWriteCmd for UartDriver { fn read(&mut self) -> Result { self.connection .read_until_carrier() @@ -29,7 +29,7 @@ impl ReadWriteCmd for Uart { } } -impl Driver for Uart { +impl Driver for UartDriver { /// Get device information (firmware version, device type) /// /// Retries up to 3 times if we get unexpected data (like temperature readings) diff --git a/sensor/src/ezo/error.rs b/sensor/src/ezo/error.rs new file mode 100644 index 0000000..1f5a012 --- /dev/null +++ b/sensor/src/ezo/error.rs @@ -0,0 +1,31 @@ +use crate::ezo::driver::DriverError; +use std::fmt; + +#[derive(Debug)] +pub enum SensorError { + Driver(DriverError), +} + +impl fmt::Display for SensorError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + SensorError::Driver(err) => write!(f, "Driver error: {}", err), + } + } +} + +impl std::error::Error for SensorError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + SensorError::Driver(err) => Some(err), + } + } +} + +impl From for SensorError { + fn from(err: DriverError) -> Self { + SensorError::Driver(err) + } +} + +pub type Result = std::result::Result; diff --git a/sensor/src/ezo/mod.rs b/sensor/src/ezo/mod.rs index adc15a7..95c7de3 100644 --- a/sensor/src/ezo/mod.rs +++ b/sensor/src/ezo/mod.rs @@ -1,5 +1,5 @@ -pub mod command; pub mod driver; +pub mod error; pub mod ph; pub mod rtd; pub mod sensor; diff --git a/sensor/src/ezo/rtd.rs b/sensor/src/ezo/rtd.rs index 510be3b..db4efb8 100644 --- a/sensor/src/ezo/rtd.rs +++ b/sensor/src/ezo/rtd.rs @@ -1,52 +1,33 @@ -use crate::ezo::{driver::Driver, sensor::SensorData}; +use chrono::Utc; +use std::sync::Mutex; + +use crate::ezo::sensor::*; +use crate::ezo::{ + driver::{uart::UartDriver, Driver}, + sensor::SensorData, +}; pub struct Rtd { - data: SensorData, - driver: D, + data: Mutex, + driver: Mutex, } -// I'm trying to find a way to represent two different implementations but to organize -// nicely each sensor with a strongly typed representation. -// -// For example EZO RTD has common functions such as in BaseCmd. But also specific -// commands for uart (C: enable/disable continuous reading, *OK), -// and others for i2c. -// -// So we have some kind of same names for two modes, but some mode has something -// (uart) that the other (i2c) doesn't have. -// -// The same way is for pH EZO, but this sensors also had functions in uart -// that RTD doesn't have such has Slope (ph probe slope), pHext (extended ph scale) ... -// -// I was thinking of a system of trait to implement. But then how to implement both -// I2c and Uart: we can't. Or we would provide the behavior of one for the other. At -// first I thought about it because I could, by trait bound, inherit common commands. -// -// Then I was thinking that maybe I could use an enum Mode, and based on the mode -// passed to initialize; such as Rtd::uart() -> RtdUart or Rtd::i2c() -> RtdI2c -// that would be a factory pattern. Seems the best so far, I can find better. -// -// Then impl BaseUart for RtdUart; impl BaseI2c for RtdI2c -// Then impl RtdUart { fn continuous_reading(bool)... } for specific functions. -// But I find it a bit less great than one Rtd struct. -// -// Maybe another way, and since I want to make a library, would be to feature gate -// each module. Like one uart module and i2c module. For each sensor type? Then if you enable -// both features you could do uart::Rtd or i2c::Rtd. But could complexify name clashing issues. -// -// What's the best when it comes to polymorphism in Rust but with specifics based on the -// narrowed down type. +impl Sensor for Rtd { + fn data(&self) -> SensorData { + self.data.lock().unwrap().clone() + } +} -// TODO: -// - Define Rtd uart/i2c based on current implemented commands -// - Try driver style first then quickly switch if too complex -// - Just create todo impl for i2c to prepare -// - Manage sensor connection variant Uart/I2c -// - Must have a working state at then of the iteration -// -// Later: -// -// - Define impl Write/Read for Uart -// - Implement uart driver -// - Implement i2c driver https://docs.rs/embedded-hal/latest/embedded_hal/i2c/index.html -// - See to avoid Mutex on sensor connection? +impl Rtd { + pub fn from_uart(driver: UartDriver, firmware: f64) -> Self { + Self { + data: Mutex::new(SensorData { + firmware, + name: SensorName::Unnamed, + state: SensorState::Initializing, + last_activity: Utc::now(), + }), + driver: Mutex::new(driver), + } + } +} diff --git a/sensor/src/ezo/sensor.rs b/sensor/src/ezo/sensor.rs index a1b5ee0..9198d9e 100644 --- a/sensor/src/ezo/sensor.rs +++ b/sensor/src/ezo/sensor.rs @@ -16,9 +16,14 @@ pub enum SensorState { Unreachable, } +#[derive(Debug, Clone)] pub struct SensorData { pub firmware: f64, pub name: SensorName, pub state: SensorState, pub last_activity: DateTime, } + +pub trait Sensor: Send + Sync { + fn data(&self) -> SensorData; +} diff --git a/sensor/src/main.rs b/sensor/src/main.rs index 1de6d2c..cf260fa 100644 --- a/sensor/src/main.rs +++ b/sensor/src/main.rs @@ -1,31 +1,35 @@ mod commands; -mod sensor; +mod ezo; mod serial_port; -use chrono::{Duration as ChronoDuration, Utc}; -use sensor::{Sensor, SensorState}; +use ezo::driver::uart::UartDriver; +use ezo::driver::{DeviceType, Driver}; +use ezo::rtd::Rtd; +use ezo::sensor::Sensor; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::mpsc::Sender; use tokio::sync::{mpsc, oneshot}; use tokio::time::{sleep, Duration as TokioDuration}; -pub type SensorList = HashMap; +pub type SensorList = HashMap>; enum ServiceCommand { - RemoveSensors { - uuids: Vec, + /// Add or update sensors in the registry + UpsertSensors { + sensors: Vec<(String, Arc)>, }, + /// Remove sensors from the registry + RemoveSensors { uuids: Vec }, + /// Get a specific sensor by serial number FindSensor { serial_number: String, - respond_to: oneshot::Sender>, + respond_to: oneshot::Sender>>, }, + /// Get all sensors (snapshot) AllSensors { respond_to: oneshot::Sender>, }, - UpsertSensors { - sensors: Vec<(String, Sensor)>, - }, } pub struct CommandChannel { @@ -33,6 +37,7 @@ pub struct CommandChannel { rx: mpsc::Receiver, } +/// Supervisor service that maintains the list of sensors pub struct UartService { sensors: SensorList, cmd_channel: CommandChannel, @@ -48,91 +53,107 @@ impl UartService { } } - /// Supervisor loop + /// Main supervisor loop - maintains sensor registry pub async fn run(mut self) { let cmd_tx = self.cmd_channel.tx.clone(); - // Spawn background tasks - self.spawn_detector(cmd_tx.clone()); - self.spawn_healthcheck(cmd_tx.clone()); - self.spawn_reader_manager(cmd_tx.clone()); + // Spawn detector task (detects new sensors and adds them to registry) + self.detect_sensors(cmd_tx.clone()); - // Main supervisor loop + println!("UartService started - maintaining sensor registry"); + + // Main event loop - just manages the HashMap loop { tokio::select! { Some(cmd) = self.cmd_channel.rx.recv() => { self.handle_cmd(cmd); } - // Graceful termination signal (Ctrl+C) _ = tokio::signal::ctrl_c() => { - println!("Shutting down..."); + println!("Shutting down sensor registry..."); break; } } } } + /// Handle commands to maintain sensor list fn handle_cmd(&mut self, cmd: ServiceCommand) { match cmd { + ServiceCommand::UpsertSensors { sensors } => { + println!("Registry: Upserting {} sensors", sensors.len()); + for (uuid, sensor) in sensors { + self.sensors.insert(uuid, sensor); + } + println!("Registry: Total sensors = {}", self.sensors.len()); + } + ServiceCommand::RemoveSensors { uuids } => { - println!("RemoveSensors: {} sensors", uuids.len()); - for uuid in uuids { - self.sensors.remove(&uuid); + println!("Registry: Removing {} sensors", uuids.len()); + for uuid in &uuids { + self.sensors.remove(uuid); } + println!("Registry: Total sensors = {}", self.sensors.len()); } + ServiceCommand::FindSensor { serial_number, respond_to, } => { - println!("FindSensor: {}", serial_number); - let sensor = self.sensors.get(&serial_number); - let _ = respond_to.send(sensor.cloned()); + let sensor = self.sensors.get(&serial_number).cloned(); + let _ = respond_to.send(sensor); } + ServiceCommand::AllSensors { respond_to } => { - println!("AllSensors: {:#?}", self.sensors); + println!( + "Registry: Providing snapshot of all sensors ({} total)", + self.sensors.len() + ); let _ = respond_to.send(Arc::new(self.sensors.clone())); } - ServiceCommand::UpsertSensors { sensors } => { - println!("UpsertSensors: {} sensors", sensors.len()); - for (uuid, mut sensor) in sensors { - sensor.last_activity = Utc::now(); - sensor.state = SensorState::Active; - self.sensors.insert(uuid, sensor); - } - } } } - fn spawn_detector(&self, cmd_tx: Sender) { + /// Detector task - finds new USB sensors and adds them to registry + fn detect_sensors(&self, cmd_tx: Sender) { tokio::spawn(async move { loop { - println!("Checking for sensors..."); + println!("Detector: Scanning for sensors..."); let asc_ports = serial_port::find_asc_port(); - println!("Found {} ASC ports: {:#?}", asc_ports.len(), asc_ports); + + if !asc_ports.is_empty() { + println!("Detector: Found {} ASC ports", asc_ports.len()); + } // Get current sensor list let (respond_to, rx) = oneshot::channel(); let _ = cmd_tx.send(ServiceCommand::AllSensors { respond_to }).await; + let current_sensors = rx.await; - if let Ok(current_sensors) = rx.await { - // Find new sensors (not in the current list) - let mut new_sensors: Vec<(String, Sensor)> = Vec::new(); + if let Ok(current_sensors) = current_sensors { + let mut new_sensors: Vec<(String, Arc)> = Vec::new(); for port in asc_ports.iter() { if !current_sensors.contains_key(&port.serial_number) { - // Try to connect and query device info - match Sensor::from_device(port.clone()) { + let sensor = create_sensor_from_port(&port); + println!( + "Detector: Created sensor {}: {:#?}", + port.serial_number, + sensor.as_ref().map(|s| s.data()) + ); + + match sensor { Ok(sensor) => { + let data = sensor.data(); println!( - "Successfully connected to sensor: {:?} v{} ({})", - sensor.kind, sensor.firmware, port.serial_number + "Detector: Created sensor - firmware v{}", + data.firmware ); new_sensors.push((port.serial_number.clone(), sensor)); } Err(e) => { eprintln!( - "Failed to connect to sensor {}: {}", + "Detector: Failed to create sensor {}: {}", port.serial_number, e ); } @@ -141,7 +162,6 @@ impl UartService { } if !new_sensors.is_empty() { - println!("Adding {} new sensors", new_sensors.len()); let _ = cmd_tx .send(ServiceCommand::UpsertSensors { sensors: new_sensors, @@ -154,89 +174,38 @@ impl UartService { } }); } +} - fn spawn_reader_manager(&self, _cmd_tx: Sender) { - tokio::spawn(async move { - println!("Reading sensors..."); - sleep(TokioDuration::from_secs(60)).await; - }); - } - - fn spawn_healthcheck(&self, cmd_tx: Sender) { - tokio::spawn(async move { - loop { - let (respond_to, rx) = oneshot::channel(); - - // Request snapshot of all sensors - let _ = cmd_tx.send(ServiceCommand::AllSensors { respond_to }).await; - - if let Ok(sensors) = rx.await { - println!("Health check: {} sensors", sensors.len()); - - let now = Utc::now(); - let timeout_duration = ChronoDuration::minutes(2); - - // Collect sensors to remove: Unreachable AND last_activity > 2 minutes - let sensors_to_remove: Vec = sensors - .iter() - .filter(|(_, sensor)| { - matches!(sensor.state, SensorState::Unreachable) - && (now - sensor.last_activity) > timeout_duration - }) - .map(|(uuid, _)| uuid.clone()) - .collect(); - - // Collect sensors to update: all others - let sensors_to_update: Vec<(String, Sensor)> = sensors - .iter() - .filter(|(uuid, _)| !sensors_to_remove.contains(uuid)) - .map(|(uuid, sensor)| (uuid.clone(), sensor.clone())) - .collect(); - - // Remove timed-out unreachable sensors - if !sensors_to_remove.is_empty() { - println!("Removing {} unreachable sensors", sensors_to_remove.len()); - let _ = cmd_tx - .send(ServiceCommand::RemoveSensors { - uuids: sensors_to_remove, - }) - .await; - } - - // Update active sensors with current timestamp and Active state - if !sensors_to_update.is_empty() { - let _ = cmd_tx - .send(ServiceCommand::UpsertSensors { - sensors: sensors_to_update, - }) - .await; - } - } - - sleep(TokioDuration::from_secs(20)).await; - } - }); +/// Factory function to create a sensor from a serial port +/// +/// This function: +/// 1. Creates a temporary UART driver +/// 2. Queries device info to determine sensor type +/// 3. Creates the appropriate sensor with the correct driver +/// 4. Returns it as Arc +fn create_sensor_from_port( + port: &serial_port::SerialPort, +) -> Result, Box> { + // Create temporary driver to query device type + let mut uart_driver = UartDriver::new(port)?; + let device_info = uart_driver.device_info()?; + + println!( + "Factory: Detected {:?} sensor v{}", + device_info.device_type, device_info.firmware_version + ); + + // Create appropriate sensor based on device type + match device_info.device_type { + DeviceType::Rtd => { + let rtd = Rtd::::from_uart(uart_driver, device_info.firmware_version); + Ok(Arc::new(rtd) as Arc) + } } } -// - A UartSensorService: this service is in charge of managing all kind of -// Atlas Scientific sensors. -// -// - A task to manage sensors among existing ones with filtering. When a new -// sensor is detected, it is added to the list. When the current USB ports don't -// include one of the sensors, it is removed from the list. This task only track -// some kind of unique information. -// -// - A task to healthcheck sensors and update their status. Healthy, Unhealthy, -// Lost. The status tracks the last status updated_at which helps to determine -// the state machine for passing from one state to another. -// -// - A task per healthy sensor to handle sensor values. A cancellation token -// should be used to terminate the task when a sensor status is Lost. -// -// - A task to handle commands sent to sensors and handle responses. - #[tokio::main] async fn main() { - UartService::new().run().await + println!("Starting ArkSync Sensor Service..."); + UartService::new().run().await; } diff --git a/sensor/src/sensor.rs b/sensor/src/sensor.rs index f374b3c..4058631 100644 --- a/sensor/src/sensor.rs +++ b/sensor/src/sensor.rs @@ -1,7 +1,7 @@ use chrono::{DateTime, Utc}; use std::sync::{Arc, Mutex}; -use crate::commands::UartCommand; +use crate::commands::uart::UartCommand; use crate::serial_port::{SerialPort, SerialPortConnection}; #[derive(Debug, Clone, Copy)] diff --git a/sensor/src/serial_port.rs b/sensor/src/serial_port.rs index 039c450..f6a5dc2 100644 --- a/sensor/src/serial_port.rs +++ b/sensor/src/serial_port.rs @@ -68,18 +68,13 @@ impl SerialPortConnection { // Read byte-by-byte until carriage return loop { - if self.port.read_carrier_detect()? { - break; - } - match self.port.read_exact(&mut single_byte) { - Ok(_) => buffer.push(single_byte[0]), - // { - // if single_byte[0] == b'\r' { - // break; - // } - - // } + Ok(_) => { + if single_byte[0] == b'\r' { + break; + } + buffer.push(single_byte[0]); + } Err(e) => return Err(e), } } From bdec8a3bae3240c64363aac48e98029e5b5c31b5 Mon Sep 17 00:00:00 2001 From: theredfish Date: Fri, 27 Feb 2026 23:43:36 +0100 Subject: [PATCH 05/12] Generic EZO sensor reading --- sensor/src/ezo/rtd.rs | 28 +++++++--- sensor/src/ezo/sensor.rs | 55 +++++++++++++++++++- sensor/src/lib.rs | 1 - sensor/src/main.rs | 51 +++++++++++++----- sensor/src/sensor.rs | 108 --------------------------------------- 5 files changed, 110 insertions(+), 133 deletions(-) delete mode 100644 sensor/src/sensor.rs diff --git a/sensor/src/ezo/rtd.rs b/sensor/src/ezo/rtd.rs index db4efb8..0723593 100644 --- a/sensor/src/ezo/rtd.rs +++ b/sensor/src/ezo/rtd.rs @@ -8,26 +8,38 @@ use crate::ezo::{ }; pub struct Rtd { - data: Mutex, + data: SensorData, driver: Mutex, } -impl Sensor for Rtd { - fn data(&self) -> SensorData { - self.data.lock().unwrap().clone() +impl Sensor for Rtd { + type DriverType = D; + + fn data(&self) -> &SensorData { + &self.data + } + + fn driver(&self) -> &Mutex { + &self.driver } } -impl Rtd { - pub fn from_uart(driver: UartDriver, firmware: f64) -> Self { +impl Rtd { + pub fn new(driver: D, firmware: f64) -> Self { Self { - data: Mutex::new(SensorData { + data: SensorData { firmware, name: SensorName::Unnamed, state: SensorState::Initializing, last_activity: Utc::now(), - }), + }, driver: Mutex::new(driver), } } } + +impl Rtd { + pub fn from_uart(driver: UartDriver, firmware: f64) -> Self { + Self::new(driver, firmware) + } +} diff --git a/sensor/src/ezo/sensor.rs b/sensor/src/ezo/sensor.rs index 9198d9e..41afcc8 100644 --- a/sensor/src/ezo/sensor.rs +++ b/sensor/src/ezo/sensor.rs @@ -1,4 +1,11 @@ use chrono::{DateTime, Utc}; +use std::sync::{Arc, Mutex}; +use tokio::task::JoinHandle; +use tokio::time::{interval, Duration as TokioDuration}; + +use crate::ezo::driver::DriverError; +use crate::ezo::driver::{Driver, ReadWriteCmd}; +use crate::ezo::error::{Result, SensorError}; #[derive(Debug, Clone, Default)] pub enum SensorName { @@ -24,6 +31,50 @@ pub struct SensorData { pub last_activity: DateTime, } -pub trait Sensor: Send + Sync { - fn data(&self) -> SensorData; +pub trait Sensor: Send + Sync + 'static { + type DriverType: Driver; + + fn data(&self) -> &SensorData; + fn driver(&self) -> &Mutex; + + /// Measurement command for this sensor. + fn measurement_command(&self) -> &'static [u8] { + b"R" + } + + /// EZO measurement command (`R`) parsed as `f64`. + fn read_measurement(&self) -> Result { + let response = self.read_command_response(self.measurement_command())?; + + response + .trim() + .parse::() + .map_err(|err| SensorError::Driver(DriverError::Read(err.to_string()))) + } + + fn read_command_response(&self, command: &[u8]) -> Result { + let mut driver = self + .driver() + .lock() + .map_err(|err| SensorError::Driver(DriverError::Read(err.to_string())))?; + + driver.write(command)?; + driver.read().map_err(Into::into) + } + + /// Spawn the main background task for this sensor. + fn run(self: Arc) -> JoinHandle<()> { + tokio::spawn(async move { + let mut ticker = interval(TokioDuration::from_millis(1500)); + + loop { + ticker.tick().await; + + match self.read_measurement() { + Ok(value) => println!("Sensor reading: {value:.3}"), + Err(err) => eprintln!("Sensor read error: {err}"), + } + } + }) + } } diff --git a/sensor/src/lib.rs b/sensor/src/lib.rs index be755d4..c42ec7c 100644 --- a/sensor/src/lib.rs +++ b/sensor/src/lib.rs @@ -1,4 +1,3 @@ pub mod commands; mod ezo; -pub mod sensor; pub mod serial_port; diff --git a/sensor/src/main.rs b/sensor/src/main.rs index cf260fa..45825a3 100644 --- a/sensor/src/main.rs +++ b/sensor/src/main.rs @@ -10,21 +10,22 @@ use std::collections::HashMap; use std::sync::Arc; use tokio::sync::mpsc::Sender; use tokio::sync::{mpsc, oneshot}; -use tokio::time::{sleep, Duration as TokioDuration}; +use tokio::task::JoinHandle; +use tokio::time::{interval, Duration as TokioDuration}; -pub type SensorList = HashMap>; +pub type SensorList = HashMap>>; enum ServiceCommand { - /// Add or update sensors in the registry - UpsertSensors { - sensors: Vec<(String, Arc)>, + /// Add sensors in the registry (no replacement) + AddSensors { + sensors: Vec<(String, Arc>)>, }, /// Remove sensors from the registry RemoveSensors { uuids: Vec }, /// Get a specific sensor by serial number FindSensor { serial_number: String, - respond_to: oneshot::Sender>>, + respond_to: oneshot::Sender>>>, }, /// Get all sensors (snapshot) AllSensors { @@ -40,6 +41,7 @@ pub struct CommandChannel { /// Supervisor service that maintains the list of sensors pub struct UartService { sensors: SensorList, + sensor_tasks: HashMap>, cmd_channel: CommandChannel, } @@ -49,6 +51,7 @@ impl UartService { Self { sensors: HashMap::new(), + sensor_tasks: HashMap::new(), cmd_channel: CommandChannel { tx, rx }, } } @@ -71,6 +74,7 @@ impl UartService { _ = tokio::signal::ctrl_c() => { println!("Shutting down sensor registry..."); + self.abort_all_sensor_tasks(); break; } } @@ -80,9 +84,16 @@ impl UartService { /// Handle commands to maintain sensor list fn handle_cmd(&mut self, cmd: ServiceCommand) { match cmd { - ServiceCommand::UpsertSensors { sensors } => { - println!("Registry: Upserting {} sensors", sensors.len()); + ServiceCommand::AddSensors { sensors } => { + println!("Registry: Adding up to {} sensors", sensors.len()); for (uuid, sensor) in sensors { + if self.sensors.contains_key(&uuid) { + println!("Registry: Sensor {uuid} already exists, skipping add"); + continue; + } + + let task = Arc::clone(&sensor).run(); + self.sensor_tasks.insert(uuid.clone(), task); self.sensors.insert(uuid, sensor); } println!("Registry: Total sensors = {}", self.sensors.len()); @@ -91,6 +102,9 @@ impl UartService { ServiceCommand::RemoveSensors { uuids } => { println!("Registry: Removing {} sensors", uuids.len()); for uuid in &uuids { + if let Some(task) = self.sensor_tasks.remove(uuid) { + task.abort(); + } self.sensors.remove(uuid); } println!("Registry: Total sensors = {}", self.sensors.len()); @@ -114,10 +128,20 @@ impl UartService { } } + fn abort_all_sensor_tasks(&mut self) { + for (_, task) in self.sensor_tasks.drain() { + task.abort(); + } + } + /// Detector task - finds new USB sensors and adds them to registry fn detect_sensors(&self, cmd_tx: Sender) { tokio::spawn(async move { + let mut interval = interval(TokioDuration::from_secs(5)); + loop { + interval.tick().await; + println!("Detector: Scanning for sensors..."); let asc_ports = serial_port::find_asc_port(); @@ -131,7 +155,8 @@ impl UartService { let current_sensors = rx.await; if let Ok(current_sensors) = current_sensors { - let mut new_sensors: Vec<(String, Arc)> = Vec::new(); + let mut new_sensors: Vec<(String, Arc>)> = + Vec::new(); for port in asc_ports.iter() { if !current_sensors.contains_key(&port.serial_number) { @@ -163,14 +188,12 @@ impl UartService { if !new_sensors.is_empty() { let _ = cmd_tx - .send(ServiceCommand::UpsertSensors { + .send(ServiceCommand::AddSensors { sensors: new_sensors, }) .await; } } - - sleep(TokioDuration::from_secs(5)).await; } }); } @@ -185,7 +208,7 @@ impl UartService { /// 4. Returns it as Arc fn create_sensor_from_port( port: &serial_port::SerialPort, -) -> Result, Box> { +) -> Result>, Box> { // Create temporary driver to query device type let mut uart_driver = UartDriver::new(port)?; let device_info = uart_driver.device_info()?; @@ -199,7 +222,7 @@ fn create_sensor_from_port( match device_info.device_type { DeviceType::Rtd => { let rtd = Rtd::::from_uart(uart_driver, device_info.firmware_version); - Ok(Arc::new(rtd) as Arc) + Ok(Arc::new(rtd) as Arc>) } } } diff --git a/sensor/src/sensor.rs b/sensor/src/sensor.rs deleted file mode 100644 index 4058631..0000000 --- a/sensor/src/sensor.rs +++ /dev/null @@ -1,108 +0,0 @@ -use chrono::{DateTime, Utc}; -use std::sync::{Arc, Mutex}; - -use crate::commands::uart::UartCommand; -use crate::serial_port::{SerialPort, SerialPortConnection}; - -#[derive(Debug, Clone, Copy)] -pub enum SensorKind { - Rtd, -} - -#[derive(Debug, Clone, Default)] -pub enum SensorName { - #[default] - Unnamed, - Named(String), -} - -#[derive(Default, Debug, Clone, Copy)] -pub enum SensorState { - Active, - Degraded, - #[default] - Initializing, - Unreachable, -} - -#[derive(Debug, Clone)] -pub enum SensorType { - I2c { addr: String }, - Uart { port: SerialPort }, -} - -#[derive(Debug, Clone)] -pub struct Sensor { - pub kind: SensorKind, - pub firmware: f64, - pub name: SensorName, - pub state: SensorState, - pub sensor_type: SensorType, - pub connection: Option>>, - pub last_activity: DateTime, -} - -impl Sensor { - /// Create a new sensor by connecting to a serial port and querying device info - /// - /// This method: - /// 1. Opens a connection to the serial port - /// 2. Queries device information (type, firmware) - /// 3. Creates a Sensor with the retrieved information - /// - /// # Arguments - /// * `port` - Serial port metadata (port name and serial number) - /// - /// # Returns - /// * `Ok(Sensor)` - Successfully connected and queried sensor - /// * `Err` - Failed to connect or query device - pub fn from_device(port: SerialPort) -> Result> { - // Open connection - let connection = SerialPortConnection::open(&port)?; - let mut uart = UartCommand::connect_with_connection(connection)?; - - // Query device information - let device_info = uart.device_info()?; - - // Parse device type to SensorKind - let kind = match device_info.device_type.as_str() { - "RTD" => SensorKind::Rtd, - _ => SensorKind::Rtd, // Default to RTD for now - }; - - // Wrap connection for sharing - let shared_connection = Arc::new(Mutex::new(uart)); - - Ok(Sensor { - kind, - firmware: device_info.firmware_version, - name: SensorName::Unnamed, // Default to unnamed, can be set later - state: SensorState::Initializing, - sensor_type: SensorType::Uart { port }, - connection: Some(shared_connection), - last_activity: Utc::now(), - }) - } - - /// Check if sensor has an active connection - pub fn is_connected(&self) -> bool { - self.connection.is_some() - } - - /// Get a reference to the UART command interface - /// - /// Returns None if no connection is established - pub fn get_uart(&self) -> Option>> { - self.connection.clone() - } -} - -// TODO: update the sensor module to organize the lib per different sensors. -// Start with RTD sensor, and then expand for each type of sensor -// -// See to have Read and Write types -// command => u8 -// features: i2c, uart -// i2c => later see for embedded_hal compatibility -// Rename lib into arksync_ezo (since we capture from ezo circuit) -// See https://github.com/RougeEtoile/ezo_i2c_rs From cf4adc7fd7410fe5c3d056399827c77026e5e00d Mon Sep 17 00:00:00 2001 From: theredfish Date: Sat, 28 Feb 2026 01:08:52 +0100 Subject: [PATCH 06/12] Improve command transport trait api --- sensor/src/ezo/driver/mod.rs | 9 +++++++-- sensor/src/ezo/driver/uart.rs | 10 ++++------ sensor/src/ezo/sensor.rs | 9 ++++----- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/sensor/src/ezo/driver/mod.rs b/sensor/src/ezo/driver/mod.rs index 547f637..6085b26 100644 --- a/sensor/src/ezo/driver/mod.rs +++ b/sensor/src/ezo/driver/mod.rs @@ -35,13 +35,18 @@ pub enum Status { Unknown, } -pub trait ReadWriteCmd { +pub trait CommandTransport { fn read(&mut self) -> Result; fn write(&mut self, buf: &[u8]) -> Result<()>; + + fn send_command(&mut self, command: &[u8]) -> Result { + self.write(command)?; + self.read() + } } /// Commands common to both UART and I2C drivers. -pub trait Driver: ReadWriteCmd { +pub trait Driver: CommandTransport { fn device_info(&mut self) -> Result; fn status(&mut self) -> Result; } diff --git a/sensor/src/ezo/driver/uart.rs b/sensor/src/ezo/driver/uart.rs index 429968d..69be95a 100644 --- a/sensor/src/ezo/driver/uart.rs +++ b/sensor/src/ezo/driver/uart.rs @@ -1,4 +1,4 @@ -use super::{DeviceInfo, DeviceType, Driver, DriverError, ReadWriteCmd, Result}; +use super::{CommandTransport, DeviceInfo, DeviceType, Driver, DriverError, Result}; use crate::serial_port::SerialPort; use crate::{ezo::driver::Status, serial_port::SerialPortConnection}; @@ -15,7 +15,7 @@ impl UartDriver { } } -impl ReadWriteCmd for UartDriver { +impl CommandTransport for UartDriver { fn read(&mut self) -> Result { self.connection .read_until_carrier() @@ -38,8 +38,7 @@ impl Driver for UartDriver { for attempt in 1..=MAX_RETRIES { // Send "i" command to get device information - self.write(b"i")?; - let response = self.read()?; + let response = self.send_command(b"i")?; // Atlas Scientific response format: ?I,RTD,1.0 // Format: ?I,, @@ -70,8 +69,7 @@ impl Driver for UartDriver { } fn status(&mut self) -> Result { - self.write(b"Status")?; - let response = self.read()?; + let response = self.send_command(b"Status")?; // Atlas Scientific status response format: ?STATUS, // Codes: P (powered off and restarted), S (software reset), B (brown out), W (watchdog), U (unknown) diff --git a/sensor/src/ezo/sensor.rs b/sensor/src/ezo/sensor.rs index 41afcc8..547c58f 100644 --- a/sensor/src/ezo/sensor.rs +++ b/sensor/src/ezo/sensor.rs @@ -1,10 +1,10 @@ use chrono::{DateTime, Utc}; use std::sync::{Arc, Mutex}; use tokio::task::JoinHandle; -use tokio::time::{interval, Duration as TokioDuration}; +use tokio::time::{interval, Duration}; use crate::ezo::driver::DriverError; -use crate::ezo::driver::{Driver, ReadWriteCmd}; +use crate::ezo::driver::{CommandTransport, Driver}; use crate::ezo::error::{Result, SensorError}; #[derive(Debug, Clone, Default)] @@ -58,14 +58,13 @@ pub trait Sensor: Send + Sync + 'static { .lock() .map_err(|err| SensorError::Driver(DriverError::Read(err.to_string())))?; - driver.write(command)?; - driver.read().map_err(Into::into) + driver.send_command(command).map_err(Into::into) } /// Spawn the main background task for this sensor. fn run(self: Arc) -> JoinHandle<()> { tokio::spawn(async move { - let mut ticker = interval(TokioDuration::from_millis(1500)); + let mut ticker = interval(Duration::from_millis(1500)); loop { ticker.tick().await; From 2693b15cc7337852846c4e4ba80e12f7d3d9b67d Mon Sep 17 00:00:00 2001 From: theredfish Date: Sat, 28 Feb 2026 01:15:37 +0100 Subject: [PATCH 07/12] Improve sensor runtime type genericity --- sensor/src/ezo/rtd.rs | 2 +- sensor/src/ezo/sensor.rs | 44 ++++++++++++++++++++++++++++------------ sensor/src/main.rs | 13 ++++++------ 3 files changed, 38 insertions(+), 21 deletions(-) diff --git a/sensor/src/ezo/rtd.rs b/sensor/src/ezo/rtd.rs index 0723593..2651460 100644 --- a/sensor/src/ezo/rtd.rs +++ b/sensor/src/ezo/rtd.rs @@ -12,7 +12,7 @@ pub struct Rtd { driver: Mutex, } -impl Sensor for Rtd { +impl EzoSensor for Rtd { type DriverType = D; fn data(&self) -> &SensorData { diff --git a/sensor/src/ezo/sensor.rs b/sensor/src/ezo/sensor.rs index 547c58f..891ff2f 100644 --- a/sensor/src/ezo/sensor.rs +++ b/sensor/src/ezo/sensor.rs @@ -32,6 +32,27 @@ pub struct SensorData { } pub trait Sensor: Send + Sync + 'static { + fn data(&self) -> &SensorData; + fn read_measurement(&self) -> Result; + + /// Spawn the main background task for this sensor. + fn run(self: Arc) -> JoinHandle<()> { + tokio::spawn(async move { + let mut ticker = interval(Duration::from_millis(1500)); + + loop { + ticker.tick().await; + + match self.read_measurement() { + Ok(value) => println!("Sensor reading: {value:.3}"), + Err(err) => eprintln!("Sensor read error: {err}"), + } + } + }) + } +} + +pub trait EzoSensor: Send + Sync + 'static { type DriverType: Driver; fn data(&self) -> &SensorData; @@ -60,20 +81,17 @@ pub trait Sensor: Send + Sync + 'static { driver.send_command(command).map_err(Into::into) } +} - /// Spawn the main background task for this sensor. - fn run(self: Arc) -> JoinHandle<()> { - tokio::spawn(async move { - let mut ticker = interval(Duration::from_millis(1500)); - - loop { - ticker.tick().await; +impl Sensor for T +where + T: EzoSensor, +{ + fn data(&self) -> &SensorData { + EzoSensor::data(self) + } - match self.read_measurement() { - Ok(value) => println!("Sensor reading: {value:.3}"), - Err(err) => eprintln!("Sensor read error: {err}"), - } - } - }) + fn read_measurement(&self) -> Result { + EzoSensor::read_measurement(self) } } diff --git a/sensor/src/main.rs b/sensor/src/main.rs index 45825a3..5a20fa6 100644 --- a/sensor/src/main.rs +++ b/sensor/src/main.rs @@ -13,19 +13,19 @@ use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinHandle; use tokio::time::{interval, Duration as TokioDuration}; -pub type SensorList = HashMap>>; +pub type SensorList = HashMap>; enum ServiceCommand { /// Add sensors in the registry (no replacement) AddSensors { - sensors: Vec<(String, Arc>)>, + sensors: Vec<(String, Arc)>, }, /// Remove sensors from the registry RemoveSensors { uuids: Vec }, /// Get a specific sensor by serial number FindSensor { serial_number: String, - respond_to: oneshot::Sender>>>, + respond_to: oneshot::Sender>>, }, /// Get all sensors (snapshot) AllSensors { @@ -155,8 +155,7 @@ impl UartService { let current_sensors = rx.await; if let Ok(current_sensors) = current_sensors { - let mut new_sensors: Vec<(String, Arc>)> = - Vec::new(); + let mut new_sensors: Vec<(String, Arc)> = Vec::new(); for port in asc_ports.iter() { if !current_sensors.contains_key(&port.serial_number) { @@ -208,7 +207,7 @@ impl UartService { /// 4. Returns it as Arc fn create_sensor_from_port( port: &serial_port::SerialPort, -) -> Result>, Box> { +) -> Result, Box> { // Create temporary driver to query device type let mut uart_driver = UartDriver::new(port)?; let device_info = uart_driver.device_info()?; @@ -222,7 +221,7 @@ fn create_sensor_from_port( match device_info.device_type { DeviceType::Rtd => { let rtd = Rtd::::from_uart(uart_driver, device_info.firmware_version); - Ok(Arc::new(rtd) as Arc>) + Ok(Arc::new(rtd) as Arc) } } } From 281d685f6fbe3c3cb44090fcbf9e7500bba7e694 Mon Sep 17 00:00:00 2001 From: theredfish Date: Sat, 28 Feb 2026 01:16:37 +0100 Subject: [PATCH 08/12] Refactor SensorData to SensorInfo --- sensor/src/ezo/rtd.rs | 8 ++++---- sensor/src/ezo/sensor.rs | 8 ++++---- sensor/src/main.rs | 4 ++-- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/sensor/src/ezo/rtd.rs b/sensor/src/ezo/rtd.rs index 2651460..fb74388 100644 --- a/sensor/src/ezo/rtd.rs +++ b/sensor/src/ezo/rtd.rs @@ -4,18 +4,18 @@ use std::sync::Mutex; use crate::ezo::sensor::*; use crate::ezo::{ driver::{uart::UartDriver, Driver}, - sensor::SensorData, + sensor::SensorInfo, }; pub struct Rtd { - data: SensorData, + data: SensorInfo, driver: Mutex, } impl EzoSensor for Rtd { type DriverType = D; - fn data(&self) -> &SensorData { + fn data(&self) -> &SensorInfo { &self.data } @@ -27,7 +27,7 @@ impl EzoSensor for Rtd { impl Rtd { pub fn new(driver: D, firmware: f64) -> Self { Self { - data: SensorData { + data: SensorInfo { firmware, name: SensorName::Unnamed, state: SensorState::Initializing, diff --git a/sensor/src/ezo/sensor.rs b/sensor/src/ezo/sensor.rs index 891ff2f..fa3d97d 100644 --- a/sensor/src/ezo/sensor.rs +++ b/sensor/src/ezo/sensor.rs @@ -24,7 +24,7 @@ pub enum SensorState { } #[derive(Debug, Clone)] -pub struct SensorData { +pub struct SensorInfo { pub firmware: f64, pub name: SensorName, pub state: SensorState, @@ -32,7 +32,7 @@ pub struct SensorData { } pub trait Sensor: Send + Sync + 'static { - fn data(&self) -> &SensorData; + fn info(&self) -> &SensorInfo; fn read_measurement(&self) -> Result; /// Spawn the main background task for this sensor. @@ -55,7 +55,7 @@ pub trait Sensor: Send + Sync + 'static { pub trait EzoSensor: Send + Sync + 'static { type DriverType: Driver; - fn data(&self) -> &SensorData; + fn data(&self) -> &SensorInfo; fn driver(&self) -> &Mutex; /// Measurement command for this sensor. @@ -87,7 +87,7 @@ impl Sensor for T where T: EzoSensor, { - fn data(&self) -> &SensorData { + fn info(&self) -> &SensorInfo { EzoSensor::data(self) } diff --git a/sensor/src/main.rs b/sensor/src/main.rs index 5a20fa6..5f818a7 100644 --- a/sensor/src/main.rs +++ b/sensor/src/main.rs @@ -163,12 +163,12 @@ impl UartService { println!( "Detector: Created sensor {}: {:#?}", port.serial_number, - sensor.as_ref().map(|s| s.data()) + sensor.as_ref().map(|s| s.info()) ); match sensor { Ok(sensor) => { - let data = sensor.data(); + let data = sensor.info(); println!( "Detector: Created sensor - firmware v{}", data.firmware From 2a0ebe2fc3b2e3244c5c1eefbf876b987f0f55bd Mon Sep 17 00:00:00 2001 From: theredfish Date: Sat, 28 Feb 2026 01:31:43 +0100 Subject: [PATCH 09/12] Separate sensor runner from ezo sensor --- sensor/src/error.rs | 41 +++++++++++++++ sensor/src/ezo/error.rs | 31 ------------ sensor/src/ezo/ezo_sensor.rs | 50 +++++++++++++++++++ sensor/src/ezo/mod.rs | 3 +- sensor/src/ezo/rtd.rs | 8 ++- sensor/src/ezo/sensor.rs | 97 ------------------------------------ sensor/src/lib.rs | 2 + sensor/src/main.rs | 4 +- sensor/src/sensor.rs | 51 +++++++++++++++++++ 9 files changed, 151 insertions(+), 136 deletions(-) create mode 100644 sensor/src/error.rs delete mode 100644 sensor/src/ezo/error.rs create mode 100644 sensor/src/ezo/ezo_sensor.rs delete mode 100644 sensor/src/ezo/sensor.rs create mode 100644 sensor/src/sensor.rs diff --git a/sensor/src/error.rs b/sensor/src/error.rs new file mode 100644 index 0000000..2116529 --- /dev/null +++ b/sensor/src/error.rs @@ -0,0 +1,41 @@ +use std::error::Error; +use std::fmt; + +#[derive(Debug)] +pub enum SensorError { + Message(String), + Source(Box), +} + +impl SensorError { + pub fn message(msg: impl Into) -> Self { + SensorError::Message(msg.into()) + } + + pub fn source(err: E) -> Self + where + E: Error + Send + Sync + 'static, + { + SensorError::Source(Box::new(err)) + } +} + +impl fmt::Display for SensorError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + SensorError::Message(msg) => write!(f, "{msg}"), + SensorError::Source(err) => write!(f, "{err}"), + } + } +} + +impl Error for SensorError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + match self { + SensorError::Message(_) => None, + SensorError::Source(err) => Some(err.as_ref()), + } + } +} + +pub type Result = std::result::Result; diff --git a/sensor/src/ezo/error.rs b/sensor/src/ezo/error.rs deleted file mode 100644 index 1f5a012..0000000 --- a/sensor/src/ezo/error.rs +++ /dev/null @@ -1,31 +0,0 @@ -use crate::ezo::driver::DriverError; -use std::fmt; - -#[derive(Debug)] -pub enum SensorError { - Driver(DriverError), -} - -impl fmt::Display for SensorError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - SensorError::Driver(err) => write!(f, "Driver error: {}", err), - } - } -} - -impl std::error::Error for SensorError { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - match self { - SensorError::Driver(err) => Some(err), - } - } -} - -impl From for SensorError { - fn from(err: DriverError) -> Self { - SensorError::Driver(err) - } -} - -pub type Result = std::result::Result; diff --git a/sensor/src/ezo/ezo_sensor.rs b/sensor/src/ezo/ezo_sensor.rs new file mode 100644 index 0000000..a1155a4 --- /dev/null +++ b/sensor/src/ezo/ezo_sensor.rs @@ -0,0 +1,50 @@ +use std::sync::Mutex; + +use crate::error::{Result, SensorError}; +use crate::ezo::driver::DriverError; +use crate::ezo::driver::{CommandTransport, Driver}; +use crate::sensor::{Sensor, SensorInfo}; + +pub trait EzoSensor: Send + Sync + 'static { + type DriverType: Driver; + + fn data(&self) -> &SensorInfo; + fn driver(&self) -> &Mutex; + + /// Measurement command for this sensor. + fn measurement_command(&self) -> &'static [u8] { + b"R" + } + + /// EZO measurement command (`R`) parsed as `f64`. + fn read_measurement(&self) -> Result { + let response = self.read_command_response(self.measurement_command())?; + + response + .trim() + .parse::() + .map_err(|err| SensorError::source(DriverError::Read(err.to_string()))) + } + + fn read_command_response(&self, command: &[u8]) -> Result { + let mut driver = self + .driver() + .lock() + .map_err(|err| SensorError::source(DriverError::Read(err.to_string())))?; + + driver.send_command(command).map_err(SensorError::source) + } +} + +impl Sensor for T +where + T: EzoSensor, +{ + fn info(&self) -> &SensorInfo { + EzoSensor::data(self) + } + + fn read_measurement(&self) -> Result { + EzoSensor::read_measurement(self) + } +} diff --git a/sensor/src/ezo/mod.rs b/sensor/src/ezo/mod.rs index 95c7de3..a1eee58 100644 --- a/sensor/src/ezo/mod.rs +++ b/sensor/src/ezo/mod.rs @@ -1,5 +1,4 @@ pub mod driver; -pub mod error; +pub mod ezo_sensor; pub mod ph; pub mod rtd; -pub mod sensor; diff --git a/sensor/src/ezo/rtd.rs b/sensor/src/ezo/rtd.rs index fb74388..789a050 100644 --- a/sensor/src/ezo/rtd.rs +++ b/sensor/src/ezo/rtd.rs @@ -1,11 +1,9 @@ use chrono::Utc; use std::sync::Mutex; -use crate::ezo::sensor::*; -use crate::ezo::{ - driver::{uart::UartDriver, Driver}, - sensor::SensorInfo, -}; +use crate::ezo::driver::{uart::UartDriver, Driver}; +use crate::ezo::ezo_sensor::EzoSensor; +use crate::sensor::{SensorInfo, SensorName, SensorState}; pub struct Rtd { data: SensorInfo, diff --git a/sensor/src/ezo/sensor.rs b/sensor/src/ezo/sensor.rs deleted file mode 100644 index fa3d97d..0000000 --- a/sensor/src/ezo/sensor.rs +++ /dev/null @@ -1,97 +0,0 @@ -use chrono::{DateTime, Utc}; -use std::sync::{Arc, Mutex}; -use tokio::task::JoinHandle; -use tokio::time::{interval, Duration}; - -use crate::ezo::driver::DriverError; -use crate::ezo::driver::{CommandTransport, Driver}; -use crate::ezo::error::{Result, SensorError}; - -#[derive(Debug, Clone, Default)] -pub enum SensorName { - #[default] - Unnamed, - Named(String), -} - -#[derive(Default, Debug, Clone, Copy)] -pub enum SensorState { - Active, - Degraded, - #[default] - Initializing, - Unreachable, -} - -#[derive(Debug, Clone)] -pub struct SensorInfo { - pub firmware: f64, - pub name: SensorName, - pub state: SensorState, - pub last_activity: DateTime, -} - -pub trait Sensor: Send + Sync + 'static { - fn info(&self) -> &SensorInfo; - fn read_measurement(&self) -> Result; - - /// Spawn the main background task for this sensor. - fn run(self: Arc) -> JoinHandle<()> { - tokio::spawn(async move { - let mut ticker = interval(Duration::from_millis(1500)); - - loop { - ticker.tick().await; - - match self.read_measurement() { - Ok(value) => println!("Sensor reading: {value:.3}"), - Err(err) => eprintln!("Sensor read error: {err}"), - } - } - }) - } -} - -pub trait EzoSensor: Send + Sync + 'static { - type DriverType: Driver; - - fn data(&self) -> &SensorInfo; - fn driver(&self) -> &Mutex; - - /// Measurement command for this sensor. - fn measurement_command(&self) -> &'static [u8] { - b"R" - } - - /// EZO measurement command (`R`) parsed as `f64`. - fn read_measurement(&self) -> Result { - let response = self.read_command_response(self.measurement_command())?; - - response - .trim() - .parse::() - .map_err(|err| SensorError::Driver(DriverError::Read(err.to_string()))) - } - - fn read_command_response(&self, command: &[u8]) -> Result { - let mut driver = self - .driver() - .lock() - .map_err(|err| SensorError::Driver(DriverError::Read(err.to_string())))?; - - driver.send_command(command).map_err(Into::into) - } -} - -impl Sensor for T -where - T: EzoSensor, -{ - fn info(&self) -> &SensorInfo { - EzoSensor::data(self) - } - - fn read_measurement(&self) -> Result { - EzoSensor::read_measurement(self) - } -} diff --git a/sensor/src/lib.rs b/sensor/src/lib.rs index c42ec7c..e0b5436 100644 --- a/sensor/src/lib.rs +++ b/sensor/src/lib.rs @@ -1,3 +1,5 @@ pub mod commands; +pub mod error; mod ezo; +pub mod sensor; pub mod serial_port; diff --git a/sensor/src/main.rs b/sensor/src/main.rs index 5f818a7..74b9149 100644 --- a/sensor/src/main.rs +++ b/sensor/src/main.rs @@ -1,11 +1,13 @@ mod commands; +mod error; mod ezo; +mod sensor; mod serial_port; use ezo::driver::uart::UartDriver; use ezo::driver::{DeviceType, Driver}; use ezo::rtd::Rtd; -use ezo::sensor::Sensor; +use sensor::Sensor; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::mpsc::Sender; diff --git a/sensor/src/sensor.rs b/sensor/src/sensor.rs new file mode 100644 index 0000000..7fc3dce --- /dev/null +++ b/sensor/src/sensor.rs @@ -0,0 +1,51 @@ +use chrono::{DateTime, Utc}; +use std::sync::Arc; +use tokio::task::JoinHandle; +use tokio::time::{interval, Duration}; + +use crate::error::Result; + +#[derive(Debug, Clone, Default)] +pub enum SensorName { + #[default] + Unnamed, + Named(String), +} + +#[derive(Default, Debug, Clone, Copy)] +pub enum SensorState { + Active, + Degraded, + #[default] + Initializing, + Unreachable, +} + +#[derive(Debug, Clone)] +pub struct SensorInfo { + pub firmware: f64, + pub name: SensorName, + pub state: SensorState, + pub last_activity: DateTime, +} + +pub trait Sensor: Send + Sync + 'static { + fn info(&self) -> &SensorInfo; + fn read_measurement(&self) -> Result; + + /// Spawn the main background task for this sensor. + fn run(self: Arc) -> JoinHandle<()> { + tokio::spawn(async move { + let mut ticker = interval(Duration::from_millis(1500)); + + loop { + ticker.tick().await; + + match self.read_measurement() { + Ok(value) => println!("Sensor reading: {value:.3}"), + Err(err) => eprintln!("Sensor read error: {err}"), + } + } + }) + } +} From 4dab6647c9934fe699aa594a61bfdaf9b26b5852 Mon Sep 17 00:00:00 2001 From: theredfish Date: Sat, 28 Feb 2026 01:40:52 +0100 Subject: [PATCH 10/12] Clean sensor crate --- sensor/src/commands/mod.rs | 4 - sensor/src/commands/uart.rs | 199 --------------------------------- sensor/src/ezo/driver/error.rs | 1 + sensor/src/ezo/driver/i2c.rs | 1 + sensor/src/ezo/driver/mod.rs | 2 + sensor/src/ezo/driver/uart.rs | 2 +- sensor/src/ezo/mod.rs | 1 - sensor/src/ezo/ph.rs | 0 sensor/src/lib.rs | 3 +- sensor/src/main.rs | 10 +- sensor/src/serial_port.rs | 20 +--- 11 files changed, 17 insertions(+), 226 deletions(-) delete mode 100644 sensor/src/commands/mod.rs delete mode 100644 sensor/src/commands/uart.rs delete mode 100644 sensor/src/ezo/ph.rs diff --git a/sensor/src/commands/mod.rs b/sensor/src/commands/mod.rs deleted file mode 100644 index 511432a..0000000 --- a/sensor/src/commands/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub mod uart; - -// Re-export uart module items if needed by other modules -// pub use uart::{CalibrationStatus, DeviceInfo, StatusCode, UartCommand}; diff --git a/sensor/src/commands/uart.rs b/sensor/src/commands/uart.rs deleted file mode 100644 index a057e91..0000000 --- a/sensor/src/commands/uart.rs +++ /dev/null @@ -1,199 +0,0 @@ -//! UART commands for Atlas Scientific sensors -//! -//! # Example Usage -//! -//! ```no_run -//! use arksync_sensor::commands::UartCommand; -//! use arksync_sensor::serial_port::SerialPort; -//! -//! # fn main() -> Result<(), Box> { -//! // Create port metadata -//! let port = SerialPort { -//! port_name: "/dev/ttyUSB0".to_string(), -//! serial_number: "DP065KS3".to_string(), -//! }; -//! -//! // Connect to the sensor -//! let mut uart = UartCommand::connect(port)?; -//! -//! // Get device information -//! let info = uart.device_info()?; -//! println!("Device: {} v{}", info.device_type, info.firmware_version); -//! -//! // Read temperature -//! let temp = uart.read_temperature()?; -//! println!("Temperature: {:.2}°C", temp); -//! -//! // Check status -//! let status = uart.check_status()?; -//! println!("Status: {:?}", status); -//! -//! // Get calibration info -//! let cal = uart.get_calibration()?; -//! println!("Calibration points: {}", cal.calibration_points); -//! # Ok(()) -//! # } -//! ``` - -use crate::serial_port::{SerialPort, SerialPortConnection}; -use std::io; - -#[derive(Debug)] -pub struct UartCommand { - connection: SerialPortConnection, -} - -impl UartCommand { - /// Connect to a sensor via UART - pub fn connect(port: SerialPort) -> Result { - let connection = SerialPortConnection::open(&port)?; - Ok(Self { connection }) - } - - /// Create a UartCommand from an already-open connection - pub fn connect_with_connection( - connection: SerialPortConnection, - ) -> Result { - Ok(Self { connection }) - } - - /// Get device information (firmware version, device type) - /// - /// Retries up to 3 times if we get unexpected data (like temperature readings) - pub fn device_info(&mut self) -> io::Result { - const MAX_RETRIES: usize = 3; - - for attempt in 1..=MAX_RETRIES { - // Send "i" command to get device information - let response = self.connection.send_command("i")?; - - // Atlas Scientific response format: ?I,RTD,1.0 - // Format: ?I,, - if response.starts_with("?I,") { - let parts: Vec<&str> = response.trim_start_matches("?I,").split(',').collect(); - - if parts.len() >= 2 { - return Ok(DeviceInfo { - device_type: parts[0].to_string(), - firmware_version: parts[1].parse().unwrap_or(0.0), - }); - } - } - - // Got unexpected response (possibly temperature reading or stale data) - eprintln!( - "Attempt {}/{}: Unexpected response to 'i' command: '{}' - retrying...", - attempt, MAX_RETRIES, response - ); - - // Small delay before retry - std::thread::sleep(std::time::Duration::from_millis(100)); - } - - Err(io::Error::new( - io::ErrorKind::InvalidData, - "Failed to get valid device info after multiple attempts", - )) - } - - /// Get current temperature reading - /// - /// Sends "R" command and waits for response (terminated by \r) - /// Sensor responds when measurement is complete (~1 second per reading max) - pub fn read_temperature(&mut self) -> io::Result { - let response = self.connection.send_command("R")?; - - response - .trim() - .parse() - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) - } - - /// Check if sensor is responsive (return status code) - pub fn check_status(&mut self) -> io::Result { - let response = self.connection.send_command("Status")?; - - // Atlas Scientific status response format: ?STATUS, - // Codes: P (powered off and restarted), S (software reset), B (brown out), W (watchdog), U (unknown) - if response.starts_with("?STATUS,") { - let code = response.chars().nth(8).unwrap_or('U'); - Ok(match code { - 'P' => StatusCode::PoweredOn, - 'S' => StatusCode::SoftwareReset, - 'B' => StatusCode::BrownOut, - 'W' => StatusCode::Watchdog, - _ => StatusCode::Unknown, - }) - } else { - Err(io::Error::new( - io::ErrorKind::InvalidData, - format!("Invalid status response: {}", response), - )) - } - } - - /// Set sensor to sleep mode (low power) - pub fn sleep(&mut self) -> io::Result<()> { - self.connection.send_command("Sleep")?; - Ok(()) - } - - /// Get sensor calibration status - pub fn get_calibration(&mut self) -> io::Result { - let response = self.connection.send_command("Cal,?")?; - - // Response format: ?Cal, - if response.starts_with("?Cal,") { - let points: u8 = response - .trim_start_matches("?Cal,") - .trim() - .parse() - .unwrap_or(0); - - Ok(CalibrationStatus { - calibration_points: points, - }) - } else { - Err(io::Error::new( - io::ErrorKind::InvalidData, - format!("Invalid calibration response: {}", response), - )) - } - } - - /// Send raw command (for testing/debugging) - pub fn send_raw(&mut self, command: &str) -> io::Result { - self.connection.send_command(command) - } -} - -#[derive(Debug, Clone)] -pub struct DeviceInfo { - pub device_type: String, - pub firmware_version: f64, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum StatusCode { - PoweredOn, - SoftwareReset, - BrownOut, - Watchdog, - Unknown, -} - -#[derive(Debug, Clone)] -pub struct CalibrationStatus { - pub calibration_points: u8, -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_device_info_parsing() { - // This would require a mock connection for proper testing - // For now, this is a placeholder for future integration tests - } -} diff --git a/sensor/src/ezo/driver/error.rs b/sensor/src/ezo/driver/error.rs index 7c73fcf..adb8648 100644 --- a/sensor/src/ezo/driver/error.rs +++ b/sensor/src/ezo/driver/error.rs @@ -1,5 +1,6 @@ use std::fmt; +#[allow(unused)] #[derive(Debug)] pub enum DriverError { Connection(String), diff --git a/sensor/src/ezo/driver/i2c.rs b/sensor/src/ezo/driver/i2c.rs index 75fd9cd..99978fa 100644 --- a/sensor/src/ezo/driver/i2c.rs +++ b/sensor/src/ezo/driver/i2c.rs @@ -1,3 +1,4 @@ +#[allow(unused)] pub struct I2cDriver { address: u8, // Later replaced by I2c bus or similar from hal diff --git a/sensor/src/ezo/driver/mod.rs b/sensor/src/ezo/driver/mod.rs index 6085b26..7832960 100644 --- a/sensor/src/ezo/driver/mod.rs +++ b/sensor/src/ezo/driver/mod.rs @@ -26,6 +26,7 @@ pub struct DeviceInfo { pub firmware_version: f64, } +#[allow(unused)] #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum Status { PoweredOn, @@ -48,5 +49,6 @@ pub trait CommandTransport { /// Commands common to both UART and I2C drivers. pub trait Driver: CommandTransport { fn device_info(&mut self) -> Result; + #[allow(unused)] fn status(&mut self) -> Result; } diff --git a/sensor/src/ezo/driver/uart.rs b/sensor/src/ezo/driver/uart.rs index 69be95a..2586c33 100644 --- a/sensor/src/ezo/driver/uart.rs +++ b/sensor/src/ezo/driver/uart.rs @@ -8,7 +8,7 @@ pub struct UartDriver { impl UartDriver { pub fn new(serial_port: &SerialPort) -> Result { - let connection = SerialPortConnection::open(&serial_port) + let connection = SerialPortConnection::open(serial_port) .map_err(|err| DriverError::Connection(err.to_string()))?; Ok(UartDriver { connection }) diff --git a/sensor/src/ezo/mod.rs b/sensor/src/ezo/mod.rs index a1eee58..80f8819 100644 --- a/sensor/src/ezo/mod.rs +++ b/sensor/src/ezo/mod.rs @@ -1,4 +1,3 @@ pub mod driver; pub mod ezo_sensor; -pub mod ph; pub mod rtd; diff --git a/sensor/src/ezo/ph.rs b/sensor/src/ezo/ph.rs deleted file mode 100644 index e69de29..0000000 diff --git a/sensor/src/lib.rs b/sensor/src/lib.rs index e0b5436..fe9bb89 100644 --- a/sensor/src/lib.rs +++ b/sensor/src/lib.rs @@ -1,5 +1,4 @@ -pub mod commands; pub mod error; -mod ezo; +pub mod ezo; pub mod sensor; pub mod serial_port; diff --git a/sensor/src/main.rs b/sensor/src/main.rs index 74b9149..bb098dd 100644 --- a/sensor/src/main.rs +++ b/sensor/src/main.rs @@ -1,4 +1,3 @@ -mod commands; mod error; mod ezo; mod sensor; @@ -17,6 +16,7 @@ use tokio::time::{interval, Duration as TokioDuration}; pub type SensorList = HashMap>; +#[allow(unused)] enum ServiceCommand { /// Add sensors in the registry (no replacement) AddSensors { @@ -47,6 +47,12 @@ pub struct UartService { cmd_channel: CommandChannel, } +impl Default for UartService { + fn default() -> Self { + Self::new() + } +} + impl UartService { pub fn new() -> Self { let (tx, rx) = mpsc::channel(100); @@ -161,7 +167,7 @@ impl UartService { for port in asc_ports.iter() { if !current_sensors.contains_key(&port.serial_number) { - let sensor = create_sensor_from_port(&port); + let sensor = create_sensor_from_port(port); println!( "Detector: Created sensor {}: {:#?}", port.serial_number, diff --git a/sensor/src/serial_port.rs b/sensor/src/serial_port.rs index f6a5dc2..5255963 100644 --- a/sensor/src/serial_port.rs +++ b/sensor/src/serial_port.rs @@ -1,5 +1,5 @@ use serialport::{SerialPortInfo, SerialPortType}; -use std::io::{self, Read, Write}; +use std::io::{Read, Write}; use std::time::Duration; // Atlas Scientific RTD Sensor Configuration @@ -89,21 +89,7 @@ impl SerialPortConnection { pub fn flush_input(&mut self) -> std::io::Result<()> { self.port .clear(serialport::ClearBuffer::Input) - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) - } - - /// Send a command and read the response - /// - /// Flushes input buffer first to clear stale data, then sends command and waits for response. - /// The serialport timeout (1000ms) acts as a safety net if the sensor doesn't respond. - /// - /// # Arguments - /// * `command` - The command string to send - /// - /// TODO: deprecate this in favor of uart/i2c driver impl - pub fn send_command(&mut self, command: &str) -> io::Result { - self.write_command(command.as_bytes())?; - self.read_until_carrier() + .map_err(std::io::Error::other) } } @@ -117,7 +103,7 @@ impl std::fmt::Debug for SerialPortConnection { pub fn find_asc_port() -> Vec { serialport::available_ports() - .unwrap_or(Vec::new()) + .unwrap_or_default() .into_iter() .filter(filter_asc_device) .filter_map(filter_map_usb_serial) From 3a637a5873a6f9a68b5caef248bcc881f285f705 Mon Sep 17 00:00:00 2001 From: theredfish Date: Thu, 5 Mar 2026 00:40:42 +0100 Subject: [PATCH 11/12] Detect unplugged sensors --- sensor/src/ezo/driver/i2c.rs | 6 +-- sensor/src/ezo/driver/mod.rs | 3 ++ sensor/src/ezo/driver/uart.rs | 15 ++++++-- sensor/src/ezo/ezo_sensor.rs | 16 +++----- sensor/src/ezo/rtd.rs | 1 + sensor/src/i2c_bus.rs | 5 +++ sensor/src/lib.rs | 1 + sensor/src/main.rs | 72 +++++++++++++++++++++++++++++++---- sensor/src/sensor.rs | 25 ++++++++++-- sensor/src/serial_port.rs | 32 +++++++++------- 10 files changed, 135 insertions(+), 41 deletions(-) create mode 100644 sensor/src/i2c_bus.rs diff --git a/sensor/src/ezo/driver/i2c.rs b/sensor/src/ezo/driver/i2c.rs index 99978fa..3ba845c 100644 --- a/sensor/src/ezo/driver/i2c.rs +++ b/sensor/src/ezo/driver/i2c.rs @@ -1,6 +1,6 @@ +use crate::i2c_bus::I2cConnection; + #[allow(unused)] pub struct I2cDriver { - address: u8, - // Later replaced by I2c bus or similar from hal - bus: String, + pub connection: I2cConnection, } diff --git a/sensor/src/ezo/driver/mod.rs b/sensor/src/ezo/driver/mod.rs index 7832960..4dc80c5 100644 --- a/sensor/src/ezo/driver/mod.rs +++ b/sensor/src/ezo/driver/mod.rs @@ -2,6 +2,8 @@ mod error; pub mod i2c; pub mod uart; +use crate::sensor::SensorConnection; + pub use self::error::*; #[derive(Debug, Clone, Copy)] @@ -48,6 +50,7 @@ pub trait CommandTransport { /// Commands common to both UART and I2C drivers. pub trait Driver: CommandTransport { + fn connection_info(&self) -> SensorConnection; fn device_info(&mut self) -> Result; #[allow(unused)] fn status(&mut self) -> Result; diff --git a/sensor/src/ezo/driver/uart.rs b/sensor/src/ezo/driver/uart.rs index 2586c33..8e21696 100644 --- a/sensor/src/ezo/driver/uart.rs +++ b/sensor/src/ezo/driver/uart.rs @@ -1,13 +1,16 @@ use super::{CommandTransport, DeviceInfo, DeviceType, Driver, DriverError, Result}; -use crate::serial_port::SerialPort; -use crate::{ezo::driver::Status, serial_port::SerialPortConnection}; +use crate::{ + ezo::driver::Status, + sensor::SensorConnection, + serial_port::{SerialPortConnection, SerialPortMetadata}, +}; pub struct UartDriver { - connection: SerialPortConnection, + pub connection: SerialPortConnection, } impl UartDriver { - pub fn new(serial_port: &SerialPort) -> Result { + pub fn new(serial_port: &SerialPortMetadata) -> Result { let connection = SerialPortConnection::open(serial_port) .map_err(|err| DriverError::Connection(err.to_string()))?; @@ -30,6 +33,10 @@ impl CommandTransport for UartDriver { } impl Driver for UartDriver { + fn connection_info(&self) -> SensorConnection { + SensorConnection::Uart(self.connection.metadata.clone()) + } + /// Get device information (firmware version, device type) /// /// Retries up to 3 times if we get unexpected data (like temperature readings) diff --git a/sensor/src/ezo/ezo_sensor.rs b/sensor/src/ezo/ezo_sensor.rs index a1155a4..89a367f 100644 --- a/sensor/src/ezo/ezo_sensor.rs +++ b/sensor/src/ezo/ezo_sensor.rs @@ -18,21 +18,17 @@ pub trait EzoSensor: Send + Sync + 'static { /// EZO measurement command (`R`) parsed as `f64`. fn read_measurement(&self) -> Result { - let response = self.read_command_response(self.measurement_command())?; - - response - .trim() - .parse::() - .map_err(|err| SensorError::source(DriverError::Read(err.to_string()))) - } - - fn read_command_response(&self, command: &[u8]) -> Result { let mut driver = self .driver() .lock() .map_err(|err| SensorError::source(DriverError::Read(err.to_string())))?; - driver.send_command(command).map_err(SensorError::source) + driver + .send_command(self.measurement_command()) + .map_err(SensorError::source)? + .trim() + .parse::() + .map_err(|err| SensorError::source(DriverError::Read(err.to_string()))) } } diff --git a/sensor/src/ezo/rtd.rs b/sensor/src/ezo/rtd.rs index 789a050..1a3f45a 100644 --- a/sensor/src/ezo/rtd.rs +++ b/sensor/src/ezo/rtd.rs @@ -30,6 +30,7 @@ impl Rtd { name: SensorName::Unnamed, state: SensorState::Initializing, last_activity: Utc::now(), + connection: driver.connection_info(), }, driver: Mutex::new(driver), } diff --git a/sensor/src/i2c_bus.rs b/sensor/src/i2c_bus.rs new file mode 100644 index 0000000..80071c4 --- /dev/null +++ b/sensor/src/i2c_bus.rs @@ -0,0 +1,5 @@ +#[derive(Debug)] +/// Active i2c connection for communication +pub struct I2cConnection { + pub address: u8, +} diff --git a/sensor/src/lib.rs b/sensor/src/lib.rs index fe9bb89..39c7708 100644 --- a/sensor/src/lib.rs +++ b/sensor/src/lib.rs @@ -1,4 +1,5 @@ pub mod error; pub mod ezo; +pub mod i2c_bus; pub mod sensor; pub mod serial_port; diff --git a/sensor/src/main.rs b/sensor/src/main.rs index bb098dd..0d22451 100644 --- a/sensor/src/main.rs +++ b/sensor/src/main.rs @@ -1,5 +1,6 @@ mod error; mod ezo; +mod i2c_bus; mod sensor; mod serial_port; @@ -7,13 +8,17 @@ use ezo::driver::uart::UartDriver; use ezo::driver::{DeviceType, Driver}; use ezo::rtd::Rtd; use sensor::Sensor; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use tokio::sync::mpsc::Sender; use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinHandle; use tokio::time::{interval, Duration as TokioDuration}; +use crate::sensor::SensorConnection; +use crate::serial_port::SerialPortMetadata; + +/// A sensor list compatible with both UART and I2C protocols. pub type SensorList = HashMap>; #[allow(unused)] @@ -68,8 +73,9 @@ impl UartService { pub async fn run(mut self) { let cmd_tx = self.cmd_channel.tx.clone(); - // Spawn detector task (detects new sensors and adds them to registry) - self.detect_sensors(cmd_tx.clone()); + // Spawn tasks to detect (un)plugged sensors and update the registry + self.detect_plugged_sensors(cmd_tx.clone()); + self.detect_unplugged_sensors(cmd_tx.clone()); println!("UartService started - maintaining sensor registry"); @@ -142,10 +148,12 @@ impl UartService { } } - /// Detector task - finds new USB sensors and adds them to registry - fn detect_sensors(&self, cmd_tx: Sender) { + /// Listen for plugged sensors. + /// + /// Finds new USB sensors and adds them to registry. + fn detect_plugged_sensors(&self, cmd_tx: Sender) { tokio::spawn(async move { - let mut interval = interval(TokioDuration::from_secs(5)); + let mut interval = interval(TokioDuration::from_secs(2)); loop { interval.tick().await; @@ -204,6 +212,56 @@ impl UartService { } }); } + + /// Listen for unplugged sensors. + /// + /// Compare the list of sensors'connection with OS connections for both + /// UART and I2C and remove stale sensor from the list. + fn detect_unplugged_sensors(&self, cmd_tx: Sender) { + tokio::spawn(async move { + let mut interval = interval(TokioDuration::from_secs(2)); + + loop { + interval.tick().await; + + let available_asc_ports = serial_port::find_asc_port(); + let available_port_serials: HashSet<_> = available_asc_ports + .iter() + .map(|port| &port.serial_number) + .collect(); + + // Get current sensor list + let (respond_to, rx) = oneshot::channel(); + let _ = cmd_tx.send(ServiceCommand::AllSensors { respond_to }).await; + let current_sensors = rx.await; + + if let Ok(sensors) = current_sensors { + for sensor in sensors.values() { + let connection_info = &sensor.info().connection; + + match connection_info { + SensorConnection::Uart(port_metadata) => { + if !available_port_serials.contains(&port_metadata.serial_number) { + println!( + "Detector: Sensor {} is unplugged, removing from registry", + port_metadata.serial_number + ); + let _ = cmd_tx + .send(ServiceCommand::RemoveSensors { + uuids: vec![port_metadata.serial_number.clone()], + }) + .await; + } + } + SensorConnection::I2c(_) => { + unimplemented!("No I2C sensor handling yet"); + } + } + } + } + } + }); + } } /// Factory function to create a sensor from a serial port @@ -214,7 +272,7 @@ impl UartService { /// 3. Creates the appropriate sensor with the correct driver /// 4. Returns it as Arc fn create_sensor_from_port( - port: &serial_port::SerialPort, + port: &SerialPortMetadata, ) -> Result, Box> { // Create temporary driver to query device type let mut uart_driver = UartDriver::new(port)?; diff --git a/sensor/src/sensor.rs b/sensor/src/sensor.rs index 7fc3dce..86ed72a 100644 --- a/sensor/src/sensor.rs +++ b/sensor/src/sensor.rs @@ -4,6 +4,8 @@ use tokio::task::JoinHandle; use tokio::time::{interval, Duration}; use crate::error::Result; +use crate::i2c_bus::I2cConnection; +use crate::serial_port::SerialPortMetadata; #[derive(Debug, Clone, Default)] pub enum SensorName { @@ -21,12 +23,19 @@ pub enum SensorState { Unreachable, } -#[derive(Debug, Clone)] +#[derive(Debug)] +pub enum SensorConnection { + Uart(SerialPortMetadata), + I2c(I2cConnection), +} + +#[derive(Debug)] pub struct SensorInfo { pub firmware: f64, pub name: SensorName, pub state: SensorState, pub last_activity: DateTime, + pub connection: SensorConnection, } pub trait Sensor: Send + Sync + 'static { @@ -36,14 +45,22 @@ pub trait Sensor: Send + Sync + 'static { /// Spawn the main background task for this sensor. fn run(self: Arc) -> JoinHandle<()> { tokio::spawn(async move { - let mut ticker = interval(Duration::from_millis(1500)); + // This is based on Atlas Scientific read time, plus some time to not + // be at the edge of the value disponibility + let mut ticker = interval(Duration::from_millis(1200)); loop { ticker.tick().await; + // TODO: Retry with backoff strategy: we allow some I/O error but after a specific threshold we start to update + // the state of the sensor to Degraded then Unresponsive. match self.read_measurement() { - Ok(value) => println!("Sensor reading: {value:.3}"), - Err(err) => eprintln!("Sensor read error: {err}"), + Ok(value) => { + println!("Sensor reading: {value:.3}"); + // TODO: If it's successful then we update the last activity so the supervisor healthcheck get an + // up-to-date information. As long as we can read, then we are active. + } + Err(err) => eprintln!("Sensor read error: {err:#?}"), } } }) diff --git a/sensor/src/serial_port.rs b/sensor/src/serial_port.rs index 5255963..0cf9891 100644 --- a/sensor/src/serial_port.rs +++ b/sensor/src/serial_port.rs @@ -9,7 +9,7 @@ pub const SERIAL_PORT_CONN_TIMEOUT: u64 = 1000; // Timeout acts as safety net fo /// Metadata about a serial port (no active connection) #[derive(Debug, Clone)] -pub struct SerialPort { +pub struct SerialPortMetadata { pub port_name: String, pub serial_number: String, pub baud_rate: u32, @@ -18,6 +18,7 @@ pub struct SerialPort { /// Active serial port connection for communication pub struct SerialPortConnection { pub port: Box, + pub metadata: SerialPortMetadata, } impl SerialPortConnection { @@ -28,12 +29,12 @@ impl SerialPortConnection { /// - Terminator: Carriage return (\r) /// - Decimal places: 3 /// - Temperature unit: Celsius (default) - pub fn open(serial_port: &SerialPort) -> Result { - let SerialPort { + pub fn open(serial_port_metadata: &SerialPortMetadata) -> Result { + let SerialPortMetadata { port_name, baud_rate, .. - } = serial_port; + } = serial_port_metadata; let port = serialport::new(port_name, *baud_rate) .timeout(Duration::from_millis(SERIAL_PORT_CONN_TIMEOUT)) @@ -43,7 +44,10 @@ impl SerialPortConnection { // Atlas Scientific sensors might have leftover readings or responses let _ = port.clear(serialport::ClearBuffer::Input); - Ok(Self { port }) + Ok(Self { + port, + metadata: serial_port_metadata.clone(), + }) } /// Write a command to the sensor @@ -101,13 +105,13 @@ impl std::fmt::Debug for SerialPortConnection { } } -pub fn find_asc_port() -> Vec { +pub fn find_asc_port() -> Vec { serialport::available_ports() .unwrap_or_default() .into_iter() .filter(filter_asc_device) .filter_map(filter_map_usb_serial) - .collect::>() + .collect::>() } /// Checks if a port is an Atlas Scientific device @@ -123,7 +127,7 @@ fn filter_asc_device(port: &SerialPortInfo) -> bool { } } -fn filter_map_usb_serial(port: SerialPortInfo) -> Option { +fn filter_map_usb_serial(port: SerialPortInfo) -> Option { let SerialPortInfo { port_name, port_type, @@ -133,9 +137,11 @@ fn filter_map_usb_serial(port: SerialPortInfo) -> Option { return None; }; - usb_port.serial_number.map(|serial_number| SerialPort { - port_name, - serial_number, - baud_rate: DEFAULT_BAUD_RATE, - }) + usb_port + .serial_number + .map(|serial_number| SerialPortMetadata { + port_name, + serial_number, + baud_rate: DEFAULT_BAUD_RATE, + }) } From ff5e0f063c758e2ae19588c6c3f58fbc99dcf9e5 Mon Sep 17 00:00:00 2001 From: theredfish Date: Thu, 5 Mar 2026 01:58:08 +0100 Subject: [PATCH 12/12] Add libudev-dev sys dep for serialport --- .github/workflows/ci.yml | 6 +++--- .github/workflows/publish.yml | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 60e24ff..1c432ee 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -26,7 +26,7 @@ jobs: - uses: actions/checkout@v5 - uses: awalsh128/cache-apt-pkgs-action@latest with: - packages: libwebkit2gtk-4.1-dev + packages: libwebkit2gtk-4.1-dev libudev-dev version: 1.0 - uses: actions-rust-lang/setup-rust-toolchain@v1 with: @@ -42,9 +42,9 @@ jobs: - uses: actions/checkout@v5 - uses: awalsh128/cache-apt-pkgs-action@latest with: - packages: libwebkit2gtk-4.1-dev + packages: libwebkit2gtk-4.1-dev libudev-dev version: 1.0 - uses: taiki-e/install-action@v2 with: tool: nextest - - run: cargo nextest run --workspace --all-features --profile ci \ No newline at end of file + - run: cargo nextest run --workspace --all-features --profile ci diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 563a632..0a6f18b 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -27,7 +27,7 @@ jobs: if: contains(matrix.platform, 'ubuntu') run: | sudo apt-get update - sudo apt-get install -y libwebkit2gtk-4.1-dev libappindicator3-dev librsvg2-dev patchelf xdg-utils + sudo apt-get install -y libwebkit2gtk-4.1-dev libappindicator3-dev librsvg2-dev patchelf xdg-utils libudev-dev - name: Install Rust Nightly uses: dtolnay/rust-toolchain@nightly