Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
77 commits
Select commit Hold shift + click to select a range
177a81d
docs(ingress): add column-major sender plan and FFI ABI spec
bluestreak01 May 24, 2026
15f4c02
docs(ingress): sync flush with ack_level, refuse sf_dir in v1
bluestreak01 May 24, 2026
c7407b0
feat(ingress): column-major sender for QWP/WebSocket (WS-0..WS-6)
bluestreak01 May 24, 2026
7248546
feat(ingress): zero-copy pipelined column sender
bluestreak01 May 24, 2026
a9faea2
perf(ingress): set SO_SNDBUF and SO_RCVBUF to 4 MiB on QWP/WS sockets
bluestreak01 May 24, 2026
1725f8c
chore: register qwp_ws_l1_quotes example in Cargo.toml
bluestreak01 May 24, 2026
820ac39
feat(ingress): FLAG_DEFER_COMMIT for batched WAL commits
bluestreak01 May 25, 2026
2090138
perf(ingress): send first frame without FLAG_DEFER_COMMIT
bluestreak01 May 25, 2026
bc2cb85
support arrow/polars on sender and reader
kafka1991 May 26, 2026
06ee1a2
skip column that all null
kafka1991 May 26, 2026
e27bc30
Fix column sender sync ABI and ACK handling
jerrinot May 26, 2026
7740b7a
Step 1: re-anchor FFI on qwpws_conn
jerrinot May 27, 2026
632c647
Step 2a: add column_sender_chunk_append_arrow_column FFI
jerrinot May 27, 2026
6c53ea7
Step 2b: support LargeUtf8 (Arrow 'U') in the Arrow appender
jerrinot May 27, 2026
0650c40
Step 2c: append_arrow_column gains (row_offset, row_count)
jerrinot May 27, 2026
f35123d
Self-review fixes: Arrow mirror types, naming, docs
jerrinot May 27, 2026
6496bf8
Review fixes: late-slice LargeUtf8 + null buffer pointer rejection
jerrinot May 27, 2026
ba0cf92
Step 3: NumPy widening + bool packing via column_numpy
jerrinot May 27, 2026
45ce070
Add questdb_db_drop_conn for mid-call error recovery
jerrinot May 27, 2026
110b6d0
Polish bundle from the multi-agent review
jerrinot May 27, 2026
84836d2
better api
kafka1991 May 28, 2026
6713466
add more python tests
kafka1991 May 28, 2026
3aab56a
Merge remote-tracking branch 'origin/arrow_polars' into jh_conn_pool_…
jerrinot May 28, 2026
3f66bc5
Merge remote-tracking branch 'origin/arrow_polars' into jh_conn_pool_…
jerrinot May 28, 2026
f232e2b
Egress reader pool inside `questdb_db`
jerrinot May 28, 2026
766bb60
tuning tests
kafka1991 May 29, 2026
2d2fda4
add polars test
kafka1991 May 29, 2026
257c0c1
optimise arrow implementation
kafka1991 May 29, 2026
361420c
add test suit
kafka1991 May 29, 2026
832878e
optimise code
kafka1991 May 29, 2026
4fd1c67
code review and fmt
kafka1991 May 29, 2026
53c77b2
fix ci
kafka1991 Jun 1, 2026
7b8110f
fix ci
kafka1991 Jun 1, 2026
2009263
better arrow rust api
kafka1991 Jun 1, 2026
8e5798d
code format
kafka1991 Jun 1, 2026
d2a178b
add timeout in CI
kafka1991 Jun 1, 2026
a454590
Merge remote-tracking branch 'origin/pr-150' into jh_conn_pool_refactor
jerrinot Jun 1, 2026
d0f9fc1
Support LargeUtf8 symbol dictionaries in column sender
jerrinot Jun 1, 2026
f23a7e7
Add pooled QWP buffer flush
jerrinot Jun 1, 2026
5ffa114
code review and better api
kafka1991 Jun 1, 2026
c6078ed
Validate Arrow timestamps before QWP publish
jerrinot Jun 1, 2026
1757f51
code review round2
kafka1991 Jun 2, 2026
bb22275
code review round3
kafka1991 Jun 2, 2026
b84e0d0
code format
kafka1991 Jun 2, 2026
4a7d045
code review
kafka1991 Jun 2, 2026
1c69081
code review
kafka1991 Jun 2, 2026
47c3afb
Merge branch 'arrow_polars' into jh_conn_pool_refactor
kafka1991 Jun 2, 2026
fe36d8c
trigger ci
kafka1991 Jun 3, 2026
ea67465
Support Arrow string symbol metadata
jerrinot Jun 3, 2026
7cc3a12
Merge remote-tracking branch 'origin/jh_conn_pool_refactor' into jh_c…
jerrinot Jun 3, 2026
f0d557e
use java 25
jerrinot Jun 3, 2026
3972c08
change arrow abi
kafka1991 Jun 4, 2026
69c1f2a
fix asan/bsan test failed
kafka1991 Jun 4, 2026
a19c0d7
fix tests
kafka1991 Jun 4, 2026
99bd412
fix bugs
kafka1991 Jun 4, 2026
a39d0a9
optimise numpy datatype
kafka1991 Jun 4, 2026
78cea31
optimise numpy datatype
kafka1991 Jun 4, 2026
17e644b
fix compile issue
kafka1991 Jun 5, 2026
bfe054f
remove debug info
kafka1991 Jun 5, 2026
a28fd0a
disable cargo incremental
kafka1991 Jun 5, 2026
8231a3a
abi adjust
kafka1991 Jun 5, 2026
cadc2ba
abi adjust and code review
kafka1991 Jun 5, 2026
d728035
code review and abi adjust
kafka1991 Jun 5, 2026
a9267e2
Add reusable Arrow column imports
jerrinot Jun 5, 2026
ed18970
Align Arrow dataframe classification contracts
jerrinot Jun 5, 2026
6d084ab
fix test and format
kafka1991 Jun 8, 2026
6883623
code refactor
kafka1991 Jun 8, 2026
6eb8ff6
fix system tests python binding
kafka1991 Jun 8, 2026
ce91faa
code review
kafka1991 Jun 8, 2026
1f07e7d
fix docs ane docstring
kafka1991 Jun 8, 2026
8515a57
add support more numpy datatype
kafka1991 Jun 8, 2026
e78af42
code review
kafka1991 Jun 8, 2026
b57cd1b
review
kafka1991 Jun 8, 2026
fa20fe8
revert name validate
kafka1991 Jun 8, 2026
9cf6ff8
fix to newline character
kafka1991 Jun 8, 2026
0af2f1d
code review
kafka1991 Jun 8, 2026
fd60708
code review
kafka1991 Jun 11, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ option(
"Build the C/C++ tests with -fsanitize=address,undefined."
OFF)

