diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 73f12087..9cced944 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -39,6 +39,9 @@ jobs: version: '25.x' repo-token: ${{ secrets.GITHUB_TOKEN }} + - name: Install OpenSSL (required for quic-openssl feature in --all-features builds) + run: sudo apt-get update -qq && sudo apt-get install -y libssl-dev + - name: Cache dependencies uses: actions/cache@v4 with: @@ -118,6 +121,47 @@ jobs: timeout 30s cargo fuzz run fuzz_parser_funs || true timeout 30s cargo fuzz run fuzz_mqtt_packet_symmetric || true + quic-build-modes: + name: QUIC build modes + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@stable + + - name: Install OpenSSL (required for quic-openssl backend) + run: sudo apt-get update -qq && sudo apt-get install -y libssl-dev + + - name: Install protoc + uses: arduino/setup-protoc@v3 + with: + version: '25.x' + repo-token: ${{ secrets.GITHUB_TOKEN }} + + - name: Cache dependencies + uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: ${{ runner.os }}-quic-build-${{ hashFiles('**/Cargo.lock') }} + + # Mode 1: default build — quinn not in the dep tree at all + - name: Build (default, no QUIC) + run: cargo build + + # Mode 2: mainstream quinn 0.11 (crates.io) + ring crypto + - name: Build flowsdk_ffi --features quic (mainstream quinn) + run: cargo build -p flowsdk_ffi --features quic --no-default-features + + # Mode 3: git fork + OpenSSL crypto, optimised for binary size + - name: Build flowsdk_ffi --features quic-openssl --profile release-small (OpenSSL fork) + run: cargo build -p flowsdk_ffi --features quic-openssl --no-default-features --profile release-small + security: name: Security Audit runs-on: ubuntu-latest @@ -287,6 +331,9 @@ jobs: - name: Install cargo-llvm-cov run: cargo install cargo-llvm-cov + - name: Install OpenSSL (required for quic-openssl feature) + run: sudo apt-get update -qq && sudo apt-get install -y libssl-dev + - name: Generate coverage report run: | ./scripts/generate_coverage.sh diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 00000000..0db0b6ec --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,44 @@ +{ + "configurations": [ + { + "type": "swift", + "request": "launch", + "args": [], + "cwd": "${workspaceFolder:flowsdk-ffi}/swift", + "name": "Debug TcpClientExample (swift)", + "target": "TcpClientExample", + "configuration": "debug", + "preLaunchTask": "swift: Build Debug TcpClientExample (swift)" + }, + { + "type": "swift", + "request": "launch", + "args": [], + "cwd": "${workspaceFolder:flowsdk-ffi}/swift", + "name": "Release TcpClientExample (swift)", + "target": "TcpClientExample", + "configuration": "release", + "preLaunchTask": "swift: Build Release TcpClientExample (swift)" + }, + { + "type": "swift", + "request": "launch", + "args": [], + "cwd": "${workspaceFolder:flowsdk-ffi}/swift", + "name": "Debug QuicClientExample (swift)", + "target": "QuicClientExample", + "configuration": "debug", + "preLaunchTask": "swift: Build Debug QuicClientExample (swift)" + }, + { + "type": "swift", + "request": "launch", + "args": [], + "cwd": "${workspaceFolder:flowsdk-ffi}/swift", + "name": "Release QuicClientExample (swift)", + "target": "QuicClientExample", + "configuration": "release", + "preLaunchTask": "swift: Build Release QuicClientExample (swift)" + } + ] +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..b57f31c8 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,6 @@ +{ + "chat.tools.terminal.autoApprove": { + "gradle wrapper": true, + "cargo build": true + } +} \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 3d9ae2df..c9733352 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,17 +11,17 @@ categories = ["network-programming", "asynchronous"] [workspace] members = [".", "mqtt_grpc_duality", "flowsdk_ffi"] -exclude = ["fuzz"] +exclude = ["fuzz", "release-small"] [dependencies] -tonic = "0.14.1" -prost = "0.14" -tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "net", "io-util", "sync", "time"] } -tonic-prost = "0.14.1" +tonic = { version = "0.14.1", optional = true } +prost = { version = "0.14", optional = true } +tokio = { version = "1.0", features = ["sync", "time", "io-util"] } +tonic-prost = { version = "0.14.1", optional = true } serde = { version = "1.0.218", features = ["derive"] } hex = "0.4" bytes = { version = "1", features = ["serde"] } -tokio-stream = "0.1" +tokio-stream = { version = "0.1", optional = true } arbitrary = { version = "1", optional = true, features = ["derive"] } serde_json = "1.0" slab = "0.4.11" @@ -33,27 +33,51 @@ ctrlc = "3.4" tokio-native-tls = { version = "0.3", optional = true } native-tls = { version = "0.2", optional = true } -# QUIC support (optional) -quinn = { version = "0.11", optional = true } -quinn-proto = { version = "0.11", optional = true } -rustls = { version = "0.23", optional = true, default-features = false, features = ["ring", "std"] } +# QUIC support — mainstream quinn with ring crypto (regular builds) +quinn = { version = "0.11", optional = true, default-features = false, features = ["runtime-tokio", "rustls", "ring"] } +quinn-proto = { version = "0.11", optional = true, default-features = false, features = ["rustls", "ring"] } +# QUIC support — OpenSSL crypto via fork (for release-small / size-optimised builds) +quinn-openssl = { package = "quinn", git = "https://github.com/qzhuyan/quinn.git", branch = "dev/william/ring-no-deps", optional = true, default-features = false, features = ["runtime-tokio", "rustls-openssl"] } +quinn-proto-openssl = { package = "quinn-proto", git = "https://github.com/qzhuyan/quinn.git", branch = "dev/william/ring-no-deps", optional = true, default-features = false, features = ["rustls-openssl"] } +rustls = { version = "0.23", optional = true, default-features = false, features = ["std"] } +rustls-openssl = { version = "0.3", optional = true } rustls-native-certs = { version = "0.7", optional = true } rustls-pki-types = { version = "1", optional = true } tokio-rustls = { version = "0.26", optional = true } [features] -default = ["strict-protocol-compliance", "tls"] +default = ["strict-protocol-compliance", "tls", "async-client"] # TLS/SSL transport support -tls = ["dep:tokio-native-tls", "dep:native-tls"] -# QUIC transport support -quic = ["dep:quinn", "dep:quinn-proto", "dep:rustls", "dep:rustls-native-certs", "dep:rustls-pki-types"] +tls = ["dep:tokio-native-tls", "dep:native-tls", "async-client"] +# QUIC transport support (mainstream quinn + ring crypto) +quic = ["quic-proto", "dep:quinn", "async-client"] +# QUIC transport support (OpenSSL backend via fork, for release-small builds) +# Implies quic so all #[cfg(feature = "quic")] gates activate; the fork is aliased +# over mainstream quinn via extern crate in lib.rs. LTO strips unused mainstream +# quinn symbols from the final binary. +# NOTE: do not enable quic and quic-openssl at the same time. +quic-openssl = ["quic", "quic-proto-openssl", "dep:quinn-openssl"] +# Sans-I/O QUIC protocol engine — ring crypto (no tokio runtime required) +quic-proto = ["dep:quinn-proto", "dep:rustls", "dep:rustls-native-certs", "dep:rustls-pki-types"] +# Sans-I/O QUIC protocol engine — OpenSSL crypto (for release-small builds) +quic-proto-openssl = ["quic-proto", "dep:quinn-proto-openssl", "dep:rustls-openssl"] # Rustls-based TLS over TCP (mqtts://) transport support rustls-tls = [ - "dep:tokio-rustls", "dep:rustls", "dep:rustls-native-certs", "dep:rustls-pki-types", + "dep:tokio-rustls", + "async-client", +] +# Tokio-based async MQTT client (adds the high-throughput multi-threaded runtime) +async-client = [ + "tokio/macros", + "tokio/rt-multi-thread", + "tokio/net", + "dep:tokio-stream", ] +# gRPC support (only needed for mqtt_grpc_duality proxy) +grpc = ["dep:tonic", "dep:prost", "dep:tonic-prost"] strict-protocol-compliance = [] # ⚠️ DANGEROUS: Enable raw packet API for protocol compliance testing # DO NOT enable in production builds @@ -64,6 +88,13 @@ protocol-testing = [] [target.aarch64-unknown-linux-musl] rustflags = ["-C", "target-feature=+crt-static"] +[profile.release-small] +inherits = "release" +opt-level = "z" +lto = true +codegen-units = 1 +strip = true +panic = "abort" [dev-dependencies] futures = "0.3" diff --git a/docs/UNIFIED_CLIENT_CONSIDERATIONS.md b/docs/UNIFIED_CLIENT_CONSIDERATIONS.md new file mode 100644 index 00000000..3e0c3bf7 --- /dev/null +++ b/docs/UNIFIED_CLIENT_CONSIDERATIONS.md @@ -0,0 +1,127 @@ +# Unified Client Design Considerations + +This document outlines design considerations and decisions made when implementing unified TCP/QUIC transport support in the Python async client. + +## 1. Tick Timing Strategy + +### Issue +TCP and QUIC transports use different tick scheduling strategies: + +- **TCP**: Adaptive scheduling using `engine.next_tick_ms()` to calculate optimal delay +- **QUIC**: Fixed 10ms tick interval for responsiveness + +### Decision +**Use transport-specific defaults** - Each protocol implements its own tick scheduling strategy that's optimized for the transport characteristics. + +**Rationale:** +- QUIC requires more frequent ticking for connection handshake and keep-alive +- TCP can optimize battery/CPU usage with adaptive scheduling +- Users shouldn't need to tune timing parameters manually +- Future transports can implement their own optimal strategies + +**Implementation:** +```python +# FlowMqttProtocol (TCP) +def _on_timer(self): + # Use next_tick_ms() for adaptive scheduling + next_tick_ms = self.engine.next_tick_ms() + delay = max(0, (next_tick_ms - now_ms) / 1000.0) + +# FlowMqttDatagramProtocol (QUIC) +def _on_timer(self): + # Fixed 10ms for QUIC responsiveness + delay = 0.01 +``` + +**Future Enhancement:** +Could add optional `tick_interval_ms` parameter to constructor for advanced users who want to tune performance. + +--- + +## 2. Missing QUIC APIs + +### Issue +`QuicMqttEngineFfi` doesn't implement all methods available in `MqttEngineFfi`: + +**Missing Methods:** +- `handle_connection_lost()` - Notifies engine of connection loss +- `next_tick_ms()` - Returns optimal next tick time + +### Current Approach +**Document limitations and provide fallback behavior:** + +1. **handle_connection_lost()**: Not called for QUIC since UDP is connectionless + - QUIC engine detects connection loss through timeout internally + - No action needed in protocol's `connection_lost()` + +2. **next_tick_ms()**: Not available for QUIC + - Use fixed 10ms interval (see Tick Timing Strategy above) + - QUIC's `handle_tick()` returns events directly without needing separate timing + +### Rationale +These differences reflect fundamental protocol differences: +- UDP is connectionless, so "connection lost" is detected differently +- QUIC's internal state machine handles timing optimization + +### Future Enhancement +Could add these methods to the Rust FFI if adaptive QUIC timing proves beneficial, but current fixed approach works well. + +--- + +## 3. TLS Transport Support + + ### Current State +The unified client architecture now fully supports `TransportType.TLS` for explicit TLS over TCP transport. This allows for fine-grained control over TLS configuration, including certificate validation and SNI. + + ### FFI Support +`TlsMqttEngineFfi` is used to handle the MQTT protocol over an encrypted TCP stream. It follows a similar API to the standard TCP engine but requires explicit calls to `handle_socket_data` and `take_socket_data` for BIOS-like data pumping. + +### Implementation Details + +**Key Features:** +- **Unified Adapter**: Transparently handle different method names across engines (`handle_incoming` vs `handle_socket_data`). +- **SNI Support**: Uses `server_name` parameter for TLS Server Name Indication. +- **Certificate Validation**: Supports standard CA files, client certificates, and insecure mode (skip verify). + +**Usage Example:** +```python +from flowsdk import FlowMqttClient, TransportType + +client = FlowMqttClient( + "my_client", + transport=TransportType.TLS, + server_name="broker.emqx.io", + ca_cert_file="emqxsl-ca.crt" +) +await client.connect("broker.emqx.io", 8883) +``` + +### Decision +**Implement explicit TLS transport support** - To provide full flexibility for production environments where custom CA bundles or client certificates are required. + +**Rationale:** +- Many industrial environments use internal PKI. +- Client certificate authentication is a common requirement for high-security IoT. +- Provides a consistent API across TCP, TLS, and QUIC. + +--- + +## Summary of Decisions + +| Consideration | Decision | Status | +|--------------|----------|--------| +| Tick Timing | Transport-specific defaults | ✅ Implemented | +| Missing QUIC APIs | Document limitations, use fallbacks | ✅ Implemented | +| TLS Transport | Explicit unified support | ✅ Implemented | + +--- + +## Related Documentation + +- [Python Async Client API](../python/package/README.md) +- [MQTT Session Management](MQTT_SESSION.md) +- [Async Client Architecture](ASYNC_CLIENT.md) + +--- + +*Last Updated: February 3, 2026* diff --git a/examples/c_ffi_example/quic_main.c b/examples/c_ffi_example/quic_main.c index b048e9a8..2f0dbc3d 100644 --- a/examples/c_ffi_example/quic_main.c +++ b/examples/c_ffi_example/quic_main.c @@ -71,6 +71,25 @@ char *mqtt_event_list_get_message_topic(const MqttEventListFFI *ptr, uint8_t *mqtt_event_list_get_message_payload(const MqttEventListFFI *ptr, size_t index, size_t *out_len); +static const char *mqtt_event_tag_name(uint8_t tag) { + switch (tag) { + case 1: + return "Connected"; + case 2: + return "Disconnected"; + case 3: + return "MessageReceived"; + case 4: + return "Published"; + case 5: + return "Subscribed"; + case 6: + return "Unsubscribed"; + default: + return "Unknown"; + } +} + // Helper to get monotonic time in milliseconds uint64_t get_time_ms() { struct timespec ts; @@ -89,6 +108,7 @@ int main(int argc, char **argv) { */ const char *broker_host = "broker.emqx.io"; const char *broker_port = "14567"; + const char *server_name = NULL; const char *sslkeylogfile = getenv("SSLKEYLOGFILE"); if (argc > 1) @@ -96,6 +116,18 @@ int main(int argc, char **argv) { if (argc > 2) broker_port = argv[2]; + if (argc > 3) { + server_name = argv[3]; + } else { + struct in_addr ip4; + if (inet_pton(AF_INET, broker_host, &ip4) == 1) { + // If caller passes an IP, keep SNI on broker hostname by default. + server_name = "broker.emqx.io"; + } else { + server_name = broker_host; + } + } + printf("Resolving %s:%s...\n", broker_host, broker_port); // 1. Resolve Hostname @@ -120,6 +152,7 @@ int main(int argc, char **argv) { printf("Connecting to MQTT-over-QUIC broker at %s (resolved from %s)...\n", server_addr_str, broker_host); + printf("Using TLS server name: %s\n", server_name); // 2. Create UDP socket int sock = socket(AF_INET, SOCK_DGRAM, 0); @@ -151,13 +184,14 @@ int main(int argc, char **argv) { printf("TLS key logging enabled -> %s\n", sslkeylogfile); } - if (mqtt_quic_engine_connect(engine, server_addr_str, broker_host, &q_opts) != + if (mqtt_quic_engine_connect(engine, server_addr_str, server_name, &q_opts) != 0) { fprintf(stderr, "Failed to initiate QUIC connection\n"); return 1; } uint64_t start_time = get_time_ms(); + uint64_t last_tick_ms = 0; int running = 1; uint32_t loop_without_activity = 0; @@ -169,50 +203,73 @@ int main(int argc, char **argv) { while (running) { uint64_t now_ms = get_time_ms() - start_time; - // A. Handle Ticks - mqtt_quic_engine_handle_tick(engine, now_ms); + // A. Handle Incoming Datagrams first to avoid unnecessary retransmit bursts. + ssize_t recvd = 0; + addr_len = sizeof(remote_addr); + while ((recvd = recvfrom(sock, read_buf, sizeof(read_buf), 0, + (struct sockaddr *)&remote_addr, &addr_len)) > 0) { + char remote_ip[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &remote_addr.sin_addr, remote_ip, sizeof(remote_ip)); + char remote_str[INET_ADDRSTRLEN + 16]; + snprintf(remote_str, sizeof(remote_str), "%s:%u", remote_ip, + ntohs(remote_addr.sin_port)); + + mqtt_quic_engine_handle_datagram(engine, read_buf, recvd, remote_str); + loop_without_activity = 0; + + addr_len = sizeof(remote_addr); + } + + if (recvd < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { + perror("recvfrom"); + } + + // B. Handle Ticks every 10ms to match quinn's expected pacing + if (now_ms - last_tick_ms >= 10) { + mqtt_quic_engine_handle_tick(engine, now_ms); + last_tick_ms = now_ms; + } - // B. Handle Outgoing Datagrams + // C. Handle Outgoing Datagrams size_t dg_count = 0; MqttDatagramC *datagrams = mqtt_quic_engine_take_outgoing_datagrams(engine, &dg_count); if (datagrams) { for (size_t i = 0; i < dg_count; i++) { - struct addrinfo hints, *res; - memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_INET; - hints.ai_socktype = SOCK_DGRAM; - - char host[256], port[16]; + // Datagram destinations from the engine are always numeric IP:port + // strings — parse directly to avoid getaddrinfo overhead. + char host[INET_ADDRSTRLEN + 1] = {0}; char *colon = strrchr(datagrams[i].addr, ':'); - if (colon) { - size_t host_len = colon - datagrams[i].addr; - strncpy(host, datagrams[i].addr, host_len); - host[host_len] = '\0'; - strcpy(port, colon + 1); - - if (getaddrinfo(host, port, &hints, &res) == 0) { - sendto(sock, datagrams[i].data, datagrams[i].data_len, 0, - res->ai_addr, res->ai_addrlen); - freeaddrinfo(res); - } + if (!colon) { + fprintf(stderr, "Invalid datagram destination: %s\n", + datagrams[i].addr); + continue; } - } - mqtt_quic_engine_free_datagrams(datagrams, dg_count); - loop_without_activity = 0; - } - // C. Handle Incoming Datagrams - ssize_t recvd = recvfrom(sock, read_buf, sizeof(read_buf), 0, - (struct sockaddr *)&remote_addr, &addr_len); - if (recvd > 0) { - char remote_ip[INET_ADDRSTRLEN]; - inet_ntop(AF_INET, &remote_addr.sin_addr, remote_ip, sizeof(remote_ip)); - char remote_str[INET_ADDRSTRLEN + 16]; - snprintf(remote_str, sizeof(remote_str), "%s:%u", remote_ip, - ntohs(remote_addr.sin_port)); + size_t host_len = (size_t)(colon - datagrams[i].addr); + if (host_len == 0 || host_len >= sizeof(host)) { + fprintf(stderr, "Invalid datagram host: %s\n", datagrams[i].addr); + continue; + } + memcpy(host, datagrams[i].addr, host_len); + host[host_len] = '\0'; + + struct sockaddr_in dst; + memset(&dst, 0, sizeof(dst)); + dst.sin_family = AF_INET; + if (inet_pton(AF_INET, host, &dst.sin_addr) != 1) { + fprintf(stderr, "Invalid datagram IP: %s\n", host); + continue; + } + dst.sin_port = htons((uint16_t)atoi(colon + 1)); - mqtt_quic_engine_handle_datagram(engine, read_buf, recvd, remote_str); + ssize_t sent = sendto(sock, datagrams[i].data, datagrams[i].data_len, 0, + (struct sockaddr *)&dst, sizeof(dst)); + if (sent < 0) { + perror("sendto"); + } + } + mqtt_quic_engine_free_datagrams(datagrams, dg_count); loop_without_activity = 0; } @@ -233,8 +290,7 @@ int main(int argc, char **argv) { engine, "test/topic/quic", (const uint8_t *)"hello from C over QUIC native", 29, 1); } else if (tag == 4) { // Published - printf("Published! Disconnecting...\n"); - mqtt_quic_engine_disconnect(engine); + printf("Published! Waiting for echo...\n"); } else if (tag == 2) { // Disconnected printf("Disconnected gracefully.\n"); running = 0; @@ -247,14 +303,19 @@ int main(int argc, char **argv) { (char *)payload); mqtt_engine_free_string(topic); mqtt_engine_free_bytes(payload, p_len); + printf("Disconnecting...\n"); + mqtt_quic_engine_disconnect(engine); + } else { + printf("Unhandled MQTT event: %s (tag=%u)\n", + mqtt_event_tag_name(tag), tag); } } mqtt_event_list_free(events); } - usleep(10000); // 10ms + usleep(1000); // 1ms loop_without_activity++; - if (loop_without_activity > 5000) { + if (loop_without_activity > 5000) { // 5s timeout printf("Timeout, exiting...\n"); running = 0; } diff --git a/examples/no_io_quic_async_client_example.rs b/examples/no_io_quic_async_client_example.rs index 08ec1171..f714b2ae 100644 --- a/examples/no_io_quic_async_client_example.rs +++ b/examples/no_io_quic_async_client_example.rs @@ -87,7 +87,7 @@ impl Future for ProtocolDriver { /// Demonstrates how to use the QuicMqttEngine as a non-tokio async driver. pub fn run_example(exit_when_rcvd: bool) -> Result<(), Box> { - let _ = rustls::crypto::ring::default_provider().install_default(); + let _ = rustls_openssl::default_provider().install_default(); let mqtt_opts = MqttClientOptions::builder() .client_id("no-io-quic-async-client") .peer("broker.emqx.io:14567") diff --git a/examples/no_io_quic_client_example.rs b/examples/no_io_quic_client_example.rs index 1280ceae..71e47d03 100644 --- a/examples/no_io_quic_client_example.rs +++ b/examples/no_io_quic_client_example.rs @@ -10,7 +10,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; fn main() -> Result<(), Box> { - let _ = rustls::crypto::ring::default_provider().install_default(); + let _ = rustls_openssl::default_provider().install_default(); let mqtt_opts = MqttClientOptions::builder() .client_id("no-io-quic-client") .peer("broker.emqx.io:14567") diff --git a/examples/no_io_tokio_quic_client_example.rs b/examples/no_io_tokio_quic_client_example.rs index dcd2777d..d185a8f8 100644 --- a/examples/no_io_tokio_quic_client_example.rs +++ b/examples/no_io_tokio_quic_client_example.rs @@ -11,7 +11,7 @@ use std::time::Duration; /// in a more low-level, event-driven style where you manually drive the client's event loop. /// In most cases, users should prefer higher-level helpers, but this shows direct use of `TokioQuicMqttClient`. async fn run_example() -> Result<(), Box> { - let _ = rustls::crypto::ring::default_provider().install_default(); + let _ = rustls_openssl::default_provider().install_default(); let mqtt_opts = MqttClientOptions::builder() .client_id("quic-async-wrapper-client") .peer("broker.emqx.io:14567") diff --git a/examples/tokio_async_mqtt_quic_example.rs b/examples/tokio_async_mqtt_quic_example.rs index fe813d0a..3619cc21 100644 --- a/examples/tokio_async_mqtt_quic_example.rs +++ b/examples/tokio_async_mqtt_quic_example.rs @@ -139,7 +139,7 @@ impl TokioMqttEventHandler for QuicExampleHandler { fn init_crypto() { #[cfg(feature = "quic")] { - let _ = rustls::crypto::ring::default_provider().install_default(); + let _ = rustls_openssl::default_provider().install_default(); } } diff --git a/flowsdk_ffi/Cargo.toml b/flowsdk_ffi/Cargo.toml index fa6c51ec..eaa46ef1 100644 --- a/flowsdk_ffi/Cargo.toml +++ b/flowsdk_ffi/Cargo.toml @@ -12,20 +12,48 @@ categories = ["api-bindings"] [lib] crate-type = ["cdylib", "staticlib", "rlib"] +[[bin]] +name = "uniffi-bindgen" +path = "src/bin/uniffi-bindgen.rs" +required-features = ["uniffi-bindings"] + [dependencies] -flowsdk = { path = "..", version = "0.4.1" } +flowsdk = { path = "..", version = "0.4.1", default-features = false } libc = "0.2" -serde = { version = "1.0", features = ["derive"] } -uniffi = { version = "0.28", features = ["cli"] } -uniffi_macros = "0.28" -serde_json = "1.0" -rustls = { version = "0.23", optional = true } +serde = { version = "1.0", features = ["derive"], optional = true } +uniffi = { version = "0.28", features = ["cli"], optional = true } +uniffi_macros = { version = "0.28", optional = true } +serde_json = { version = "1.0", optional = true } +rustls = { version = "0.23", optional = true, default-features = false, features = ["std", "ring"] } +rustls-openssl = { version = "0.3", optional = true } rustls-pemfile = { version = "2.1", optional = true } rustls-native-certs = { version = "0.7", optional = true } -quinn-proto = { version = "0.11", optional = true } +# Mainstream quinn-proto (ring crypto, crates.io) +quinn-proto = { version = "0.11", optional = true, default-features = false, features = ["rustls", "ring"] } +# Fork quinn-proto (OpenSSL crypto, for release-small builds) +quinn-proto-openssl = { package = "quinn-proto", git = "https://github.com/qzhuyan/quinn.git", branch = "dev/william/ring-no-deps", optional = true, default-features = false, features = ["rustls-openssl"] } rustls-pki-types = { version = "1", optional = true } [features] -default = ["tls", "quic"] -quic = ["tls", "flowsdk/quic", "dep:quinn-proto"] -tls = ["flowsdk/rustls-tls", "dep:rustls", "dep:rustls-native-certs", "dep:rustls-pemfile", "dep:rustls-pki-types"] +default = ["tls", "quic", "uniffi-bindings"] +quic = [ + "flowsdk/quic-proto", + "flowsdk/async-client", + "flowsdk/strict-protocol-compliance", + "dep:quinn-proto", + "dep:rustls", + "dep:rustls-native-certs", + "dep:rustls-pemfile", + "dep:rustls-pki-types", +] +quic-openssl = [ + "flowsdk/quic-proto-openssl", + "flowsdk/async-client", + "flowsdk/strict-protocol-compliance", + "quic", + "dep:quinn-proto-openssl", + "dep:rustls-openssl", +] +tls = ["flowsdk/rustls-tls", "flowsdk/strict-protocol-compliance", "dep:rustls", "dep:rustls-native-certs", "dep:rustls-pemfile", "dep:rustls-pki-types"] +# UniFFI bindings for Kotlin/Python/Swift language bindings +uniffi-bindings = ["dep:uniffi", "dep:uniffi_macros", "dep:serde", "dep:serde_json"] diff --git a/flowsdk_ffi/src/engine.rs b/flowsdk_ffi/src/engine.rs index b8142fa1..849b5d59 100644 --- a/flowsdk_ffi/src/engine.rs +++ b/flowsdk_ffi/src/engine.rs @@ -11,7 +11,7 @@ use ffi_types::*; use std::sync::Mutex; -#[derive(uniffi::Object)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Object))] pub struct MqttEngineFFI { engine: Mutex, start_time: Instant, @@ -25,9 +25,9 @@ use flowsdk::mqtt_client::tls_engine::TlsMqttEngine; use std::net::SocketAddr; use std::sync::Arc; -#[uniffi::export] +#[cfg_attr(feature = "uniffi-bindings", uniffi::export)] impl MqttEngineFFI { - #[uniffi::constructor] + #[cfg_attr(feature = "uniffi-bindings", uniffi::constructor)] pub fn new(client_id: Option, mqtt_version: u8) -> Self { let client_id = client_id.unwrap_or_else(|| "mqtt_client".to_string()); let options = MqttClientOptions::builder() @@ -43,7 +43,7 @@ impl MqttEngineFFI { } } - #[uniffi::constructor] + #[cfg_attr(feature = "uniffi-bindings", uniffi::constructor)] pub fn new_with_opts(opts: MqttOptionsFFI) -> Self { let mut builder = MqttClientOptions::builder() .client_id(opts.client_id) @@ -223,16 +223,18 @@ fn map_event(event: MqttEvent) -> MqttEventFFI { } } -#[derive(uniffi::Object)] +#[cfg(feature = "tls")] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Object))] pub struct TlsMqttEngineFFI { engine: Mutex, start_time: Instant, events: Mutex>, } -#[uniffi::export] +#[cfg(feature = "tls")] +#[cfg_attr(feature = "uniffi-bindings", uniffi::export)] impl TlsMqttEngineFFI { - #[uniffi::constructor] + #[cfg_attr(feature = "uniffi-bindings", uniffi::constructor)] pub fn new(opts: MqttOptionsFFI, tls_opts: MqttTlsOptionsFFI, server_name: String) -> Self { let options = MqttClientOptions::builder() .client_id(opts.client_id) @@ -244,6 +246,9 @@ impl TlsMqttEngineFFI { .max_reconnect_attempts(opts.max_reconnect_attempts) .build(); + #[cfg(feature = "quic-openssl")] + let _ = rustls_openssl::default_provider().install_default(); + #[cfg(not(feature = "quic-openssl"))] let _ = rustls::crypto::ring::default_provider().install_default(); let crypto_builder = rustls::ClientConfig::builder(); @@ -389,6 +394,60 @@ impl TlsMqttEngineFFI { } } +#[cfg(not(feature = "tls"))] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Object))] +pub struct TlsMqttEngineFFI { + start_time: Instant, + events: Mutex>, +} + +#[cfg(not(feature = "tls"))] +#[cfg_attr(feature = "uniffi-bindings", uniffi::export)] +impl TlsMqttEngineFFI { + #[cfg_attr(feature = "uniffi-bindings", uniffi::constructor)] + pub fn new(_opts: MqttOptionsFFI, _tls_opts: MqttTlsOptionsFFI, _server_name: String) -> Self { + TlsMqttEngineFFI { + start_time: Instant::now(), + events: Mutex::new(Vec::new()), + } + } + + pub fn handle_socket_data(&self, _data: Vec) {} + + pub fn take_socket_data(&self) -> Vec { + Vec::new() + } + + pub fn handle_tick(&self, _now_ms: u64) -> Vec { + let _ = self.start_time; + Vec::new() + } + + pub fn take_events(&self) -> Vec { + std::mem::take(&mut *self.events.lock().unwrap()) + } + + pub fn connect(&self) {} + + pub fn publish(&self, _topic: String, _payload: Vec, _qos: u8) -> i32 { + -1 + } + + pub fn subscribe(&self, _topic_filter: String, _qos: u8) -> i32 { + -1 + } + + pub fn unsubscribe(&self, _topic_filter: String) -> i32 { + -1 + } + + pub fn disconnect(&self) {} + + pub fn is_connected(&self) -> bool { + false + } +} + #[derive(Debug)] struct InsecureServerCertVerifier; @@ -431,16 +490,16 @@ impl rustls::client::danger::ServerCertVerifier for InsecureServerCertVerifier { } } -#[derive(uniffi::Object)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Object))] pub struct QuicMqttEngineFFI { engine: Mutex, start_time: Instant, events: Mutex>, } -#[uniffi::export] +#[cfg_attr(feature = "uniffi-bindings", uniffi::export)] impl QuicMqttEngineFFI { - #[uniffi::constructor] + #[cfg_attr(feature = "uniffi-bindings", uniffi::constructor)] pub fn new(opts: MqttOptionsFFI) -> Self { let options = MqttClientOptions::builder() .client_id(opts.client_id) @@ -470,6 +529,9 @@ impl QuicMqttEngineFFI { let addr: SocketAddr = server_addr.parse().unwrap(); let now = self.start_time + Duration::from_millis(now_ms); + #[cfg(feature = "quic-openssl")] + let _ = rustls_openssl::default_provider().install_default(); + #[cfg(not(feature = "quic-openssl"))] let _ = rustls::crypto::ring::default_provider().install_default(); let crypto_builder = rustls::ClientConfig::builder(); @@ -521,6 +583,10 @@ impl QuicMqttEngineFFI { .ok(); } + fn elapsed_ms(&self) -> u64 { + self.start_time.elapsed().as_millis() as u64 + } + pub fn handle_datagram(&self, data: Vec, remote_addr: String, now_ms: u64) { let addr: SocketAddr = remote_addr.parse().unwrap(); let now = self.start_time + Duration::from_millis(now_ms); @@ -1125,6 +1191,7 @@ pub unsafe extern "C" fn mqtt_tls_engine_is_connected(ptr: *mut TlsMqttEngineFFI /// This function is unsafe because it dereferences a raw pointer to `MqttEngineFFI` /// and returns an allocated `c_char` pointer that must be freed using `mqtt_engine_free_string`. #[no_mangle] +#[cfg(feature = "uniffi-bindings")] pub unsafe extern "C" fn mqtt_engine_take_events(ptr: *mut MqttEngineFFI) -> *mut c_char { if let Some(engine) = ptr.as_ref() { let events = engine.take_events(); @@ -1140,6 +1207,7 @@ pub unsafe extern "C" fn mqtt_engine_take_events(ptr: *mut MqttEngineFFI) -> *mu /// This function is unsafe because it dereferences a raw pointer to `TlsMqttEngineFFI` /// and returns an allocated `c_char` pointer that must be freed using `mqtt_engine_free_string`. #[no_mangle] +#[cfg(feature = "uniffi-bindings")] pub unsafe extern "C" fn mqtt_tls_engine_take_events(ptr: *mut TlsMqttEngineFFI) -> *mut c_char { if let Some(engine) = ptr.as_ref() { let events = engine.take_events(); @@ -1254,7 +1322,7 @@ pub unsafe extern "C" fn mqtt_quic_engine_connect( } }; - engine.connect(server_addr, server_name, tls_opts_v, 0); + engine.connect(server_addr, server_name, tls_opts_v, engine.elapsed_ms()); 0 } else { -1 @@ -1274,7 +1342,7 @@ pub unsafe extern "C" fn mqtt_quic_engine_handle_datagram( if let (Some(engine), true, true) = (ptr.as_ref(), !data.is_null(), !remote_addr.is_null()) { let buf = std::slice::from_raw_parts(data, len); let remote_addr = CStr::from_ptr(remote_addr).to_string_lossy().into_owned(); - engine.handle_datagram(buf.to_vec(), remote_addr, 0); + engine.handle_datagram(buf.to_vec(), remote_addr, engine.elapsed_ms()); } } @@ -1358,6 +1426,7 @@ pub unsafe extern "C" fn mqtt_quic_engine_handle_tick(ptr: *mut QuicMqttEngineFF /// This function is unsafe because it dereferences a raw pointer to `QuicMqttEngineFFI` /// and returns an allocated `c_char` pointer that must be freed using `mqtt_engine_free_string`. #[no_mangle] +#[cfg(feature = "uniffi-bindings")] pub unsafe extern "C" fn mqtt_quic_engine_take_events(ptr: *mut QuicMqttEngineFFI) -> *mut c_char { if let Some(engine) = ptr.as_ref() { let events = engine.take_events(); @@ -1480,12 +1549,12 @@ pub struct MqttDatagramC { // Event Inspection API for C (Native Structs) // Actually, let's just use a dedicated "C Event List" object to manage the lifetime. -#[derive(uniffi::Object)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Object))] pub struct MqttEventListFFI { events: Vec, } -#[uniffi::export] +#[cfg_attr(feature = "uniffi-bindings", uniffi::export)] impl MqttEventListFFI { pub fn len(&self) -> u32 { self.events.len() as u32 diff --git a/flowsdk_ffi/src/engine/ffi_types.rs b/flowsdk_ffi/src/engine/ffi_types.rs index d40c7ccb..6e1147e1 100644 --- a/flowsdk_ffi/src/engine/ffi_types.rs +++ b/flowsdk_ffi/src/engine/ffi_types.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: MPL-2.0 -#[derive(uniffi::Record, Clone, serde::Serialize)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Record, serde::Serialize))] +#[derive(Clone)] pub struct MqttMessageFFI { pub topic: String, pub payload: Vec, @@ -8,32 +9,37 @@ pub struct MqttMessageFFI { pub retain: bool, } -#[derive(uniffi::Record, Clone, serde::Serialize)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Record, serde::Serialize))] +#[derive(Clone)] pub struct ConnectionResultFFI { pub reason_code: u8, pub session_present: bool, } -#[derive(uniffi::Record, Clone, serde::Serialize)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Record, serde::Serialize))] +#[derive(Clone)] pub struct PublishResultFFI { pub packet_id: Option, pub reason_code: Option, pub qos: u8, } -#[derive(uniffi::Record, Clone, serde::Serialize)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Record, serde::Serialize))] +#[derive(Clone)] pub struct SubscribeResultFFI { pub packet_id: u16, pub reason_codes: Vec, } -#[derive(uniffi::Record, Clone, serde::Serialize)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Record, serde::Serialize))] +#[derive(Clone)] pub struct UnsubscribeResultFFI { pub packet_id: u16, pub reason_codes: Vec, } -#[derive(uniffi::Enum, Clone, serde::Serialize)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Enum, serde::Serialize))] +#[derive(Clone)] pub enum MqttEventFFI { Connected(ConnectionResultFFI), Disconnected { reason_code: Option }, @@ -47,7 +53,7 @@ pub enum MqttEventFFI { ReconnectScheduled { attempt: u32, delay_ms: u64 }, } -#[derive(uniffi::Record)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Record))] pub struct MqttOptionsFFI { pub client_id: String, pub mqtt_version: u8, @@ -60,7 +66,7 @@ pub struct MqttOptionsFFI { pub max_reconnect_attempts: u32, } -#[derive(uniffi::Record)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Record))] pub struct MqttTlsOptionsFFI { pub ca_cert_file: Option, pub client_cert_file: Option, @@ -70,7 +76,7 @@ pub struct MqttTlsOptionsFFI { pub enable_key_log: bool, } -#[derive(uniffi::Record)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Record))] pub struct MqttDatagramFFI { pub addr: String, pub data: Vec, diff --git a/flowsdk_ffi/src/lib.rs b/flowsdk_ffi/src/lib.rs index 28dca148..d84da405 100644 --- a/flowsdk_ffi/src/lib.rs +++ b/flowsdk_ffi/src/lib.rs @@ -1,5 +1,11 @@ // SPDX-License-Identifier: MPL-2.0 +// Alias the fork's quinn-proto to the standard name when building with the +// OpenSSL backend, so engine.rs code using quinn_proto:: works transparently. +#[cfg(feature = "quic-openssl")] +extern crate quinn_proto_openssl as quinn_proto; + pub mod engine; +#[cfg(feature = "uniffi-bindings")] uniffi::setup_scaffolding!("flowsdk_ffi"); diff --git a/scripts/generate_coverage.sh b/scripts/generate_coverage.sh index 615cdbc4..49fac5a0 100755 --- a/scripts/generate_coverage.sh +++ b/scripts/generate_coverage.sh @@ -11,6 +11,11 @@ mkdir -p target/llvm-cov-target # 1. Collect coverage from Rust tests and examples cargo +stable llvm-cov --workspace --no-report --tests cargo +stable llvm-cov --workspace --no-report --tests -- --ignored +# Mainstream QUIC build mode (ring crypto, mainstream quinn): covers +# #[cfg(not(feature = "quic-proto-openssl"))] branches +cargo +stable llvm-cov --workspace --no-report --tests --features quic +# All features (includes quic-openssl): covers +# #[cfg(feature = "quic-proto-openssl"))] branches cargo +stable llvm-cov --workspace --no-report --examples --all-features cargo +stable llvm-cov report --lcov --output-path lcov.info diff --git a/src/lib.rs b/src/lib.rs index b96ecfdc..734342e1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,14 @@ // SPDX-License-Identifier: MPL-2.0 +// When building with the OpenSSL QUIC backend (fork), alias the renamed crates +// back to the standard names so all downstream code using `quinn::` and +// `quinn_proto::` works transparently. LTO strips the unused mainstream quinn +// symbols from the release-small binary. +#[cfg(feature = "quic-openssl")] +extern crate quinn_openssl as quinn; +#[cfg(feature = "quic-proto-openssl")] +extern crate quinn_proto_openssl as quinn_proto; + pub mod mqtt_client; pub mod mqtt_serde; pub mod mqtt_session; diff --git a/src/mqtt_client/engine.rs b/src/mqtt_client/engine.rs index 387e4f7c..32f1a131 100644 --- a/src/mqtt_client/engine.rs +++ b/src/mqtt_client/engine.rs @@ -1,9 +1,9 @@ // SPDX-License-Identifier: MPL-2.0 -#[cfg(feature = "quic")] +#[cfg(feature = "quic-proto")] use quinn_proto::{ClientConfig, Connection, ConnectionHandle, Endpoint, EndpointConfig, StreamId}; use std::collections::VecDeque; -#[cfg(feature = "quic")] +#[cfg(feature = "quic-proto")] use std::sync::Arc; use std::time::{Duration, Instant}; @@ -853,7 +853,7 @@ impl MqttEngine { /// /// This engine combines the `MqttEngine` (MQTT state machine) with `quinn_proto` (QUIC state machine) /// to provide a complete MQTT-over-QUIC implementation that does not perform any direct I/O. -#[cfg(feature = "quic")] +#[cfg(feature = "quic-proto")] pub struct QuicMqttEngine { mqtt_engine: MqttEngine, endpoint: Endpoint, @@ -869,7 +869,7 @@ pub struct QuicMqttEngine { // stream_read_buffer removed } -#[cfg(feature = "quic")] +#[cfg(feature = "quic-proto")] impl QuicMqttEngine { pub fn new(options: MqttClientOptions) -> Result { // Initialize MqttEngine @@ -877,7 +877,11 @@ impl QuicMqttEngine { // Initialize QUIC Endpoint (Client) let endpoint_config = EndpointConfig::default(); - // Endpoint::new(config, server_config, disable_stateless_retry, reset_token_key) + // quinn-proto 0.11 (mainstream) has a 4th reset_token_key parameter; + // the fork (0.12) removed it. + #[cfg(feature = "quic-proto-openssl")] + let endpoint = Endpoint::new(Arc::new(endpoint_config), None, true); + #[cfg(not(feature = "quic-proto-openssl"))] let endpoint = Endpoint::new(Arc::new(endpoint_config), None, true, None); Ok(Self { diff --git a/src/mqtt_client/mod.rs b/src/mqtt_client/mod.rs index 418c2d2d..cce44529 100644 --- a/src/mqtt_client/mod.rs +++ b/src/mqtt_client/mod.rs @@ -12,6 +12,7 @@ pub mod opts; pub mod raw_packet; #[cfg(feature = "rustls-tls")] pub mod tls_engine; +#[cfg(feature = "async-client")] pub mod tokio_async_client; #[cfg(feature = "quic")] pub mod tokio_quic_client; @@ -33,6 +34,7 @@ pub use no_io_client::NoIoMqttClient; pub use opts::{MqttClientOptions, MqttClientOptionsBuilder}; #[cfg(feature = "rustls-tls")] pub use tls_engine::TlsMqttEngine; +#[cfg(feature = "async-client")] pub use tokio_async_client::{ TokioAsyncClientConfig, TokioAsyncMqttClient, TokioMqttEvent, TokioMqttEventHandler, }; diff --git a/src/mqtt_client/transport/mod.rs b/src/mqtt_client/transport/mod.rs index af8d0337..8b7159cb 100644 --- a/src/mqtt_client/transport/mod.rs +++ b/src/mqtt_client/transport/mod.rs @@ -9,6 +9,7 @@ use async_trait::async_trait; use std::io; use tokio::io::{AsyncRead, AsyncWrite}; +#[cfg(feature = "async-client")] pub mod tcp; #[cfg(feature = "tls")] @@ -101,6 +102,7 @@ pub trait Transport: AsyncRead + AsyncWrite + Send + Sync + Unpin { pub type BoxedTransport = Box; // Re-export transport types +#[cfg(feature = "async-client")] pub use tcp::TcpTransport; #[cfg(feature = "tls")] diff --git a/src/mqtt_client/transport/quic.rs b/src/mqtt_client/transport/quic.rs index 688116f9..2866d98e 100644 --- a/src/mqtt_client/transport/quic.rs +++ b/src/mqtt_client/transport/quic.rs @@ -455,7 +455,19 @@ mod imp { quinn_crypto.transport_config(Arc::new(trpt_cfg)); // Create an endpoint bound to an ephemeral UDP port - let mut endpoint = Endpoint::client("0.0.0.0:0".parse().unwrap()).map_err(|e| { + let socket = std::net::UdpSocket::bind("0.0.0.0:0").map_err(|e| { + TransportError::ConnectionFailed(format!("QUIC endpoint create failed: {}", e)) + })?; + let runtime = quinn::default_runtime() + .ok_or_else(|| TransportError::ConnectionFailed("no tokio runtime".to_string()))?; + #[allow(unused_mut)] + let mut endpoint = Endpoint::new( + quinn::EndpointConfig::default(), + None, + socket, + runtime, + ) + .map_err(|e| { TransportError::ConnectionFailed(format!("QUIC endpoint create failed: {}", e)) })?; diff --git a/src/priority_queue.rs b/src/priority_queue.rs index c1ba675e..7698d230 100644 --- a/src/priority_queue.rs +++ b/src/priority_queue.rs @@ -27,8 +27,7 @@ where impl PriorityQueue where - P: Ord + Clone + Serialize + DeserializeOwned, - T: Serialize + DeserializeOwned, + P: Ord + Clone, { /// Creates a new `PriorityQueue` with the specified capacity. pub fn new(capacity: usize) -> Self { @@ -99,7 +98,11 @@ where } /// Saves the queue state to a file (JSON format). - pub fn save_to_file>(&self, path: Q) -> io::Result<()> { + pub fn save_to_file>(&self, path: Q) -> io::Result<()> + where + P: Serialize + DeserializeOwned, + T: Serialize + DeserializeOwned, + { let file = File::create(path)?; let writer = BufWriter::new(file); serde_json::to_writer(writer, self)?; @@ -107,7 +110,11 @@ where } /// Restores the queue state from a file. - pub fn load_from_file>(path: Q) -> io::Result { + pub fn load_from_file>(path: Q) -> io::Result + where + P: Serialize + DeserializeOwned, + T: Serialize + DeserializeOwned, + { let file = File::open(path)?; let reader = BufReader::new(file); let queue: Self = serde_json::from_reader(reader)?;