Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,443 changes: 2,195 additions & 248 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "FerroFlow"
name = "ferro_flow"
version = "0.1.0"
edition = "2024"

Expand All @@ -11,3 +11,7 @@ chrono = "0.4"
serde_json = "1.0"
anyhow = "1.0.102"
dashmap = "6.1.0"

[dev-dependencies]
diesel_migrations = "2.3"
testcontainers = { version = "0.27", features = ["blocking"] }
2 changes: 1 addition & 1 deletion LiquidCAN
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ Using docker is recommended for local development (if you already have another i
docker run -d --name timescaledb -p 5432:5432 -e POSTGRES_PASSWORD=yourpassword timescale/timescaledb:latest-pg18
```

The `field_logs` migration enables the `timescaledb` extension before creating the hypertable, so the database user running migrations must be allowed to create that extension.

The project uses Diesel for database interactions. Diesel CLI is recommended for managing db migrations and schema. You can check [the official Diesel documentation for installation instructions](https://diesel.rs/guides/getting-started#installing-diesel-cli), or check if your package manager provides it.

**Running Diesel CLI**
Expand All @@ -55,4 +57,13 @@ diesel migration revert # Revert the last migration
Migrations are located in the `migrations` directory. Each migration consists of an up.sql file (for applying the migration) and a down.sql file (for reverting it).

**Rust schema**
Since diesel-cli cannot generate a schema for tables without a primary key, we maintain a manually written schema for the field_logs table in `src/db/timescale_schema.rs`. For all other (future) tables, the schema is (would be) automatically generated by diesel-cli and located in `src/db/schema.rs`.
Since diesel-cli cannot generate a schema for tables without a primary key, we maintain a manually written schema for the field_logs table in `src/db/timescale_schema.rs`. For all other (future) tables, the schema is (would be) automatically generated by diesel-cli and located in `src/db/schema.rs`.

### Container-backed Tests

Database tests use `testcontainers` to start a temporary TimescaleDB/PostgreSQL instance in Docker. Running the full CI test step requires a working Docker daemon.

There are two examples in the repository:

- a unit test in `src/db/mod.rs`
- an integration test in `tests/db_logging.rs`
2 changes: 2 additions & 0 deletions migrations/2026-03-06-005113-0000_create_field_logs/up.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
CREATE EXTENSION IF NOT EXISTS timescaledb;

CREATE TABLE field_logs (
timestamp TIMESTAMPTZ NOT NULL,
node_id smallint not null,
Expand Down
174 changes: 136 additions & 38 deletions src/can/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,63 +2,161 @@

use std::{
sync::{Arc, mpsc},
thread,
thread::Scope,
};

use anyhow::{Context, Result};
use socketcan::{CanAnyFrame, CanFdSocket, Socket};
use anyhow::{Context, Result, bail, ensure};
use liquidcan::{CanMessage, CanMessageId, NODE_ID_BROADCAST, NODE_ID_INVALID, NODE_ID_SERVER};
use socketcan::{CanAnyFrame, CanFdFrame, CanFdSocket, EmbeddedFrame, Frame, Socket, StandardId};

type CanThreadHandles = [thread::JoinHandle<()>; 2];
use crate::events::{self, Event, EventDispatcher};

pub fn spawn_can_threads(
interface: &str,
) -> Result<(
mpsc::Receiver<CanAnyFrame>,
mpsc::Sender<CanAnyFrame>,
CanThreadHandles,
)> {
let socket =
Arc::new(CanFdSocket::open(interface).with_context(|| {
format!("failed to open can fd socket for interface {}", interface)
})?);

let (recv_sender, recv_receiver) = mpsc::channel();
let (send_sender, send_receiver) = mpsc::channel();

let socket_clone = Arc::clone(&socket);
let handle1 = std::thread::spawn(move || can_recv_thread(socket_clone, recv_sender));
let handle2 = std::thread::spawn(move || can_send_thread(socket, send_receiver));

Ok((recv_receiver, send_sender, [handle1, handle2]))
pub fn spawn_can_threads<'a>(
interfaces: &'a [&'a str],
event_dispatcher: &'a EventDispatcher,
scope: &'a Scope<'a, '_>,
) -> Result<()> {
ensure!(
!interfaces.is_empty(),
"at least one CAN interface is required"
);

let sockets = interfaces
.iter()
.map(|&interface| {
let socket = Arc::new(CanFdSocket::open(interface).with_context(|| {
format!("failed to open can fd socket for interface {}", interface)
})?);
Ok((interface, socket))
})
.collect::<Result<Vec<_>>>()?;

for (interface, socket) in &sockets {
let interface = *interface;
let socket = Arc::clone(socket);
scope.spawn(move || can_recv_thread(interface, socket, event_dispatcher));
}

scope.spawn(move || can_send_thread(sockets, event_dispatcher));

Ok(())
}

fn can_recv_thread(socket: Arc<CanFdSocket>, sender: mpsc::Sender<CanAnyFrame>) {
fn can_recv_thread(interface: &str, socket: Arc<CanFdSocket>, event_dispatcher: &EventDispatcher) {
loop {
if let Err(error) = receive_frame(&socket, &sender) {
eprintln!("CAN receive thread error: {error:#}");
if let Err(error) = receive_frame(interface, &socket, event_dispatcher) {
eprintln!("CAN receive thread error on {interface}: {error:#}");
}
}
}

fn can_send_thread(socket: Arc<CanFdSocket>, receiver: mpsc::Receiver<CanAnyFrame>) {
while let Ok(frame) = receiver.recv() {
if let Err(error) = send_frame(&socket, &frame) {
eprintln!("CAN send thread error: {error:#}");
fn can_send_thread(sockets: Vec<(&str, Arc<CanFdSocket>)>, event_dispatcher: &EventDispatcher) {
let (sender, receiver) = mpsc::channel::<events::Event>();
event_dispatcher.subscribe(sender, "CAN send thread");

while let Ok(event) = receiver.recv() {
match event {
events::Event::SendCanMessage {
receiver_node_id,
message,
} => {
for (interface, socket) in &sockets {
let mut frame: CanFdFrame = (&message).into();

let id = CanMessageId::new()
.with_receiver_id(receiver_node_id)
.with_sender_id(NODE_ID_SERVER);
let Some(can_id) = StandardId::new(id.into()) else {
eprintln!(
"Failed to convert CAN message ID to standard CAN ID for interface {interface}, message ID: {:#010x}",
u16::from(id)
);
continue;
};
frame.set_id(can_id);

let frame = CanAnyFrame::Fd(frame);

if let Err(error) = send_frame(interface, socket, &frame) {
eprintln!("CAN send thread error on {interface}: {error:#}");
}
}
}
events::Event::RelayCanMessage {
from_interface,
frame,
} => {
for (interface, socket) in &sockets {
if *interface == from_interface {
continue; // Don't send back to the sender
}
if let Err(error) = send_frame(interface, socket, &frame) {
eprintln!("CAN send thread error on {interface}: {error:#}");
}
}
}
_ => continue,
}
}
}

fn receive_frame(socket: &CanFdSocket, sender: &mpsc::Sender<CanAnyFrame>) -> Result<()> {
let frame = socket.read_frame().context("failed to read CAN frame")?;
sender
.send(frame)
.context("failed to forward received CAN frame to channel")?;
fn receive_frame(
interface: &str,
socket: &CanFdSocket,
event_dispatcher: &EventDispatcher,
) -> Result<()> {
let frame = socket
.read_frame()
.with_context(|| format!("failed to read CAN frame on interface {}", interface))?;

let CanAnyFrame::Fd(frame) = frame else {
anyhow::bail!(
"received non-FD CAN frame on interface {}, {:?}",
interface,
frame
);
};

let raw_id = match frame.id() {
socketcan::Id::Standard(id) => id.as_raw(),
socketcan::Id::Extended(id) => id.standard_id().as_raw(),
};
let message_id: liquidcan::CanMessageId = raw_id.into();

if message_id.receiver_id() == NODE_ID_INVALID {
bail!(
"received CAN message with invalid receiver ID on interface {}, id: {raw_id:#010x}",
interface
);
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment that no broadcast messages are forwarded

// TODO: Currently broadcast messages are not relayed. We should check if any client nodes ever need to broadcast messages and if so, what other nodes they need to reach.
if message_id.receiver_id() == NODE_ID_BROADCAST || message_id.receiver_id() == NODE_ID_SERVER {
let message = CanMessage::try_from(frame).with_context(|| {
format!(
"failed to parse CAN frame into CanMessage for node {}. Frame content: {:02x?}",
message_id.sender_id(),
frame.data()
)
})?;

event_dispatcher.dispatch(Event::CanMessageReceived {
id: message_id,
message,
});
} else {
// broadcast this to all other interfaces.
event_dispatcher.dispatch(Event::RelayCanMessage {
from_interface: interface.to_string(),
frame: CanAnyFrame::Fd(frame),
});
}
Ok(())
}

fn send_frame(socket: &CanFdSocket, frame: &CanAnyFrame) -> Result<()> {
fn send_frame(interface: &str, socket: &CanFdSocket, frame: &CanAnyFrame) -> Result<()> {
socket
.write_frame(frame)
.context("failed to send CAN frame")?;
.with_context(|| format!("failed to send CAN frame on interface {}", interface))?;
Ok(())
}
Loading
Loading