Skip to content

feat: columnar DataFrame ingest (Arrow / Polars / Pandas) + Arrow egress#153

Open
kafka1991 wants to merge 77 commits into
mainfrom
jh_conn_pool_refactor
Open

feat: columnar DataFrame ingest (Arrow / Polars / Pandas) + Arrow egress#153
kafka1991 wants to merge 77 commits into
mainfrom
jh_conn_pool_refactor

Conversation

@kafka1991

@kafka1991 kafka1991 commented Jun 4, 2026

Copy link
Copy Markdown
Collaborator

Summary

This PR is the combined landing of #148 and #150 — two independently-developed columnar I/O tracks that both target QWP/WebSocket and share the same connection pool / SymbolGlobalDict, shipped together so downstream consumers (py-questdb-client, in-tree C/C++ tests) see one self-consistent surface.

#148 — Column-major sender (column_sender)

A DataFrame → Table ingest API. QuestDb connection pool + BorrowedSender + Chunk (per-column Vec<u8> that stacks wire bytes directly) + synchronous flush(AckLevel). Covers bool / signed integers / floats / UUID / Long256 / IPv4 / timestamps / VARCHAR / symbol_dict_{i8,i16,i32} bulk-intern. The connection-scoped SchemaRegistry (FULL / REFERENCE emit modes) and SymbolGlobalDict are shared with the row API, preserving the 1M-per-connection symbol cap on huge Pandas Categorical dicts. Full C ABI (include/questdb/ingress/column_sender.h) and a Criterion bench suite (column path ≈ memcpy ceiling; bulk-intern ~16× faster than per-row HashMap).

#150 — Apache Arrow + Polars integration

Both directions over QWP/WebSocket:

  • Ingress: Buffer::append_arrow / append_arrow_at_column consumes a whole RecordBatch in one call, column-major dense bulk path (one memcpy per column; QWP null bitmap built by byte-stride OR-with-NOT of the Arrow validity buffer when boundaries align, per-row fallback only when bit-offsets are unaligned).
  • Egress: Cursor::as_record_batch_reader() streaming RecordBatch iterator; Polars sub-feature provides the DataFrame bridge.
  • C ABI via the Arrow C Data Interface: line_sender_buffer_append_arrow* and line_reader_cursor_next_arrow_batch, with schema depth capped at 64, row_count capped at 16M and per-column value_data capped at 1 GiB to keep the FFI crate's panic = "abort" profile from aborting on allocator OOM.

Why merged

The two tracks were developed on the same jh_conn_pool_refactor branch and converged on shared infrastructure (connection pool, SymbolGlobalDict, QWP/WS transport). Splitting them at review time would force one to ship behind a compatibility shim for the other; merging them together avoids that churn and gives C/C++ callers — and the upcoming Pandas / Polars wrapper in py-questdb-client — a column-major, zero-redundant-copy path into QuestDB in one cut.

Public surface

See the original PRs' "Public surface" / "What's in the box" sections:

Feature gating

  • questdb-rs: arrow + polars are opt-in features, excluded from almost-all-features; column_sender lives behind sync-sender-qwp-ws.
  • questdb-rs-ffi: arrow feature mirrors.
  • CMakeLists.txt: QUESTDB_ENABLE_ARROW=OFF by default; auto-flipped to ON when QUESTDB_TESTS_AND_EXAMPLES=ON so tests / examples exercise the Arrow path without explicit opt-in.

Test plan

  • Rust unit tests: 57 column_sender + 80+ Arrow
  • FFI unit tests: 8 new column_sender + Arrow path coverage
  • C/C++ tests: test_arrow_c.c / test_arrow_egress.cpp / test_arrow_ingress.cpp wired into CMake and exercised in CI
  • System tests against a live QuestDB: arrow_egress_fuzz / arrow_ingress_fuzz / arrow_round_trip_fuzz / arrow_alignment_fuzz
  • cargo bench --features sync-sender-qwp-ws --bench column_sender
  • End-to-end Pandas / Polars throughput (py-questdb-client, WS-7)

Closes #148, #150.

Summary by CodeRabbit

  • New Features

    • Opt-in Apache Arrow support for Arrow egress and ingest; new column-major sender for high-throughput DataFrame ingestion; Polars integration.
  • Examples

    • Added C, C++ and Rust examples demonstrating Arrow egress/ingest and column-sender/Polars workflows.
  • Documentation

    • Added column-sender ABI spec, implementation plan, and performance guidance.
  • Tests

    • Expanded Arrow/Polars unit, smoke and fuzz coverage; new integration tests.
  • Chores (CI)

    • CI updated to install pyarrow/polars, expanded test/fuzz jobs and longer timeouts.
  • Benchmarks

    • New Criterion benchmarks measuring column-sender performance.

bluestreak01 and others added 30 commits May 24, 2026 01:42
Plan and FFI ABI for the new column-major writer that will ingest
Pandas/Polars DataFrames over QWP/WebSocket. Locks the QuestDb pool
shape, BulkChunk encoder strategy, validity bitmap semantics, and
the C ABI the separate Python wrapper repo will consume.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Locks the column-sender API around synchronous flush:
sender.flush(&mut chunk, ack_level) blocks until the requested ACK
level (Ok = WAL commit, Durable = object-store via Enterprise
opt-in). Drops the FSN/submit/await split from the FFI; at most one
frame in flight per sender, parallelism via the pool.

Refuses sf_dir and other sf_* keys at QuestDb::connect with
ConfigError — store-and-forward is single-writer-per-slot and
interacts awkwardly with pool auto-grow; row-major Sender remains
the SF path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Lands the Rust core, C ABI, and benchmarks for a column-major sender
targeting Pandas/Polars → QuestDB throughput over QWP/WebSocket. See
`doc/COLUMN_SENDER_PLAN.md` for the design and `doc/COLUMN_SENDER_FFI_ABI.md`
for the C ABI spec; both shipped in earlier commits on this branch.

# What's in the box

* **WS-0 — `QuestDb` pool** (`ingress/column_sender/db.rs`,
  `ingress/column_sender/conf.rs`).
  Thread-safe pool with eager-open, fail-fast at `pool_max`,
  `BorrowedSender<'a>` that returns on `Drop`, and a background reaper
  (`pool_reap=auto`, tick = max(5 s, idle_timeout / 12)) that closes
  excess-over-`pool_size` connections. New conf keys: `pool_size`,
  `pool_max`, `pool_idle_timeout_ms`, `pool_reap`. `sf_*` / `sender_id`
  / `qwp_ws_progress=manual` refused at `connect`-time.

* **WS-1 — synchronous `flush` plumbing** (`ingress/column_sender/sender.rs`,
  `ingress/column_sender/encoder.rs`).
  `ColumnSender::flush(chunk, AckLevel)` encodes the chunk, publishes via
  the existing QWP/WS replay queue (`Sender::qwp_ws_publish_raw` —
  pub(crate) escape hatch in the row-API sender), and blocks until the
  ACK watermark crosses the published FSN. Polls in 50 ms slices so a
  `must_close` mid-wait surfaces promptly. `AckLevel::Durable` requires
  `request_durable_ack=on` at connect or returns `InvalidApiCall`.

* **WS-2 — `Chunk` + numeric / fixed-width columns**
  (`ingress/column_sender/chunk.rs`, `validity.rs`, `wire.rs`).
  Per-column wire-shape `Vec<u8>` storage so encode is a header +
  `extend_from_slice` per column. Two code paths per type per the plan
  §2.2:
  - Bool, i8, i16, i32, i64, f32, f64: `null_flag = 0` always; nullable
    rows sentinel-encoded (0 / i32::MIN / i64::MIN / NaN), matching the
    row-API convention.
  - Sparse-null types (uuid, long256, ipv4, ts_nanos, ts_micros,
    date_millis): no-null = `extend_from_slice`; nullable = QWP-shape
    bitmap + dense gather.
  - Designated timestamp (micros or nanos) — exactly one per chunk.
  Connection-scoped `SchemaRegistry`: first emit → FULL; repeat → REFERENCE.

* **WS-3 — VARCHAR** (`Chunk::column_varchar`). Arrow Utf8 in
  (`offsets: &[i32]` length `row_count + 1`, `bytes: &[u8]`); wire out
  is dense `non_null_count + 1` LE-u32 offsets + concatenated bytes.
  No-null path memcpys offsets when `offsets[0] == 0`; nullable path
  walks validity and skips slicing for null rows. Offset validation
  (negative / non-monotonic / past `bytes_len`) caught client-side.

* **WS-4 — symbol bulk-intern**
  (`Chunk::symbol_dict_{i8,i16,i32}`, `encoder::resolve_symbols`).
  Three append-time passes: referenced-bitset + range check; compact
  referenced dict bytes; translate codes to internal indices and build
  the QWP-shape bitmap. Connection-scoped `SymbolGlobalDict` shared
  with the row API's type (`buffer/qwp.rs:next_id/intern/entry`
  promoted to `pub(crate)`). At flush time, only entries the chunk
  actually references reach the wire — protects the 1M-per-connection
  cap on huge Pandas `Categorical` dicts. Roll-back on encode error
  keeps client + server dict views coherent.

* **WS-5 — C ABI** (`questdb-rs-ffi/src/column_sender.rs`,
  `include/questdb/ingress/column_sender.h`).
  Full implementation of `doc/COLUMN_SENDER_FFI_ABI.md`:
  - Opaque handles `questdb_db`, `column_sender`, `column_sender_chunk`.
  - `column_sender_validity` repr-C struct; `column_sender_ack_level`
    repr-C enum.
  - `questdb_db_connect/close/borrow_sender/return_sender/reap_idle`.
  - Every chunk column-append, the VARCHAR + symbol_dict family, the
    two designated-timestamp variants, and `column_sender_flush`.
  - Errors reuse `line_sender_error*`.
  Rust side gains `OwnedSender` — Arc-backed borrow handle the FFI hands
  out as `column_sender*` so the C caller can free `questdb_db*` before
  all borrows return without dangling.

  Hand-runnable smoke test at `cpp_test/smoke_column_sender.c`
  (compiles with `-Wall -Wextra -Werror`; not wired into CMake yet —
  matches the `smoke_line_reader` pattern).

