diff --git a/.bumpversion.toml b/.bumpversion.toml index c25d9d42..739ccf36 100644 --- a/.bumpversion.toml +++ b/.bumpversion.toml @@ -33,6 +33,16 @@ filename = "questdb-rs/README.md" search = "https://github.com/questdb/c-questdb-client/blob/{current_version}/questdb-rs/" replace = "https://github.com/questdb/c-questdb-client/blob/{new_version}/questdb-rs/" +[[tool.bumpversion.files]] +filename = "questdb-rs/src/ingress/mod.md" +search = "https://github.com/questdb/c-questdb-client/tree/{current_version}/" +replace = "https://github.com/questdb/c-questdb-client/tree/{new_version}/" + +[[tool.bumpversion.files]] +filename = "questdb-rs/src/ingress/mod.md" +search = "https://github.com/questdb/c-questdb-client/blob/{current_version}/" +replace = "https://github.com/questdb/c-questdb-client/blob/{new_version}/" + [[tool.bumpversion.files]] filename = "questdb-rs-ffi/Cargo.toml" search = "version = \"{current_version}\"" diff --git a/ci/run_all_tests.py b/ci/run_all_tests.py index e8c6f250..a20f38f0 100644 --- a/ci/run_all_tests.py +++ b/ci/run_all_tests.py @@ -40,11 +40,19 @@ def main(): run_cmd('cargo', 'test', '--no-default-features', '--features=ring-crypto,tls-native-certs,sync-sender', '--', '--nocapture', cwd='questdb-rs') + # Narrow single-transport matrix legs: verify the library compiles + # and tests pass when a downstream consumer enables only ILP/TCP or + # only ILP/HTTP. Skip doctests via `--lib --tests --examples` -- the + # crate-level docs describe QWP/WebSocket (the default transport) so + # they assume `sync-sender-qwp-ws` is enabled, which docs.rs builds + # with anyway (see Cargo.toml `package.metadata.docs.rs`). run_cmd('cargo', 'test', '--no-default-features', '--features=ring-crypto,tls-webpki-certs,sync-sender-tcp', + '--lib', '--tests', '--examples', '--', '--nocapture', cwd='questdb-rs') run_cmd('cargo', 'test', '--no-default-features', '--features=ring-crypto,tls-webpki-certs,sync-sender-http', + '--lib', '--tests', '--examples', '--', '--nocapture', cwd='questdb-rs') run_cmd('cargo', 'test', '--features=almost-all-features', '--', '--nocapture', cwd='questdb-rs') diff --git a/ci/run_fuzz_pipeline.yaml b/ci/run_fuzz_pipeline.yaml index 61820c04..92543413 100644 --- a/ci/run_fuzz_pipeline.yaml +++ b/ci/run_fuzz_pipeline.yaml @@ -147,6 +147,14 @@ stages: exit 1 fi done + # Disable xtrace before emitting `##vso[task.setvariable]`: + # under `set -x`, bash also prints the command to stderr with + # single-quoted arguments. The Azure agent parses `##vso` from + # either stream and "last line wins" -- the stderr copy can + # win the race, leaking a trailing `'` into JAVA_HOME and + # making Maven@3 fail with `Not found jdkUserInputPath`. + # See https://questdb.com/blog/azure-pipelines-stdout-stderr-race-condition/ + set +x echo "##vso[task.setvariable variable=JAVA_HOME_17_X64]$JAVA_PATH_17" echo "##vso[task.setvariable variable=JAVA_HOME]$JAVA_PATH_25" displayName: "Install missing deps + resolve JDKs" diff --git a/ci/run_tests_pipeline.yaml b/ci/run_tests_pipeline.yaml index 2a2da047..d40abee6 100644 --- a/ci/run_tests_pipeline.yaml +++ b/ci/run_tests_pipeline.yaml @@ -160,6 +160,14 @@ stages: exit 1 fi done + # Disable xtrace before emitting `##vso[task.setvariable]`: + # under `set -x`, bash also prints the command to stderr with + # single-quoted arguments. The Azure agent parses `##vso` from + # either stream and "last line wins" -- the stderr copy can + # win the race, leaking a trailing `'` into JAVA_HOME and + # making Maven@3 fail with `Not found jdkUserInputPath`. + # See https://questdb.com/blog/azure-pipelines-stdout-stderr-race-condition/ + set +x echo "##vso[task.setvariable variable=JAVA_HOME_17_X64]$JAVA_PATH_17" echo "##vso[task.setvariable variable=JAVA_HOME]$JAVA_PATH_25" displayName: "Install missing deps + resolve JDKs" @@ -327,6 +335,14 @@ stages: exit 1 fi done + # Disable xtrace before emitting `##vso[task.setvariable]`: + # under `set -x`, bash also prints the command to stderr with + # single-quoted arguments. The Azure agent parses `##vso` from + # either stream and "last line wins" -- the stderr copy can + # win the race, leaking a trailing `'` into JAVA_HOME and + # making Maven@3 fail with `Not found jdkUserInputPath`. + # See https://questdb.com/blog/azure-pipelines-stdout-stderr-race-condition/ + set +x echo "##vso[task.setvariable variable=JAVA_HOME_17_X64]$JAVA_PATH_17" echo "##vso[task.setvariable variable=JAVA_HOME]$JAVA_PATH_25" displayName: "Install missing deps + resolve JDKs" diff --git a/doc/QWP_WEBSOCKET_ARCHITECTURE.md b/doc/QWP_WEBSOCKET_ARCHITECTURE.md index 219f8c5e..8e7cbf5c 100644 --- a/doc/QWP_WEBSOCKET_ARCHITECTURE.md +++ b/doc/QWP_WEBSOCKET_ARCHITECTURE.md @@ -32,7 +32,8 @@ All code is under `questdb-rs/src/ingress/sender/`: Public configuration lives in `ingress/conf.rs`: `QwpWsConfig`, `SfDurability` (`Memory`; `Flush` and `Append` are parsed but currently rejected), -`QwpWsInitialConnectMode` (`Off | Sync | Async`, default `Off`). +`QwpWsInitialConnectMode` (`Off | Sync | Async`; effective default `Sync` +when any `reconnect_*` knob is explicitly configured, otherwise `Off`). --- @@ -450,9 +451,11 @@ counts after disruptions must use QuestDB table-level dedup (`DEDUP UPSERT KEYS(...)`); this matches the Java client's documented behavior. -`initial_connect_retry` is `QwpWsInitialConnectMode` (`Off | Sync | Async`, -default `Off`). `Async` lets the runner start before the first connection -succeeds; `Sync` retries inline; `Off` fails fast on the first attempt. +`initial_connect_retry` is `QwpWsInitialConnectMode` (`Off | Sync | Async`). +When unset, the effective default is `Sync` if any `reconnect_*` knob is +explicitly configured, otherwise `Off`. `Async` lets the runner start before +the first connection succeeds; `Sync` retries inline; `Off` fails fast on the +first attempt. ### 6.3 Error visibility diff --git a/questdb-rs/Cargo.toml b/questdb-rs/Cargo.toml index f0885689..c5b9bd42 100644 --- a/questdb-rs/Cargo.toml +++ b/questdb-rs/Cargo.toml @@ -178,3 +178,15 @@ required-features = ["sync-sender-http", "ndarray", "bigdecimal"] [[example]] name = "qwp_ws_unified_sfa_bench" required-features = ["sync-sender-qwp-ws"] + +[[example]] +name = "qwp_ws_basic" +required-features = ["sync-sender-qwp-ws"] + +[[example]] +name = "qwp_ws_failover" +required-features = ["sync-sender-qwp-ws"] + +[[example]] +name = "qwp_ws_error_handling" +required-features = ["sync-sender-qwp-ws"] diff --git a/questdb-rs/README.md b/questdb-rs/README.md index ef7f9452..1efd9e23 100644 --- a/questdb-rs/README.md +++ b/questdb-rs/README.md @@ -3,141 +3,156 @@ Official Rust client for [QuestDB](https://questdb.io/), an open-source SQL database designed to process time-series data, faster. -The client library is designed for fast ingestion of data into QuestDB. It -supports three transports: the InfluxDB Line Protocol (ILP) over HTTP -(recommended) or TCP, and the QuestDB Wire Protocol (QWP) over UDP for -high-throughput ingestion on trusted networks. - -* [QuestDB Database docs](https://questdb.io/docs/) -* [Docs on InfluxDB Line Protocol](https://questdb.io/docs/reference/api/ilp/overview/) +The client library is designed for fast ingestion of data into QuestDB +over the **QuestDB Wire Protocol over WebSocket (QWP/WebSocket)**: a +columnar binary protocol with explicit asynchronous server +acknowledgements, multi-host failover, optional on-disk durability, and a +structured error model. + +* [Rust client documentation](https://questdb.com/docs/connect/clients/rust/) — + the full guide on the QuestDB documentation site. +* [`ingress` module docs](https://docs.rs/questdb-rs/6.1.0/questdb/ingress/) — + this crate's API reference: protocol details, configuration parameters, + and patterns. +* [QuestDB Database docs](https://questdb.com/docs/) — the wider + documentation site (SQL reference, deployment, operations). ## Transports The transport is selected by the scheme in the configuration string: -* `http::addr=...` / `https::addr=...` — request-response, errors returned - to the client, supports authentication and TLS. Recommended for most - workloads. -* `tcp::addr=...` / `tcps::addr=...` — streaming, legacy; errors cause - server-side disconnect and surface only in server logs. -* `qwpudp::addr=...` — best-effort UDP datagrams (IPv4-only); no - acknowledgements, no authentication, no TLS, no transactional guarantees. - See the [`ingress`](https://docs.rs/questdb-rs/latest/questdb/ingress/) - module docs (in particular `Protocol::QwpUdp`) for semantics and - configuration parameters. - -## Protocol Versions - -The library supports the following ILP protocol versions. These apply to -ILP/HTTP and ILP/TCP only — QWP/UDP uses its own wire format and is not -versioned through this mechanism. - -* If you use HTTP and `protocol_version=auto` or unset, the library will - automatically detect the server's - latest supported protocol version and use it (recommended). -* If you use TCP, you can specify the - `protocol_version=N` parameter when constructing the `Sender` object - (TCP defaults to `protocol_version=1`). - -| Version | Description | Server Compatibility | -| ------- | ------------------------------------------------------- | --------------------- | -| **1** | Over HTTP it's compatible InfluxDB Line Protocol (ILP) | All QuestDB versions | -| **2** | 64-bit floats sent as binary, adds n-dimentional arrays | 9.0.0+ (2023-10-30) | - -**Note**: QuestDB server version 9.0.0 or later is required for `protocol_version=2` support. +* `ws::addr=...` / `wss::addr=...` — **QWP/WebSocket**, the recommended + transport. Columnar binary frames, asynchronous server ACKs with explicit + frame-sequence-number watermarks, multi-host failover, optional + store-and-forward durability. Supports HTTP basic and bearer-token auth, + plus TLS via `wss::`. The long-form aliases `qwpws::` / `qwpwss::` are + also accepted. +* `http::addr=...` / `https::addr=...` — **ILP/HTTP** (legacy). + Request-response with error returns and per-request retry, no + asynchronous ACKs or multi-host failover. Suitable for existing + deployments and one-shot batches. +* `tcp::addr=...` / `tcps::addr=...` — **ILP/TCP** (legacy). Streaming with + no error reporting to the client; the server logs failures and silently + disconnects. Lowest overhead, lowest observability. + +See the [`ingress` module docs](https://docs.rs/questdb-rs/6.1.0/questdb/ingress/) +for the full configuration-parameter reference, including the +QWP-specific keys (`sf_dir`, `sender_id`, `reconnect_*`, +`request_durable_ack`, `qwp_ws_progress`, `max_in_flight`). ## Quick Start -To start using `questdb-rs`, add it as a dependency of your project: +Add `questdb-rs` to your project: ```bash cargo add questdb-rs ``` -Then you can try out this quick example, which connects to a QuestDB server -running on your local machine: +A minimal ingest using QWP/WebSocket: ```rust ignore use questdb::{ Result, - ingress::{ - Sender, - Buffer, - TimestampNanos}}; + ingress::{Sender, TimestampNanos}}; fn main() -> Result<()> { - let mut sender = Sender::from_conf("http::addr=localhost:9000;")?; - let mut buffer = sender.new_buffer(); - buffer - .table("trades")? - .symbol("symbol", "ETH-USD")? - .symbol("side", "sell")? - .column_f64("price", 2615.54)? - .column_f64("amount", 0.00044)? - - // Array ingestion (QuestDB 9.0.0+). Slices and ndarray supported through trait - .column_arr("price_history", &[2615.54f64, 2615.10, 2614.80])? - .column_arr("volatility", &ndarray::arr1(&[0.012f64, 0.011, 0.013]).view())? - .at(TimestampNanos::now())?; - sender.flush(&mut buffer)?; - Ok(()) + let mut sender = Sender::from_conf("ws::addr=localhost:9000;")?; + let mut buffer = sender.new_buffer(); + buffer + .table("trades")? + .symbol("symbol", "ETH-USD")? + .symbol("side", "sell")? + .column_f64("price", 2615.54)? + .column_f64("amount", 0.00044)? + .at(TimestampNanos::now())?; + sender.flush(&mut buffer)?; + sender.close_drain()?; + Ok(()) } ``` +`flush` returns once the frame has been appended to the local publication +log. `close_drain` waits for already-published frames to be acknowledged +by the server (bounded by `close_flush_timeout_millis`, default 5s) before +the sender is dropped. + ## Docs -Most of the client documentation is on the -[`ingress`](https://docs.rs/questdb-rs/6.1.0/questdb/ingress/) module page. +This crate's API reference is on the +[`ingress`](https://docs.rs/questdb-rs/6.1.0/questdb/ingress/) module +page: configuration keys, the QWP error model, FSN-based completion, +progress modes, multi-host failover, store-and-forward, authentication, +TLS, the `Buffer` API, and the legacy ILP transports. + +For the full Rust client guide — failover, store-and-forward operations, +migration from ILP, worked examples — see the +[Rust client documentation](https://questdb.com/docs/connect/clients/rust/) +on the QuestDB documentation site. ## Examples -A selection of usage examples is available in the [examples directory](https://github.com/questdb/c-questdb-client/tree/6.1.0/questdb-rs/examples): +QWP/WebSocket examples in the +[examples directory](https://github.com/questdb/c-questdb-client/tree/6.1.0/questdb-rs/examples): | Example | Description | |---------|-------------| -| [`basic.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/basic.rs) | Minimal TCP ingestion example; shows basic row and array ingestion. | -| [`auth.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/auth.rs) | Adds authentication (user/password, token) to basic ingestion. | -| [`auth_tls.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/auth_tls.rs) | Like `auth.rs`, but uses TLS for encrypted TCP connections. | -| [`from_conf.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/from_conf.rs) | Configures client via connection string instead of builder pattern. | -| [`from_env.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/from_env.rs) | Reads config from `QDB_CLIENT_CONF` environment variable. | -| [`http.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/http.rs) | Uses HTTP transport and demonstrates array ingestion with `ndarray`. | -| [`protocol_version.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/protocol_version.rs) | Shows protocol version selection and feature differences (e.g. arrays). | +| [`qwp_ws_basic.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/qwp_ws_basic.rs) | Minimal QWP/WebSocket ingestion: build a sender, flush a row, `close_drain`. | +| [`qwp_ws_failover.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/qwp_ws_failover.rs) | Multi-host `addr=` list with on-disk store-and-forward and `sender_id`. | +| [`qwp_ws_error_handling.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/qwp_ws_error_handling.rs) | Server-error handling via `poll_qwp_ws_error` and via the `qwp_ws_error_handler` callback. | +| [`qwp_ws_unified_sfa_bench.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/qwp_ws_unified_sfa_bench.rs) | Throughput benchmark with store-and-forward. | + +Build and run any of these with, for example: + +```sh +cargo run --example qwp_ws_basic --features sync-sender-qwp-ws +``` ## Crate features -The crate provides several optional features to enable additional functionality. You can enable features using Cargo's `--features` flag or in your `Cargo.toml`. +The crate provides optional features. Enable them via Cargo's +`--features` flag or in your `Cargo.toml`. ### Default features -- **sync-sender**: Enables both `sync-sender-tcp` and `sync-sender-http`. -- **sync-sender-tcp**: Enables ILP/TCP (legacy). Depends on the `socket2` crate. -- **sync-sender-http**: Enables ILP/HTTP support. Depends on the `ureq` crate. -- **tls-webpki-certs**: Uses a snapshot of the [Common CA Database](https://www.ccadb.org/) as root TLS certificates. Depends on the `webpki-roots` crate. -- **ring-crypto**: Uses the `ring` crate as the cryptography backend for TLS (default crypto backend). - -### Optional features -- **chrono_timestamp**: Allows specifying timestamps as `chrono::DateTime` objects. Depends on the `chrono` crate. -- **tls-native-certs**: Uses OS-provided root TLS certificates for secure connections. Depends on the `rustls-native-certs` crate. -- **insecure-skip-verify**: Allows skipping verification of insecure certificates (not recommended for production). -- **ndarray**: Enables integration with the `ndarray` crate for working with n-dimensional arrays. Without this feature, you can still send slices or implement custom array types via the `NdArrayView` trait. -- **aws-lc-crypto**: Uses `aws-lc-rs` as the cryptography backend for TLS. Mutually exclusive with the `ring-crypto` feature. +* **sync-sender** — umbrella for the synchronous sender transports. +* **sync-sender-qwp-ws** — QWP over WebSocket. Recommended transport. +* **sync-sender-http** — ILP over HTTP (legacy). +* **sync-sender-tcp** — ILP over TCP (legacy). +* **tls-webpki-certs** — bundled TLS roots from the + [Common CA Database](https://www.ccadb.org/) via + [`webpki-roots`](https://crates.io/crates/webpki-roots). +* **ring-crypto** — default TLS crypto backend, via the `ring` crate. -- **almost-all-features**: Convenience feature for development and testing. Enables most features except mutually exclusive crypto backends. +### Optional features -> See the `Cargo.toml` for the full list and details on feature interactions. +* **chrono_timestamp** — accept timestamps as `chrono::DateTime`. +* **tls-native-certs** — OS-provided root TLS certificates via + `rustls-native-certs`. +* **insecure-skip-verify** — disable TLS verification (test-only, + not for production). +* **ndarray** — integrate with the `ndarray` crate for N-dimensional + arrays. Without this feature, slices and custom types via the + `NdArrayView` trait still work. +* **aws-lc-crypto** — alternative TLS backend via `aws-lc-rs`. Mutually + exclusive with `ring-crypto`. +* **almost-all-features** — dev/test convenience: enables most features + except mutually exclusive crypto backends. + +> See `Cargo.toml` for the full list and feature interactions. ## C, C++ and Python APIs -This crate is also exposed as a C and C++ API and in turn exposed to Python. +This crate is also exposed as a C and C++ API and in turn exposed to +Python. -* This project's [GitHub page](https://github.com/questdb/c-questdb-client) - for the C and C++ API. -* [Python bindings](https://github.com/questdb/py-questdb-client). +* [c-questdb-client](https://github.com/questdb/c-questdb-client) — the C + and C++ API. +* [py-questdb-client](https://github.com/questdb/py-questdb-client) — + Python bindings. ## Community -If you need help, have additional questions or want to provide feedback, you -may find us on [Slack](https://slack.questdb.io/). - -You can also sign up to our [mailing list](https://questdb.io/community/) to -get notified of new releases. +If you need help, have questions, or want to provide feedback, +[join us on Slack](https://slack.questdb.io/). You can also sign up to +the [mailing list](https://questdb.com/community/) to get notified of new +releases. diff --git a/questdb-rs/examples/qwp_ws_basic.rs b/questdb-rs/examples/qwp_ws_basic.rs new file mode 100644 index 00000000..64c77a07 --- /dev/null +++ b/questdb-rs/examples/qwp_ws_basic.rs @@ -0,0 +1,51 @@ +//! Minimal QWP/WebSocket ingestion. +//! +//! Publishes one trades row to a QuestDB server reachable on +//! `ws::addr=localhost:9000;` (override host and port via command-line +//! arguments) and waits for in-flight frames to be acknowledged before +//! the sender is dropped. +//! +//! Run with: +//! +//! ```sh +//! cargo run --example qwp_ws_basic --features sync-sender-qwp-ws +//! cargo run --example qwp_ws_basic --features sync-sender-qwp-ws -- db.example.com 9000 +//! ``` + +use questdb::{ + Result, + ingress::{Sender, TimestampNanos}, +}; + +fn main() -> Result<()> { + let host = std::env::args() + .nth(1) + .unwrap_or_else(|| "localhost".to_string()); + let port = std::env::args() + .nth(2) + .unwrap_or_else(|| "9000".to_string()); + let conf = format!("ws::addr={host}:{port};"); + + let mut sender = Sender::from_conf(&conf)?; + let mut buffer = sender.new_buffer(); + + buffer + .table("trades")? + .symbol("symbol", "ETH-USD")? + .symbol("side", "sell")? + .column_f64("price", 2615.54)? + .column_f64("amount", 0.00044)? + .at(TimestampNanos::now())?; + + sender.flush(&mut buffer)?; + + // close_drain stops accepting new publications and waits up to + // close_flush_timeout_millis (default 5000) for already-published + // frames to be acknowledged by the server. Skip it (or rely on + // Sender::drop) only if delivery is not delivery-sensitive -- + // without close_drain, in-flight frames may not reach the server + // and any delivery failure is silent. + sender.close_drain()?; + + Ok(()) +} diff --git a/questdb-rs/examples/qwp_ws_error_handling.rs b/questdb-rs/examples/qwp_ws_error_handling.rs new file mode 100644 index 00000000..49b90e52 --- /dev/null +++ b/questdb-rs/examples/qwp_ws_error_handling.rs @@ -0,0 +1,99 @@ +//! QWP/WebSocket asynchronous error handling. +//! +//! Server errors (schema mismatch, parse error, security error, etc.) +//! arrive asynchronously after `flush` has returned. There are two +//! ways to observe them: +//! +//! - Poll for them with `Sender::poll_qwp_ws_error`. The diagnostic +//! log is bounded; if the caller polls too slowly, older +//! diagnostics are dropped and the dropped count is available via +//! `Sender::qwp_ws_errors_dropped`. +//! - Install a callback at construction time via +//! `SenderBuilder::qwp_ws_error_handler`. The callback runs +//! synchronously from sender API calls such as `flush` and must +//! not call methods on the same sender. +//! +//! This example shows both styles back-to-back. +//! +//! Run with: +//! +//! ```sh +//! cargo run --example qwp_ws_error_handling --features sync-sender-qwp-ws +//! ``` + +use questdb::{ + Result, + ingress::{Sender, SenderBuilder, TimestampNanos}, +}; +use std::time::Duration; + +fn polling_style() -> Result<()> { + let mut sender = Sender::from_conf("ws::addr=localhost:9000;")?; + let mut buffer = sender.new_buffer(); + + buffer + .table("trades")? + .symbol("symbol", "ETH-USD")? + .column_f64("price", 2615.54)? + .at(TimestampNanos::now())?; + let published_fsn = sender.flush_and_get_fsn(&mut buffer)?; + + let wait_result = if let Some(fsn) = published_fsn { + match sender.await_acked_fsn(fsn, Duration::from_secs(5)) { + Ok(true) => Ok(()), + Ok(false) => { + eprintln!("timed out waiting for QWP/WebSocket frame {fsn} to complete"); + Ok(()) + } + Err(err) => Err(err), + } + } else { + Ok(()) + }; + + // Drain server-side diagnostics observed for completed frames. + while let Some(err) = sender.poll_qwp_ws_error()? { + eprintln!( + "qwp error (poll): category={:?} policy={:?} fsn=[{}..={}] msg={:?}", + err.category, err.applied_policy, err.from_fsn, err.to_fsn, err.message + ); + } + + let dropped = sender.qwp_ws_errors_dropped()?; + if dropped > 0 { + eprintln!("note: {dropped} diagnostic(s) were dropped from the log"); + } + + sender.close_drain()?; + wait_result +} + +fn callback_style() -> Result<()> { + let mut sender = SenderBuilder::from_conf("ws::addr=localhost:9000;")? + .qwp_ws_error_handler(|err| { + eprintln!( + "qwp error (callback): category={:?} policy={:?} fsn=[{}..={}] msg={:?}", + err.category, err.applied_policy, err.from_fsn, err.to_fsn, err.message + ); + })? + .build()?; + + let mut buffer = sender.new_buffer(); + buffer + .table("trades")? + .symbol("symbol", "ETH-USD")? + .column_f64("price", 2615.54)? + .at(TimestampNanos::now())?; + sender.flush(&mut buffer)?; + + sender.close_drain()?; + Ok(()) +} + +fn main() -> Result<()> { + println!("polling style:"); + polling_style()?; + println!("\ncallback style:"); + callback_style()?; + Ok(()) +} diff --git a/questdb-rs/examples/qwp_ws_failover.rs b/questdb-rs/examples/qwp_ws_failover.rs new file mode 100644 index 00000000..78fd79e6 --- /dev/null +++ b/questdb-rs/examples/qwp_ws_failover.rs @@ -0,0 +1,77 @@ +//! QWP/WebSocket multi-host failover with store-and-forward. +//! +//! Demonstrates a production-style connect string with: +//! +//! - Multiple `addr=` entries (the driver walks the list to find a +//! healthy peer when the current connection breaks). +//! - `sf_dir=...` so unacknowledged frames spill to disk and are +//! replayed after reconnects and process restarts. Replay is +//! at-least-once, so target tables should declare +//! `DEDUP UPSERT KEYS(...)`. +//! - `sender_id=...` identifying this sender's slot under `sf_dir`. +//! Use a distinct id per sender process when several share the +//! same directory. +//! - `reconnect_max_duration_millis=...` to bound the per-outage +//! reconnect budget. +//! +//! Replace the address list with hosts on your network and ensure the +//! `sf_dir` path is writable. +//! +//! Run with: +//! +//! ```sh +//! cargo run --example qwp_ws_failover --features sync-sender-qwp-ws +//! ``` + +use std::thread; +use std::time::Duration; + +use questdb::{ + Result, + ingress::{Sender, TimestampNanos}, +}; + +fn main() -> Result<()> { + let sf_dir = std::env::temp_dir().join("myapp-qdb-sf"); + let conf = format!( + "ws::addr=db-primary:9000,db-replica-1:9000,db-replica-2:9000;\ + sf_dir={};\ + sender_id=ingest-1;\ + reconnect_max_duration_millis=300000;", + sf_dir.display() + ); + + let mut sender = Sender::from_conf(conf)?; + let mut buffer = sender.new_buffer(); + + // Publish ten rows, one per second. In a real ingest service this + // loop would be driven by your data source instead. + for _ in 0..10 { + buffer + .table("trades")? + .symbol("symbol", "ETH-USD")? + .symbol("side", "sell")? + .column_f64("price", 2615.54)? + .column_f64("amount", 0.00044)? + .at(TimestampNanos::now())?; + + // For QWP/WebSocket, a successful flush publishes the batch + // to the local replay queue or Store-and-Forward storage. + // Delivery, ACKs, reconnect, and replay happen asynchronously. + // If flush returns an error, the buffer is still intact; do + // not append more rows until you have retried, cleared, or + // dropped it. + if let Err(e) = sender.flush(&mut buffer) { + eprintln!("flush error: {e}"); + if sender.must_close() { + eprintln!("sender is terminal; aborting"); + } + return Err(e); + } + + thread::sleep(Duration::from_secs(1)); + } + + sender.close_drain()?; + Ok(()) +} diff --git a/questdb-rs/src/ingress.rs b/questdb-rs/src/ingress.rs index 34f780fa..e21a03f8 100644 --- a/questdb-rs/src/ingress.rs +++ b/questdb-rs/src/ingress.rs @@ -190,7 +190,7 @@ fn validate_auto_flush_params(params: &HashMap) -> Result<()> { )); } - for ¶m in ["auto_flush_rows", "auto_flush_bytes"].iter() { + for ¶m in ["auto_flush_rows", "auto_flush_bytes", "auto_flush_interval"].iter() { if params.contains_key(param) { return Err(error::fmt!( ConfigError, @@ -523,51 +523,50 @@ fn parse_qwp_ws_endpoints( Ok(endpoints) } -/// Accumulates parameters for a new `Sender` instance. +#[cfg(feature = "_sender-qwp-ws")] +fn qwp_ws_reconnect_policy_was_specified(qwp_ws: &conf::QwpWsConfig) -> bool { + matches!(&qwp_ws.reconnect_max_duration, ConfigSetting::Specified(_)) + || matches!( + &qwp_ws.reconnect_initial_backoff, + ConfigSetting::Specified(_) + ) + || matches!(&qwp_ws.reconnect_max_backoff, ConfigSetting::Specified(_)) +} + +#[cfg(feature = "_sender-qwp-ws")] +fn qwp_ws_effective_config(qwp_ws: &conf::QwpWsConfig) -> conf::QwpWsConfig { + let mut effective = qwp_ws.clone(); + if matches!( + &effective.initial_connect_retry, + ConfigSetting::Defaulted(_) + ) && qwp_ws_reconnect_policy_was_specified(qwp_ws) + { + effective.initial_connect_retry = + ConfigSetting::Defaulted(conf::QwpWsInitialConnectMode::Sync); + } + effective +} + +/// Accumulates parameters for a new `Sender`. /// -/// You can also create the builder from a config string. +/// Most callers should use [`SenderBuilder::from_conf`] with a connect +/// string; the per-key methods on this builder are for callers that +/// need to configure programmatically. /// /// ```no_run /// # use questdb::Result; /// use questdb::ingress::SenderBuilder; -/// /// # fn main() -> Result<()> { -/// let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?; -/// # Ok(()) +/// # #[cfg(feature = "sync-sender-qwp-ws")] { +/// let _sender = SenderBuilder::from_conf("ws::addr=localhost:9000;")?.build()?; /// # } -/// ``` -/// -/// Or create it from the `QDB_CLIENT_CONF` environment variable. -/// -/// ```no_run -/// # use questdb::Result; -/// use questdb::ingress::SenderBuilder; -/// -/// # fn main() -> Result<()> { -/// // export QDB_CLIENT_CONF="https::addr=localhost:9000;" -/// let mut sender = SenderBuilder::from_env()?.build()?; /// # Ok(()) /// # } /// ``` /// -/// The `SenderBuilder` can also be built programmatically. -/// The minimum required parameters are the protocol, host, and port. -/// -/// ```no_run -/// # use questdb::Result; -/// use questdb::ingress::SenderBuilder; -/// use questdb::ingress::Protocol; -/// -/// # fn main() -> Result<()> { -/// # #[cfg(feature = "sync-sender-http")] { -/// let mut sender = SenderBuilder::new(Protocol::Http, "localhost", 9000).build()?; -/// # } -/// # #[cfg(all(not(feature = "sync-sender-http"), feature = "sync-sender-tcp"))] { -/// let mut sender = SenderBuilder::new(Protocol::Tcp, "localhost", 9009).build()?; -/// # } -/// # Ok(()) -/// # } -/// ``` +/// See the +/// [Rust client guide](https://questdb.com/docs/connect/clients/rust/) +/// for the full configuration reference. #[derive(Debug, Clone)] pub struct SenderBuilder { protocol: Protocol, @@ -609,44 +608,26 @@ pub struct SenderBuilder { } impl SenderBuilder { - /// Create a new `SenderBuilder` instance from the configuration string. - /// - /// The format of the string is: `"http::addr=host:port;key=value;...;"`. - /// - /// Instead of `"http"`, you can also specify `"https"`, `"tcp"`, `"tcps"`, - /// `"qwpudp"`, `"qwpws"`, `"qwpwss"`, and the QWP/WebSocket aliases - /// `"ws"` / `"wss"` when the corresponding sender features are enabled. + /// Build a `SenderBuilder` from a connect string. /// - /// We recommend HTTP for most cases because it provides more features, like - /// reporting errors to the client and supporting transaction control. TCP can - /// sometimes be faster in higher-latency networks, but misses a number of - /// features. + /// Connect strings have the form + /// `"::addr=host:port;key=value;...;"`. The recommended + /// transport is QWP/WebSocket (`ws::` / `wss::`); ILP/HTTP + /// (`http::` / `https::`), ILP/TCP (`tcp::` / `tcps::`), and QWP/UDP + /// (`qwpudp::`) are also accepted when their Cargo features are + /// enabled. /// - /// Many accepted keys match one-for-one with the methods on `SenderBuilder`. - /// For example, this is a valid configuration string: + /// See the + /// [connect-string reference](https://questdb.com/docs/connect/clients/connect-string/) + /// for the full key list. Some QWP-specific keys (e.g. `sf_dir`, + /// `sender_id`, `in_flight_window`) are accepted only through the + /// connect string and not through a builder method. /// - /// "https::addr=host:port;username=alice;password=secret;" + /// You can also load the configuration from an environment variable. + /// See [`SenderBuilder::from_env`]. /// - /// and there are matching methods [SenderBuilder::username] and - /// [SenderBuilder::password]. The value of `addr=` is supplied directly to the - /// `SenderBuilder` constructor, so there's no matching method for that. - /// - /// Some QWP/WebSocket configuration keys are accepted only through the - /// configuration string, primarily for compatibility with Java-style - /// configuration names and settings without a public Rust builder method. - /// These include `in_flight_window`, `sf_dir`, `sender_id`, `sf_max_bytes`, - /// `sf_max_total_bytes`, `sf_durability`, `sf_append_deadline_millis`, - /// `auth_timeout_ms`, `close_flush_timeout_millis`, `request_durable_ack`, - /// `durable_ack_keepalive_interval_millis`, `drain_orphans`, - /// `max_background_drainers`, `error_inbox_capacity`, and - /// `max_schemas_per_connection`. - /// - /// You can also load the configuration from an environment variable. See - /// [`SenderBuilder::from_env`]. - /// - /// Once you have a `SenderBuilder` instance, you can further customize it - /// before calling [`SenderBuilder::build`], but you can't change any settings - /// that are already set in the config string. + /// Settings already supplied via the connect string cannot be + /// overridden by subsequent builder calls. pub fn from_conf>(conf: T) -> Result { let conf = conf.as_ref(); #[cfg(feature = "_sender-qwp-ws")] @@ -952,25 +933,11 @@ impl SenderBuilder { Self::from_conf(conf) } - /// Create a new `SenderBuilder` instance with the provided QuestDB - /// server and port, using ILP over the specified protocol. + /// Create a `SenderBuilder` with the given transport, host, and + /// port. /// - /// ```no_run - /// # use questdb::Result; - /// use questdb::ingress::{Protocol, SenderBuilder}; - /// - /// # fn main() -> Result<()> { - /// # #[cfg(feature = "sync-sender-tcp")] { - /// let mut sender = SenderBuilder::new( - /// Protocol::Tcp, "localhost", 9009).build()?; - /// # } - /// # #[cfg(all(not(feature = "sync-sender-tcp"), feature = "sync-sender-http"))] { - /// let mut sender = SenderBuilder::new( - /// Protocol::Http, "localhost", 9000).build()?; - /// # } - /// # Ok(()) - /// # } - /// ``` + /// Most callers should use [`SenderBuilder::from_conf`] or + /// [`Sender::from_conf`] instead. pub fn new, P: Into>(protocol: Protocol, host: H, port: P) -> Self { let host = host.into(); let port: Port = port.into(); @@ -1079,12 +1046,11 @@ impl SenderBuilder { /// Set the username for authentication. /// - /// For TCP, this is the `kid` part of the ECDSA key set. - /// The other fields are [`token`](SenderBuilder::token), [`token_x`](SenderBuilder::token_x), - /// and [`token_y`](SenderBuilder::token_y). - /// - /// For HTTP, this is a part of basic authentication. - /// See also: [`password`](SenderBuilder::password). + /// For QWP/WebSocket and ILP/HTTP this is the basic-auth username; + /// pair with [`password`](SenderBuilder::password). For ILP/TCP this + /// is the `kid` part of the ECDSA key set; pair with + /// [`token`](SenderBuilder::token) / [`token_x`](SenderBuilder::token_x) + /// / [`token_y`](SenderBuilder::token_y). pub fn username(mut self, username: &str) -> Result { #[cfg(feature = "_sender-qwp-udp")] self.reject_if_qwp_udp("username")?; @@ -1093,8 +1059,8 @@ impl SenderBuilder { Ok(self) } - /// Set the password for basic HTTP authentication. - /// See also: [`username`](SenderBuilder::username). + /// Set the password for basic authentication (QWP/WebSocket or + /// ILP/HTTP). See also: [`username`](SenderBuilder::username). pub fn password(mut self, password: &str) -> Result { #[cfg(feature = "_sender-qwp-udp")] self.reject_if_qwp_udp("password")?; @@ -1103,8 +1069,11 @@ impl SenderBuilder { Ok(self) } - /// Set the Token (Bearer) Authentication parameter for HTTP, - /// or the ECDSA private key for TCP authentication. + /// Set the bearer-token authentication value for QWP/WebSocket or + /// ILP/HTTP. On ILP/TCP this is the ECDSA private key (the `d` + /// component), used with [`username`](SenderBuilder::username) / + /// [`token_x`](SenderBuilder::token_x) / + /// [`token_y`](SenderBuilder::token_y). pub fn token(mut self, token: &str) -> Result { #[cfg(feature = "_sender-qwp-udp")] self.reject_if_qwp_udp("token")?; @@ -1159,15 +1128,14 @@ impl SenderBuilder { } } - /// Sets the protocol version for ILP transports. - /// - HTTP transport automatically negotiates the protocol version by default(unset, **Strong Recommended**). - /// You can explicitly configure the protocol version to avoid the slight latency cost at connection time. - /// - TCP transport does not negotiate the protocol version and uses [`ProtocolVersion::V1`] by - /// default. You must explicitly set [`ProtocolVersion::V2`] in order to ingest - /// arrays. - /// - QWP/UDP does not support explicit `protocol_version` configuration. + /// Set the ILP protocol version. /// - /// **Note**: QuestDB server version 9.0.0 or later is required for [`ProtocolVersion::V2`] support. + /// Applies only to the legacy ILP transports. ILP/HTTP auto-negotiates + /// by default (leave unset). ILP/TCP defaults to + /// [`ProtocolVersion::V1`]; set [`ProtocolVersion::V2`] for arrays + /// (server 9.0.0+) or [`ProtocolVersion::V3`] for decimals + /// (server 9.2.0+). QWP transports negotiate during handshake and do + /// not honour this setting. pub fn protocol_version(mut self, protocol_version: ProtocolVersion) -> Result { #[cfg(feature = "_sender-qwp-udp")] self.reject_if_qwp_udp("protocol_version")?; @@ -1505,7 +1473,8 @@ impl SenderBuilder { } #[cfg(feature = "_sender-qwp-ws")] - /// Retry the initial connection using the reconnect policy. Default false. + /// Retry the initial connection using the reconnect policy. Defaults to + /// false unless a `reconnect_*` setting was specified. pub fn initial_connect_retry(mut self, value: bool) -> Result { let Some(qwp_ws) = &mut self.qwp_ws else { return Err(error::fmt!( @@ -1764,9 +1733,9 @@ impl SenderBuilder { /// The maximum buffered size that the client will flush to the server. /// The default is 100 MiB. /// - /// For ILP this applies to the exact pending byte length. - /// For QWP/UDP this applies to the buffer size hint exposed by [`Buffer::len`]. /// For QWP/WebSocket this applies to the encoded replay message size. + /// For QWP/UDP this applies to the buffer size hint exposed by [`Buffer::len`]. + /// For ILP this applies to the exact pending byte length. pub fn max_buf_size(mut self, value: usize) -> Result { let min = 1024; if value < min { @@ -1779,11 +1748,12 @@ impl SenderBuilder { Ok(self) } - /// The maximum length of a table or column name in bytes. - /// Matches the `cairo.max.file.name.length` setting in the server. - /// The default is 127 bytes. - /// If running over HTTP and protocol version 2 is auto-negotiated, this - /// value is picked up from the server. + /// Maximum length of a table or column name in bytes. + /// + /// Matches the server's `cairo.max.file.name.length` setting; the + /// default is 127 bytes. ILP/HTTP with auto-negotiated protocol + /// version 2 picks this up from the server; other transports use the + /// configured (or default) value. pub fn max_name_len(mut self, value: usize) -> Result { if value < 16 { return Err(error::fmt!( @@ -1979,10 +1949,9 @@ impl SenderBuilder { #[cfg(feature = "_sync-sender")] /// Build the sender. /// - /// In the case of TCP, this synchronously establishes the TCP connection, and - /// returns once the connection is fully established. If the connection - /// requires authentication or TLS, these will also be completed before - /// returning. + /// For ILP/TCP and QWP/WebSocket this synchronously establishes the + /// connection (including any authentication and TLS handshake) and + /// returns once it is fully established. pub fn build(&self) -> Result { let mut descr = format!("Sender[host={:?},port={:?},", self.host, self.port); @@ -2108,7 +2077,8 @@ impl SenderBuilder { "QWP/WebSocket configuration is missing." )); }; - reject_unsupported_qwp_ws_sf_config(qwp_ws)?; + let qwp_ws = qwp_ws_effective_config(qwp_ws); + reject_unsupported_qwp_ws_sf_config(&qwp_ws)?; let basic_auth = qwp_ws_auth_header(&auth)?; if *qwp_ws.progress == QwpWsProgress::Manual { if *qwp_ws.initial_connect_retry == conf::QwpWsInitialConnectMode::Async { @@ -2122,7 +2092,7 @@ impl SenderBuilder { self.port.as_str(), matches!(self.protocol, Protocol::QwpWss), tls_settings, - qwp_ws, + &qwp_ws, basic_auth, )?)) } else { @@ -2131,7 +2101,7 @@ impl SenderBuilder { self.port.as_str(), matches!(self.protocol, Protocol::QwpWss), tls_settings, - qwp_ws, + &qwp_ws, basic_auth, )? } diff --git a/questdb-rs/src/ingress/buffer.rs b/questdb-rs/src/ingress/buffer.rs index b81e3ce1..ffeebbd8 100644 --- a/questdb-rs/src/ingress/buffer.rs +++ b/questdb-rs/src/ingress/buffer.rs @@ -374,8 +374,11 @@ enum BufferInner { /// A reusable row buffer. /// -/// For ILP senders this exposes the existing byte-oriented buffer implementation. -/// For QWP/UDP senders it dispatches to the QWP-specific row buffer. +/// The same `Buffer` type is shared across all transports; the underlying +/// representation is chosen by the sender at construction time. For +/// QWP senders (WebSocket or UDP) it dispatches to the QWP-specific +/// columnar row buffer; for ILP senders (HTTP or TCP) it exposes the +/// byte-oriented ILP buffer. #[derive(Clone, Debug)] pub struct Buffer { inner: BufferInner, @@ -449,10 +452,10 @@ impl Buffer { /// Returns the protocol version associated with this buffer. /// - /// For ILP buffers this is the ILP protocol version. For QWP/UDP buffers - /// this is the QWP datagram version, currently represented as - /// [`ProtocolVersion::V1`]. Interpret the value together with the buffer - /// transport; do not use it by itself for ILP feature gating. + /// For ILP buffers this is the ILP protocol version. For QWP buffers + /// (UDP and WebSocket) this is the QWP wire-format version, currently + /// [`ProtocolVersion::V1`]. Interpret the value together with the + /// buffer transport; do not use it by itself for ILP feature gating. pub fn protocol_version(&self) -> ProtocolVersion { match &self.inner { BufferInner::Ilp(inner) => inner.protocol_version(), @@ -463,13 +466,12 @@ impl Buffer { } } - /// Reserves capacity associated with `additional` more bytes of buffered data. + /// Reserve capacity for `additional` more bytes of buffered data. /// /// For ILP buffers this reserves exact serialized-byte capacity. For - /// QWP/UDP buffers this is a heuristic prewarm of the internal arenas and - /// planner scratch used during datagram planning and encoding; it is not an - /// exact guarantee that [`Buffer::len`] can grow by `additional` bytes - /// without further allocation. + /// QWP buffers (UDP and WebSocket) this is a heuristic prewarm of the + /// internal arenas, not an exact guarantee that [`Buffer::len`] can + /// grow by `additional` bytes without further allocation. pub fn reserve(&mut self, additional: usize) { match &mut self.inner { BufferInner::Ilp(inner) => inner.reserve(additional), @@ -480,12 +482,12 @@ impl Buffer { } } - /// Returns the current buffered size. + /// Return the current buffered size. /// - /// For ILP buffers this is the exact serialized byte count. For QWP/UDP - /// buffers this is the size hint used for flush planning. For QWP/WebSocket - /// buffers this is only a local size hint; the sender enforces - /// `max_buf_size` against the encoded replay message. + /// On ILP buffers this is the exact serialized byte count. On QWP + /// buffers (UDP and WebSocket) it is a local size hint; the actual + /// flushed payload may differ once the buffer is encoded into + /// datagrams or frames. pub fn len(&self) -> usize { match &self.inner { BufferInner::Ilp(inner) => inner.len(), @@ -510,11 +512,12 @@ impl Buffer { } } - /// Returns whether the buffered batch is transactional. + /// Return whether the buffered batch is transactional. /// - /// For ILP buffers this is `true` only while the buffer contains rows for - /// at most one table. QWP/UDP does not support transactional flushes, so - /// QWP buffers always return `false`. + /// For ILP buffers this is `true` only while the buffer contains + /// rows for at most one table. QWP buffers (UDP and WebSocket) do + /// not expose flush-level atomicity through this getter and always + /// return `false`. pub fn transactional(&self) -> bool { match &self.inner { BufferInner::Ilp(inner) => inner.transactional(), @@ -536,11 +539,11 @@ impl Buffer { } } - /// Returns the current retained-capacity hint for the buffer. + /// Return the current retained-capacity hint for the buffer. /// - /// For ILP buffers, this is byte capacity. For QWP/UDP buffers, this is an - /// implementation-defined retained-capacity hint and should not be - /// interpreted as exact byte capacity. + /// For ILP buffers this is byte capacity. For QWP buffers (UDP and + /// WebSocket) this is an implementation-defined retained-capacity + /// hint and should not be interpreted as exact byte capacity. pub fn capacity(&self) -> usize { match &self.inner { BufferInner::Ilp(inner) => inner.capacity(), @@ -551,10 +554,11 @@ impl Buffer { } } - /// Returns the raw serialized ILP bytes currently stored in the buffer. + /// Return the raw serialized ILP bytes currently stored in the + /// buffer. /// - /// QWP/UDP buffers build datagrams during flush, so this returns an empty - /// slice for QWP/UDP. + /// QWP buffers (UDP and WebSocket) build their wire payloads during + /// flush, so this returns an empty slice on those transports. pub fn as_bytes(&self) -> &[u8] { match &self.inner { BufferInner::Ilp(inner) => inner.as_bytes(), diff --git a/questdb-rs/src/ingress/buffer/ilp.rs b/questdb-rs/src/ingress/buffer/ilp.rs index 9135475a..bb722a12 100644 --- a/questdb-rs/src/ingress/buffer/ilp.rs +++ b/questdb-rs/src/ingress/buffer/ilp.rs @@ -215,18 +215,18 @@ struct IlpBookmarkState { /// /// | Buffer Method | Serialized as ILP type (Click on link to see possible casts) | /// |---------------|--------------------------------------------------------------| -/// | [`symbol`](Buffer::symbol) | [`SYMBOL`](https://questdb.io/docs/concept/symbol/) | -/// | [`column_bool`](Buffer::column_bool) | [`BOOLEAN`](https://questdb.io/docs/reference/api/ilp/columnset-types#boolean) | -/// | [`column_i64`](Buffer::column_i64) | [`INTEGER`](https://questdb.io/docs/reference/api/ilp/columnset-types#integer) | -/// | [`column_f64`](Buffer::column_f64) | [`FLOAT`](https://questdb.io/docs/reference/api/ilp/columnset-types#float) | -/// | [`column_str`](Buffer::column_str) | [`STRING`](https://questdb.io/docs/reference/api/ilp/columnset-types#string) | -/// | [`column_arr`](Buffer::column_arr) | [`ARRAY`](https://questdb.io/docs/reference/api/ilp/columnset-types#array) | -/// | [`column_ts`](Buffer::column_ts) | [`TIMESTAMP`](https://questdb.io/docs/reference/api/ilp/columnset-types#timestamp) | +/// | [`symbol`](Buffer::symbol) | [`SYMBOL`](https://questdb.com/docs/concept/symbol/) | +/// | [`column_bool`](Buffer::column_bool) | [`BOOLEAN`](https://questdb.com/docs/reference/api/ilp/columnset-types#boolean) | +/// | [`column_i64`](Buffer::column_i64) | [`INTEGER`](https://questdb.com/docs/reference/api/ilp/columnset-types#integer) | +/// | [`column_f64`](Buffer::column_f64) | [`FLOAT`](https://questdb.com/docs/reference/api/ilp/columnset-types#float) | +/// | [`column_str`](Buffer::column_str) | [`STRING`](https://questdb.com/docs/reference/api/ilp/columnset-types#string) | +/// | [`column_arr`](Buffer::column_arr) | [`ARRAY`](https://questdb.com/docs/reference/api/ilp/columnset-types#array) | +/// | [`column_ts`](Buffer::column_ts) | [`TIMESTAMP`](https://questdb.com/docs/reference/api/ilp/columnset-types#timestamp) | /// /// QuestDB supports both `STRING` and `SYMBOL` column types. /// /// To understand the difference, refer to the -/// [QuestDB documentation](https://questdb.io/docs/concept/symbol/). In a nutshell, +/// [QuestDB documentation](https://questdb.com/docs/concept/symbol/). In a nutshell, /// symbols are interned strings, most suitable for identifiers that are repeated many /// times throughout the column. They offer an advantage in storage space and query /// performance. diff --git a/questdb-rs/src/ingress/conf.rs b/questdb-rs/src/ingress/conf.rs index 48831f7a..94a1b218 100644 --- a/questdb-rs/src/ingress/conf.rs +++ b/questdb-rs/src/ingress/conf.rs @@ -199,7 +199,8 @@ pub(crate) struct QwpWsConfig { /// Cap on the per-attempt reconnect delay. pub(crate) reconnect_max_backoff: ConfigSetting, /// Initial-connect retry mode. Default is fail-fast after one endpoint - /// round, matching Java's startup behavior. + /// round unless a reconnect policy knob was specified, in which case + /// build-time resolution promotes the effective mode to sync retry. pub(crate) initial_connect_retry: ConfigSetting, /// Bounded wait used by Sender::close_drain(). pub(crate) close_flush_timeout: ConfigSetting, diff --git a/questdb-rs/src/ingress/mod.md b/questdb-rs/src/ingress/mod.md index a8a79899..1c647e03 100644 --- a/questdb-rs/src/ingress/mod.md +++ b/questdb-rs/src/ingress/mod.md @@ -1,340 +1,361 @@ # Fast Ingestion of Data into QuestDB -The `ingress` module implements QuestDB's variant of the -[InfluxDB Line Protocol](https://questdb.io/docs/reference/api/ilp/overview/) -(ILP). +The `ingress` module sends rows of time-series data to a QuestDB server over +the **QuestDB Wire Protocol over WebSocket (QWP/WebSocket)**: a columnar +binary protocol with asynchronous server acknowledgements, multi-host +failover, and optional on-disk durability. To get started: -* Use [`Sender::from_conf()`] to get the [`Sender`] object -* Populate a [`Buffer`] with one or more rows of data -* Send the buffer using [`sender.flush()`](Sender::flush) +* Use [`Sender::from_conf()`] to build a [`Sender`]. +* Populate a [`Buffer`] with one or more rows. +* Call [`sender.flush()`](Sender::flush) to publish. +* Call [`sender.close_drain()`](Sender::close_drain) before dropping the + sender so already-published frames complete on the wire. ```rust no_run use questdb::{ Result, - ingress::{ - Sender, - Buffer, - TimestampNanos}}; + ingress::{Sender, TimestampNanos}}; fn main() -> Result<()> { - let mut sender = Sender::from_conf("http::addr=localhost:9000;")?; - let mut buffer = sender.new_buffer(); - buffer - .table("trades")? - .symbol("symbol", "ETH-USD")? - .symbol("side", "sell")? - .column_f64("price", 2615.54)? - .column_f64("amount", 0.00044)? - .at(TimestampNanos::now())?; - sender.flush(&mut buffer)?; - Ok(()) + let mut sender = Sender::from_conf("ws::addr=localhost:9000;")?; + let mut buffer = sender.new_buffer(); + buffer + .table("trades")? + .symbol("symbol", "ETH-USD")? + .symbol("side", "sell")? + .column_f64("price", 2615.54)? + .column_f64("amount", 0.00044)? + .at(TimestampNanos::now())?; + sender.flush(&mut buffer)?; + sender.close_drain()?; + Ok(()) } ``` +For more depth — failover, store-and-forward, durable ACKs, troubleshooting — +see the [Rust client documentation](https://questdb.com/docs/connect/clients/rust/). + # Configuration String -The easiest way to configure all the available parameters on a line sender is -the configuration string. The general structure is: +A sender is configured with a single string: ```plain -::addr=host:port;param1=val1;param2=val2;... -``` - -`transport` can be `http`, `https`, `tcp`, or `tcps`. See the full details on -supported parameters in a dedicated section below. - -# Don't Forget to Flush - -The sender and buffer objects are entirely decoupled. This means that the sender -won't get access to the data in the buffer until you explicitly call -[`sender.flush(&mut buffer)`](Sender::flush) or a variant. This may lead to a -pitfall where you drop a buffer that still has some data in it, resulting in -permanent data loss. - -A common technique is to flush periodically on a timer and/or once the buffer -exceeds a certain size. You can check the buffer's size by the calling -[`buffer.len()`](Buffer::len). - -The default `flush()` method clears the buffer after sending its data. If you -want to preserve its contents (for example, to send the same data to multiple -QuestDB instances), call -[`sender.flush_and_keep(&mut buffer)`](Sender::flush_and_keep) instead. - -# Error Handling - -The supported transport modes handle errors very differently. In a nutshell, -HTTP is much better at error handling. - -## TCP - -TCP doesn't report errors at all to the sender; instead, the server quietly -disconnects and you'll have to inspect the server logs to get more information -on the reason. When this has happened, the sender transitions into an error -state, and it is permanently unusable. You must drop it and create a new sender. -You can inspect the sender's error state by calling -[`sender.must_close()`](Sender::must_close). - -## QWP/UDP - -QWP/UDP is a best-effort datagram transport. A `flush()` call sends one or more -UDP datagrams and can report only local socket errors. A successful return does -not guarantee delivery, server-side processing, or flush-level atomicity. - -When one logical flush spans multiple datagrams, some datagrams may already -have been emitted before a later send fails. In that case, retrying the same -buffer may duplicate rows that were already sent. - -## QWP/WebSocket - -QWP/WebSocket is a reliable, in-order transport. Each published frame is -assigned a frame sequence number (FSN) and tracked through a publication -lifecycle until the server acknowledges or rejects it. Transient socket -errors and reconnects are absorbed by the driver and do not surface to the -caller; only server rejections and terminal protocol violations are -reported. - -Server rejections fall into two categories: - -- *Drop-and-continue* rejections (e.g. schema mismatch, write error) affect - only the rejected frame; the sender continues processing subsequent frames. - Retrieve structured diagnostics with - [`sender.poll_qwp_ws_error()`](Sender::poll_qwp_ws_error), or install a - callback via [`SenderBuilder::qwp_ws_error_handler`](SenderBuilder::qwp_ws_error_handler). - -- *Halt* rejections (e.g. parse error, internal error, security error) and - terminal WebSocket protocol violations latch the sender into a - permanently-unusable state. The next `flush()` call returns - `ErrorCode::ServerRejection` carrying the structured diagnostic, and - [`sender.must_close()`](Sender::must_close) returns `true`. The sender - must be dropped and a new one constructed. The buffer passed to that - final `flush()` is left unmodified, so its contents can be recovered - before dropping the sender. - -`flush()` returns once the frame has been appended to the local publication -log; the FSN is also available via -[`flush_and_get_fsn()`](Sender::flush_and_get_fsn). To wait for the server -to durably acknowledge a specific FSN, call -[`sender.await_acked_fsn()`](Sender::await_acked_fsn). -[`sender.published_fsn()`](Sender::published_fsn) and -[`sender.acked_fsn()`](Sender::acked_fsn) provide non-blocking polls. - -In `manual` progress mode no background thread observes the transport. -Server-side state — including halts — only becomes visible when the user -calls [`sender.drive_once()`](Sender::drive_once) or any sender method -that drives the transport (such as `flush` or `await_acked_fsn`). As a -consequence, `must_close()` on an otherwise-idle manual sender does not -reflect a halt until the next drive. - -## HTTP - -HTTP distinguishes between recoverable and non-recoverable errors. For -recoverable ones, it enters a retry loop with exponential backoff, and reports -the error to the caller only after it has exhausted the retry time budget -(configuration parameter: `retry_timeout`). - -`sender.flush()` and variant methods communicate the error in the `Result` -return value. The category of the error is signalled through the -[`ErrorCode`](crate::error::ErrorCode) enum, and it's accompanied with an error -message. - -After the sender has signalled an error, it remains usable. You can handle the -error as appropriate and continue using it. - -# Health Check - -The QuestDB server has a "ping" endpoint you can access to see if it's alive, -and confirm the version of the InfluxDB that it is compatible with at a protocol -level. - -```shell -curl -I http://localhost:9000/ping +::addr=host:port;key1=val1;key2=val2;... ``` -Example of the expected response: +For QWP/WebSocket the transport is `ws` (plain) or `wss` (TLS). The +long-form aliases `qwpws` / `qwpwss` are also accepted. The default port +is `9000` for both `ws` and `wss`. ```plain -HTTP/1.1 204 OK -Server: questDB/1.0 -Date: Fri, 2 Feb 2024 17:09:38 GMT -Transfer-Encoding: chunked -Content-Type: text/plain; charset=utf-8 -X-Influxdb-Version: v2.7.4 +ws::addr=localhost:9000; +wss::addr=db.example.com:9000;username=admin;password=secret; +ws::addr=node-a:9000,node-b:9000;sf_dir=/var/lib/myapp/qdb-sf;sender_id=ingest-1; ``` -# Configuration Parameters +For the full key reference (TLS, auth, failover, store-and-forward, +durable ACK, progress modes), see the +[connect-string reference](https://questdb.com/docs/connect/clients/connect-string/) +on the QuestDB documentation site. [`SenderBuilder`] exposes the same +options programmatically. -In the examples below, we'll use configuration strings. We also provide the -[`SenderBuilder`] to programmatically configure the sender. The methods on -[`SenderBuilder`] match one-for-one with the keys in the configuration string. - -## Authentication +# Don't Forget to Flush -To establish an -[authenticated](https://questdb.io/docs/reference/api/ilp/overview/#authentication) -and TLS-encrypted connection, use the `https` or `tcps` protocol, and use the -configuration options appropriate for the authentication method. +The sender and the buffer are decoupled: a buffer accumulates rows +locally and the sender does not see them until you call +[`sender.flush(&mut buffer)`](Sender::flush) (or +[`sender.flush_and_keep(&mut buffer)`](Sender::flush_and_keep) to retain +the buffer contents). -Here are quick examples of configuration strings for each authentication method -we support: +**This client does not auto-flush, regardless of transport.** The +configuration keys `auto_flush_rows`, `auto_flush_bytes`, and +`auto_flush_interval` are rejected; `auto_flush=off` is accepted as a +no-op for compatibility with older connect strings. A common pattern is +to flush periodically on a timer and/or once the buffer size +([`buffer.len()`](Buffer::len)) exceeds a threshold. -### HTTP Token Bearer Authentication +On QWP/WebSocket, `flush` returns once the frame has been appended to the +local publication log. A successful flush clears the buffer; a failed +flush retains the rows so you can retry. A typical ingest loop reuses one +sender and one buffer: -```no_run -# use questdb::{Result, ingress::Sender}; +```rust no_run +# use questdb::{Result, ingress::{Sender, TimestampNanos}}; +# use std::time::{Duration, Instant}; # fn main() -> Result<()> { -let mut sender = Sender::from_conf( - "https::addr=localhost:9000;token=Yfym3fgMv0B9;" -)?; -# Ok(()) -# } +# let mut sender = Sender::from_conf("ws::addr=localhost:9000;")?; +# let mut buffer = sender.new_buffer(); +# let running = true; +let flush_interval = Duration::from_secs(1); +let max_buffer_bytes = 64 * 1024; +let mut next_flush = Instant::now() + flush_interval; + +while running { + buffer + .table("trades")? + .symbol("symbol", "ETH-USD")? + .column_f64("price", 2615.54)? + .at(TimestampNanos::now())?; + + let now = Instant::now(); + if buffer.len() >= max_buffer_bytes || now >= next_flush { + sender.flush(&mut buffer)?; + next_flush = now + flush_interval; + } + // wait for or process the next row +# break; +} + +if buffer.len() > 0 { + sender.flush(&mut buffer)?; +} +# Ok(()) } ``` -* `token`: the authentication token +# Error Handling on QWP/WebSocket + +QWP/WebSocket is a reliable, in-order transport. Transient socket errors +and reconnects are absorbed by the driver and do not surface to the +caller. Two error surfaces are visible to user code: -### HTTP Basic Authentication +1. **Synchronous errors from `flush` and related calls** — local + failures or a terminal sender state. Returned as `Result::Err`. +2. **Asynchronous server errors** — protocol or schema errors reported + by the server *after* `flush` has returned. Drain them with + [`sender.poll_qwp_ws_error()`](Sender::poll_qwp_ws_error) or install a + callback via + [`SenderBuilder::qwp_ws_error_handler`](SenderBuilder::qwp_ws_error_handler). -```no_run +```rust no_run # use questdb::{Result, ingress::Sender}; # fn main() -> Result<()> { -let mut sender = Sender::from_conf( - "https::addr=localhost:9000;username=testUser1;password=Yfym3fgMv0B9;" -)?; -# Ok(()) -# } +# let mut sender = Sender::from_conf("ws::addr=localhost:9000;")?; +while let Some(err) = sender.poll_qwp_ws_error()? { + eprintln!("qwp error: {:?}", err); +} +# Ok(()) } ``` -* `username`: the username -* `password`: the password - -### TCP Elliptic Curve Digital Signature Algorithm (ECDSA) +Server errors are classified by +[`QwpWsErrorPolicy`]: `DropAndContinue` rejects only the affected +frame, `Halt` latches the sender into a permanently-unusable state +([`must_close()`](Sender::must_close) returns `true`). See +[`QwpWsSenderError`] for the diagnostic fields and +[`QwpWsErrorCategory`] for the categorisation. The +[QuestDB documentation site](https://questdb.com/docs/connect/clients/rust/#asynchronous-error-handling) +has the full protocol-level error model. + +# Completion Tracking, Progress Modes, Failover, Durability + +These are QWP/WebSocket features whose protocol-level details live in the +QuestDB documentation; the Rust API entry points are summarised here. + +* **FSN-based completion** — every published frame is assigned a frame + sequence number. Use [`flush_and_get_fsn`](Sender::flush_and_get_fsn) + to capture it, then + [`await_acked_fsn`](Sender::await_acked_fsn) (or + [`published_fsn`](Sender::published_fsn) / + [`acked_fsn`](Sender::acked_fsn) non-blocking) to wait for server + acknowledgement. +* **Progress modes** — `background` (default) runs a sender-owned thread + that drives the transport; `manual` requires the caller to drive + progress with [`drive_once`](Sender::drive_once) or any sender method + that progresses the loop. Select via the configuration string + (`qwp_ws_progress=manual`) or + [`SenderBuilder::qwp_ws_progress`](SenderBuilder::qwp_ws_progress). +* **Multi-host failover** — pass a comma-separated address list + (`addr=a:9000,b:9000`) and tune `reconnect_max_duration_millis`, + `reconnect_initial_backoff_millis`, `reconnect_max_backoff_millis`. + Setting any `reconnect_*` key implicitly enables synchronous initial + connect retry unless `initial_connect_retry=off` is set explicitly. + Auth failures are terminal across all endpoints; transport errors are + retried. +* **Store-and-forward** — set `sf_dir` to a writable directory and + `sender_id` to a stable identifier per sender process. Unacknowledged + frames are persisted and replayed across reconnects and process + restarts. Replay is at-least-once, so target tables should declare + `DEDUP UPSERT KEYS(...)`. +* **Durable acknowledgement** — set `request_durable_ack=on` (QuestDB + Enterprise with primary replication) so `acked_fsn` advances only + after durable upload to object storage. + +See [QuestDB high-availability docs](https://questdb.com/docs/high-availability/overview/) +for the full failover/SF/durable-ACK story and +[delivery semantics](https://questdb.com/docs/concepts/delivery-semantics/) +for the at-least-once/exactly-once model. + +# Authentication + +QWP/WebSocket authenticates at the HTTP layer during the WebSocket +upgrade. Credentials are sent once per connection and reused across +reconnects. -```no_run +```rust no_run # use questdb::{Result, ingress::Sender}; # fn main() -> Result<()> { -let mut sender = Sender::from_conf( - "tcps::addr=localhost:9009;username=testUser1;token=5UjEA0;token_x=fLKYa9;token_y=bS1dEfy;" -)?; -# Ok(()) -# } +// Bearer token (recommended for Enterprise) +let _s = Sender::from_conf( + "wss::addr=db.example.com:9000;token=Yfym3fgMv0B9;")?; + +// HTTP basic auth +let _s = Sender::from_conf( + "wss::addr=db.example.com:9000;username=admin;password=Yfym3fgMv0B9;")?; +# Ok(()) } ``` -The four ECDSA components are: - -* `username`, aka. _kid_ -* `token`, aka. _d_ -* `token_x`, aka. _x_ -* `token_y`, aka. _y_ - -### Authentication Timeout - -You can specify how long the client should wait for the authentication request -to resolve. The configuration parameter is: +`auth_timeout` (milliseconds, default `15000`) bounds the handshake. +`auth_timeout_ms` is accepted as a Java-compatible spelling. -* `auth_timeout` (milliseconds, default 15 seconds) +**Not supported by this client:** OIDC token acquisition / in-band +refresh, mutual TLS (client certificates), token rotation mid-session. +QuestDB itself supports OIDC server-side — see +[OpenID Connect](https://questdb.com/docs/security/oidc/); acquire a +token out-of-band from your IdP, pass it via `token=...`, and rebuild +the sender when it nears expiry. mTLS is not negotiated by the +QuestDB server regardless of client. -For QWP/WebSocket configuration strings, the Java-compatible spelling is also -accepted: +# TLS -* `auth_timeout_ms` (milliseconds, default 15 seconds) +Use the `wss` schema (or the alias `qwpwss`). Configuration parameters: -## Encryption on the Wire: TLS +* `tls_ca=webpki_roots` — bundled webpki roots (default). +* `tls_ca=os_roots` — OS certificate store (requires the + `tls-native-certs` feature). +* `tls_ca=webpki_and_os_roots` — both. +* `tls_roots=/path/to/root-ca.pem` — load roots from a PEM file. + Implies `tls_ca=pem_file`. +* `tls_verify=unsafe_off` — disable verification entirely. Requires + the `insecure-skip-verify` feature. **Never use in production.** -To enable TLS on the QuestDB Enterprise server, refer to the [QuestDB Enterprise -TLS documentation](https://questdb.io/docs/operations/tls/). +See the notes on +[how to generate a self-signed certificate](https://github.com/questdb/c-questdb-client/tree/6.1.0/tls_certs). -*Note*: QuestDB Open Source does not support TLS natively. To use TLS with -QuestDB Open Source, use a TLS proxy such as -[HAProxy](http://www.haproxy.org/). +# Closing the Sender -We support several certification authorities (sources of PKI root certificates). -To select one, use the `tls_ca` config option. These are the supported variants: +For delivery-sensitive shutdown, call +[`close_drain`](Sender::close_drain) before dropping the sender: -* `tls_ca=webpki_roots;` use the roots provided in the standard Rust crate - [webpki-roots](https://crates.io/crates/webpki-roots) +```rust no_run +# use questdb::{Result, ingress::Sender}; +# fn main() -> Result<()> { +# let mut sender = Sender::from_conf("ws::addr=localhost:9000;")?; +sender.close_drain()?; +# Ok(()) } +``` -* `tls_ca=os_roots;` use the OS-provided certificate store +`close_drain` stops accepting new publications and waits up to +`close_flush_timeout_millis` (default `5000`) for already-published +frames to be acknowledged. With `sf_dir`, anything still un-acked is +persisted to disk so a later sender can replay it. Dropping a sender +without `close_drain` is best-effort: in-flight frames may not reach the +server, and any delivery failure is silent. -* `tls_ca=webpki_and_os_roots;` combine both of the above +# Health Check -* `tls_roots=/path/to/root-ca.pem;` get the root certificates from the specified - file. Main purpose is for testing with self-signed certificates. _Note:_ this - automatically sets `tls_ca=pem_file`. +The QuestDB server exposes a `/ping` endpoint on the same port as +QWP/WebSocket (the HTTP listener; default `9000`): -See our notes on [how to generate a self-signed -certificate](https://github.com/questdb/c-questdb-client/tree/main/tls_certs). +```shell +curl -I http://localhost:9000/ping +``` -* `tls_verify=unsafe_off;` tells the QuestDB client to ignore all CA roots and - accept any server certificate without checking. You can use it as a last - resort, when you weren't able to apply the above approach with a self-signed - certificate. You should **never use it in production** as it defeats security - and allows a man-in-the middle attack. +# Sequential Coupling in the Buffer API -## HTTP Timeouts +The fluent API of [`Buffer`] has sequential coupling: there's a certain +order in which you are expected to call the methods. You must write the +symbols before the columns, and you must terminate each row by calling +either [`at`](Buffer::at) or [`at_now`](Buffer::at_now). Refer to the +[`Buffer`] doc for the full rules and a flowchart. -Instead of a fixed timeout value, we use a flexible timeout that depends on the -size of the HTTP request payload (how much data is in the buffer that you're -flushing). You can configure it using two options: +# Optimization: Avoid Revalidating Names -* `request_min_throughput` (bytes per second, default 100 KiB/s): divide the - payload size by this number to determine for how long to keep sending the - payload before timing out. -* `request_timeout` (milliseconds, default 10 seconds): additional time - allowance to account for the fixed latency of the request-response roundtrip. +The client validates every name you provide. To avoid re-validating the +same names on every row, create pre-validated [`ColumnName`] and +[`TableName`] values once: -Finally, the client will keep retrying the request if it experiences errors. You -can configure the total time budget for retrying: +```rust no_run +# use questdb::Result; +use questdb::ingress::{TableName, ColumnName, Sender, TimestampNanos}; +# fn main() -> Result<()> { +let mut sender = Sender::from_conf("ws::addr=localhost:9000;")?; +let mut buffer = sender.new_buffer(); +let table_name = TableName::new("trades")?; +let price_name = ColumnName::new("price")?; +buffer.table(table_name)?.column_f64(price_name, 2615.54)?.at(TimestampNanos::now())?; +buffer.table(table_name)?.column_f64(price_name, 39269.98)?.at(TimestampNanos::now())?; +# Ok(()) } +``` -* `retry_timeout` (milliseconds, default 10 seconds) +# Handling Optional Data (NULLs) -# Usage Considerations +In QuestDB, `NULL` values are represented by simply omitting the column +for that specific row. -## Transactional Flush +To make working with Rust's `Option` ergonomic and keep the fluent +builder chain unbroken, the [`Buffer`] API provides `_opt` variants for +all column methods (e.g. +[`column_str_opt`](Buffer::column_str_opt), +[`column_f64_opt`](Buffer::column_f64_opt)). -When using HTTP, you can arrange that each `flush()` call happens within its own -transaction. For this to work, your buffer must contain data that targets only -one table. This is because QuestDB doesn't support multi-table transactions. +If the provided value is `Some(v)`, the column is written normally. If +the value is `None`, the method acts as a no-op and skips the column. -In order to ensure in advance that a flush will be transactional, call -[`sender.flush_and_keep_with_flags(&mut buffer, true)`](Sender::flush_and_keep_with_flags). -This call will refuse to flush a buffer if the flush wouldn't be transactional. +**Note on ownership:** for types that implement `Copy` (like `i64`, +`f64`, `bool`), you can pass the `Option` directly. For heap-allocated +types like `String` or `Vec`, use `.as_ref()` or `.as_deref()` to pass a +reference without consuming the original value. -## When to Choose the TCP Transport? +```rust no_run +# use questdb::Result; +use questdb::ingress::{Sender, TimestampNanos}; +# fn main() -> Result<()> { +let mut sender = Sender::from_conf("ws::addr=localhost:9000;")?; +let mut buffer = sender.new_buffer(); +let humidity: Option = None; +buffer + .table("sensors")? + .symbol("location", "factory-1")? + .column_f64("temperature", 22.5)? + // Silently skips the humidity column (stored as NULL). + .column_f64_opt("humidity", humidity)? + .at(TimestampNanos::now())?; +# Ok(()) } +``` -As discussed above, the TCP transport mode is raw and simplistic: it doesn't -report any errors to the caller (the server just disconnects), has no automatic -retries, requires manual handling of connection failures, and doesn't support -transactional flushing. +# Array Datatype -However, TCP has a lower overhead than HTTP and it's worthwhile to try out as an -alternative in a scenario where you have a constantly high data rate and/or deal -with a high-latency network connection. +[`Buffer::column_arr`](Buffer::column_arr) supports efficient ingestion +of N-dimensional arrays using: -## Array Datatype +* native Rust arrays and slices (up to 3-dimensional) +* native Rust vectors (up to 3-dimensional) +* arrays from the [`ndarray`](https://docs.rs/ndarray) crate -The [`Buffer::column_arr`](Buffer::column_arr) method supports efficient ingestion of N-dimensional -arrays using several convenient types: +QWP/WebSocket carries array types natively in the wire format and does +not require a `protocol_version` setting. Requires QuestDB server +9.0.0 or later. -- native Rust arrays and slices (up to 3-dimensional) -- native Rust vectors (up to 3-dimensional) -- arrays from the [`ndarray`](https://docs.rs/ndarray) crate +# Decimal Datatype -You must use protocol version 2 to ingest arrays. The HTTP transport will -automatically enable it as long as you're connecting to an up-to-date QuestDB -server (version 9.0.0 or later), but with TCP you must explicitly specify it in -the configuration string: `protocol_version=2;`. +[`Buffer::column_dec`](Buffer::column_dec) accepts: -**Note**: QuestDB server version 9.0.0 or later is required for array support. +* native Rust string slices +* decimals from the [`rust_decimal`](https://docs.rs/rust_decimal) crate +* decimals from the [`bigdecimal`](https://docs.rs/bigdecimal) crate -## Timestamp Column Name +QWP/WebSocket carries `DECIMAL64`, `DECIMAL128`, and `DECIMAL256` +natively and does not require a `protocol_version` setting. Requires +QuestDB server 9.2.0 or later. Pre-create decimal columns with +`DECIMAL(precision, scale)` so the server enforces the expected +precision. -The InfluxDB Line Protocol (ILP) does not give a name to the designated timestamp, -so if you let this client auto-create the table, it will have the default `timestamp` name. -To use a custom name, say `my_ts`, pre-create the table with the desired -timestamp column name: +# Timestamp Column Name -To address this, issue a `CREATE TABLE` statement to create the table in advance. -Note the `timestamp(my_ts)` clause at the end specifies the designated timestamp. +The QuestDB ingest protocols do not give a name to the designated +timestamp, so if you let this client auto-create the table, it will have +the default `timestamp` name. To use a custom name, issue a +`CREATE TABLE` in advance: ```sql CREATE TABLE IF NOT EXISTS 'trades' ( @@ -346,125 +367,89 @@ CREATE TABLE IF NOT EXISTS 'trades' ( ) timestamp (my_ts) PARTITION BY DAY WAL; ``` -You can use the `CREATE TABLE IF NOT EXISTS` construct to make sure the table is -created, but without raising an error if the table already exists. +# Transactional Flush -## Sequential Coupling in the Buffer API +[`flush_and_keep_with_flags`](Sender::flush_and_keep_with_flags) with +`transactional = true` refuses to flush a buffer that would span +multiple tables, ensuring QuestDB treats the flush as a single +transaction. The function is gated on the `sync-sender-http` Cargo +feature; building with QWP/WebSocket only does not expose it. -The fluent API of [`Buffer`] has sequential coupling: there's a certain order in -which you are expected to call the methods. For example, you must write the -symbols before the columns, and you must terminate each row by calling either -[`at`](Buffer::at) or [`at_now`](Buffer::at_now). Refer to the [`Buffer`] doc -for the full rules and a flowchart. +# Check out the CONSIDERATIONS Document -## Optimization: Avoid Revalidating Names +The +[library considerations](https://github.com/questdb/c-questdb-client/blob/6.1.0/doc/CONSIDERATIONS.md) +covers threading, data quality, server errors, flushing, and +disconnections. -The client validates every name you provide. To avoid the redundant CPU work of -re-validating the same names on every row, create pre-validated [`ColumnName`] -and [`TableName`] values: +# Troubleshooting -```no_run -# use questdb::Result; -use questdb::ingress::{ - TableName, - ColumnName, - Buffer, - SenderBuilder, - TimestampNanos}; -# fn main() -> Result<()> { -let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?; -let mut buffer = sender.new_buffer(); -let table_name = TableName::new("trades")?; -let price_name = ColumnName::new("price")?; -buffer.table(table_name)?.column_f64(price_name, 2615.54)?.at(TimestampNanos::now())?; -buffer.table(table_name)?.column_f64(price_name, 39269.98)?.at(TimestampNanos::now())?; -# Ok(()) -# } -``` +If data doesn't appear in the database in a timely manner, you may not +be calling [`flush`](Sender::flush) often enough — this client has no +auto-flush on any transport. -## Handling Optional Data (NULLs) +For QWP/WebSocket, drain +[`poll_qwp_ws_error`](Sender::poll_qwp_ws_error) (or install a callback +via +[`qwp_ws_error_handler`](SenderBuilder::qwp_ws_error_handler)) to see +structured server diagnostics. The +[server log](https://questdb.com/docs/troubleshooting/log/) carries +additional context. -In QuestDB, `NULL` values are represented by simply omitting the column for that specific row. +# Legacy ILP Transports -To make working with Rust's `Option` ergonomic and keep the fluent builder chain unbroken, the [`Buffer`] API provides `_opt` variants for all column methods (e.g., [`column_str_opt`](Buffer::column_str_opt), [`column_f64_opt`](Buffer::column_f64_opt)). +> **Legacy.** The ILP transports (`http`, `https`, `tcp`, `tcps`) remain +> supported for backwards compatibility but are not recommended for new +> code. Use QWP/WebSocket +> ([top of this page](#fast-ingestion-of-data-into-questdb)) instead. -If the provided value is `Some(v)`, the column is written normally. If the value is `None`, the method acts as a no-op and skips the column. +The same [`Sender`] / [`Buffer`] API works across all transports — the +difference is the configuration string and the error model. -**Note on ownership:** For types that implement `Copy` (like `i64`, `f64`, `bool`), you can pass the `Option` directly. For heap-allocated types like `String` or `Vec`, use `.as_ref()` or `.as_deref()` to pass a reference without consuming the original value. +## ILP/HTTP -```rust no_run -# use questdb::Result; -use questdb::ingress::{Buffer, SenderBuilder, TimestampNanos}; +Configuration: `http::` or `https::`. Request-response. Recoverable +errors are retried with exponential backoff and surface only after the +retry budget is exhausted; the sender remains usable on signalled +errors. +```rust no_run +# use questdb::{Result, ingress::{Sender, TimestampNanos}}; # fn main() -> Result<()> { - let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?; - let mut buffer = sender.new_buffer(); - struct SensorData { - sensor_id: Option, - temperature: Option, - humidity: Option, - } - - let data = SensorData { sensor_id: Some("sensor-1".to_string()), temperature: Some(22.5), humidity: None }; - - buffer - .table("sensors")? - .symbol("location", "factory-1")? - // Writes the sensor_id column if it's Some, otherwise skips it (stored as NULL in QuestDB) - .symbol_opt("sensor_id", data.sensor_id.as_deref())? - // Writes the temperature column - .column_f64_opt("temperature", data.temperature)? - // Silently skips the humidity column (stored as NULL in QuestDB) - .column_f64_opt("humidity", data.humidity)? - .at(TimestampNanos::now())?; -# Ok(()) -# } +let mut sender = Sender::from_conf("http::addr=localhost:9000;")?; +# let mut buffer = sender.new_buffer(); +# buffer.table("trades")?.column_f64("price", 1.0)?.at(TimestampNanos::now())?; +# sender.flush(&mut buffer)?; +# Ok(()) } ``` -## Decimal Datatype - -The [`Buffer::column_dec`](Buffer::column_dec) method supports efficient ingestion of decimal values using several convenient types: - -- native Rust String slices -- decimals from the [`rust_decimal`](https://docs.rs/rust_decimal) crate -- decimals from the [`bigdecimal`](https://docs.rs/bigdecimal) crate - -You must use protocol version 3 to ingest decimals. The HTTP transport will -automatically enable it as long as you're connecting to an up-to-date QuestDB -server (version 9.2.0 or later), but with TCP you must explicitly specify it in -the configuration string: `protocol_version=3;`. +HTTP-specific tuning: `request_min_throughput` (bytes/sec, default +100 KiB/s), `request_timeout` (default 10 s), `retry_timeout` (default +10 s). HTTP also supports +[transactional flushes](#transactional-flush). -**Note**: QuestDB server version 9.2.0 or later is required for decimal support. +For arrays and decimals on HTTP, `protocol_version=auto` (the default) +negotiates the right version with the server. -## Check out the CONSIDERATIONS Document +## ILP/TCP -The [Library -considerations](https://github.com/questdb/c-questdb-client/blob/main/doc/CONSIDERATIONS.md) -document covers these topics: +Configuration: `tcp::` or `tcps::`. Streaming. Does not report errors to +the sender — the server silently disconnects on error and the failure +appears only in the server log. The sender transitions into a terminal +state which you can detect via +[`must_close`](Sender::must_close). TCP also has lower overhead than +HTTP, which suits very high steady-state throughput on a high-latency +network, at the cost of all observability. -* Threading -* Differences between the InfluxDB Line Protocol and QuestDB Data Types -* Data Quality -* Client-side checks and server errors -* Flushing -* Disconnections, data errors and troubleshooting +TCP supports ECDSA authentication: -# Troubleshooting Common Issues - -## Infrequent Flushing - -If the data doesn't appear in the database in a timely manner, you may not be -calling [`flush()`](Sender::flush) often enough. - -## Debug disconnects and inspect errors - -If you're using ILP-over-TCP, it doesn't report any errors to the client. -Instead, on error, the server terminates the connection, and logs any error -messages in [server logs](https://questdb.io/docs/troubleshooting/log/). - -To inspect or log a buffer's contents before you send it, call -[`buffer.as_bytes()`](Buffer::as_bytes). +```rust no_run +# use questdb::{Result, ingress::Sender}; +# fn main() -> Result<()> { +let _s = Sender::from_conf( + "tcps::addr=localhost:9009;username=testUser1;token=5UjEA0;token_x=fLKYa9;token_y=bS1dEfy;")?; +# Ok(()) } +``` -This byte-level inspection is only meaningful for ILP buffers. QWP buffers are -encoded into UDP datagrams during [`flush()`](Sender::flush), so -[`buffer.as_bytes()`](Buffer::as_bytes) is not useful there. +TCP defaults to `protocol_version=1`. To send arrays or decimals over +TCP you must specify `protocol_version=2` (or `=3`) explicitly. diff --git a/questdb-rs/src/ingress/sender.rs b/questdb-rs/src/ingress/sender.rs index 83b644d2..c1d29dbd 100644 --- a/questdb-rs/src/ingress/sender.rs +++ b/questdb-rs/src/ingress/sender.rs @@ -159,27 +159,33 @@ impl Sender { /// Create a new `Sender` instance from the given configuration string. /// - /// The format of the string is: `"http::addr=host:port;key=value;...;"`. + /// The format of the string is: `"ws::addr=host:port;key=value;...;"`. /// - /// Instead of `"http"`, you can also specify `"https"`, `"tcp"`, `"tcps"`, - /// and `"qwpudp"`. + /// Supported transport schemes: /// - /// We recommend HTTP for most cases because it provides more features, like - /// reporting errors to the client and supporting transaction control. TCP can - /// sometimes be faster in higher-latency networks, but misses a number of - /// features. + /// * `ws` / `wss` (long-form aliases: `qwpws` / `qwpwss`) — **recommended**. + /// QuestDB Wire Protocol over WebSocket. Asynchronous server ACKs, + /// multi-host failover, optional store-and-forward durability, + /// structured error model. + /// * `http` / `https` — InfluxDB Line Protocol over HTTP (legacy). + /// Request-response with retries. + /// * `tcp` / `tcps` — InfluxDB Line Protocol over TCP (legacy). No + /// error reporting to the client; failures appear only in the + /// server log. /// - /// Keys in the config string correspond to same-named methods on `SenderBuilder`. - /// - /// For the full list of keys and values, see the docs on [`SenderBuilder`]. + /// Keys in the config string correspond to same-named methods on + /// [`SenderBuilder`]. For the full list of keys and values, see the + /// [`ingress`](crate::ingress) module docs and the [`SenderBuilder`] + /// reference. /// /// You can also load the configuration from an environment variable. /// See [`Sender::from_env`]. /// - /// In the case of TCP, this synchronously establishes the TCP connection, and - /// returns once the connection is fully established. If the connection - /// requires authentication or TLS, these will also be completed before - /// returning. + /// For TCP this synchronously establishes the TCP connection and + /// returns once the connection (including any authentication and TLS + /// handshake) is fully established. For QWP/WebSocket the WebSocket + /// upgrade (likewise including any authentication and TLS handshake) + /// is performed synchronously here. #[cfg(feature = "_sync-sender")] pub fn from_conf>(conf: T) -> Result { SenderBuilder::from_conf(conf)?.build() @@ -189,10 +195,9 @@ impl Sender { /// environment variable. The format is the same as that accepted by /// [`Sender::from_conf`]. /// - /// In the case of TCP, this synchronously establishes the TCP connection, and - /// returns once the connection is fully established. If the connection - /// requires authentication or TLS, these will also be completed before - /// returning. + /// As with [`Sender::from_conf`], TCP and QWP/WebSocket transports + /// establish the connection (including any authentication and TLS + /// handshake) synchronously before this function returns. #[cfg(feature = "_sync-sender")] pub fn from_env() -> Result { SenderBuilder::from_env()?.build() @@ -459,9 +464,12 @@ impl Sender { /// Send the given buffer of rows to the QuestDB server. /// - /// All the data stays in the buffer. Clear the buffer before starting a new batch. + /// All the data stays in the buffer. Clear the buffer before + /// starting a new batch. /// - /// To send and clear in one step, call [Sender::flush] instead. + /// Same transport-specific semantics as [`Sender::flush`]; see its + /// docs for the QWP/WebSocket, ILP/HTTP, and ILP/TCP behaviour. To + /// send and clear in one step, call [`Sender::flush`] instead. pub fn flush_and_keep(&mut self, buf: &Buffer) -> Result<()> { self.flush_impl(buf, false) } @@ -498,11 +506,15 @@ impl Sender { /// transport failures observed later are reported by subsequent sender /// calls. /// - /// HTTP should be the first choice, but use TCP if you need to continuously send - /// data to the server at a high rate. + /// QWP/WebSocket is the recommended transport for new code: structured + /// error reporting, multi-host failover, and optional durable + /// store-and-forward. ILP-over-HTTP remains a reasonable second + /// choice; ILP-over-TCP trades observability for lower per-message + /// overhead. /// - /// To improve the HTTP performance, send larger buffers (with more rows), and - /// consider parallelizing writes using multiple senders from multiple threads. + /// To improve throughput, send larger buffers (with more rows), and + /// consider parallelizing writes using multiple senders from multiple + /// threads. pub fn flush(&mut self, buf: &mut Buffer) -> crate::Result<()> { self.flush_impl(buf, false)?; buf.clear(); @@ -728,16 +740,17 @@ impl Sender { /// Tell whether the sender is no longer usable and must be dropped. /// - /// Returns `true` after an unrecoverable failure. For ILP-over-TCP this - /// is any socket error. For QWP/WebSocket this also covers a server - /// rejection or protocol violation that latches the publication - /// lifecycle to its terminal state. ILP-over-HTTP and QWP/UDP never - /// transition into a permanently-unusable state and always return - /// `false`. - /// - /// In QWP/WebSocket manual progress mode the answer only refreshes when - /// the user drives the sender (`drive_once` / `flush`), since no - /// background thread is observing the transport. + /// Returns `true` after an unrecoverable failure. For QWP/WebSocket + /// this covers a server rejection (parse error, internal error, + /// security error) or a terminal WebSocket protocol violation that + /// latches the publication lifecycle. For ILP-over-TCP it is any + /// socket error. ILP-over-HTTP and QWP/UDP never transition into a + /// permanently-unusable state and always return `false`. + /// + /// In QWP/WebSocket manual progress mode the answer only refreshes + /// when the user drives the sender (`drive_once` / `flush` / + /// `await_acked_fsn`), since no background thread is observing the + /// transport. #[must_use] pub fn must_close(&self) -> bool { if !self.connected { @@ -768,12 +781,10 @@ impl Sender { self.protocol_version } - /// Return the sender's maxinum name length of any column or table name. - /// This is either set explicitly when constructing the sender, - /// or the default value of 127. - /// When unset and using protocol version 2 over HTTP, the value is read - /// from the server from the `cairo.max.file.name.length` setting in - /// `server.conf` which defaults to 127. + /// Return the sender's maximum name length for any column or table + /// name. Set explicitly via [`SenderBuilder::max_name_len`], or the + /// default 127 bytes (or whatever the server advertises on ILP/HTTP + /// with auto-negotiated protocol version 2). pub fn max_name_len(&self) -> usize { self.max_name_len } diff --git a/questdb-rs/src/ingress/tests.rs b/questdb-rs/src/ingress/tests.rs index ba3335b9..8b718b8e 100644 --- a/questdb-rs/src/ingress/tests.rs +++ b/questdb-rs/src/ingress/tests.rs @@ -191,6 +191,61 @@ fn qwpws_store_and_forward_defaults_match_java() { assert_defaulted_eq(&qwp_ws.progress, QwpWsProgress::Background); } +#[cfg(feature = "sync-sender-qwp-ws")] +#[test] +fn qwpws_reconnect_policy_promotes_default_initial_connect_retry() { + for reconnect_setting in [ + "reconnect_max_duration_millis=20000", + "reconnect_initial_backoff_millis=200", + "reconnect_max_backoff_millis=2000", + ] { + let builder = + SenderBuilder::from_conf(format!("qwpws::addr=localhost:9000;{reconnect_setting};")) + .unwrap(); + let qwp_ws = builder.qwp_ws.as_ref().unwrap(); + assert_defaulted_eq( + &qwp_ws.initial_connect_retry, + conf::QwpWsInitialConnectMode::Off, + ); + + let effective = qwp_ws_effective_config(qwp_ws); + assert_defaulted_eq( + &effective.initial_connect_retry, + conf::QwpWsInitialConnectMode::Sync, + ); + } +} + +#[cfg(feature = "sync-sender-qwp-ws")] +#[test] +fn qwpws_explicit_initial_connect_retry_suppresses_reconnect_policy_promotion() { + let off = SenderBuilder::from_conf( + "qwpws::addr=localhost:9000;\ + reconnect_max_duration_millis=20000;\ + initial_connect_retry=off;", + ) + .unwrap(); + let qwp_ws = off.qwp_ws.as_ref().unwrap(); + let effective = qwp_ws_effective_config(qwp_ws); + assert_specified_eq( + &effective.initial_connect_retry, + conf::QwpWsInitialConnectMode::Off, + ); + + let async_mode = SenderBuilder::from_conf( + "qwpws::addr=localhost:9000;\ + reconnect_max_duration_millis=20000;\ + initial_connect_retry=async;", + ) + .unwrap(); + let qwp_ws = async_mode.qwp_ws.as_ref().unwrap(); + let effective = qwp_ws_effective_config(qwp_ws); + assert_specified_eq( + &effective.initial_connect_retry, + conf::QwpWsInitialConnectMode::Async, + ); +} + #[cfg(feature = "sync-sender-qwp-ws")] #[test] fn qwpws_progress_config_parses_manual_and_background() { @@ -1123,6 +1178,15 @@ fn auto_flush_bytes_unsupported() { ); } +#[cfg(feature = "sync-sender-tcp")] +#[test] +fn auto_flush_interval_unsupported() { + assert_conf_err( + SenderBuilder::from_conf("tcps::addr=localhost;auto_flush_interval=1000;"), + "Invalid configuration parameter \"auto_flush_interval\". This client does not support auto-flush", + ); +} + fn assert_specified_eq>( actual: &ConfigSetting, expected: IntoV, diff --git a/questdb-rs/src/tests/qwp_ws.rs b/questdb-rs/src/tests/qwp_ws.rs index cbd824fa..aae77d39 100644 --- a/questdb-rs/src/tests/qwp_ws.rs +++ b/questdb-rs/src/tests/qwp_ws.rs @@ -3934,6 +3934,57 @@ fn qwp_ws_sync_initial_retry_budget_exhaustion_reports_context() { ); } +#[test] +fn qwp_ws_reconnect_knob_implicitly_promotes_initial_connect_to_sync() { + let probe = TcpListener::bind("127.0.0.1:0").unwrap(); + let port = probe.local_addr().unwrap().port(); + drop(probe); + + let conf = format!( + "qwpws::addr=127.0.0.1:{port};\ + reconnect_max_duration_millis=1;" + ); + let err = SenderBuilder::from_conf(&conf) + .unwrap() + .build() + .unwrap_err(); + + assert_eq!(err.code(), ErrorCode::SocketError); + assert!( + err.msg() + .contains("QWP/WebSocket initial connect retry budget exhausted"), + "got: {}", + err.msg() + ); + assert!(err.msg().contains("attempts="), "got: {}", err.msg()); +} + +#[test] +fn qwp_ws_explicit_initial_connect_retry_off_suppresses_reconnect_promotion() { + let probe = TcpListener::bind("127.0.0.1:0").unwrap(); + let port = probe.local_addr().unwrap().port(); + drop(probe); + + let conf = format!( + "qwpws::addr=127.0.0.1:{port};\ + reconnect_max_duration_millis=5000;\ + initial_connect_retry=off;" + ); + let err = SenderBuilder::from_conf(&conf) + .unwrap() + .build() + .unwrap_err(); + + assert_eq!(err.code(), ErrorCode::SocketError); + assert!( + !err.msg() + .contains("QWP/WebSocket initial connect retry budget exhausted"), + "got: {}", + err.msg() + ); + assert!(!err.msg().contains("attempts="), "got: {}", err.msg()); +} + #[test] fn qwp_ws_sync_initial_retry_resets_non_healthy_between_rounds() { let first_listener = TcpListener::bind("127.0.0.1:0").unwrap();