Skip to content

feat(core): pipelined egress reader#146

Open
glasstiger wants to merge 271 commits into
mainfrom
ia_pipelined_reader
Open

feat(core): pipelined egress reader#146
glasstiger wants to merge 271 commits into
mainfrom
ia_pipelined_reader

Conversation

@glasstiger

@glasstiger glasstiger commented May 20, 2026

Copy link
Copy Markdown

Pipelined (background-thread) QWP egress reader

Adds a pipelined egress reader to the questdb-rs core: socket read + frame
decode run on a dedicated OS worker thread (questdb-egress-io), so while the
user thread processes batch N the worker is already reading + decoding batch
N+1. This naturally pipelines decode against per-row consumption.

"Pipelined" means decoupled via an OS thread — not Rust async/.await.
There is no executor and no polling; the public API is plain blocking method
calls. It's a direct port of the Java client's QwpEgressIoThread +
QwpQueryClient pair.

API at a glance

use questdb::egress::pipelined_reader::{PipelinedReader, Event};

let mut r = PipelinedReader::from_conf("ws::addr=localhost:9000;")?;
let mut cur = r.prepare("SELECT 42").execute()?;
loop {
    match cur.take_event()? {
        Event::Batch(b) => { /* project columns / consume rows */ }
        Event::FailoverReset(_ev) => { /* replay restarts at batch_seq=0 */ }
        Event::End { .. } | Event::ExecDone { .. } => break,
        _ => continue, // Event is #[non_exhaustive]
    }
}
  • PipelinedReader — owns the worker thread (from_conf / from_env).
  • PipelinedQueryprepare() + typed binds + execute().
  • PipelinedCursor — pulls owned events (take_event, blocking / try / timed),
    plus cancel() and request_id().
  • Event — tagged enum: Batch / End / ExecDone / FailoverReset
    (#[non_exhaustive]).
  • OwnedBatch — refcounted Bytes slices, so a batch outlives the channel turn
    that delivered it.
  • Bounded event channel (default capacity 4) → the worker reads ahead until the
    channel fills, then backpressures via TCP recv buffer / server flow control
    (when initial_credit > 0).

Supporting changes in the sync reader (egress/reader.rs)

  • Cancellable failover (reconnect_with_failover_cancellable with
    abort_tick + abort_check). The worker polls shutdown / cancel_slot
    during the failover backoff so PipelinedReader::close /
    PipelinedCursor::cancel / Drop abort in bounded time (one READ_POLL_TICK)
    instead of blocking for the whole failover_max_attempts × failover_backoff_max_ms budget — or forever with failover_max_duration_ms=0.
    The sync path passes Duration::MAX + a no-op check, so it is unaffected.
  • FailoverEvent::failed_request_id — the request_id the cursor held on the
    connection that just failed, so callers can correlate pre-/post-failover frames
    by (failed, new) rid pair.
  • pipelined_internalspub(crate) shim exposing only the cancellable
    reconnect + terminate paths to the pipelined module.

Scope

  • Rust core only. Gated behind the sync-reader-ws feature. No FFI / C /
    C++ surface in this PR
    — the public C API for the pipelined reader is
    deliberately out of scope here.
  • New files: egress/pipelined_reader.rs, tests/egress_pipelined.rs,
    examples/qwp_egress_read_pipelined.rs; plus egress/wire/varint.rs and small
    additions to schema.rs / symbol_dict.rs / transport.rs / lib.rs.
  • CI: ci/run_tests_pipeline.yaml runs --test egress_pipelined under
    live-server-tests.

Verification

  • questdb-rs: builds + clippy clean + tests pass under almost-all-features
    (1586 lib + 65 failover + 49 doctests, 0 failed).
  • questdb-rs-ffi: builds clean under --all-features.
  • The egress_pipelined integration cases are live-broker gated (require a
    running QuestDB) and were not exercised locally.

Base branch is vi_egress; the latest vi_egress has been merged into this
branch, so the diff is just the pipelined-reader additions on top.

bluestreak01 and others added 30 commits April 26, 2026 16:26
Drives non-SELECT statements through the egress channel and verifies
the cursor terminates with Terminal::ExecDone (0x16) rather than
RESULT_END (0x12). Previously the EXEC_DONE path had unit coverage
only -- this confirms the server actually emits 0x16 frames for
DDL and DML, that the rows_affected count is correct, and that
the cursor lifecycle resets so a follow-up SELECT on the same
connection still works.

Sequence in one test:
- CREATE TABLE: op_type 0x09, rows_affected = 0
- INSERT INTO ... VALUES (...), (...), (...): op_type 0x02,
  rows_affected = 3
- SELECT (sanity check that the connection is reusable): batches
  + RESULT_END terminal
- DROP TABLE: another EXEC_DONE

55/55 live tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Cursor auto-emits CREDIT (msg_kind 0x15) after every RESULT_BATCH
when the request was started with initial_credit > 0. The
additional_bytes value matches the wire size we just consumed
(12-byte header + payload_length), the same accounting the server
uses to decrement the per-request budget. Mirrors the Java
reference's QwpEgressIoThread.sendCredit pattern (line 640-642):
"the user is done with the batch, so the recv-buffer bytes are
free; tell the server it can stream payloadLen more bytes."

reader.rs:
- Cursor.credit_enabled set by ReaderQuery::execute from the
  built QueryRequest's initial_credit
- send_credit_frame writes [0x15][i64 LE rid][varint bytes] as
  a bare client to server message
- Wire size captured from FrameHeader BEFORE decode_frame
  consumes the header
- New public Cursor::add_credit(u64) for users who want to grant
  credit out of band of auto-replenishment

Live test: credit_flow_control_keeps_server_streaming
- initial_credit = 4 KiB, max_batch_rows = 500, 5000 rows
- The 4 KiB budget is much smaller than a single batch's wire
  size; without CREDIT replenishment the server would only emit
  the row-floor batch and then pause indefinitely
- Test asserts all 5000 rows arrive across 10 batches in under
  a few seconds. JVM log confirms no stalls.

732 lib + 56 live tests, all green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Multi-address parsing, walk-on-connect, eager SERVER_INFO consumption,
and target=any/primary/replica role filtering. Mid-query failover
(server crash mid-stream -> reconnect + replay) needs a real cluster
and is left as a TODO; the connect-time walk handles the most-common
case (one endpoint down) and is fully testable against single-node
OSS.

config.rs:
- ReaderConfig.host/port -> ReaderConfig.addrs: Vec<(String, u16)>.
- `addr=h1:9000,h2:9001,h3,h4:9999` parses to 4 endpoints; missing
  port falls back to default. Empty entries rejected.
- url() now picks the first endpoint; url_for(idx) builds per-endpoint
  URL.

transport.rs:
- WsTransport::connect_to(cfg, idx) takes an explicit endpoint
  index. WsTransport::connect stays as the single-addr convenience.

reader.rs:
- Reader::from_config walks cfg.addrs in order. For each:
  - WsTransport::connect_to(cfg, idx); transport failures skip to
    the next endpoint, last one surfaces if all fail.
  - On v2+, eagerly read the unsolicited SERVER_INFO (0x18) frame
    and store it. The connection negotiated v1 path keeps Server
    Info as None.
  - target=any: accept any endpoint. target=primary: accept
    Primary / PrimaryCatchup / *Standalone* (matches Java's
    matchesTarget so single-node OSS deployments work without
    surprise). target=replica: accept only Replica.
- New Reader::server_info() accessor.
- New ErrorCode::RoleMismatch surfaces the "all endpoints connected
  but none matched target" case. Distinct from SocketError ("none
  reachable") so users can tell the two apart.

Tests, all live against single-node OSS:
- server_info_exposes_role: confirms STANDALONE comes through.
- target_primary_accepts_standalone: STANDALONE counts as primary
  (matches Java; OSS users don't have to remember the workaround).
- target_replica_rejects_standalone: with target=replica, no
  endpoint matches; assert RoleMismatch with target name in msg.
- multi_addr_walks_past_unreachable_endpoint: first addr is a
  non-listening loopback port; second is the live server; the
  walk falls through to the live one and queries succeed.

What's NOT in this commit (needs a real cluster):
- Mid-query failover (server crash mid-stream -> auto-reconnect).
- Real PRIMARY vs REPLICA role distinction (single-node always
  emits STANDALONE).

Total: 734 lib + 60 live tests, all green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Three live tests against the real server confirm the array decoder
handles every case it'll see in practice. Population mirrors the
QuestDB QwpEgressBootstrapTest pattern: SQL INSERT with ARRAY[...]
literals against a WAL table, then read back via egress.

- double_array_1d_varying_lengths: rows have different-sized 1-D
  arrays in the same column (3-, 2-, 1-element). Verifies shape()
  per row, element_count, and element() values across the
  per-row-shape decoder path.
- double_array_2d_row_major: ARRAY[[1.0, 2.0], [3.0, 4.0]] for
  DOUBLE[][]; verifies shape == [2, 2] / [2, 3] and that flat
  element ordering is row-major.
- double_array_with_null_array_row: insert NULL alongside non-null
  arrays; verifies the column null bitmap suppresses shape() /
  element() at the right rows.

LONG_ARRAY: not exercised end-to-end because the server doesn't
emit it. The decoder + Layer 0 LongArrayColumn stay in place for
forward-compat (cheap to maintain, mirrors DoubleArray).

Total: 734 lib + 63 live tests, all green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Twelve new live tests covering boundary values, special values, empty
strings, all-null columns, extreme widths, and 3-D arrays. Each
caught a real server-behaviour quirk worth pinning:

- integer_boundaries: byte/short/int/long at MIN+1, 0, MAX. i32::MIN
  is QuestDB's INT NULL sentinel and i64::MIN is the LONG sentinel
  -- inserting either rewrites the row to NULL, so the test uses
  MIN+1 for the most-negative non-null value at each width.

- double_special_values: NaN, +Infinity, -Infinity, -0.0. QuestDB
  treats non-finite doubles as NULL on insert (consistent with the
  NaN-as-NULL sentinel) regardless of which SQL literal is used,
  so the test accepts null OR the expected bit pattern for those
  three rows. -0.0 is finite; just verify it numerically equals 0.

- varchar_empty_string_distinct_from_null: '' decodes to Some(""),
  NULL decodes to None. The dense per-row offsets densification
  must keep them disjoint -- they do.

- varchar_unicode_and_long_string: emoji + CJK + Hebrew + Hangul
  in one literal, plus an 8 KiB ASCII string. Validates the UTF-8
  validation pass at decode time and the offsets/data buffer
  sizing for non-trivially-large rows.

- all_null_long_column / all_null_varchar_column: every row null.
  Varchar exercises the offsets-array densification when every
  per-row entry is zero-length.

- timestamp_epoch_and_far_future: 0 (epoch), 1us-after-epoch, year
  2099. Negative pre-epoch timestamps would violate WAL designated-
  timestamp monotonicity and stay covered by unit tests.

- uuid_all_zeros_and_all_ones: confirms the UUID null sentinel
  ("both halves Long.MIN_VALUE") doesn't collide with the all-zero
  byte pattern, and that the 16-byte FixedBytesColumn read path
  preserves every byte.

- long256_distinct_high_low_bytes: 0x0123...cdef pattern across all
  32 bytes -- catches any byte-order regression in the 32-byte read
  path.

- geohash_multiple_widths: 1c (5 bits/1 byte), 3c (15/2), 7c (35/5),
  12c (60/8). Spans every byte_width the geohash decoder reaches.

- double_array_3d: shape [2, 2, 3], 12 row-major elements indexed
  flat -- verifies element() ordering on a 3-D array.

- decimal64_zero_and_negative_scale_boundary: scale=0 (decimal(18,0))
  and a scale=2 column with 0.00 and -99.99. Confirms the sign and
  zero round-trips cleanly through both scale paths.

Total: 734 lib + 75 live tests, all green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Apply rustfmt and resolve clippy `-D warnings` errors flagged by CI:

- div_ceil for ceiling division (bit_reader, binds, decoder)
- range contains for inclusive range checks
- byte string literals over byte slices
- replace needless borrows / conversions / range loops
- type aliases for VarlenBuffers / SymbolBuffers to satisfy type_complexity
- expect_fun_call replaced with unwrap_or_else
…r conflict

The dev-dep ureq with `features = ["rustls"]` pulls in `rustls/ring`,
which collides with the `aws-lc-rs` provider when tests run under
`--features=aws-lc-crypto`. The fixture only hits `http://localhost`
for ping/exec, so plain HTTP (no rustls) is sufficient.
QuestDB master requires JDK 25 (the pom enforcer rejects older
versions: `<jdk>(24,)</jdk>`). The Azure pipeline was hardcoded to
JDK 17 via Maven@3's `jdkVersionOption: "1.17"`, causing the
"Vs QuestDB master" job to fail with "JDK version can't be empty".

Download JDK 25 from Adoptium Temurin before the Maven step and point
JAVA_HOME at it; switch the Maven task to `jdkVersionOption: "default"`
so it picks up the freshly installed JDK.
Switches RESULT_BATCH decoding from owned `Vec<u8>` buffers to
ref-counted `bytes::Bytes` slices that borrow into the WebSocket payload
buffer (or, under FLAG_ZSTD, into the decompressed body). Eliminates
two full-payload memcpys per batch:

- Transport: `WsTransport::read_frame` no longer `.to_vec()`s the
  payload tail. tungstenite 0.27's `Message::Binary(Bytes)` is already
  ref-counted, so `bytes.slice(HEADER_LEN..)` is just an Arc bump.
- Decoder: `densify_fixed`'s no-null path and `decode_varlen`'s data
  buffer now hold `Bytes` views into the parent payload instead of
  allocating `Vec<u8>`. Validity bitmaps follow the same path. Paths
  that have to materialize fresh bytes (BOOLEAN bit-unpack, GORILLA
  expansion, null-bearing fixed-width densification) wrap the `Vec<u8>`
  with `Bytes::from(vec)` and pay one allocation rather than two.

Measured on a 10M-row × 15-column workload (5 high-cardinality SYMBOLs
+ 1 VARCHAR + 7 fixed-width + TIMESTAMP) over loopback, M1 macOS,
release build:

  baseline          : 11.6M rows/s,  898 MiB/s,  861 ms
  + transport copy  : 12.6M rows/s,  978 MiB/s,  791 ms  (+8.8%)
  + zero-copy decode: 13.3M rows/s, 1026 MiB/s,  754 ms (+14.2%)

Also adds `Reader::bytes_received()` for benchmarking and two example
binaries: `qwp_egress_latency` (matches the Java JMH single-row
latency bench) and `qwp_egress_read_wide` (matches the Java wide-table
throughput bench).

Adds `bytes = "1.7"` as a direct dependency so the decoder compiles
without the `sync-reader-ws` transport feature; tungstenite already
re-exports the same type.

All 734 lib tests pass under `--features almost-all-features` and
across the four `--no-default-features` combos used in CI.
Three more decoder optimizations, on top of the zero-copy slicing in
the previous commit. Net effect on the same 10M-row × 15-column
workload (loopback, M1, release): 754 ms → ~625 ms total, decoder
CPU time 506 ms → ~155 ms.

1. SYMBOL code densification: split out a no-null fast path that
   inlines the 1-, 2-, and 3-byte LEB128 cases (covers any code up to
   2^21, more than enough for the 100k-cardinality test data) into a
   straight-line loop. The dict-bounds check moves to a single
   post-pass that auto-vectorizes. Slow path falls back to the generic
   decoder for over-3-byte values. This was by far the biggest win:
   dropped per-batch decode CPU from ~500 ms to ~175 ms.

2. VARCHAR offsets: in the no-null path, the densified offset table is
   bit-for-bit identical to the compact one the wire already provides
   (`row_count + 1` entries, no holes). Hand the compact `Vec<u32>`
   straight back as the dense view instead of allocating a second
   `row_count + 1`-sized buffer and copying.

3. Bench: have `qwp_egress_read_wide`'s SYMBOL inner loop call
   `dict.get(codes[r])` directly when iterating a column we know has
   no nulls, skipping the per-row validity check inside `resolve()`.
   This is a fair user-side optimization, not a bench-only hack — it
   matches the Java client's `getStrA` flyweight pattern.

Adds `Reader::read_ns()` / `decode_ns()` / `reset_timing()` accumulators
so the wide-table bench can split wall time into wire-arrival vs
decoder-CPU vs user-iteration. With the optimizations applied, the
profile shifts to ~315 ms read + ~155 ms decode + ~118 ms iter, with
iter overlapping the next batch's wire arrival via the kernel TCP
buffer. Total throughput is now within ~7% of the Java reference
(1.22 GiB/s vs 1.3 GiB/s) on the same hardware.

Lib tests stay at 734/734 across `--features almost-all-features`
plus the four `--no-default-features` combos used in CI.
Three correctness fixes to Reader/Cursor lifecycle, each with a live-
server regression test that exercises the bug pre-fix and bounds the
behaviour post-fix.

1. Drop closes the WebSocket on a mid-stream cursor.

   Pre-fix, Cursor::drop only flipped reader.cursor_active=false. The
   transport stayed open with the abandoned query's RESULT_BATCH /
   RESULT_END frames still in flight, so the next execute() on the
   Reader silently multiplexed onto the dirty stream and the next
   next_batch() tripped the request_id check with ProtocolError.

   Post-fix, Drop calls a new WsTransport::close_in_place(&mut self)
   when cursor_active is still set. That sends the WS Close handshake
   so the server stops streaming and releases request-scoped state;
   any subsequent operation on the Reader fails cleanly at the
   transport layer with SocketError. Also fixed map_ws_error to
   classify Protocol(SendAfterClosing / ReceivedAfterClosing) as
   SocketError instead of ProtocolError, since these are
   transport-state errors not wire-format errors. Module doc rewritten
   to match the new contract.

   Tests: dropping_live_cursor_closes_connection (asserts SocketError
   on reuse) and cancel_then_drop_allows_reuse (counterpart: explicit
   cancel keeps the Reader usable end-to-end).

2. Cancel stops replenishing the server's credit window.

   Pre-fix, every batch read inside cancel()'s drain loop fired a
   send_credit_frame of the batch's wire size, refilling the server's
   per-request budget while we were throwing the bytes away. That
   defeated the very backpressure cancel was meant to use to hasten
   the post-cancel terminal.

   Post-fix, Cursor gains a `cancelling: bool` flag. cancel() flips it
   between writing the CANCEL frame and entering the drain loop, and
   next_batch() skips the per-batch CREDIT for the rest of the
   cursor's life. cancel() also emits a single 1-byte CREDIT wake
   nudge alongside CANCEL: QuestDB's egress server only re-enters
   streamResults from handleCredit on the credit-suspended path, and
   handleCancel just sets a flag. Without the nudge, a cancel against
   a credit-suspended server would deadlock; with it, streamResults
   re-enters, observes the cancel flag at the top of the loop (before
   the credit check), and aborts with STATUS_CANCELLED.

   Reader.credit_granted_total() and Cursor.credit_granted_total()
   accessors expose the connection-level CREDIT-bytes counter so
   tests can directly observe the bug. The regression test
   (cancel_does_not_replenish_credit_window) reads 3 batches, then
   cancels, and asserts that bytes granted during cancel <= 4
   (i.e. just the wake nudge). With the suppression check removed,
   the same test reports 8063 bytes granted - exactly two batches
   of replenishment - and fails with a clear diagnostic.

3. QUERY_ERROR marks the cursor finished.

   Pre-fix, the ServerEvent::Error arm of next_batch returned Err and
   cleared cursor_active but never assigned self.terminal. Combined
   with cancel() converting Err(Cancelled) into Ok(()), the cursor
   ended up "finished from cancel's POV but unfinished from
   next_batch's POV". A follow-up next_batch() then fell through the
   short-circuit and blocked indefinitely on transport.read_frame()
   waiting for bytes the server would never send. The bug applied to
   every QUERY_ERROR (ParseError, InternalError, ...) not just
   Cancelled.

   Post-fix, Cursor gains a private `done: bool` flag set in all
   three terminal arms (End, ExecDone, Error). next_batch and
   cancel's early-return / drain-loop conditions all switch to using
   it. Public Terminal enum unchanged - error terminals are still
   surfaced via the Err return; done just keeps the state machine
   internally consistent.

   Test (cursor_short_circuits_after_query_error) covers both paths:
   bad SQL (next_batch returns Err, then must Ok(None)), and cancel
   (returns Ok, then next_batch must Ok(None)). assert_returns_within
   wraps each potentially-blocking call in a scoped thread + 3s
   poll-for-finished, so a regression fails with a clear diagnostic
   instead of hanging CI.

PR #140 review comments (12 of 14 addressed in this commit):

- Stale module / type docs across binds.rs, column.rs, decoder.rs,
  mod.rs, transport.rs - now reflect the fact that VARCHAR / BINARY /
  GEOHASH / DECIMAL128/256 / arrays are decoded, FLAG_ZSTD and Gorilla
  temporals are supported, and TLS / Reader / Cursor / BatchView are
  shipped. binds.rs Geohash precision_bits range corrected from
  (1..60) to (1..=60).
- config.rs: error for compression={zstd,auto} without
  compression-zstd feature now reports the user-facing token
  (compression=auto) instead of the wire token (zstd,raw).
- reader.rs: next_request_id wrap-skips 0 and negatives. Astronomically
  unlikely on a single connection but keeps request_id strictly
  positive.
- decoder.rs: col_count is capped at 4096 at the varint read site,
  guarding every downstream Vec::with_capacity(col_count) against an
  OOM from a hostile or corrupted varint.
- server_event.rs: decode_frame now rejects header.table_count != 1
  for RESULT_BATCH and != 0 for every other kind, catching frame /
  msg-kind drift up front.
- egress_live_server.rs: decimal comment at the bind_decimal128 test
  fixed (mantissa 1234567 at scale 4 is 123.4567, not 12.34567).
  zstd_compressed_multi_batch downgrades the compressed_batches > 0
  hard-assert to an eprintln warning - the FLAG_ZSTD decode path is
  exercised independently by encoder unit tests, and a hard fail
  here would couple the test to QuestDB's per-batch compression
  heuristic.

Two comments deliberately not actioned: .gitmodules branch=master is
intentional given the JDK 25 CI tracking work, and Reference-mode
schema validation is already done by the caller at decoder.rs:402-410
so an in-function check would be duplicate work. Both threads
resolved on the PR with that reasoning.

Verified locally with JDK 25 on the questdb submodule:
  - 369/369 lib unit tests pass (sync-reader-ws feature)
  - 79/79 live-server tests pass (live-server-tests feature)
  - clippy clean with --features almost-all-features --tests
    -- -D warnings

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
kafka1991 and others added 26 commits May 19, 2026 16:31
…apper

Cursor::Drop emits a CANCEL frame and then a Close frame on the same
socket (egress/reader.rs:1855-1862 -> transport.rs:362,416-423). On
Linux, if the peer has gone away between the two writes, the first
write_all consumes sk_err and returns ECONNRESET; the second write_all
hits the clean-sk_err / sk_shutdown path in tcp_sendmsg and raises
SIGPIPE. On macOS the first send already SIGPIPEs. The failover replay
path (re-issued QUERY_REQUEST followed by CREDIT frames on a freshly-
opened-but-then-dead socket) has the same multi-write shape.

Pure-Rust binaries are shielded by std's startup SIG_IGN, but the FFI
cdylib loaded into a C/Python/etc. host inherits the host's SIGPIPE
disposition - typically SIG_DFL, which kills the process. The C++ mock
server hit this on macOS and was fixed in 7239e5d (QWP_MSG_NOSIGNAL +
set_no_sigpipe); this commit mirrors that on the Rust client side.

NoSigpipeTcp (egress/ws/nosigpipe.rs) wraps the connected TcpStream:
- Linux/Android: every write routes through send(2) with MSG_NOSIGNAL.
- macOS/iOS/*BSD: setsockopt(SO_NOSIGPIPE, 1) once at construction.
- Windows/other: pass-through (no equivalent signal exists).

The wrapper is plumbed in transport.rs::connect_to so both Stream::Plain
and the rustls::StreamOwned<_, NoSigpipeTcp> arm route writes through
it. On platforms with SO_NOSIGPIPE the option lives on the kernel
socket, so TcpStream::try_clone inherits it without a second
setsockopt.

Verification:
- cargo fmt --manifest-path questdb-rs/Cargo.toml
- cargo clippy --manifest-path questdb-rs/Cargo.toml --tests
- cargo clippy --manifest-path questdb-rs/Cargo.toml --tests
  --features almost-all-features: clean
- cargo clippy --manifest-path questdb-rs-ffi/Cargo.toml --tests
  --features confstr-ffi: clean
- cargo test --features almost-all-features --lib: 1559 passed
- cargo test --features almost-all-features --test egress_failover:
  59 passed

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two coverage gaps in the line_reader FFI surface, bundled because both
came out of the same review pass and exercise the same mock harness.

(1) Of 27 line_reader_query_bind_* exports, only bind_varchar was
    exercised from cpp_test. The 19 ffi_bind_method! instantiations
    and the explicit binds had Rust-side wire-format tests but no
    C-side ABI test - a wrong parameter order or sign-extension in a
    single bind would silently corrupt the captured QUERY_REQUEST
    bytes with no signal in CI.

    New TEST_CASE("mock: every supported bind variant marshals through
    the FFI ABI") in cpp_test/test_line_reader_mock.cpp drives 24 of
    27 bind variants in one query (every Phase-1-bindable type),
    captures the QUERY_REQUEST off the mock, and asserts each byte
    of the bind payload against a hand-computed expected vector.
    Sentinel values per bind so a wrong argument order, sign extension,
    or off-by-one width on any single bind produces a localised byte
    diff (verified by deliberately flipping one byte - "bind payload
    mismatch at byte 64"). The three Phase-1-rejected paths
    (bind_binary, bind_ipv4, bind_null_binary, bind_null with ipv4
    kind) cannot produce wire bytes - their ABI shape is checked at
    compile time via the wrapper inlines, and the rejection is
    exercised by existing Rust unit tests in egress/binds.rs.

(2) Reader's stat getters take &self and are documented as safe to
    call concurrently from a monitor thread while another drives a
    cursor (reader.rs:140-150). Commit 7239e5d's mention of
    "assert_send::<Reader>() / assert_sync::<Reader>()" was an
    offhand review check, not committed code - a future Rc/RefCell
    field would silently flip Reader to !Send/!Sync with no signal.
    No test actually migrated Reader to a worker thread while
    polling stats from main; the FFI's UnsafeCell<Reader> +
    Arc<ReaderStats> design existed entirely in commentary.

    - egress/reader.rs: const _ block pinning Reader / ReaderStats /
      HostHealthTracker to Send + Sync. A regression breaks
      compilation. Verified by adding PhantomData<Rc<()>> temporarily
      - assertion fires with "the trait Send is not implemented for
      Rc<()>" pointing at the bound.
    - tests/egress_failover.rs:
      reader_migrates_to_worker_thread_with_concurrent_stats_polling
      runs three sequential queries on a worker thread while main
      polls the cloned Arc<ReaderStats>. Asserts counter monotonicity
      per poll and post-join final_bytes >= max_observed_bytes so a
      broken store-Release on the done flag surfaces.
    - cpp_test/test_line_reader_mock.cpp: mirror test driving 32
      RESULT_BATCH frames via the FFI. Worker calls reader.execute()
      and drains; main hammers bytes_received() / read_ns() /
      decode_ns() / credit_granted_total() on the same wrapper.
      Sound because the C-side line_reader struct keeps the
      UnsafeCell<Reader> and the Arc<ReaderStats> in disjoint fields,
      and the FFI getters use ptr::addr_of! to avoid an intermediate
      &line_reader reborrow that would otherwise cover the cell.
      Picks up TSan if/when wired into QUESTDB_SANITIZE.

Verification:
- cargo fmt + clippy --tests --features almost-all-features: clean
- cargo test --features almost-all-features --test egress_failover:
  60 passed (was 59)
- build/test_line_reader_mock: 69 cases / 5167 assertions, was 67
- 3x rerun of the new C++ thread test: 2712 / 5412 / 7297 polls,
  no flake
- Deliberate byte flip in the bind ABI test produced a localised
  diff: "bind payload mismatch at byte 64"

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds a public Sender::qwp_ws_totals() snapshot mirroring the Java
QwpWebSocketSender's getTotal*() counters (frames_sent, acks,
reconnect_attempts, reconnects_succeeded, server_errors). Counters live
on QwpWsPublicationStore so both background and manual progress modes
see them; bumps happen in record_sent_event, apply_response (Ack /
DurableOk / DurableAck / Reject), finish_reconnect_success, and a new
store.record_reconnect_attempt() called from reconnect_with_policy +
continue_reconnect.

Also adds system_test/failover_clients/src/bin/qwp_sidecar.rs -- a
line-protocol Rust port of com.questdb.e2e.QwpSidecarMain so the
questdb-enterprise e2e harness can drive the Rust sender via the same
stdin/stdout protocol it already uses for the Java sender. The binary's
STATS verb reads qwp_ws_totals().

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds the client-side half of a cross-repo CI flow that runs the Rust
qwp_sidecar against a real QuestDB Enterprise primary. Two pieces:

  - system_test/enterprise_e2e/: a self-contained pytest package
    (sidecar adapter, fixtures, the failover test, pytest markers
    c_client / c_client_rust). It registers questdb-enterprise's
    lib.shared_fixtures plugin via PYTHONPATH so the Enterprise
    harness primitives (server_factory, sidecar, scenario_dir,
    obj_store) are reused without filesystem-coupling. Future C / C++
    bindings drop in as sibling sub-binding markers (c_client_c,
    c_client_cpp) without touching anything else.

  - ci/run_tests_pipeline.yaml: a new TriggerEnterpriseCClientE2E
    job that, on every non-fork PR build, POSTs to the Azure DevOps
    REST API to queue the `build-and-test-e2e-c-client` pipeline in
    the questdb-enterprise project with templateParameters
    cClientCommit=$(Build.SourceVersion),
    cClientPrNumber=$(System.PullRequest.PullRequestNumber), and
    clientBranch=<source branch>. Fire-and-forget; the Enterprise
    pipeline posts a GitHub status (`enterprise-e2e-c-client`) back
    to the PR. Naming scheme keys every artefact on a per-repo
    prefix (c_client, future py_client, etc.) so sibling client
    repos don't collide.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The Microsoft-hosted queue runs consistently deeper than incus (33 vs
10 at last check). The dispatch job is a ~10-second curl+jq POST with
no compute needs, so it wastes nothing on the self-hosted pool but
gets picked up much faster. curl + jq are already on the hetzner
image (the Enterprise ReportToOssPr stage uses both).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…itiving

CI build for PR #140 fails on every linux job (linux/linux-stable/
linux-nightly, both `build/` and `build_CXX20/`) with
`-Werror=maybe-uninitialized` raised against `std::variant`'s union
storage during the move-construction triggered by
`std::vector::push_back(qm::ActionXxx{})`. GCC 13's flow analysis can't
prove which variant alternative is active inside the move-ctor and
flags padding bytes of inactive alternatives (`ActionSendExecDone`,
`ActionSendCacheReset`, …) as possibly uninitialized — a known false
positive on `std::variant` move-ctor analysis. Other TEST_CASEs in this
file use `Script s = { … }` (initializer-list ctor; alternatives
constructed inline) and don't trip it; the new concurrent-stats test
needed a loop and so used push_back.

Switching to `emplace_back(qm::ActionXxx{})` forwards the rvalue
straight to `qm::Action`'s converting variant constructor, constructing
in-place in the vector slot — no variant move-ctor is invoked, so the
union-storage analysis path is never reached. Also pre-`reserve` the
vector so any future change can't reintroduce a relocation walk through
the same move path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The Enterprise pipeline registration lives in a separate Azure DevOps
project ('questdb-enterprise') from this one ('questdb'). Replace the
$(System.TeamProject) default in enterpriseProject with the literal
project name so the REST API call goes to the right place. Comment
also updated to spell out the cross-project Queue-builds permission
the Enterprise side needs to grant.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The c-questdb-client and questdb-enterprise pipelines live in
different ADO projects (`questdb` vs `questdb-enterprise`), and
System.AccessToken can't see cross-project pipelines APIs without
project-scoped-auth being disabled on the source + cross-project
Build Service permissions granted on the target. PAT auth side-steps
both: a single secret pipeline variable (ENT_DISPATCH_PAT, created
in the Enterprise ADO project with Build read-and-execute scope)
authorises the lookup + queue calls directly.

Fail fast with a clear message if the variable isn't set, so a
missing secret produces an actionable error rather than a 401 from
the REST call. Fork-PR skip logic is unchanged but the comment now
says secret variables (not just System.AccessToken) are what's
unavailable on forks.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Surface Disconnected / Retrying / Reset / GaveUp phases of mid-query
failover via a single callback. Closes the visibility gap where
on_failover_reset only fired after a successful reconnect — SLO
dashboards now see the outage when it happens, per-retry telemetry
is available, and "gave up" is observable without polling. Either
on_failover_reset or on_failover_progress being installed opts the
cursor in to replay-after-data-delivered.

Wired through Rust core, FFI, C/C++ headers, and the system_test
failover_client so the real-server CI lane exercises every phase.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Integrates the vi_egress egress/ingress refactor with the pipelined
reader. Conflicts resolved in three files:

- egress/reader.rs: merged the two divergent failover-reconnect
  changes. vi_egress added an `on_attempt: &mut dyn FnMut(u32)`
  per-attempt progress hook (feeding `on_failover_progress`); the
  pipelined branch split the function into a cancellable variant
  (`abort_tick` + `abort_check`, for worker shutdown/cancel). The
  merged `reconnect_with_failover_cancellable` carries both; the
  `reconnect_with_failover` wrapper and the sync `Cursor` caller pass
  `on_attempt`, while `pipelined_internals` passes a no-op. Kept the
  `FailoverEvent::failed_request_id` field/snapshot alongside the new
  `FailoverProgressEvent` phases, and adopted vi_egress's
  `has_replay_aware_callback` naming for `would_silently_duplicate`
  (now gated on on_failover_reset OR on_failover_progress).
- egress/mod.rs: kept both the pipelined_reader exports and the
  expanded reader exports (FailoverPhase, FailoverProgressEvent).
- ci/run_tests_pipeline.yaml: kept the "+ pipelined" fuzz-step
  displayName and vi_egress's new enterprise-e2e dispatch job.

Verified: questdb-rs builds + clippy clean + tests pass (1586 lib,
65 failover, 49 doctests, 0 failed) under almost-all-features;
questdb-rs-ffi builds clean under --all-features.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented May 20, 2026

Copy link
Copy Markdown

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 166fe8a9-c536-4cb7-b1b6-48b03484a198

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch ia_pipelined_reader

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@glasstiger glasstiger changed the title feat(core): Pipelined reader feat(core): pipelined egress reader May 20, 2026
Base automatically changed from vi_egress to main May 23, 2026 20:45
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants