feat(ingress): column-major sender for QWP/WebSocket (WS-0..WS-6) #148
bluestreak01 wants to merge 9 commits
Plan and FFI ABI for the new column-major writer that will ingest Pandas/Polars DataFrames over QWP/WebSocket. Locks the QuestDb pool shape, BulkChunk encoder strategy, validity bitmap semantics, and the C ABI the separate Python wrapper repo will consume. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Locks the column-sender API around synchronous flush: sender.flush(&mut chunk, ack_level) blocks until the requested ACK level (Ok = WAL commit, Durable = object-store via Enterprise opt-in). Drops the FSN/submit/await split from the FFI; at most one frame in flight per sender, parallelism via the pool. Refuses sf_dir and other sf_* keys at QuestDb::connect with ConfigError — store-and-forward is single-writer-per-slot and interacts awkwardly with pool auto-grow; row-major Sender remains the SF path. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
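The blocking contract of `flush` can be pictured as a wait on an ACK watermark. The following is only a minimal sketch under stated assumptions, not the code in this PR: `AckState`, `WaitError`, and `wait_for_ack` are hypothetical names, the "crossed" condition is assumed to be `acked_fsn >= published_fsn`, and a plain sleep loop stands in for whatever wake-up mechanism the sender really uses.

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{Duration, Instant};

/// Hypothetical state shared with the connection's receive side.
pub struct AckState {
    /// Highest frame sequence number (FSN) acknowledged so far.
    pub acked_fsn: AtomicU64,
    /// Set when the connection has failed and must be closed.
    pub must_close: AtomicBool,
}

#[derive(Debug, PartialEq)]
pub enum WaitError {
    ConnectionClosed,
    TimedOut,
}

/// Block until the watermark reaches `published_fsn`, waking once per
/// `slice` so a failed connection is noticed without waiting for the
/// full timeout.
pub fn wait_for_ack(
    state: &AckState,
    published_fsn: u64,
    slice: Duration,
    timeout: Duration,
) -> Result<(), WaitError> {
    let deadline = Instant::now() + timeout;
    loop {
        // Assumption: "crossing" means the watermark is >= the published FSN.
        if state.acked_fsn.load(Ordering::Acquire) >= published_fsn {
            return Ok(());
        }
        if state.must_close.load(Ordering::Acquire) {
            return Err(WaitError::ConnectionClosed);
        }
        let now = Instant::now();
        if now >= deadline {
            return Err(WaitError::TimedOut);
        }
        // Sleep for one slice, or for the remaining time if that is shorter.
        std::thread::sleep(slice.min(deadline - now));
    }
}
```

With a 50 ms slice, a caller blocked in such a loop would observe a connection failure within roughly one slice rather than at the end of the timeout.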
Lands the Rust core, C ABI, and benchmarks for a column-major sender
targeting Pandas/Polars → QuestDB throughput over QWP/WebSocket. See
`doc/COLUMN_SENDER_PLAN.md` for the design and `doc/COLUMN_SENDER_FFI_ABI.md`
for the C ABI spec; both shipped in earlier commits on this branch.
# What's in the box
* **WS-0 — `QuestDb` pool** (`ingress/column_sender/db.rs`,
`ingress/column_sender/conf.rs`).
Thread-safe pool with eager-open, fail-fast at `pool_max`,
`BorrowedSender<'a>` that returns on `Drop`, and a background reaper
(`pool_reap=auto`, tick = max(5 s, idle_timeout / 12)) that closes
excess-over-`pool_size` connections. New conf keys: `pool_size`,
`pool_max`, `pool_idle_timeout_ms`, `pool_reap`. `sf_*` / `sender_id`
/ `qwp_ws_progress=manual` refused at `connect`-time.
* **WS-1 — synchronous `flush` plumbing** (`ingress/column_sender/sender.rs`,
`ingress/column_sender/encoder.rs`).
`ColumnSender::flush(chunk, AckLevel)` encodes the chunk, publishes via
the existing QWP/WS replay queue (`Sender::qwp_ws_publish_raw` —
pub(crate) escape hatch in the row-API sender), and blocks until the
ACK watermark crosses the published FSN. Polls in 50 ms slices so a
`must_close` mid-wait surfaces promptly. `AckLevel::Durable` requires
`request_durable_ack=on` at connect or returns `InvalidApiCall`.
* **WS-2 — `Chunk` + numeric / fixed-width columns**
(`ingress/column_sender/chunk.rs`, `validity.rs`, `wire.rs`).
Per-column wire-shape `Vec<u8>` storage so encode is a header +
`extend_from_slice` per column. Two code paths per type per the plan
§2.2:
- Bool, i8, i16, i32, i64, f32, f64: `null_flag = 0` always; nullable
rows sentinel-encoded (0 / i32::MIN / i64::MIN / NaN), matching the
row-API convention.
- Sparse-null types (uuid, long256, ipv4, ts_nanos, ts_micros,
date_millis): no-null = `extend_from_slice`; nullable = QWP-shape
bitmap + dense gather.
- Designated timestamp (micros or nanos) — exactly one per chunk.
Connection-scoped `SchemaRegistry`: first emit → FULL; repeat → REFERENCE.
* **WS-3 — VARCHAR** (`Chunk::column_varchar`). Arrow Utf8 in
(`offsets: &[i32]` length `row_count + 1`, `bytes: &[u8]`); wire out
is dense `non_null_count + 1` LE-u32 offsets + concatenated bytes.
No-null path memcpys offsets when `offsets[0] == 0`; nullable path
walks validity and skips slicing for null rows. Offset validation
(negative / non-monotonic / past `bytes_len`) caught client-side.
* **WS-4 — symbol bulk-intern**
(`Chunk::symbol_dict_{i8,i16,i32}`, `encoder::resolve_symbols`).
Three append-time passes: referenced-bitset + range check; compact
referenced dict bytes; translate codes to internal indices and build
the QWP-shape bitmap. Connection-scoped `SymbolGlobalDict` shared
with the row API's type (`buffer/qwp.rs:next_id/intern/entry`
promoted to `pub(crate)`). At flush time, only entries the chunk
actually references reach the wire — protects the 1M-per-connection
cap on huge Pandas `Categorical` dicts. Roll-back on encode error
keeps client + server dict views coherent.
* **WS-5 — C ABI** (`questdb-rs-ffi/src/column_sender.rs`,
`include/questdb/ingress/column_sender.h`).
Full implementation of `doc/COLUMN_SENDER_FFI_ABI.md`:
- Opaque handles `questdb_db`, `column_sender`, `column_sender_chunk`.
- `column_sender_validity` repr-C struct; `column_sender_ack_level`
repr-C enum.
- `questdb_db_connect/close/borrow_sender/return_sender/reap_idle`.
- Every chunk column-append, the VARCHAR + symbol_dict family, the
two designated-timestamp variants, and `column_sender_flush`.
- Errors reuse `line_sender_error*`.
Rust side gains `OwnedSender` — Arc-backed borrow handle the FFI hands
out as `column_sender*` so the C caller can free `questdb_db*` before
all borrows return without dangling.
Hand-runnable smoke test at `cpp_test/smoke_column_sender.c`
(compiles with `-Wall -Wextra -Werror`; not wired into CMake yet —
matches the `smoke_line_reader` pattern).
* **WS-6 — bench** (`questdb-rs/benches/column_sender.rs`,
`doc/COLUMN_SENDER_PERF.md`).
Three families: per-column append vs raw memcpy baseline; symbol
bulk-intern vs naïve per-row HashMap; encode_chunk end-to-end (no
network). First-baseline numbers (Apple Silicon laptop, 100k rows):
- `column_f64/column_sender_no_null` ≈ 55 GiB/s — matches memcpy.
- `column_i64/column_sender_no_null` ≈ 54 GiB/s — matches memcpy.
- `column_varchar/column_sender_no_null` within ~5 % of memcpy.
- Symbol bulk-intern ~16× faster than naïve per-row HashMap.
- `encode_chunk/populate_plus_encode` ≈ 139 M rows/s end-to-end.
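
The sentinel-encoded path described under WS-2 can be illustrated for a single `i64` column. This is a hedged sketch, not the PR's `chunk.rs`: `append_i64_column` is a hypothetical helper, the validity bitmap is assumed to be Arrow-style (LSB-first, bit set = row is valid), and every value goes through `to_le_bytes()` so the output does not depend on host endianness.

```rust
/// Append an `i64` column to `payload` as little-endian wire bytes.
///
/// `validity` is an optional bitmap (assumed Arrow-style: bit `i` set
/// means row `i` is valid). Null rows are written as the `i64::MIN`
/// sentinel. Panics if the bitmap is shorter than `ceil(len / 8)` bytes.
pub fn append_i64_column(payload: &mut Vec<u8>, values: &[i64], validity: Option<&[u8]>) {
    payload.reserve(values.len() * 8);
    match validity {
        // No-null path: every row is written as-is.
        None => {
            for v in values {
                payload.extend_from_slice(&v.to_le_bytes());
            }
        }
        // Nullable path: null rows become the sentinel.
        Some(bits) => {
            for (i, v) in values.iter().enumerate() {
                let valid = (bits[i / 8] & (1u8 << (i % 8))) != 0;
                let wire = if valid { *v } else { i64::MIN };
                payload.extend_from_slice(&wire.to_le_bytes());
            }
        }
    }
}
```

On a little-endian host the no-null loop produces the same bytes as a single raw copy of the slice, which is why a memcpy-style fast path is possible there.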
# Verification
- 57 column-sender tests (Rust core); 8 FFI tests; full 834-test lib
suite passes.
- `cargo fmt` + `cargo clippy --tests --benches` clean on both crates.
- `cargo doc` introduces no new warnings.
- `cc -std=c11 -Wall -Wextra -Werror -I include` compiles the C header
and the smoke program.
# What's not in here
- WS-7 (Python wrapper) lives in `py-questdb-client`. With the C ABI
in `include/questdb/ingress/column_sender.h` and the FFI symbols in
`libquestdb_client`, that repo can now start consuming.
- A live Pandas→QuestDB end-to-end bench and 1-hour soak — both
belong in the Python repo / nightly CI rather than the in-tree
Criterion suite.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
📝 Walkthrough
Implements a column-major sender: a new Rust `column_sender` module (chunk, encoder, conn, pool, sender, validity, wire), a C FFI header and Rust FFI implementation with tests, benchmarks and an example, mock-server integration tests, and a small C smoke test demonstrating the ABI.
Changes
Column-Sender QWP/WebSocket Ingestion
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
Actionable comments posted: 9
🧹 Nitpick comments (4)
questdb-rs/benches/column_sender.rs (1)
398-418: ⚡ Quick win
Reuse `BenchEncoderState` across iterations for steady-state encode metrics.
Both `encode_only` and `populate_plus_encode` recreate encoder state each iteration, which benchmarks cold-path schema/symbol setup repeatedly. If you want hot-path numbers, keep one mutable `BenchEncoderState` per benchmark closure.
Suggested adjustment
- group.bench_function("encode_only", |b| {
-     b.iter_batched(
-         BenchEncoderState::new,
-         |mut state| {
-             let frame = bench_encode_chunk(&prebuilt, &mut state).unwrap();
-             black_box(frame);
-         },
-         BatchSize::SmallInput,
-     );
- });
+ group.bench_function("encode_only", |b| {
+     let mut state = BenchEncoderState::new();
+     b.iter(|| {
+         let frame = bench_encode_chunk(&prebuilt, &mut state).unwrap();
+         black_box(frame);
+     });
+ });
- group.bench_function("populate_plus_encode", |b| {
-     b.iter_batched(
-         BenchEncoderState::new,
-         |mut state| {
-             let chunk = build_chunk();
-             let frame = bench_encode_chunk(&chunk, &mut state).unwrap();
-             black_box(frame);
-         },
-         BatchSize::SmallInput,
-     );
- });
+ group.bench_function("populate_plus_encode", |b| {
+     let mut state = BenchEncoderState::new();
+     b.iter(|| {
+         let chunk = build_chunk();
+         let frame = bench_encode_chunk(&chunk, &mut state).unwrap();
+         black_box(frame);
+     });
+ });
questdb-rs/src/ingress/column_sender/chunk.rs (1)
852-861: 💤 Low value
Fast-path memcpy assumes little-endian architecture.
The non-nullable branch performs a raw byte copy of the input slice, which only produces correct LE wire bytes on little-endian machines. On big-endian machines, this would emit BE bytes while the nullable branch correctly uses `to_le()`.
Consider adding a compile-time guard or documenting the assumption.
Example compile-time check
+#[cfg(not(target_endian = "little"))]
+compile_error!("column_sender fast paths assume little-endian architecture");
+
 #[inline]
 fn encode_le_numeric<'a, T, const N: usize, F>(
questdb-rs/src/ingress/column_sender/wire.rs (1)
69-78: 💤 Low value
Note: `I8_NULL` and `I16_NULL` use 0 as the sentinel.
Using `0` as the NULL sentinel for `BYTE` and `SHORT` means actual zero values cannot be distinguished from NULL at the wire level. This appears intentional to match the row-API encoder, but callers should be aware that these column types have a restricted value domain when using sentinel-based null encoding.
If this matches the QuestDB wire spec, no change needed.
questdb-rs/src/ingress/column_sender/db.rs (1)
426-442: 💤 Low value
Misleading comment about mutex hold timing.
The comment on lines 438–440 states the sender is dropped "with the mutex still held," but `drop(state)` on line 441 releases the mutex before the function returns and `sender` goes out of scope. The code behavior is actually correct (and preferable: dropping outside the lock avoids blocking concurrent borrows during potentially slow connection teardown), but the comment contradicts the actual execution order.
📝 Suggested comment fix
- // Dropped `sender` (when `must_close`) falls out of scope here, after
- // the count was decremented but with the mutex still held — safe
- // since `Sender::drop` does not re-enter the pool.
+ // `sender` (when `must_close`) is dropped after this function returns,
+ // which is after the mutex is released. This is intentional: dropping
+ // outside the lock avoids blocking concurrent borrows during teardown.
+ // Safe since `Sender::drop` does not re-enter the pool.
  drop(state);
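The ordering discussed in the `db.rs` comment (release the pool mutex first, then tear the connection down) can be sketched as follows. This is an illustrative stand-in rather than the PR's `return_to_pool`: `Conn`, `PoolState`, and `Pool` are hypothetical types, and the explicit `drop` calls only make the order visible.

```rust
use std::sync::{Arc, Mutex};

/// Stand-in for a pooled connection; the real `Sender` is not modelled.
pub struct Conn {
    pub id: u32,
}

pub struct PoolState {
    pub idle: Vec<Conn>,
    pub in_use: usize,
}

pub struct Pool {
    pub state: Mutex<PoolState>,
}

/// Return a connection to the pool. When `must_close` is set, the
/// connection is dropped only after the mutex guard has been released.
pub fn return_to_pool(pool: &Arc<Pool>, conn: Conn, must_close: bool) {
    let mut state = pool.state.lock().unwrap();
    state.in_use = state.in_use.saturating_sub(1);
    let to_close = if must_close {
        Some(conn)
    } else {
        state.idle.push(conn);
        None
    };
    // Release the lock first so concurrent borrowers are not blocked.
    drop(state);
    // Any connection teardown now runs without holding the pool lock.
    drop(to_close);
}
```

Keeping the bookkeeping inside the critical section and the teardown outside it means a slow socket close cannot stall other threads borrowing from the pool.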
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@doc/COLUMN_SENDER_FFI_ABI.md`:
- Line 42: The docs currently reference column_sender_submit but the public ABI
function is column_sender_flush; update all occurrences (e.g., the table entry
mentioning "column_sender_submit", and the other instances around the indicated
sections) to consistently say column_sender_flush and describe its semantics
(i.e., that column_sender_flush returns an error when the encoded frame exceeds
the negotiated cap and behaves as documented elsewhere). Search for
"column_sender_submit" and replace with "column_sender_flush", ensuring the
description and error semantics match the existing column_sender_flush
documentation.
- Around line 230-233: The documentation signature for questdb_db_connect is
missing the conf_len parameter which causes drift from the header; update the
spec and all examples (including the other occurrences around lines referenced)
to match the header by adding the conf_len parameter to the function prototype
for questdb_db_connect (ensure the signature is: questdb_db*
questdb_db_connect(const char* conf, size_t conf_len, line_sender_error**
err_out);) and update any example calls and parameter descriptions to include
conf_len so consumers won't get compile errors.
- Around line 588-591: Update the spec for symbol_dict_<IDX> to remove
"behaviour is undefined" and require deterministic validation: always check each
codes[i] against dict_len and treat any out-of-range code as an API error
(return line_sender_error_invalid_api_call), even when the validity pointer is
NULL; mention that when a row's validity bit is 0 its code is ignored for
payload semantics but still must be within range to avoid invalid API calls, and
reference codes[i], dict_len, and the line_sender_error_invalid_api_call symbol
so implementers validate codes before using them.
In `@doc/COLUMN_SENDER_PLAN.md`:
- Around line 445-447: WS-5 still mentions the old API names
`column_sender_submit` and `_await_acked_fsn`; update the document to use the
finalized public API name `column_sender_flush` everywhere (including any
descriptions like "chunk fill and submit" -> "chunk fill and flush") and ensure
the surrounding text accurately reflects the flush behavior for FFI/wrapper
implementers, replacing any remaining `_await_acked_fsn` references with the new
API naming.
In `@include/questdb/ingress/column_sender.h`:
- Around line 388-389: Clarify the symbol-code bounds in the comment near
column_sender.h by replacing the ambiguous range "0 .. dict_len" with an
explicit predicate such as "0 <= codes[i] && codes[i] < dict_len" (for non-null
rows) and state that null-row codes are not inspected; update the sentence
referencing `codes[i]` and `dict_len` so readers clearly understand the
inclusive lower bound and exclusive upper bound to avoid off-by-one errors.
- Around line 128-132: The header's contract for post-questdb_db_close behavior
conflicts with the ABI spec: decide to follow the spec by making checked-out
column_sender handles invalid after questdb_db_close; update the comment block
around questdb_db_close/column_sender to state that closing the DB invalidates
all outstanding column_sender handles and they must not be used or returned, and
then adjust related implementations (e.g., questdb_db_close,
questdb_db_return_sender, and any reference-counting teardown logic) to
mark/invalidate senders on close and to make questdb_db_return_sender a no-op or
error when given an invalidated sender; reference symbols: questdb_db_close,
column_sender, questdb_db_return_sender, and the pool's internal refcounting to
locate and change behavior and docs.
In `@questdb-rs-ffi/src/lib.rs`:
- Around line 79-80: The export of column_sender is unconditional but the
upstream questdb::ingress::column_sender is only compiled under the feature
"sync-sender-qwp-ws", so gate questdb-rs-ffi’s module and re-export behind the
same feature: wrap the declarations (the pub mod column_sender; and pub use
column_sender::*;) in a #[cfg(feature = "sync-sender-qwp-ws")] and ensure the
feature is forwarded to the questdb-rs dependency in Cargo.toml (or
alternatively add #[cfg(feature = "sync-sender-qwp-ws")] to the import in
column_sender.rs that references questdb::ingress::column_sender::*), so the
module is only compiled when the upstream feature is enabled.
In `@questdb-rs/src/ingress/column_sender/sender.rs`:
- Around line 116-128: The flush method currently always calls await_ack which
in turn always waits on await_acked_fsn (durable coverage), causing AckLevel::Ok
to block on durable ACK when durable-ack is enabled; change the logic so
AckLevel is honored: update await_ack (or change calls in flush/send path) to
accept an AckLevel and, for AckLevel::Ok, wait on the WAL-commit waiter (e.g.,
await_wal_acked_fsn or equivalent non-durable waiter) while for
AckLevel::Durable use await_acked_fsn; adjust both flush and the other similar
call site (around the 135-152 region) to pass the AckLevel through and invoke
the correct waiter, keeping existing durable_ack_opt_in checks for
InvalidApiCall when Durable is requested but not enabled.
In `@questdb-rs/src/tests/column_sender_pool.rs`:
- Line 238: The test currently discards the boolean result of wait_until,
allowing timeouts to false-pass; change the bare calls to assertions so failures
fail the test — for example replace wait_until(Duration::from_secs(2), ||
server.accepted() == 3); with assert!(wait_until(Duration::from_secs(2), ||
server.accepted() == 3), "timed out waiting for server.accepted() == 3"); and do
the same for the other occurrence at the second call (line with wait_until(...)
returning a boolean) so both wait_until invocations are asserted (use assert! or
assert_eq!(..., true) with a descriptive message).
---
Nitpick comments:
In `@questdb-rs/benches/column_sender.rs`:
- Around line 398-418: Both benchmarks currently call BenchEncoderState::new
inside each iter_batched closure causing cold-path allocation; instead create a
single mutable BenchEncoderState before invoking group.bench_function (for both
"encode_only" and "populate_plus_encode") and reuse that same &mut state across
iterations when calling bench_encode_chunk (referencing BenchEncoderState,
bench_encode_chunk, encode_only, populate_plus_encode, build_chunk, prebuilt).
If the encoder requires a clean slate between iterations, call its reset/clear
method (or reinitialize only its internal buffers) rather than recreating the
whole BenchEncoderState each iteration so you measure steady-state encode
performance.
In `@questdb-rs/src/ingress/column_sender/chunk.rs`:
- Around line 852-861: The fast-path in the match on validity (the None branch
that creates bytes via std::slice::from_raw_parts and calls
payload.extend_from_slice) assumes little-endian and will emit wrong wire bytes
on big-endian targets; either add a compile-time guard (e.g. cfg(target_endian =
"little") / compile_error! for unsupported endianness) or change the fast path
to normalize values to little-endian (convert each element with
to_le()/to_le_bytes before appending) so it matches the nullable branch
behavior; update the branch that builds bytes (the unsafe from_raw_parts +
payload.extend_from_slice) accordingly and keep the safety comment if retaining
the unsafe block.
In `@questdb-rs/src/ingress/column_sender/db.rs`:
- Around line 426-442: The comment in return_to_pool incorrectly states the
sender is dropped "with the mutex still held"; update the comment to state that
we release the mutex (drop(state)) before the sender is dropped so connection
teardown happens outside the lock. Reference the return_to_pool function and the
local variables state and sender (and types Arc<DbInner>, PoolEntry) and make
the comment explicitly note dropping the sender occurs after drop(state) to
avoid blocking concurrent borrows.
In `@questdb-rs/src/ingress/column_sender/wire.rs`:
- Around line 69-78: The I8_NULL and I16_NULL sentinels are currently set to 0
(I8_NULL, I16_NULL) which makes genuine zero values indistinguishable from NULL
on the wire; verify this matches the QuestDB row-API wire spec and, if it does,
add a clear code comment and/or public doc note (near the constants I8_NULL,
I16_NULL and the module-level comment) stating that BYTE and SHORT use 0 as the
NULL sentinel and thus cannot represent real zero values when using
sentinel-based null encoding; if it does not match the spec, change the
sentinels to the correct sentinel values from the spec and update any
encoder/decoder logic that references I8_NULL or I16_NULL (e.g., in the row-API
encoder and column-sender paths) so both sides remain byte-compatible.
📒 Files selected for processing (24)
cpp_test/smoke_column_sender.c
doc/COLUMN_SENDER_FFI_ABI.md
doc/COLUMN_SENDER_PERF.md
doc/COLUMN_SENDER_PLAN.md
include/questdb/ingress/column_sender.h
questdb-rs-ffi/src/column_sender.rs
questdb-rs-ffi/src/lib.rs
questdb-rs/Cargo.toml
questdb-rs/benches/column_sender.rs
questdb-rs/src/ingress.rs
questdb-rs/src/ingress/buffer/qwp.rs
questdb-rs/src/ingress/column_sender/chunk.rs
questdb-rs/src/ingress/column_sender/conf.rs
questdb-rs/src/ingress/column_sender/db.rs
questdb-rs/src/ingress/column_sender/encoder.rs
questdb-rs/src/ingress/column_sender/mod.rs
questdb-rs/src/ingress/column_sender/sender.rs
questdb-rs/src/ingress/column_sender/validity.rs
questdb-rs/src/ingress/column_sender/wire.rs
questdb-rs/src/ingress/sender.rs
questdb-rs/src/ingress/sender/qwp_ws.rs
questdb-rs/src/tests.rs
questdb-rs/src/tests/column_sender_pool.rs
questdb-rs/src/tests/qwp_ws.rs
questdb_db* questdb_db_connect(
    const char* conf,
    line_sender_error** err_out);
Fix questdb_db_connect signature drift in spec and example.
The spec/example omit conf_len, but the header requires it. Consumers copying this doc will hit compile errors.
Suggested doc fix
  questdb_db* questdb_db_connect(
      const char* conf,
+     size_t conf_len,
      line_sender_error** err_out);

- questdb_db* db = questdb_db_connect(
-     "qwpws::addr=localhost:9000;pool_size=1;", &err);
+ const char* conf = "qwpws::addr=localhost:9000;pool_size=1;";
+ questdb_db* db = questdb_db_connect(
+     conf, strlen(conf), &err);
Also applies to: 801-803
For each `symbol_dict_<IDX>` variant, `codes[i]` is the index into the
dict for row `i`. Codes must be in range `0..dict_len` for valid rows;
behaviour is undefined for out-of-range codes when validity is NULL.
When a row's validity bit is 0, its code is ignored.
Avoid undefined behavior in symbol code validation contract.
The spec currently allows undefined behavior for out-of-range codes in one case. For a public C ABI, this should be deterministic (line_sender_error_invalid_api_call) regardless of validity pointer usage.
Suggested doc fix
-Codes must be in range `0..dict_len` for valid rows;
-behaviour is undefined for out-of-range codes when validity is NULL.
+For non-null rows, codes must satisfy `0 <= code < dict_len`.
+Out-of-range codes return `line_sender_error_invalid_api_call`.
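The deterministic validation proposed in this comment can be sketched in Rust. The names here (`CodeError`, `validate_codes`) are hypothetical and not taken from the crate, and the validity bitmap is assumed to be LSB-first with a set bit meaning "non-null". The sketch follows the suggested wording: non-null rows must satisfy `0 <= code < dict_len`, and null-row codes are skipped.

```rust
#[derive(Debug, PartialEq)]
pub enum CodeError {
    /// A non-null row carried a code outside `0 <= code < dict_len`.
    OutOfRange { row: usize, code: i32 },
    /// The validity bitmap has fewer than `ceil(rows / 8)` bytes.
    ValidityTooShort,
}

/// Check every non-null row's code before any of them is used.
pub fn validate_codes(
    codes: &[i32],
    dict_len: usize,
    validity: Option<&[u8]>,
) -> Result<(), CodeError> {
    if let Some(bits) = validity {
        if bits.len() < (codes.len() + 7) / 8 {
            return Err(CodeError::ValidityTooShort);
        }
    }
    for (row, &code) in codes.iter().enumerate() {
        // With no bitmap, every row counts as non-null.
        let non_null = match validity {
            None => true,
            Some(bits) => ((bits[row / 8] >> (row % 8)) & 1) == 1,
        };
        if !non_null {
            continue; // null-row codes are ignored
        }
        if code < 0 || (code as usize) >= dict_len {
            return Err(CodeError::OutOfRange { row, code });
        }
    }
    Ok(())
}
```

In an FFI layer, either error variant would presumably map to `line_sender_error_invalid_api_call`, so that no input reaches undefined behaviour.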
- `column_sender_chunk_*` + `column_sender_submit` /
  `_await_acked_fsn` — chunk fill and submit. Each column function
  ships the moment its Rust counterpart lands.
Use the finalized flush API name in WS-5.
WS-5 still references column_sender_submit / _await_acked_fsn, but the documented/declared public API is column_sender_flush. This drift will mislead FFI and wrapper implementation work.
Suggested doc fix
- - `column_sender_chunk_*` + `column_sender_submit` /
- `_await_acked_fsn` — chunk fill and submit. Each column function
+ - `column_sender_chunk_*` + `column_sender_flush` — chunk fill and
+ synchronous submit+ACK wait. Each column function
  ships the moment its Rust counterpart lands.
 * Close the pool and all its connections. Accepts NULL and no-ops.
 * Outstanding `column_sender` handles remain valid and return their
 * connections on `questdb_db_return_sender` — the pool's state is
 * reference-counted internally.
 */
Unify post-questdb_db_close sender semantics with the ABI spec.
This header says checked-out senders remain valid after close, but the ABI spec says they are invalidated. The public contract needs one behavior.
Suggested header wording (if invalidation is intended)
- * Outstanding `column_sender` handles remain valid and return their
- * connections on `questdb_db_return_sender` — the pool's state is
- * reference-counted internally.
+ * `column_sender` handles still checked out become invalid after close;
+ * subsequent calls on them fail with `line_sender_error_invalid_api_call`.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@include/questdb/ingress/column_sender.h` around lines 128 - 132, The header's
contract for post-questdb_db_close behavior conflicts with the ABI spec: decide
to follow the spec by making checked-out column_sender handles invalid after
questdb_db_close; update the comment block around questdb_db_close/column_sender
to state that closing the DB invalidates all outstanding column_sender handles
and they must not be used or returned, and then adjust related implementations
(e.g., questdb_db_close, questdb_db_return_sender, and any reference-counting
teardown logic) to mark/invalidate senders on close and to make
questdb_db_return_sender a no-op or error when given an invalidated sender;
reference symbols: questdb_db_close, column_sender, questdb_db_return_sender,
and the pool's internal refcounting to locate and change behavior and docs.
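If invalidation on close is the chosen contract, one way to sketch it is a shared "closed" flag that every checked-out handle consults before doing work. This is a minimal illustration only: `Pool`, `PoolState`, `CheckedOutSender`, and `SketchError` are hypothetical names, not the types in this PR, and the real pool also owns connections that are omitted here.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical error standing in for `line_sender_error_invalid_api_call`.
#[derive(Debug, PartialEq)]
pub enum SketchError {
    InvalidApiCall,
}

/// Hypothetical shared pool state; connections are omitted in this sketch.
pub struct PoolState {
    closed: AtomicBool,
}

/// Hypothetical pool handle (plays the role of `questdb_db`).
pub struct Pool {
    state: Arc<PoolState>,
}

/// Hypothetical checked-out handle (plays the role of `column_sender`).
pub struct CheckedOutSender {
    state: Arc<PoolState>,
}

impl Pool {
    pub fn new() -> Self {
        Pool {
            state: Arc::new(PoolState {
                closed: AtomicBool::new(false),
            }),
        }
    }

    pub fn borrow_sender(&self) -> CheckedOutSender {
        CheckedOutSender {
            state: Arc::clone(&self.state),
        }
    }

    /// Closing flips the shared flag; outstanding handles observe it.
    pub fn close(self) {
        self.state.closed.store(true, Ordering::Release);
    }
}

impl CheckedOutSender {
    /// Every operation checks the flag first and fails once the pool is closed.
    pub fn flush(&self) -> Result<(), SketchError> {
        if self.state.closed.load(Ordering::Acquire) {
            return Err(SketchError::InvalidApiCall);
        }
        Ok(())
    }
}
```

The `Arc` keeps the flag alive after the pool handle is gone, so a late call on a stale sender returns an error instead of touching freed state.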
| * `codes[i]` must be in `0 .. dict_len` for non-null rows; null-row | ||
| * codes are not inspected. |
Make symbol code bounds explicit and unambiguous.
0 .. dict_len is ambiguous in C-facing docs. Use an explicit predicate to prevent off-by-one misuse.
Suggested wording
- * `codes[i]` must be in `0 .. dict_len` for non-null rows; null-row
+ * For non-null rows, `codes[i]` must satisfy
+ * `0 <= codes[i] && (size_t)codes[i] < dict_len`; null-row
* codes are not inspected.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@include/questdb/ingress/column_sender.h` around lines 388 - 389, Clarify the
symbol-code bounds in the comment near column_sender.h by replacing the
ambiguous range "0 .. dict_len" with an explicit predicate such as "0 <=
codes[i] && codes[i] < dict_len" (for non-null rows) and state that null-row
codes are not inspected; update the sentence referencing `codes[i]` and
`dict_len` so readers clearly understand the inclusive lower bound and exclusive
upper bound to avoid off-by-one errors.
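The explicit predicate can be checked with a few lines of Rust. This is a hedged sketch: `first_bad_code` is a hypothetical helper, and it takes a plain `&[bool]` null mask as a simplifying assumption, whereas the real API passes a validity bitmap.

```rust
/// Returns the index of the first non-null row whose code violates
/// `0 <= code && (code as usize) < dict_len`, or `None` if all rows pass.
/// Null rows are skipped, so their codes are never inspected.
/// Assumption: `codes` and `is_null` have the same length.
pub fn first_bad_code(codes: &[i32], is_null: &[bool], dict_len: usize) -> Option<usize> {
    codes
        .iter()
        .zip(is_null)
        .position(|(&code, &null)| !null && !(code >= 0 && (code as usize) < dict_len))
}
```

With `dict_len = 3`, the codes `0`, `1`, and `2` are accepted and `3` is rejected, which is exactly the off-by-one case that the `0 .. dict_len` wording leaves open.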
| pub mod column_sender; | ||
| pub use column_sender::*; |
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "== Check FFI column_sender module for cfg gating =="
fd -a "column_sender.rs" questdb-rs-ffi/src | xargs -I{} sh -c 'echo "--- {}"; rg -n "cfg\\(|mod column_sender|use questdb::ingress::column_sender|pub extern" "{}"'
echo
echo "== Check feature declarations and links in FFI manifest =="
fd -a "Cargo.toml" questdb-rs-ffi | xargs -I{} sh -c 'echo "--- {}"; rg -n "sync-sender-qwp-ws|features|questdb" "{}" -n -C2'
echo
echo "== Check current file for unconditional exports =="
rg -n "pub mod column_sender|pub use column_sender::\\*" questdb-rs-ffi/src/lib.rs -n -C2
Repository: questdb/c-questdb-client
Length of output: 2600
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "== Locate ingress.rs in questdb-rs =="
ls -la ../questdb-rs 2>/dev/null || true
fd -a "ingress.rs" ../questdb-rs -g'*.rs'
echo
echo "== Search questdb-rs ingress.rs for sync-sender-qwp-ws and column_sender gating =="
fd -a "ingress.rs" ../questdb-rs -g'*.rs' | head -n 5 | while read -r f; do
echo "--- $f"
rg -n "sync-sender-qwp-ws|column_sender" "$f" -n -C 3
done
echo
echo "== Search questdb-rs Cargo.toml for sync-sender-qwp-ws definition =="
fd -a "Cargo.toml" ../questdb-rs | head -n 5 | xargs -I{} sh -c 'echo "--- {}"; rg -n "sync-sender-qwp-ws" "{}" -n -C 3'
echo
echo "== Find any module export gating for column_sender in questdb-rs =="
rg -n "mod column_sender|pub mod column_sender|column_sender" ../questdb-rs/src/ingress.rs -n -C 3
Repository: questdb/c-questdb-client
Length of output: 355
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "== ls ../ =="
ls -la ../ || true
echo
echo "== Locate ingress.rs in ../questdb-rs =="
fd -a "ingress.rs" ../questdb-rs || true
echo
echo "== Search sync-sender-qwp-ws and column_sender in ingress.rs =="
INGRESS_FILES=$(fd -a "ingress.rs" ../questdb-rs || true)
if [ -z "$INGRESS_FILES" ]; then
echo "No ingress.rs found under ../questdb-rs"
else
echo "$INGRESS_FILES" | while read -r f; do
[ -f "$f" ] || continue
echo "--- $f"
rg -n "sync-sender-qwp-ws|mod column_sender|pub mod column_sender|column_sender::" "$f" -n -C 3 || true
done
fi
echo
echo "== Search questdb-rs Cargo.toml for sync-sender-qwp-ws definition =="
fd -a "Cargo.toml" ../questdb-rs | head -n 20 | while read -r toml; do
echo "--- $toml"
rg -n "sync-sender-qwp-ws" "$toml" -n -C 3 || true
done
echo
echo "== Also search questdb-rs src for sync-sender-qwp-ws references =="
rg -n "sync-sender-qwp-ws" ../questdb-rs/src -n -C 3 || true
Repository: questdb/c-questdb-client
Length of output: 683
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "== Find ingress.rs anywhere in repo =="
fd -a "ingress.rs" . || true
echo
echo "== Find questdb-rs/Cargo.toml (or any Cargo.toml mentioning questdb-rs) =="
fd -a "Cargo.toml" . | head -n 200 | while read -r p; do
if rg -n '"name"\s*=\s*"questdb-rs"' "$p" >/dev/null 2>&1; then
echo "--- $p"
fi
done
echo
echo "== Search for sync-sender-qwp-ws in repo =="
rg -n "sync-sender-qwp-ws" . -S || true
echo
echo "== Search for column_sender module export in ingress.rs (if found) =="
INGRESS=$(fd -a "ingress.rs" . || true)
if [ -z "${INGRESS}" ]; then
echo "No ingress.rs found."
else
echo "${INGRESS}" | while read -r f; do
echo "--- $f"
rg -n "mod column_sender|pub mod column_sender|column_sender" "$f" -n -C 3 || true
done
fi
Repository: questdb/c-questdb-client
Length of output: 16766
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "== Show questdb-rs-ffi/Cargo.toml around [dependencies.questdb-rs] =="
rg -n "\[dependencies\.questdb-rs\]" -n questdb-rs-ffi/Cargo.toml
sed -n '1,120p' questdb-rs-ffi/Cargo.toml
echo
echo "== Search for sync-sender-qwp-ws in questdb-rs-ffi/Cargo.toml =="
rg -n "sync-sender-qwp-ws" questdb-rs-ffi/Cargo.toml || true
Repository: questdb/c-questdb-client
Length of output: 2605
Gate questdb-rs-ffi column_sender exports behind sync-sender-qwp-ws.
questdb-rs/src/ingress.rs only defines pub mod column_sender under #[cfg(feature = "sync-sender-qwp-ws")], but questdb-rs-ffi/src/column_sender.rs unconditionally imports questdb::ingress::column_sender::*, and questdb-rs-ffi/src/lib.rs unconditionally re-exports it (pub mod column_sender; pub use column_sender::*;). Since questdb-rs-ffi/Cargo.toml enables questdb-rs with default-features = false and features = ["sync-sender", ...] (no sync-sender-qwp-ws), non-WS builds should fail.
Fix by cfg-gating questdb-rs-ffi’s column_sender module + re-exports behind a sync-sender-qwp-ws feature (and forwarding it to questdb-rs/sync-sender-qwp-ws), or by ensuring the dependency enables sync-sender-qwp-ws whenever this module is compiled.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@questdb-rs-ffi/src/lib.rs` around lines 79 - 80, The export of column_sender
is unconditional but the upstream questdb::ingress::column_sender is only
compiled under the feature "sync-sender-qwp-ws", so gate questdb-rs-ffi’s module
and re-export behind the same feature: wrap the declarations (the pub mod
column_sender; and pub use column_sender::*;) in a #[cfg(feature =
"sync-sender-qwp-ws")] and ensure the feature is forwarded to the questdb-rs
dependency in Cargo.toml (or alternatively add #[cfg(feature =
"sync-sender-qwp-ws")] to the import in column_sender.rs that references
questdb::ingress::column_sender::*), so the module is only compiled when the
upstream feature is enabled.
| // Give the server thread time to register the accepts (the upgrades | ||
| // complete before `connect` returns, but the AtomicUsize is incremented | ||
| // before `perform_server_upgrade`). | ||
| wait_until(Duration::from_secs(2), || server.accepted() == 3); |
Assert wait_until outcomes so these tests can’t false-pass.
Line 238 and Line 269 currently discard wait_until(...)’s boolean, so a timeout won’t fail the test.
💡 Suggested fix
- wait_until(Duration::from_secs(2), || server.accepted() == 3);
+ assert!(
+ wait_until(Duration::from_secs(2), || server.accepted() == 3),
+ "expected 3 accepted connections during eager-open"
+ );
@@
- wait_until(Duration::from_secs(2), || server.accepted() == 3);
+ assert!(
+ wait_until(Duration::from_secs(2), || server.accepted() == 3),
+ "expected 3 accepted connections during auto-grow"
+ );
Also applies to: 269-269
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@questdb-rs/src/tests/column_sender_pool.rs` at line 238, The test currently
discards the boolean result of wait_until, allowing timeouts to false-pass;
change the bare calls to assertions so failures fail the test — for example
replace wait_until(Duration::from_secs(2), || server.accepted() == 3); with
assert!(wait_until(Duration::from_secs(2), || server.accepted() == 3), "timed
out waiting for server.accepted() == 3"); and do the same for the other
occurrence at the second call (line with wait_until(...) returning a boolean) so
both wait_until invocations are asserted (use assert! or assert_eq!(..., true)
with a descriptive message).
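For reference, a polling helper of this shape only protects a test if its return value is asserted. The sketch below is an assumption about what `wait_until` looks like (the real helper in `column_sender_pool.rs` may differ in poll interval and signature); it shows why a bare call can never fail.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Polls `cond` until it returns true or `timeout` elapses.
/// Returns true if the condition was met, false on timeout.
/// The 5 ms poll interval is an arbitrary choice for this sketch.
pub fn wait_until(timeout: Duration, mut cond: impl FnMut() -> bool) -> bool {
    let deadline = Instant::now() + timeout;
    loop {
        if cond() {
            return true;
        }
        if Instant::now() >= deadline {
            return false;
        }
        thread::sleep(Duration::from_millis(5));
    }
}
```

Because a timeout is reported only through the `bool`, discarding it turns the wait into a plain delay; wrapping the call in `assert!` makes the timeout a test failure.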
Rewrite the column-major sender to eliminate intermediate buffers and pipeline writes for maximum single-connection throughput.
Architecture changes:
- ColumnSender now owns a dedicated ColumnConn (conn.rs) that drives socket I/O directly: no replay queue, no background thread, no row-API publisher involvement.
- Chunk<'a> holds borrowed descriptors (raw pointers + lengths) into the caller's buffers; no per-column Vec<u8> staging. The encoder writes wire bytes straight from caller memory into the connection's reusable write_buf at flush time.
- flush() pipelines: encode + WS-mask + write_all, then drain acks non-blocking. Blocks only when in-flight hits the 128-frame protocol cap. New sync(AckLevel) blocks until all acks settle.
- Server cumulative OKs handled correctly (sequence=N acks all frames up to N).
API changes:
- flush(&mut chunk, AckLevel) → flush(&mut chunk) (fire-and-forget)
- New sync(AckLevel) drains all in-flight acks
- FFI: column_sender_flush drops ack_level arg; new column_sender_sync
- FFI lifetime contract: caller buffers must outlive flush (no copy)
Performance (5M-row L1 quotes, 9 columns, localhost):
- Encode path: 6 GB/s (2.3% of wall time)
- End-to-end: 350 MB/s pipelined (was 264 MB/s stop-and-wait)
- Per-chunk p50: 0.72 ms (was 2.64 ms)
- Criterion populate+encode: 575 µs (was 718 µs, 20% faster)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The default macOS TCP send buffer (~128 KB) is smaller than a typical QWP chunk (1.5 MB at 25k rows). write_all blocks mid-frame while the kernel drains the small buffer. A 4 MiB send buffer lets the kernel accept a full chunk in one shot, reducing write_all stalls when the pipeline has multiple frames in flight. Also sets SO_RCVBUF to 4 MiB to absorb ack bursts from the server without backpressuring the server's send path. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
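The pipelining described in the commit messages above (frames sent without waiting, cumulative OKs where `sequence=N` settles every frame up to N) can be sketched as a small window tracker. `InFlight` is a hypothetical type, and starting sequence numbers at 1 is an assumption of this sketch, not something the commits state.

```rust
/// Tracks frames that were sent but not yet acknowledged.
pub struct InFlight {
    next_seq: u64,   // sequence assigned to the next frame (assumed to start at 1)
    acked_upto: u64, // highest cumulatively acked sequence; 0 means none yet
    cap: u64,        // maximum number of unacknowledged frames
}

impl InFlight {
    pub fn new(cap: u64) -> Self {
        InFlight {
            next_seq: 1,
            acked_upto: 0,
            cap,
        }
    }

    /// Number of frames sent and still waiting for an ack.
    pub fn in_flight(&self) -> u64 {
        self.next_seq - 1 - self.acked_upto
    }

    /// Records a sent frame and returns its sequence number, or `None`
    /// when the window is full and the caller must drain an ack first.
    pub fn on_send(&mut self) -> Option<u64> {
        if self.in_flight() >= self.cap {
            return None;
        }
        let seq = self.next_seq;
        self.next_seq += 1;
        Some(seq)
    }

    /// Cumulative ack: `n` settles every frame with sequence <= n.
    /// Stale acks and acks for frames never sent are ignored.
    pub fn on_ack(&mut self, n: u64) {
        let highest_sent = self.next_seq - 1;
        if n > self.acked_upto && n <= highest_sent {
            self.acked_upto = n;
        }
    }
}
```

A single ack for sequence 2 frees two slots at once, which is why the sender can skip per-frame bookkeeping when the server batches its OKs.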
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
questdb-rs-ffi/src/column_sender.rs (2)
324-325: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Fix constructor doc: table-name length is not validated at chunk creation.
The comment says `column_sender_chunk_new` enforces ≤ 127 bytes, but the implementation only parses UTF-8 and defers name-length validation to flush-time encoding. Please align docs with actual behavior.
114-119: ⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift
Avoid passing `#[repr(C)]` enum values by value across `extern "C"` boundaries.
`column_sender_sync` takes `ack_level: column_sender_ack_level` by value (questdb-rs-ffi/src/column_sender.rs:807-810); if C supplies anything other than discriminants 0/1, Rust can receive an invalid enum value at the ABI boundary, which can lead to UB. Accept an integer ABI type (e.g., `u32`/`c_int`) and explicitly validate/map to `AckLevel` before use.
`column_sender_chunk_new` documents “validated UTF-8, ≤ 127 bytes” (questdb-rs-ffi/src/column_sender.rs:324-336), but the constructor only validates UTF-8 and the test shows a 128-byte name succeeds (validation happens later). Update the docs to match behavior or enforce the length at construction.
questdb-rs/benches/column_sender.rs (1)
48-51: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Update stale benchmark helper reference in module docs.
The doc comment still links to `bench_encode_chunk`, but the code now uses `bench_encode_chunk_into`. Please update the symbol/link to avoid broken docs and confusion.
🧹 Nitpick comments (1)
questdb-rs/src/ingress/column_sender/conn.rs (1)
401-412: 💤 Low value
Timeout setting is a no-op.
The `set_timeouts` method does nothing: it ignores its parameters and always returns `Ok(())`. The comment explains timeouts are set during connect, but if this method is called (line 206) expecting timeout changes, that won't happen. Consider removing the call site or implementing the method.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@questdb-rs-ffi/src/column_sender.rs`:
- Around line 324-325: The doc comment for column_sender_chunk_new incorrectly
claims it enforces a ≤ 127 byte table-name length; update the constructor doc to
reflect actual behavior: state that column_sender_chunk_new only validates that
table_name is valid UTF-8 and creates an empty chunk, and that any table-name
length checks are deferred to flush-time encoding (e.g., when preparing/encoding
the chunk during flush). Mention the function name column_sender_chunk_new and
the deferment to the flush/encoding path so readers know where length validation
occurs.
- Around line 114-119: The enum column_sender_ack_level must not be taken
by-value across the C ABI; change the extern API for column_sender_sync to
accept a fixed integer ABI type (e.g., u32 or libc::c_int), then validate/map
that integer to the internal AckLevel enum inside column_sender_sync (return an
error/NULL or map to a default for out-of-range values) to avoid UB. Also
reconcile the column_sender_chunk_new behavior and docs: either enforce the "≤
127 bytes" length check at construction (return an error/NULL if name.len() >
127) or update the documented constraint to reflect that only UTF-8 is validated
and length is checked later; adjust the constructor for column_sender_chunk_new
accordingly.
In `@questdb-rs/benches/column_sender.rs`:
- Around line 48-51: Update the module doc comment to reference the current
benchmark helper symbol: replace the stale link/name `bench_encode_chunk` with
`bench_encode_chunk_into` in the top-level docs so the documentation points to
the correct helper used by the code; ensure the formatting of the doc link
(brackets/backticks) matches the surrounding style to keep the intra-doc link
working for `bench_encode_chunk_into`.
---
Nitpick comments:
In `@questdb-rs/src/ingress/column_sender/conn.rs`:
- Around line 401-412: The set_timeouts method is currently a no-op and should
either apply the requested timeouts or be removed at callers; implement it to
set read/write timeouts on the underlying TCP socket (use the tcp_stream
accessor that set_timeouts' comment references) by calling the platform APIs
analogous to WsStream::set_timeouts (e.g., mapping the read: Option<Duration>
and write: Option<Duration> to set_read_timeout/set_write_timeout and clearing
with None), propagate/convert any IO errors into the Result return, and/or add a
proper error when the tcp_stream is unavailable; alternatively, if callers (the
code that invokes set_timeouts) never need runtime timeout changes, remove those
call sites or expose a public setter on WsStream (as hinted) and move the actual
timeout logic into that setter so establish_connection behavior in qwp_ws.rs
remains unchanged.
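The enum finding above (lines 114-119) amounts to taking an integer at the ABI boundary and mapping it by hand. A minimal sketch follows, assuming discriminant 0 means `Ok` and 1 means `Durable`; the function names are hypothetical and the real `column_sender_sync` signature has more parameters.

```rust
/// Internal ack level; mirrors the two levels named in this PR.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AckLevel {
    Ok,
    Durable,
}

/// Maps a raw integer received from C to `AckLevel`.
/// Assumption: 0 = Ok, 1 = Durable. Any other value is rejected
/// instead of being reinterpreted as an enum.
pub fn ack_level_from_raw(raw: u32) -> Option<AckLevel> {
    match raw {
        0 => Some(AckLevel::Ok),
        1 => Some(AckLevel::Durable),
        _ => None,
    }
}

/// Hypothetical extern "C" entry point: the parameter is a plain `u32`,
/// so an out-of-range value from C is an ordinary error, not UB.
pub extern "C" fn sketch_sync_accepts(raw_ack_level: u32) -> bool {
    ack_level_from_raw(raw_ack_level).is_some()
}
```

The C header can still declare named constants for the two levels; only the Rust-side parameter type changes.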
📒 Files selected for processing (19)
- doc/COLUMN_SENDER_FFI_ABI.md
- doc/COLUMN_SENDER_PERF.md
- doc/COLUMN_SENDER_PLAN.md
- questdb-rs-ffi/src/column_sender.rs
- questdb-rs/Cargo.toml
- questdb-rs/benches/column_sender.rs
- questdb-rs/examples/qwp_ws_l1_quotes.rs
- questdb-rs/src/ingress.rs
- questdb-rs/src/ingress/column_sender/chunk.rs
- questdb-rs/src/ingress/column_sender/conf.rs
- questdb-rs/src/ingress/column_sender/conn.rs
- questdb-rs/src/ingress/column_sender/db.rs
- questdb-rs/src/ingress/column_sender/encoder.rs
- questdb-rs/src/ingress/column_sender/mod.rs
- questdb-rs/src/ingress/column_sender/sender.rs
- questdb-rs/src/ingress/column_sender/validity.rs
- questdb-rs/src/ingress/sender.rs
- questdb-rs/src/ingress/sender/qwp_ws.rs
- questdb-rs/src/tests/column_sender_pool.rs
💤 Files with no reviewable changes (1)
- questdb-rs/src/ingress/column_sender/validity.rs
flush() now sets FLAG_DEFER_COMMIT (0x01) on every QWP frame. The server appends rows to WAL writers without committing. sync() sends a commit-triggering empty frame (without the flag) that commits all accumulated rows in one WAL transaction, then drains acks. This eliminates per-chunk WAL fsync overhead: 200 chunks × 25k rows now produce 1 WAL commit instead of 200. The p95 per-chunk latency drops from ~23 ms to ~7 ms. Old servers that don't recognize the flag ignore it (reserved bit position) and commit per-message — graceful degradation per the spec. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Actionable comments posted: 2
🧹 Nitpick comments (1)
questdb-rs/src/ingress/column_sender/encoder.rs (1)
742-879: ⚡ Quick win
Add coverage for the new defer-commit bit.
All updated tests still call `encode_chunk_into(..., false)`, so the new path never asserts that deferred flushes set `QWP_FLAG_DEFER_COMMIT` or that the empty sync frame leaves it clear. A tiny header-byte test here would lock down the new wire contract.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@questdb-rs/src/ingress/column_sender/sender.rs`:
- Around line 123-127: In sync(), validate the requested AckLevel before sending
the commit-triggering frame: check AckLevel::Durable up-front (e.g., if
ack_level == AckLevel::Durable) against the connection's capabilities (call on
self.conn such as supports_durable_acks() or an equivalent method) and return an
Err when the connection doesn't support durable ACKs; only after this check
succeeds call self.flush_inner(...) and then self.conn.sync_all_acks(ack_level).
Ensure the error returned clearly indicates unsupported AckLevel for this
connection.
- Around line 123-127: sync() currently calls flush_inner() to send a
non-deferred commit frame but the existing capacity check (used by deferred
flushes) can fill the last slot and deadlock both flush() and sync() on
drain_one_ack_blocking(); fix by reserving one slot for the commit frame when
checking capacity: either add a boolean parameter (e.g., reserve_commit_slot) to
flush_inner() and have sync() call flush_inner(&mut commit_chunk, /*
defer_commit = */ false, /* reserve_commit_slot = */ true), or modify the cap
check logic so that when defer_commit is true it enforces available_slots > 1
(or when sending non-deferred frames it bypasses the full check), and surface a
clear error (e.g., “capacity exhausted: call sync() now”) if a deferred flush
would consume the reserved slot; update flush() and flush_inner() accordingly to
use the new check so deferred flushes cannot occupy the last slot needed for the
commit frame (affecting sync(), flush(), flush_inner(), and
drain_one_ack_blocking()).
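The slot-reservation idea in the second comment can be sketched as a capacity check with two limits: deferred frames may fill all but one slot, and only the commit frame may take the last one. `Window` and `SlotError` are hypothetical names; the 128-frame cap comes from the commit message, and returning an error instead of blocking on an ack is a simplification of this sketch.

```rust
/// Protocol cap on unacknowledged frames, as stated in the commit message.
pub const MAX_IN_FLIGHT: usize = 128;

#[derive(Debug, PartialEq)]
pub enum SlotError {
    WindowFull,
}

/// Counts frames currently in flight on one connection.
pub struct Window {
    in_flight: usize,
}

impl Window {
    pub fn new() -> Self {
        Window { in_flight: 0 }
    }

    /// Reserves a slot for the next frame. Deferred (non-commit) frames
    /// are limited to `MAX_IN_FLIGHT - 1` so the commit frame sent by
    /// `sync()` always finds one free slot.
    pub fn try_reserve(&mut self, is_commit: bool) -> Result<(), SlotError> {
        let limit = if is_commit {
            MAX_IN_FLIGHT
        } else {
            MAX_IN_FLIGHT - 1
        };
        if self.in_flight >= limit {
            return Err(SlotError::WindowFull);
        }
        self.in_flight += 1;
        Ok(())
    }

    /// Releases `n` slots after acks arrive.
    pub fn release(&mut self, n: usize) {
        self.in_flight = self.in_flight.saturating_sub(n);
    }
}
```

With this check, 127 deferred flushes succeed, the 128th deferred flush is refused, and the commit frame still goes out.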
---
Nitpick comments:
In `@questdb-rs/src/ingress/column_sender/encoder.rs`:
- Around line 742-879: Add tests that exercise the new defer-commit path by
calling encode_chunk_into(&mut out, &chunk, &mut reg, &mut dict, true) and
asserting the header flag includes QWP_FLAG_DEFER_COMMIT, and also a
corresponding test with the false argument to assert the flag is clear; locate
the header byte checked in existing tests (e.g. out[5] used in
empty_chunk_encodes_to_14_bytes and symbol-related tests) and validate the bit
is set/cleared using QWP_FLAG_DEFER_COMMIT, reusing the Chunk construction and
encode_chunk_into invocation patterns already present in tests.
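A header-byte check of the kind requested could look like the sketch below. It does not call the real `encode_chunk_into`; `apply_defer_commit` and `is_deferred` are hypothetical helpers, and placing the flags byte at offset 5 of a 14-byte header is an assumption taken from the `out[5]` hint in the comment above.

```rust
/// Flag value stated in the commit message (FLAG_DEFER_COMMIT = 0x01).
pub const QWP_FLAG_DEFER_COMMIT: u8 = 0x01;

/// Assumed offset of the flags byte inside the frame header.
pub const FLAGS_OFFSET: usize = 5;

/// Sets or clears the defer-commit bit without touching other flag bits.
pub fn apply_defer_commit(header: &mut [u8], defer_commit: bool) {
    if defer_commit {
        header[FLAGS_OFFSET] |= QWP_FLAG_DEFER_COMMIT;
    } else {
        header[FLAGS_OFFSET] &= !QWP_FLAG_DEFER_COMMIT;
    }
}

/// Reports whether a header carries the defer-commit bit.
pub fn is_deferred(header: &[u8]) -> bool {
    header[FLAGS_OFFSET] & QWP_FLAG_DEFER_COMMIT != 0
}
```

A real test would encode one chunk with `true` and one with `false` and make the same two assertions on the encoder's output buffer.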
📒 Files selected for processing (4)
- questdb-rs/src/ingress/column_sender/encoder.rs
- questdb-rs/src/ingress/column_sender/mod.rs
- questdb-rs/src/ingress/column_sender/sender.rs
- questdb-rs/src/ingress/column_sender/wire.rs
The server's ClientSymbolCache only caches symbols with symbolKey < initialSymbolCount. On a fresh table, initialSymbolCount stays at 0 until a WAL segment rolls and the watermark updates. By sending the first frame without FLAG_DEFER_COMMIT, the server commits it immediately, which allows the next segment to pick up the new symbol count and enable caching for all subsequent deferred frames. This is a client-side workaround for a server-side cache limitation. The proper fix is for the server to cache locally-assigned symbol IDs within the same segment (see WalColumnarRowAppender.putSymbolColumn). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@questdb-rs/src/ingress/column_sender/sender.rs`:
- Around line 115-118: Update the flush() documentation to reflect the current
behavior: indicate that the first successful frame is sent without
FLAG_DEFER_COMMIT and subsequent frames are sent deferred; reference the flush()
method and the first_frame_sent field (and FLAG_DEFER_COMMIT / flush_inner) so
readers can locate the implementation and understand the contract change. Ensure
the doc comment clearly states the first-frame is committed immediately (no
defer flag) and later frames use defer behavior.
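The contract the doc comment needs to describe can be sketched as a one-bit state machine. `FlushState` is a hypothetical stand-in for the sender's `first_frame_sent` field and the logic around it; the real `flush_inner` also encodes and writes the frame.

```rust
/// Tracks whether the first frame on this connection has been sent.
pub struct FlushState {
    first_frame_sent: bool,
}

impl FlushState {
    pub fn new() -> Self {
        FlushState {
            first_frame_sent: false,
        }
    }

    /// Whether the next frame should carry FLAG_DEFER_COMMIT.
    /// The first frame is sent without the flag so the server commits it
    /// immediately; every later frame is deferred.
    pub fn defer_next(&self) -> bool {
        self.first_frame_sent
    }

    /// Call only after a frame was sent successfully, so a failed first
    /// attempt leaves the next attempt undeferred as well.
    pub fn on_frame_sent(&mut self) {
        self.first_frame_sent = true;
    }
}
```

Documenting `flush()` in these terms tells callers that the first chunk becomes visible before `sync()`, while later chunks wait for the commit frame.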
📒 Files selected for processing (1)
questdb-rs/src/ingress/column_sender/sender.rs
Align the C ABI, docs, and smoke test with column_sender_flush(sender, chunk, err) plus column_sender_sync(sender, ack_level, err). Reserve an in-flight slot for the sync commit, validate durable ACK opt-in before publishing, and add pool/sync coverage.
Summary
Lands the Rust core, C ABI, and benchmarks for a column-major sender that
ingests Pandas/Polars DataFrames into QuestDB at the maximum throughput
the QWP/WebSocket wire allows. The design and FFI spec landed in earlier
commits on this branch (`doc/COLUMN_SENDER_PLAN.md`, `doc/COLUMN_SENDER_FFI_ABI.md`);
this PR implements WS-0…WS-6 of the plan in a single commit.
The user model is DataFrame → Table.
The same surface is exposed verbatim through the C ABI in
`include/questdb/ingress/column_sender.h`.
What's in the box
WS-0 —
QuestDbpool (ingress/column_sender/{db,conf}.rs).Thread-safe; eager-open, fail-fast at
pool_max,BorrowedSender<'a>that returns on
Drop, background reaper underpool_reap=auto.New conf keys:
pool_size,pool_max,pool_idle_timeout_ms,pool_reap. Store-and-forward (sf_*/sender_id) andqwp_ws_progress=manualrefused atconnect-time.WS-1 — synchronous
flush(ingress/column_sender/sender.rs,encoder.rs).ColumnSender::flush(chunk, AckLevel)encodes, publishesvia the existing QWP/WS replay queue, and blocks until the ACK
watermark crosses the published FSN. Polls in 50 ms slices so
must_closemid-wait surfaces promptly.AckLevel::Durablerequiresrequest_durable_ack=onat connect (elseInvalidApiCall).WS-2 —
Chunk+ numeric / fixed-width columns(
chunk.rs,validity.rs,wire.rs).Per-column wire-shape
Vec<u8>storage; encode is a header +extend_from_sliceper column. Two code paths per type:bool/i{8,16,32,64}/f{32,64}—null_flag = 0always;nullable rows sentinel-encoded (matches the row-API convention).
uuid/long256/ipv4/ts_{nanos,micros}/date_millis—no-null =
extend_from_slice; nullable = QWP-shape bitmap +dense gather.
Connection-scoped
SchemaRegistry: first emit → FULL; repeat → REFERENCE.WS-3 — VARCHAR (
Chunk::column_varchar). Arrow Utf8 in;dense LE-u32 offsets + concatenated bytes out. No-null path memcpys
offsets when
offsets[0] == 0; nullable path skips slicing for nullrows. Offset validation (negative / non-monotonic / past end) caught
client-side.
WS-4 — symbol bulk-intern (
Chunk::symbol_dict_{i8,i16,i32},encoder::resolve_symbols). Three append-time passes(referenced-bitset + range check; compact referenced dict bytes;
translate to internal indices), with global-id assignment deferred to
flush time. Connection-scoped
SymbolGlobalDict(the same type therow API uses;
next_id/intern/entrypromoted topub(crate)).Only entries the chunk references reach the wire — protects the
1M-per-connection cap on huge Pandas
Categoricaldicts.WS-5 — C ABI
(
questdb-rs-ffi/src/column_sender.rs,include/questdb/ingress/column_sender.h).Full implementation of
doc/COLUMN_SENDER_FFI_ABI.md:questdb_db,column_sender,column_sender_chunk.column_sender_validityrepr-C struct;column_sender_ack_levelrepr-C enum.
questdb_db_connect/close/borrow_sender/return_sender/reap_idle.symbol_dict_*family,both designated-timestamp variants, and
column_sender_flush.line_sender_error*.Rust side gains
OwnedSender— Arc-backed borrow handle so the Ccaller can free
questdb_db*before all borrows return withoutdangling.
Hand-runnable smoke test at
cpp_test/smoke_column_sender.c(round-tripsa 3-row chunk with
i64+f64+ nullablevarchar+ designatedtimestamp against a real server). Compiles with
-Wall -Wextra -Werror;not wired into CMake yet (matches the
smoke_line_readerpattern).WS-6 — bench + perf doc
  (`questdb-rs/benches/column_sender.rs`, `doc/COLUMN_SENDER_PERF.md`).
  Criterion bench in three families: per-column append vs raw memcpy
  baseline; symbol bulk-intern vs naïve per-row HashMap;
  `encode_chunk` end-to-end (no network). First-baseline numbers from an Apple Silicon
  laptop (100k rows, 1k-card symbol dict) were recorded for:
  * `column_f64/column_sender_no_null`
  * `column_i64/column_sender_no_null`
  * `column_varchar/column_sender_no_null`
  * `symbol_dict/column_sender`
  * `symbol_dict/naive_per_row_hashmap`
  * `encode_chunk/populate_plus_encode`

# Architecture notes
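
As an illustration of the WS-3 bullet above, here is a minimal sketch of client-side offset validation followed by dense LE-u32 offset output. The names `OffsetError` and `encode_utf8_no_null` are hypothetical, and the byte layout (rebased offsets followed by the referenced bytes) is an assumption made for the example, not the crate's actual encoder or the exact QWP wire format.

```rust
/// Hypothetical error type for this sketch (not the crate's real error enum).
#[derive(Debug, PartialEq)]
enum OffsetError {
    Empty,
    Negative(usize),
    NonMonotonic(usize),
    PastEnd(usize),
}

/// Validates Arrow-style Utf8 offsets (`rows + 1` entries of `i32`) against
/// `data`, then appends dense little-endian u32 offsets rebased to start at 0,
/// followed by the referenced bytes. Assumed layout, for illustration only.
fn encode_utf8_no_null(
    offsets: &[i32],
    data: &[u8],
    out: &mut Vec<u8>,
) -> Result<(), OffsetError> {
    let first = *offsets.first().ok_or(OffsetError::Empty)?;
    if first < 0 {
        return Err(OffsetError::Negative(0));
    }
    // Validate everything before writing, so `out` is never left half-filled.
    let mut last = first;
    for (i, &off) in offsets.iter().enumerate().skip(1) {
        if off < 0 {
            return Err(OffsetError::Negative(i));
        }
        if off < last {
            return Err(OffsetError::NonMonotonic(i));
        }
        last = off;
    }
    if last as usize > data.len() {
        return Err(OffsetError::PastEnd(offsets.len() - 1));
    }
    // When `first == 0` the rebase is the identity, which is the case where a
    // real implementation could copy the offset buffer wholesale.
    for &off in offsets {
        out.extend_from_slice(&((off - first) as u32).to_le_bytes());
    }
    out.extend_from_slice(&data[first as usize..last as usize]);
    Ok(())
}
```

Calling `encode_utf8_no_null(&[1, 3, 6], b"xabcde", &mut out)` in this sketch writes the offsets 0, 2, 5 and then the bytes `abcde`; a non-monotonic, negative, or out-of-range offset is rejected before anything is appended.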
The plan calls out code reuse as a non-goal (`doc/COLUMN_SENDER_PLAN.md` §2.1).
What this PR shares with the row API is what must stay coherent at
connection scope: the global `SymbolGlobalDict` type (same wire-format
conventions, dedicated per-connection instance for column-sender slots), and
the QWP/WS publisher / driver / WS framing (reached through a
`pub(crate)` `Sender::qwp_ws_publish_raw` escape hatch). The encoder,
schema registry, validity helpers, and varint/type-byte tables are
deliberately duplicated in `column_sender/` so the hot path stays free
of cross-module hops.
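
The interplay between chunk-local symbol handling and the connection-scoped dictionary can be sketched roughly as follows. This is an illustration under stated assumptions: `LocalSymbols`, `intern_local`, `GlobalDict`, and `resolve_at_flush` are hypothetical stand-ins (not the real `SymbolGlobalDict` or `encoder::resolve_symbols`), a `Vec<bool>` stands in for the bitset, and null codes are out of scope.

```rust
use std::collections::HashMap;

/// Chunk-local result of the three append-time passes (hypothetical type).
struct LocalSymbols {
    entries: Vec<String>, // only the dictionary entries the rows reference
    indices: Vec<u32>,    // per-row index into `entries`
}

fn intern_local(codes: &[i32], dict: &[&str]) -> Result<LocalSymbols, String> {
    // Pass 1: range-check every code and mark which entries are referenced.
    let mut referenced = vec![false; dict.len()];
    for (row, &code) in codes.iter().enumerate() {
        let idx = usize::try_from(code)
            .ok()
            .filter(|&i| i < dict.len())
            .ok_or_else(|| format!("row {row}: code {code} out of range"))?;
        referenced[idx] = true;
    }
    // Pass 2: compact the dictionary down to the referenced entries.
    let mut remap = vec![u32::MAX; dict.len()];
    let mut entries = Vec::new();
    for (i, &used) in referenced.iter().enumerate() {
        if used {
            remap[i] = entries.len() as u32;
            entries.push(dict[i].to_owned());
        }
    }
    // Pass 3: translate row codes to indices into the compacted entries.
    let indices = codes.iter().map(|&c| remap[c as usize]).collect();
    Ok(LocalSymbols { entries, indices })
}

/// Stand-in for a connection-scoped dictionary (hypothetical, simplified).
#[derive(Default)]
struct GlobalDict {
    ids: HashMap<String, u32>,
}

impl GlobalDict {
    fn intern(&mut self, s: &str) -> u32 {
        let next = self.ids.len() as u32;
        *self.ids.entry(s.to_owned()).or_insert(next)
    }
}

/// Global ids are assigned only here, at flush time, and only for entries
/// that the chunk actually references.
fn resolve_at_flush(local: &LocalSymbols, global: &mut GlobalDict) -> Vec<u32> {
    let ids: Vec<u32> = local.entries.iter().map(|e| global.intern(e)).collect();
    local.indices.iter().map(|&i| ids[i as usize]).collect()
}
```

In this sketch a four-entry dictionary referenced by rows `[3, 1, 3]` contributes only two entries to the global dictionary, which is the property that keeps large `Categorical` dictionaries from exhausting a per-connection cap.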

The pool's `BorrowedSender<'a>` carries a `&'a QuestDb` lifetime for
Rust callers (compile-time use-after-close protection). The FFI gets an
`OwnedSender` instead: same code path but lifetime-free, with an
`Arc<DbInner>` keeping the pool's state alive past the user's `questdb_db*` pointer.

# Test plan
* `cargo test`: full lib suite passes (834 tests; +57 new column-sender tests)
* `cargo test --lib column_sender`: 57 tests pass
* `cargo test --manifest-path questdb-rs-ffi/Cargo.toml --lib`: 34 FFI tests pass (8 new)
* `cargo fmt --check` clean on both crates
* `cargo clippy --tests --benches --features sync-sender-qwp-ws` clean
* `cargo doc --no-deps` introduces no new warnings
* `cargo bench --features sync-sender-qwp-ws --bench column_sender -- --quick --noplot` completes in <1 min; all groups report sane throughput
* `cc -std=c11 -Wall -Wextra -Werror -I include -c cpp_test/smoke_column_sender.c` compiles

# What's not in here
* `py-questdb-client`. With the C ABI shipped here, that repo can now start consuming the column-sender
  FFI symbols.
* `smoke_line_reader` pattern is the template.
* Python repo / nightly CI rather than the in-tree Criterion suite.
🤖 Generated with Claude Code