
Add ClickHouse destination #658

Draft

jmqd wants to merge 68 commits into main from jm/clickhouse

Conversation


jmqd (Contributor) commented Apr 7, 2026

Adds a ClickHouse destination that replicates Postgres tables into append-only MergeTree tables, with cdc_operation and cdc_lsn (UInt64) columns appended to every row. Both initial copy and streaming are supported. The encoder writes RowBinary bytes directly via Client::insert_formatted_with, bypassing the typed Inserter/serde path (which panics on empty COLUMN_NAMES and overflows the compiler recursion limit on nullable array elements). Per-INSERT byte budgeting keeps peak memory bounded for large copies.

Schema evolution (add, drop, rename column) is handled by diffing against the previous snapshot and replaying it through idempotent DDL, bracketed with Applying -> Applied metadata writes so an interrupted run is recoverable on restart. RowBinary nullable flags are derived from the actual ClickHouse schema rather than the source, since ClickHouse forces newly-added columns to Nullable(T) regardless of the upstream NOT NULL constraint. Identifiers are quoted at every SQL boundary, and a validate_connectivity probe lets the etl-api validators treat ClickHouse uniformly with Iceberg.

Coverage: 20 unit tests on the encoder/schema/client helpers plus 15 integration tests covering initial copy of all supported types, insert/update/delete streaming (with key-only DELETE tombstone expansion), restart, intermediate flushing, multi-table concurrency, truncate, and schema-change scenarios. DDL and insert durations are exposed as Prometheus histograms.

jmqd changed the title from Jm/clickhouse to ClickHouse destination on Apr 7, 2026
bnjjj and others added 10 commits April 7, 2026 17:08
jmqd added 3 commits April 7, 2026 17:52
Main introduced AsyncResult parameters on the Destination trait
(truncate_table, write_table_rows, write_events). Update the
ClickHouse implementation to conform, add missing K8sClient trait
stubs, and fix imports that drifted during rebase.
These were carried over during the ClickHouse rebase but the startup
path on main no longer calls them, triggering dead_code warnings.

coveralls commented Apr 7, 2026

Coverage Status: 78.795% (+0.4%) from 78.38% — jm/clickhouse into main

Silently skipping tests when env vars are missing gives false
confidence. The tests should fail explicitly so missing configuration
is always noticed.
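
For illustration, one way a test helper can make missing configuration fail loudly instead of skipping (a minimal sketch; the helper name is assumed, not the crate's real API):

```rust
// Panic with the variable name so a misconfigured environment is
// impossible to miss in test output.
fn required_env(name: &str) -> String {
    std::env::var(name)
        .unwrap_or_else(|_| panic!("missing required test env var: {name}"))
}
```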
jmqd added 4 commits April 9, 2026 15:49
Exercises encoding edge cases the all_types test does not cover:
- NULL scalars (nullable columns with SQL NULL)
- NULL elements inside arrays (ARRAY[1, NULL, 3])
- Empty strings distinct from NULLs
- Single-element arrays (varint length = 0x01)
- Multi-byte UTF-8 (emoji, CJK) in scalars and arrays
- Zero integer distinct from NULL
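
For reference, a minimal sketch of the RowBinary rules these cases exercise (helper names are illustrative, not the encoder's real API): NULL is a single 0x01 marker byte, non-NULL values carry a 0x00 prefix, and array lengths are ULEB128 varints.

```rust
// ULEB128 varint, as RowBinary uses for array and string lengths.
fn write_varint(buf: &mut Vec<u8>, mut n: u64) {
    loop {
        let byte = (n & 0x7f) as u8;
        n >>= 7;
        if n == 0 {
            buf.push(byte);
            break;
        }
        buf.push(byte | 0x80);
    }
}

// Nullable(Int32): one marker byte, then the value only if non-NULL.
fn write_nullable_i32(buf: &mut Vec<u8>, v: Option<i32>) {
    match v {
        None => buf.push(1),
        Some(x) => {
            buf.push(0);
            buf.extend_from_slice(&x.to_le_bytes());
        }
    }
}

// Array(Nullable(Int32)): varint length, then each element.
// A single-element array therefore starts with the 0x01 length byte.
fn write_array_nullable_i32(buf: &mut Vec<u8>, xs: &[Option<i32>]) {
    write_varint(buf, xs.len() as u64);
    for x in xs {
        write_nullable_i32(buf, *x);
    }
}
```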
The docker-compose stack already starts a ClickHouse container but
the test env vars were not set, causing the integration tests to
panic. Wire up the same defaults used by docker-compose.
delete_dynamic_replicator_secrets deleted BigQuery, Iceberg, and
Ducklake secrets but not ClickHouse, leaving orphaned K8s secrets
when ClickHouse pipelines were deleted or changed destination type.
Previously this silently wrote a zero-length string, corrupting data
without any signal. Now returns a ConversionError so the problem is
surfaced to the operator.
The previous ErrorKind::Unknown discarded the JoinError, losing the
panic message. Now preserves the error as a detail string and uses a
more specific error kind matching the DuckLake convention.
jmqd changed the title from ClickHouse destination to Add ClickHouse destination on Apr 10, 2026
jmqd added 4 commits April 13, 2026 15:07
Uses REPLICA IDENTITY FULL so Postgres sends all column values in the
DELETE event. Inserts two rows, deletes one, and verifies the correct
row appears with cdc_operation = "DELETE", old data preserved, and a
positive LSN — while the other row remains untouched.
jmqd added 3 commits April 21, 2026 22:31
# Conflicts:
#	etl-api/src/configs/destination.rs
Two new tests:
- schema_change_add_column: ADD COLUMN propagates to ClickHouse, Alice's
  pre-change row has NULL for the new column, Bob's post-change row has
  the real value.
- schema_change_add_drop_rename: combined ADD + DROP + RENAME, verifies
  column names, data preservation, and metadata snapshot_id advancement.

Two bugs found and fixed by these tests:
- Column ordering: ALTER TABLE ADD COLUMN appended after the CDC columns
  (cdc_operation, cdc_lsn), misaligning RowBinary encoding. Fixed by
  using AFTER clause to place new columns before CDC columns.
- Metadata overwrite: ensure_table_exists unconditionally stored Applying
  metadata on cache miss, overwriting the Applied metadata set by
  handle_relation_event. Fixed by checking for existing metadata before
  storing.

Also adds column_names() and db_client() helpers to ClickHouseTestDatabase.
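
For illustration, the shape of the column-ordering fix (a hedged sketch; the argument names are assumptions and identifier quoting is elided, see the quoting helper later in this PR):

```rust
fn add_column_ddl(table: &str, column: &str, ch_type: &str, last_user_column: &str) -> String {
    // Placing the new column AFTER the last user column keeps
    // cdc_operation and cdc_lsn at the end of the table, so positional
    // RowBinary encoding stays aligned with the replicated schema.
    format!(
        "ALTER TABLE {table} ADD COLUMN IF NOT EXISTS {column} {ch_type} AFTER {last_user_column}"
    )
}
```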
Removes dead code left after inlining the sysinfo call in the replicator
and example. Fixes the CI -Dwarnings failure.
jmqd added 3 commits April 22, 2026 09:45
Explicit match arms in type mapping functions document intentional
Postgres-to-ClickHouse type decisions. Suppressing the lint preserves
this clarity and prevents silent breakage if the wildcard default
changes.

Add retry with backoff to create_database in test setup to absorb
transient connection failures under parallel test load.
The lint was removed upstream in 57bff19.
Recovery: if ensure_table_exists finds metadata in Applying state
(interrupted schema change), it re-applies the diff idempotently using
previous_snapshot_id and marks Applied.

Idempotent ALTER statements: ADD COLUMN IF NOT EXISTS, DROP COLUMN IF
EXISTS, and catch-and-skip for already-applied RENAME COLUMN.

Test flakiness: add nextest thread group (max 4) for ClickHouse tests
and --test-threads=4 to the local test script. Also retry
create_database with backoff on transient connection failures.
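
A hedged sketch of the Applying -> Applied crash recovery; all names here (SchemaState, TableMetadata, replay_diff) are placeholders for illustration, not the real module API:

```rust
#[derive(PartialEq)]
enum SchemaState { Applying, Applied }

struct TableMetadata {
    state: SchemaState,
    previous_snapshot_id: u64,
}

fn ensure_recovered(meta: &mut TableMetadata) {
    if meta.state == SchemaState::Applying {
        // An earlier run died mid-DDL: re-derive the column diff from the
        // snapshot recorded before the change and replay it.
        replay_diff(meta.previous_snapshot_id);
        meta.state = SchemaState::Applied;
    }
}

fn replay_diff(_previous_snapshot_id: u64) {
    // Every replayed statement is idempotent (ADD COLUMN IF NOT EXISTS,
    // DROP COLUMN IF EXISTS, skip-if-renamed), so running a half-applied
    // diff a second time is safe.
}
```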
jmqd added 20 commits April 22, 2026 22:49
# Conflicts:
#	Cargo.lock
#	etl-destinations/Cargo.toml
Key-only DELETE rows (default replica identity) now produce proper
tombstone rows by expanding PK-only data to full column width:
- Non-nullable scalars: type-appropriate zero values (0, false, "")
- Date/Timestamp/UUID: typed epoch/nil defaults
- Nullable scalars: NULL
- Array columns: empty arrays (not NULL, since ClickHouse arrays use
  Array(Nullable(T)) without an outer Nullable wrapper)

The test now exercises all supported column types in the tombstone:
smallint, integer, bigint, real, double, numeric, boolean, text,
varchar, date, timestamp, timestamptz, time, jsonb, bytea, uuid,
plus nullable scalars and arrays.

Also adds timeout panics to five inline poll loops that previously
fell through silently with stale data.
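
A hedged sketch of the tombstone expansion described above; `Cell` and `Column` are placeholders for the crate's real row and schema types:

```rust
enum Cell {
    Int(i64),
    Bool(bool),
    Text(String),
    Null,
    Array(Vec<Cell>),
}

struct Column {
    nullable: bool,
    is_array: bool,
}

fn tombstone_cell(col: &Column) -> Cell {
    if col.is_array {
        // Arrays stay non-NULL: ClickHouse stores Array(Nullable(T)) with
        // no outer Nullable wrapper, so the default is an empty array.
        Cell::Array(vec![])
    } else if col.nullable {
        Cell::Null
    } else {
        // Non-nullable scalars get a type-appropriate zero value:
        // 0, false, "", epoch date/timestamp, nil UUID, ...
        Cell::Int(0)
    }
}
```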
NaiveDateTime::UNIX_EPOCH is not available in the pinned chrono
version; derive it from DateTime::UNIX_EPOCH instead to match the
TIMESTAMPTZ arm just below.
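
The derivation is a one-liner (a sketch, assuming the pinned chrono exposes the DateTime constant as the commit states):

```rust
use chrono::{DateTime, NaiveDateTime, Utc};

fn naive_unix_epoch() -> NaiveDateTime {
    // NaiveDateTime::UNIX_EPOCH is missing on the pinned chrono, but the
    // DateTime<Utc> constant exists; drop its timezone for the naive value.
    DateTime::<Utc>::UNIX_EPOCH.naive_utc()
}
```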
Collapses the read-then-write pair into one DDL statement that does
the existence check and rename atomically on the server, matching
ADD COLUMN IF NOT EXISTS / DROP COLUMN IF EXISTS used by add_column
and drop_column in the same file.
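
The collapsed, server-side-atomic form that replaces the earlier catch-and-skip rename (a sketch; quoting elided and argument names assumed):

```rust
fn rename_column_ddl(table: &str, old: &str, new: &str) -> String {
    // IF EXISTS folds the existence check and the rename into one DDL
    // statement, mirroring ADD COLUMN IF NOT EXISTS / DROP COLUMN IF EXISTS.
    format!("ALTER TABLE {table} RENAME COLUMN IF EXISTS {old} TO {new}")
}
```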
The DELETE tombstone test already verifies every other non-PK column
type gets the expected zero value. date_col, timestamp_col, and
timestamptz_col were decoded from the SELECT but never asserted on,
which tripped the dead_code lint. Add the missing zero-value
assertions and clean up two assert_eq! calls on bool literals.
Passwordless ClickHouse configs created no password secret, but the replicator StatefulSet still referenced the ClickHouse password secret unconditionally. That could leave pods unable to start with a missing secretKeyRef, or keep using stale credentials after a password was removed.

Carry whether the password secret is required into env generation, omit the password env var when it is not needed, and delete any stale ClickHouse password secret during reconciliation.
ClickHouse DDL and RowBinary insert statements interpolated table and column names inside double quotes without escaping embedded quotes. Legal Postgres identifiers containing quotes could therefore generate invalid ClickHouse SQL, and tenant-controlled identifiers could turn that into an injection risk.

Centralize identifier quoting, use it for CREATE, ALTER, TRUNCATE, and INSERT SQL generation, and add targeted tests for quoted table and column names.
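
A minimal sketch of the centralized quoting helper, assuming standard-SQL doubling of embedded double quotes (the real helper may differ):

```rust
fn quote_identifier(ident: &str) -> String {
    // Doubling embedded quotes keeps a hostile identifier inert, e.g.
    // `x" DROP TABLE users --` becomes `"x"" DROP TABLE users --"`.
    format!("\"{}\"", ident.replace('"', "\"\""))
}
```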
The RowBinary encoder zipped values with nullable flags, which silently dropped extra values or flags when an internal schema/cache mismatch occurred. That could mask the real bug and emit malformed positional RowBinary.

Check row width before encoding and return a ConversionError that includes both lengths. Add tests for too few and too many values.
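
A sketch of the pre-encoding width check; the error type here is a stand-in for the crate's ConversionError:

```rust
fn check_row_width(values: usize, schema_columns: usize) -> Result<(), String> {
    if values != schema_columns {
        // Surface both lengths instead of letting zip() truncate silently.
        return Err(format!(
            "row width mismatch: {values} values for {schema_columns} columns"
        ));
    }
    Ok(())
}
```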
Postgres LSNs are u64 values, but ClickHouse cdc_lsn was stored as Int64. Values outside the signed range were logged and collapsed to i64::MAX, corrupting CDC metadata and losing ordering information.

Change generated ClickHouse DDL to use UInt64, encode CDC LSNs as unsigned RowBinary values, remove the overflow fallback, and update tests to read cdc_lsn as u64.
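
The unsigned encoding is trivial because RowBinary writes fixed-width integers little-endian (function name assumed):

```rust
// A UInt64 cdc_lsn column carries the full u64 LSN range with no clamping.
fn encode_cdc_lsn(buf: &mut Vec<u8>, lsn: u64) {
    buf.extend_from_slice(&lsn.to_le_bytes());
}
```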
ALTER TABLE ADD COLUMN forces new ClickHouse scalar columns to Nullable(T), even when the upstream Postgres column is NOT NULL. Recomputing RowBinary nullable flags from the Postgres schema could then omit the nullable marker byte and corrupt positional RowBinary encoding.

Query system.columns on cache miss, validate the actual destination column order against the replicated schema, and derive nullable flags from the ClickHouse type strings. Strengthen the schema-change integration test with a NOT NULL DEFAULT column that fails before this fix.
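
Deriving nullability from the destination's own type strings comes down to inspecting the system.columns value (a sketch; helper name assumed):

```rust
fn is_nullable(clickhouse_type: &str) -> bool {
    // ADD COLUMN yields e.g. "Nullable(String)" even when the upstream
    // Postgres column is NOT NULL, so only the destination type is trusted.
    clickhouse_type.starts_with("Nullable(")
}
```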
The script was added when tests/clickhouse_pipeline.rs was auto-discovered as its own Cargo integration test target. Later, integration tests were consolidated under tests/main.rs with autotests disabled, making the Cargo target name main and leaving clickhouse_pipeline as a module/test filter.

Default TEST_TARGET to main and default TEST_NAME_FILTER to clickhouse_pipeline so the helper runs the ClickHouse tests against the current test layout.
Aligns the connectivity-probe method name with IcebergClient::validate_connectivity
so the etl-api validators treat both destinations uniformly.
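
A plausible shape for the probe, assuming the `clickhouse` crate's Client; the real method body may differ:

```rust
async fn validate_connectivity(client: &clickhouse::Client) -> clickhouse::error::Result<()> {
    // A trivial round-trip query proves the host, credentials, and
    // database are all usable before a pipeline starts.
    client.query("SELECT 1").execute().await
}
```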
Restructures the row-encoding loop into an outer 'while rows remain' / inner
'while under byte budget' shape so each INSERT is closed in exactly one place.
The row that crosses the budget still goes into the current INSERT, matching
the prior behavior. An empty input now skips opening an INSERT instead of
issuing a no-op flush.
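
A sketch of that loop shape; the sink and the byte budget are placeholders, not the encoder's real API:

```rust
fn flush_in_batches(rows: &[Vec<u8>], budget: usize, send: &mut dyn FnMut(&[u8])) {
    let mut rows = rows.iter().peekable();
    // Outer loop: one INSERT per iteration; empty input never enters it,
    // so no no-op flush is issued.
    while rows.peek().is_some() {
        let mut batch: Vec<u8> = Vec::new();
        // Inner loop: fill until the budget is crossed; the row that
        // crosses it still belongs to the current INSERT.
        while let Some(row) = rows.next() {
            batch.extend_from_slice(row);
            if batch.len() >= budget {
                break;
            }
        }
        // The INSERT is closed in exactly one place.
        send(&batch);
    }
}
```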
core.rs: extract recover_applying_metadata from ensure_table_exists to flatten
the crash-recovery branch out of the cache/metadata top-level. Merge the two
parallel maps in write_events_inner into a single
HashMap<TableId, (ReplicatedTableSchema, Vec<PendingRow>)>, eliminating the
defensive remove().ok_or_else() that dressed up an impossible mismatch. Extract
the JoinSet-based row flush into flush_pending_rows.

encoding.rs: introduce a map_array helper and date_to_days extractor so each
ArrayCell variant becomes a one-liner that mirrors cell_to_clickhouse_value.
Rewrite bytes_to_hex to encode via a 16-byte ASCII lookup table and take
&[u8] instead of consuming a Vec<u8>.
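
A sketch of the lookup-table encoder; it borrows the bytes instead of consuming them:

```rust
fn bytes_to_hex(bytes: &[u8]) -> String {
    const HEX: &[u8; 16] = b"0123456789abcdef";
    let mut out = String::with_capacity(bytes.len() * 2);
    for &b in bytes {
        // High nibble, then low nibble, via the 16-byte ASCII table.
        out.push(HEX[(b >> 4) as usize] as char);
        out.push(HEX[(b & 0x0f) as usize] as char);
    }
    out
}
```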