* **WS-6 — bench** (`questdb-rs/benches/column_sender.rs`,
  `doc/COLUMN_SENDER_PERF.md`).
  Three families: per-column append vs raw memcpy baseline; symbol
  bulk-intern vs naïve per-row HashMap; encode_chunk end-to-end (no
  network). First-baseline numbers (Apple Silicon laptop, 100k rows):
    - `column_f64/column_sender_no_null` ≈ 55 GiB/s — matches memcpy.
    - `column_i64/column_sender_no_null` ≈ 54 GiB/s — matches memcpy.
    - `column_varchar/column_sender_no_null` within ~5 % of memcpy.
    - Symbol bulk-intern ~16× faster than naïve per-row HashMap.
    - `encode_chunk/populate_plus_encode` ≈ 139 M rows/s end-to-end.

# Verification

- 57 column-sender tests (Rust core); 8 FFI tests; full 834-test lib
  suite passes.
- `cargo fmt` + `cargo clippy --tests --benches` clean on both crates.
- `cargo doc` introduces no new warnings.
- `cc -std=c11 -Wall -Wextra -Werror -I include` compiles the C header
  and the smoke program.

# What's not in here

- WS-7 (Python wrapper) lives in `py-questdb-client`. With the C ABI
  in `include/questdb/ingress/column_sender.h` and the FFI symbols in
  `libquestdb_client`, that repo can now start consuming.
- A live Pandas→QuestDB end-to-end bench and 1-hour soak — both
  belong in the Python repo / nightly CI rather than the in-tree
  Criterion suite.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Rewrite the column-major sender to eliminate intermediate buffers and
pipeline writes for maximum single-connection throughput.

Architecture changes:
- ColumnSender now owns a dedicated ColumnConn (conn.rs) that drives
  socket I/O directly — no replay queue, no background thread, no
  row-API publisher involvement.
- Chunk<'a> holds borrowed descriptors (raw pointers + lengths) into
  the caller's buffers; no per-column Vec<u8> staging. The encoder
  writes wire bytes straight from caller memory into the connection's
  reusable write_buf at flush time.
- flush() pipelines: encode + WS-mask + write_all, then drain acks
  non-blocking. Blocks only when in-flight hits the 128-frame protocol
  cap. New sync(AckLevel) blocks until all acks settle.
- Server cumulative OKs handled correctly (sequence=N acks all frames
  up to N).

API changes:
- flush(&mut chunk, AckLevel) → flush(&mut chunk) (fire-and-forget)
- New sync(AckLevel) drains all in-flight acks
- FFI: column_sender_flush drops ack_level arg; new column_sender_sync
- FFI lifetime contract: caller buffers must outlive flush (no copy)

Performance (5M-row L1 quotes, 9 columns, localhost):
- Encode path: 6 GB/s (2.3% of wall time)
- End-to-end: 350 MB/s pipelined (was 264 MB/s stop-and-wait)
- Per-chunk p50: 0.72 ms (was 2.64 ms)
- Criterion populate+encode: 575 µs (was 718 µs, 20% faster)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The default macOS TCP send buffer (~128 KB) is smaller than a typical
QWP chunk (1.5 MB at 25k rows). write_all blocks mid-frame while the
kernel drains the small buffer. A 4 MiB send buffer lets the kernel
accept a full chunk in one shot, reducing write_all stalls when the
pipeline has multiple frames in flight.

Also sets SO_RCVBUF to 4 MiB to absorb ack bursts from the server
without backpressuring the server's send path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
flush() now sets FLAG_DEFER_COMMIT (0x01) on every QWP frame. The
server appends rows to WAL writers without committing. sync() sends a
commit-triggering empty frame (without the flag) that commits all
accumulated rows in one WAL transaction, then drains acks.

This eliminates per-chunk WAL fsync overhead: 200 chunks × 25k rows
now produce 1 WAL commit instead of 200. The p95 per-chunk latency
drops from ~23 ms to ~7 ms. Old servers that don't recognize the flag
ignore it (reserved bit position) and commit per-message — graceful
degradation per the spec.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The server's ClientSymbolCache only caches symbols with
symbolKey < initialSymbolCount. On a fresh table, initialSymbolCount
stays at 0 until a WAL segment rolls and the watermark updates. By
sending the first frame without FLAG_DEFER_COMMIT, the server commits
it immediately, which allows the next segment to pick up the new
symbol count and enable caching for all subsequent deferred frames.

This is a client-side workaround for a server-side cache limitation.
The proper fix is for the server to cache locally-assigned symbol IDs
within the same segment (see WalColumnarRowAppender.putSymbolColumn).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Align the C ABI, docs, and smoke test with column_sender_flush(sender, chunk, err) plus column_sender_sync(sender, ack_level, err). Reserve an in-flight slot for the sync commit, validate durable ACK opt-in before publishing, and add pool/sync coverage.
Rename the borrowed handle returned from the connection pool from
`column_sender` to `qwpws_conn` so it can host peer writer modes
(per-type today, generic Arrow / NumPy in Steps 2-3, future egress
readers). No behaviour change — the underlying Rust types
(ColumnSender / OwnedSender) keep their names since they're
doc-hidden; only the public C ABI changes.

FFI surface changes:
- struct column_sender             -> qwpws_conn
- questdb_db_borrow_sender         -> questdb_db_borrow_conn
- questdb_db_return_sender         -> questdb_db_return_conn
- column_sender_must_close         -> qwpws_conn_must_close
- column_sender_flush(sender, ...) -> column_sender_flush(conn, ...)
- column_sender_sync(sender, ...)  -> column_sender_sync(conn, ...)

column_sender_chunk and the column_sender_chunk_column_* / _symbol_dict_*
appenders keep their names — the chunk IS the column-sender writer's
accumulator, and flush/sync are operations on it; only the
borrowed-handle parameter type changes.

See plan-conn-pool-and-writers.md in py-questdb-client (Step 1) and
the Slack thread from 2026-05-27 with Victor for the rationale: pool
QWP/WS connections, not writers, so egress readers and Arrow / NumPy
appenders can share the same pool as the existing column_sender chunk
path.

Open Q1 from the plan is answered (chunk.rs:208, encoder.rs:82-95,
encoder.rs:460-466): `column_sender_chunk_column_*` already
direct-writes to the wire buffer — for native-LE contiguous data it
is one `extend_from_slice` per column. So Step 3's NumPy appender is
no longer about "saving an extra memcpy"; it's about avoiding
Python-side widening for narrower dtypes / strided / non-native-endian.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
New entry point that consumes an Apache Arrow C Data Interface
ArrowArray + ArrowSchema pair and dispatches to the existing per-type
chunk methods based on the schema's format string. Caller passes the
borrowed pointers it gets from PyArrow's `_export_to_c` (or any other
Arrow C Data producer); the FFI never constructs or releases the
arrays.

Supported schema formats in this patch:
  - c, s, i, l       int8 / int16 / int32 / int64
  - f, g             float32 / float64
  - b                bool (LSB-first bitmap)
  - u                UTF-8 string (int32 offsets)
  - tsn:..., tsu:... timestamp nanos / micros (timezone suffix ignored)
  - dictionary schemas with c/s/i indices and a UTF-8 value type —
    routed to symbol_dict_i8 / _i16 / _i32

Other formats — including LargeUtf8 (U), decimal, struct, list, and
non-UTF-8 dictionary values — currently return
line_sender_error_invalid_api_call. LargeUtf8 lands in Step 2b.

Constraints:
  - ArrowArray.offset must be 0; sliced arrays are rejected.
  - The chunk's row-count lock applies to the new appender the same
    way as the per-type calls.

The Arrow types are mirrored as #[repr(C)] structs in the Rust FFI
shim so we read them without taking a dependency on the arrow / arrow-
array crate. No new Rust dependencies.

See plan-conn-pool-and-writers.md (Step 2). The Cython-side wiring
(routing pandas Arrow-backed columns through this entry point) lands
in a separate patch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add ColumnKind::VarcharLarge (i64 offsets) + Chunk::column_varchar_large
+ encode_varchar_large. The new encoder reads i64 offsets and writes
u32 LE to the wire frame in one pass — no caller- or Rust-side
intermediate Vec<i32> for the narrowing.

Validation rejects negative offsets, decreasing offsets, offsets
exceeding the bytes buffer, AND any last offset exceeding u32::MAX
(the QWP wire offset table is uint32 LE). The overflow check at
chunk-build time surfaces a meaningful error rather than a per-row
overflow at encode time.

The Arrow appender's `U` format match now routes here. This unblocks
the Python side: pandas large_string columns can be sent without the
Python-side cast to UTF-8 (which previously allocated a fresh Arrow
array via pyarrow.cast).

estimate_frame_size grew a VarcharLarge case identical to Varchar.

questdb-rs 836 lib-tests pass. clippy clean on both crates.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Extend column_sender_chunk_append_arrow_column with row_offset and
row_count parameters so chunked-emission callers can slice an
ArrowArray without consolidating it first. Required for the Python
Client.dataframe path, which loops over row chunks and currently
slices buffers manually for the per-type appenders.

Per-format slicing:
  - fixed-width primitives + timestamps: data pointer is shifted by
    row_offset elements (`ptr.add(row_offset)`).
  - bool bitmap: shifted by row_offset / 8 bytes; row_offset % 8 == 0
    required (matches the validity bitmap byte-alignment).
  - utf8 / large_utf8: offsets pointer shifted by row_offset
    elements (Arrow offsets are monotonic, so the slice's offsets
    are still well-formed). bytes_len is read from the original
    array's last offset; the encoder rebases on the wire.
  - dictionary symbols: codes pointer shifted; the dictionary is
    shared across chunks unchanged.

Validity bitmap requires row_offset % 8 == 0; with row_offset=0 and
row_count=array.length we get exactly the previous behaviour.

Caller bounds-check: row_offset + row_count must not exceed
array.length.

