From 4327af008f2d7690bdc68c8c1112f985d355063a Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 4 May 2026 15:37:47 +0200 Subject: [PATCH 01/10] tmp: update dependencies for quinn and rustls for smaller shared lib Co-authored-by: Copilot --- Cargo.toml | 17 +++++++++-- flowsdk_ffi/Cargo.toml | 13 +++++++-- flowsdk_ffi/src/engine.rs | 60 +++++++++++++++++++++++++++++++++++++-- src/mqtt_client/engine.rs | 3 +- 4 files changed, 84 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3d9ae2df..999d765f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,9 +34,9 @@ tokio-native-tls = { version = "0.3", optional = true } native-tls = { version = "0.2", optional = true } # QUIC support (optional) -quinn = { version = "0.11", optional = true } -quinn-proto = { version = "0.11", optional = true } -rustls = { version = "0.23", optional = true, default-features = false, features = ["ring", "std"] } +quinn = { version = "0.12", optional = true, default-features = false, features = ["runtime-tokio", "rustls-aws-lc-rs"] } +quinn-proto = { version = "0.12", optional = true, default-features = false } +rustls = { version = "0.23", optional = true, default-features = false, features = ["aws-lc-rs", "std"] } rustls-native-certs = { version = "0.7", optional = true } rustls-pki-types = { version = "1", optional = true } tokio-rustls = { version = "0.26", optional = true } @@ -64,6 +64,17 @@ protocol-testing = [] [target.aarch64-unknown-linux-musl] rustflags = ["-C", "target-feature=+crt-static"] +[patch.crates-io] +quinn = { git = "https://github.com/qzhuyan/quinn.git", branch = "dev/william/ring-no-deps" } +quinn-proto = { git = "https://github.com/qzhuyan/quinn.git", branch = "dev/william/ring-no-deps" } + +[profile.release-small] +inherits = "release" +opt-level = "z" +lto = true +codegen-units = 1 +strip = true +panic = "abort" [dev-dependencies] futures = "0.3" diff --git a/flowsdk_ffi/Cargo.toml b/flowsdk_ffi/Cargo.toml index fa6c51ec..fa3e3f16 100644 --- a/flowsdk_ffi/Cargo.toml +++ b/flowsdk_ffi/Cargo.toml @@ -19,13 +19,20 @@ serde = { version = "1.0", features = ["derive"] } uniffi = { version = "0.28", features = ["cli"] } uniffi_macros = "0.28" serde_json = "1.0" -rustls = { version = "0.23", optional = true } +rustls = { version = "0.23", optional = true, default-features = false, features = ["aws-lc-rs", "std"] } rustls-pemfile = { version = "2.1", optional = true } rustls-native-certs = { version = "0.7", optional = true } -quinn-proto = { version = "0.11", optional = true } +quinn-proto = { version = "0.12", optional = true, default-features = false } rustls-pki-types = { version = "1", optional = true } [features] default = ["tls", "quic"] -quic = ["tls", "flowsdk/quic", "dep:quinn-proto"] +quic = [ + "flowsdk/quic", + "dep:quinn-proto", + "dep:rustls", + "dep:rustls-native-certs", + "dep:rustls-pemfile", + "dep:rustls-pki-types", +] tls = ["flowsdk/rustls-tls", "dep:rustls", "dep:rustls-native-certs", "dep:rustls-pemfile", "dep:rustls-pki-types"] diff --git a/flowsdk_ffi/src/engine.rs b/flowsdk_ffi/src/engine.rs index b8142fa1..96f5345e 100644 --- a/flowsdk_ffi/src/engine.rs +++ b/flowsdk_ffi/src/engine.rs @@ -223,6 +223,7 @@ fn map_event(event: MqttEvent) -> MqttEventFFI { } } +#[cfg(feature = "tls")] #[derive(uniffi::Object)] pub struct TlsMqttEngineFFI { engine: Mutex, @@ -230,6 +231,7 @@ pub struct TlsMqttEngineFFI { events: Mutex>, } +#[cfg(feature = "tls")] #[uniffi::export] impl TlsMqttEngineFFI { #[uniffi::constructor] @@ -244,7 +246,7 @@ impl TlsMqttEngineFFI { .max_reconnect_attempts(opts.max_reconnect_attempts) .build(); - let _ = rustls::crypto::ring::default_provider().install_default(); + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); let crypto_builder = rustls::ClientConfig::builder(); let mut config = if tls_opts.insecure_skip_verify { @@ -389,6 +391,60 @@ impl TlsMqttEngineFFI { } } +#[cfg(not(feature = "tls"))] +#[derive(uniffi::Object)] +pub struct TlsMqttEngineFFI { + start_time: Instant, + events: Mutex>, +} + +#[cfg(not(feature = "tls"))] +#[uniffi::export] +impl TlsMqttEngineFFI { + #[uniffi::constructor] + pub fn new(_opts: MqttOptionsFFI, _tls_opts: MqttTlsOptionsFFI, _server_name: String) -> Self { + TlsMqttEngineFFI { + start_time: Instant::now(), + events: Mutex::new(Vec::new()), + } + } + + pub fn handle_socket_data(&self, _data: Vec) {} + + pub fn take_socket_data(&self) -> Vec { + Vec::new() + } + + pub fn handle_tick(&self, _now_ms: u64) -> Vec { + let _ = self.start_time; + Vec::new() + } + + pub fn take_events(&self) -> Vec { + std::mem::take(&mut *self.events.lock().unwrap()) + } + + pub fn connect(&self) {} + + pub fn publish(&self, _topic: String, _payload: Vec, _qos: u8) -> i32 { + -1 + } + + pub fn subscribe(&self, _topic_filter: String, _qos: u8) -> i32 { + -1 + } + + pub fn unsubscribe(&self, _topic_filter: String) -> i32 { + -1 + } + + pub fn disconnect(&self) {} + + pub fn is_connected(&self) -> bool { + false + } +} + #[derive(Debug)] struct InsecureServerCertVerifier; @@ -470,7 +526,7 @@ impl QuicMqttEngineFFI { let addr: SocketAddr = server_addr.parse().unwrap(); let now = self.start_time + Duration::from_millis(now_ms); - let _ = rustls::crypto::ring::default_provider().install_default(); + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); let crypto_builder = rustls::ClientConfig::builder(); let mut config = if tls_opts.insecure_skip_verify { diff --git a/src/mqtt_client/engine.rs b/src/mqtt_client/engine.rs index 387e4f7c..ca150bb8 100644 --- a/src/mqtt_client/engine.rs +++ b/src/mqtt_client/engine.rs @@ -878,7 +878,8 @@ impl QuicMqttEngine { // Initialize QUIC Endpoint (Client) let endpoint_config = EndpointConfig::default(); // Endpoint::new(config, server_config, disable_stateless_retry, reset_token_key) - let endpoint = Endpoint::new(Arc::new(endpoint_config), None, true, None); + // quinn-proto 0.12 removed the reset_token_key parameter + let endpoint = Endpoint::new(Arc::new(endpoint_config), None, true); Ok(Self { mqtt_engine, From b6a020a8565ede73033937b205fbf260a3961e42 Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 4 May 2026 16:48:58 +0200 Subject: [PATCH 02/10] feat(ffi): reduce flowsdk_ffi size by feature-gating uniffi/grpc and enabling QUIC-only minimal build - switch QUIC/rustls crypto path to aws-lc-rs (ring-free via patched quinn) - add release-small profile for size-optimized artifacts - make grpc stack optional (tonic/prost/tonic-prost) - add async-client/grpc feature gates in core crate - make uniffi/serde/serde_json optional in flowsdk_ffi - gate UniFFI derives/exports/scaffolding behind uniffi-bindings - gate serde_json-based event JSON C APIs behind uniffi-bindings - require uniffi-bindings for uniffi-bindgen bin target - relax PriorityQueue serde bounds to persistence methods only Co-authored-by: Copilot --- Cargo.toml | 29 +++++++++++++++++-------- flowsdk_ffi/Cargo.toml | 22 +++++++++++++------ flowsdk_ffi/src/engine.rs | 33 ++++++++++++++++------------- flowsdk_ffi/src/engine/ffi_types.rs | 24 +++++++++++++-------- flowsdk_ffi/src/lib.rs | 1 + src/mqtt_client/mod.rs | 2 ++ src/priority_queue.rs | 15 +++++++++---- 7 files changed, 82 insertions(+), 44 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 999d765f..19597506 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,14 +14,14 @@ members = [".", "mqtt_grpc_duality", "flowsdk_ffi"] exclude = ["fuzz"] [dependencies] -tonic = "0.14.1" -prost = "0.14" -tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "net", "io-util", "sync", "time"] } -tonic-prost = "0.14.1" +tonic = { version = "0.14.1", optional = true } +prost = { version = "0.14", optional = true } +tokio = { version = "1.0", features = ["sync", "time", "io-util"] } +tonic-prost = { version = "0.14.1", optional = true } serde = { version = "1.0.218", features = ["derive"] } hex = "0.4" bytes = { version = "1", features = ["serde"] } -tokio-stream = "0.1" +tokio-stream = { version = "0.1", optional = true } arbitrary = { version = "1", optional = true, features = ["derive"] } serde_json = "1.0" slab = "0.4.11" @@ -42,18 +42,29 @@ rustls-pki-types = { version = "1", optional = true } tokio-rustls = { version = "0.26", optional = true } [features] -default = ["strict-protocol-compliance", "tls"] +default = ["strict-protocol-compliance", "tls", "async-client"] # TLS/SSL transport support -tls = ["dep:tokio-native-tls", "dep:native-tls"] +tls = ["dep:tokio-native-tls", "dep:native-tls", "async-client"] # QUIC transport support -quic = ["dep:quinn", "dep:quinn-proto", "dep:rustls", "dep:rustls-native-certs", "dep:rustls-pki-types"] +quic = ["quic-proto", "dep:quinn", "async-client"] +# Sans-I/O QUIC protocol engine (no tokio runtime required) +quic-proto = ["dep:quinn-proto", "dep:rustls", "dep:rustls-native-certs", "dep:rustls-pki-types"] # Rustls-based TLS over TCP (mqtts://) transport support rustls-tls = [ - "dep:tokio-rustls", "dep:rustls", "dep:rustls-native-certs", "dep:rustls-pki-types", + "dep:tokio-rustls", +] +# Tokio-based async MQTT client (adds the high-throughput multi-threaded runtime) +async-client = [ + "tokio/macros", + "tokio/rt-multi-thread", + "tokio/net", + "dep:tokio-stream", ] +# gRPC support (only needed for mqtt_grpc_duality proxy) +grpc = ["dep:tonic", "dep:prost", "dep:tonic-prost"] strict-protocol-compliance = [] # ⚠️ DANGEROUS: Enable raw packet API for protocol compliance testing # DO NOT enable in production builds diff --git a/flowsdk_ffi/Cargo.toml b/flowsdk_ffi/Cargo.toml index fa3e3f16..b967b2b0 100644 --- a/flowsdk_ffi/Cargo.toml +++ b/flowsdk_ffi/Cargo.toml @@ -12,13 +12,18 @@ categories = ["api-bindings"] [lib] crate-type = ["cdylib", "staticlib", "rlib"] +[[bin]] +name = "uniffi-bindgen" +path = "src/bin/uniffi-bindgen.rs" +required-features = ["uniffi-bindings"] + [dependencies] -flowsdk = { path = "..", version = "0.4.1" } +flowsdk = { path = "..", version = "0.4.1", default-features = false } libc = "0.2" -serde = { version = "1.0", features = ["derive"] } -uniffi = { version = "0.28", features = ["cli"] } -uniffi_macros = "0.28" -serde_json = "1.0" +serde = { version = "1.0", features = ["derive"], optional = true } +uniffi = { version = "0.28", features = ["cli"], optional = true } +uniffi_macros = { version = "0.28", optional = true } +serde_json = { version = "1.0", optional = true } rustls = { version = "0.23", optional = true, default-features = false, features = ["aws-lc-rs", "std"] } rustls-pemfile = { version = "2.1", optional = true } rustls-native-certs = { version = "0.7", optional = true } @@ -26,13 +31,16 @@ quinn-proto = { version = "0.12", optional = true, default-features = false } rustls-pki-types = { version = "1", optional = true } [features] -default = ["tls", "quic"] +default = ["tls", "quic", "uniffi-bindings"] quic = [ "flowsdk/quic", + "flowsdk/strict-protocol-compliance", "dep:quinn-proto", "dep:rustls", "dep:rustls-native-certs", "dep:rustls-pemfile", "dep:rustls-pki-types", ] -tls = ["flowsdk/rustls-tls", "dep:rustls", "dep:rustls-native-certs", "dep:rustls-pemfile", "dep:rustls-pki-types"] +tls = ["flowsdk/rustls-tls", "flowsdk/strict-protocol-compliance", "dep:rustls", "dep:rustls-native-certs", "dep:rustls-pemfile", "dep:rustls-pki-types"] +# UniFFI bindings for Kotlin/Python/Swift language bindings +uniffi-bindings = ["dep:uniffi", "dep:uniffi_macros", "dep:serde", "dep:serde_json"] diff --git a/flowsdk_ffi/src/engine.rs b/flowsdk_ffi/src/engine.rs index 96f5345e..ae0812b2 100644 --- a/flowsdk_ffi/src/engine.rs +++ b/flowsdk_ffi/src/engine.rs @@ -11,7 +11,7 @@ use ffi_types::*; use std::sync::Mutex; -#[derive(uniffi::Object)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Object))] pub struct MqttEngineFFI { engine: Mutex, start_time: Instant, @@ -25,9 +25,9 @@ use flowsdk::mqtt_client::tls_engine::TlsMqttEngine; use std::net::SocketAddr; use std::sync::Arc; -#[uniffi::export] +#[cfg_attr(feature = "uniffi-bindings", uniffi::export)] impl MqttEngineFFI { - #[uniffi::constructor] + #[cfg_attr(feature = "uniffi-bindings", uniffi::constructor)] pub fn new(client_id: Option, mqtt_version: u8) -> Self { let client_id = client_id.unwrap_or_else(|| "mqtt_client".to_string()); let options = MqttClientOptions::builder() @@ -43,7 +43,7 @@ impl MqttEngineFFI { } } - #[uniffi::constructor] + #[cfg_attr(feature = "uniffi-bindings", uniffi::constructor)] pub fn new_with_opts(opts: MqttOptionsFFI) -> Self { let mut builder = MqttClientOptions::builder() .client_id(opts.client_id) @@ -224,7 +224,7 @@ fn map_event(event: MqttEvent) -> MqttEventFFI { } #[cfg(feature = "tls")] -#[derive(uniffi::Object)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Object))] pub struct TlsMqttEngineFFI { engine: Mutex, start_time: Instant, @@ -232,9 +232,9 @@ pub struct TlsMqttEngineFFI { } #[cfg(feature = "tls")] -#[uniffi::export] +#[cfg_attr(feature = "uniffi-bindings", uniffi::export)] impl TlsMqttEngineFFI { - #[uniffi::constructor] + #[cfg_attr(feature = "uniffi-bindings", uniffi::constructor)] pub fn new(opts: MqttOptionsFFI, tls_opts: MqttTlsOptionsFFI, server_name: String) -> Self { let options = MqttClientOptions::builder() .client_id(opts.client_id) @@ -392,16 +392,16 @@ impl TlsMqttEngineFFI { } #[cfg(not(feature = "tls"))] -#[derive(uniffi::Object)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Object))] pub struct TlsMqttEngineFFI { start_time: Instant, events: Mutex>, } #[cfg(not(feature = "tls"))] -#[uniffi::export] +#[cfg_attr(feature = "uniffi-bindings", uniffi::export)] impl TlsMqttEngineFFI { - #[uniffi::constructor] + #[cfg_attr(feature = "uniffi-bindings", uniffi::constructor)] pub fn new(_opts: MqttOptionsFFI, _tls_opts: MqttTlsOptionsFFI, _server_name: String) -> Self { TlsMqttEngineFFI { start_time: Instant::now(), @@ -487,16 +487,16 @@ impl rustls::client::danger::ServerCertVerifier for InsecureServerCertVerifier { } } -#[derive(uniffi::Object)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Object))] pub struct QuicMqttEngineFFI { engine: Mutex, start_time: Instant, events: Mutex>, } -#[uniffi::export] +#[cfg_attr(feature = "uniffi-bindings", uniffi::export)] impl QuicMqttEngineFFI { - #[uniffi::constructor] + #[cfg_attr(feature = "uniffi-bindings", uniffi::constructor)] pub fn new(opts: MqttOptionsFFI) -> Self { let options = MqttClientOptions::builder() .client_id(opts.client_id) @@ -1181,6 +1181,7 @@ pub unsafe extern "C" fn mqtt_tls_engine_is_connected(ptr: *mut TlsMqttEngineFFI /// This function is unsafe because it dereferences a raw pointer to `MqttEngineFFI` /// and returns an allocated `c_char` pointer that must be freed using `mqtt_engine_free_string`. #[no_mangle] +#[cfg(feature = "uniffi-bindings")] pub unsafe extern "C" fn mqtt_engine_take_events(ptr: *mut MqttEngineFFI) -> *mut c_char { if let Some(engine) = ptr.as_ref() { let events = engine.take_events(); @@ -1196,6 +1197,7 @@ pub unsafe extern "C" fn mqtt_engine_take_events(ptr: *mut MqttEngineFFI) -> *mu /// This function is unsafe because it dereferences a raw pointer to `TlsMqttEngineFFI` /// and returns an allocated `c_char` pointer that must be freed using `mqtt_engine_free_string`. #[no_mangle] +#[cfg(feature = "uniffi-bindings")] pub unsafe extern "C" fn mqtt_tls_engine_take_events(ptr: *mut TlsMqttEngineFFI) -> *mut c_char { if let Some(engine) = ptr.as_ref() { let events = engine.take_events(); @@ -1414,6 +1416,7 @@ pub unsafe extern "C" fn mqtt_quic_engine_handle_tick(ptr: *mut QuicMqttEngineFF /// This function is unsafe because it dereferences a raw pointer to `QuicMqttEngineFFI` /// and returns an allocated `c_char` pointer that must be freed using `mqtt_engine_free_string`. #[no_mangle] +#[cfg(feature = "uniffi-bindings")] pub unsafe extern "C" fn mqtt_quic_engine_take_events(ptr: *mut QuicMqttEngineFFI) -> *mut c_char { if let Some(engine) = ptr.as_ref() { let events = engine.take_events(); @@ -1536,12 +1539,12 @@ pub struct MqttDatagramC { // Event Inspection API for C (Native Structs) // Actually, let's just use a dedicated "C Event List" object to manage the lifetime. -#[derive(uniffi::Object)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Object))] pub struct MqttEventListFFI { events: Vec, } -#[uniffi::export] +#[cfg_attr(feature = "uniffi-bindings", uniffi::export)] impl MqttEventListFFI { pub fn len(&self) -> u32 { self.events.len() as u32 diff --git a/flowsdk_ffi/src/engine/ffi_types.rs b/flowsdk_ffi/src/engine/ffi_types.rs index d40c7ccb..6e1147e1 100644 --- a/flowsdk_ffi/src/engine/ffi_types.rs +++ b/flowsdk_ffi/src/engine/ffi_types.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: MPL-2.0 -#[derive(uniffi::Record, Clone, serde::Serialize)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Record, serde::Serialize))] +#[derive(Clone)] pub struct MqttMessageFFI { pub topic: String, pub payload: Vec, @@ -8,32 +9,37 @@ pub struct MqttMessageFFI { pub retain: bool, } -#[derive(uniffi::Record, Clone, serde::Serialize)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Record, serde::Serialize))] +#[derive(Clone)] pub struct ConnectionResultFFI { pub reason_code: u8, pub session_present: bool, } -#[derive(uniffi::Record, Clone, serde::Serialize)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Record, serde::Serialize))] +#[derive(Clone)] pub struct PublishResultFFI { pub packet_id: Option, pub reason_code: Option, pub qos: u8, } -#[derive(uniffi::Record, Clone, serde::Serialize)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Record, serde::Serialize))] +#[derive(Clone)] pub struct SubscribeResultFFI { pub packet_id: u16, pub reason_codes: Vec, } -#[derive(uniffi::Record, Clone, serde::Serialize)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Record, serde::Serialize))] +#[derive(Clone)] pub struct UnsubscribeResultFFI { pub packet_id: u16, pub reason_codes: Vec, } -#[derive(uniffi::Enum, Clone, serde::Serialize)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Enum, serde::Serialize))] +#[derive(Clone)] pub enum MqttEventFFI { Connected(ConnectionResultFFI), Disconnected { reason_code: Option }, @@ -47,7 +53,7 @@ pub enum MqttEventFFI { ReconnectScheduled { attempt: u32, delay_ms: u64 }, } -#[derive(uniffi::Record)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Record))] pub struct MqttOptionsFFI { pub client_id: String, pub mqtt_version: u8, @@ -60,7 +66,7 @@ pub struct MqttOptionsFFI { pub max_reconnect_attempts: u32, } -#[derive(uniffi::Record)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Record))] pub struct MqttTlsOptionsFFI { pub ca_cert_file: Option, pub client_cert_file: Option, @@ -70,7 +76,7 @@ pub struct MqttTlsOptionsFFI { pub enable_key_log: bool, } -#[derive(uniffi::Record)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Record))] pub struct MqttDatagramFFI { pub addr: String, pub data: Vec, diff --git a/flowsdk_ffi/src/lib.rs b/flowsdk_ffi/src/lib.rs index 28dca148..9adf4837 100644 --- a/flowsdk_ffi/src/lib.rs +++ b/flowsdk_ffi/src/lib.rs @@ -2,4 +2,5 @@ pub mod engine; +#[cfg(feature = "uniffi-bindings")] uniffi::setup_scaffolding!("flowsdk_ffi"); diff --git a/src/mqtt_client/mod.rs b/src/mqtt_client/mod.rs index 418c2d2d..cce44529 100644 --- a/src/mqtt_client/mod.rs +++ b/src/mqtt_client/mod.rs @@ -12,6 +12,7 @@ pub mod opts; pub mod raw_packet; #[cfg(feature = "rustls-tls")] pub mod tls_engine; +#[cfg(feature = "async-client")] pub mod tokio_async_client; #[cfg(feature = "quic")] pub mod tokio_quic_client; @@ -33,6 +34,7 @@ pub use no_io_client::NoIoMqttClient; pub use opts::{MqttClientOptions, MqttClientOptionsBuilder}; #[cfg(feature = "rustls-tls")] pub use tls_engine::TlsMqttEngine; +#[cfg(feature = "async-client")] pub use tokio_async_client::{ TokioAsyncClientConfig, TokioAsyncMqttClient, TokioMqttEvent, TokioMqttEventHandler, }; diff --git a/src/priority_queue.rs b/src/priority_queue.rs index c1ba675e..7698d230 100644 --- a/src/priority_queue.rs +++ b/src/priority_queue.rs @@ -27,8 +27,7 @@ where impl PriorityQueue where - P: Ord + Clone + Serialize + DeserializeOwned, - T: Serialize + DeserializeOwned, + P: Ord + Clone, { /// Creates a new `PriorityQueue` with the specified capacity. pub fn new(capacity: usize) -> Self { @@ -99,7 +98,11 @@ where } /// Saves the queue state to a file (JSON format). - pub fn save_to_file>(&self, path: Q) -> io::Result<()> { + pub fn save_to_file>(&self, path: Q) -> io::Result<()> + where + P: Serialize + DeserializeOwned, + T: Serialize + DeserializeOwned, + { let file = File::create(path)?; let writer = BufWriter::new(file); serde_json::to_writer(writer, self)?; @@ -107,7 +110,11 @@ where } /// Restores the queue state from a file. - pub fn load_from_file>(path: Q) -> io::Result { + pub fn load_from_file>(path: Q) -> io::Result + where + P: Serialize + DeserializeOwned, + T: Serialize + DeserializeOwned, + { let file = File::open(path)?; let reader = BufReader::new(file); let queue: Self = serde_json::from_reader(reader)?; From 9451550778cc24361ca28aa8c3d2166110cf6f33 Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 4 May 2026 17:56:30 +0200 Subject: [PATCH 03/10] fix: update dependencies for rustls and quinn to use openssl variant for reducing the shared lib size. Co-authored-by: Copilot --- Cargo.toml | 9 +++++---- flowsdk_ffi/Cargo.toml | 10 ++++++---- flowsdk_ffi/src/engine.rs | 4 ++-- src/mqtt_client/engine.rs | 8 ++++---- src/mqtt_client/transport/mod.rs | 2 ++ 5 files changed, 19 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 19597506..6309741a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,9 +34,10 @@ tokio-native-tls = { version = "0.3", optional = true } native-tls = { version = "0.2", optional = true } # QUIC support (optional) -quinn = { version = "0.12", optional = true, default-features = false, features = ["runtime-tokio", "rustls-aws-lc-rs"] } -quinn-proto = { version = "0.12", optional = true, default-features = false } -rustls = { version = "0.23", optional = true, default-features = false, features = ["aws-lc-rs", "std"] } +quinn = { version = "0.12", optional = true, default-features = false, features = ["runtime-tokio", "rustls-openssl"] } +quinn-proto = { version = "0.12", optional = true, default-features = false, features = ["rustls-openssl"] } +rustls = { version = "0.23", optional = true, default-features = false, features = ["std"] } +rustls-openssl = { version = "0.3", optional = true } rustls-native-certs = { version = "0.7", optional = true } rustls-pki-types = { version = "1", optional = true } tokio-rustls = { version = "0.26", optional = true } @@ -48,7 +49,7 @@ tls = ["dep:tokio-native-tls", "dep:native-tls", "async-client"] # QUIC transport support quic = ["quic-proto", "dep:quinn", "async-client"] # Sans-I/O QUIC protocol engine (no tokio runtime required) -quic-proto = ["dep:quinn-proto", "dep:rustls", "dep:rustls-native-certs", "dep:rustls-pki-types"] +quic-proto = ["dep:quinn-proto", "dep:rustls", "dep:rustls-openssl", "dep:rustls-native-certs", "dep:rustls-pki-types"] # Rustls-based TLS over TCP (mqtts://) transport support rustls-tls = [ "dep:rustls", diff --git a/flowsdk_ffi/Cargo.toml b/flowsdk_ffi/Cargo.toml index b967b2b0..f55503b0 100644 --- a/flowsdk_ffi/Cargo.toml +++ b/flowsdk_ffi/Cargo.toml @@ -24,23 +24,25 @@ serde = { version = "1.0", features = ["derive"], optional = true } uniffi = { version = "0.28", features = ["cli"], optional = true } uniffi_macros = { version = "0.28", optional = true } serde_json = { version = "1.0", optional = true } -rustls = { version = "0.23", optional = true, default-features = false, features = ["aws-lc-rs", "std"] } +rustls = { version = "0.23", optional = true, default-features = false, features = ["std"] } +rustls-openssl = { version = "0.3", optional = true } rustls-pemfile = { version = "2.1", optional = true } rustls-native-certs = { version = "0.7", optional = true } -quinn-proto = { version = "0.12", optional = true, default-features = false } +quinn-proto = { version = "0.12", optional = true, default-features = false, features = ["rustls-openssl"] } rustls-pki-types = { version = "1", optional = true } [features] default = ["tls", "quic", "uniffi-bindings"] quic = [ - "flowsdk/quic", + "flowsdk/quic-proto", "flowsdk/strict-protocol-compliance", "dep:quinn-proto", "dep:rustls", + "dep:rustls-openssl", "dep:rustls-native-certs", "dep:rustls-pemfile", "dep:rustls-pki-types", ] -tls = ["flowsdk/rustls-tls", "flowsdk/strict-protocol-compliance", "dep:rustls", "dep:rustls-native-certs", "dep:rustls-pemfile", "dep:rustls-pki-types"] +tls = ["flowsdk/rustls-tls", "flowsdk/strict-protocol-compliance", "dep:rustls", "dep:rustls-openssl", "dep:rustls-native-certs", "dep:rustls-pemfile", "dep:rustls-pki-types"] # UniFFI bindings for Kotlin/Python/Swift language bindings uniffi-bindings = ["dep:uniffi", "dep:uniffi_macros", "dep:serde", "dep:serde_json"] diff --git a/flowsdk_ffi/src/engine.rs b/flowsdk_ffi/src/engine.rs index ae0812b2..052092c4 100644 --- a/flowsdk_ffi/src/engine.rs +++ b/flowsdk_ffi/src/engine.rs @@ -246,7 +246,7 @@ impl TlsMqttEngineFFI { .max_reconnect_attempts(opts.max_reconnect_attempts) .build(); - let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + let _ = rustls_openssl::default_provider().install_default(); let crypto_builder = rustls::ClientConfig::builder(); let mut config = if tls_opts.insecure_skip_verify { @@ -526,7 +526,7 @@ impl QuicMqttEngineFFI { let addr: SocketAddr = server_addr.parse().unwrap(); let now = self.start_time + Duration::from_millis(now_ms); - let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + let _ = rustls_openssl::default_provider().install_default(); let crypto_builder = rustls::ClientConfig::builder(); let mut config = if tls_opts.insecure_skip_verify { diff --git a/src/mqtt_client/engine.rs b/src/mqtt_client/engine.rs index ca150bb8..3b6f404e 100644 --- a/src/mqtt_client/engine.rs +++ b/src/mqtt_client/engine.rs @@ -1,9 +1,9 @@ // SPDX-License-Identifier: MPL-2.0 -#[cfg(feature = "quic")] +#[cfg(feature = "quic-proto")] use quinn_proto::{ClientConfig, Connection, ConnectionHandle, Endpoint, EndpointConfig, StreamId}; use std::collections::VecDeque; -#[cfg(feature = "quic")] +#[cfg(feature = "quic-proto")] use std::sync::Arc; use std::time::{Duration, Instant}; @@ -853,7 +853,7 @@ impl MqttEngine { /// /// This engine combines the `MqttEngine` (MQTT state machine) with `quinn_proto` (QUIC state machine) /// to provide a complete MQTT-over-QUIC implementation that does not perform any direct I/O. -#[cfg(feature = "quic")] +#[cfg(feature = "quic-proto")] pub struct QuicMqttEngine { mqtt_engine: MqttEngine, endpoint: Endpoint, @@ -869,7 +869,7 @@ pub struct QuicMqttEngine { // stream_read_buffer removed } -#[cfg(feature = "quic")] +#[cfg(feature = "quic-proto")] impl QuicMqttEngine { pub fn new(options: MqttClientOptions) -> Result { // Initialize MqttEngine diff --git a/src/mqtt_client/transport/mod.rs b/src/mqtt_client/transport/mod.rs index af8d0337..8b7159cb 100644 --- a/src/mqtt_client/transport/mod.rs +++ b/src/mqtt_client/transport/mod.rs @@ -9,6 +9,7 @@ use async_trait::async_trait; use std::io; use tokio::io::{AsyncRead, AsyncWrite}; +#[cfg(feature = "async-client")] pub mod tcp; #[cfg(feature = "tls")] @@ -101,6 +102,7 @@ pub trait Transport: AsyncRead + AsyncWrite + Send + Sync + Unpin { pub type BoxedTransport = Box; // Re-export transport types +#[cfg(feature = "async-client")] pub use tcp::TcpTransport; #[cfg(feature = "tls")] From 7ddcd3e3ce546f22136c99fc55ddee1617290098 Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 4 May 2026 20:40:08 +0200 Subject: [PATCH 04/10] fix: update rustls provider to use openssl variant for improved compatibility --- examples/no_io_quic_async_client_example.rs | 2 +- examples/no_io_quic_client_example.rs | 2 +- examples/no_io_tokio_quic_client_example.rs | 2 +- examples/tokio_async_mqtt_quic_example.rs | 2 +- src/mqtt_client/transport/quic.rs | 13 ++++++++++++- 5 files changed, 16 insertions(+), 5 deletions(-) diff --git a/examples/no_io_quic_async_client_example.rs b/examples/no_io_quic_async_client_example.rs index 08ec1171..f714b2ae 100644 --- a/examples/no_io_quic_async_client_example.rs +++ b/examples/no_io_quic_async_client_example.rs @@ -87,7 +87,7 @@ impl Future for ProtocolDriver { /// Demonstrates how to use the QuicMqttEngine as a non-tokio async driver. pub fn run_example(exit_when_rcvd: bool) -> Result<(), Box> { - let _ = rustls::crypto::ring::default_provider().install_default(); + let _ = rustls_openssl::default_provider().install_default(); let mqtt_opts = MqttClientOptions::builder() .client_id("no-io-quic-async-client") .peer("broker.emqx.io:14567") diff --git a/examples/no_io_quic_client_example.rs b/examples/no_io_quic_client_example.rs index 1280ceae..71e47d03 100644 --- a/examples/no_io_quic_client_example.rs +++ b/examples/no_io_quic_client_example.rs @@ -10,7 +10,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; fn main() -> Result<(), Box> { - let _ = rustls::crypto::ring::default_provider().install_default(); + let _ = rustls_openssl::default_provider().install_default(); let mqtt_opts = MqttClientOptions::builder() .client_id("no-io-quic-client") .peer("broker.emqx.io:14567") diff --git a/examples/no_io_tokio_quic_client_example.rs b/examples/no_io_tokio_quic_client_example.rs index dcd2777d..d185a8f8 100644 --- a/examples/no_io_tokio_quic_client_example.rs +++ b/examples/no_io_tokio_quic_client_example.rs @@ -11,7 +11,7 @@ use std::time::Duration; /// in a more low-level, event-driven style where you manually drive the client's event loop. /// In most cases, users should prefer higher-level helpers, but this shows direct use of `TokioQuicMqttClient`. async fn run_example() -> Result<(), Box> { - let _ = rustls::crypto::ring::default_provider().install_default(); + let _ = rustls_openssl::default_provider().install_default(); let mqtt_opts = MqttClientOptions::builder() .client_id("quic-async-wrapper-client") .peer("broker.emqx.io:14567") diff --git a/examples/tokio_async_mqtt_quic_example.rs b/examples/tokio_async_mqtt_quic_example.rs index fe813d0a..3619cc21 100644 --- a/examples/tokio_async_mqtt_quic_example.rs +++ b/examples/tokio_async_mqtt_quic_example.rs @@ -139,7 +139,7 @@ impl TokioMqttEventHandler for QuicExampleHandler { fn init_crypto() { #[cfg(feature = "quic")] { - let _ = rustls::crypto::ring::default_provider().install_default(); + let _ = rustls_openssl::default_provider().install_default(); } } diff --git a/src/mqtt_client/transport/quic.rs b/src/mqtt_client/transport/quic.rs index 688116f9..8e53f0ea 100644 --- a/src/mqtt_client/transport/quic.rs +++ b/src/mqtt_client/transport/quic.rs @@ -455,7 +455,18 @@ mod imp { quinn_crypto.transport_config(Arc::new(trpt_cfg)); // Create an endpoint bound to an ephemeral UDP port - let mut endpoint = Endpoint::client("0.0.0.0:0".parse().unwrap()).map_err(|e| { + let socket = std::net::UdpSocket::bind("0.0.0.0:0").map_err(|e| { + TransportError::ConnectionFailed(format!("QUIC endpoint create failed: {}", e)) + })?; + let runtime = quinn::default_runtime() + .ok_or_else(|| TransportError::ConnectionFailed("no tokio runtime".to_string()))?; + let endpoint = Endpoint::new( + quinn::EndpointConfig::default(), + None, + socket, + runtime, + ) + .map_err(|e| { TransportError::ConnectionFailed(format!("QUIC endpoint create failed: {}", e)) })?; From 5f675fea8b0c7895e64d8b973d096df4b69498bc Mon Sep 17 00:00:00 2001 From: William Yang Date: Tue, 5 May 2026 09:39:42 +0200 Subject: [PATCH 05/10] feat(ffi): add async-client dependency and enhance QUIC connection in C FFI elapsed time tracking Co-authored-by: Copilot --- Cargo.toml | 1 + examples/c_ffi_example/quic_main.c | 139 +++++++++++++++++++++-------- flowsdk_ffi/Cargo.toml | 1 + flowsdk_ffi/src/engine.rs | 8 +- 4 files changed, 108 insertions(+), 41 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6309741a..6f72938a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,6 +56,7 @@ rustls-tls = [ "dep:rustls-native-certs", "dep:rustls-pki-types", "dep:tokio-rustls", + "async-client", ] # Tokio-based async MQTT client (adds the high-throughput multi-threaded runtime) async-client = [ diff --git a/examples/c_ffi_example/quic_main.c b/examples/c_ffi_example/quic_main.c index b048e9a8..2f0dbc3d 100644 --- a/examples/c_ffi_example/quic_main.c +++ b/examples/c_ffi_example/quic_main.c @@ -71,6 +71,25 @@ char *mqtt_event_list_get_message_topic(const MqttEventListFFI *ptr, uint8_t *mqtt_event_list_get_message_payload(const MqttEventListFFI *ptr, size_t index, size_t *out_len); +static const char *mqtt_event_tag_name(uint8_t tag) { + switch (tag) { + case 1: + return "Connected"; + case 2: + return "Disconnected"; + case 3: + return "MessageReceived"; + case 4: + return "Published"; + case 5: + return "Subscribed"; + case 6: + return "Unsubscribed"; + default: + return "Unknown"; + } +} + // Helper to get monotonic time in milliseconds uint64_t get_time_ms() { struct timespec ts; @@ -89,6 +108,7 @@ int main(int argc, char **argv) { */ const char *broker_host = "broker.emqx.io"; const char *broker_port = "14567"; + const char *server_name = NULL; const char *sslkeylogfile = getenv("SSLKEYLOGFILE"); if (argc > 1) @@ -96,6 +116,18 @@ int main(int argc, char **argv) { if (argc > 2) broker_port = argv[2]; + if (argc > 3) { + server_name = argv[3]; + } else { + struct in_addr ip4; + if (inet_pton(AF_INET, broker_host, &ip4) == 1) { + // If caller passes an IP, keep SNI on broker hostname by default. + server_name = "broker.emqx.io"; + } else { + server_name = broker_host; + } + } + printf("Resolving %s:%s...\n", broker_host, broker_port); // 1. Resolve Hostname @@ -120,6 +152,7 @@ int main(int argc, char **argv) { printf("Connecting to MQTT-over-QUIC broker at %s (resolved from %s)...\n", server_addr_str, broker_host); + printf("Using TLS server name: %s\n", server_name); // 2. Create UDP socket int sock = socket(AF_INET, SOCK_DGRAM, 0); @@ -151,13 +184,14 @@ int main(int argc, char **argv) { printf("TLS key logging enabled -> %s\n", sslkeylogfile); } - if (mqtt_quic_engine_connect(engine, server_addr_str, broker_host, &q_opts) != + if (mqtt_quic_engine_connect(engine, server_addr_str, server_name, &q_opts) != 0) { fprintf(stderr, "Failed to initiate QUIC connection\n"); return 1; } uint64_t start_time = get_time_ms(); + uint64_t last_tick_ms = 0; int running = 1; uint32_t loop_without_activity = 0; @@ -169,50 +203,73 @@ int main(int argc, char **argv) { while (running) { uint64_t now_ms = get_time_ms() - start_time; - // A. Handle Ticks - mqtt_quic_engine_handle_tick(engine, now_ms); + // A. Handle Incoming Datagrams first to avoid unnecessary retransmit bursts. + ssize_t recvd = 0; + addr_len = sizeof(remote_addr); + while ((recvd = recvfrom(sock, read_buf, sizeof(read_buf), 0, + (struct sockaddr *)&remote_addr, &addr_len)) > 0) { + char remote_ip[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &remote_addr.sin_addr, remote_ip, sizeof(remote_ip)); + char remote_str[INET_ADDRSTRLEN + 16]; + snprintf(remote_str, sizeof(remote_str), "%s:%u", remote_ip, + ntohs(remote_addr.sin_port)); + + mqtt_quic_engine_handle_datagram(engine, read_buf, recvd, remote_str); + loop_without_activity = 0; + + addr_len = sizeof(remote_addr); + } + + if (recvd < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { + perror("recvfrom"); + } + + // B. Handle Ticks every 10ms to match quinn's expected pacing + if (now_ms - last_tick_ms >= 10) { + mqtt_quic_engine_handle_tick(engine, now_ms); + last_tick_ms = now_ms; + } - // B. Handle Outgoing Datagrams + // C. Handle Outgoing Datagrams size_t dg_count = 0; MqttDatagramC *datagrams = mqtt_quic_engine_take_outgoing_datagrams(engine, &dg_count); if (datagrams) { for (size_t i = 0; i < dg_count; i++) { - struct addrinfo hints, *res; - memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_INET; - hints.ai_socktype = SOCK_DGRAM; - - char host[256], port[16]; + // Datagram destinations from the engine are always numeric IP:port + // strings — parse directly to avoid getaddrinfo overhead. + char host[INET_ADDRSTRLEN + 1] = {0}; char *colon = strrchr(datagrams[i].addr, ':'); - if (colon) { - size_t host_len = colon - datagrams[i].addr; - strncpy(host, datagrams[i].addr, host_len); - host[host_len] = '\0'; - strcpy(port, colon + 1); - - if (getaddrinfo(host, port, &hints, &res) == 0) { - sendto(sock, datagrams[i].data, datagrams[i].data_len, 0, - res->ai_addr, res->ai_addrlen); - freeaddrinfo(res); - } + if (!colon) { + fprintf(stderr, "Invalid datagram destination: %s\n", + datagrams[i].addr); + continue; } - } - mqtt_quic_engine_free_datagrams(datagrams, dg_count); - loop_without_activity = 0; - } - // C. Handle Incoming Datagrams - ssize_t recvd = recvfrom(sock, read_buf, sizeof(read_buf), 0, - (struct sockaddr *)&remote_addr, &addr_len); - if (recvd > 0) { - char remote_ip[INET_ADDRSTRLEN]; - inet_ntop(AF_INET, &remote_addr.sin_addr, remote_ip, sizeof(remote_ip)); - char remote_str[INET_ADDRSTRLEN + 16]; - snprintf(remote_str, sizeof(remote_str), "%s:%u", remote_ip, - ntohs(remote_addr.sin_port)); + size_t host_len = (size_t)(colon - datagrams[i].addr); + if (host_len == 0 || host_len >= sizeof(host)) { + fprintf(stderr, "Invalid datagram host: %s\n", datagrams[i].addr); + continue; + } + memcpy(host, datagrams[i].addr, host_len); + host[host_len] = '\0'; + + struct sockaddr_in dst; + memset(&dst, 0, sizeof(dst)); + dst.sin_family = AF_INET; + if (inet_pton(AF_INET, host, &dst.sin_addr) != 1) { + fprintf(stderr, "Invalid datagram IP: %s\n", host); + continue; + } + dst.sin_port = htons((uint16_t)atoi(colon + 1)); - mqtt_quic_engine_handle_datagram(engine, read_buf, recvd, remote_str); + ssize_t sent = sendto(sock, datagrams[i].data, datagrams[i].data_len, 0, + (struct sockaddr *)&dst, sizeof(dst)); + if (sent < 0) { + perror("sendto"); + } + } + mqtt_quic_engine_free_datagrams(datagrams, dg_count); loop_without_activity = 0; } @@ -233,8 +290,7 @@ int main(int argc, char **argv) { engine, "test/topic/quic", (const uint8_t *)"hello from C over QUIC native", 29, 1); } else if (tag == 4) { // Published - printf("Published! Disconnecting...\n"); - mqtt_quic_engine_disconnect(engine); + printf("Published! Waiting for echo...\n"); } else if (tag == 2) { // Disconnected printf("Disconnected gracefully.\n"); running = 0; @@ -247,14 +303,19 @@ int main(int argc, char **argv) { (char *)payload); mqtt_engine_free_string(topic); mqtt_engine_free_bytes(payload, p_len); + printf("Disconnecting...\n"); + mqtt_quic_engine_disconnect(engine); + } else { + printf("Unhandled MQTT event: %s (tag=%u)\n", + mqtt_event_tag_name(tag), tag); } } mqtt_event_list_free(events); } - usleep(10000); // 10ms + usleep(1000); // 1ms loop_without_activity++; - if (loop_without_activity > 5000) { + if (loop_without_activity > 5000) { // 5s timeout printf("Timeout, exiting...\n"); running = 0; } diff --git a/flowsdk_ffi/Cargo.toml b/flowsdk_ffi/Cargo.toml index f55503b0..2399f1d7 100644 --- a/flowsdk_ffi/Cargo.toml +++ b/flowsdk_ffi/Cargo.toml @@ -35,6 +35,7 @@ rustls-pki-types = { version = "1", optional = true } default = ["tls", "quic", "uniffi-bindings"] quic = [ "flowsdk/quic-proto", + "flowsdk/async-client", "flowsdk/strict-protocol-compliance", "dep:quinn-proto", "dep:rustls", diff --git a/flowsdk_ffi/src/engine.rs b/flowsdk_ffi/src/engine.rs index 052092c4..5deaf6e1 100644 --- a/flowsdk_ffi/src/engine.rs +++ b/flowsdk_ffi/src/engine.rs @@ -577,6 +577,10 @@ impl QuicMqttEngineFFI { .ok(); } + fn elapsed_ms(&self) -> u64 { + self.start_time.elapsed().as_millis() as u64 + } + pub fn handle_datagram(&self, data: Vec, remote_addr: String, now_ms: u64) { let addr: SocketAddr = remote_addr.parse().unwrap(); let now = self.start_time + Duration::from_millis(now_ms); @@ -1312,7 +1316,7 @@ pub unsafe extern "C" fn mqtt_quic_engine_connect( } }; - engine.connect(server_addr, server_name, tls_opts_v, 0); + engine.connect(server_addr, server_name, tls_opts_v, engine.elapsed_ms()); 0 } else { -1 @@ -1332,7 +1336,7 @@ pub unsafe extern "C" fn mqtt_quic_engine_handle_datagram( if let (Some(engine), true, true) = (ptr.as_ref(), !data.is_null(), !remote_addr.is_null()) { let buf = std::slice::from_raw_parts(data, len); let remote_addr = CStr::from_ptr(remote_addr).to_string_lossy().into_owned(); - engine.handle_datagram(buf.to_vec(), remote_addr, 0); + engine.handle_datagram(buf.to_vec(), remote_addr, engine.elapsed_ms()); } } From 1b17ec1bdccd27f1b0c2dcdde390b6fa3048408a Mon Sep 17 00:00:00 2001 From: William Yang Date: Thu, 7 May 2026 14:20:51 +0200 Subject: [PATCH 06/10] feat(ffi): enhance QUIC support with OpenSSL backend and update dependencies for improved compatibility Co-authored-by: Copilot --- Cargo.toml | 29 ++++++++++++++++++----------- flowsdk_ffi/Cargo.toml | 18 ++++++++++++++---- flowsdk_ffi/src/engine.rs | 6 ++++++ flowsdk_ffi/src/lib.rs | 5 +++++ src/lib.rs | 9 +++++++++ src/mqtt_client/engine.rs | 7 +++++-- src/mqtt_client/transport/quic.rs | 9 ++------- 7 files changed, 59 insertions(+), 24 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6f72938a..c9733352 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"] [workspace] members = [".", "mqtt_grpc_duality", "flowsdk_ffi"] -exclude = ["fuzz"] +exclude = ["fuzz", "release-small"] [dependencies] tonic = { version = "0.14.1", optional = true } @@ -33,9 +33,12 @@ ctrlc = "3.4" tokio-native-tls = { version = "0.3", optional = true } native-tls = { version = "0.2", optional = true } -# QUIC support (optional) -quinn = { version = "0.12", optional = true, default-features = false, features = ["runtime-tokio", "rustls-openssl"] } -quinn-proto = { version = "0.12", optional = true, default-features = false, features = ["rustls-openssl"] } +# QUIC support — mainstream quinn with ring crypto (regular builds) +quinn = { version = "0.11", optional = true, default-features = false, features = ["runtime-tokio", "rustls", "ring"] } +quinn-proto = { version = "0.11", optional = true, default-features = false, features = ["rustls", "ring"] } +# QUIC support — OpenSSL crypto via fork (for release-small / size-optimised builds) +quinn-openssl = { package = "quinn", git = "https://github.com/qzhuyan/quinn.git", branch = "dev/william/ring-no-deps", optional = true, default-features = false, features = ["runtime-tokio", "rustls-openssl"] } +quinn-proto-openssl = { package = "quinn-proto", git = "https://github.com/qzhuyan/quinn.git", branch = "dev/william/ring-no-deps", optional = true, default-features = false, features = ["rustls-openssl"] } rustls = { version = "0.23", optional = true, default-features = false, features = ["std"] } rustls-openssl = { version = "0.3", optional = true } rustls-native-certs = { version = "0.7", optional = true } @@ -46,10 +49,18 @@ tokio-rustls = { version = "0.26", optional = true } default = ["strict-protocol-compliance", "tls", "async-client"] # TLS/SSL transport support tls = ["dep:tokio-native-tls", "dep:native-tls", "async-client"] -# QUIC transport support +# QUIC transport support (mainstream quinn + ring crypto) quic = ["quic-proto", "dep:quinn", "async-client"] -# Sans-I/O QUIC protocol engine (no tokio runtime required) -quic-proto = ["dep:quinn-proto", "dep:rustls", "dep:rustls-openssl", "dep:rustls-native-certs", "dep:rustls-pki-types"] +# QUIC transport support (OpenSSL backend via fork, for release-small builds) +# Implies quic so all #[cfg(feature = "quic")] gates activate; the fork is aliased +# over mainstream quinn via extern crate in lib.rs. LTO strips unused mainstream +# quinn symbols from the final binary. +# NOTE: do not enable quic and quic-openssl at the same time. +quic-openssl = ["quic", "quic-proto-openssl", "dep:quinn-openssl"] +# Sans-I/O QUIC protocol engine — ring crypto (no tokio runtime required) +quic-proto = ["dep:quinn-proto", "dep:rustls", "dep:rustls-native-certs", "dep:rustls-pki-types"] +# Sans-I/O QUIC protocol engine — OpenSSL crypto (for release-small builds) +quic-proto-openssl = ["quic-proto", "dep:quinn-proto-openssl", "dep:rustls-openssl"] # Rustls-based TLS over TCP (mqtts://) transport support rustls-tls = [ "dep:rustls", @@ -77,10 +88,6 @@ protocol-testing = [] [target.aarch64-unknown-linux-musl] rustflags = ["-C", "target-feature=+crt-static"] -[patch.crates-io] -quinn = { git = "https://github.com/qzhuyan/quinn.git", branch = "dev/william/ring-no-deps" } -quinn-proto = { git = "https://github.com/qzhuyan/quinn.git", branch = "dev/william/ring-no-deps" } - [profile.release-small] inherits = "release" opt-level = "z" diff --git a/flowsdk_ffi/Cargo.toml b/flowsdk_ffi/Cargo.toml index 2399f1d7..eaa46ef1 100644 --- a/flowsdk_ffi/Cargo.toml +++ b/flowsdk_ffi/Cargo.toml @@ -24,11 +24,14 @@ serde = { version = "1.0", features = ["derive"], optional = true } uniffi = { version = "0.28", features = ["cli"], optional = true } uniffi_macros = { version = "0.28", optional = true } serde_json = { version = "1.0", optional = true } -rustls = { version = "0.23", optional = true, default-features = false, features = ["std"] } +rustls = { version = "0.23", optional = true, default-features = false, features = ["std", "ring"] } rustls-openssl = { version = "0.3", optional = true } rustls-pemfile = { version = "2.1", optional = true } rustls-native-certs = { version = "0.7", optional = true } -quinn-proto = { version = "0.12", optional = true, default-features = false, features = ["rustls-openssl"] } +# Mainstream quinn-proto (ring crypto, crates.io) +quinn-proto = { version = "0.11", optional = true, default-features = false, features = ["rustls", "ring"] } +# Fork quinn-proto (OpenSSL crypto, for release-small builds) +quinn-proto-openssl = { package = "quinn-proto", git = "https://github.com/qzhuyan/quinn.git", branch = "dev/william/ring-no-deps", optional = true, default-features = false, features = ["rustls-openssl"] } rustls-pki-types = { version = "1", optional = true } [features] @@ -39,11 +42,18 @@ quic = [ "flowsdk/strict-protocol-compliance", "dep:quinn-proto", "dep:rustls", - "dep:rustls-openssl", "dep:rustls-native-certs", "dep:rustls-pemfile", "dep:rustls-pki-types", ] -tls = ["flowsdk/rustls-tls", "flowsdk/strict-protocol-compliance", "dep:rustls", "dep:rustls-openssl", "dep:rustls-native-certs", "dep:rustls-pemfile", "dep:rustls-pki-types"] +quic-openssl = [ + "flowsdk/quic-proto-openssl", + "flowsdk/async-client", + "flowsdk/strict-protocol-compliance", + "quic", + "dep:quinn-proto-openssl", + "dep:rustls-openssl", +] +tls = ["flowsdk/rustls-tls", "flowsdk/strict-protocol-compliance", "dep:rustls", "dep:rustls-native-certs", "dep:rustls-pemfile", "dep:rustls-pki-types"] # UniFFI bindings for Kotlin/Python/Swift language bindings uniffi-bindings = ["dep:uniffi", "dep:uniffi_macros", "dep:serde", "dep:serde_json"] diff --git a/flowsdk_ffi/src/engine.rs b/flowsdk_ffi/src/engine.rs index 5deaf6e1..849b5d59 100644 --- a/flowsdk_ffi/src/engine.rs +++ b/flowsdk_ffi/src/engine.rs @@ -246,7 +246,10 @@ impl TlsMqttEngineFFI { .max_reconnect_attempts(opts.max_reconnect_attempts) .build(); + #[cfg(feature = "quic-openssl")] let _ = rustls_openssl::default_provider().install_default(); + #[cfg(not(feature = "quic-openssl"))] + let _ = rustls::crypto::ring::default_provider().install_default(); let crypto_builder = rustls::ClientConfig::builder(); let mut config = if tls_opts.insecure_skip_verify { @@ -526,7 +529,10 @@ impl QuicMqttEngineFFI { let addr: SocketAddr = server_addr.parse().unwrap(); let now = self.start_time + Duration::from_millis(now_ms); + #[cfg(feature = "quic-openssl")] let _ = rustls_openssl::default_provider().install_default(); + #[cfg(not(feature = "quic-openssl"))] + let _ = rustls::crypto::ring::default_provider().install_default(); let crypto_builder = rustls::ClientConfig::builder(); let mut config = if tls_opts.insecure_skip_verify { diff --git a/flowsdk_ffi/src/lib.rs b/flowsdk_ffi/src/lib.rs index 9adf4837..d84da405 100644 --- a/flowsdk_ffi/src/lib.rs +++ b/flowsdk_ffi/src/lib.rs @@ -1,5 +1,10 @@ // SPDX-License-Identifier: MPL-2.0 +// Alias the fork's quinn-proto to the standard name when building with the +// OpenSSL backend, so engine.rs code using quinn_proto:: works transparently. +#[cfg(feature = "quic-openssl")] +extern crate quinn_proto_openssl as quinn_proto; + pub mod engine; #[cfg(feature = "uniffi-bindings")] diff --git a/src/lib.rs b/src/lib.rs index b96ecfdc..734342e1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,14 @@ // SPDX-License-Identifier: MPL-2.0 +// When building with the OpenSSL QUIC backend (fork), alias the renamed crates +// back to the standard names so all downstream code using `quinn::` and +// `quinn_proto::` works transparently. LTO strips the unused mainstream quinn +// symbols from the release-small binary. +#[cfg(feature = "quic-openssl")] +extern crate quinn_openssl as quinn; +#[cfg(feature = "quic-proto-openssl")] +extern crate quinn_proto_openssl as quinn_proto; + pub mod mqtt_client; pub mod mqtt_serde; pub mod mqtt_session; diff --git a/src/mqtt_client/engine.rs b/src/mqtt_client/engine.rs index 3b6f404e..32f1a131 100644 --- a/src/mqtt_client/engine.rs +++ b/src/mqtt_client/engine.rs @@ -877,9 +877,12 @@ impl QuicMqttEngine { // Initialize QUIC Endpoint (Client) let endpoint_config = EndpointConfig::default(); - // Endpoint::new(config, server_config, disable_stateless_retry, reset_token_key) - // quinn-proto 0.12 removed the reset_token_key parameter + // quinn-proto 0.11 (mainstream) has a 4th reset_token_key parameter; + // the fork (0.12) removed it. + #[cfg(feature = "quic-proto-openssl")] let endpoint = Endpoint::new(Arc::new(endpoint_config), None, true); + #[cfg(not(feature = "quic-proto-openssl"))] + let endpoint = Endpoint::new(Arc::new(endpoint_config), None, true, None); Ok(Self { mqtt_engine, diff --git a/src/mqtt_client/transport/quic.rs b/src/mqtt_client/transport/quic.rs index 8e53f0ea..f917709b 100644 --- a/src/mqtt_client/transport/quic.rs +++ b/src/mqtt_client/transport/quic.rs @@ -460,13 +460,8 @@ mod imp { })?; let runtime = quinn::default_runtime() .ok_or_else(|| TransportError::ConnectionFailed("no tokio runtime".to_string()))?; - let endpoint = Endpoint::new( - quinn::EndpointConfig::default(), - None, - socket, - runtime, - ) - .map_err(|e| { + let endpoint = Endpoint::new(quinn::EndpointConfig::default(), None, socket, runtime) + .map_err(|e| { TransportError::ConnectionFailed(format!("QUIC endpoint create failed: {}", e)) })?; From 50c65f58b675d2a87325435cc483b1933ba999c5 Mon Sep 17 00:00:00 2001 From: William Yang Date: Thu, 7 May 2026 15:25:40 +0200 Subject: [PATCH 07/10] ci: add 3 QUIC build modes Co-authored-by: Copilot --- .github/workflows/ci.yml | 41 +++++++++ .vscode/launch.json | 44 +++++++++ .vscode/settings.json | 6 ++ docs/UNIFIED_CLIENT_CONSIDERATIONS.md | 127 ++++++++++++++++++++++++++ 4 files changed, 218 insertions(+) create mode 100644 .vscode/launch.json create mode 100644 .vscode/settings.json create mode 100644 docs/UNIFIED_CLIENT_CONSIDERATIONS.md diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 73f12087..a5620ab0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -118,6 +118,47 @@ jobs: timeout 30s cargo fuzz run fuzz_parser_funs || true timeout 30s cargo fuzz run fuzz_mqtt_packet_symmetric || true + quic-build-modes: + name: QUIC build modes + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@stable + + - name: Install OpenSSL (required for quic-openssl backend) + run: sudo apt-get update -qq && sudo apt-get install -y libssl-dev + + - name: Install protoc + uses: arduino/setup-protoc@v3 + with: + version: '25.x' + repo-token: ${{ secrets.GITHUB_TOKEN }} + + - name: Cache dependencies + uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: ${{ runner.os }}-quic-build-${{ hashFiles('**/Cargo.lock') }} + + # Mode 1: default build — quinn not in the dep tree at all + - name: Build (default, no QUIC) + run: cargo build + + # Mode 2: mainstream quinn 0.11 (crates.io) + ring crypto + - name: Build flowsdk_ffi --features quic (mainstream quinn) + run: cargo build -p flowsdk_ffi --features quic --no-default-features + + # Mode 3: git fork + OpenSSL crypto, optimised for binary size + - name: Build flowsdk_ffi --features quic-openssl --profile release-small (OpenSSL fork) + run: cargo build -p flowsdk_ffi --features quic-openssl --no-default-features --profile release-small + security: name: Security Audit runs-on: ubuntu-latest diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 00000000..0db0b6ec --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,44 @@ +{ + "configurations": [ + { + "type": "swift", + "request": "launch", + "args": [], + "cwd": "${workspaceFolder:flowsdk-ffi}/swift", + "name": "Debug TcpClientExample (swift)", + "target": "TcpClientExample", + "configuration": "debug", + "preLaunchTask": "swift: Build Debug TcpClientExample (swift)" + }, + { + "type": "swift", + "request": "launch", + "args": [], + "cwd": "${workspaceFolder:flowsdk-ffi}/swift", + "name": "Release TcpClientExample (swift)", + "target": "TcpClientExample", + "configuration": "release", + "preLaunchTask": "swift: Build Release TcpClientExample (swift)" + }, + { + "type": "swift", + "request": "launch", + "args": [], + "cwd": "${workspaceFolder:flowsdk-ffi}/swift", + "name": "Debug QuicClientExample (swift)", + "target": "QuicClientExample", + "configuration": "debug", + "preLaunchTask": "swift: Build Debug QuicClientExample (swift)" + }, + { + "type": "swift", + "request": "launch", + "args": [], + "cwd": "${workspaceFolder:flowsdk-ffi}/swift", + "name": "Release QuicClientExample (swift)", + "target": "QuicClientExample", + "configuration": "release", + "preLaunchTask": "swift: Build Release QuicClientExample (swift)" + } + ] +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..b57f31c8 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,6 @@ +{ + "chat.tools.terminal.autoApprove": { + "gradle wrapper": true, + "cargo build": true + } +} \ No newline at end of file diff --git a/docs/UNIFIED_CLIENT_CONSIDERATIONS.md b/docs/UNIFIED_CLIENT_CONSIDERATIONS.md new file mode 100644 index 00000000..3e0c3bf7 --- /dev/null +++ b/docs/UNIFIED_CLIENT_CONSIDERATIONS.md @@ -0,0 +1,127 @@ +# Unified Client Design Considerations + +This document outlines design considerations and decisions made when implementing unified TCP/QUIC transport support in the Python async client. + +## 1. Tick Timing Strategy + +### Issue +TCP and QUIC transports use different tick scheduling strategies: + +- **TCP**: Adaptive scheduling using `engine.next_tick_ms()` to calculate optimal delay +- **QUIC**: Fixed 10ms tick interval for responsiveness + +### Decision +**Use transport-specific defaults** - Each protocol implements its own tick scheduling strategy that's optimized for the transport characteristics. + +**Rationale:** +- QUIC requires more frequent ticking for connection handshake and keep-alive +- TCP can optimize battery/CPU usage with adaptive scheduling +- Users shouldn't need to tune timing parameters manually +- Future transports can implement their own optimal strategies + +**Implementation:** +```python +# FlowMqttProtocol (TCP) +def _on_timer(self): + # Use next_tick_ms() for adaptive scheduling + next_tick_ms = self.engine.next_tick_ms() + delay = max(0, (next_tick_ms - now_ms) / 1000.0) + +# FlowMqttDatagramProtocol (QUIC) +def _on_timer(self): + # Fixed 10ms for QUIC responsiveness + delay = 0.01 +``` + +**Future Enhancement:** +Could add optional `tick_interval_ms` parameter to constructor for advanced users who want to tune performance. + +--- + +## 2. Missing QUIC APIs + +### Issue +`QuicMqttEngineFfi` doesn't implement all methods available in `MqttEngineFfi`: + +**Missing Methods:** +- `handle_connection_lost()` - Notifies engine of connection loss +- `next_tick_ms()` - Returns optimal next tick time + +### Current Approach +**Document limitations and provide fallback behavior:** + +1. **handle_connection_lost()**: Not called for QUIC since UDP is connectionless + - QUIC engine detects connection loss through timeout internally + - No action needed in protocol's `connection_lost()` + +2. **next_tick_ms()**: Not available for QUIC + - Use fixed 10ms interval (see Tick Timing Strategy above) + - QUIC's `handle_tick()` returns events directly without needing separate timing + +### Rationale +These differences reflect fundamental protocol differences: +- UDP is connectionless, so "connection lost" is detected differently +- QUIC's internal state machine handles timing optimization + +### Future Enhancement +Could add these methods to the Rust FFI if adaptive QUIC timing proves beneficial, but current fixed approach works well. + +--- + +## 3. TLS Transport Support + + ### Current State +The unified client architecture now fully supports `TransportType.TLS` for explicit TLS over TCP transport. This allows for fine-grained control over TLS configuration, including certificate validation and SNI. + + ### FFI Support +`TlsMqttEngineFfi` is used to handle the MQTT protocol over an encrypted TCP stream. It follows a similar API to the standard TCP engine but requires explicit calls to `handle_socket_data` and `take_socket_data` for BIOS-like data pumping. + +### Implementation Details + +**Key Features:** +- **Unified Adapter**: Transparently handle different method names across engines (`handle_incoming` vs `handle_socket_data`). +- **SNI Support**: Uses `server_name` parameter for TLS Server Name Indication. +- **Certificate Validation**: Supports standard CA files, client certificates, and insecure mode (skip verify). + +**Usage Example:** +```python +from flowsdk import FlowMqttClient, TransportType + +client = FlowMqttClient( + "my_client", + transport=TransportType.TLS, + server_name="broker.emqx.io", + ca_cert_file="emqxsl-ca.crt" +) +await client.connect("broker.emqx.io", 8883) +``` + +### Decision +**Implement explicit TLS transport support** - To provide full flexibility for production environments where custom CA bundles or client certificates are required. + +**Rationale:** +- Many industrial environments use internal PKI. +- Client certificate authentication is a common requirement for high-security IoT. +- Provides a consistent API across TCP, TLS, and QUIC. + +--- + +## Summary of Decisions + +| Consideration | Decision | Status | +|--------------|----------|--------| +| Tick Timing | Transport-specific defaults | ✅ Implemented | +| Missing QUIC APIs | Document limitations, use fallbacks | ✅ Implemented | +| TLS Transport | Explicit unified support | ✅ Implemented | + +--- + +## Related Documentation + +- [Python Async Client API](../python/package/README.md) +- [MQTT Session Management](MQTT_SESSION.md) +- [Async Client Architecture](ASYNC_CLIENT.md) + +--- + +*Last Updated: February 3, 2026* From 022c16cb00bcb66c0b2bb6164c70d8b7a321dd8c Mon Sep 17 00:00:00 2001 From: William Yang Date: Thu, 7 May 2026 16:19:04 +0200 Subject: [PATCH 08/10] feat(ci): add OpenSSL installation step for quic-openssl feature in CI workflow Co-authored-by: Copilot --- .github/workflows/ci.yml | 6 ++++++ scripts/generate_coverage.sh | 5 +++++ 2 files changed, 11 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a5620ab0..9cced944 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -39,6 +39,9 @@ jobs: version: '25.x' repo-token: ${{ secrets.GITHUB_TOKEN }} + - name: Install OpenSSL (required for quic-openssl feature in --all-features builds) + run: sudo apt-get update -qq && sudo apt-get install -y libssl-dev + - name: Cache dependencies uses: actions/cache@v4 with: @@ -328,6 +331,9 @@ jobs: - name: Install cargo-llvm-cov run: cargo install cargo-llvm-cov + - name: Install OpenSSL (required for quic-openssl feature) + run: sudo apt-get update -qq && sudo apt-get install -y libssl-dev + - name: Generate coverage report run: | ./scripts/generate_coverage.sh diff --git a/scripts/generate_coverage.sh b/scripts/generate_coverage.sh index 615cdbc4..49fac5a0 100755 --- a/scripts/generate_coverage.sh +++ b/scripts/generate_coverage.sh @@ -11,6 +11,11 @@ mkdir -p target/llvm-cov-target # 1. Collect coverage from Rust tests and examples cargo +stable llvm-cov --workspace --no-report --tests cargo +stable llvm-cov --workspace --no-report --tests -- --ignored +# Mainstream QUIC build mode (ring crypto, mainstream quinn): covers +# #[cfg(not(feature = "quic-proto-openssl"))] branches +cargo +stable llvm-cov --workspace --no-report --tests --features quic +# All features (includes quic-openssl): covers +# #[cfg(feature = "quic-proto-openssl"))] branches cargo +stable llvm-cov --workspace --no-report --examples --all-features cargo +stable llvm-cov report --lcov --output-path lcov.info From 21a0488126e80dec20315c63a5712d486b5b6027 Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 11 May 2026 18:22:44 +0200 Subject: [PATCH 09/10] fix(quic): change endpoint variable to mutable --- src/mqtt_client/transport/quic.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/mqtt_client/transport/quic.rs b/src/mqtt_client/transport/quic.rs index f917709b..a00c9ed8 100644 --- a/src/mqtt_client/transport/quic.rs +++ b/src/mqtt_client/transport/quic.rs @@ -460,8 +460,13 @@ mod imp { })?; let runtime = quinn::default_runtime() .ok_or_else(|| TransportError::ConnectionFailed("no tokio runtime".to_string()))?; - let endpoint = Endpoint::new(quinn::EndpointConfig::default(), None, socket, runtime) - .map_err(|e| { + let mut endpoint = Endpoint::new( + quinn::EndpointConfig::default(), + None, + socket, + runtime, + ) + .map_err(|e| { TransportError::ConnectionFailed(format!("QUIC endpoint create failed: {}", e)) })?; From 7b29150de4a8cf0195a8313e6d6433774be14418 Mon Sep 17 00:00:00 2001 From: William Yang Date: Tue, 12 May 2026 13:09:21 +0200 Subject: [PATCH 10/10] chore: allow unsed_mut to cover different confs --- src/mqtt_client/transport/quic.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/mqtt_client/transport/quic.rs b/src/mqtt_client/transport/quic.rs index a00c9ed8..2866d98e 100644 --- a/src/mqtt_client/transport/quic.rs +++ b/src/mqtt_client/transport/quic.rs @@ -460,6 +460,7 @@ mod imp { })?; let runtime = quinn::default_runtime() .ok_or_else(|| TransportError::ConnectionFailed("no tokio runtime".to_string()))?; + #[allow(unused_mut)] let mut endpoint = Endpoint::new( quinn::EndpointConfig::default(), None,