From 5fa49eec6d6e5851912b8da626d7fca842c1a97b Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Tue, 19 May 2026 17:39:21 +0200 Subject: [PATCH 01/22] docs(ingress): reorient rustdoc and README around QWP/WebSocket The published rustdoc landing pages on docs.rs/questdb-rs/ still framed ILP as the primary transport and did not document QWP/WebSocket as a first-class user-facing surface. This commit reorganises the user-facing documentation so that QWP/WebSocket is the recommended path and ILP is relegated to a clearly-marked legacy section. No behaviour changes. questdb-rs/README.md - Transports section leads with ws::/wss:: (QWP/WebSocket) and demotes http::/https:: and tcp::/tcps:: as legacy. QWP/UDP is not advertised. - Quick Start switched to a ws:: example with close_drain. - Protocol Versions clarified: QWP carries types natively; the ILP protocol_version mechanism applies only to ILP/HTTP and ILP/TCP. - Examples table relabels existing examples as ILP/legacy and adds the QWP/WebSocket benchmark. - Default features list trimmed (no QWP/UDP advertised). - All GitHub and docs.rs URLs pinned to the 6.1.0 release tag so the README inside a published crate keeps stable links. questdb-rs/src/ingress/mod.md (~449 lines, down from ~470) - Lead paragraph rewritten to recommend QWP/WebSocket and explain the legacy ILP positioning up front. - Configuration string section uses ws::/wss:: throughout and links to questdb.io/docs/connect/clients/connect-string/ for the full key reference rather than enumerating keys inline. - "Don't Forget to Flush" reframed: the client has no auto-flush on any transport (auto_flush_rows / auto_flush_bytes / auto_flush_interval all rejected; auto_flush=off accepted as a compatibility no-op). Buffer.len()-based flush-timing guidance restored. - "Error Handling on QWP/WebSocket" kept concise; protocol-level detail (Halt vs DropAndContinue framing, FSN watermarks, multi-host failover, store-and-forward, durable acknowledgement) is summarised in a single section that lists the Rust API entry points and links out to questdb.io/docs/high-availability/ and the C/C++ client doc for depth. - Authentication section calls out OIDC / mTLS / token-rotation as unsupported by this client, with workarounds. - TLS, close_drain, health-check, buffer-API sub-sections preserved with minor reframing for a QWP-first reader. - Array and decimal sections note that QWP carries these types natively (no protocol_version handshake required). - Legacy ILP Transports section at the bottom covers ILP/HTTP and ILP/TCP for backwards-compatibility readers, with an explicit legacy admonition. - The implicit-promotion claim about initial_connect_retry (which is a Java-client behaviour, not present in this crate) has been removed. questdb-rs/src/ingress/sender.rs - Sender::from_conf documents transports in QWP-first order and drops the qwpudp entry. - Sender::from_conf and Sender::from_env note that both TCP and QWP/WebSocket complete authentication and TLS synchronously before returning. - Sender::flush recommendation no longer reads "HTTP first, TCP for high rate"; QWP/WebSocket is the default recommendation, framed around structured error reporting and durability rather than raw throughput. - Sender::must_close description reordered so QWP/WebSocket is primary; ILP/TCP follows. questdb-rs/src/ingress/buffer.rs - Buffer struct-level doc no longer claims dispatch is "ILP or QWP/UDP" only; it now correctly covers QWP/WebSocket as well. Validation: cargo doc --no-deps builds clean with no warnings; cargo test --doc passes all 50 doctests (including the 13 new code blocks in mod.md and the new README example). Untouched in this commit: doc/QWP_UPSTREAM_PR_NOTES.md was already untracked and remains so. Per-method buffer.rs doc comments that say "For ILP buffers ... For QWP/UDP buffers ..." are factually correct descriptions of internal dispatch and were left as-is. Co-Authored-By: Claude Opus 4.7 (1M context) --- questdb-rs/README.md | 207 +++++---- questdb-rs/src/ingress/buffer.rs | 7 +- questdb-rs/src/ingress/mod.md | 705 +++++++++++++++---------------- questdb-rs/src/ingress/sender.rs | 74 ++-- 4 files changed, 505 insertions(+), 488 deletions(-) diff --git a/questdb-rs/README.md b/questdb-rs/README.md index ef7f9452..d87b786a 100644 --- a/questdb-rs/README.md +++ b/questdb-rs/README.md @@ -3,141 +3,166 @@ Official Rust client for [QuestDB](https://questdb.io/), an open-source SQL database designed to process time-series data, faster. -The client library is designed for fast ingestion of data into QuestDB. It -supports three transports: the InfluxDB Line Protocol (ILP) over HTTP -(recommended) or TCP, and the QuestDB Wire Protocol (QWP) over UDP for -high-throughput ingestion on trusted networks. +The client library is designed for fast ingestion of data into QuestDB. The +recommended transport is the +**QuestDB Wire Protocol over WebSocket (QWP/WebSocket)**: a columnar binary +protocol with explicit asynchronous server acknowledgements, multi-host +failover, optional on-disk durability, and a structured error model. + +Legacy InfluxDB Line Protocol (ILP) transports — over HTTP or TCP — remain +supported for backwards compatibility but are not recommended for new code. * [QuestDB Database docs](https://questdb.io/docs/) -* [Docs on InfluxDB Line Protocol](https://questdb.io/docs/reference/api/ilp/overview/) +* [`ingress` module docs](https://docs.rs/questdb-rs/6.1.0/questdb/ingress/) — + protocol details, configuration parameters, and patterns. ## Transports The transport is selected by the scheme in the configuration string: -* `http::addr=...` / `https::addr=...` — request-response, errors returned - to the client, supports authentication and TLS. Recommended for most - workloads. -* `tcp::addr=...` / `tcps::addr=...` — streaming, legacy; errors cause - server-side disconnect and surface only in server logs. -* `qwpudp::addr=...` — best-effort UDP datagrams (IPv4-only); no - acknowledgements, no authentication, no TLS, no transactional guarantees. - See the [`ingress`](https://docs.rs/questdb-rs/latest/questdb/ingress/) - module docs (in particular `Protocol::QwpUdp`) for semantics and - configuration parameters. - -## Protocol Versions - -The library supports the following ILP protocol versions. These apply to -ILP/HTTP and ILP/TCP only — QWP/UDP uses its own wire format and is not -versioned through this mechanism. - -* If you use HTTP and `protocol_version=auto` or unset, the library will - automatically detect the server's - latest supported protocol version and use it (recommended). -* If you use TCP, you can specify the - `protocol_version=N` parameter when constructing the `Sender` object - (TCP defaults to `protocol_version=1`). - -| Version | Description | Server Compatibility | -| ------- | ------------------------------------------------------- | --------------------- | -| **1** | Over HTTP it's compatible InfluxDB Line Protocol (ILP) | All QuestDB versions | -| **2** | 64-bit floats sent as binary, adds n-dimentional arrays | 9.0.0+ (2023-10-30) | - -**Note**: QuestDB server version 9.0.0 or later is required for `protocol_version=2` support. +* `ws::addr=...` / `wss::addr=...` — **QWP/WebSocket**, the recommended + transport. Columnar binary frames, asynchronous server ACKs with explicit + frame-sequence-number watermarks, multi-host failover, optional + store-and-forward durability. Supports HTTP basic and bearer-token auth, + plus TLS via `wss::`. The long-form aliases `qwpws::` / `qwpwss::` are + also accepted. +* `http::addr=...` / `https::addr=...` — **ILP/HTTP** (legacy). + Request-response with error returns and per-request retry, no + asynchronous ACKs or multi-host failover. Suitable for existing + deployments and one-shot batches. +* `tcp::addr=...` / `tcps::addr=...` — **ILP/TCP** (legacy). Streaming with + no error reporting to the client; the server logs failures and silently + disconnects. Lowest overhead, lowest observability. + +See the [`ingress` module docs](https://docs.rs/questdb-rs/6.1.0/questdb/ingress/) +for the full configuration-parameter reference, including the +QWP-specific keys (`sf_dir`, `sender_id`, `reconnect_*`, +`request_durable_ack`, `qwp_ws_progress`, `max_in_flight`). ## Quick Start -To start using `questdb-rs`, add it as a dependency of your project: +Add `questdb-rs` to your project: ```bash cargo add questdb-rs ``` -Then you can try out this quick example, which connects to a QuestDB server -running on your local machine: +A minimal ingest using QWP/WebSocket: ```rust ignore use questdb::{ Result, - ingress::{ - Sender, - Buffer, - TimestampNanos}}; + ingress::{Sender, TimestampNanos}}; fn main() -> Result<()> { - let mut sender = Sender::from_conf("http::addr=localhost:9000;")?; - let mut buffer = sender.new_buffer(); - buffer - .table("trades")? - .symbol("symbol", "ETH-USD")? - .symbol("side", "sell")? - .column_f64("price", 2615.54)? - .column_f64("amount", 0.00044)? - - // Array ingestion (QuestDB 9.0.0+). Slices and ndarray supported through trait - .column_arr("price_history", &[2615.54f64, 2615.10, 2614.80])? - .column_arr("volatility", &ndarray::arr1(&[0.012f64, 0.011, 0.013]).view())? - .at(TimestampNanos::now())?; - sender.flush(&mut buffer)?; - Ok(()) + let mut sender = Sender::from_conf("ws::addr=localhost:9000;")?; + let mut buffer = sender.new_buffer(); + buffer + .table("trades")? + .symbol("symbol", "ETH-USD")? + .symbol("side", "sell")? + .column_f64("price", 2615.54)? + .column_f64("amount", 0.00044)? + .at(TimestampNanos::now())?; + sender.flush(&mut buffer)?; + sender.close_drain()?; + Ok(()) } ``` +`flush` returns once the frame has been appended to the local publication +log. `close_drain` waits for already-published frames to be acknowledged +by the server (bounded by `close_flush_timeout_millis`, default 5s) before +the sender is dropped. + +## Protocol Versions + +QWP/WebSocket negotiates its protocol version during the WebSocket upgrade; +the client requires no `protocol_version` configuration. Arrays +(`column_arr`) and decimals (`column_dec`) work natively over QWP. + +The ILP transports use a separate, ILP-specific protocol-version mechanism +that gates array and decimal ingestion. Over ILP/HTTP, `protocol_version` +defaults to `auto` (server-negotiated). Over ILP/TCP, the default is `1` +and you must set `protocol_version=2` (arrays) or `=3` (decimals) +explicitly in the configuration string. + +| Version | Description | Server Compatibility | +| ------- | ---------------------------------------------------------- | -------------------- | +| **1** | Compatible InfluxDB Line Protocol over ILP transports | All QuestDB versions | +| **2** | 64-bit floats as binary, n-dimensional arrays | 9.0.0+ | +| **3** | Adds DECIMAL64/DECIMAL128/DECIMAL256 | 9.2.0+ | + ## Docs -Most of the client documentation is on the -[`ingress`](https://docs.rs/questdb-rs/6.1.0/questdb/ingress/) module page. +The main client documentation is on the +[`ingress`](https://docs.rs/questdb-rs/6.1.0/questdb/ingress/) module +page: configuration keys, the QWP error model, FSN-based completion, +progress modes, multi-host failover, store-and-forward, authentication, +TLS, the `Buffer` API, and the legacy ILP transports. ## Examples -A selection of usage examples is available in the [examples directory](https://github.com/questdb/c-questdb-client/tree/6.1.0/questdb-rs/examples): +A selection of usage examples is available in the +[examples directory](https://github.com/questdb/c-questdb-client/tree/6.1.0/questdb-rs/examples): | Example | Description | |---------|-------------| -| [`basic.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/basic.rs) | Minimal TCP ingestion example; shows basic row and array ingestion. | -| [`auth.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/auth.rs) | Adds authentication (user/password, token) to basic ingestion. | -| [`auth_tls.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/auth_tls.rs) | Like `auth.rs`, but uses TLS for encrypted TCP connections. | -| [`from_conf.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/from_conf.rs) | Configures client via connection string instead of builder pattern. | -| [`from_env.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/from_env.rs) | Reads config from `QDB_CLIENT_CONF` environment variable. | -| [`http.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/http.rs) | Uses HTTP transport and demonstrates array ingestion with `ndarray`. | -| [`protocol_version.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/protocol_version.rs) | Shows protocol version selection and feature differences (e.g. arrays). | +| [`basic.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/basic.rs) | Minimal ILP/TCP ingestion (legacy transport). | +| [`auth.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/auth.rs) | ILP/TCP with ECDSA authentication. | +| [`auth_tls.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/auth_tls.rs) | ILP/TCP with TLS plus ECDSA auth. | +| [`http.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/http.rs) | ILP/HTTP transport with array ingestion. | +| [`from_conf.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/from_conf.rs) | Configures a sender from a connection string. | +| [`from_env.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/from_env.rs) | Reads configuration from `QDB_CLIENT_CONF`. | +| [`protocol_version.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/protocol_version.rs) | ILP protocol-version selection. | +| [`qwp_ws_unified_sfa_bench.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/qwp_ws_unified_sfa_bench.rs) | QWP/WebSocket throughput benchmark with store-and-forward. | ## Crate features -The crate provides several optional features to enable additional functionality. You can enable features using Cargo's `--features` flag or in your `Cargo.toml`. +The crate provides optional features. Enable them via Cargo's +`--features` flag or in your `Cargo.toml`. ### Default features -- **sync-sender**: Enables both `sync-sender-tcp` and `sync-sender-http`. -- **sync-sender-tcp**: Enables ILP/TCP (legacy). Depends on the `socket2` crate. -- **sync-sender-http**: Enables ILP/HTTP support. Depends on the `ureq` crate. -- **tls-webpki-certs**: Uses a snapshot of the [Common CA Database](https://www.ccadb.org/) as root TLS certificates. Depends on the `webpki-roots` crate. -- **ring-crypto**: Uses the `ring` crate as the cryptography backend for TLS (default crypto backend). -### Optional features - -- **chrono_timestamp**: Allows specifying timestamps as `chrono::DateTime` objects. Depends on the `chrono` crate. -- **tls-native-certs**: Uses OS-provided root TLS certificates for secure connections. Depends on the `rustls-native-certs` crate. -- **insecure-skip-verify**: Allows skipping verification of insecure certificates (not recommended for production). -- **ndarray**: Enables integration with the `ndarray` crate for working with n-dimensional arrays. Without this feature, you can still send slices or implement custom array types via the `NdArrayView` trait. -- **aws-lc-crypto**: Uses `aws-lc-rs` as the cryptography backend for TLS. Mutually exclusive with the `ring-crypto` feature. +* **sync-sender** — umbrella for the synchronous sender transports. +* **sync-sender-qwp-ws** — QWP over WebSocket. Recommended transport. +* **sync-sender-http** — ILP over HTTP (legacy). +* **sync-sender-tcp** — ILP over TCP (legacy). +* **tls-webpki-certs** — bundled TLS roots from the + [Common CA Database](https://www.ccadb.org/) via + [`webpki-roots`](https://crates.io/crates/webpki-roots). +* **ring-crypto** — default TLS crypto backend, via the `ring` crate. -- **almost-all-features**: Convenience feature for development and testing. Enables most features except mutually exclusive crypto backends. +### Optional features -> See the `Cargo.toml` for the full list and details on feature interactions. +* **chrono_timestamp** — accept timestamps as `chrono::DateTime`. +* **tls-native-certs** — OS-provided root TLS certificates via + `rustls-native-certs`. +* **insecure-skip-verify** — disable TLS verification (test-only, + not for production). +* **ndarray** — integrate with the `ndarray` crate for N-dimensional + arrays. Without this feature, slices and custom types via the + `NdArrayView` trait still work. +* **aws-lc-crypto** — alternative TLS backend via `aws-lc-rs`. Mutually + exclusive with `ring-crypto`. +* **almost-all-features** — dev/test convenience: enables most features + except mutually exclusive crypto backends. + +> See `Cargo.toml` for the full list and feature interactions. ## C, C++ and Python APIs -This crate is also exposed as a C and C++ API and in turn exposed to Python. +This crate is also exposed as a C and C++ API and in turn exposed to +Python. -* This project's [GitHub page](https://github.com/questdb/c-questdb-client) - for the C and C++ API. -* [Python bindings](https://github.com/questdb/py-questdb-client). +* [c-questdb-client](https://github.com/questdb/c-questdb-client) — the C + and C++ API. +* [py-questdb-client](https://github.com/questdb/py-questdb-client) — + Python bindings. ## Community -If you need help, have additional questions or want to provide feedback, you -may find us on [Slack](https://slack.questdb.io/). - -You can also sign up to our [mailing list](https://questdb.io/community/) to -get notified of new releases. +If you need help, have questions, or want to provide feedback, +[join us on Slack](https://slack.questdb.io/). You can also sign up to +the [mailing list](https://questdb.io/community/) to get notified of new +releases. diff --git a/questdb-rs/src/ingress/buffer.rs b/questdb-rs/src/ingress/buffer.rs index b81e3ce1..1173d315 100644 --- a/questdb-rs/src/ingress/buffer.rs +++ b/questdb-rs/src/ingress/buffer.rs @@ -374,8 +374,11 @@ enum BufferInner { /// A reusable row buffer. /// -/// For ILP senders this exposes the existing byte-oriented buffer implementation. -/// For QWP/UDP senders it dispatches to the QWP-specific row buffer. +/// The same `Buffer` type is shared across all transports; the underlying +/// representation is chosen by the sender at construction time. For +/// QWP senders (WebSocket or UDP) it dispatches to the QWP-specific +/// columnar row buffer; for ILP senders (HTTP or TCP) it exposes the +/// byte-oriented ILP buffer. #[derive(Clone, Debug)] pub struct Buffer { inner: BufferInner, diff --git a/questdb-rs/src/ingress/mod.md b/questdb-rs/src/ingress/mod.md index a8a79899..0579574f 100644 --- a/questdb-rs/src/ingress/mod.md +++ b/questdb-rs/src/ingress/mod.md @@ -1,340 +1,350 @@ # Fast Ingestion of Data into QuestDB -The `ingress` module implements QuestDB's variant of the -[InfluxDB Line Protocol](https://questdb.io/docs/reference/api/ilp/overview/) -(ILP). +The `ingress` module sends rows of time-series data to a QuestDB server. The +recommended transport is the +**QuestDB Wire Protocol over WebSocket (QWP/WebSocket)**: a columnar binary +protocol with asynchronous server acknowledgements, multi-host failover, and +optional on-disk durability. + +Legacy InfluxDB Line Protocol (ILP) transports — over HTTP and TCP — remain +supported but are not recommended for new code; see +[Legacy ILP Transports](#legacy-ilp-transports) below. To get started: -* Use [`Sender::from_conf()`] to get the [`Sender`] object -* Populate a [`Buffer`] with one or more rows of data -* Send the buffer using [`sender.flush()`](Sender::flush) +* Use [`Sender::from_conf()`] to build a [`Sender`]. +* Populate a [`Buffer`] with one or more rows. +* Call [`sender.flush()`](Sender::flush) to publish. +* Call [`sender.close_drain()`](Sender::close_drain) before dropping the + sender so already-published frames complete on the wire. ```rust no_run use questdb::{ Result, - ingress::{ - Sender, - Buffer, - TimestampNanos}}; + ingress::{Sender, TimestampNanos}}; fn main() -> Result<()> { - let mut sender = Sender::from_conf("http::addr=localhost:9000;")?; - let mut buffer = sender.new_buffer(); - buffer - .table("trades")? - .symbol("symbol", "ETH-USD")? - .symbol("side", "sell")? - .column_f64("price", 2615.54)? - .column_f64("amount", 0.00044)? - .at(TimestampNanos::now())?; - sender.flush(&mut buffer)?; - Ok(()) + let mut sender = Sender::from_conf("ws::addr=localhost:9000;")?; + let mut buffer = sender.new_buffer(); + buffer + .table("trades")? + .symbol("symbol", "ETH-USD")? + .symbol("side", "sell")? + .column_f64("price", 2615.54)? + .column_f64("amount", 0.00044)? + .at(TimestampNanos::now())?; + sender.flush(&mut buffer)?; + sender.close_drain()?; + Ok(()) } ``` # Configuration String -The easiest way to configure all the available parameters on a line sender is -the configuration string. The general structure is: +A sender is configured with a single string: ```plain -::addr=host:port;param1=val1;param2=val2;... -``` - -`transport` can be `http`, `https`, `tcp`, or `tcps`. See the full details on -supported parameters in a dedicated section below. - -# Don't Forget to Flush - -The sender and buffer objects are entirely decoupled. This means that the sender -won't get access to the data in the buffer until you explicitly call -[`sender.flush(&mut buffer)`](Sender::flush) or a variant. This may lead to a -pitfall where you drop a buffer that still has some data in it, resulting in -permanent data loss. - -A common technique is to flush periodically on a timer and/or once the buffer -exceeds a certain size. You can check the buffer's size by the calling -[`buffer.len()`](Buffer::len). - -The default `flush()` method clears the buffer after sending its data. If you -want to preserve its contents (for example, to send the same data to multiple -QuestDB instances), call -[`sender.flush_and_keep(&mut buffer)`](Sender::flush_and_keep) instead. - -# Error Handling - -The supported transport modes handle errors very differently. In a nutshell, -HTTP is much better at error handling. - -## TCP - -TCP doesn't report errors at all to the sender; instead, the server quietly -disconnects and you'll have to inspect the server logs to get more information -on the reason. When this has happened, the sender transitions into an error -state, and it is permanently unusable. You must drop it and create a new sender. -You can inspect the sender's error state by calling -[`sender.must_close()`](Sender::must_close). - -## QWP/UDP - -QWP/UDP is a best-effort datagram transport. A `flush()` call sends one or more -UDP datagrams and can report only local socket errors. A successful return does -not guarantee delivery, server-side processing, or flush-level atomicity. - -When one logical flush spans multiple datagrams, some datagrams may already -have been emitted before a later send fails. In that case, retrying the same -buffer may duplicate rows that were already sent. - -## QWP/WebSocket - -QWP/WebSocket is a reliable, in-order transport. Each published frame is -assigned a frame sequence number (FSN) and tracked through a publication -lifecycle until the server acknowledges or rejects it. Transient socket -errors and reconnects are absorbed by the driver and do not surface to the -caller; only server rejections and terminal protocol violations are -reported. - -Server rejections fall into two categories: - -- *Drop-and-continue* rejections (e.g. schema mismatch, write error) affect - only the rejected frame; the sender continues processing subsequent frames. - Retrieve structured diagnostics with - [`sender.poll_qwp_ws_error()`](Sender::poll_qwp_ws_error), or install a - callback via [`SenderBuilder::qwp_ws_error_handler`](SenderBuilder::qwp_ws_error_handler). - -- *Halt* rejections (e.g. parse error, internal error, security error) and - terminal WebSocket protocol violations latch the sender into a - permanently-unusable state. The next `flush()` call returns - `ErrorCode::ServerRejection` carrying the structured diagnostic, and - [`sender.must_close()`](Sender::must_close) returns `true`. The sender - must be dropped and a new one constructed. The buffer passed to that - final `flush()` is left unmodified, so its contents can be recovered - before dropping the sender. - -`flush()` returns once the frame has been appended to the local publication -log; the FSN is also available via -[`flush_and_get_fsn()`](Sender::flush_and_get_fsn). To wait for the server -to durably acknowledge a specific FSN, call -[`sender.await_acked_fsn()`](Sender::await_acked_fsn). -[`sender.published_fsn()`](Sender::published_fsn) and -[`sender.acked_fsn()`](Sender::acked_fsn) provide non-blocking polls. - -In `manual` progress mode no background thread observes the transport. -Server-side state — including halts — only becomes visible when the user -calls [`sender.drive_once()`](Sender::drive_once) or any sender method -that drives the transport (such as `flush` or `await_acked_fsn`). As a -consequence, `must_close()` on an otherwise-idle manual sender does not -reflect a halt until the next drive. - -## HTTP - -HTTP distinguishes between recoverable and non-recoverable errors. For -recoverable ones, it enters a retry loop with exponential backoff, and reports -the error to the caller only after it has exhausted the retry time budget -(configuration parameter: `retry_timeout`). - -`sender.flush()` and variant methods communicate the error in the `Result` -return value. The category of the error is signalled through the -[`ErrorCode`](crate::error::ErrorCode) enum, and it's accompanied with an error -message. - -After the sender has signalled an error, it remains usable. You can handle the -error as appropriate and continue using it. - -# Health Check - -The QuestDB server has a "ping" endpoint you can access to see if it's alive, -and confirm the version of the InfluxDB that it is compatible with at a protocol -level. - -```shell -curl -I http://localhost:9000/ping +::addr=host:port;key1=val1;key2=val2;... ``` -Example of the expected response: +For QWP/WebSocket the transport is `ws` (plain) or `wss` (TLS). The +long-form aliases `qwpws` / `qwpwss` are also accepted. The default port +is `9000` for both `ws` and `wss`. ```plain -HTTP/1.1 204 OK -Server: questDB/1.0 -Date: Fri, 2 Feb 2024 17:09:38 GMT -Transfer-Encoding: chunked -Content-Type: text/plain; charset=utf-8 -X-Influxdb-Version: v2.7.4 +ws::addr=localhost:9000; +wss::addr=db.example.com:9000;username=admin;password=secret; +ws::addr=node-a:9000,node-b:9000;sf_dir=/var/lib/myapp/qdb-sf;sender_id=ingest-1; ``` -# Configuration Parameters +For the full key reference (TLS, auth, failover, store-and-forward, +durable ACK, progress modes), see the +[connect-string reference](https://questdb.io/docs/connect/clients/connect-string/) +on the QuestDB documentation site. [`SenderBuilder`] exposes the same +options programmatically. -In the examples below, we'll use configuration strings. We also provide the -[`SenderBuilder`] to programmatically configure the sender. The methods on -[`SenderBuilder`] match one-for-one with the keys in the configuration string. - -## Authentication +# Don't Forget to Flush -To establish an -[authenticated](https://questdb.io/docs/reference/api/ilp/overview/#authentication) -and TLS-encrypted connection, use the `https` or `tcps` protocol, and use the -configuration options appropriate for the authentication method. +The sender and the buffer are decoupled: a buffer accumulates rows +locally and the sender does not see them until you call +[`sender.flush(&mut buffer)`](Sender::flush) (or +[`sender.flush_and_keep(&mut buffer)`](Sender::flush_and_keep) to retain +the buffer contents). -Here are quick examples of configuration strings for each authentication method -we support: +**This client does not auto-flush, regardless of transport.** The +configuration keys `auto_flush_rows`, `auto_flush_bytes`, and +`auto_flush_interval` are rejected; `auto_flush=off` is accepted as a +no-op for compatibility with older connect strings. A common pattern is +to flush periodically on a timer and/or once the buffer size +([`buffer.len()`](Buffer::len)) exceeds a threshold. -### HTTP Token Bearer Authentication +On QWP/WebSocket, `flush` returns once the frame has been appended to the +local publication log. A successful flush clears the buffer; a failed +flush retains the rows so you can retry. A typical ingest loop reuses one +sender and one buffer: -```no_run -# use questdb::{Result, ingress::Sender}; +```rust no_run +# use questdb::{Result, ingress::{Sender, TimestampNanos}}; # fn main() -> Result<()> { -let mut sender = Sender::from_conf( - "https::addr=localhost:9000;token=Yfym3fgMv0B9;" -)?; -# Ok(()) -# } +# let mut sender = Sender::from_conf("ws::addr=localhost:9000;")?; +# let mut buffer = sender.new_buffer(); +# let running = true; +while running { + buffer + .table("trades")? + .symbol("symbol", "ETH-USD")? + .column_f64("price", 2615.54)? + .at(TimestampNanos::now())?; + sender.flush(&mut buffer)?; + if buffer.len() > 64 * 1024 { + // also flush on size if you batch multiple rows per iteration + } + // sleep until the next tick +# break; +} +# Ok(()) } ``` -* `token`: the authentication token +# Error Handling on QWP/WebSocket + +QWP/WebSocket is a reliable, in-order transport. Transient socket errors +and reconnects are absorbed by the driver and do not surface to the +caller. Two error surfaces are visible to user code: -### HTTP Basic Authentication +1. **Synchronous errors from `flush` and related calls** — local + failures or a terminal sender state. Returned as `Result::Err`. +2. **Asynchronous server errors** — protocol or schema errors reported + by the server *after* `flush` has returned. Drain them with + [`sender.poll_qwp_ws_error()`](Sender::poll_qwp_ws_error) or install a + callback via + [`SenderBuilder::qwp_ws_error_handler`](SenderBuilder::qwp_ws_error_handler). -```no_run +```rust no_run # use questdb::{Result, ingress::Sender}; # fn main() -> Result<()> { -let mut sender = Sender::from_conf( - "https::addr=localhost:9000;username=testUser1;password=Yfym3fgMv0B9;" -)?; -# Ok(()) -# } +# let mut sender = Sender::from_conf("ws::addr=localhost:9000;")?; +while let Some(err) = sender.poll_qwp_ws_error()? { + eprintln!("qwp error: {:?}", err); +} +# Ok(()) } ``` -* `username`: the username -* `password`: the password - -### TCP Elliptic Curve Digital Signature Algorithm (ECDSA) +Server errors are classified by +[`QwpWsErrorPolicy`]: `DropAndContinue` rejects only the affected +frame, `Halt` latches the sender into a permanently-unusable state +([`must_close()`](Sender::must_close) returns `true`). See +[`QwpWsSenderError`] for the diagnostic fields and +[`QwpWsErrorCategory`] for the categorisation. The +[QuestDB documentation site](https://questdb.io/docs/connect/clients/c-and-cpp/#asynchronous-error-handling) +has the full protocol-level error model. + +# Completion Tracking, Progress Modes, Failover, Durability + +These are QWP/WebSocket features whose protocol-level details live in the +QuestDB documentation; the Rust API entry points are summarised here. + +* **FSN-based completion** — every published frame is assigned a frame + sequence number. Use [`flush_and_get_fsn`](Sender::flush_and_get_fsn) + to capture it, then + [`await_acked_fsn`](Sender::await_acked_fsn) (or + [`published_fsn`](Sender::published_fsn) / + [`acked_fsn`](Sender::acked_fsn) non-blocking) to wait for server + acknowledgement. +* **Progress modes** — `background` (default) runs a sender-owned thread + that drives the transport; `manual` requires the caller to drive + progress with [`drive_once`](Sender::drive_once) or any sender method + that progresses the loop. Select via the configuration string + (`qwp_ws_progress=manual`) or + [`SenderBuilder::qwp_ws_progress`](SenderBuilder::qwp_ws_progress). +* **Multi-host failover** — pass a comma-separated address list + (`addr=a:9000,b:9000`) and tune `reconnect_max_duration_millis`, + `reconnect_initial_backoff_millis`, `reconnect_max_backoff_millis`. + Auth failures are terminal across all endpoints; transport errors are + retried. +* **Store-and-forward** — set `sf_dir` to a writable directory and + `sender_id` to a stable identifier per sender process. Unacknowledged + frames are persisted and replayed across reconnects and process + restarts. Replay is at-least-once, so target tables should declare + `DEDUP UPSERT KEYS(...)`. +* **Durable acknowledgement** — set `request_durable_ack=on` (QuestDB + Enterprise with primary replication) so `acked_fsn` advances only + after durable upload to object storage. + +See [QuestDB high-availability docs](https://questdb.io/docs/high-availability/overview/) +for the full failover/SF/durable-ACK story and +[delivery semantics](https://questdb.io/docs/concepts/delivery-semantics/) +for the at-least-once/exactly-once model. + +# Authentication + +QWP/WebSocket authenticates at the HTTP layer during the WebSocket +upgrade. Credentials are sent once per connection and reused across +reconnects. -```no_run +```rust no_run # use questdb::{Result, ingress::Sender}; # fn main() -> Result<()> { -let mut sender = Sender::from_conf( - "tcps::addr=localhost:9009;username=testUser1;token=5UjEA0;token_x=fLKYa9;token_y=bS1dEfy;" -)?; -# Ok(()) -# } +// Bearer token (recommended for Enterprise) +let _s = Sender::from_conf( + "wss::addr=db.example.com:9000;token=Yfym3fgMv0B9;")?; + +// HTTP basic auth +let _s = Sender::from_conf( + "wss::addr=db.example.com:9000;username=admin;password=Yfym3fgMv0B9;")?; +# Ok(()) } ``` -The four ECDSA components are: - -* `username`, aka. _kid_ -* `token`, aka. _d_ -* `token_x`, aka. _x_ -* `token_y`, aka. _y_ +`auth_timeout` (milliseconds, default `15000`) bounds the handshake. +`auth_timeout_ms` is accepted as a Java-compatible spelling. -### Authentication Timeout +**Not supported by this client:** OIDC token acquisition / in-band +refresh, mutual TLS (client certificates), token rotation mid-session. +QuestDB itself supports OIDC server-side — see +[OpenID Connect](https://questdb.io/docs/security/oidc/); acquire a +token out-of-band from your IdP, pass it via `token=...`, and rebuild +the sender when it nears expiry. mTLS is not negotiated by the +QuestDB server regardless of client. -You can specify how long the client should wait for the authentication request -to resolve. The configuration parameter is: +# TLS -* `auth_timeout` (milliseconds, default 15 seconds) +Use the `wss` schema (or the alias `qwpwss`). Configuration parameters: -For QWP/WebSocket configuration strings, the Java-compatible spelling is also -accepted: +* `tls_ca=webpki_roots` — bundled webpki roots (default). +* `tls_ca=os_roots` — OS certificate store (requires the + `tls-native-certs` feature). +* `tls_ca=webpki_and_os_roots` — both. +* `tls_roots=/path/to/root-ca.pem` — load roots from a PEM file. + Implies `tls_ca=pem_file`. +* `tls_verify=unsafe_off` — disable verification entirely. Requires + the `insecure-skip-verify` feature. **Never use in production.** -* `auth_timeout_ms` (milliseconds, default 15 seconds) +See the notes on +[how to generate a self-signed certificate](https://github.com/questdb/c-questdb-client/tree/6.1.0/tls_certs). -## Encryption on the Wire: TLS +# Closing the Sender -To enable TLS on the QuestDB Enterprise server, refer to the [QuestDB Enterprise -TLS documentation](https://questdb.io/docs/operations/tls/). +For delivery-sensitive shutdown, call +[`close_drain`](Sender::close_drain) before dropping the sender: -*Note*: QuestDB Open Source does not support TLS natively. To use TLS with -QuestDB Open Source, use a TLS proxy such as -[HAProxy](http://www.haproxy.org/). - -We support several certification authorities (sources of PKI root certificates). -To select one, use the `tls_ca` config option. These are the supported variants: - -* `tls_ca=webpki_roots;` use the roots provided in the standard Rust crate - [webpki-roots](https://crates.io/crates/webpki-roots) +```rust no_run +# use questdb::{Result, ingress::Sender}; +# fn main() -> Result<()> { +# let mut sender = Sender::from_conf("ws::addr=localhost:9000;")?; +sender.close_drain()?; +# Ok(()) } +``` -* `tls_ca=os_roots;` use the OS-provided certificate store +`close_drain` stops accepting new publications and waits up to +`close_flush_timeout_millis` (default `5000`) for already-published +frames to be acknowledged. With `sf_dir`, anything still un-acked is +persisted to disk so a later sender can replay it. Dropping a sender +without `close_drain` is best-effort: in-flight frames may not reach the +server, and any delivery failure is silent. -* `tls_ca=webpki_and_os_roots;` combine both of the above +# Health Check -* `tls_roots=/path/to/root-ca.pem;` get the root certificates from the specified - file. Main purpose is for testing with self-signed certificates. _Note:_ this - automatically sets `tls_ca=pem_file`. +The QuestDB server exposes a `/ping` endpoint on the same port as +QWP/WebSocket (the HTTP listener; default `9000`): -See our notes on [how to generate a self-signed -certificate](https://github.com/questdb/c-questdb-client/tree/main/tls_certs). +```shell +curl -I http://localhost:9000/ping +``` -* `tls_verify=unsafe_off;` tells the QuestDB client to ignore all CA roots and - accept any server certificate without checking. You can use it as a last - resort, when you weren't able to apply the above approach with a self-signed - certificate. You should **never use it in production** as it defeats security - and allows a man-in-the middle attack. +# Sequential Coupling in the Buffer API -## HTTP Timeouts +The fluent API of [`Buffer`] has sequential coupling: there's a certain +order in which you are expected to call the methods. You must write the +symbols before the columns, and you must terminate each row by calling +either [`at`](Buffer::at) or [`at_now`](Buffer::at_now). Refer to the +[`Buffer`] doc for the full rules and a flowchart. -Instead of a fixed timeout value, we use a flexible timeout that depends on the -size of the HTTP request payload (how much data is in the buffer that you're -flushing). You can configure it using two options: +# Optimization: Avoid Revalidating Names -* `request_min_throughput` (bytes per second, default 100 KiB/s): divide the - payload size by this number to determine for how long to keep sending the - payload before timing out. -* `request_timeout` (milliseconds, default 10 seconds): additional time - allowance to account for the fixed latency of the request-response roundtrip. +The client validates every name you provide. To avoid re-validating the +same names on every row, create pre-validated [`ColumnName`] and +[`TableName`] values once: -Finally, the client will keep retrying the request if it experiences errors. You -can configure the total time budget for retrying: +```rust no_run +# use questdb::Result; +use questdb::ingress::{TableName, ColumnName, Sender, TimestampNanos}; +# fn main() -> Result<()> { +let mut sender = Sender::from_conf("ws::addr=localhost:9000;")?; +let mut buffer = sender.new_buffer(); +let table_name = TableName::new("trades")?; +let price_name = ColumnName::new("price")?; +buffer.table(table_name)?.column_f64(price_name, 2615.54)?.at(TimestampNanos::now())?; +buffer.table(table_name)?.column_f64(price_name, 39269.98)?.at(TimestampNanos::now())?; +# Ok(()) } +``` -* `retry_timeout` (milliseconds, default 10 seconds) +# Handling Optional Data (NULLs) -# Usage Considerations +In QuestDB, `NULL` values are represented by simply omitting the column +for that specific row. -## Transactional Flush +To make working with Rust's `Option` ergonomic and keep the fluent +builder chain unbroken, the [`Buffer`] API provides `_opt` variants for +all column methods (e.g. +[`column_str_opt`](Buffer::column_str_opt), +[`column_f64_opt`](Buffer::column_f64_opt)). -When using HTTP, you can arrange that each `flush()` call happens within its own -transaction. For this to work, your buffer must contain data that targets only -one table. This is because QuestDB doesn't support multi-table transactions. +If the provided value is `Some(v)`, the column is written normally. If +the value is `None`, the method acts as a no-op and skips the column. -In order to ensure in advance that a flush will be transactional, call -[`sender.flush_and_keep_with_flags(&mut buffer, true)`](Sender::flush_and_keep_with_flags). -This call will refuse to flush a buffer if the flush wouldn't be transactional. +**Note on ownership:** for types that implement `Copy` (like `i64`, +`f64`, `bool`), you can pass the `Option` directly. For heap-allocated +types like `String` or `Vec`, use `.as_ref()` or `.as_deref()` to pass a +reference without consuming the original value. -## When to Choose the TCP Transport? +```rust no_run +# use questdb::Result; +use questdb::ingress::{Sender, TimestampNanos}; +# fn main() -> Result<()> { +let mut sender = Sender::from_conf("ws::addr=localhost:9000;")?; +let mut buffer = sender.new_buffer(); +let humidity: Option = None; +buffer + .table("sensors")? + .symbol("location", "factory-1")? + .column_f64("temperature", 22.5)? + // Silently skips the humidity column (stored as NULL). + .column_f64_opt("humidity", humidity)? + .at(TimestampNanos::now())?; +# Ok(()) } +``` -As discussed above, the TCP transport mode is raw and simplistic: it doesn't -report any errors to the caller (the server just disconnects), has no automatic -retries, requires manual handling of connection failures, and doesn't support -transactional flushing. +# Array Datatype -However, TCP has a lower overhead than HTTP and it's worthwhile to try out as an -alternative in a scenario where you have a constantly high data rate and/or deal -with a high-latency network connection. +[`Buffer::column_arr`](Buffer::column_arr) supports efficient ingestion +of N-dimensional arrays using: -## Array Datatype +* native Rust arrays and slices (up to 3-dimensional) +* native Rust vectors (up to 3-dimensional) +* arrays from the [`ndarray`](https://docs.rs/ndarray) crate -The [`Buffer::column_arr`](Buffer::column_arr) method supports efficient ingestion of N-dimensional -arrays using several convenient types: +QWP/WebSocket carries array types natively in the wire format and does +not require a `protocol_version` setting. Requires QuestDB server +9.0.0 or later. -- native Rust arrays and slices (up to 3-dimensional) -- native Rust vectors (up to 3-dimensional) -- arrays from the [`ndarray`](https://docs.rs/ndarray) crate +# Decimal Datatype -You must use protocol version 2 to ingest arrays. The HTTP transport will -automatically enable it as long as you're connecting to an up-to-date QuestDB -server (version 9.0.0 or later), but with TCP you must explicitly specify it in -the configuration string: `protocol_version=2;`. +[`Buffer::column_dec`](Buffer::column_dec) accepts: -**Note**: QuestDB server version 9.0.0 or later is required for array support. +* native Rust string slices +* decimals from the [`rust_decimal`](https://docs.rs/rust_decimal) crate +* decimals from the [`bigdecimal`](https://docs.rs/bigdecimal) crate -## Timestamp Column Name +QWP/WebSocket carries `DECIMAL64`, `DECIMAL128`, and `DECIMAL256` +natively and does not require a `protocol_version` setting. Requires +QuestDB server 9.2.0 or later. Pre-create decimal columns with +`DECIMAL(precision, scale)` so the server enforces the expected +precision. -The InfluxDB Line Protocol (ILP) does not give a name to the designated timestamp, -so if you let this client auto-create the table, it will have the default `timestamp` name. -To use a custom name, say `my_ts`, pre-create the table with the desired -timestamp column name: +# Timestamp Column Name -To address this, issue a `CREATE TABLE` statement to create the table in advance. -Note the `timestamp(my_ts)` clause at the end specifies the designated timestamp. +The QuestDB ingest protocols do not give a name to the designated +timestamp, so if you let this client auto-create the table, it will have +the default `timestamp` name. To use a custom name, issue a +`CREATE TABLE` in advance: ```sql CREATE TABLE IF NOT EXISTS 'trades' ( @@ -346,125 +356,94 @@ CREATE TABLE IF NOT EXISTS 'trades' ( ) timestamp (my_ts) PARTITION BY DAY WAL; ``` -You can use the `CREATE TABLE IF NOT EXISTS` construct to make sure the table is -created, but without raising an error if the table already exists. +# Transactional Flush -## Sequential Coupling in the Buffer API +[`flush_and_keep_with_flags`](Sender::flush_and_keep_with_flags) with +`transactional = true` refuses to flush a buffer that would span +multiple tables, ensuring QuestDB treats the flush as a single +transaction. The function is gated on the `sync-sender-http` Cargo +feature; building with QWP/WebSocket only does not expose it. -The fluent API of [`Buffer`] has sequential coupling: there's a certain order in -which you are expected to call the methods. For example, you must write the -symbols before the columns, and you must terminate each row by calling either -[`at`](Buffer::at) or [`at_now`](Buffer::at_now). Refer to the [`Buffer`] doc -for the full rules and a flowchart. +# Check out the CONSIDERATIONS Document -## Optimization: Avoid Revalidating Names +The +[library considerations](https://github.com/questdb/c-questdb-client/blob/6.1.0/doc/CONSIDERATIONS.md) +covers threading, data quality, server errors, flushing, and +disconnections. -The client validates every name you provide. To avoid the redundant CPU work of -re-validating the same names on every row, create pre-validated [`ColumnName`] -and [`TableName`] values: +# Troubleshooting -```no_run -# use questdb::Result; -use questdb::ingress::{ - TableName, - ColumnName, - Buffer, - SenderBuilder, - TimestampNanos}; -# fn main() -> Result<()> { -let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?; -let mut buffer = sender.new_buffer(); -let table_name = TableName::new("trades")?; -let price_name = ColumnName::new("price")?; -buffer.table(table_name)?.column_f64(price_name, 2615.54)?.at(TimestampNanos::now())?; -buffer.table(table_name)?.column_f64(price_name, 39269.98)?.at(TimestampNanos::now())?; -# Ok(()) -# } -``` +If data doesn't appear in the database in a timely manner, you may not +be calling [`flush`](Sender::flush) often enough — this client has no +auto-flush on any transport. -## Handling Optional Data (NULLs) +For QWP/WebSocket, drain +[`poll_qwp_ws_error`](Sender::poll_qwp_ws_error) (or install a callback +via +[`qwp_ws_error_handler`](SenderBuilder::qwp_ws_error_handler)) to see +structured server diagnostics. The +[server log](https://questdb.io/docs/troubleshooting/log/) carries +additional context. -In QuestDB, `NULL` values are represented by simply omitting the column for that specific row. +To inspect the bytes of an ILP buffer before sending, call +[`buffer.as_bytes()`](Buffer::as_bytes). QWP buffers are encoded into +frames during [`flush`](Sender::flush) and `as_bytes()` is not useful +there. -To make working with Rust's `Option` ergonomic and keep the fluent builder chain unbroken, the [`Buffer`] API provides `_opt` variants for all column methods (e.g., [`column_str_opt`](Buffer::column_str_opt), [`column_f64_opt`](Buffer::column_f64_opt)). +# Legacy ILP Transports -If the provided value is `Some(v)`, the column is written normally. If the value is `None`, the method acts as a no-op and skips the column. +> **Legacy.** The ILP transports (`http`, `https`, `tcp`, `tcps`) remain +> supported for backwards compatibility but are not recommended for new +> code. Use QWP/WebSocket +> ([top of this page](#fast-ingestion-of-data-into-questdb)) instead. -**Note on ownership:** For types that implement `Copy` (like `i64`, `f64`, `bool`), you can pass the `Option` directly. For heap-allocated types like `String` or `Vec`, use `.as_ref()` or `.as_deref()` to pass a reference without consuming the original value. +The same [`Sender`] / [`Buffer`] API works across all transports — the +difference is the configuration string and the error model. -```rust no_run -# use questdb::Result; -use questdb::ingress::{Buffer, SenderBuilder, TimestampNanos}; +## ILP/HTTP +Configuration: `http::` or `https::`. Request-response. Recoverable +errors are retried with exponential backoff and surface only after the +retry budget is exhausted; the sender remains usable on signalled +errors. + +```rust no_run +# use questdb::{Result, ingress::{Sender, TimestampNanos}}; # fn main() -> Result<()> { - let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?; - let mut buffer = sender.new_buffer(); - struct SensorData { - sensor_id: Option, - temperature: Option, - humidity: Option, - } - - let data = SensorData { sensor_id: Some("sensor-1".to_string()), temperature: Some(22.5), humidity: None }; - - buffer - .table("sensors")? - .symbol("location", "factory-1")? - // Writes the sensor_id column if it's Some, otherwise skips it (stored as NULL in QuestDB) - .symbol_opt("sensor_id", data.sensor_id.as_deref())? - // Writes the temperature column - .column_f64_opt("temperature", data.temperature)? - // Silently skips the humidity column (stored as NULL in QuestDB) - .column_f64_opt("humidity", data.humidity)? - .at(TimestampNanos::now())?; -# Ok(()) -# } +let mut sender = Sender::from_conf("http::addr=localhost:9000;")?; +# let mut buffer = sender.new_buffer(); +# buffer.table("trades")?.column_f64("price", 1.0)?.at(TimestampNanos::now())?; +# sender.flush(&mut buffer)?; +# Ok(()) } ``` -## Decimal Datatype - -The [`Buffer::column_dec`](Buffer::column_dec) method supports efficient ingestion of decimal values using several convenient types: - -- native Rust String slices -- decimals from the [`rust_decimal`](https://docs.rs/rust_decimal) crate -- decimals from the [`bigdecimal`](https://docs.rs/bigdecimal) crate - -You must use protocol version 3 to ingest decimals. The HTTP transport will -automatically enable it as long as you're connecting to an up-to-date QuestDB -server (version 9.2.0 or later), but with TCP you must explicitly specify it in -the configuration string: `protocol_version=3;`. - -**Note**: QuestDB server version 9.2.0 or later is required for decimal support. +HTTP-specific tuning: `request_min_throughput` (bytes/sec, default +100 KiB/s), `request_timeout` (default 10 s), `retry_timeout` (default +10 s). HTTP also supports +[transactional flushes](#transactional-flush). -## Check out the CONSIDERATIONS Document +For arrays and decimals on HTTP, `protocol_version=auto` (the default) +negotiates the right version with the server. -The [Library -considerations](https://github.com/questdb/c-questdb-client/blob/main/doc/CONSIDERATIONS.md) -document covers these topics: +## ILP/TCP -* Threading -* Differences between the InfluxDB Line Protocol and QuestDB Data Types -* Data Quality -* Client-side checks and server errors -* Flushing -* Disconnections, data errors and troubleshooting +Configuration: `tcp::` or `tcps::`. Streaming. Does not report errors to +the sender — the server silently disconnects on error and the failure +appears only in the server log. The sender transitions into a terminal +state which you can detect via +[`must_close`](Sender::must_close). TCP also has lower overhead than +HTTP, which suits very high steady-state throughput on a high-latency +network, at the cost of all observability. -# Troubleshooting Common Issues +TCP supports ECDSA authentication: -## Infrequent Flushing - -If the data doesn't appear in the database in a timely manner, you may not be -calling [`flush()`](Sender::flush) often enough. - -## Debug disconnects and inspect errors - -If you're using ILP-over-TCP, it doesn't report any errors to the client. -Instead, on error, the server terminates the connection, and logs any error -messages in [server logs](https://questdb.io/docs/troubleshooting/log/). - -To inspect or log a buffer's contents before you send it, call -[`buffer.as_bytes()`](Buffer::as_bytes). +```rust no_run +# use questdb::{Result, ingress::Sender}; +# fn main() -> Result<()> { +let _s = Sender::from_conf( + "tcps::addr=localhost:9009;username=testUser1;token=5UjEA0;token_x=fLKYa9;token_y=bS1dEfy;")?; +# Ok(()) } +``` -This byte-level inspection is only meaningful for ILP buffers. QWP buffers are -encoded into UDP datagrams during [`flush()`](Sender::flush), so -[`buffer.as_bytes()`](Buffer::as_bytes) is not useful there. +TCP defaults to `protocol_version=1`. To send arrays or decimals over +TCP you must specify `protocol_version=2` (or `=3`) explicitly. diff --git a/questdb-rs/src/ingress/sender.rs b/questdb-rs/src/ingress/sender.rs index 83b644d2..bcec96e1 100644 --- a/questdb-rs/src/ingress/sender.rs +++ b/questdb-rs/src/ingress/sender.rs @@ -159,27 +159,33 @@ impl Sender { /// Create a new `Sender` instance from the given configuration string. /// - /// The format of the string is: `"http::addr=host:port;key=value;...;"`. + /// The format of the string is: `"ws::addr=host:port;key=value;...;"`. /// - /// Instead of `"http"`, you can also specify `"https"`, `"tcp"`, `"tcps"`, - /// and `"qwpudp"`. + /// Supported transport schemes: /// - /// We recommend HTTP for most cases because it provides more features, like - /// reporting errors to the client and supporting transaction control. TCP can - /// sometimes be faster in higher-latency networks, but misses a number of - /// features. + /// * `ws` / `wss` (long-form aliases: `qwpws` / `qwpwss`) — **recommended**. + /// QuestDB Wire Protocol over WebSocket. Asynchronous server ACKs, + /// multi-host failover, optional store-and-forward durability, + /// structured error model. + /// * `http` / `https` — InfluxDB Line Protocol over HTTP (legacy). + /// Request-response with retries. + /// * `tcp` / `tcps` — InfluxDB Line Protocol over TCP (legacy). No + /// error reporting to the client; failures appear only in the + /// server log. /// - /// Keys in the config string correspond to same-named methods on `SenderBuilder`. - /// - /// For the full list of keys and values, see the docs on [`SenderBuilder`]. + /// Keys in the config string correspond to same-named methods on + /// [`SenderBuilder`]. For the full list of keys and values, see the + /// [`ingress`](crate::ingress) module docs and the [`SenderBuilder`] + /// reference. /// /// You can also load the configuration from an environment variable. /// See [`Sender::from_env`]. /// - /// In the case of TCP, this synchronously establishes the TCP connection, and - /// returns once the connection is fully established. If the connection - /// requires authentication or TLS, these will also be completed before - /// returning. + /// For TCP this synchronously establishes the TCP connection and + /// returns once the connection (including any authentication and TLS + /// handshake) is fully established. For QWP/WebSocket the WebSocket + /// upgrade (likewise including any authentication and TLS handshake) + /// is performed synchronously here. #[cfg(feature = "_sync-sender")] pub fn from_conf>(conf: T) -> Result { SenderBuilder::from_conf(conf)?.build() @@ -189,10 +195,9 @@ impl Sender { /// environment variable. The format is the same as that accepted by /// [`Sender::from_conf`]. /// - /// In the case of TCP, this synchronously establishes the TCP connection, and - /// returns once the connection is fully established. If the connection - /// requires authentication or TLS, these will also be completed before - /// returning. + /// As with [`Sender::from_conf`], TCP and QWP/WebSocket transports + /// establish the connection (including any authentication and TLS + /// handshake) synchronously before this function returns. #[cfg(feature = "_sync-sender")] pub fn from_env() -> Result { SenderBuilder::from_env()?.build() @@ -498,11 +503,15 @@ impl Sender { /// transport failures observed later are reported by subsequent sender /// calls. /// - /// HTTP should be the first choice, but use TCP if you need to continuously send - /// data to the server at a high rate. + /// QWP/WebSocket is the recommended transport for new code: structured + /// error reporting, multi-host failover, and optional durable + /// store-and-forward. ILP-over-HTTP remains a reasonable second + /// choice; ILP-over-TCP trades observability for lower per-message + /// overhead. /// - /// To improve the HTTP performance, send larger buffers (with more rows), and - /// consider parallelizing writes using multiple senders from multiple threads. + /// To improve throughput, send larger buffers (with more rows), and + /// consider parallelizing writes using multiple senders from multiple + /// threads. pub fn flush(&mut self, buf: &mut Buffer) -> crate::Result<()> { self.flush_impl(buf, false)?; buf.clear(); @@ -728,16 +737,17 @@ impl Sender { /// Tell whether the sender is no longer usable and must be dropped. /// - /// Returns `true` after an unrecoverable failure. For ILP-over-TCP this - /// is any socket error. For QWP/WebSocket this also covers a server - /// rejection or protocol violation that latches the publication - /// lifecycle to its terminal state. ILP-over-HTTP and QWP/UDP never - /// transition into a permanently-unusable state and always return - /// `false`. - /// - /// In QWP/WebSocket manual progress mode the answer only refreshes when - /// the user drives the sender (`drive_once` / `flush`), since no - /// background thread is observing the transport. + /// Returns `true` after an unrecoverable failure. For QWP/WebSocket + /// this covers a server rejection (parse error, internal error, + /// security error) or a terminal WebSocket protocol violation that + /// latches the publication lifecycle. For ILP-over-TCP it is any + /// socket error. ILP-over-HTTP and QWP/UDP never transition into a + /// permanently-unusable state and always return `false`. + /// + /// In QWP/WebSocket manual progress mode the answer only refreshes + /// when the user drives the sender (`drive_once` / `flush` / + /// `await_acked_fsn`), since no background thread is observing the + /// transport. #[must_use] pub fn must_close(&self) -> bool { if !self.connected { From dbb982716ec27775a2c458a48f5e6bed8592366c Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Wed, 20 May 2026 09:40:17 +0200 Subject: [PATCH 02/22] docs(ingress): canonical questdb.com URLs and add Rust-guide CTA Outbound documentation links were pointing at questdb.io/docs/..., which 301-redirects to questdb.com/docs/.... One reference also targeted the C/C++ client documentation page instead of the Rust one. This commit: - Migrates every questdb.io/docs/ URL in mod.md and README.md to questdb.com/docs/ so readers do not pay a redirect hop and the canonical domain is what shows in the address bar. Slack and community/marketing URLs stay on .io (different services). - Retargets the async-error-handling link in mod.md from /docs/connect/clients/c-and-cpp/#asynchronous-error-handling to /docs/connect/clients/rust/#asynchronous-error-handling. The Rust client doc page exists with the same anchor and is the right deep-link from a Rust rustdoc. - Adds a one-line CTA in mod.md immediately after the Hello World, pointing readers at the full Rust client documentation page on questdb.com for the topics that the trimmed rustdoc no longer covers in depth (failover, store-and-forward, durable ACK, troubleshooting). - Adds a parallel CTA paragraph to the README's Docs section so the crates.io landing surface also points at the Rust client guide, in addition to the docs.rs API reference. No code changes; all 50 doctests still pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- questdb-rs/README.md | 9 +++++++-- questdb-rs/src/ingress/mod.md | 15 +++++++++------ 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/questdb-rs/README.md b/questdb-rs/README.md index d87b786a..3a678dc9 100644 --- a/questdb-rs/README.md +++ b/questdb-rs/README.md @@ -12,7 +12,7 @@ failover, optional on-disk durability, and a structured error model. Legacy InfluxDB Line Protocol (ILP) transports — over HTTP or TCP — remain supported for backwards compatibility but are not recommended for new code. -* [QuestDB Database docs](https://questdb.io/docs/) +* [QuestDB Database docs](https://questdb.com/docs/) * [`ingress` module docs](https://docs.rs/questdb-rs/6.1.0/questdb/ingress/) — protocol details, configuration parameters, and patterns. @@ -95,12 +95,17 @@ explicitly in the configuration string. ## Docs -The main client documentation is on the +This crate's API reference is on the [`ingress`](https://docs.rs/questdb-rs/6.1.0/questdb/ingress/) module page: configuration keys, the QWP error model, FSN-based completion, progress modes, multi-host failover, store-and-forward, authentication, TLS, the `Buffer` API, and the legacy ILP transports. +For the full Rust client guide — failover, store-and-forward operations, +migration from ILP, worked examples — see the +[Rust client documentation](https://questdb.com/docs/connect/clients/rust/) +on the QuestDB documentation site. + ## Examples A selection of usage examples is available in the diff --git a/questdb-rs/src/ingress/mod.md b/questdb-rs/src/ingress/mod.md index 0579574f..65bce0f7 100644 --- a/questdb-rs/src/ingress/mod.md +++ b/questdb-rs/src/ingress/mod.md @@ -38,6 +38,9 @@ fn main() -> Result<()> { } ``` +For more depth — failover, store-and-forward, durable ACKs, troubleshooting — +see the [Rust client documentation](https://questdb.com/docs/connect/clients/rust/). + # Configuration String A sender is configured with a single string: @@ -58,7 +61,7 @@ ws::addr=node-a:9000,node-b:9000;sf_dir=/var/lib/myapp/qdb-sf;sender_id=ingest-1 For the full key reference (TLS, auth, failover, store-and-forward, durable ACK, progress modes), see the -[connect-string reference](https://questdb.io/docs/connect/clients/connect-string/) +[connect-string reference](https://questdb.com/docs/connect/clients/connect-string/) on the QuestDB documentation site. [`SenderBuilder`] exposes the same options programmatically. @@ -134,7 +137,7 @@ frame, `Halt` latches the sender into a permanently-unusable state ([`must_close()`](Sender::must_close) returns `true`). See [`QwpWsSenderError`] for the diagnostic fields and [`QwpWsErrorCategory`] for the categorisation. The -[QuestDB documentation site](https://questdb.io/docs/connect/clients/c-and-cpp/#asynchronous-error-handling) +[QuestDB documentation site](https://questdb.com/docs/connect/clients/rust/#asynchronous-error-handling) has the full protocol-level error model. # Completion Tracking, Progress Modes, Failover, Durability @@ -169,9 +172,9 @@ QuestDB documentation; the Rust API entry points are summarised here. Enterprise with primary replication) so `acked_fsn` advances only after durable upload to object storage. -See [QuestDB high-availability docs](https://questdb.io/docs/high-availability/overview/) +See [QuestDB high-availability docs](https://questdb.com/docs/high-availability/overview/) for the full failover/SF/durable-ACK story and -[delivery semantics](https://questdb.io/docs/concepts/delivery-semantics/) +[delivery semantics](https://questdb.com/docs/concepts/delivery-semantics/) for the at-least-once/exactly-once model. # Authentication @@ -199,7 +202,7 @@ let _s = Sender::from_conf( **Not supported by this client:** OIDC token acquisition / in-band refresh, mutual TLS (client certificates), token rotation mid-session. QuestDB itself supports OIDC server-side — see -[OpenID Connect](https://questdb.io/docs/security/oidc/); acquire a +[OpenID Connect](https://questdb.com/docs/security/oidc/); acquire a token out-of-band from your IdP, pass it via `token=...`, and rebuild the sender when it nears expiry. mTLS is not negotiated by the QuestDB server regardless of client. @@ -382,7 +385,7 @@ For QWP/WebSocket, drain via [`qwp_ws_error_handler`](SenderBuilder::qwp_ws_error_handler)) to see structured server diagnostics. The -[server log](https://questdb.io/docs/troubleshooting/log/) carries +[server log](https://questdb.com/docs/troubleshooting/log/) carries additional context. To inspect the bytes of an ILP buffer before sending, call From 1d6216bf2dfd0dac8b438b8851f884c938340207 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Wed, 20 May 2026 11:10:20 +0200 Subject: [PATCH 03/22] docs(readme): drop "Protocol Versions" section MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The section led with "QWP auto-negotiates the protocol version" — i.e. "nothing to configure" — and then spent six lines plus a table walking a new reader through legacy ILP version handshakes that they do not need to think about to ingest their first row. Net effect was to slow down the new-reader path with legacy trivia. The legacy ILP version nuances are still documented in ingress/mod.md under the ## ILP/HTTP and ## ILP/TCP subsections, which is the right context: a reader who is already choosing a legacy transport finds the protocol_version detail where they need it. Co-Authored-By: Claude Opus 4.7 (1M context) --- questdb-rs/README.md | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/questdb-rs/README.md b/questdb-rs/README.md index 3a678dc9..d8b733b7 100644 --- a/questdb-rs/README.md +++ b/questdb-rs/README.md @@ -75,24 +75,6 @@ log. `close_drain` waits for already-published frames to be acknowledged by the server (bounded by `close_flush_timeout_millis`, default 5s) before the sender is dropped. -## Protocol Versions - -QWP/WebSocket negotiates its protocol version during the WebSocket upgrade; -the client requires no `protocol_version` configuration. Arrays -(`column_arr`) and decimals (`column_dec`) work natively over QWP. - -The ILP transports use a separate, ILP-specific protocol-version mechanism -that gates array and decimal ingestion. Over ILP/HTTP, `protocol_version` -defaults to `auto` (server-negotiated). Over ILP/TCP, the default is `1` -and you must set `protocol_version=2` (arrays) or `=3` (decimals) -explicitly in the configuration string. - -| Version | Description | Server Compatibility | -| ------- | ---------------------------------------------------------- | -------------------- | -| **1** | Compatible InfluxDB Line Protocol over ILP transports | All QuestDB versions | -| **2** | 64-bit floats as binary, n-dimensional arrays | 9.0.0+ | -| **3** | Adds DECIMAL64/DECIMAL128/DECIMAL256 | 9.2.0+ | - ## Docs This crate's API reference is on the From a327397f1a373cb1dd8120ea60e119d9c582aa63 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Wed, 20 May 2026 11:28:37 +0200 Subject: [PATCH 04/22] docs(readme): trim Examples section to QWP-only references MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Examples table listed eight files but only one of them (qwp_ws_unified_sfa_bench.rs) actually exercised QWP — the other seven were ILP-based and being advertised on a page that otherwise demotes ILP as legacy. Trimmed to the single QWP example so the section is consistent with the rest of the README. The legacy ILP examples are still in the questdb-rs/examples/ directory for readers who want to look; they are simply no longer advertised on the crates.io landing surface. Co-Authored-By: Claude Opus 4.7 (1M context) --- questdb-rs/README.md | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/questdb-rs/README.md b/questdb-rs/README.md index d8b733b7..8d554281 100644 --- a/questdb-rs/README.md +++ b/questdb-rs/README.md @@ -90,19 +90,8 @@ on the QuestDB documentation site. ## Examples -A selection of usage examples is available in the -[examples directory](https://github.com/questdb/c-questdb-client/tree/6.1.0/questdb-rs/examples): - -| Example | Description | -|---------|-------------| -| [`basic.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/basic.rs) | Minimal ILP/TCP ingestion (legacy transport). | -| [`auth.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/auth.rs) | ILP/TCP with ECDSA authentication. | -| [`auth_tls.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/auth_tls.rs) | ILP/TCP with TLS plus ECDSA auth. | -| [`http.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/http.rs) | ILP/HTTP transport with array ingestion. | -| [`from_conf.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/from_conf.rs) | Configures a sender from a connection string. | -| [`from_env.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/from_env.rs) | Reads configuration from `QDB_CLIENT_CONF`. | -| [`protocol_version.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/protocol_version.rs) | ILP protocol-version selection. | -| [`qwp_ws_unified_sfa_bench.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/qwp_ws_unified_sfa_bench.rs) | QWP/WebSocket throughput benchmark with store-and-forward. | +QWP/WebSocket throughput benchmark with store-and-forward: +[`qwp_ws_unified_sfa_bench.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/qwp_ws_unified_sfa_bench.rs). ## Crate features From e3f5087ade2b6575cd4f7e5167b39aac88b3a5e3 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Wed, 20 May 2026 11:31:50 +0200 Subject: [PATCH 05/22] examples(qwp-ws): add basic, failover, and error-handling examples The Examples table on the crates.io landing surface only had one QWP entry (a throughput benchmark), which is too specialised to serve as the first example a new reader copies. This commit adds three more QWP/WebSocket examples and lists them in the README. examples/qwp_ws_basic.rs -- minimal ingest: from_conf, one row, flush, close_drain. Mirrors examples/basic.rs. examples/qwp_ws_failover.rs -- multi-host addr= list with sf_dir store-and-forward and sender_id; per-iteration flush with must_close check. examples/qwp_ws_error_handling.rs -- both async-error styles: poll_qwp_ws_error loop and SenderBuilder::qwp_ws_error_handler callback. Includes a qwp_ws_errors_dropped check. Each example is added to Cargo.toml with `required-features = ["sync-sender-qwp-ws"]` matching the convention already used by `qwp_ws_unified_sfa_bench`. README's Examples section restructured as a four-row table that lists all four QWP examples and shows the cargo run incantation, replacing the single-link sentence shape from the previous commit. Validation: `cargo build --examples` builds all four QWP examples cleanly (plus the existing ILP-feature-gated ones). Co-Authored-By: Claude Opus 4.7 (1M context) --- questdb-rs/Cargo.toml | 12 +++ questdb-rs/README.md | 17 +++- questdb-rs/examples/qwp_ws_basic.rs | 49 +++++++++++ questdb-rs/examples/qwp_ws_error_handling.rs | 85 ++++++++++++++++++++ questdb-rs/examples/qwp_ws_failover.rs | 72 +++++++++++++++++ 5 files changed, 233 insertions(+), 2 deletions(-) create mode 100644 questdb-rs/examples/qwp_ws_basic.rs create mode 100644 questdb-rs/examples/qwp_ws_error_handling.rs create mode 100644 questdb-rs/examples/qwp_ws_failover.rs diff --git a/questdb-rs/Cargo.toml b/questdb-rs/Cargo.toml index f0885689..c5b9bd42 100644 --- a/questdb-rs/Cargo.toml +++ b/questdb-rs/Cargo.toml @@ -178,3 +178,15 @@ required-features = ["sync-sender-http", "ndarray", "bigdecimal"] [[example]] name = "qwp_ws_unified_sfa_bench" required-features = ["sync-sender-qwp-ws"] + +[[example]] +name = "qwp_ws_basic" +required-features = ["sync-sender-qwp-ws"] + +[[example]] +name = "qwp_ws_failover" +required-features = ["sync-sender-qwp-ws"] + +[[example]] +name = "qwp_ws_error_handling" +required-features = ["sync-sender-qwp-ws"] diff --git a/questdb-rs/README.md b/questdb-rs/README.md index 8d554281..7585cad9 100644 --- a/questdb-rs/README.md +++ b/questdb-rs/README.md @@ -90,8 +90,21 @@ on the QuestDB documentation site. ## Examples -QWP/WebSocket throughput benchmark with store-and-forward: -[`qwp_ws_unified_sfa_bench.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/qwp_ws_unified_sfa_bench.rs). +QWP/WebSocket examples in the +[examples directory](https://github.com/questdb/c-questdb-client/tree/6.1.0/questdb-rs/examples): + +| Example | Description | +|---------|-------------| +| [`qwp_ws_basic.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/qwp_ws_basic.rs) | Minimal QWP/WebSocket ingestion: build a sender, flush a row, `close_drain`. | +| [`qwp_ws_failover.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/qwp_ws_failover.rs) | Multi-host `addr=` list with on-disk store-and-forward and `sender_id`. | +| [`qwp_ws_error_handling.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/qwp_ws_error_handling.rs) | Server-error handling via `poll_qwp_ws_error` and via the `qwp_ws_error_handler` callback. | +| [`qwp_ws_unified_sfa_bench.rs`](https://github.com/questdb/c-questdb-client/blob/6.1.0/questdb-rs/examples/qwp_ws_unified_sfa_bench.rs) | Throughput benchmark with store-and-forward. | + +Build and run any of these with, for example: + +```sh +cargo run --example qwp_ws_basic --features sync-sender-qwp-ws +``` ## Crate features diff --git a/questdb-rs/examples/qwp_ws_basic.rs b/questdb-rs/examples/qwp_ws_basic.rs new file mode 100644 index 00000000..e300573e --- /dev/null +++ b/questdb-rs/examples/qwp_ws_basic.rs @@ -0,0 +1,49 @@ +//! Minimal QWP/WebSocket ingestion. +//! +//! Publishes one trades row to a QuestDB server reachable on +//! `ws::addr=localhost:9000;` (override host and port via command-line +//! arguments) and waits for in-flight frames to be acknowledged before +//! the sender is dropped. +//! +//! Run with: +//! +//! ```sh +//! cargo run --example qwp_ws_basic --features sync-sender-qwp-ws +//! cargo run --example qwp_ws_basic --features sync-sender-qwp-ws -- db.example.com 9000 +//! ``` + +use questdb::{ + Result, + ingress::{Sender, TimestampNanos}, +}; + +fn main() -> Result<()> { + let host = std::env::args() + .nth(1) + .unwrap_or_else(|| "localhost".to_string()); + let port = std::env::args().nth(2).unwrap_or_else(|| "9000".to_string()); + let conf = format!("ws::addr={host}:{port};"); + + let mut sender = Sender::from_conf(conf.as_str())?; + let mut buffer = sender.new_buffer(); + + buffer + .table("trades")? + .symbol("symbol", "ETH-USD")? + .symbol("side", "sell")? + .column_f64("price", 2615.54)? + .column_f64("amount", 0.00044)? + .at(TimestampNanos::now())?; + + sender.flush(&mut buffer)?; + + // close_drain stops accepting new publications and waits up to + // close_flush_timeout_millis (default 5000) for already-published + // frames to be acknowledged by the server. Skip it (or rely on + // Sender::drop) only if delivery is not delivery-sensitive -- + // without close_drain, in-flight frames may not reach the server + // and any delivery failure is silent. + sender.close_drain()?; + + Ok(()) +} diff --git a/questdb-rs/examples/qwp_ws_error_handling.rs b/questdb-rs/examples/qwp_ws_error_handling.rs new file mode 100644 index 00000000..52216b9e --- /dev/null +++ b/questdb-rs/examples/qwp_ws_error_handling.rs @@ -0,0 +1,85 @@ +//! QWP/WebSocket asynchronous error handling. +//! +//! Server errors (schema mismatch, parse error, security error, etc.) +//! arrive asynchronously after `flush` has returned. There are two +//! ways to observe them: +//! +//! - Poll for them with `Sender::poll_qwp_ws_error`. The diagnostic +//! log is bounded; if the caller polls too slowly, older +//! diagnostics are dropped and the dropped count is available via +//! `Sender::qwp_ws_errors_dropped`. +//! - Install a callback at construction time via +//! `SenderBuilder::qwp_ws_error_handler`. The callback runs +//! synchronously from sender API calls such as `flush` and must +//! not call methods on the same sender. +//! +//! This example shows both styles back-to-back. +//! +//! Run with: +//! +//! ```sh +//! cargo run --example qwp_ws_error_handling --features sync-sender-qwp-ws +//! ``` + +use questdb::{ + Result, + ingress::{Sender, SenderBuilder, TimestampNanos}, +}; + +fn polling_style() -> Result<()> { + let mut sender = Sender::from_conf("ws::addr=localhost:9000;")?; + let mut buffer = sender.new_buffer(); + + buffer + .table("trades")? + .symbol("symbol", "ETH-USD")? + .column_f64("price", 2615.54)? + .at(TimestampNanos::now())?; + sender.flush(&mut buffer)?; + + // Drain server-side diagnostics queued since the previous poll. + while let Some(err) = sender.poll_qwp_ws_error()? { + eprintln!( + "qwp error (poll): category={:?} policy={:?} fsn=[{}..={}] msg={:?}", + err.category, err.applied_policy, err.from_fsn, err.to_fsn, err.message + ); + } + + let dropped = sender.qwp_ws_errors_dropped()?; + if dropped > 0 { + eprintln!("note: {dropped} diagnostic(s) were dropped from the log"); + } + + sender.close_drain()?; + Ok(()) +} + +fn callback_style() -> Result<()> { + let mut sender = SenderBuilder::from_conf("ws::addr=localhost:9000;")? + .qwp_ws_error_handler(|err| { + eprintln!( + "qwp error (callback): category={:?} policy={:?} fsn=[{}..={}] msg={:?}", + err.category, err.applied_policy, err.from_fsn, err.to_fsn, err.message + ); + })? + .build()?; + + let mut buffer = sender.new_buffer(); + buffer + .table("trades")? + .symbol("symbol", "ETH-USD")? + .column_f64("price", 2615.54)? + .at(TimestampNanos::now())?; + sender.flush(&mut buffer)?; + + sender.close_drain()?; + Ok(()) +} + +fn main() -> Result<()> { + println!("polling style:"); + polling_style()?; + println!("\ncallback style:"); + callback_style()?; + Ok(()) +} diff --git a/questdb-rs/examples/qwp_ws_failover.rs b/questdb-rs/examples/qwp_ws_failover.rs new file mode 100644 index 00000000..cd400ada --- /dev/null +++ b/questdb-rs/examples/qwp_ws_failover.rs @@ -0,0 +1,72 @@ +//! QWP/WebSocket multi-host failover with store-and-forward. +//! +//! Demonstrates a production-style connect string with: +//! +//! - Multiple `addr=` entries (the driver walks the list to find a +//! healthy peer when the current connection breaks). +//! - `sf_dir=...` so unacknowledged frames spill to disk and are +//! replayed after reconnects and process restarts. Replay is +//! at-least-once, so target tables should declare +//! `DEDUP UPSERT KEYS(...)`. +//! - `sender_id=...` identifying this sender's slot under `sf_dir`. +//! Use a distinct id per sender process when several share the +//! same directory. +//! - `reconnect_max_duration_millis=...` to bound the per-outage +//! reconnect budget. +//! +//! Replace the address list with hosts on your network and ensure the +//! `sf_dir` path is writable. +//! +//! Run with: +//! +//! ```sh +//! cargo run --example qwp_ws_failover --features sync-sender-qwp-ws +//! ``` + +use std::thread; +use std::time::Duration; + +use questdb::{ + Result, + ingress::{Sender, TimestampNanos}, +}; + +fn main() -> Result<()> { + let conf = "ws::addr=db-primary:9000,db-replica-1:9000,db-replica-2:9000;\ + sf_dir=/tmp/myapp-qdb-sf;\ + sender_id=ingest-1;\ + reconnect_max_duration_millis=300000;"; + + let mut sender = Sender::from_conf(conf)?; + let mut buffer = sender.new_buffer(); + + // Publish ten rows, one per second. In a real ingest service this + // loop would be driven by your data source instead. + for _ in 0..10 { + buffer + .table("trades")? + .symbol("symbol", "ETH-USD")? + .symbol("side", "sell")? + .column_f64("price", 2615.54)? + .column_f64("amount", 0.00044)? + .at(TimestampNanos::now())?; + + // On flush failure, the buffer is retained so the next + // iteration retries the same payload. must_close() reports + // whether the sender has latched into a terminal state and + // must be dropped and recreated (e.g. auth failure, reconnect + // budget exhausted). + if let Err(e) = sender.flush(&mut buffer) { + eprintln!("flush error: {e}"); + if sender.must_close() { + eprintln!("sender is terminal; aborting"); + return Err(e); + } + } + + thread::sleep(Duration::from_secs(1)); + } + + sender.close_drain()?; + Ok(()) +} From 22b946ae26a9d5a1d73a43d643c44c73be03e1a6 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Wed, 20 May 2026 11:59:19 +0200 Subject: [PATCH 06/22] docs(readme): surface the Rust client guide on the index page The crate index page rendered from README.md (file:///.../target/doc/ questdb/index.html, docs.rs/questdb-rs//questdb/) led with a generic "QuestDB Database docs" bullet and the rustdoc itself. There was no prominent index-page link to the Rust-specific guide on questdb.com/docs/connect/clients/rust/ -- it was only reachable from the Docs section further down. Promote the Rust client documentation page to the first top-of-page bullet so a reader landing on docs.rs gets one click to the canonical Rust client guide. The rustdoc API reference moves to second, and the generic database docs to third. Co-Authored-By: Claude Opus 4.7 (1M context) --- questdb-rs/README.md | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/questdb-rs/README.md b/questdb-rs/README.md index 7585cad9..25dfca97 100644 --- a/questdb-rs/README.md +++ b/questdb-rs/README.md @@ -12,9 +12,13 @@ failover, optional on-disk durability, and a structured error model. Legacy InfluxDB Line Protocol (ILP) transports — over HTTP or TCP — remain supported for backwards compatibility but are not recommended for new code. -* [QuestDB Database docs](https://questdb.com/docs/) +* [Rust client documentation](https://questdb.com/docs/connect/clients/rust/) — + the full guide on the QuestDB documentation site. * [`ingress` module docs](https://docs.rs/questdb-rs/6.1.0/questdb/ingress/) — - protocol details, configuration parameters, and patterns. + this crate's API reference: protocol details, configuration parameters, + and patterns. +* [QuestDB Database docs](https://questdb.com/docs/) — the wider + documentation site (SQL reference, deployment, operations). ## Transports From 2fddad7f4d3209670bd417e18fd204dc9a1ab2ac Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Wed, 20 May 2026 12:08:19 +0200 Subject: [PATCH 07/22] docs(ingress): drop the intro ILP framing A QWP-first reader hitting either the rustdoc landing page or the ingress module page saw an "InfluxDB Line Protocol transports remain supported but are not recommended" sentence within the first 15 lines, before any QWP example had been shown. The Legacy ILP Transports section in mod.md (and the Transports section in README.md) already carries that information in its proper context. Dropping the intro mention keeps the first impression on QWP. The mid-page "to inspect ILP buffer bytes, call buffer.as_bytes()" paragraph that sat in the QWP Troubleshooting section is also removed; it interrupted the QWP narrative with an ILP-specific mechanic. ILP buffer inspection remains discoverable from the Buffer::as_bytes rustdoc. Co-Authored-By: Claude Opus 4.7 (1M context) --- questdb-rs/README.md | 13 +++++-------- questdb-rs/src/ingress/mod.md | 18 ++++-------------- 2 files changed, 9 insertions(+), 22 deletions(-) diff --git a/questdb-rs/README.md b/questdb-rs/README.md index 25dfca97..d18ea39a 100644 --- a/questdb-rs/README.md +++ b/questdb-rs/README.md @@ -3,14 +3,11 @@ Official Rust client for [QuestDB](https://questdb.io/), an open-source SQL database designed to process time-series data, faster. -The client library is designed for fast ingestion of data into QuestDB. The -recommended transport is the -**QuestDB Wire Protocol over WebSocket (QWP/WebSocket)**: a columnar binary -protocol with explicit asynchronous server acknowledgements, multi-host -failover, optional on-disk durability, and a structured error model. - -Legacy InfluxDB Line Protocol (ILP) transports — over HTTP or TCP — remain -supported for backwards compatibility but are not recommended for new code. +The client library is designed for fast ingestion of data into QuestDB +over the **QuestDB Wire Protocol over WebSocket (QWP/WebSocket)**: a +columnar binary protocol with explicit asynchronous server +acknowledgements, multi-host failover, optional on-disk durability, and a +structured error model. * [Rust client documentation](https://questdb.com/docs/connect/clients/rust/) — the full guide on the QuestDB documentation site. diff --git a/questdb-rs/src/ingress/mod.md b/questdb-rs/src/ingress/mod.md index 65bce0f7..461d46c7 100644 --- a/questdb-rs/src/ingress/mod.md +++ b/questdb-rs/src/ingress/mod.md @@ -1,14 +1,9 @@ # Fast Ingestion of Data into QuestDB -The `ingress` module sends rows of time-series data to a QuestDB server. The -recommended transport is the -**QuestDB Wire Protocol over WebSocket (QWP/WebSocket)**: a columnar binary -protocol with asynchronous server acknowledgements, multi-host failover, and -optional on-disk durability. - -Legacy InfluxDB Line Protocol (ILP) transports — over HTTP and TCP — remain -supported but are not recommended for new code; see -[Legacy ILP Transports](#legacy-ilp-transports) below. +The `ingress` module sends rows of time-series data to a QuestDB server over +the **QuestDB Wire Protocol over WebSocket (QWP/WebSocket)**: a columnar +binary protocol with asynchronous server acknowledgements, multi-host +failover, and optional on-disk durability. To get started: @@ -388,11 +383,6 @@ structured server diagnostics. The [server log](https://questdb.com/docs/troubleshooting/log/) carries additional context. -To inspect the bytes of an ILP buffer before sending, call -[`buffer.as_bytes()`](Buffer::as_bytes). QWP buffers are encoded into -frames during [`flush`](Sender::flush) and `as_bytes()` is not useful -there. - # Legacy ILP Transports > **Legacy.** The ILP transports (`http`, `https`, `tcp`, `tcps`) remain From 3fbdb3f72d05bf20fafe0ce78d7b33adb6e9246a Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Wed, 20 May 2026 12:12:54 +0200 Subject: [PATCH 08/22] docs(ingress): trim SenderBuilder /// docs; link out for depth The SenderBuilder struct doc and several of its per-method comments still framed ILP as the recommended path, listed only ILP examples, or omitted QWP/WebSocket from per-transport descriptions: - SenderBuilder struct doc: three doctests all using https::/Protocol::Http; no ws:: example. Trimmed to one ws:: doctest plus a link to questdb.com/docs/connect/clients/rust/ for the full configuration reference. - SenderBuilder::from_conf: the "We recommend HTTP" paragraph and the enumeration of QWP-only connect-string keys collapsed into a short paragraph linking to the connect-string reference on questdb.com. - SenderBuilder::new: dropped the "using ILP over the specified protocol" claim (wrong since Protocol::QwpWs/Wss/Udp exist) and the feature-gated ILP-only example. - SenderBuilder::username/password/token: rewrote to name QWP/WebSocket and ILP/HTTP for basic and bearer-token auth (both transports accept these per accepts_http_auth in ingress.rs:316), with ECDSA framing scoped to ILP/TCP. - SenderBuilder::protocol_version: clarified that this knob applies only to ILP transports; QWP transports negotiate during handshake and ignore the setting. - SenderBuilder::max_name_len, Sender::max_name_len: clarified that the server-side pickup is ILP/HTTP-with-V2-specific; other transports use the configured/default value. - SenderBuilder::build: noted that QWP/WebSocket likewise establishes its connection synchronously, matching the existing TCP note. - Sender::flush_and_keep: now cross-references Sender::flush for the per-transport semantics (QWP/WebSocket local-publish, HTTP retry, TCP fire-and-forget) instead of saying nothing. The general principle for this pass: keep the docstring short, name the transports involved if they materially differ, and link to the QuestDB documentation site for the protocol-level depth. cargo test --doc passes (47 doctests; the previous 50 included three SenderBuilder struct-doc examples that have been collapsed). Co-Authored-By: Claude Opus 4.7 (1M context) --- questdb-rs/src/ingress.rs | 170 +++++++++++-------------------- questdb-rs/src/ingress/sender.rs | 17 ++-- 2 files changed, 66 insertions(+), 121 deletions(-) diff --git a/questdb-rs/src/ingress.rs b/questdb-rs/src/ingress.rs index 34f780fa..5f97ce63 100644 --- a/questdb-rs/src/ingress.rs +++ b/questdb-rs/src/ingress.rs @@ -523,51 +523,26 @@ fn parse_qwp_ws_endpoints( Ok(endpoints) } -/// Accumulates parameters for a new `Sender` instance. +/// Accumulates parameters for a new `Sender`. /// -/// You can also create the builder from a config string. +/// Most callers should use [`SenderBuilder::from_conf`] with a connect +/// string; the per-key methods on this builder are for callers that +/// need to configure programmatically. /// /// ```no_run /// # use questdb::Result; /// use questdb::ingress::SenderBuilder; -/// /// # fn main() -> Result<()> { -/// let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?; -/// # Ok(()) +/// # #[cfg(feature = "sync-sender-qwp-ws")] { +/// let _sender = SenderBuilder::from_conf("ws::addr=localhost:9000;")?.build()?; /// # } -/// ``` -/// -/// Or create it from the `QDB_CLIENT_CONF` environment variable. -/// -/// ```no_run -/// # use questdb::Result; -/// use questdb::ingress::SenderBuilder; -/// -/// # fn main() -> Result<()> { -/// // export QDB_CLIENT_CONF="https::addr=localhost:9000;" -/// let mut sender = SenderBuilder::from_env()?.build()?; /// # Ok(()) /// # } /// ``` /// -/// The `SenderBuilder` can also be built programmatically. -/// The minimum required parameters are the protocol, host, and port. -/// -/// ```no_run -/// # use questdb::Result; -/// use questdb::ingress::SenderBuilder; -/// use questdb::ingress::Protocol; -/// -/// # fn main() -> Result<()> { -/// # #[cfg(feature = "sync-sender-http")] { -/// let mut sender = SenderBuilder::new(Protocol::Http, "localhost", 9000).build()?; -/// # } -/// # #[cfg(all(not(feature = "sync-sender-http"), feature = "sync-sender-tcp"))] { -/// let mut sender = SenderBuilder::new(Protocol::Tcp, "localhost", 9009).build()?; -/// # } -/// # Ok(()) -/// # } -/// ``` +/// See the +/// [Rust client guide](https://questdb.com/docs/connect/clients/rust/) +/// for the full configuration reference. #[derive(Debug, Clone)] pub struct SenderBuilder { protocol: Protocol, @@ -609,44 +584,26 @@ pub struct SenderBuilder { } impl SenderBuilder { - /// Create a new `SenderBuilder` instance from the configuration string. - /// - /// The format of the string is: `"http::addr=host:port;key=value;...;"`. + /// Build a `SenderBuilder` from a connect string. /// - /// Instead of `"http"`, you can also specify `"https"`, `"tcp"`, `"tcps"`, - /// `"qwpudp"`, `"qwpws"`, `"qwpwss"`, and the QWP/WebSocket aliases - /// `"ws"` / `"wss"` when the corresponding sender features are enabled. + /// Connect strings have the form + /// `"::addr=host:port;key=value;...;"`. The recommended + /// transport is QWP/WebSocket (`ws::` / `wss::`); ILP/HTTP + /// (`http::` / `https::`), ILP/TCP (`tcp::` / `tcps::`), and QWP/UDP + /// (`qwpudp::`) are also accepted when their Cargo features are + /// enabled. /// - /// We recommend HTTP for most cases because it provides more features, like - /// reporting errors to the client and supporting transaction control. TCP can - /// sometimes be faster in higher-latency networks, but misses a number of - /// features. + /// See the + /// [connect-string reference](https://questdb.com/docs/connect/clients/connect-string/) + /// for the full key list. Some QWP-specific keys (e.g. `sf_dir`, + /// `sender_id`, `in_flight_window`) are accepted only through the + /// connect string and not through a builder method. /// - /// Many accepted keys match one-for-one with the methods on `SenderBuilder`. - /// For example, this is a valid configuration string: + /// You can also load the configuration from an environment variable. + /// See [`SenderBuilder::from_env`]. /// - /// "https::addr=host:port;username=alice;password=secret;" - /// - /// and there are matching methods [SenderBuilder::username] and - /// [SenderBuilder::password]. The value of `addr=` is supplied directly to the - /// `SenderBuilder` constructor, so there's no matching method for that. - /// - /// Some QWP/WebSocket configuration keys are accepted only through the - /// configuration string, primarily for compatibility with Java-style - /// configuration names and settings without a public Rust builder method. - /// These include `in_flight_window`, `sf_dir`, `sender_id`, `sf_max_bytes`, - /// `sf_max_total_bytes`, `sf_durability`, `sf_append_deadline_millis`, - /// `auth_timeout_ms`, `close_flush_timeout_millis`, `request_durable_ack`, - /// `durable_ack_keepalive_interval_millis`, `drain_orphans`, - /// `max_background_drainers`, `error_inbox_capacity`, and - /// `max_schemas_per_connection`. - /// - /// You can also load the configuration from an environment variable. See - /// [`SenderBuilder::from_env`]. - /// - /// Once you have a `SenderBuilder` instance, you can further customize it - /// before calling [`SenderBuilder::build`], but you can't change any settings - /// that are already set in the config string. + /// Settings already supplied via the connect string cannot be + /// overridden by subsequent builder calls. pub fn from_conf>(conf: T) -> Result { let conf = conf.as_ref(); #[cfg(feature = "_sender-qwp-ws")] @@ -952,25 +909,11 @@ impl SenderBuilder { Self::from_conf(conf) } - /// Create a new `SenderBuilder` instance with the provided QuestDB - /// server and port, using ILP over the specified protocol. + /// Create a `SenderBuilder` with the given transport, host, and + /// port. /// - /// ```no_run - /// # use questdb::Result; - /// use questdb::ingress::{Protocol, SenderBuilder}; - /// - /// # fn main() -> Result<()> { - /// # #[cfg(feature = "sync-sender-tcp")] { - /// let mut sender = SenderBuilder::new( - /// Protocol::Tcp, "localhost", 9009).build()?; - /// # } - /// # #[cfg(all(not(feature = "sync-sender-tcp"), feature = "sync-sender-http"))] { - /// let mut sender = SenderBuilder::new( - /// Protocol::Http, "localhost", 9000).build()?; - /// # } - /// # Ok(()) - /// # } - /// ``` + /// Most callers should use [`SenderBuilder::from_conf`] or + /// [`Sender::from_conf`] instead. pub fn new, P: Into>(protocol: Protocol, host: H, port: P) -> Self { let host = host.into(); let port: Port = port.into(); @@ -1079,12 +1022,11 @@ impl SenderBuilder { /// Set the username for authentication. /// - /// For TCP, this is the `kid` part of the ECDSA key set. - /// The other fields are [`token`](SenderBuilder::token), [`token_x`](SenderBuilder::token_x), - /// and [`token_y`](SenderBuilder::token_y). - /// - /// For HTTP, this is a part of basic authentication. - /// See also: [`password`](SenderBuilder::password). + /// For QWP/WebSocket and ILP/HTTP this is the basic-auth username; + /// pair with [`password`](SenderBuilder::password). For ILP/TCP this + /// is the `kid` part of the ECDSA key set; pair with + /// [`token`](SenderBuilder::token) / [`token_x`](SenderBuilder::token_x) + /// / [`token_y`](SenderBuilder::token_y). pub fn username(mut self, username: &str) -> Result { #[cfg(feature = "_sender-qwp-udp")] self.reject_if_qwp_udp("username")?; @@ -1093,8 +1035,8 @@ impl SenderBuilder { Ok(self) } - /// Set the password for basic HTTP authentication. - /// See also: [`username`](SenderBuilder::username). + /// Set the password for basic authentication (QWP/WebSocket or + /// ILP/HTTP). See also: [`username`](SenderBuilder::username). pub fn password(mut self, password: &str) -> Result { #[cfg(feature = "_sender-qwp-udp")] self.reject_if_qwp_udp("password")?; @@ -1103,8 +1045,11 @@ impl SenderBuilder { Ok(self) } - /// Set the Token (Bearer) Authentication parameter for HTTP, - /// or the ECDSA private key for TCP authentication. + /// Set the bearer-token authentication value for QWP/WebSocket or + /// ILP/HTTP. On ILP/TCP this is the ECDSA private key (the `d` + /// component), used with [`username`](SenderBuilder::username) / + /// [`token_x`](SenderBuilder::token_x) / + /// [`token_y`](SenderBuilder::token_y). pub fn token(mut self, token: &str) -> Result { #[cfg(feature = "_sender-qwp-udp")] self.reject_if_qwp_udp("token")?; @@ -1159,15 +1104,14 @@ impl SenderBuilder { } } - /// Sets the protocol version for ILP transports. - /// - HTTP transport automatically negotiates the protocol version by default(unset, **Strong Recommended**). - /// You can explicitly configure the protocol version to avoid the slight latency cost at connection time. - /// - TCP transport does not negotiate the protocol version and uses [`ProtocolVersion::V1`] by - /// default. You must explicitly set [`ProtocolVersion::V2`] in order to ingest - /// arrays. - /// - QWP/UDP does not support explicit `protocol_version` configuration. + /// Set the ILP protocol version. /// - /// **Note**: QuestDB server version 9.0.0 or later is required for [`ProtocolVersion::V2`] support. + /// Applies only to the legacy ILP transports. ILP/HTTP auto-negotiates + /// by default (leave unset). ILP/TCP defaults to + /// [`ProtocolVersion::V1`]; set [`ProtocolVersion::V2`] for arrays + /// (server 9.0.0+) or [`ProtocolVersion::V3`] for decimals + /// (server 9.2.0+). QWP transports negotiate during handshake and do + /// not honour this setting. pub fn protocol_version(mut self, protocol_version: ProtocolVersion) -> Result { #[cfg(feature = "_sender-qwp-udp")] self.reject_if_qwp_udp("protocol_version")?; @@ -1779,11 +1723,12 @@ impl SenderBuilder { Ok(self) } - /// The maximum length of a table or column name in bytes. - /// Matches the `cairo.max.file.name.length` setting in the server. - /// The default is 127 bytes. - /// If running over HTTP and protocol version 2 is auto-negotiated, this - /// value is picked up from the server. + /// Maximum length of a table or column name in bytes. + /// + /// Matches the server's `cairo.max.file.name.length` setting; the + /// default is 127 bytes. ILP/HTTP with auto-negotiated protocol + /// version 2 picks this up from the server; other transports use the + /// configured (or default) value. pub fn max_name_len(mut self, value: usize) -> Result { if value < 16 { return Err(error::fmt!( @@ -1979,10 +1924,9 @@ impl SenderBuilder { #[cfg(feature = "_sync-sender")] /// Build the sender. /// - /// In the case of TCP, this synchronously establishes the TCP connection, and - /// returns once the connection is fully established. If the connection - /// requires authentication or TLS, these will also be completed before - /// returning. + /// For ILP/TCP and QWP/WebSocket this synchronously establishes the + /// connection (including any authentication and TLS handshake) and + /// returns once it is fully established. pub fn build(&self) -> Result { let mut descr = format!("Sender[host={:?},port={:?},", self.host, self.port); diff --git a/questdb-rs/src/ingress/sender.rs b/questdb-rs/src/ingress/sender.rs index bcec96e1..c1d29dbd 100644 --- a/questdb-rs/src/ingress/sender.rs +++ b/questdb-rs/src/ingress/sender.rs @@ -464,9 +464,12 @@ impl Sender { /// Send the given buffer of rows to the QuestDB server. /// - /// All the data stays in the buffer. Clear the buffer before starting a new batch. + /// All the data stays in the buffer. Clear the buffer before + /// starting a new batch. /// - /// To send and clear in one step, call [Sender::flush] instead. + /// Same transport-specific semantics as [`Sender::flush`]; see its + /// docs for the QWP/WebSocket, ILP/HTTP, and ILP/TCP behaviour. To + /// send and clear in one step, call [`Sender::flush`] instead. pub fn flush_and_keep(&mut self, buf: &Buffer) -> Result<()> { self.flush_impl(buf, false) } @@ -778,12 +781,10 @@ impl Sender { self.protocol_version } - /// Return the sender's maxinum name length of any column or table name. - /// This is either set explicitly when constructing the sender, - /// or the default value of 127. - /// When unset and using protocol version 2 over HTTP, the value is read - /// from the server from the `cairo.max.file.name.length` setting in - /// `server.conf` which defaults to 127. + /// Return the sender's maximum name length for any column or table + /// name. Set explicitly via [`SenderBuilder::max_name_len`], or the + /// default 127 bytes (or whatever the server advertises on ILP/HTTP + /// with auto-negotiated protocol version 2). pub fn max_name_len(&self) -> usize { self.max_name_len } From 54a63dbd553d60b60d3896f26d61ea436b58c6fe Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Wed, 20 May 2026 12:15:05 +0200 Subject: [PATCH 09/22] docs(ingress): rewrite Buffer dispatch comments to cover QWP/WebSocket Six per-method comments on Buffer either omitted the QWP/WebSocket branch entirely or led with ILP. Two were factually wrong by omission: - Buffer::transactional claimed only "QWP/UDP" returns false; the QWP/WS branch also returns false unconditionally. - Buffer::as_bytes said "QWP/UDP returns an empty slice"; QWP/WS does the same. The other four (Buffer::protocol_version, Buffer::reserve, Buffer::len, Buffer::capacity) used "For ILP ... For QWP/UDP ..." wording with QWP/WebSocket either missing or trailing. Standardised on "QWP buffers (UDP and WebSocket)" where the two QWP branches share semantics; this is the actual code shape (both Qwp(_) and QwpWs(_) match arms return the same values for these methods). Where the WS branch genuinely diverges, the divergence is now stated explicitly. Also fixed the misleading "QWP datagram version" phrasing on Buffer::protocol_version (WebSocket frames are not datagrams) to "QWP wire-format version". cargo test --doc still passes (47 / 47). Co-Authored-By: Claude Opus 4.7 (1M context) --- questdb-rs/src/ingress/buffer.rs | 51 ++++++++++++++++---------------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/questdb-rs/src/ingress/buffer.rs b/questdb-rs/src/ingress/buffer.rs index 1173d315..ffeebbd8 100644 --- a/questdb-rs/src/ingress/buffer.rs +++ b/questdb-rs/src/ingress/buffer.rs @@ -452,10 +452,10 @@ impl Buffer { /// Returns the protocol version associated with this buffer. /// - /// For ILP buffers this is the ILP protocol version. For QWP/UDP buffers - /// this is the QWP datagram version, currently represented as - /// [`ProtocolVersion::V1`]. Interpret the value together with the buffer - /// transport; do not use it by itself for ILP feature gating. + /// For ILP buffers this is the ILP protocol version. For QWP buffers + /// (UDP and WebSocket) this is the QWP wire-format version, currently + /// [`ProtocolVersion::V1`]. Interpret the value together with the + /// buffer transport; do not use it by itself for ILP feature gating. pub fn protocol_version(&self) -> ProtocolVersion { match &self.inner { BufferInner::Ilp(inner) => inner.protocol_version(), @@ -466,13 +466,12 @@ impl Buffer { } } - /// Reserves capacity associated with `additional` more bytes of buffered data. + /// Reserve capacity for `additional` more bytes of buffered data. /// /// For ILP buffers this reserves exact serialized-byte capacity. For - /// QWP/UDP buffers this is a heuristic prewarm of the internal arenas and - /// planner scratch used during datagram planning and encoding; it is not an - /// exact guarantee that [`Buffer::len`] can grow by `additional` bytes - /// without further allocation. + /// QWP buffers (UDP and WebSocket) this is a heuristic prewarm of the + /// internal arenas, not an exact guarantee that [`Buffer::len`] can + /// grow by `additional` bytes without further allocation. pub fn reserve(&mut self, additional: usize) { match &mut self.inner { BufferInner::Ilp(inner) => inner.reserve(additional), @@ -483,12 +482,12 @@ impl Buffer { } } - /// Returns the current buffered size. + /// Return the current buffered size. /// - /// For ILP buffers this is the exact serialized byte count. For QWP/UDP - /// buffers this is the size hint used for flush planning. For QWP/WebSocket - /// buffers this is only a local size hint; the sender enforces - /// `max_buf_size` against the encoded replay message. + /// On ILP buffers this is the exact serialized byte count. On QWP + /// buffers (UDP and WebSocket) it is a local size hint; the actual + /// flushed payload may differ once the buffer is encoded into + /// datagrams or frames. pub fn len(&self) -> usize { match &self.inner { BufferInner::Ilp(inner) => inner.len(), @@ -513,11 +512,12 @@ impl Buffer { } } - /// Returns whether the buffered batch is transactional. + /// Return whether the buffered batch is transactional. /// - /// For ILP buffers this is `true` only while the buffer contains rows for - /// at most one table. QWP/UDP does not support transactional flushes, so - /// QWP buffers always return `false`. + /// For ILP buffers this is `true` only while the buffer contains + /// rows for at most one table. QWP buffers (UDP and WebSocket) do + /// not expose flush-level atomicity through this getter and always + /// return `false`. pub fn transactional(&self) -> bool { match &self.inner { BufferInner::Ilp(inner) => inner.transactional(), @@ -539,11 +539,11 @@ impl Buffer { } } - /// Returns the current retained-capacity hint for the buffer. + /// Return the current retained-capacity hint for the buffer. /// - /// For ILP buffers, this is byte capacity. For QWP/UDP buffers, this is an - /// implementation-defined retained-capacity hint and should not be - /// interpreted as exact byte capacity. + /// For ILP buffers this is byte capacity. For QWP buffers (UDP and + /// WebSocket) this is an implementation-defined retained-capacity + /// hint and should not be interpreted as exact byte capacity. pub fn capacity(&self) -> usize { match &self.inner { BufferInner::Ilp(inner) => inner.capacity(), @@ -554,10 +554,11 @@ impl Buffer { } } - /// Returns the raw serialized ILP bytes currently stored in the buffer. + /// Return the raw serialized ILP bytes currently stored in the + /// buffer. /// - /// QWP/UDP buffers build datagrams during flush, so this returns an empty - /// slice for QWP/UDP. + /// QWP buffers (UDP and WebSocket) build their wire payloads during + /// flush, so this returns an empty slice on those transports. pub fn as_bytes(&self) -> &[u8] { match &self.inner { BufferInner::Ilp(inner) => inner.as_bytes(), From dcb3430cf4ff3e782c22e2bb0e3b8413f61c1d7f Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Wed, 20 May 2026 12:24:12 +0200 Subject: [PATCH 10/22] fix(ingress): reject auto_flush_interval in connect string The published rustdoc on `ingress/mod.md` advertises that this client has no auto-flush on any transport, and that the `auto_flush_rows`, `auto_flush_bytes`, and `auto_flush_interval` keys in the connect string are rejected. `validate_auto_flush_params` only iterates over the first two; unknown keys (including `auto_flush_interval`) fall through to the silent-accept branch in the main parser, so a user who sets `auto_flush_interval=1000;` gets no error and no flushing. Add `auto_flush_interval` to the rejection list so the validator matches the documented behaviour, with a parallel `auto_flush_interval_unsupported` test next to the existing rows/bytes cases. Also folds in two unrelated minor tidies that were sitting in the tree: - ingress.rs: reorder the `SenderBuilder::max_buf_size` doc-comment bullets so QWP/WebSocket is listed first, matching the QWP-first orientation of recent docs commits. - examples/qwp_ws_basic.rs: rustfmt wrap of an `unwrap_or_else` call. Co-Authored-By: Claude Opus 4.7 (1M context) --- questdb-rs/examples/qwp_ws_basic.rs | 4 +++- questdb-rs/src/ingress.rs | 6 +++--- questdb-rs/src/ingress/tests.rs | 9 +++++++++ 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/questdb-rs/examples/qwp_ws_basic.rs b/questdb-rs/examples/qwp_ws_basic.rs index e300573e..d1c76c2a 100644 --- a/questdb-rs/examples/qwp_ws_basic.rs +++ b/questdb-rs/examples/qwp_ws_basic.rs @@ -21,7 +21,9 @@ fn main() -> Result<()> { let host = std::env::args() .nth(1) .unwrap_or_else(|| "localhost".to_string()); - let port = std::env::args().nth(2).unwrap_or_else(|| "9000".to_string()); + let port = std::env::args() + .nth(2) + .unwrap_or_else(|| "9000".to_string()); let conf = format!("ws::addr={host}:{port};"); let mut sender = Sender::from_conf(conf.as_str())?; diff --git a/questdb-rs/src/ingress.rs b/questdb-rs/src/ingress.rs index 5f97ce63..984bcca2 100644 --- a/questdb-rs/src/ingress.rs +++ b/questdb-rs/src/ingress.rs @@ -190,7 +190,7 @@ fn validate_auto_flush_params(params: &HashMap) -> Result<()> { )); } - for ¶m in ["auto_flush_rows", "auto_flush_bytes"].iter() { + for ¶m in ["auto_flush_rows", "auto_flush_bytes", "auto_flush_interval"].iter() { if params.contains_key(param) { return Err(error::fmt!( ConfigError, @@ -1708,9 +1708,9 @@ impl SenderBuilder { /// The maximum buffered size that the client will flush to the server. /// The default is 100 MiB. /// - /// For ILP this applies to the exact pending byte length. - /// For QWP/UDP this applies to the buffer size hint exposed by [`Buffer::len`]. /// For QWP/WebSocket this applies to the encoded replay message size. + /// For QWP/UDP this applies to the buffer size hint exposed by [`Buffer::len`]. + /// For ILP this applies to the exact pending byte length. pub fn max_buf_size(mut self, value: usize) -> Result { let min = 1024; if value < min { diff --git a/questdb-rs/src/ingress/tests.rs b/questdb-rs/src/ingress/tests.rs index ba3335b9..00747938 100644 --- a/questdb-rs/src/ingress/tests.rs +++ b/questdb-rs/src/ingress/tests.rs @@ -1123,6 +1123,15 @@ fn auto_flush_bytes_unsupported() { ); } +#[cfg(feature = "sync-sender-tcp")] +#[test] +fn auto_flush_interval_unsupported() { + assert_conf_err( + SenderBuilder::from_conf("tcps::addr=localhost;auto_flush_interval=1000;"), + "Invalid configuration parameter \"auto_flush_interval\". This client does not support auto-flush", + ); +} + fn assert_specified_eq>( actual: &ConfigSetting, expected: IntoV, From 0272c76169d6a02416a4c5283e31e3c665f6bc13 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Wed, 20 May 2026 12:24:50 +0200 Subject: [PATCH 11/22] docs(ingress): migrate private-module questdb.io URLs to questdb.com questdb-rs/src/ingress/buffer/ilp.rs had eight `questdb.io/docs/...` links in /// comments inside the private `ilp` module. The module is not pub-exposed (rendered HTML is reachable only via the rustdoc "source" link), so this is a code-hygiene migration rather than a user-facing fix. The questdb.io domain 301-redirects to questdb.com; this commit removes the redirect hop and aligns the source with the rest of the crate's outbound link convention. Co-Authored-By: Claude Opus 4.7 (1M context) --- questdb-rs/src/ingress/buffer/ilp.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/questdb-rs/src/ingress/buffer/ilp.rs b/questdb-rs/src/ingress/buffer/ilp.rs index 9135475a..bb722a12 100644 --- a/questdb-rs/src/ingress/buffer/ilp.rs +++ b/questdb-rs/src/ingress/buffer/ilp.rs @@ -215,18 +215,18 @@ struct IlpBookmarkState { /// /// | Buffer Method | Serialized as ILP type (Click on link to see possible casts) | /// |---------------|--------------------------------------------------------------| -/// | [`symbol`](Buffer::symbol) | [`SYMBOL`](https://questdb.io/docs/concept/symbol/) | -/// | [`column_bool`](Buffer::column_bool) | [`BOOLEAN`](https://questdb.io/docs/reference/api/ilp/columnset-types#boolean) | -/// | [`column_i64`](Buffer::column_i64) | [`INTEGER`](https://questdb.io/docs/reference/api/ilp/columnset-types#integer) | -/// | [`column_f64`](Buffer::column_f64) | [`FLOAT`](https://questdb.io/docs/reference/api/ilp/columnset-types#float) | -/// | [`column_str`](Buffer::column_str) | [`STRING`](https://questdb.io/docs/reference/api/ilp/columnset-types#string) | -/// | [`column_arr`](Buffer::column_arr) | [`ARRAY`](https://questdb.io/docs/reference/api/ilp/columnset-types#array) | -/// | [`column_ts`](Buffer::column_ts) | [`TIMESTAMP`](https://questdb.io/docs/reference/api/ilp/columnset-types#timestamp) | +/// | [`symbol`](Buffer::symbol) | [`SYMBOL`](https://questdb.com/docs/concept/symbol/) | +/// | [`column_bool`](Buffer::column_bool) | [`BOOLEAN`](https://questdb.com/docs/reference/api/ilp/columnset-types#boolean) | +/// | [`column_i64`](Buffer::column_i64) | [`INTEGER`](https://questdb.com/docs/reference/api/ilp/columnset-types#integer) | +/// | [`column_f64`](Buffer::column_f64) | [`FLOAT`](https://questdb.com/docs/reference/api/ilp/columnset-types#float) | +/// | [`column_str`](Buffer::column_str) | [`STRING`](https://questdb.com/docs/reference/api/ilp/columnset-types#string) | +/// | [`column_arr`](Buffer::column_arr) | [`ARRAY`](https://questdb.com/docs/reference/api/ilp/columnset-types#array) | +/// | [`column_ts`](Buffer::column_ts) | [`TIMESTAMP`](https://questdb.com/docs/reference/api/ilp/columnset-types#timestamp) | /// /// QuestDB supports both `STRING` and `SYMBOL` column types. /// /// To understand the difference, refer to the -/// [QuestDB documentation](https://questdb.io/docs/concept/symbol/). In a nutshell, +/// [QuestDB documentation](https://questdb.com/docs/concept/symbol/). In a nutshell, /// symbols are interned strings, most suitable for identifiers that are repeated many /// times throughout the column. They offer an advantage in storage space and query /// performance. From ee9d2b837901d1e16e5e76d86ac680720b348fd3 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Wed, 20 May 2026 12:26:21 +0200 Subject: [PATCH 12/22] build(release): track versioned URLs in questdb-rs/src/ingress/mod.md Two GitHub tree/blob URLs in the embedded ingress module docs were hard-coded at 6.1.0 but not listed in .bumpversion.toml, so the next release would leave them stale. Co-Authored-By: Claude Opus 4.7 (1M context) --- .bumpversion.toml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/.bumpversion.toml b/.bumpversion.toml index c25d9d42..739ccf36 100644 --- a/.bumpversion.toml +++ b/.bumpversion.toml @@ -33,6 +33,16 @@ filename = "questdb-rs/README.md" search = "https://github.com/questdb/c-questdb-client/blob/{current_version}/questdb-rs/" replace = "https://github.com/questdb/c-questdb-client/blob/{new_version}/questdb-rs/" +[[tool.bumpversion.files]] +filename = "questdb-rs/src/ingress/mod.md" +search = "https://github.com/questdb/c-questdb-client/tree/{current_version}/" +replace = "https://github.com/questdb/c-questdb-client/tree/{new_version}/" + +[[tool.bumpversion.files]] +filename = "questdb-rs/src/ingress/mod.md" +search = "https://github.com/questdb/c-questdb-client/blob/{current_version}/" +replace = "https://github.com/questdb/c-questdb-client/blob/{new_version}/" + [[tool.bumpversion.files]] filename = "questdb-rs-ffi/Cargo.toml" search = "version = \"{current_version}\"" From 0a175eb302d988b6144d1580b3429b3bea522077 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Wed, 20 May 2026 13:15:46 +0200 Subject: [PATCH 13/22] Fix QWP WebSocket failover flush example --- questdb-rs/examples/qwp_ws_failover.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/questdb-rs/examples/qwp_ws_failover.rs b/questdb-rs/examples/qwp_ws_failover.rs index cd400ada..b7463a8c 100644 --- a/questdb-rs/examples/qwp_ws_failover.rs +++ b/questdb-rs/examples/qwp_ws_failover.rs @@ -51,17 +51,18 @@ fn main() -> Result<()> { .column_f64("amount", 0.00044)? .at(TimestampNanos::now())?; - // On flush failure, the buffer is retained so the next - // iteration retries the same payload. must_close() reports - // whether the sender has latched into a terminal state and - // must be dropped and recreated (e.g. auth failure, reconnect - // budget exhausted). + // For QWP/WebSocket, a successful flush publishes the batch + // to the local replay queue or Store-and-Forward storage. + // Delivery, ACKs, reconnect, and replay happen asynchronously. + // If flush returns an error, the buffer is still intact; do + // not append more rows until you have retried, cleared, or + // dropped it. if let Err(e) = sender.flush(&mut buffer) { eprintln!("flush error: {e}"); if sender.must_close() { eprintln!("sender is terminal; aborting"); - return Err(e); } + return Err(e); } thread::sleep(Duration::from_secs(1)); From 490b274bf5752c2f0365caf7f4897a0066de93ba Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Wed, 20 May 2026 13:45:35 +0200 Subject: [PATCH 14/22] ci(tests): skip doctests on single-transport feature matrix legs The narrow `sync-sender-tcp` and `sync-sender-http` legs verify that the library builds when a downstream consumer enables only one ILP transport. They don't need to compile the crate-level QWP/WebSocket doctests, which assume `sync-sender-qwp-ws` is enabled (the default, also used by docs.rs). Doctest coverage stays in the four wider legs. Co-Authored-By: Claude Opus 4.7 (1M context) --- ci/run_all_tests.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/ci/run_all_tests.py b/ci/run_all_tests.py index e8c6f250..a20f38f0 100644 --- a/ci/run_all_tests.py +++ b/ci/run_all_tests.py @@ -40,11 +40,19 @@ def main(): run_cmd('cargo', 'test', '--no-default-features', '--features=ring-crypto,tls-native-certs,sync-sender', '--', '--nocapture', cwd='questdb-rs') + # Narrow single-transport matrix legs: verify the library compiles + # and tests pass when a downstream consumer enables only ILP/TCP or + # only ILP/HTTP. Skip doctests via `--lib --tests --examples` -- the + # crate-level docs describe QWP/WebSocket (the default transport) so + # they assume `sync-sender-qwp-ws` is enabled, which docs.rs builds + # with anyway (see Cargo.toml `package.metadata.docs.rs`). run_cmd('cargo', 'test', '--no-default-features', '--features=ring-crypto,tls-webpki-certs,sync-sender-tcp', + '--lib', '--tests', '--examples', '--', '--nocapture', cwd='questdb-rs') run_cmd('cargo', 'test', '--no-default-features', '--features=ring-crypto,tls-webpki-certs,sync-sender-http', + '--lib', '--tests', '--examples', '--', '--nocapture', cwd='questdb-rs') run_cmd('cargo', 'test', '--features=almost-all-features', '--', '--nocapture', cwd='questdb-rs') From 103607d670adae718d3a1c6f5f36aad259cb4f98 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Wed, 20 May 2026 14:44:10 +0200 Subject: [PATCH 15/22] ci(fuzz): set +x before `##vso[task.setvariable]` JAVA_HOME echoes Three hetzner-incus bash steps used `set -eux` and then emitted `##vso[task.setvariable variable=JAVA_HOME...]` lines. Under xtrace, bash also writes the command to stderr with single-quoted arguments (`+ echo '##vso[...]/usr/lib/jvm/java-25-openjdk-amd64'`). The Azure agent parses `##vso` from both stdout and stderr and "last line wins", so the stderr copy intermittently leaks a trailing `'` into JAVA_HOME, making the next Maven@3 step fail with Not found jdkUserInputPath: /usr/lib/jvm/java-25-openjdk-amd64' Disable xtrace immediately before the echoes. The blog post linked in the comment describes the same race and the same fix. Co-Authored-By: Claude Opus 4.7 (1M context) --- ci/run_fuzz_pipeline.yaml | 8 ++++++++ ci/run_tests_pipeline.yaml | 16 ++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/ci/run_fuzz_pipeline.yaml b/ci/run_fuzz_pipeline.yaml index 61820c04..92543413 100644 --- a/ci/run_fuzz_pipeline.yaml +++ b/ci/run_fuzz_pipeline.yaml @@ -147,6 +147,14 @@ stages: exit 1 fi done + # Disable xtrace before emitting `##vso[task.setvariable]`: + # under `set -x`, bash also prints the command to stderr with + # single-quoted arguments. The Azure agent parses `##vso` from + # either stream and "last line wins" -- the stderr copy can + # win the race, leaking a trailing `'` into JAVA_HOME and + # making Maven@3 fail with `Not found jdkUserInputPath`. + # See https://questdb.com/blog/azure-pipelines-stdout-stderr-race-condition/ + set +x echo "##vso[task.setvariable variable=JAVA_HOME_17_X64]$JAVA_PATH_17" echo "##vso[task.setvariable variable=JAVA_HOME]$JAVA_PATH_25" displayName: "Install missing deps + resolve JDKs" diff --git a/ci/run_tests_pipeline.yaml b/ci/run_tests_pipeline.yaml index 2a2da047..d40abee6 100644 --- a/ci/run_tests_pipeline.yaml +++ b/ci/run_tests_pipeline.yaml @@ -160,6 +160,14 @@ stages: exit 1 fi done + # Disable xtrace before emitting `##vso[task.setvariable]`: + # under `set -x`, bash also prints the command to stderr with + # single-quoted arguments. The Azure agent parses `##vso` from + # either stream and "last line wins" -- the stderr copy can + # win the race, leaking a trailing `'` into JAVA_HOME and + # making Maven@3 fail with `Not found jdkUserInputPath`. + # See https://questdb.com/blog/azure-pipelines-stdout-stderr-race-condition/ + set +x echo "##vso[task.setvariable variable=JAVA_HOME_17_X64]$JAVA_PATH_17" echo "##vso[task.setvariable variable=JAVA_HOME]$JAVA_PATH_25" displayName: "Install missing deps + resolve JDKs" @@ -327,6 +335,14 @@ stages: exit 1 fi done + # Disable xtrace before emitting `##vso[task.setvariable]`: + # under `set -x`, bash also prints the command to stderr with + # single-quoted arguments. The Azure agent parses `##vso` from + # either stream and "last line wins" -- the stderr copy can + # win the race, leaking a trailing `'` into JAVA_HOME and + # making Maven@3 fail with `Not found jdkUserInputPath`. + # See https://questdb.com/blog/azure-pipelines-stdout-stderr-race-condition/ + set +x echo "##vso[task.setvariable variable=JAVA_HOME_17_X64]$JAVA_PATH_17" echo "##vso[task.setvariable variable=JAVA_HOME]$JAVA_PATH_25" displayName: "Install missing deps + resolve JDKs" From f0b3b62524ae9779386c5a6994bf8758d84e532b Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Wed, 20 May 2026 15:05:02 +0200 Subject: [PATCH 16/22] docs(readme): canonical questdb.com URL for the mailing list link Mirrors the questdb.io -> questdb.com migration applied to ilp.rs in 0272c76. The slack.questdb.io invite shortener and the homepage link on line 3 are intentionally left alone -- the former has no .com equivalent, the latter is out of scope here. Co-Authored-By: Claude Opus 4.7 (1M context) --- questdb-rs/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/questdb-rs/README.md b/questdb-rs/README.md index d18ea39a..1efd9e23 100644 --- a/questdb-rs/README.md +++ b/questdb-rs/README.md @@ -154,5 +154,5 @@ Python. If you need help, have questions, or want to provide feedback, [join us on Slack](https://slack.questdb.io/). You can also sign up to -the [mailing list](https://questdb.io/community/) to get notified of new +the [mailing list](https://questdb.com/community/) to get notified of new releases. From a2172cf5ba29f76f252f0be144730f15afabcf8f Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Wed, 20 May 2026 15:29:58 +0200 Subject: [PATCH 17/22] docs(ingress): fix ingest loop flush example --- questdb-rs/src/ingress/mod.md | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/questdb-rs/src/ingress/mod.md b/questdb-rs/src/ingress/mod.md index 461d46c7..bd37678c 100644 --- a/questdb-rs/src/ingress/mod.md +++ b/questdb-rs/src/ingress/mod.md @@ -82,23 +82,34 @@ sender and one buffer: ```rust no_run # use questdb::{Result, ingress::{Sender, TimestampNanos}}; +# use std::time::{Duration, Instant}; # fn main() -> Result<()> { # let mut sender = Sender::from_conf("ws::addr=localhost:9000;")?; # let mut buffer = sender.new_buffer(); # let running = true; +let flush_interval = Duration::from_secs(1); +let max_buffer_bytes = 64 * 1024; +let mut next_flush = Instant::now() + flush_interval; + while running { buffer .table("trades")? .symbol("symbol", "ETH-USD")? .column_f64("price", 2615.54)? .at(TimestampNanos::now())?; - sender.flush(&mut buffer)?; - if buffer.len() > 64 * 1024 { - // also flush on size if you batch multiple rows per iteration + + let now = Instant::now(); + if buffer.len() >= max_buffer_bytes || now >= next_flush { + sender.flush(&mut buffer)?; + next_flush = now + flush_interval; } - // sleep until the next tick + // wait for or process the next row # break; } + +if buffer.len() > 0 { + sender.flush(&mut buffer)?; +} # Ok(()) } ``` From 59173c6d5956897a32059b312a7d658f42d497b9 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Wed, 20 May 2026 15:45:21 +0200 Subject: [PATCH 18/22] docs(qwp-ws): wait before polling async errors --- questdb-rs/examples/qwp_ws_error_handling.rs | 22 ++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/questdb-rs/examples/qwp_ws_error_handling.rs b/questdb-rs/examples/qwp_ws_error_handling.rs index 52216b9e..14f78257 100644 --- a/questdb-rs/examples/qwp_ws_error_handling.rs +++ b/questdb-rs/examples/qwp_ws_error_handling.rs @@ -25,6 +25,7 @@ use questdb::{ Result, ingress::{Sender, SenderBuilder, TimestampNanos}, }; +use std::time::Duration; fn polling_style() -> Result<()> { let mut sender = Sender::from_conf("ws::addr=localhost:9000;")?; @@ -35,9 +36,22 @@ fn polling_style() -> Result<()> { .symbol("symbol", "ETH-USD")? .column_f64("price", 2615.54)? .at(TimestampNanos::now())?; - sender.flush(&mut buffer)?; + let published_fsn = sender.flush_and_get_fsn(&mut buffer)?; + + let wait_error = if let Some(fsn) = published_fsn { + match sender.await_acked_fsn(fsn, Duration::from_secs(5)) { + Ok(true) => None, + Ok(false) => { + eprintln!("timed out waiting for QWP/WebSocket frame {fsn} to complete"); + None + } + Err(err) => Some(err), + } + } else { + None + }; - // Drain server-side diagnostics queued since the previous poll. + // Drain server-side diagnostics observed for completed frames. while let Some(err) = sender.poll_qwp_ws_error()? { eprintln!( "qwp error (poll): category={:?} policy={:?} fsn=[{}..={}] msg={:?}", @@ -50,6 +64,10 @@ fn polling_style() -> Result<()> { eprintln!("note: {dropped} diagnostic(s) were dropped from the log"); } + if let Some(err) = wait_error { + return Err(err); + } + sender.close_drain()?; Ok(()) } From f145094606551cce582935b5230d86fb9d95b45b Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Wed, 20 May 2026 15:53:33 +0200 Subject: [PATCH 19/22] docs(qwp-ws): use temp dir in failover example --- questdb-rs/examples/qwp_ws_failover.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/questdb-rs/examples/qwp_ws_failover.rs b/questdb-rs/examples/qwp_ws_failover.rs index b7463a8c..78fd79e6 100644 --- a/questdb-rs/examples/qwp_ws_failover.rs +++ b/questdb-rs/examples/qwp_ws_failover.rs @@ -32,10 +32,14 @@ use questdb::{ }; fn main() -> Result<()> { - let conf = "ws::addr=db-primary:9000,db-replica-1:9000,db-replica-2:9000;\ - sf_dir=/tmp/myapp-qdb-sf;\ - sender_id=ingest-1;\ - reconnect_max_duration_millis=300000;"; + let sf_dir = std::env::temp_dir().join("myapp-qdb-sf"); + let conf = format!( + "ws::addr=db-primary:9000,db-replica-1:9000,db-replica-2:9000;\ + sf_dir={};\ + sender_id=ingest-1;\ + reconnect_max_duration_millis=300000;", + sf_dir.display() + ); let mut sender = Sender::from_conf(conf)?; let mut buffer = sender.new_buffer(); From 72cd697cad00222d6b6074e3209fdd0d29b1d8e1 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Wed, 20 May 2026 16:22:27 +0200 Subject: [PATCH 20/22] docs(qwp-ws): simplify basic example config --- questdb-rs/examples/qwp_ws_basic.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/questdb-rs/examples/qwp_ws_basic.rs b/questdb-rs/examples/qwp_ws_basic.rs index d1c76c2a..64c77a07 100644 --- a/questdb-rs/examples/qwp_ws_basic.rs +++ b/questdb-rs/examples/qwp_ws_basic.rs @@ -26,7 +26,7 @@ fn main() -> Result<()> { .unwrap_or_else(|| "9000".to_string()); let conf = format!("ws::addr={host}:{port};"); - let mut sender = Sender::from_conf(conf.as_str())?; + let mut sender = Sender::from_conf(&conf)?; let mut buffer = sender.new_buffer(); buffer From 4f0ca73be71a5462ff37ca652b4ede50cf58c4be Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Wed, 20 May 2026 16:51:09 +0200 Subject: [PATCH 21/22] docs(qwp-ws): close after polling wait errors --- questdb-rs/examples/qwp_ws_error_handling.rs | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/questdb-rs/examples/qwp_ws_error_handling.rs b/questdb-rs/examples/qwp_ws_error_handling.rs index 14f78257..49b90e52 100644 --- a/questdb-rs/examples/qwp_ws_error_handling.rs +++ b/questdb-rs/examples/qwp_ws_error_handling.rs @@ -38,17 +38,17 @@ fn polling_style() -> Result<()> { .at(TimestampNanos::now())?; let published_fsn = sender.flush_and_get_fsn(&mut buffer)?; - let wait_error = if let Some(fsn) = published_fsn { + let wait_result = if let Some(fsn) = published_fsn { match sender.await_acked_fsn(fsn, Duration::from_secs(5)) { - Ok(true) => None, + Ok(true) => Ok(()), Ok(false) => { eprintln!("timed out waiting for QWP/WebSocket frame {fsn} to complete"); - None + Ok(()) } - Err(err) => Some(err), + Err(err) => Err(err), } } else { - None + Ok(()) }; // Drain server-side diagnostics observed for completed frames. @@ -64,12 +64,8 @@ fn polling_style() -> Result<()> { eprintln!("note: {dropped} diagnostic(s) were dropped from the log"); } - if let Some(err) = wait_error { - return Err(err); - } - sender.close_drain()?; - Ok(()) + wait_result } fn callback_style() -> Result<()> { From eee3c3f177b709360f0bbe9c44a47eeff1069d58 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Thu, 21 May 2026 09:38:32 +0200 Subject: [PATCH 22/22] align reconnect startup retry semantics Promote the effective initial-connect retry mode to sync when a QWP/WebSocket reconnect policy knob is explicitly configured, unless the user explicitly set initial_connect_retry. Document the implicit promotion rule and add parser/runtime coverage for the promotion and explicit opt-out behavior. --- doc/QWP_WEBSOCKET_ARCHITECTURE.md | 11 ++++--- questdb-rs/src/ingress.rs | 34 ++++++++++++++++--- questdb-rs/src/ingress/conf.rs | 3 +- questdb-rs/src/ingress/mod.md | 2 ++ questdb-rs/src/ingress/tests.rs | 55 +++++++++++++++++++++++++++++++ questdb-rs/src/tests/qwp_ws.rs | 51 ++++++++++++++++++++++++++++ 6 files changed, 147 insertions(+), 9 deletions(-) diff --git a/doc/QWP_WEBSOCKET_ARCHITECTURE.md b/doc/QWP_WEBSOCKET_ARCHITECTURE.md index 219f8c5e..8e7cbf5c 100644 --- a/doc/QWP_WEBSOCKET_ARCHITECTURE.md +++ b/doc/QWP_WEBSOCKET_ARCHITECTURE.md @@ -32,7 +32,8 @@ All code is under `questdb-rs/src/ingress/sender/`: Public configuration lives in `ingress/conf.rs`: `QwpWsConfig`, `SfDurability` (`Memory`; `Flush` and `Append` are parsed but currently rejected), -`QwpWsInitialConnectMode` (`Off | Sync | Async`, default `Off`). +`QwpWsInitialConnectMode` (`Off | Sync | Async`; effective default `Sync` +when any `reconnect_*` knob is explicitly configured, otherwise `Off`). --- @@ -450,9 +451,11 @@ counts after disruptions must use QuestDB table-level dedup (`DEDUP UPSERT KEYS(...)`); this matches the Java client's documented behavior. -`initial_connect_retry` is `QwpWsInitialConnectMode` (`Off | Sync | Async`, -default `Off`). `Async` lets the runner start before the first connection -succeeds; `Sync` retries inline; `Off` fails fast on the first attempt. +`initial_connect_retry` is `QwpWsInitialConnectMode` (`Off | Sync | Async`). +When unset, the effective default is `Sync` if any `reconnect_*` knob is +explicitly configured, otherwise `Off`. `Async` lets the runner start before +the first connection succeeds; `Sync` retries inline; `Off` fails fast on the +first attempt. ### 6.3 Error visibility diff --git a/questdb-rs/src/ingress.rs b/questdb-rs/src/ingress.rs index 984bcca2..e21a03f8 100644 --- a/questdb-rs/src/ingress.rs +++ b/questdb-rs/src/ingress.rs @@ -523,6 +523,30 @@ fn parse_qwp_ws_endpoints( Ok(endpoints) } +#[cfg(feature = "_sender-qwp-ws")] +fn qwp_ws_reconnect_policy_was_specified(qwp_ws: &conf::QwpWsConfig) -> bool { + matches!(&qwp_ws.reconnect_max_duration, ConfigSetting::Specified(_)) + || matches!( + &qwp_ws.reconnect_initial_backoff, + ConfigSetting::Specified(_) + ) + || matches!(&qwp_ws.reconnect_max_backoff, ConfigSetting::Specified(_)) +} + +#[cfg(feature = "_sender-qwp-ws")] +fn qwp_ws_effective_config(qwp_ws: &conf::QwpWsConfig) -> conf::QwpWsConfig { + let mut effective = qwp_ws.clone(); + if matches!( + &effective.initial_connect_retry, + ConfigSetting::Defaulted(_) + ) && qwp_ws_reconnect_policy_was_specified(qwp_ws) + { + effective.initial_connect_retry = + ConfigSetting::Defaulted(conf::QwpWsInitialConnectMode::Sync); + } + effective +} + /// Accumulates parameters for a new `Sender`. /// /// Most callers should use [`SenderBuilder::from_conf`] with a connect @@ -1449,7 +1473,8 @@ impl SenderBuilder { } #[cfg(feature = "_sender-qwp-ws")] - /// Retry the initial connection using the reconnect policy. Default false. + /// Retry the initial connection using the reconnect policy. Defaults to + /// false unless a `reconnect_*` setting was specified. pub fn initial_connect_retry(mut self, value: bool) -> Result { let Some(qwp_ws) = &mut self.qwp_ws else { return Err(error::fmt!( @@ -2052,7 +2077,8 @@ impl SenderBuilder { "QWP/WebSocket configuration is missing." )); }; - reject_unsupported_qwp_ws_sf_config(qwp_ws)?; + let qwp_ws = qwp_ws_effective_config(qwp_ws); + reject_unsupported_qwp_ws_sf_config(&qwp_ws)?; let basic_auth = qwp_ws_auth_header(&auth)?; if *qwp_ws.progress == QwpWsProgress::Manual { if *qwp_ws.initial_connect_retry == conf::QwpWsInitialConnectMode::Async { @@ -2066,7 +2092,7 @@ impl SenderBuilder { self.port.as_str(), matches!(self.protocol, Protocol::QwpWss), tls_settings, - qwp_ws, + &qwp_ws, basic_auth, )?)) } else { @@ -2075,7 +2101,7 @@ impl SenderBuilder { self.port.as_str(), matches!(self.protocol, Protocol::QwpWss), tls_settings, - qwp_ws, + &qwp_ws, basic_auth, )? } diff --git a/questdb-rs/src/ingress/conf.rs b/questdb-rs/src/ingress/conf.rs index 48831f7a..94a1b218 100644 --- a/questdb-rs/src/ingress/conf.rs +++ b/questdb-rs/src/ingress/conf.rs @@ -199,7 +199,8 @@ pub(crate) struct QwpWsConfig { /// Cap on the per-attempt reconnect delay. pub(crate) reconnect_max_backoff: ConfigSetting, /// Initial-connect retry mode. Default is fail-fast after one endpoint - /// round, matching Java's startup behavior. + /// round unless a reconnect policy knob was specified, in which case + /// build-time resolution promotes the effective mode to sync retry. pub(crate) initial_connect_retry: ConfigSetting, /// Bounded wait used by Sender::close_drain(). pub(crate) close_flush_timeout: ConfigSetting, diff --git a/questdb-rs/src/ingress/mod.md b/questdb-rs/src/ingress/mod.md index bd37678c..1c647e03 100644 --- a/questdb-rs/src/ingress/mod.md +++ b/questdb-rs/src/ingress/mod.md @@ -167,6 +167,8 @@ QuestDB documentation; the Rust API entry points are summarised here. * **Multi-host failover** — pass a comma-separated address list (`addr=a:9000,b:9000`) and tune `reconnect_max_duration_millis`, `reconnect_initial_backoff_millis`, `reconnect_max_backoff_millis`. + Setting any `reconnect_*` key implicitly enables synchronous initial + connect retry unless `initial_connect_retry=off` is set explicitly. Auth failures are terminal across all endpoints; transport errors are retried. * **Store-and-forward** — set `sf_dir` to a writable directory and diff --git a/questdb-rs/src/ingress/tests.rs b/questdb-rs/src/ingress/tests.rs index 00747938..8b718b8e 100644 --- a/questdb-rs/src/ingress/tests.rs +++ b/questdb-rs/src/ingress/tests.rs @@ -191,6 +191,61 @@ fn qwpws_store_and_forward_defaults_match_java() { assert_defaulted_eq(&qwp_ws.progress, QwpWsProgress::Background); } +#[cfg(feature = "sync-sender-qwp-ws")] +#[test] +fn qwpws_reconnect_policy_promotes_default_initial_connect_retry() { + for reconnect_setting in [ + "reconnect_max_duration_millis=20000", + "reconnect_initial_backoff_millis=200", + "reconnect_max_backoff_millis=2000", + ] { + let builder = + SenderBuilder::from_conf(format!("qwpws::addr=localhost:9000;{reconnect_setting};")) + .unwrap(); + let qwp_ws = builder.qwp_ws.as_ref().unwrap(); + assert_defaulted_eq( + &qwp_ws.initial_connect_retry, + conf::QwpWsInitialConnectMode::Off, + ); + + let effective = qwp_ws_effective_config(qwp_ws); + assert_defaulted_eq( + &effective.initial_connect_retry, + conf::QwpWsInitialConnectMode::Sync, + ); + } +} + +#[cfg(feature = "sync-sender-qwp-ws")] +#[test] +fn qwpws_explicit_initial_connect_retry_suppresses_reconnect_policy_promotion() { + let off = SenderBuilder::from_conf( + "qwpws::addr=localhost:9000;\ + reconnect_max_duration_millis=20000;\ + initial_connect_retry=off;", + ) + .unwrap(); + let qwp_ws = off.qwp_ws.as_ref().unwrap(); + let effective = qwp_ws_effective_config(qwp_ws); + assert_specified_eq( + &effective.initial_connect_retry, + conf::QwpWsInitialConnectMode::Off, + ); + + let async_mode = SenderBuilder::from_conf( + "qwpws::addr=localhost:9000;\ + reconnect_max_duration_millis=20000;\ + initial_connect_retry=async;", + ) + .unwrap(); + let qwp_ws = async_mode.qwp_ws.as_ref().unwrap(); + let effective = qwp_ws_effective_config(qwp_ws); + assert_specified_eq( + &effective.initial_connect_retry, + conf::QwpWsInitialConnectMode::Async, + ); +} + #[cfg(feature = "sync-sender-qwp-ws")] #[test] fn qwpws_progress_config_parses_manual_and_background() { diff --git a/questdb-rs/src/tests/qwp_ws.rs b/questdb-rs/src/tests/qwp_ws.rs index cbd824fa..aae77d39 100644 --- a/questdb-rs/src/tests/qwp_ws.rs +++ b/questdb-rs/src/tests/qwp_ws.rs @@ -3934,6 +3934,57 @@ fn qwp_ws_sync_initial_retry_budget_exhaustion_reports_context() { ); } +#[test] +fn qwp_ws_reconnect_knob_implicitly_promotes_initial_connect_to_sync() { + let probe = TcpListener::bind("127.0.0.1:0").unwrap(); + let port = probe.local_addr().unwrap().port(); + drop(probe); + + let conf = format!( + "qwpws::addr=127.0.0.1:{port};\ + reconnect_max_duration_millis=1;" + ); + let err = SenderBuilder::from_conf(&conf) + .unwrap() + .build() + .unwrap_err(); + + assert_eq!(err.code(), ErrorCode::SocketError); + assert!( + err.msg() + .contains("QWP/WebSocket initial connect retry budget exhausted"), + "got: {}", + err.msg() + ); + assert!(err.msg().contains("attempts="), "got: {}", err.msg()); +} + +#[test] +fn qwp_ws_explicit_initial_connect_retry_off_suppresses_reconnect_promotion() { + let probe = TcpListener::bind("127.0.0.1:0").unwrap(); + let port = probe.local_addr().unwrap().port(); + drop(probe); + + let conf = format!( + "qwpws::addr=127.0.0.1:{port};\ + reconnect_max_duration_millis=5000;\ + initial_connect_retry=off;" + ); + let err = SenderBuilder::from_conf(&conf) + .unwrap() + .build() + .unwrap_err(); + + assert_eq!(err.code(), ErrorCode::SocketError); + assert!( + !err.msg() + .contains("QWP/WebSocket initial connect retry budget exhausted"), + "got: {}", + err.msg() + ); + assert!(!err.msg().contains("attempts="), "got: {}", err.msg()); +} + #[test] fn qwp_ws_sync_initial_retry_resets_non_healthy_between_rounds() { let first_listener = TcpListener::bind("127.0.0.1:0").unwrap();