option(
QUESTDB_ENABLE_ARROW
"Build with Apache Arrow C Data Interface exports. Opt-in: pulls arrow-rs."
OFF)

# Build static and dynamic lib written in Rust by invoking `cargo`.
# Imports `questdb_client` target.
add_subdirectory(corrosion)
Expand All @@ -81,6 +86,13 @@ endif()
if(QUESTDB_ENABLE_INSECURE_SKIP_VERIFY)
list(APPEND QUESTDB_CARGO_FEATURES insecure-skip-verify)
endif()
if(QUESTDB_TESTS_AND_EXAMPLES AND NOT QUESTDB_ENABLE_ARROW)
message(STATUS "QUESTDB_TESTS_AND_EXAMPLES=ON: enabling QUESTDB_ENABLE_ARROW")
set(QUESTDB_ENABLE_ARROW ON)
endif()
if(QUESTDB_ENABLE_ARROW)
list(APPEND QUESTDB_CARGO_FEATURES arrow)
endif()
if(QUESTDB_CARGO_FEATURES)
corrosion_import_crate(
MANIFEST_PATH questdb-rs-ffi/Cargo.toml
Expand All @@ -94,6 +106,9 @@ endif()
target_include_directories(
questdb_client INTERFACE
${CMAKE_CURRENT_SOURCE_DIR}/include)
if(QUESTDB_ENABLE_ARROW)
target_compile_definitions(questdb_client INTERFACE QUESTDB_CLIENT_ENABLE_ARROW)
endif()
if(WIN32)
set_target_properties(
questdb_client-shared
Expand Down Expand Up @@ -280,6 +295,26 @@ if (QUESTDB_TESTS_AND_EXAMPLES)
compile_example(
line_reader_c_example_columns
examples/line_reader_c_example_columns.c)
compile_example(
line_reader_c_example_arrow
examples/line_reader_c_example_arrow.c)

find_package(Arrow QUIET)
if(Arrow_FOUND)
compile_example(
line_sender_cpp_example_arrow
examples/line_sender_cpp_example_arrow.cpp)
target_link_libraries(
line_sender_cpp_example_arrow Arrow::arrow_shared)
compile_example(
line_reader_cpp_example_arrow
examples/line_reader_cpp_example_arrow.cpp)
target_link_libraries(
line_reader_cpp_example_arrow Arrow::arrow_shared)
else()
message(STATUS
"arrow-cpp not found; skipping line_{sender,reader}_cpp_example_arrow.")
endif()

# Include Rust tests as part of the tests run
add_test(
Expand Down Expand Up @@ -358,6 +393,28 @@ if (QUESTDB_TESTS_AND_EXAMPLES)
cpp_test/qwp_mock_server.cpp
cpp_test/test_line_reader_mock.cpp)

# Apache Arrow C Data Interface tests. The fatal_error gate above
# forces QUESTDB_ENABLE_ARROW=ON when tests are enabled, so these
# always build alongside the rest of the suite.
compile_test(
test_arrow_c
cpp_test/qwp_mock_server.cpp
cpp_test/qwp_mock_c.cpp
cpp_test/test_arrow_c.c)
compile_test(
test_arrow_egress
cpp_test/qwp_mock_server.cpp
cpp_test/test_arrow_egress.cpp)
compile_test(
test_arrow_ingress
cpp_test/qwp_mock_server.cpp
cpp_test/test_arrow_ingress.cpp)

compile_test(
test_column_sender
cpp_test/qwp_mock_server.cpp
cpp_test/test_column_sender.cpp)

# System testing Python3 script.
# This will download the latest QuestDB instance from Github,
# thus will also require a Java 11 installation to run the tests.
Expand Down
20 changes: 15 additions & 5 deletions ci/compile.yaml
Original file line number Diff line number Diff line change
@@ -1,19 +1,29 @@
steps:
- bash: |
df -h /
sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc \
/opt/hostedtoolcache/CodeQL /usr/local/share/boost || true
sudo docker image prune --all --force >/dev/null 2>&1 || true
df -h /
condition: eq(variables['imageName'], 'ubuntu-latest')
displayName: "Free disk space (Microsoft-hosted ubuntu)"
- bash: |
echo "##vso[task.setvariable variable=CARGO_INCREMENTAL]0"
condition: eq(variables['imageName'], 'ubuntu-latest')
displayName: "Disable cargo incremental on Linux (saves ~30-50% target/ size)"
- script: |
rustup update $(toolchain)
rustup default $(toolchain)
condition: ne(variables['toolchain'], '')
displayName: "Update and set Rust toolchain"
- script: |
brew install numpy
python3 -m pip install --break-system-packages pyarrow polars
condition: eq(variables['imageName'], 'macos-latest')
displayName: "Install numpy via brew on macOS"
displayName: "Install numpy + pyarrow + polars on macOS"
- script: |
python -m pip install --upgrade pip
pip install numpy
# hetzner-incus provisions numpy via apt (python3-numpy) before this
# template runs because Ubuntu 24.04+ enforces PEP 668 and rejects
# pip into the system interpreter.
pip install numpy pyarrow polars tzdata
condition: |
and(
ne(variables['imageName'], 'macos-latest'),
Expand Down
13 changes: 12 additions & 1 deletion ci/run_all_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ def main():
'test_line_reader_mock',
'line_reader_c_smoke',
'test_line_reader', # live-broker; skips per-test when no broker reachable
'test_arrow_c',
'test_arrow_egress',
'test_arrow_ingress',
'test_column_sender',
]
test_paths = [
(d, find_binary(d, name, exe_suffix))
Expand All @@ -45,7 +49,7 @@ def main():
]

system_test_path = pathlib.Path('system_test') / 'test.py'
qdb_v = '9.2.0' # The version of QuestDB we'll test against.
qdb_v = '9.4.1' # The version of QuestDB we'll test against.

run_cmd('cargo', 'test',
'--', '--nocapture', cwd='questdb-rs')
Expand All @@ -64,7 +68,14 @@ def main():
'--', '--nocapture', cwd='questdb-rs')
run_cmd('cargo', 'test', '--features=almost-all-features',
'--', '--nocapture', cwd='questdb-rs')
run_cmd('cargo', 'test',
'--features=almost-all-features,arrow,polars',
'--', '--nocapture', cwd='questdb-rs')
run_cmd('cargo', 'test', '--no-default-features',
'--features=ring-crypto,tls-webpki-certs,sync-sender-qwp-ws,sync-reader-ws,arrow',
'--', '--nocapture', cwd='questdb-rs')
run_cmd('cargo', 'test', cwd='questdb-rs-ffi')
run_cmd('cargo', 'test', '--features=arrow', cwd='questdb-rs-ffi')
for _, path in test_paths:
run_cmd(str(path))
run_cmd('python3', str(system_test_path), 'run', '--versions', qdb_v, '-v')
Expand Down
8 changes: 7 additions & 1 deletion ci/run_fuzz_pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ stages:
- bash: |
set -eux
sudo apt-get update
sudo apt-get install -y --no-install-recommends cmake python3-numpy
sudo apt-get install -y --no-install-recommends cmake python3-numpy python3-pip
sudo python3 -m pip install --break-system-packages pyarrow polars
# Image-provided JDK paths (see provision.sh's
# `apt-get install -y openjdk-17-jdk openjdk-25-jdk maven`).
JAVA_PATH_17="/usr/lib/jvm/java-17-openjdk-amd64"
Expand Down Expand Up @@ -200,6 +201,9 @@ stages:
- script: |
python3 system_test/test.py run --repo ./questdb TestQwpWsFuzz -v
displayName: "TestQwpWsFuzz"
- script: |
python3 system_test/test.py run --repo ./questdb TestArrowEgressFuzz TestArrowEgressPerKind TestArrowEgressEmpty TestArrowIngressFuzz TestArrowIngressPerKind TestArrowIngressDesignatedTs TestArrowIngressErrors TestArrowIngressMultiBatch TestArrowIngressExtraTypes TestArrowIngressUnsupportedTypes TestArrowRoundTripFuzz TestArrowRoundTripPerKind TestArrowAlignment TestArrowPolarsFuzz TestArrowPolarsRoundTripPerKind TestArrowPolarsPerDtype -v
displayName: "TestArrowFuzz"
- task: ArchiveFiles@2
displayName: "Compress QuestDB server log on failure"
condition: failed()
Expand Down Expand Up @@ -277,6 +281,8 @@ stages:
pool:
vmImage: "ubuntu-latest"
timeoutInMinutes: 30
variables:
imageName: ubuntu-latest
steps:
- checkout: self
fetchDepth: 1
Expand Down
17 changes: 13 additions & 4 deletions ci/run_tests_pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ stages:
pool:
name: $(poolName)
vmImage: $(imageName)
timeoutInMinutes: 60
timeoutInMinutes: 90
steps:
- checkout: self
fetchDepth: 1
Expand All @@ -67,7 +67,7 @@ stages:
############################# temp for test end #####################
- script: python3 ci/run_all_tests.py
env:
JAVA_HOME: $(JAVA_HOME_17_X64)
JAVA_HOME: $(JAVA_HOME_25_X64)
displayName: "Tests"
# - task: PublishBuildArtifacts@1
# inputs:
Expand Down Expand Up @@ -181,7 +181,7 @@ stages:
# debian-installed packages because the wheel RECORD file is
# missing). --break-system-packages overrides PEP 668.
sudo apt-get install -y --no-install-recommends cmake python3-pip
sudo python3 -m pip install --break-system-packages 'numpy>=2'
sudo python3 -m pip install --break-system-packages 'numpy>=2' pyarrow polars
JAVA_PATH_17="/usr/lib/jvm/java-17-openjdk-amd64"
JAVA_PATH_25="/usr/lib/jvm/java-25-openjdk-amd64"
for p in "$JAVA_PATH_17" "$JAVA_PATH_25"; do
Expand Down Expand Up @@ -313,6 +313,9 @@ stages:
- script: |
python3 system_test/test.py run --repo ./questdb TestQwpWsFuzz -v
displayName: "TestQwpWsFuzz"
- script: |
python3 system_test/test.py run --repo ./questdb TestArrowEgressFuzz TestArrowEgressPerKind TestArrowEgressEmpty TestArrowIngressFuzz TestArrowIngressPerKind TestArrowIngressDesignatedTs TestArrowIngressErrors TestArrowIngressMultiBatch TestArrowIngressExtraTypes TestArrowIngressUnsupportedTypes TestArrowRoundTripFuzz TestArrowRoundTripPerKind TestArrowAlignment TestArrowPolarsFuzz TestArrowPolarsRoundTripPerKind TestArrowPolarsPerDtype -v
displayName: "TestArrowWsFuzz"
# Mirrors ci/run_fuzz_pipeline.yaml: on failure, archive and
# publish the QuestDB server log so PR reviewers don't have to
# repro locally. Path comes from system_test/fixture.py:_log_path.
Expand Down Expand Up @@ -360,7 +363,8 @@ stages:
- bash: |
set -eux
sudo apt-get update
sudo apt-get install -y --no-install-recommends cmake python3-numpy
sudo apt-get install -y --no-install-recommends cmake python3-numpy python3-pip
sudo python3 -m pip install --break-system-packages pyarrow polars
JAVA_PATH_17="/usr/lib/jvm/java-17-openjdk-amd64"
JAVA_PATH_25="/usr/lib/jvm/java-25-openjdk-amd64"
for p in "$JAVA_PATH_17" "$JAVA_PATH_25"; do
Expand Down Expand Up @@ -414,6 +418,9 @@ stages:
- script: |
python3 system_test/test.py run --repo ./questdb TestQwpWsFuzz -v
displayName: "TestQwpWsFuzz"
- script: |
python3 system_test/test.py run --repo ./questdb TestArrowEgressFuzz TestArrowEgressPerKind TestArrowEgressEmpty TestArrowIngressFuzz TestArrowIngressPerKind TestArrowIngressDesignatedTs TestArrowIngressErrors TestArrowIngressMultiBatch TestArrowIngressExtraTypes TestArrowIngressUnsupportedTypes TestArrowRoundTripFuzz TestArrowRoundTripPerKind TestArrowAlignment TestArrowPolarsFuzz TestArrowPolarsRoundTripPerKind TestArrowPolarsPerDtype -v
displayName: "TestArrowWsFuzz"
- task: ArchiveFiles@2
displayName: "Compress QuestDB server log on failure"
condition: failed()
Expand Down Expand Up @@ -444,6 +451,8 @@ stages:
pool:
vmImage: "ubuntu-latest"
timeoutInMinutes: 30
variables:
imageName: ubuntu-latest
steps:
- checkout: self
fetchDepth: 1
Expand Down
56 changes: 56 additions & 0 deletions cpp_test/qwp_mock_c.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#include "qwp_mock_c.h"
#include "qwp_mock_server.hpp"

#include <memory>
#include <string>
#include <vector>

namespace qm = qwp_mock;

struct qwp_mock_c
{
std::unique_ptr<qm::MockServer> server;
std::string addr_cached;
};

extern "C" qwp_mock_c* qwp_mock_c_start(int slot_count)
{
if (slot_count < 1)
slot_count = 1;
// Per-connection script: wait for one client binary frame whose
// first byte is 'Q' (the QWP1 magic byte that every column-sender
// publish frame starts with). This blocks the worker from
// `graceful_close`ing before the client has finished writing.
qm::Script accept_one_frame = {
qm::ActionAwaitClientFrame{0x51},
};
std::vector<qm::Script> scripts;
scripts.reserve(static_cast<size_t>(slot_count));
for (int i = 0; i < slot_count; ++i)
scripts.push_back(accept_one_frame);

auto holder = new qwp_mock_c{};
try
{
holder->server = std::make_unique<qm::MockServer>(std::move(scripts));
holder->addr_cached = holder->server->addr();
}
catch (...)
{
delete holder;
return nullptr;
}
return holder;
}

extern "C" const char* qwp_mock_c_addr(qwp_mock_c* mock)
{
if (mock == nullptr)
return nullptr;
return mock->addr_cached.c_str();
}

extern "C" void qwp_mock_c_stop(qwp_mock_c* mock)
{
delete mock;
}
47 changes: 47 additions & 0 deletions cpp_test/qwp_mock_c.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/* C-friendly shim around `qwp_mock::MockServer` for the pure-C
* test_arrow_c.c suite.
*
* Spins up an in-process mock that accepts one WS-Upgrade per slot and
* silently swallows the first inbound QWP binary frame on each
* connection — enough to drive `column_sender_flush_arrow_batch`
* end-to-end without hitting a live QuestDB instance.
*
* CMake note: when wiring this into the build, add
* `cpp_test/qwp_mock_c.cpp` to the `c-questdb-client-test`
* executable's source list (alongside `qwp_mock_server.cpp`). The
* shim itself has no extra link deps beyond what
* `qwp_mock_server.cpp` already pulls in.
*/

#ifndef QWP_MOCK_C_H
#define QWP_MOCK_C_H

#ifdef __cplusplus
extern "C"
{
#endif

typedef struct qwp_mock_c qwp_mock_c;

/* Start a mock server bound to 127.0.0.1:0. The mock accepts up to
* `slot_count` WS upgrades and, on each, waits for one inbound QWP
* binary frame (first payload byte == 'Q', i.e. the QWP1 magic) before
* cleanly closing the connection. `slot_count` must be >= 1 — pass 1
* when using the default `pool_size=1` connect string.
*
* Returns NULL on failure (e.g. OS-level bind failure). */
qwp_mock_c* qwp_mock_c_start(int slot_count);

/* Return the mock's listening address as "127.0.0.1:NNNN", suitable for
* splicing into a `qwpws::addr=...` connect string. Pointer is valid
* until `qwp_mock_c_stop`. */
const char* qwp_mock_c_addr(qwp_mock_c* mock);

/* Shut down the mock and free its resources. Safe to pass NULL. */
void qwp_mock_c_stop(qwp_mock_c* mock);

#ifdef __cplusplus
}
#endif

#endif /* QWP_MOCK_C_H */
Loading