Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
443 changes: 433 additions & 10 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ chrono = "0.4"
serde_json = "1.0"
anyhow = "1.0.102"
dashmap = "6.1.0"
toml = "1.1.2"
serde = { version = "1.0.228", features = ["derive"] }
config = "0.15.22"

[dev-dependencies]
diesel_migrations = "2.3"
ntest = "0.9.5"
testcontainers = { version = "0.27", features = ["blocking"] }
187 changes: 187 additions & 0 deletions sequences/_schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
{
"$schema": "http://json-schema.org/draft-07/schema",
"$defs": {
"setParam": {
"type": "object",
"additionalProperties": false,
"required": [
"timestamp",
"param",
"value"
],
"properties": {
"timestamp": {
"type": "number"
},
"param": {
"type": "string"
},
"value": {
"type": "number"
}
}
},
"holdcondition": {
"type": "object",
"additionalProperties": false,
"required": [
"field",
"is",
"value"
],
"properties": {
"field": {
"type": "string"
},
"is": {
"enum": [
"equal",
"not_eq",
"less",
"less_eq",
"greater",
"greater_eq"
]
},
"value": {
"type": "number"
}
}
}
},
"title": "FerroFlow-Sequence",
"type": "object",
"additionalProperties": false,
"required": [
"name",
"globals",
"steps"
],
"properties": {
"$schema": {
"type": "string"
},
"name": {
"type": "string"
},
"globals": {
"type": "object",
"additionalProperties": false,
"required": [
"start_time",
"end_time",
"interpolations",
"interpolation_interval"
],
"properties": {
"start_time": {
"type": "number"
},
"end_time": {
"type": "number"
},
"interpolation_interval": {
"type": "number",
"exclusiveMinimum": 0
},
"interpolations": {
"type": "object",
"patternProperties": {
"^.+$": {
"type": "string",
"enum": [
"linear",
"none"
]
}
}
}
}
},
"steps": {
"type": "array",
"items": {
"anyOf": [
{
"type": "object",
"additionalProperties": false,
"required": [
"name",
"timestamp",
"hold"
],
"properties": {
"name": {
"type": "string"
},
"description": {
"type": "string"
},
"timestamp": {
"type": "number"
},
"hold": {
"type": "array",
"minItems": 1,
"items": {
"$ref": "#/$defs/holdcondition"
}
}
}
Comment thread
fweichselbaum marked this conversation as resolved.
},
{
"type": "object",
"additionalProperties": false,
"required": [
"name",
"timestamp",
"hold"
],
"properties": {
"name": {
"type": "string"
},
"description": {
"type": "string"
},
"timestamp": {
"type": "number"
},
"hold": {
"type": "string",
"const": "always"
}
}
},
{
"type": "object",
"additionalProperties": false,
"required": [
"name",
"timestamp",
"set_params"
],
"properties": {
"name": {
"type": "string"
},
"description": {
"type": "string"
},
"timestamp": {
"type": "number"
},
"set_params": {
"type": "array",
"minItems": 1,
"items": {
"$ref": "#/$defs/setParam"
}
}
}
}
]
}
}
}
}
7 changes: 7 additions & 0 deletions src/events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ pub enum Event {
from_interface: String,
frame: CanAnyFrame,
},
StartSequence {
seq_name: String,
abort_seq_name: String,
},
PauseSequence,
ResumeSequence,
AbortSequence,
}

struct EventListener {
Expand Down
3 changes: 2 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![allow(clippy::single_match)]
use anyhow::Result;
use ferro_flow::{can, config, db, events, nodes};
use ferro_flow::{can, config, db, events, nodes, sequence};

fn main() -> Result<()> {
let _config = config::load_config()?;
Expand All @@ -22,6 +22,7 @@ fn main() -> Result<()> {
&event_dispatcher,
scope,
);
sequence::spawn_sequence_runner_thread(&event_dispatcher, scope);

Ok(())
});
Expand Down
147 changes: 68 additions & 79 deletions src/sequence/mod.rs
Original file line number Diff line number Diff line change
@@ -1,88 +1,77 @@
//! Code for managing and running sequences.

#![allow(unused)]

use std::{
sync::mpsc,
thread,
time::{Duration, Instant},
mod sequence_builder;
mod sequence_definition;
mod sequence_runner;
mod sequence_validation;

use std::path::Path;

use crate::{
events,
sequence::{
sequence_definition::Sequence,
sequence_runner::{SequenceCmd, SequenceRunner},
},
};

pub struct Sequence {
name: String,
steps: Vec<SequenceStep>,
}

struct SequenceStep {
description: String,
delay_from_start_ms: u64,
action: (), // TODO: how are steps defined?
}

pub struct SequenceHandle {
cancel_tx: mpsc::Sender<()>,
thread_handle: thread::JoinHandle<()>,
}

impl SequenceHandle {
/// Signals the sequence to stop executing further steps.
pub fn cancel(self) {
let _ = self.cancel_tx.send(());
}

/// Blocks until the sequence finishes (or is cancelled).
pub fn wait(self) -> thread::Result<()> {
self.thread_handle.join()
}
}

pub fn run_sequence(mut seq: Sequence) -> SequenceHandle {
// Create a channel for our interrupt signal
let (cancel_tx, cancel_rx) = mpsc::channel();

// Sort steps.
// TODO: We could probably require that they are already sorted at this point.
seq.steps.sort_by_key(|s| s.delay_from_start_ms);

let thread_handle = thread::spawn(move || {
println!("Starting sequence: {}", seq.name);

let start_time = Instant::now();

for step in seq.steps {
// Calculate the absolute target time for this specific step
let target_time = start_time + Duration::from_millis(step.delay_from_start_ms);
let now = Instant::now();

// If the target time is in the future, we need to wait
if target_time > now {
let wait_duration = target_time - now;

// recv_timeout blocks until a message is received OR the timeout is reached.
match cancel_rx.recv_timeout(wait_duration) {
Ok(_) => {
println!("Sequence '{}' interrupted! Aborting.", seq.name);
return;
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
// The caller dropped the handle without explicitly calling cancel().
println!("Sequence handle dropped. Aborting '{}'.", seq.name);
return;
}
Err(mpsc::RecvTimeoutError::Timeout) => {
// Timeout reached without interruption. Run the step.
pub fn spawn_sequence_runner_thread<'scope>(
event_dispatcher: &'scope events::EventDispatcher,
scope: &'scope std::thread::Scope<'scope, '_>,
) {
scope.spawn(move || {
let (tx, rx) = std::sync::mpsc::channel::<events::Event>();
event_dispatcher.subscribe(tx, "Sequence Runner thread");
let mut sequence_runner = SequenceRunner::new(event_dispatcher, scope);

while let Ok(event) = rx.recv() {
match event {
events::Event::Shutdown => {
sequence_runner.control_sequence(SequenceCmd::Shutdown);
break;
}
events::Event::StartSequence {
seq_name,
abort_seq_name,
} => {
// TODO: replace with loading sequences from the frontend
let seq_dir = Path::new(env!("CARGO_MANIFEST_DIR")).join("sequences");
let seq = Sequence::load_from_path(&seq_dir.join(&seq_name));
let abort_seq = Sequence::load_from_path(&seq_dir.join(&abort_seq_name));
Comment on lines +33 to +40
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you change the StartSequence Event to take the actual sequence string instead of path? Because that's how it's going to work with the socket later on and it shouldn't really change anything for the tests


let seq = match seq {
Ok(seq) => seq,
Err(err) => {
eprintln!("Error while loading sequence '{seq_name}': {err:#}");
continue;
}
};
let abort_seq = match abort_seq {
Ok(abort_seq) => abort_seq,
Err(err) => {
eprintln!(
"Error while loading abort sequence '{abort_seq_name}': {err:#}"
);
continue;
}
};

let result = sequence_runner.run_sequence(seq, abort_seq);
if let Err(err) = result {
eprintln!("Error while running sequence: {err:#}");
}
}
}

println!("Executing step: {}", step.description);
events::Event::PauseSequence => {
sequence_runner.control_sequence(SequenceCmd::Pause)
}
events::Event::ResumeSequence => {
sequence_runner.control_sequence(SequenceCmd::Resume)
}
events::Event::AbortSequence => {
sequence_runner.control_sequence(SequenceCmd::Abort)
}
_ => continue,
};
}

println!("Sequence '{}' completed successfully.", seq.name);
});

SequenceHandle {
cancel_tx,
thread_handle,
}
}
Loading
Loading