The C header docs the new parameters; clippy & fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Match the Arrow C Data Interface spec more precisely: `children`
  is `*const *mut ArrowArray` (`struct ArrowArray**` in the spec)
  and `dictionary` is `*mut ArrowArray`. We never mutate, so this
  is layout-equivalent to the previous `*const`/`*const`, but the
  declarations now line up with the spec for readers cross-checking.
- Rename `array_len` -> `array_total_len` in the appender so the
  meaning is unambiguous next to the per-call `row_count` parameter.
- Cross-reference doc comments: the per-type varchar / symbol_dict
  C-ABI entries now mention `column_sender_chunk_append_arrow_column`
  as the recommended path for callers holding an Arrow array, and
  flag the per-type entries as the lower-level building block.

No behaviour change. fmt + clippy clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two correctness findings from the multi-agent review:

1. **encode_varchar_large rejected valid late slices.**
   validate_varchar_offsets_i64 checked the absolute `last` offset
   against u32::MAX, but the encoder narrows `(off - first)` per row.
   A slice taken from the tail of a multi-GiB LargeUtf8 array (e.g.
   base=3 GiB, last=4 GiB) was rejected even though every wire offset
   would be ≤ 1 GiB. Now we validate the *span* `last - first` against
   u32::MAX, with a clearer error message.

2. **Null-pointer deref on malformed Arrow arrays.**
   arrow_buffer<T> returned the raw buffer pointer without checking it
   for null. Callers then unconditionally `slice::from_raw_parts(...)`
   or `*offsets_ptr.add(...)`. A producer presenting length > 0 with a
   null data buffer (spec-violating but plausible from buggy clients)
   would UB before any validation ran.

   Added an `allow_null: bool` parameter. The bytes buffer of an empty
   varchar/symbol-dict array can legitimately be NULL (we already
   guard that downstream), so those three call sites pass `true`. All
   other call sites — offsets, primitives, codes, bool bitmap — pass
   `false` and surface a clean `InvalidApiCall` error instead.

Reviewers: convergent finding from concurrency-code-reviewer (Rust)
and general-purpose (cross-layer) agents.

clippy + fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add Chunk::column_numpy + NumpyDtype enum to questdb-rs, plus the C
FFI wrapper column_sender_chunk_append_numpy_column.

Behaviour, per Step 3 design decisions:
- i8/i16/i32 -> i64 sign-extend (wire = LONG).
- u8/u16/u32 -> i64 zero-extend (wire = LONG).
- i64 -> pass-through (wire = LONG).
- u64 -> i64 bit-reinterpret. Values > i64::MAX wrap to negative on
  the wire, matching the row path's C-cast behaviour.
- f32 -> f64 widen (wire = DOUBLE).
- f64 -> pass-through (wire = DOUBLE).
- bool (NumPy byte-per-row) -> Arrow LSB-first packed bitmap
  (wire = BOOLEAN).

Strided arrays and non-native-endian arrays are not supported in v1;
the caller (Python client) consolidates upstream.

Widening lives in Rust at append time, materialising into a chunk-
owned scratch arena (`Chunk::scratch: Vec<NumpyScratch>`). The
ColumnDescriptor's `*const T` points into the scratch; the encoder
hot path is unchanged. Scratch is cleared on Chunk::clear / drop.

The scratch enum uses typed variants (Box<[i64]>, Box<[f64]>,
Box<[u8]>) so the storage alignment matches the encoder's read
alignment.

questdb-rs 836 lib-tests pass. clippy + fmt clean on both crates.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Round-3 dirty-sender fix (option c from
plan-conn-pool-and-writers.md): expose a new FFI that callers use in
error-recovery paths to force-close a conn instead of recycling it.

The problem: a mid-call flush failure left a conn with in-flight
uncommitted frames in the pool. The next borrower's first flush is
QWP's "immediate commit", which would commit the stale frames
alongside their own.

The fix exposes a single new entry point:

  void questdb_db_drop_conn(questdb_db* db, qwpws_conn* conn);

semantically equivalent to "mark must_close, then return" but in one
atomic step. The conn enters the terminal state and the pool drops
it on return rather than recycling it.

Implementation:
- ColumnConn gains `mark_must_close(&mut self)` (pub(crate)).
- ColumnSender gains `mark_must_close(&mut self)` (pub) that
  forwards to ColumnConn.
- The FFI wraps these: questdb_db_drop_conn marks then drops.

The existing `qwpws_conn_must_close()` getter is unchanged; this
adds the corresponding setter at each layer.

clippy + fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Three small hardening tweaks:

1. **Tighten format dispatch.** The Arrow C Data Interface only uses
   a `:`-prefixed parameter on timestamp / date / time formats;
   everything else is a single character. Previously
   `column_sender_chunk_append_arrow_column` did
   `format.split(':').next()` and dispatched on the prefix, which
   would spuriously match e.g. a malformed `"u:foo"` to the varchar
   arm. Exact-match the non-ts arms and use `starts_with("tsn:")` /
   `starts_with("tsu:")` for the ts arms.

2. **Accept `null_count == -1` with NULL bitmap as "no nulls".**
   pyarrow / polars emit this shape when the column has no nulls
   (the spec's "unknown" interpretation). We treat it as no-nulls;
   the encoder reads the data buffer densely. Only `null_count > 0`
   with a NULL bitmap is malformed.

3. **Guard `dict_array.length < 0`.** The main array's negative
   length is already rejected in
   `column_sender_chunk_append_arrow_column`; mirror the same check
   inside `arrow_dictionary_utf8` for symmetry.

clippy + fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…refactor

# Conflicts:
#	questdb-rs/src/ingress.rs
Extends the column-sender pool to also serve egress readers from one
shared `questdb_db` configured by a single conf-string. Lazy-init for
readers, eager for writers, same `pool_size` / `pool_max` /
`pool_idle_timeout_ms` / `pool_reap` budget.

- questdb-rs/db.rs: parallel reader free-list, `borrow_reader_owned`,
  `ReaderPoolHandle`, `OwnedReader::mark_must_close`, integrated into
  the reaper. All reader-side state and methods feature-gated under
  `_egress` so the default build (no egress) stays lean.
- questdb-rs/egress/config: reader conf-string parser accepts the
  `qwpws::` / `qwpwss::` schemes and ignores `pool_*` keys, so a
  single conf-string drives both the sender and reader pools without
  translation.
- questdb-rs-ffi/egress: `line_reader` becomes a named struct with a
  `ReaderOwnership` enum (Standalone vs Pooled{handle, must_close});
  pool borrow/return + `line_reader_mark_must_close` exposed in C.
- column_sender.rs: `questdb_db(pub(crate) QuestDb)` so the egress
  FFI can reach the inner pool to wire reader borrows.
- Headers: reader-pool entry points live in `egress/line_reader.h`
  next to the type they wrap; `ingress/column_sender.h` points
  there.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 4, 2026

Copy link
Copy Markdown

Review Change Stack

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Adds an opt-in Arrow build flag and CI deps; implements Arrow C Data Interface ingress/egress FFI and feature-gated C++/Rust adapters; introduces a column-major QWP/WebSocket sender (pool, conn, encoder) with Rust and C FFI surfaces; adds examples, tests, benchmarks, and docs.

Changes

Arrow support and column-major sender end-to-end

Layer / File(s) Summary
Build and CI feature wiring
CMakeLists.txt, ci/*
Adds QUESTDB_ENABLE_ARROW, wires Cargo arrow feature, appends Arrow/Polars CI deps, updates CI test/fuzz matrices and timeouts.
C++ mock & egress tests
cpp_test/*
Adds qwp_mock_c wrapper, handshake negotiation, Arrow egress doctests covering formats, schema-drift, and release semantics.
Docs: plan & perf
doc/COLUMN_SENDER_PLAN.md, doc/COLUMN_SENDER_PERF.md
Adds design plan and benchmark/perf notes for column-major sender.
Arrow examples (C/C++)
examples/*_arrow.*
Adds C/C++ line-reader and line-sender Arrow examples demonstrating export/import and flush flows.
Public C/C++ headers
include/questdb/*
Adds/extends column_sender.h, augments line_reader.h/hpp with Arrow guards, new error codes, and reader-pool APIs.
Rust FFI bootstrap & Arrow validation
questdb-rs-ffi/*
Adds column_sender FFI module, Arrow pre-validation/import helpers, ABI error mappings, and FFI tests.
Rust benches & examples
questdb-rs/benches, questdb-rs/examples/*
Adds Criterion benches and Polars/quote-streaming examples.
Rust egress Arrow & Polars runtime
questdb-rs/src/egress/arrow/*
Implements DecodedBatch→RecordBatch conversion, schema utilities, Cursor Arrow/Polars adapters, and tests.
Rust ingress column-sender runtime
questdb-rs/src/ingress/column_sender/*
Implements Chunk model, encoder, WS ColumnConn, QuestDb pool, pooling/reaper, connection ACK sync, Numpy/Arrow deferred handling, and unit tests.
C ABI spec & FFI implementation
doc/COLUMN_SENDER_FFI_ABI.md, include/questdb/ingress/column_sender.h, questdb-rs-ffi/src/column_sender.rs
Adds full ABI spec, C header, and Rust FFI entrypoints (connect/borrow/chunk/append/flush/sync) with validation and tests.

Estimated code review effort
🎯 5 (Critical) | ⏱️ ~120 minutes

Suggested reviewers

  • bluestreak01
  • amunra

"A rabbit taps frames in a whispering queue,
Bits don Arrow wings, and swiftly they flew.
Columns march neatly, through sockets they send,
ACKs hop back lightly—beginning to end.
Polars and Arrow, we dance in the night,
Questing for data, in streams trimmed and tight. 🐇✨"

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch jh_conn_pool_refactor

@kafka1991 kafka1991 changed the title feat: support polars and pandas ABI feat: columnar DataFrame ingest (Arrow / Polars / Pandas) + Arrow egress Jun 4, 2026

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 9

🧹 Nitpick comments (8)
CMakeLists.txt (1)

396-398: 💤 Low value

Misleading comment: no fatal_error gate for Arrow.

The comment references a "fatal_error gate" that forces QUESTDB_ENABLE_ARROW=ON, but no such gate exists. Lines 51-55 have a FATAL_ERROR for QUESTDB_ENABLE_READER, not Arrow. Arrow is silently auto-enabled at lines 89-91 via message(STATUS) + set().

Consider updating the comment to accurately describe the auto-enable mechanism.

📝 Suggested comment fix
-    # Apache Arrow C Data Interface tests. The fatal_error gate above
-    # forces QUESTDB_ENABLE_ARROW=ON when tests are enabled, so these
-    # always build alongside the rest of the suite.
+    # Apache Arrow C Data Interface tests. QUESTDB_ENABLE_ARROW is
+    # auto-enabled (lines 89-91) when tests are enabled, so these
+    # always build alongside the rest of the suite.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@CMakeLists.txt` around lines 396 - 398, The comment incorrectly claims a
"fatal_error gate" forces QUESTDB_ENABLE_ARROW=ON; instead update the comment
near the Apache Arrow C Data Interface tests to state that QUESTDB_ENABLE_ARROW
is auto-enabled via the CMake logic that emits message(STATUS) and calls
set(QUESTDB_ENABLE_ARROW ON) (the auto-enable mechanism), not via any
FATAL_ERROR gate like the one used for QUESTDB_ENABLE_READER; reference
QUESTDB_ENABLE_ARROW and the message(STATUS)/set() auto-enable behavior when
rewriting the comment.
questdb-rs-ffi/src/column_sender.rs (1)

1-1700: Remember to run cargo fmt and clippy before commit.

As per coding guidelines, before every commit run:

  • cargo fmt --manifest-path questdb-rs-ffi/Cargo.toml
  • cargo clippy --manifest-path questdb-rs-ffi/Cargo.toml --tests (without -D warnings)

The implementation looks solid: input validation is comprehensive, Arrow ownership handling is correct, and the FFI surface properly handles all error cases.

As per coding guidelines for **/*.rs and **/Cargo.toml files.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@questdb-rs-ffi/src/column_sender.rs` around lines 1 - 1700, You need to
format and lint the Rust crate before committing: run `cargo fmt --manifest-path
questdb-rs-ffi/Cargo.toml` and then `cargo clippy --manifest-path
questdb-rs-ffi/Cargo.toml --tests` and fix any clippy warnings (do not deny
warnings). Focus fixes around the FFI surface in column_sender.rs (e.g.
functions like column_sender_chunk_new, column_sender_chunk_append_numpy_column,
resolve_numpy_dtype and column_sender_flush) so formatting and lints are clean
before pushing.
cpp_test/test_arrow_c.c (1)

52-70: ⚡ Quick win

Consider surfacing initialization failures in test helpers.

Both make_table and make_col swallow line_sender_table_name_init / line_sender_column_name_init errors and return the structure regardless. If initialization fails, the returned struct may be in an undefined state. While this is test code and may be intentional for negative test cases, consider one of:

  1. Add a comment documenting this is intentional for tests
  2. Add a boolean out-param to signal failure
  3. Return a sentinel value or use a different pattern for error cases
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@cpp_test/test_arrow_c.c` around lines 52 - 70, The helpers make_table and
make_col currently swallow initialization errors from
line_sender_table_name_init and line_sender_column_name_init and return
potentially-uninitialized structs; change both functions to detect if err is
non-NULL, log or print the error, free the error, and fail-fast (e.g., fprintf
to stderr and exit or assert) so tests do not proceed with invalid values —
update make_table and make_col to free err and abort on error instead of
silently returning; include a brief comment above each helper explaining the
fail-fast behavior.
questdb-rs/src/egress/arrow/convert.rs (1)

289-326: 🏗️ Heavy lift

Large* mismatch isn’t produced by the egress schema, but the array builders still ignore width/container type
questdb-rs/src/egress/arrow/schema.rs (and polars.rs) constructs DataType::Utf8 / DataType::Binary and DataType::List (not LargeUtf8 / LargeBinary / LargeList), and the Arrow tests assert List(...)—so the specific RecordBatch::try_new rejection described for Large* types shouldn’t occur in the normal egress path.

However, varlen_string_array() / varlen_binary_array() and nest_lists() ignore the provided field/container width and always build Utf8/Binary and ListArray with i32 offsets (e.g., DataType::Utf8/DataType::Binary, offsets_i32, DataType::List in nest_lists). If any alternate caller ever passes Large* types into these helpers, the produced array types would not match the schema. Consider threading/handling the field/requested DataType (choose Large* + i64 offsets) or constraining/guarding these helpers accordingly.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@questdb-rs/src/egress/arrow/convert.rs` around lines 289 - 326, The helpers
varlen_string_array, varlen_binary_array (and nest_lists) ignore the Field's
DataType and always build Utf8/Binary and i32-offset List arrays, which will
mismatch if a LargeUtf8/LargeBinary/LargeList is passed; update these helpers to
inspect the provided field.type() and branch: for
DataType::Utf8/DataType::Binary/DataType::List use i32 offsets and current
builders, and for DataType::LargeUtf8/DataType::LargeBinary/DataType::LargeList
use the Large variants with i64 offsets (use offsets_i64 or equivalent and
construct LargeUtf8/LargeBinary/LargeList ArrayData), or alternatively add an
explicit guard that returns an error if a Large* type is supplied. Ensure you
reference and switch on the Field's DataType in varlen_string_array,
varlen_binary_array and nest_lists so produced ArrayData matches the schema.
questdb-rs/src/egress/decoder.rs (1)

798-812: ⚡ Quick win

Add boundary tests for the new per-width decimal scale guard.

This change introduces a new protocol constraint, but there isn’t a targeted test pinning the DECIMAL64 boundary (e.g. scale=18 accepted, scale=19 rejected). A focused test here would protect this behavior from silent regressions.

Suggested test additions
+    #[test]
+    fn decode_decimal64_scale_boundary_enforced() {
+        // scale=18 should pass, scale=19 should fail for DECIMAL64.
+        let ok = vec![0x00u8, 18u8];
+        let bad = vec![0x00u8, 19u8];
+        // ...build 1-row DECIMAL64 payloads and assert:
+        // ok => decode_result_batch(...).is_ok()
+        // bad => ProtocolError containing "DECIMAL64 scale"
+    }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@questdb-rs/src/egress/decoder.rs` around lines 798 - 812, Add unit tests that
explicitly exercise the new per-width decimal scale guard by constructing
DECIMAL values with widths that map to the per_width_max logic (e.g., width 8 =>
per_width_max 18 for DECIMAL64) and asserting acceptance at the boundary (scale
= 18) and rejection just above it (scale = 19). Target the code paths that
compute per_width_max and return the ProtocolError with the message "DECIMAL{}
scale {} exceeds per-width maximum {}", using the same input construction used
by the decoder (invoke the decoder entry that handles DECIMAL widths or send a
serialized DECIMAL packet into the decoding routine) so tests fail if
per_width_max enforcement changes. Ensure tests cover at least width=8
(DECIMAL64) and one other width branch (e.g., width=16 or 32) to lock in
behavior across branches.
doc/COLUMN_SENDER_PLAN.md (1)

172-175: ⚡ Quick win

Keep the “new module” list aligned with the implemented tree.

This section lists sender.rs, validity.rs, and error.rs, but this PR’s actual module set under questdb-rs/src/ingress/column_sender/ differs. Updating this list will prevent implementation drift in follow-up tasks.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@doc/COLUMN_SENDER_PLAN.md` around lines 172 - 175, Update the "New module"
list in doc/COLUMN_SENDER_PLAN.md so it exactly matches the implemented module
tree under questdb-rs/src/ingress/column_sender/ (use the actual filenames
present, e.g., include or remove sender.rs, validity.rs, error.rs as
appropriate) and also update the re-export line
(questdb::ingress::column_sender::{QuestDb, ColumnSender, Chunk, Validity}) to
reflect the real public items implemented in db.rs, sender.rs, chunk.rs,
validity.rs, encoder.rs, error.rs; ensure the doc’s module list and re-exports
are synchronized with the codebase to prevent future drift.
questdb-rs/src/egress/config.rs (1)

589-595: ⚡ Quick win

Add explicit tests for qwpws / qwpwss aliases.

This parser branch is new behavior and should be locked with direct tests asserting tls and URL scheme mapping for both aliases.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@questdb-rs/src/egress/config.rs` around lines 589 - 595, Add explicit unit
tests that lock the new branch handling the "qwpws" and "qwpwss" aliases: create
two tests (e.g., test_qwpws_alias and test_qwpwss_alias) that call the same
parser/constructor used by the egress config (the function that interprets the
scheme and returns the tls flag and normalized URL scheme) and assert that
"qwpws" yields tls = false and normalizes to "ws", and that "qwpwss" yields tls
= true and normalizes to "wss"; place them in the existing config/egress tests
module so future changes to the branch will break the tests if behavior changes.
questdb-rs/src/ingress/column_sender/conn.rs (1)

422-433: 💤 Low value

set_timeouts is a no-op and may leave stale socket timeouts.

The method accepts timeout parameters but discards them. The comment mentions exposing a setter on WsStream for long flushes, but currently any timeout refresh is silently skipped. If the socket's initial timeouts become inappropriate for long frame writes, this could lead to unexpected WouldBlock or timeout errors.

Consider either implementing the timeout refresh or removing the dead parameters to avoid confusion.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@questdb-rs/src/ingress/column_sender/conn.rs` around lines 422 - 433, The
set_timeouts function currently ignores its read/write parameters, which can
leave stale socket timeouts; update set_timeouts (in conn.rs) to apply the
provided read and write Durations to the underlying TCP socket obtained from the
WsStream accessor (use the tcp_stream accessor used elsewhere) by calling
set_read_timeout and set_write_timeout (converting None to Ok(None) semantics)
and return an error if either syscall fails, or alternatively remove the unused
parameters and document that timeouts are fixed — pick one: implement timeout
refresh by setting TcpStream::set_read_timeout/set_write_timeout and propagate
io::Error as Err, or remove the read: Option<Duration>, write: Option<Duration>
params from set_timeouts and callers, and update comments referencing
WsStream/establish_connection in qwp_ws.rs accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@cpp_test/smoke_column_sender.c`:
- Around line 76-80: The test uses the wrong type and API names from the header:
replace all occurrences of column_sender and
questdb_db_borrow_sender/questdb_db_return_sender with the actual names defined
in column_sender.h (use qwpws_conn* as the variable type and call
questdb_db_borrow_conn()/questdb_db_return_conn() instead); update the variable
name (e.g., sender -> conn) and all corresponding borrow/return call sites
mentioned (around the blocks at 76, 86, 98-99, 110, 133, 147, 155, 163, 169) so
the types and function names match the header signatures (qwpws_conn*,
questdb_db_borrow_conn, questdb_db_return_conn) and ensure all cleanup paths
call questdb_db_return_conn where sender cleanup was previously used.

In `@cpp_test/test_arrow_ingress.cpp`:
- Around line 35-38: The test uses Arrow-only symbols (qdb::column_sender_conn,
column_sender_flush_arrow_batch / column_sender_flush_arrow_batch_at_column) but
the TU is compiled without QUESTDB_CLIENT_ENABLE_ARROW; guard the Arrow-specific
test sections (the blocks using qdb::column_sender_conn and calls to
flush_arrow_batch/flush_arrow_batch_at_column) with `#ifdef`
QUESTDB_CLIENT_ENABLE_ARROW ... `#endif` (or define QUESTDB_CLIENT_ENABLE_ARROW
for this TU), and apply the same guards to the other indicated ranges (lines
around 48-51, 61-64, 213-236, 266-282, 639-642) so compilation only includes
these symbols when Arrow support is enabled.

In `@doc/COLUMN_SENDER_PLAN.md`:
- Line 131: The markdown fenced code block in COLUMN_SENDER_PLAN.md is untyped
and triggers MD040; update its opening fence from ``` to include a language tag
(e.g., ```text) so the block is typed. Locate the unnamed fenced block and
change the opening fence to include the language specifier (suggested: "text")
while leaving the closing fence as-is.

In `@examples/line_sender_cpp_example_arrow.cpp`:
- Around line 29-34: The AppendValues and Finish calls on the Arrow builders
(ts_b.AppendValues, price_b.AppendValues, ts_b.Finish, price_b.Finish) currently
call .ok() and ignore the returned Status; modify the code to check the returned
Status for each AppendValues and Finish call, handle errors (log and
return/exit) when Status.IsOk() is false, and avoid using ts_arr or price_arr if
Finish failed; ensure each failure path cleans up or aborts before proceeding to
use the arrays.

In `@include/questdb/ingress/line_sender.hpp`:
- Around line 1811-1821: The code currently throws for sender_protocol ==
protocol::qwpws/qwpwss in new_buffer(), breaking existing C++ callers and the
empty-buffer fallback used by flush_and_get_fsn / flush_and_keep_and_get_fsn /
flush_and_keep[_with_flags]; instead of throwing, restore a row-buffer path for
WebSocket senders: remove the throw and map qwpws/qwpwss to a compatible backend
(e.g., reuse line_sender_buffer::_backend_kind::ilp or add a new backend like
qwp_ws_row) so line_sender_buffer construction still succeeds, and ensure the
existing empty-buffer fallback logic in flush_and_get_fsn and
flush_and_keep_and_get_fsn continues to work for WebSocket protocols.

In `@questdb-rs/src/egress/arrow/schema.rs`:
- Around line 228-246: The loop over shape_offsets.windows(2) currently only
checks dims > shapes.len(), which misses cases where w[1] (end) is out of bounds
but end-start is small; update the check in the block handling shape_offsets to
validate absolute bounds first: verify both w[0] and w[1] are <= shapes.len()
(and keep the existing monotonicity check), and if either index is out of range
return the ProtocolError used elsewhere (same fmt! message but referencing the
absolute bound failure); then compute dims = (w[1] - w[0]) as usize and proceed
as before, removing the dims > shapes.len() condition.

In `@questdb-rs/src/ingress.rs`:
- Around line 2474-2487: build_qwp_ws_raw_stream currently omits the same config
validation that build() performs, allowing an incompatible combination (manual
progress + initial_connect_retry=async); to fix, invoke the same validation
sequence used in build(): call qwp_ws.apply_reconnect_implies_initial_retry()
and then reject_unsupported_qwp_ws_sf_config(&qwp_ws)? at the start of
build_qwp_ws_raw_stream (before establishing the connection) so the function
rejects the unsupported config the same way as build().

In `@questdb-rs/src/ingress/column_sender/conf.rs`:
- Around line 159-161: The wildcard match arm currently lets unknown keys pass
through silently; change it so keys starting with "pool_" are rejected instead
of passed to SenderBuilder. In the match fallback (the `_ => { ... }` arm)
detect if the key starts_with("pool_") and return an Err (e.g., a
ConfigError::UnknownKey or UnknownPoolKey with the offending key) so typoed
pool_* settings surface as errors; leave non-pool passthrough behavior for other
keys to continue being handled by SenderBuilder.

In `@questdb-rs/src/ingress/column_sender/conn.rs`:
- Around line 268-281: The function try_drain_acks currently increments drained
for every response from try_recv_qwp_response, but the docstring promises
"number of OK acks consumed"; update the counting logic so you only increment
drained for OK ack responses: when you get Some(response) from
try_recv_qwp_response, inspect the response variant (e.g., differentiate OK vs
DurableAck) and call process_response(response) as before but only increment
drained for the OK variant; if inspecting the variant requires changing
ownership, adjust to match by reference or change process_response to
accept/by-reference or return the response type to avoid double-consuming.
Alternatively, if you prefer minimal change, update the docstring of
try_drain_acks to state it returns the number of responses consumed (not just OK
acks).

---

Nitpick comments:
In `@CMakeLists.txt`:
- Around line 396-398: The comment incorrectly claims a "fatal_error gate"
forces QUESTDB_ENABLE_ARROW=ON; instead update the comment near the Apache Arrow
C Data Interface tests to state that QUESTDB_ENABLE_ARROW is auto-enabled via
the CMake logic that emits message(STATUS) and calls set(QUESTDB_ENABLE_ARROW
ON) (the auto-enable mechanism), not via any FATAL_ERROR gate like the one used
for QUESTDB_ENABLE_READER; reference QUESTDB_ENABLE_ARROW and the
message(STATUS)/set() auto-enable behavior when rewriting the comment.

In `@cpp_test/test_arrow_c.c`:
- Around line 52-70: The helpers make_table and make_col currently swallow
initialization errors from line_sender_table_name_init and
line_sender_column_name_init and return potentially-uninitialized structs;
change both functions to detect if err is non-NULL, log or print the error, free
the error, and fail-fast (e.g., fprintf to stderr and exit or assert) so tests
do not proceed with invalid values — update make_table and make_col to free err
and abort on error instead of silently returning; include a brief comment above
each helper explaining the fail-fast behavior.

In `@doc/COLUMN_SENDER_PLAN.md`:
- Around line 172-175: Update the "New module" list in doc/COLUMN_SENDER_PLAN.md
so it exactly matches the implemented module tree under
questdb-rs/src/ingress/column_sender/ (use the actual filenames present, e.g.,
include or remove sender.rs, validity.rs, error.rs as appropriate) and also
update the re-export line (questdb::ingress::column_sender::{QuestDb,
ColumnSender, Chunk, Validity}) to reflect the real public items implemented in
db.rs, sender.rs, chunk.rs, validity.rs, encoder.rs, error.rs; ensure the doc’s
module list and re-exports are synchronized with the codebase to prevent future
drift.

In `@questdb-rs-ffi/src/column_sender.rs`:
- Around line 1-1700: You need to format and lint the Rust crate before
committing: run `cargo fmt --manifest-path questdb-rs-ffi/Cargo.toml` and then
`cargo clippy --manifest-path questdb-rs-ffi/Cargo.toml --tests` and fix any
clippy warnings (do not deny warnings). Focus fixes around the FFI surface in
column_sender.rs (e.g. functions like column_sender_chunk_new,
column_sender_chunk_append_numpy_column, resolve_numpy_dtype and
column_sender_flush) so formatting and lints are clean before pushing.

In `@questdb-rs/src/egress/arrow/convert.rs`:
- Around line 289-326: The helpers varlen_string_array, varlen_binary_array (and
nest_lists) ignore the Field's DataType and always build Utf8/Binary and
i32-offset List arrays, which will mismatch if a LargeUtf8/LargeBinary/LargeList
is passed; update these helpers to inspect the provided field.type() and branch:
for DataType::Utf8/DataType::Binary/DataType::List use i32 offsets and current
builders, and for DataType::LargeUtf8/DataType::LargeBinary/DataType::LargeList
use the Large variants with i64 offsets (use offsets_i64 or equivalent and
construct LargeUtf8/LargeBinary/LargeList ArrayData), or alternatively add an
explicit guard that returns an error if a Large* type is supplied. Ensure you
reference and switch on the Field's DataType in varlen_string_array,
varlen_binary_array and nest_lists so produced ArrayData matches the schema.

In `@questdb-rs/src/egress/config.rs`:
- Around line 589-595: Add explicit unit tests that lock the new branch handling
the "qwpws" and "qwpwss" aliases: create two tests (e.g., test_qwpws_alias and
test_qwpwss_alias) that call the same parser/constructor used by the egress
config (the function that interprets the scheme and returns the tls flag and
normalized URL scheme) and assert that "qwpws" yields tls = false and normalizes
to "ws", and that "qwpwss" yields tls = true and normalizes to "wss"; place them
in the existing config/egress tests module so future changes to the branch will
break the tests if behavior changes.

In `@questdb-rs/src/egress/decoder.rs`:
- Around line 798-812: Add unit tests that explicitly exercise the new per-width
decimal scale guard by constructing DECIMAL values with widths that map to the
per_width_max logic (e.g., width 8 => per_width_max 18 for DECIMAL64) and
asserting acceptance at the boundary (scale = 18) and rejection just above it
(scale = 19). Target the code paths that compute per_width_max and return the
ProtocolError with the message "DECIMAL{} scale {} exceeds per-width maximum
{}", using the same input construction used by the decoder (invoke the decoder
entry that handles DECIMAL widths or send a serialized DECIMAL packet into the
decoding routine) so tests fail if per_width_max enforcement changes. Ensure
tests cover at least width=8 (DECIMAL64) and one other width branch (e.g.,
width=16 or 32) to lock in behavior across branches.

In `@questdb-rs/src/ingress/column_sender/conn.rs`:
- Around line 422-433: The set_timeouts function currently ignores its
read/write parameters, which can leave stale socket timeouts; update
set_timeouts (in conn.rs) to apply the provided read and write Durations to the
underlying TCP socket obtained from the WsStream accessor (use the tcp_stream
accessor used elsewhere) by calling set_read_timeout and set_write_timeout
(converting None to Ok(None) semantics) and return an error if either syscall
fails, or alternatively remove the unused parameters and document that timeouts
are fixed — pick one: implement timeout refresh by setting
TcpStream::set_read_timeout/set_write_timeout and propagate io::Error as Err, or
remove the read: Option<Duration>, write: Option<Duration> params from
set_timeouts and callers, and update comments referencing
WsStream/establish_connection in qwp_ws.rs accordingly.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 27742921-fcfb-4275-92b7-e403b14badb9

📥 Commits

Reviewing files that changed from the base of the PR and between 5db1d69 and 3972c08.

⛔ Files ignored due to path filters (1)
  • questdb-rs-ffi/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (77)
  • CMakeLists.txt
  • ci/compile.yaml
  • ci/run_all_tests.py
  • ci/run_fuzz_pipeline.yaml
  • ci/run_tests_pipeline.yaml
  • cpp_test/qwp_mock_c.cpp
  • cpp_test/qwp_mock_c.h
  • cpp_test/qwp_mock_server.cpp
  • cpp_test/smoke_column_sender.c
  • cpp_test/test_arrow_c.c
  • cpp_test/test_arrow_egress.cpp
  • cpp_test/test_arrow_ingress.cpp
  • doc/COLUMN_SENDER_FFI_ABI.md
  • doc/COLUMN_SENDER_PERF.md
  • doc/COLUMN_SENDER_PLAN.md
  • examples/line_reader_c_example_arrow.c
  • examples/line_reader_cpp_example_arrow.cpp
  • examples/line_sender_cpp_example_arrow.cpp
  • include/questdb/egress/line_reader.h
  • include/questdb/egress/line_reader.hpp
  • include/questdb/ingress/column_sender.h
  • include/questdb/ingress/column_sender.hpp
  • include/questdb/ingress/line_sender.h
  • include/questdb/ingress/line_sender.hpp
  • include/questdb/ingress/line_sender_core.hpp
  • questdb-rs-ffi/Cargo.toml
  • questdb-rs-ffi/src/column_sender.rs
  • questdb-rs-ffi/src/egress.rs
  • questdb-rs-ffi/src/lib.rs
  • questdb-rs/Cargo.toml
  • questdb-rs/benches/column_sender.rs
  • questdb-rs/examples/polars.rs
  • questdb-rs/examples/qwp_ws_l1_quotes.rs
  • questdb-rs/src/egress/arrow/convert.rs
  • questdb-rs/src/egress/arrow/mod.rs
  • questdb-rs/src/egress/arrow/polars.rs
  • questdb-rs/src/egress/arrow/reader.rs
  • questdb-rs/src/egress/arrow/schema.rs
  • questdb-rs/src/egress/arrow/tests.rs
  • questdb-rs/src/egress/config.rs
  • questdb-rs/src/egress/decoder.rs
  • questdb-rs/src/egress/error.rs
  • questdb-rs/src/egress/mod.rs
  • questdb-rs/src/egress/reader.rs
  • questdb-rs/src/error.rs
  • questdb-rs/src/ingress.rs
  • questdb-rs/src/ingress/buffer.rs
  • questdb-rs/src/ingress/buffer/qwp.rs
  • questdb-rs/src/ingress/column_sender/arrow_batch.rs
  • questdb-rs/src/ingress/column_sender/chunk.rs
  • questdb-rs/src/ingress/column_sender/conf.rs
  • questdb-rs/src/ingress/column_sender/conn.rs
  • questdb-rs/src/ingress/column_sender/db.rs
  • questdb-rs/src/ingress/column_sender/encoder.rs
  • questdb-rs/src/ingress/column_sender/mod.rs
  • questdb-rs/src/ingress/column_sender/numpy_wire.rs
  • questdb-rs/src/ingress/column_sender/sender.rs
  • questdb-rs/src/ingress/column_sender/validity.rs
  • questdb-rs/src/ingress/column_sender/wire.rs
  • questdb-rs/src/ingress/polars.rs
  • questdb-rs/src/ingress/sender.rs
  • questdb-rs/src/ingress/sender/qwp_ws.rs
  • questdb-rs/src/tests.rs
  • questdb-rs/src/tests/column_sender_pool.rs
  • questdb-rs/src/tests/qwp_ws.rs
  • questdb-rs/tests/qwp_egress_bounds_fuzz.rs
  • system_test/arrow_alignment_fuzz.py
  • system_test/arrow_egress_fuzz.py
  • system_test/arrow_ffi.py
  • system_test/arrow_fuzz_common.py
  • system_test/arrow_ingress_fuzz.py
  • system_test/arrow_polars_fuzz.py
  • system_test/arrow_polars_per_dtype.py
  • system_test/arrow_round_trip_fuzz.py
  • system_test/questdb_line_sender.py
  • system_test/test.py
  • system_test/test_arrow_fuzz_common_unit.py
👮 Files not reviewed due to content moderation or server errors (6)
  • cpp_test/test_arrow_egress.cpp
  • doc/COLUMN_SENDER_FFI_ABI.md
  • questdb-rs/src/egress/error.rs
  • questdb-rs/src/egress/mod.rs
  • questdb-rs/src/egress/reader.rs
  • questdb-rs/src/error.rs

Comment thread cpp_test/smoke_column_sender.c Outdated
Comment thread cpp_test/test_arrow_ingress.cpp
Comment thread doc/COLUMN_SENDER_PLAN.md

## 3. Architecture

```

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Add a language tag to the fenced block.

The code fence at Line 131 is untyped (```), which triggers markdownlint MD040.

Suggested fix
-```
+```text
 Python repo (separate)                  c-questdb-client (this repo)
 ─────────────────────                   ─────────────────────────────
 ...
-```
+```
🧰 Tools
🪛 markdownlint-cli2 (0.22.1)

[warning] 131-131: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@doc/COLUMN_SENDER_PLAN.md` at line 131, The markdown fenced code block in
COLUMN_SENDER_PLAN.md is untyped and triggers MD040; update its opening fence
from ``` to include a language tag (e.g., ```text) so the block is typed. Locate
the unnamed fenced block and change the opening fence to include the language
specifier (suggested: "text") while leaving the closing fence as-is.

Comment on lines +29 to +34
ts_b.AppendValues({base, base + 1, base + 2}).ok();
price_b.AppendValues({2615.54, 2615.55, 2615.50}).ok();

std::shared_ptr<arrow::Array> ts_arr, price_arr;
ts_b.Finish(&ts_arr).ok();
price_b.Finish(&price_arr).ok();

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Check Arrow builder operation results.

The calls to .ok() on lines 29-30 and 33-34 ignore the Status return values from AppendValues() and Finish(). If these operations fail (e.g., memory allocation failure), the code continues with potentially incomplete or invalid Arrow arrays, which could produce corrupt data or crash later.

🔍 Recommended fix
     constexpr int64_t base = 1700000000000000LL;
-    ts_b.AppendValues({base, base + 1, base + 2}).ok();
-    price_b.AppendValues({2615.54, 2615.55, 2615.50}).ok();
+    if (!ts_b.AppendValues({base, base + 1, base + 2}).ok())
+        return nullptr;
+    if (!price_b.AppendValues({2615.54, 2615.55, 2615.50}).ok())
+        return nullptr;

     std::shared_ptr<arrow::Array> ts_arr, price_arr;
-    ts_b.Finish(&ts_arr).ok();
-    price_b.Finish(&price_arr).ok();
+    if (!ts_b.Finish(&ts_arr).ok())
+        return nullptr;
+    if (!price_b.Finish(&price_arr).ok())
+        return nullptr;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
ts_b.AppendValues({base, base + 1, base + 2}).ok();
price_b.AppendValues({2615.54, 2615.55, 2615.50}).ok();
std::shared_ptr<arrow::Array> ts_arr, price_arr;
ts_b.Finish(&ts_arr).ok();
price_b.Finish(&price_arr).ok();
if (!ts_b.AppendValues({base, base + 1, base + 2}).ok())
return nullptr;
if (!price_b.AppendValues({2615.54, 2615.55, 2615.50}).ok())
return nullptr;
std::shared_ptr<arrow::Array> ts_arr, price_arr;
if (!ts_b.Finish(&ts_arr).ok())
return nullptr;
if (!price_b.Finish(&price_arr).ok())
return nullptr;
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@examples/line_sender_cpp_example_arrow.cpp` around lines 29 - 34, The
AppendValues and Finish calls on the Arrow builders (ts_b.AppendValues,
price_b.AppendValues, ts_b.Finish, price_b.Finish) currently call .ok() and
ignore the returned Status; modify the code to check the returned Status for
each AppendValues and Finish call, handle errors (log and return/exit) when
Status.IsOk() is false, and avoid using ts_arr or price_arr if Finish failed;
ensure each failure path cleans up or aborts before proceeding to use the
arrays.

Comment on lines +1811 to +1821
if (sender_protocol == protocol::qwpws ||
sender_protocol == protocol::qwpwss)
{
throw line_sender_error{
line_sender_error_code::invalid_api_call,
"QWP/WebSocket senders do not produce row-by-row buffers; "
"use the column_sender chunk API instead."};
}
auto backend = line_sender_buffer::_backend_kind::ilp;
if (sender_protocol == protocol::qwpudp)
backend = line_sender_buffer::_backend_kind::qwp_udp;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Blocking new_buffer() for QWP/WebSocket breaks the existing C++ sender contract.

line_sender still exposes flush_and_get_fsn / flush_and_keep_and_get_fsn, and the C layer in this PR still documents sender-bound line_sender_buffer construction. Throwing here means C++ callers no longer have any way to create the buffer those methods require, and the empty-buffer fallback in flush_and_keep[_with_flags] / flush_and_keep_and_get_fsn now fails on qwpws / qwpwss too. Either keep a WebSocket row-buffer path here or retire the row-buffer API in the same change.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@include/questdb/ingress/line_sender.hpp` around lines 1811 - 1821, The code
currently throws for sender_protocol == protocol::qwpws/qwpwss in new_buffer(),
breaking existing C++ callers and the empty-buffer fallback used by
flush_and_get_fsn / flush_and_keep_and_get_fsn / flush_and_keep[_with_flags];
instead of throwing, restore a row-buffer path for WebSocket senders: remove the
throw and map qwpws/qwpwss to a compatible backend (e.g., reuse
line_sender_buffer::_backend_kind::ilp or add a new backend like qwp_ws_row) so
line_sender_buffer construction still succeeds, and ensure the existing
empty-buffer fallback logic in flush_and_get_fsn and flush_and_keep_and_get_fsn
continues to work for WebSocket protocols.

Comment on lines +228 to +246
for w in shape_offsets.windows(2) {
let dims = w[1].checked_sub(w[0]).ok_or_else(|| {
fmt!(
ProtocolError,
"shape_offsets not monotonic: {} < {}",
w[1],
w[0]
)
})? as usize;
if dims > 0 {
if dims > shapes.len() {
return Err(fmt!(
ProtocolError,
"shape_offsets points past shapes buffer (dim_count={}, shapes.len()={})",
dims,
shapes.len()
));
}
return Ok(Some(dims));

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Validate absolute shape_offsets bounds before inferring ndim.

Line 238 currently checks dims > shapes.len(), which misses malformed windows where end is out of bounds but end - start is still small. That accepts invalid shape offsets and can infer an incorrect ndim instead of returning ProtocolError.

Suggested fix
 fn ndim_from_shapes(shapes: &[u32], shape_offsets: &[u32]) -> Result<Option<usize>> {
     if shape_offsets.len() < 2 {
         return Ok(None);
     }
     for w in shape_offsets.windows(2) {
-        let dims = w[1].checked_sub(w[0]).ok_or_else(|| {
+        let start = w[0] as usize;
+        let end = w[1] as usize;
+        let dims = end.checked_sub(start).ok_or_else(|| {
             fmt!(
                 ProtocolError,
                 "shape_offsets not monotonic: {} < {}",
                 w[1],
                 w[0]
             )
-        })? as usize;
+        })?;
         if dims > 0 {
-            if dims > shapes.len() {
+            if end > shapes.len() {
                 return Err(fmt!(
                     ProtocolError,
-                    "shape_offsets points past shapes buffer (dim_count={}, shapes.len()={})",
-                    dims,
+                    "shape_offsets points past shapes buffer (end={}, shapes.len()={})",
+                    end,
                     shapes.len()
                 ));
             }
             return Ok(Some(dims));
         }
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
for w in shape_offsets.windows(2) {
let dims = w[1].checked_sub(w[0]).ok_or_else(|| {
fmt!(
ProtocolError,
"shape_offsets not monotonic: {} < {}",
w[1],
w[0]
)
})? as usize;
if dims > 0 {
if dims > shapes.len() {
return Err(fmt!(
ProtocolError,
"shape_offsets points past shapes buffer (dim_count={}, shapes.len()={})",
dims,
shapes.len()
));
}
return Ok(Some(dims));
for w in shape_offsets.windows(2) {
let start = w[0] as usize;
let end = w[1] as usize;
let dims = end.checked_sub(start).ok_or_else(|| {
fmt!(
ProtocolError,
"shape_offsets not monotonic: {} < {}",
w[1],
w[0]
)
})?;
if dims > 0 {
if end > shapes.len() {
return Err(fmt!(
ProtocolError,
"shape_offsets points past shapes buffer (end={}, shapes.len()={})",
end,
shapes.len()
));
}
return Ok(Some(dims));
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@questdb-rs/src/egress/arrow/schema.rs` around lines 228 - 246, The loop over
shape_offsets.windows(2) currently only checks dims > shapes.len(), which misses
cases where w[1] (end) is out of bounds but end-start is small; update the check
in the block handling shape_offsets to validate absolute bounds first: verify
both w[0] and w[1] are <= shapes.len() (and keep the existing monotonicity
check), and if either index is out of range return the ProtocolError used
elsewhere (same fmt! message but referencing the absolute bound failure); then
compute dims = (w[1] - w[0]) as usize and proceed as before, removing the dims >
shapes.len() condition.

Comment thread questdb-rs/src/ingress.rs
Comment on lines +2474 to +2487
let mut qwp_ws = qwp_ws.clone();
qwp_ws.apply_reconnect_implies_initial_retry();
reject_unsupported_qwp_ws_sf_config(&qwp_ws)?;

let use_tls = matches!(self.protocol, Protocol::QwpWss);
let (stream, _negotiated_version, leftover) = sender::qwp_ws::establish_connection(
self.host.as_str(),
self.port.as_str(),
use_tls,
tls_settings,
&qwp_ws,
basic_auth.as_deref(),
)?;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Keep raw-stream config validation consistent with build().

build_qwp_ws_raw_stream() skips the manual progress + initial_connect_retry=async incompatibility check that build() enforces, so the same config can be rejected on one path and accepted on the other.

Suggested fix
         let mut qwp_ws = qwp_ws.clone();
         qwp_ws.apply_reconnect_implies_initial_retry();
+        if *qwp_ws.progress == QwpWsProgress::Manual
+            && *qwp_ws.initial_connect_retry == conf::QwpWsInitialConnectMode::Async
+        {
+            return Err(error::fmt!(
+                ConfigError,
+                "initial_connect_retry=async requires QWP/WebSocket background progress; \
+                 use qwp_ws_progress=background or initial_connect_retry=sync"
+            ));
+        }
         reject_unsupported_qwp_ws_sf_config(&qwp_ws)?;
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@questdb-rs/src/ingress.rs` around lines 2474 - 2487, build_qwp_ws_raw_stream
currently omits the same config validation that build() performs, allowing an
incompatible combination (manual progress + initial_connect_retry=async); to
fix, invoke the same validation sequence used in build(): call
qwp_ws.apply_reconnect_implies_initial_retry() and then
reject_unsupported_qwp_ws_sf_config(&qwp_ws)? at the start of
build_qwp_ws_raw_stream (before establishing the connection) so the function
rejects the unsupported config the same way as build().

Comment on lines +159 to +161
_ => {
// Unknown / passthrough — leave the SenderBuilder to handle it.
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Reject unknown pool_* keys to avoid silent misconfiguration.

Right now typoed pool settings are silently ignored as passthrough. That can leave pool behavior at defaults without any operator signal.

Targeted fix
         match key {
             "request_durable_ack" => {
                 let _ = parse_on_off("request_durable_ack", value)?;
             }
@@
             "pool_reap" => {
                 pool.pool_reap = match value {
                     "auto" => PoolReap::Auto,
                     "manual" => PoolReap::Manual,
                     other => {
@@
                     }
                 };
             }
+            _ if key.starts_with("pool_") => {
+                return Err(error::fmt!(
+                    ConfigError,
+                    "Unknown pool setting {:?}",
+                    key
+                ));
+            }
             _ => {
                 // Unknown / passthrough — leave the SenderBuilder to handle it.
             }
         }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
_ => {
// Unknown / passthrough — leave the SenderBuilder to handle it.
}
_ if key.starts_with("pool_") => {
return Err(error::fmt!(
ConfigError,
"Unknown pool setting {:?}",
key
));
}
_ => {
// Unknown / passthrough — leave the SenderBuilder to handle it.
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@questdb-rs/src/ingress/column_sender/conf.rs` around lines 159 - 161, The
wildcard match arm currently lets unknown keys pass through silently; change it
so keys starting with "pool_" are rejected instead of passed to SenderBuilder.
In the match fallback (the `_ => { ... }` arm) detect if the key
starts_with("pool_") and return an Err (e.g., a ConfigError::UnknownKey or
UnknownPoolKey with the offending key) so typoed pool_* settings surface as
errors; leave non-pool passthrough behavior for other keys to continue being
handled by SenderBuilder.

Comment thread questdb-rs/src/ingress/column_sender/conn.rs Outdated

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
doc/COLUMN_SENDER_FFI_ABI.md (1)

92-100: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Critical: Buffer lifetime contract is contradictory.

Section §2.3 (lines 92-100) explicitly states that buffers "must remain alive and unchanged until the next column_sender_flush call on the chunk returns" and that "The FFI stores raw pointers into the caller's buffers; it does not copy at append time."

However, Section §16 (lines 1206-1209) states: "Buffers are copied into the chunk during the call, so they can be dropped after the call returns."

These are directly contradictory. If Python wrapper developers follow §16 and drop buffers immediately after the append call, but the Rust FFI implementation follows §2.3 and stores raw pointers without copying, this will cause use-after-free bugs and memory corruption.

One of these statements must be wrong. Based on the PR objectives mentioning "zero-overhead-where-possible" and §2.3's explicit rationale ("required to hit memcpy-bandwidth throughput"), the zero-copy design (§2.3) appears intentional. If so, §16 lines 1207-1209 must be corrected to state that buffers must remain alive until flush returns, matching §2.3.

📝 Proposed fix for §16
 - **Object lifetimes** — keep the source `np.ndarray` / `pa.Array`
-  alive for the duration of the FFI call. Buffers are copied into the
-  chunk during the call, so they can be dropped after the call
-  returns.
+  alive until the next `flush()` call returns (or until the chunk is
+  freed/cleared). The FFI stores raw pointers without copying at append
+  time for zero-copy throughput. Do not drop or mutate the source arrays
+  between `append` and `flush`.

Also applies to: 1206-1209

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@doc/COLUMN_SENDER_FFI_ABI.md` around lines 92 - 100, The doc contains a
contradictory buffer-lifetime contract: §2.3 says buffers passed to any
column_sender_chunk_* function are not copied and "must remain alive and
unchanged until the next column_sender_flush call on the chunk returns" (or
until column_sender_chunk_free / column_sender_chunk_clear is called), but §16
claims buffers are copied during the call and can be dropped immediately;
reconcile by updating §16 to match the intended zero-copy design: change the
wording to explicitly state that buffers passed to column_sender_chunk_* are not
copied and must remain valid and unchanged until column_sender_flush returns (or
until column_sender_chunk_free / column_sender_chunk_clear is called), and add a
short note pointing to the performance rationale and to §2.3 for details.
🧹 Nitpick comments (2)
doc/COLUMN_SENDER_FFI_ABI.md (2)

537-538: 💤 Low value

Clarify VARCHAR as the replacement for removed STRING type.

The note mentions that "The older STRING wire type (0x08) has been removed from the spec," but doesn't explicitly state that VARCHAR is the replacement. Users migrating from older code might benefit from a brief clarification.

Suggested clarification
-QWP has exactly one variable-width text type: VARCHAR (wire code
-`0x0F`). The wire format is `uint32` offsets + concatenated bytes. The
-older STRING wire type (`0x08`) has been removed from the spec and is
-not exposed here.
+QWP has exactly one variable-width text type: VARCHAR (wire code
+`0x0F`), which replaces the older STRING wire type (`0x08`).
+The wire format is `uint32` offsets + concatenated bytes.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@doc/COLUMN_SENDER_FFI_ABI.md` around lines 537 - 538, Update the note that
currently states "The older STRING wire type (`0x08`) has been removed from the
spec" to explicitly state that VARCHAR is the intended replacement for the
removed STRING type; mention the removed symbol "STRING (`0x08`)" and the
replacement symbol "VARCHAR" so readers migrating older code know to map STRING
uses to VARCHAR and any relevant wire-type differences.

810-811: 💤 Low value

Consider emphasizing the zero-value data-loss trap for BYTE and SHORT.

Lines 810-811 document that source values of 0 in i8 and i16 columns are automatically converted to NULL, regardless of the validity bitmap. This means users cannot store the literal value 0 in BYTE or SHORT columns.

While this is a wire-protocol constraint (sentinel values) rather than an API design choice, it's a significant footgun that could surprise users. Consider adding a callout box or warning earlier in the spec (perhaps in §6 or §11) to make this limitation more prominent before users encounter it in the coverage matrix.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@doc/COLUMN_SENDER_FFI_ABI.md` around lines 810 - 811, Add a prominent warning
about the zero-value sentinel behavior for i8/BYTE and i16/SHORT: state that
source values of 0 are treated as NULL regardless of the validity bitmap so
literal 0 cannot be stored, and surface this as a callout in an earlier
high-visibility section (suggest §6 or §11) and reference the coverage matrix
entries `i8`/`BYTE` and `i16`/`SHORT` (sentinel = 0) so readers see the trap
before reaching the column type table; keep the callout short, explicit about
data loss, and link back to the table rows that document the sentinel behavior.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Outside diff comments:
In `@doc/COLUMN_SENDER_FFI_ABI.md`:
- Around line 92-100: The doc contains a contradictory buffer-lifetime contract:
§2.3 says buffers passed to any column_sender_chunk_* function are not copied
and "must remain alive and unchanged until the next column_sender_flush call on
the chunk returns" (or until column_sender_chunk_free /
column_sender_chunk_clear is called), but §16 claims buffers are copied during
the call and can be dropped immediately; reconcile by updating §16 to match the
intended zero-copy design: change the wording to explicitly state that buffers
passed to column_sender_chunk_* are not copied and must remain valid and
unchanged until column_sender_flush returns (or until column_sender_chunk_free /
column_sender_chunk_clear is called), and add a short note pointing to the
performance rationale and to §2.3 for details.

---

Nitpick comments:
In `@doc/COLUMN_SENDER_FFI_ABI.md`:
- Around line 537-538: Update the note that currently states "The older STRING
wire type (`0x08`) has been removed from the spec" to explicitly state that
VARCHAR is the intended replacement for the removed STRING type; mention the
removed symbol "STRING (`0x08`)" and the replacement symbol "VARCHAR" so readers
migrating older code know to map STRING uses to VARCHAR and any relevant
wire-type differences.
- Around line 810-811: Add a prominent warning about the zero-value sentinel
behavior for i8/BYTE and i16/SHORT: state that source values of 0 are treated as
NULL regardless of the validity bitmap so literal 0 cannot be stored, and
surface this as a callout in an earlier high-visibility section (suggest §6 or
§11) and reference the coverage matrix entries `i8`/`BYTE` and `i16`/`SHORT`
(sentinel = 0) so readers see the trap before reaching the column type table;
keep the callout short, explicit about data loss, and link back to the table
rows that document the sentinel behavior.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: e3d24b42-a01b-473b-aa99-0866e399212b

📥 Commits

Reviewing files that changed from the base of the PR and between a39d0a9 and 78cea31.

📒 Files selected for processing (4)
  • doc/COLUMN_SENDER_FFI_ABI.md
  • include/questdb/ingress/column_sender.h
  • questdb-rs-ffi/src/column_sender.rs
  • questdb-rs/src/ingress/column_sender/numpy_wire.rs
🚧 Files skipped from review as they are similar to previous changes (2)
  • include/questdb/ingress/column_sender.h
  • questdb-rs-ffi/src/column_sender.rs

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
questdb-rs-ffi/src/column_sender.rs (2)

393-404: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

The constructor contract and the tested behavior disagree.

Line 393 says column_sender_chunk_new validates the table name to <= 127 bytes, but the test below explicitly asserts that a 128-byte name succeeds and is only rejected later. Either enforce the limit here or relax the documented contract; otherwise FFI callers get a successful handle for an input the API says should fail.

Also applies to: 1471-1482

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@questdb-rs-ffi/src/column_sender.rs` around lines 393 - 404, The
comment/documentation and runtime validation disagree: update the validation to
match the test expectations by allowing table names up to 128 bytes (instead of
enforcing <=127) so column_sender_chunk_new returns a valid handle for a
128-byte name; modify the underlying name_str length check (or the constant it
uses) to permit 128 bytes and update the doc comment on column_sender_chunk_new
accordingly, and apply the same consistent change to the other
constructor/validation at the 1471-1482 region so both places enforce the same
max-length policy.

122-136: ⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift

Fix FFI boundary: stop taking #[repr(C)] enums by value in extern "C" APIs

column_sender_sync takes ack_level: column_sender_ack_level by value, and column_sender_chunk_append_numpy_column takes dtype: column_sender_numpy_dtype by value. If a C caller supplies an out-of-range discriminant, Rust has an invalid enum value at the call boundary (undefined behavior) before your into()/conversion logic can reject it.

Change both parameters to a fixed-width integer type in the C/Rust boundary (e.g., u32), then TryFrom/validate and return InvalidApiCall for unknown values.

Also, column_sender_numpy_dtype documents it as “mirrored to the C ABI as a 32-bit enum”, but #[repr(C)] does not guarantee a fixed 32-bit width across C ABIs—use an explicit #[repr(u32)] (or, better, accept u32 at the boundary) so the ABI is actually 32-bit.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@questdb-rs-ffi/src/column_sender.rs` around lines 122 - 136, The extern-C
boundary must not accept #[repr(C)] enums by value; change the parameters
ack_level in column_sender_sync and dtype in
column_sender_chunk_append_numpy_column from their enum types
(column_sender_ack_level, column_sender_numpy_dtype) to a fixed-width integer
(u32) at the FFI boundary, then validate by TryFrom/try_into into the Rust enum
(or convert via a match) and return InvalidApiCall for unknown discriminants; if
you keep enum definitions for internal use, give them an explicit #[repr(u32)]
to document size, but do validation in the functions named column_sender_sync
and column_sender_chunk_append_numpy_column before any enum conversion.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Outside diff comments:
In `@questdb-rs-ffi/src/column_sender.rs`:
- Around line 393-404: The comment/documentation and runtime validation
disagree: update the validation to match the test expectations by allowing table
names up to 128 bytes (instead of enforcing <=127) so column_sender_chunk_new
returns a valid handle for a 128-byte name; modify the underlying name_str
length check (or the constant it uses) to permit 128 bytes and update the doc
comment on column_sender_chunk_new accordingly, and apply the same consistent
change to the other constructor/validation at the 1471-1482 region so both
places enforce the same max-length policy.
- Around line 122-136: The extern-C boundary must not accept #[repr(C)] enums by
value; change the parameters ack_level in column_sender_sync and dtype in
column_sender_chunk_append_numpy_column from their enum types
(column_sender_ack_level, column_sender_numpy_dtype) to a fixed-width integer
(u32) at the FFI boundary, then validate by TryFrom/try_into into the Rust enum
(or convert via a match) and return InvalidApiCall for unknown discriminants; if
you keep enum definitions for internal use, give them an explicit #[repr(u32)]
to document size, but do validation in the functions named column_sender_sync
and column_sender_chunk_append_numpy_column before any enum conversion.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 16ca2d8e-8939-4f3e-bbe2-67f272211260

📥 Commits

Reviewing files that changed from the base of the PR and between 78cea31 and 17e644b.

📒 Files selected for processing (1)
  • questdb-rs-ffi/src/column_sender.rs

kafka1991 and others added 19 commits June 5, 2026 09:44
Accept plain FixedSizeBinary(16) as UUID for Arrow ingestion instead of requiring extension metadata. This matches the Python Client.dataframe contract and the server e2e UUID round-trip tests, at the cost of no longer treating FSB16 as a generic opaque fixed-size binary shape in this path.

Reject null timestamp field columns before publishing, matching the existing designated-timestamp and Python planner validation policy. Nullable timestamp fields can be revisited later only with an explicit server/protocol contract.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants