From 0814b4cb40600dcff584ddfb41d22e91442edf92 Mon Sep 17 00:00:00 2001 From: Andrew Chen Date: Sun, 17 May 2026 01:05:34 -0500 Subject: [PATCH 1/3] fix: store real kafka partition+offset in Iceberg via KafkaRecordDeserializer Replace SimpleStringSchema with a custom KafkaRecordDeserializationSchema in both normalize.py and cdc_symbol_config.py so that kafka_partition and kafka_offset written to Iceberg silver/bronze layers reflect the actual Kafka record position instead of the placeholder -1. The -1 fallback path in iceberg_reader.py is retained for backward compatibility with rows written before this fix. Co-Authored-By: Claude Sonnet 4.6 --- docs/ROADMAP.md | 2 +- flink/jobs/cdc_symbol_config.py | 33 ++++++------ flink/jobs/lib/kafka_schema.py | 73 +++++++++++++++++++++++++++ flink/jobs/normalize.py | 25 +++++---- flink/tests/unit/test_kafka_schema.py | 61 ++++++++++++++++++++++ replay/src/replay/iceberg_reader.py | 13 ++--- replay/src/replay/producer.py | 2 +- 7 files changed, 172 insertions(+), 37 deletions(-) create mode 100644 flink/jobs/lib/kafka_schema.py create mode 100644 flink/tests/unit/test_kafka_schema.py diff --git a/docs/ROADMAP.md b/docs/ROADMAP.md index 5a956f5..6d6a89a 100644 --- a/docs/ROADMAP.md +++ b/docs/ROADMAP.md @@ -132,7 +132,7 @@ ingest/src/ingest/ ### Known limitations -- `kafka_partition` and `kafka_offset` hardcoded to `-1` in normalize output — `SimpleStringSchema` doesn't expose Kafka record metadata. Fix: implement `KafkaRecordDeserializationSchema`. +~~`kafka_partition` and `kafka_offset` hardcoded to `-1` in normalize output~~ — **Fixed (2026-05-17):** replaced `SimpleStringSchema` with `KafkaRecordDeserializer` (`lib/kafka_schema.py`) in both `normalize.py` and `cdc_symbol_config.py`. Real partition and offset are now stored in Iceberg; `iceberg_reader.py` precise-seek path is active for rows written after the fix. --- diff --git a/flink/jobs/cdc_symbol_config.py b/flink/jobs/cdc_symbol_config.py index cd5c586..1657fcd 100644 --- a/flink/jobs/cdc_symbol_config.py +++ b/flink/jobs/cdc_symbol_config.py @@ -57,10 +57,15 @@ from pathlib import Path from lib.config import Config +from lib.kafka_schema import ( + OFFSET_IDX, + PARTITION_IDX, + PAYLOAD_IDX, + KafkaRecordDeserializer, +) from lib.sql_runner import add_inserts_from_file, execute_sql_file from pyflink.common import Row, WatermarkStrategy from pyflink.common.restart_strategy import RestartStrategies -from pyflink.common.serialization import SimpleStringSchema from pyflink.common.typeinfo import Types from pyflink.datastream import ( CheckpointingMode, @@ -133,19 +138,17 @@ class CdcEnvelopeProcessor(FlatMapFunction): - Missing payload (both before and after null) → log warning, skip """ - def flat_map(self, value: str) -> None: # type: ignore[override] + def flat_map(self, value: Row) -> None: # type: ignore[override] now_ms = int(datetime.now(UTC).timestamp() * 1000) - - # Tombstone guard: null Kafka value (should not arrive; tombstones.on.delete=false) - if value is None: - log.warning("cdc_tombstone_received: null value skipped (connector misconfigured?)") - return + kafka_partition: int = value[PARTITION_IDX] + kafka_offset: int = value[OFFSET_IDX] + raw_json: str = value[PAYLOAD_IDX] # ── Parse Debezium envelope ──────────────────────────────────────────── try: - envelope = json.loads(value) + envelope = json.loads(raw_json) except (json.JSONDecodeError, ValueError) as exc: - log.warning("cdc_parse_error error=%s msg_preview=%.200s", exc, value) + log.warning("cdc_parse_error error=%s msg_preview=%.200s", exc, raw_json) return op: str = envelope.get("op", "") @@ -186,10 +189,6 @@ def flat_map(self, value: str) -> None: # type: ignore[override] ) # ── Emit bronze row ──────────────────────────────────────────────────── - # kafka_partition and kafka_offset are -1 because SimpleStringSchema - # doesn't expose Kafka record metadata (same limitation as normalize.py). - # The source_lsn from Debezium's source block is the canonical ordering key - # for replay purposes. yield Row( op, symbol, @@ -199,9 +198,9 @@ def flat_map(self, value: str) -> None: # type: ignore[override] source_ts_ms, tx_id, debezium_ts_ms, - Config.TOPIC_CDC, # kafka_topic - -1, # kafka_partition (not available via SimpleStringSchema) - -1, # kafka_offset (not available via SimpleStringSchema) + Config.TOPIC_CDC, + kafka_partition, + kafka_offset, now_ms, ) @@ -273,7 +272,7 @@ def main() -> None: .set_topics(cfg.TOPIC_CDC) .set_group_id(cfg.GROUP_CDC_BRONZE) .set_starting_offsets(KafkaOffsetsInitializer.earliest()) - .set_value_only_deserializer(SimpleStringSchema()) + .set_deserializer(KafkaRecordDeserializer()) .build(), WatermarkStrategy.no_watermarks(), "cdc-raw-source", diff --git a/flink/jobs/lib/kafka_schema.py b/flink/jobs/lib/kafka_schema.py new file mode 100644 index 0000000..ba8990e --- /dev/null +++ b/flink/jobs/lib/kafka_schema.py @@ -0,0 +1,73 @@ +""" +Kafka record deserialization helpers. + +The pure function parse_kafka_record is importable without PyFlink (used in +unit tests running on Python 3.12 in the root workspace). The +KafkaRecordDeserializer class and KAFKA_RECORD_TYPE are only defined when +PyFlink is present; they live under a try/except so the module remains +importable in the test environment. + +Why this exists +--------------- +SimpleStringSchema is a value-only deserializer: it discards all Kafka record +metadata (partition, offset, timestamp, key). Switching to +KafkaRecordDeserializationSchema lets downstream Flink jobs store the real +kafka_partition and kafka_offset in Iceberg instead of -1, enabling precise +offset-based seek during replay. +""" + +from __future__ import annotations + +# Positional indices into the Row emitted by KafkaRecordDeserializer. +# Use these constants instead of magic numbers in job code. +PAYLOAD_IDX = 0 +PARTITION_IDX = 1 +OFFSET_IDX = 2 + + +def parse_kafka_record(record: object) -> tuple[str, int, int] | None: + """Extract (payload_str, partition, offset) from a Kafka ConsumerRecord. + + Duck-typed: any object with .value(), .partition(), .offset() methods works, + which makes this function testable without PyFlink. + + Returns None for tombstone records (null value bytes). + """ + value_bytes = record.value() # type: ignore[union-attr] + if value_bytes is None: + return None + payload = bytes(value_bytes).decode("utf-8") + partition: int = record.partition() # type: ignore[union-attr] + offset: int = record.offset() # type: ignore[union-attr] + return (payload, partition, offset) + + +try: + from pyflink.common import Row + from pyflink.common.typeinfo import Types + from pyflink.datastream.connectors.kafka import KafkaRecordDeserializationSchema + + KAFKA_RECORD_TYPE = Types.ROW_NAMED( + ["payload", "kafka_partition", "kafka_offset"], + [Types.STRING(), Types.INT(), Types.LONG()], + ) + + class KafkaRecordDeserializer(KafkaRecordDeserializationSchema): + """Replaces SimpleStringSchema to capture real kafka_partition and kafka_offset. + + Emits Row(payload: str, kafka_partition: int, kafka_offset: long). + Tombstone records (null value) are suppressed here so downstream + processors never receive None payloads. + """ + + def deserialize(self, record: object, collector: object) -> None: + result = parse_kafka_record(record) + if result is not None: + payload, partition, offset = result + collector.collect(Row(payload, partition, offset)) # type: ignore[union-attr] + + def get_produced_type(self) -> object: + return KAFKA_RECORD_TYPE + +except ImportError: + pass diff --git a/flink/jobs/normalize.py b/flink/jobs/normalize.py index 496f5cc..22bf5a1 100644 --- a/flink/jobs/normalize.py +++ b/flink/jobs/normalize.py @@ -32,6 +32,12 @@ from typing import Any from lib.config import Config +from lib.kafka_schema import ( + OFFSET_IDX, + PARTITION_IDX, + PAYLOAD_IDX, + KafkaRecordDeserializer, +) from lib.logic import ( apply_depth_diff, best_ask, @@ -43,7 +49,6 @@ from lib.sql_runner import add_inserts_from_file, execute_sql_file from pyflink.common import Row, WatermarkStrategy from pyflink.common.restart_strategy import RestartStrategies -from pyflink.common.serialization import SimpleStringSchema from pyflink.common.typeinfo import Types from pyflink.datastream import ( CheckpointingMode, @@ -125,9 +130,9 @@ def _to_epoch_ms(ts: Any) -> int: return int(datetime.now(UTC).timestamp() * 1000) -def _kafka_key(msg: str) -> str: - """Extract routing key from raw JSON message (called by key_by).""" - data = json.loads(msg) +def _kafka_key(record: Row) -> str: + """Extract routing key from Kafka record Row(payload, partition, offset).""" + data = json.loads(record[PAYLOAD_IDX]) return f"{data['exchange']}#{data['symbol']}" @@ -155,9 +160,11 @@ def open(self, runtime_context: RuntimeContext) -> None: # type: ignore[overrid ) def process_element( # type: ignore[override] - self, value: str, ctx: KeyedProcessFunction.Context + self, value: Row, ctx: KeyedProcessFunction.Context ) -> None: - msg = json.loads(value) + kafka_partition: int = value[PARTITION_IDX] + kafka_offset: int = value[OFFSET_IDX] + msg = json.loads(value[PAYLOAD_IDX]) last_update_id: int = int(msg["last_update_id"]) # ── 1. Dedup ────────────────────────────────────────────────────────── @@ -222,8 +229,8 @@ def process_element( # type: ignore[override] compute_mid_price(bb_price, ba_price), compute_imbalance(book["bids"], book["asks"]), Config.TOPIC_RAW, - -1, # partition not available via SimpleStringSchema - -1, # offset not available via SimpleStringSchema + kafka_partition, + kafka_offset, ) @@ -292,7 +299,7 @@ def main() -> None: .set_topics(cfg.TOPIC_RAW) .set_group_id(cfg.GROUP_NORMALIZE) .set_starting_offsets(KafkaOffsetsInitializer.earliest()) - .set_value_only_deserializer(SimpleStringSchema()) + .set_deserializer(KafkaRecordDeserializer()) .build(), WatermarkStrategy.no_watermarks(), "raw-orderbook-source", diff --git a/flink/tests/unit/test_kafka_schema.py b/flink/tests/unit/test_kafka_schema.py new file mode 100644 index 0000000..444fa41 --- /dev/null +++ b/flink/tests/unit/test_kafka_schema.py @@ -0,0 +1,61 @@ +"""Unit tests for lib/kafka_schema.py — no PyFlink dependency.""" + +from unittest.mock import MagicMock + +from lib.kafka_schema import OFFSET_IDX, PARTITION_IDX, PAYLOAD_IDX, parse_kafka_record + + +def _make_record( + value_bytes: bytes | None, + partition: int = 0, + offset: int = 0, +) -> MagicMock: + record = MagicMock() + record.value.return_value = value_bytes + record.partition.return_value = partition + record.offset.return_value = offset + return record + + +class TestParseKafkaRecord: + def test_returns_payload_partition_offset(self) -> None: + record = _make_record(b'{"symbol": "BTCUSDT"}', partition=2, offset=12345) + result = parse_kafka_record(record) + assert result is not None + assert result[PAYLOAD_IDX] == '{"symbol": "BTCUSDT"}' + assert result[PARTITION_IDX] == 2 + assert result[OFFSET_IDX] == 12345 + + def test_tombstone_returns_none(self) -> None: + record = _make_record(None, partition=0, offset=99) + assert parse_kafka_record(record) is None + + def test_partition_zero_and_offset_zero_are_valid(self) -> None: + record = _make_record(b"{}", partition=0, offset=0) + result = parse_kafka_record(record) + assert result is not None + assert result[PARTITION_IDX] == 0 + assert result[OFFSET_IDX] == 0 + + def test_utf8_payload_decoded_correctly(self) -> None: + payload = '{"exchange": "binance", "symbol": "ETHUSDT"}' + record = _make_record(payload.encode("utf-8"), partition=1, offset=500) + result = parse_kafka_record(record) + assert result is not None + assert result[PAYLOAD_IDX] == payload + + def test_large_offset_handled(self) -> None: + large_offset = 2**40 + record = _make_record(b"{}", partition=0, offset=large_offset) + result = parse_kafka_record(record) + assert result is not None + assert result[OFFSET_IDX] == large_offset + + def test_high_partition_number(self) -> None: + record = _make_record(b"{}", partition=47, offset=1) + result = parse_kafka_record(record) + assert result is not None + assert result[PARTITION_IDX] == 47 + + def test_index_constants_are_distinct(self) -> None: + assert len({PAYLOAD_IDX, PARTITION_IDX, OFFSET_IDX}) == 3 diff --git a/replay/src/replay/iceberg_reader.py b/replay/src/replay/iceberg_reader.py index 85dca2a..a0dbb2b 100644 --- a/replay/src/replay/iceberg_reader.py +++ b/replay/src/replay/iceberg_reader.py @@ -10,15 +10,10 @@ time window. 4. Return min/max (partition, offset) per partition. -Known limitation: normalize.py currently sets kafka_partition=-1 and -kafka_offset=-1 because SimpleStringSchema doesn't expose Kafka record metadata. -When offsets are -1, this reader falls back to Kafka's offsets_for_times() API, -which finds the first offset whose timestamp is >= the requested time. The -Kafka path is always tried as a verification step even when Iceberg offsets are -available. - -See the Known Limitations section in docs/DEBUGGING_PHASE3.md for the fix -(implement KafkaRecordDeserializationSchema in normalize.py). +Rows written before the KafkaRecordDeserializer fix (see lib/kafka_schema.py) +will have kafka_partition=-1 and kafka_offset=-1. When all rows in the scan +window carry -1, this reader falls back to Kafka's offsets_for_times() API. +New rows written after the fix carry real offsets and take the precise path. """ from __future__ import annotations diff --git a/replay/src/replay/producer.py b/replay/src/replay/producer.py index 29aaf16..c1a2a46 100644 --- a/replay/src/replay/producer.py +++ b/replay/src/replay/producer.py @@ -6,7 +6,7 @@ 1. Determine which Kafka offsets correspond to the requested time window: a. Try Iceberg reader (reads kafka_offset from normalized.book_ticker). b. Fall back to Kafka's offsets_for_times() when Iceberg returns no ranges - (the current state due to kafka_offset=-1 in normalize.py). + (e.g. rows written before the KafkaRecordDeserializer fix still have -1). 2. For each partition in the offset range: a. Seek a temporary consumer to start_offset. b. Read records up to end_offset (or max_events limit). From fafd44646f8ea16a5ab3c20000aec42944eb27f1 Mon Sep 17 00:00:00 2001 From: Andrew Chen Date: Sun, 17 May 2026 01:05:34 -0500 Subject: [PATCH 2/3] fix: store real kafka partition+offset in Iceberg via KafkaRecordDeserializer Replace SimpleStringSchema with a custom KafkaRecordDeserializationSchema in both normalize.py and cdc_symbol_config.py so that kafka_partition and kafka_offset written to Iceberg silver/bronze layers reflect the actual Kafka record position instead of the placeholder -1. The -1 fallback path in iceberg_reader.py is retained for backward compatibility with rows written before this fix. Co-Authored-By: Claude Sonnet 4.6 --- docs/ROADMAP.md | 2 +- flink/jobs/cdc_symbol_config.py | 33 ++++++------ flink/jobs/lib/kafka_schema.py | 73 +++++++++++++++++++++++++++ flink/jobs/normalize.py | 25 +++++---- flink/tests/unit/test_kafka_schema.py | 61 ++++++++++++++++++++++ replay/src/replay/iceberg_reader.py | 13 ++--- replay/src/replay/producer.py | 2 +- 7 files changed, 172 insertions(+), 37 deletions(-) create mode 100644 flink/jobs/lib/kafka_schema.py create mode 100644 flink/tests/unit/test_kafka_schema.py diff --git a/docs/ROADMAP.md b/docs/ROADMAP.md index 5a956f5..6d6a89a 100644 --- a/docs/ROADMAP.md +++ b/docs/ROADMAP.md @@ -132,7 +132,7 @@ ingest/src/ingest/ ### Known limitations -- `kafka_partition` and `kafka_offset` hardcoded to `-1` in normalize output — `SimpleStringSchema` doesn't expose Kafka record metadata. Fix: implement `KafkaRecordDeserializationSchema`. +~~`kafka_partition` and `kafka_offset` hardcoded to `-1` in normalize output~~ — **Fixed (2026-05-17):** replaced `SimpleStringSchema` with `KafkaRecordDeserializer` (`lib/kafka_schema.py`) in both `normalize.py` and `cdc_symbol_config.py`. Real partition and offset are now stored in Iceberg; `iceberg_reader.py` precise-seek path is active for rows written after the fix. --- diff --git a/flink/jobs/cdc_symbol_config.py b/flink/jobs/cdc_symbol_config.py index cd5c586..1657fcd 100644 --- a/flink/jobs/cdc_symbol_config.py +++ b/flink/jobs/cdc_symbol_config.py @@ -57,10 +57,15 @@ from pathlib import Path from lib.config import Config +from lib.kafka_schema import ( + OFFSET_IDX, + PARTITION_IDX, + PAYLOAD_IDX, + KafkaRecordDeserializer, +) from lib.sql_runner import add_inserts_from_file, execute_sql_file from pyflink.common import Row, WatermarkStrategy from pyflink.common.restart_strategy import RestartStrategies -from pyflink.common.serialization import SimpleStringSchema from pyflink.common.typeinfo import Types from pyflink.datastream import ( CheckpointingMode, @@ -133,19 +138,17 @@ class CdcEnvelopeProcessor(FlatMapFunction): - Missing payload (both before and after null) → log warning, skip """ - def flat_map(self, value: str) -> None: # type: ignore[override] + def flat_map(self, value: Row) -> None: # type: ignore[override] now_ms = int(datetime.now(UTC).timestamp() * 1000) - - # Tombstone guard: null Kafka value (should not arrive; tombstones.on.delete=false) - if value is None: - log.warning("cdc_tombstone_received: null value skipped (connector misconfigured?)") - return + kafka_partition: int = value[PARTITION_IDX] + kafka_offset: int = value[OFFSET_IDX] + raw_json: str = value[PAYLOAD_IDX] # ── Parse Debezium envelope ──────────────────────────────────────────── try: - envelope = json.loads(value) + envelope = json.loads(raw_json) except (json.JSONDecodeError, ValueError) as exc: - log.warning("cdc_parse_error error=%s msg_preview=%.200s", exc, value) + log.warning("cdc_parse_error error=%s msg_preview=%.200s", exc, raw_json) return op: str = envelope.get("op", "") @@ -186,10 +189,6 @@ def flat_map(self, value: str) -> None: # type: ignore[override] ) # ── Emit bronze row ──────────────────────────────────────────────────── - # kafka_partition and kafka_offset are -1 because SimpleStringSchema - # doesn't expose Kafka record metadata (same limitation as normalize.py). - # The source_lsn from Debezium's source block is the canonical ordering key - # for replay purposes. yield Row( op, symbol, @@ -199,9 +198,9 @@ def flat_map(self, value: str) -> None: # type: ignore[override] source_ts_ms, tx_id, debezium_ts_ms, - Config.TOPIC_CDC, # kafka_topic - -1, # kafka_partition (not available via SimpleStringSchema) - -1, # kafka_offset (not available via SimpleStringSchema) + Config.TOPIC_CDC, + kafka_partition, + kafka_offset, now_ms, ) @@ -273,7 +272,7 @@ def main() -> None: .set_topics(cfg.TOPIC_CDC) .set_group_id(cfg.GROUP_CDC_BRONZE) .set_starting_offsets(KafkaOffsetsInitializer.earliest()) - .set_value_only_deserializer(SimpleStringSchema()) + .set_deserializer(KafkaRecordDeserializer()) .build(), WatermarkStrategy.no_watermarks(), "cdc-raw-source", diff --git a/flink/jobs/lib/kafka_schema.py b/flink/jobs/lib/kafka_schema.py new file mode 100644 index 0000000..ba8990e --- /dev/null +++ b/flink/jobs/lib/kafka_schema.py @@ -0,0 +1,73 @@ +""" +Kafka record deserialization helpers. + +The pure function parse_kafka_record is importable without PyFlink (used in +unit tests running on Python 3.12 in the root workspace). The +KafkaRecordDeserializer class and KAFKA_RECORD_TYPE are only defined when +PyFlink is present; they live under a try/except so the module remains +importable in the test environment. + +Why this exists +--------------- +SimpleStringSchema is a value-only deserializer: it discards all Kafka record +metadata (partition, offset, timestamp, key). Switching to +KafkaRecordDeserializationSchema lets downstream Flink jobs store the real +kafka_partition and kafka_offset in Iceberg instead of -1, enabling precise +offset-based seek during replay. +""" + +from __future__ import annotations + +# Positional indices into the Row emitted by KafkaRecordDeserializer. +# Use these constants instead of magic numbers in job code. +PAYLOAD_IDX = 0 +PARTITION_IDX = 1 +OFFSET_IDX = 2 + + +def parse_kafka_record(record: object) -> tuple[str, int, int] | None: + """Extract (payload_str, partition, offset) from a Kafka ConsumerRecord. + + Duck-typed: any object with .value(), .partition(), .offset() methods works, + which makes this function testable without PyFlink. + + Returns None for tombstone records (null value bytes). + """ + value_bytes = record.value() # type: ignore[union-attr] + if value_bytes is None: + return None + payload = bytes(value_bytes).decode("utf-8") + partition: int = record.partition() # type: ignore[union-attr] + offset: int = record.offset() # type: ignore[union-attr] + return (payload, partition, offset) + + +try: + from pyflink.common import Row + from pyflink.common.typeinfo import Types + from pyflink.datastream.connectors.kafka import KafkaRecordDeserializationSchema + + KAFKA_RECORD_TYPE = Types.ROW_NAMED( + ["payload", "kafka_partition", "kafka_offset"], + [Types.STRING(), Types.INT(), Types.LONG()], + ) + + class KafkaRecordDeserializer(KafkaRecordDeserializationSchema): + """Replaces SimpleStringSchema to capture real kafka_partition and kafka_offset. + + Emits Row(payload: str, kafka_partition: int, kafka_offset: long). + Tombstone records (null value) are suppressed here so downstream + processors never receive None payloads. + """ + + def deserialize(self, record: object, collector: object) -> None: + result = parse_kafka_record(record) + if result is not None: + payload, partition, offset = result + collector.collect(Row(payload, partition, offset)) # type: ignore[union-attr] + + def get_produced_type(self) -> object: + return KAFKA_RECORD_TYPE + +except ImportError: + pass diff --git a/flink/jobs/normalize.py b/flink/jobs/normalize.py index 496f5cc..22bf5a1 100644 --- a/flink/jobs/normalize.py +++ b/flink/jobs/normalize.py @@ -32,6 +32,12 @@ from typing import Any from lib.config import Config +from lib.kafka_schema import ( + OFFSET_IDX, + PARTITION_IDX, + PAYLOAD_IDX, + KafkaRecordDeserializer, +) from lib.logic import ( apply_depth_diff, best_ask, @@ -43,7 +49,6 @@ from lib.sql_runner import add_inserts_from_file, execute_sql_file from pyflink.common import Row, WatermarkStrategy from pyflink.common.restart_strategy import RestartStrategies -from pyflink.common.serialization import SimpleStringSchema from pyflink.common.typeinfo import Types from pyflink.datastream import ( CheckpointingMode, @@ -125,9 +130,9 @@ def _to_epoch_ms(ts: Any) -> int: return int(datetime.now(UTC).timestamp() * 1000) -def _kafka_key(msg: str) -> str: - """Extract routing key from raw JSON message (called by key_by).""" - data = json.loads(msg) +def _kafka_key(record: Row) -> str: + """Extract routing key from Kafka record Row(payload, partition, offset).""" + data = json.loads(record[PAYLOAD_IDX]) return f"{data['exchange']}#{data['symbol']}" @@ -155,9 +160,11 @@ def open(self, runtime_context: RuntimeContext) -> None: # type: ignore[overrid ) def process_element( # type: ignore[override] - self, value: str, ctx: KeyedProcessFunction.Context + self, value: Row, ctx: KeyedProcessFunction.Context ) -> None: - msg = json.loads(value) + kafka_partition: int = value[PARTITION_IDX] + kafka_offset: int = value[OFFSET_IDX] + msg = json.loads(value[PAYLOAD_IDX]) last_update_id: int = int(msg["last_update_id"]) # ── 1. Dedup ────────────────────────────────────────────────────────── @@ -222,8 +229,8 @@ def process_element( # type: ignore[override] compute_mid_price(bb_price, ba_price), compute_imbalance(book["bids"], book["asks"]), Config.TOPIC_RAW, - -1, # partition not available via SimpleStringSchema - -1, # offset not available via SimpleStringSchema + kafka_partition, + kafka_offset, ) @@ -292,7 +299,7 @@ def main() -> None: .set_topics(cfg.TOPIC_RAW) .set_group_id(cfg.GROUP_NORMALIZE) .set_starting_offsets(KafkaOffsetsInitializer.earliest()) - .set_value_only_deserializer(SimpleStringSchema()) + .set_deserializer(KafkaRecordDeserializer()) .build(), WatermarkStrategy.no_watermarks(), "raw-orderbook-source", diff --git a/flink/tests/unit/test_kafka_schema.py b/flink/tests/unit/test_kafka_schema.py new file mode 100644 index 0000000..444fa41 --- /dev/null +++ b/flink/tests/unit/test_kafka_schema.py @@ -0,0 +1,61 @@ +"""Unit tests for lib/kafka_schema.py — no PyFlink dependency.""" + +from unittest.mock import MagicMock + +from lib.kafka_schema import OFFSET_IDX, PARTITION_IDX, PAYLOAD_IDX, parse_kafka_record + + +def _make_record( + value_bytes: bytes | None, + partition: int = 0, + offset: int = 0, +) -> MagicMock: + record = MagicMock() + record.value.return_value = value_bytes + record.partition.return_value = partition + record.offset.return_value = offset + return record + + +class TestParseKafkaRecord: + def test_returns_payload_partition_offset(self) -> None: + record = _make_record(b'{"symbol": "BTCUSDT"}', partition=2, offset=12345) + result = parse_kafka_record(record) + assert result is not None + assert result[PAYLOAD_IDX] == '{"symbol": "BTCUSDT"}' + assert result[PARTITION_IDX] == 2 + assert result[OFFSET_IDX] == 12345 + + def test_tombstone_returns_none(self) -> None: + record = _make_record(None, partition=0, offset=99) + assert parse_kafka_record(record) is None + + def test_partition_zero_and_offset_zero_are_valid(self) -> None: + record = _make_record(b"{}", partition=0, offset=0) + result = parse_kafka_record(record) + assert result is not None + assert result[PARTITION_IDX] == 0 + assert result[OFFSET_IDX] == 0 + + def test_utf8_payload_decoded_correctly(self) -> None: + payload = '{"exchange": "binance", "symbol": "ETHUSDT"}' + record = _make_record(payload.encode("utf-8"), partition=1, offset=500) + result = parse_kafka_record(record) + assert result is not None + assert result[PAYLOAD_IDX] == payload + + def test_large_offset_handled(self) -> None: + large_offset = 2**40 + record = _make_record(b"{}", partition=0, offset=large_offset) + result = parse_kafka_record(record) + assert result is not None + assert result[OFFSET_IDX] == large_offset + + def test_high_partition_number(self) -> None: + record = _make_record(b"{}", partition=47, offset=1) + result = parse_kafka_record(record) + assert result is not None + assert result[PARTITION_IDX] == 47 + + def test_index_constants_are_distinct(self) -> None: + assert len({PAYLOAD_IDX, PARTITION_IDX, OFFSET_IDX}) == 3 diff --git a/replay/src/replay/iceberg_reader.py b/replay/src/replay/iceberg_reader.py index 85dca2a..a0dbb2b 100644 --- a/replay/src/replay/iceberg_reader.py +++ b/replay/src/replay/iceberg_reader.py @@ -10,15 +10,10 @@ time window. 4. Return min/max (partition, offset) per partition. -Known limitation: normalize.py currently sets kafka_partition=-1 and -kafka_offset=-1 because SimpleStringSchema doesn't expose Kafka record metadata. -When offsets are -1, this reader falls back to Kafka's offsets_for_times() API, -which finds the first offset whose timestamp is >= the requested time. The -Kafka path is always tried as a verification step even when Iceberg offsets are -available. - -See the Known Limitations section in docs/DEBUGGING_PHASE3.md for the fix -(implement KafkaRecordDeserializationSchema in normalize.py). +Rows written before the KafkaRecordDeserializer fix (see lib/kafka_schema.py) +will have kafka_partition=-1 and kafka_offset=-1. When all rows in the scan +window carry -1, this reader falls back to Kafka's offsets_for_times() API. +New rows written after the fix carry real offsets and take the precise path. """ from __future__ import annotations diff --git a/replay/src/replay/producer.py b/replay/src/replay/producer.py index 29aaf16..c1a2a46 100644 --- a/replay/src/replay/producer.py +++ b/replay/src/replay/producer.py @@ -6,7 +6,7 @@ 1. Determine which Kafka offsets correspond to the requested time window: a. Try Iceberg reader (reads kafka_offset from normalized.book_ticker). b. Fall back to Kafka's offsets_for_times() when Iceberg returns no ranges - (the current state due to kafka_offset=-1 in normalize.py). + (e.g. rows written before the KafkaRecordDeserializer fix still have -1). 2. For each partition in the offset range: a. Seek a temporary consumer to start_offset. b. Read records up to end_offset (or max_events limit). From d8e4a234233e0398a852735daa682cadcd9aa942 Mon Sep 17 00:00:00 2001 From: Andrew Chen Date: Sun, 17 May 2026 11:21:11 -0500 Subject: [PATCH 3/3] Phase 5: Airflow DAGs, Spark jobs, fix lib namespace conflict MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add airflow/dags: backfill_ohlcv_1m, compact_iceberg_tables, expire_iceberg_snapshots, freshness_sla_check, run_dbt_models, run_great_expectations - Add spark/jobs: backfill_ohlcv, compact_tables, spark_lib - Add airflow/tests and spark/tests; include both in pytest testpaths - Rename flink/jobs/lib → flink_lib, spark/jobs/lib → spark_lib to eliminate bare `lib` package collision across services - Fix all ruff lint errors in new airflow/spark files Co-Authored-By: Claude Sonnet 4.6 --- airflow/Dockerfile | 27 +++++ airflow/dags/backfill_ohlcv_1m.py | 90 +++++++++++++++ airflow/dags/compact_iceberg_tables.py | 58 ++++++++++ airflow/dags/expire_iceberg_snapshots.py | 57 ++++++++++ airflow/dags/freshness_sla_check.py | 58 ++++++++++ airflow/dags/run_dbt_models.py | 41 +++++++ airflow/dags/run_great_expectations.py | 107 ++++++++++++++++++ airflow/tests/conftest.py | 6 + airflow/tests/test_dag_integrity.py | 47 ++++++++ airflow/tests/test_freshness_check.py | 49 ++++++++ flink/jobs/cdc_symbol_config.py | 6 +- flink/jobs/{lib => flink_lib}/__init__.py | 0 flink/jobs/{lib => flink_lib}/config.py | 0 flink/jobs/{lib => flink_lib}/kafka_schema.py | 0 flink/jobs/{lib => flink_lib}/logic.py | 0 flink/jobs/{lib => flink_lib}/sql_runner.py | 0 flink/jobs/normalize.py | 8 +- flink/jobs/ohlcv_1m.py | 4 +- flink/tests/unit/test_config.py | 2 +- flink/tests/unit/test_kafka_schema.py | 2 +- flink/tests/unit/test_logic.py | 2 +- flink/tests/unit/test_sql_runner.py | 2 +- pyproject.toml | 2 +- spark/Dockerfile | 16 +++ spark/jobs/__init__.py | 0 spark/jobs/backfill_ohlcv.py | 67 +++++++++++ spark/jobs/compact_tables.py | 38 +++++++ spark/jobs/spark_lib/__init__.py | 0 spark/jobs/spark_lib/config.py | 45 ++++++++ spark/jobs/spark_lib/sql_runner.py | 13 +++ spark/jobs/sql/backfill_ohlcv.sql | 46 ++++++++ spark/jobs/sql/compact_table.sql | 9 ++ spark/tests/conftest.py | 6 + spark/tests/test_spark_jobs.py | 93 +++++++++++++++ 34 files changed, 887 insertions(+), 14 deletions(-) create mode 100644 airflow/Dockerfile create mode 100644 airflow/dags/backfill_ohlcv_1m.py create mode 100644 airflow/dags/compact_iceberg_tables.py create mode 100644 airflow/dags/expire_iceberg_snapshots.py create mode 100644 airflow/dags/freshness_sla_check.py create mode 100644 airflow/dags/run_dbt_models.py create mode 100644 airflow/dags/run_great_expectations.py create mode 100644 airflow/tests/conftest.py create mode 100644 airflow/tests/test_dag_integrity.py create mode 100644 airflow/tests/test_freshness_check.py rename flink/jobs/{lib => flink_lib}/__init__.py (100%) rename flink/jobs/{lib => flink_lib}/config.py (100%) rename flink/jobs/{lib => flink_lib}/kafka_schema.py (100%) rename flink/jobs/{lib => flink_lib}/logic.py (100%) rename flink/jobs/{lib => flink_lib}/sql_runner.py (100%) create mode 100644 spark/Dockerfile create mode 100644 spark/jobs/__init__.py create mode 100644 spark/jobs/backfill_ohlcv.py create mode 100644 spark/jobs/compact_tables.py create mode 100644 spark/jobs/spark_lib/__init__.py create mode 100644 spark/jobs/spark_lib/config.py create mode 100644 spark/jobs/spark_lib/sql_runner.py create mode 100644 spark/jobs/sql/backfill_ohlcv.sql create mode 100644 spark/jobs/sql/compact_table.sql create mode 100644 spark/tests/conftest.py create mode 100644 spark/tests/test_spark_jobs.py diff --git a/airflow/Dockerfile b/airflow/Dockerfile new file mode 100644 index 0000000..781a6e5 --- /dev/null +++ b/airflow/Dockerfile @@ -0,0 +1,27 @@ +FROM apache/airflow:2.9.3-python3.11 + +USER root + +# Java is required by PySpark (Spark runs on the JVM). +RUN apt-get update && \ + apt-get install -y --no-install-recommends default-jre-headless && \ + rm -rf /var/lib/apt/lists/* + +# Iceberg Spark runtime + AWS bundle — same Iceberg version (1.6.0) as Flink stack. +# Placed in /opt/airflow/jars/ and passed to spark-submit via --jars. +RUN mkdir -p /opt/airflow/jars && \ + curl -fL "https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.0/iceberg-spark-runtime-3.5_2.12-1.6.0.jar" \ + -o /opt/airflow/jars/iceberg-spark-runtime.jar && \ + curl -fL "https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/1.6.0/iceberg-aws-bundle-1.6.0.jar" \ + -o /opt/airflow/jars/iceberg-aws-bundle.jar + +USER airflow + +RUN pip install --no-cache-dir \ + pyspark==3.5.1 \ + apache-airflow-providers-apache-spark==4.10.0 \ + apache-airflow-providers-trino==5.7.0 \ + great-expectations==0.18.19 \ + trino==0.328.0 \ + dbt-trino==1.8.0 \ + pandas==2.2.2 diff --git a/airflow/dags/backfill_ohlcv_1m.py b/airflow/dags/backfill_ohlcv_1m.py new file mode 100644 index 0000000..86cc4ee --- /dev/null +++ b/airflow/dags/backfill_ohlcv_1m.py @@ -0,0 +1,90 @@ +"""backfill_ohlcv_1m — manually triggered Spark OHLCV backfill for a date range.""" + +from __future__ import annotations + +from datetime import datetime, timedelta + +from airflow.operators.python import PythonOperator +from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator + +from airflow import DAG + +_JARS = "/opt/airflow/jars/iceberg-spark-runtime.jar,/opt/airflow/jars/iceberg-aws-bundle.jar" + +_SPARK_CONF = { + "spark.sql.extensions": ("org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"), + "spark.sql.catalog.iceberg": "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.iceberg.type": "rest", + "spark.sql.catalog.iceberg.uri": "http://iceberg-rest:8181", + "spark.sql.catalog.iceberg.warehouse": "s3://ticksense/warehouse", + "spark.sql.catalog.iceberg.io-impl": "org.apache.iceberg.aws.s3.S3FileIO", + "spark.sql.catalog.iceberg.s3.endpoint": "http://minio:9000", + "spark.sql.catalog.iceberg.s3.path-style-access": "true", + "spark.sql.catalog.iceberg.s3.access-key-id": "minioadmin", + "spark.sql.catalog.iceberg.s3.secret-access-key": "minioadmin", + "spark.sql.sources.partitionOverwriteMode": "dynamic", +} + +default_args = { + "retries": 3, + "retry_delay": timedelta(minutes=2), + "retry_exponential_backoff": True, + "max_retry_delay": timedelta(minutes=30), +} + + +def validate_params(**context: object) -> None: + """Fail fast if date params are missing or malformed before Spark starts.""" + params = context["params"] + start = params.get("start_date") + end = params.get("end_date") + if not start or not end: + from airflow.exceptions import AirflowFailException + + raise AirflowFailException("Required params: start_date and end_date (YYYY-MM-DD)") + from datetime import date + + s, e = date.fromisoformat(start), date.fromisoformat(end) + if e <= s: + from airflow.exceptions import AirflowFailException + + raise AirflowFailException(f"end_date ({e}) must be after start_date ({s})") + + +with DAG( + dag_id="backfill_ohlcv_1m", + start_date=datetime(2026, 5, 1), + schedule=None, + catchup=False, + default_args=default_args, + params={"start_date": "2026-05-01", "end_date": "2026-05-02"}, + tags=["backfill", "spark", "ohlcv"], + doc_md=( + "Manually triggered. Recomputes normalized.ohlcv_1m from book_ticker " + "for the given date range. Idempotent — reruns overwrite the same partitions." + ), +) as dag: + check = PythonOperator(task_id="validate_params", python_callable=validate_params) + + backfill = SparkSubmitOperator( + task_id="spark_backfill_ohlcv", + application="/opt/airflow/spark-jobs/backfill_ohlcv.py", + application_args=[ + "--start-date", + "{{ params.start_date }}", + "--end-date", + "{{ params.end_date }}", + ], + conn_id="spark_default", + jars=_JARS, + conf=_SPARK_CONF, + env_vars={ + "ICEBERG_REST_URI": "http://iceberg-rest:8181", + "ICEBERG_WAREHOUSE": "s3://ticksense/warehouse", + "S3_ENDPOINT": "http://minio:9000", + "AWS_ACCESS_KEY_ID": "minioadmin", + "AWS_SECRET_ACCESS_KEY": "minioadmin", + }, + ) + + check >> backfill diff --git a/airflow/dags/compact_iceberg_tables.py b/airflow/dags/compact_iceberg_tables.py new file mode 100644 index 0000000..1173c7d --- /dev/null +++ b/airflow/dags/compact_iceberg_tables.py @@ -0,0 +1,58 @@ +"""compact_iceberg_tables — nightly Spark rewrite_data_files for silver tables.""" + +from __future__ import annotations + +from datetime import datetime, timedelta + +from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator + +from airflow import DAG + +_JARS = "/opt/airflow/jars/iceberg-spark-runtime.jar,/opt/airflow/jars/iceberg-aws-bundle.jar" + +_SPARK_CONF = { + "spark.sql.extensions": ("org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"), + "spark.sql.catalog.iceberg": "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.iceberg.type": "rest", + "spark.sql.catalog.iceberg.uri": "http://iceberg-rest:8181", + "spark.sql.catalog.iceberg.warehouse": "s3://ticksense/warehouse", + "spark.sql.catalog.iceberg.io-impl": "org.apache.iceberg.aws.s3.S3FileIO", + "spark.sql.catalog.iceberg.s3.endpoint": "http://minio:9000", + "spark.sql.catalog.iceberg.s3.path-style-access": "true", + "spark.sql.catalog.iceberg.s3.access-key-id": "minioadmin", + "spark.sql.catalog.iceberg.s3.secret-access-key": "minioadmin", +} + +default_args = { + "retries": 3, + "retry_delay": timedelta(minutes=2), + "retry_exponential_backoff": True, + "max_retry_delay": timedelta(minutes=30), +} + +with DAG( + dag_id="compact_iceberg_tables", + start_date=datetime(2026, 5, 1), + schedule="0 2 * * *", + catchup=False, + default_args=default_args, + tags=["maintenance", "iceberg", "spark"], + doc_md=( + "Nightly at 02:00 UTC. Merges small Parquet files written by Flink checkpoints " + "into larger files for efficient Trino batch scans." + ), +) as dag: + SparkSubmitOperator( + task_id="rewrite_data_files", + application="/opt/airflow/spark-jobs/compact_tables.py", + conn_id="spark_default", + jars=_JARS, + conf=_SPARK_CONF, + env_vars={ + "ICEBERG_REST_URI": "http://iceberg-rest:8181", + "ICEBERG_WAREHOUSE": "s3://ticksense/warehouse", + "S3_ENDPOINT": "http://minio:9000", + "AWS_ACCESS_KEY_ID": "minioadmin", + "AWS_SECRET_ACCESS_KEY": "minioadmin", + }, + ) diff --git a/airflow/dags/expire_iceberg_snapshots.py b/airflow/dags/expire_iceberg_snapshots.py new file mode 100644 index 0000000..3457a0a --- /dev/null +++ b/airflow/dags/expire_iceberg_snapshots.py @@ -0,0 +1,57 @@ +"""expire_iceberg_snapshots — retain last 7 days of Iceberg snapshots.""" + +from __future__ import annotations + +import os +from datetime import datetime, timedelta + +import trino +from airflow.operators.python import PythonOperator + +from airflow import DAG + +RETENTION_DAYS = 7 + +TABLES = [ + "normalized.book_ticker", + "normalized.ohlcv_1m", + "normalized.symbol_config", +] + +default_args = { + "retries": 3, + "retry_delay": timedelta(minutes=2), + "retry_exponential_backoff": True, + "max_retry_delay": timedelta(minutes=30), +} + + +def expire_snapshots(**_: object) -> None: + """Run Trino expire_snapshots procedure for each silver table.""" + with ( + trino.dbapi.connect( + host=os.getenv("TRINO_HOST", "trino"), + port=int(os.getenv("TRINO_PORT", "8080")), + user="airflow", + catalog="iceberg", + ) as conn, + conn.cursor() as cur, + ): + for table in TABLES: + cur.execute( + f"ALTER TABLE iceberg.{table} " + f"EXECUTE expire_snapshots(retention_threshold => '{RETENTION_DAYS}d')" + ) + cur.fetchall() + + +with DAG( + dag_id="expire_iceberg_snapshots", + start_date=datetime(2026, 5, 1), + schedule="0 3 * * *", + catchup=False, + default_args=default_args, + tags=["maintenance", "iceberg"], + doc_md=f"Nightly at 03:00 UTC. Removes Iceberg snapshots older than {RETENTION_DAYS} days.", +) as dag: + PythonOperator(task_id="expire_snapshots", python_callable=expire_snapshots) diff --git a/airflow/dags/freshness_sla_check.py b/airflow/dags/freshness_sla_check.py new file mode 100644 index 0000000..d9d5e08 --- /dev/null +++ b/airflow/dags/freshness_sla_check.py @@ -0,0 +1,58 @@ +"""freshness_sla_check — alert if any symbol is stale > 35 seconds.""" + +from __future__ import annotations + +import os +from datetime import datetime, timedelta + +import trino +from airflow.exceptions import AirflowFailException +from airflow.operators.python import PythonOperator + +from airflow import DAG + +STALENESS_THRESHOLD_S = 35 + +default_args = { + "retries": 1, + "retry_delay": timedelta(minutes=1), +} + + +def check_freshness(**_: object) -> None: + """Query Trino for any symbol whose latest tick is older than the SLA threshold.""" + with ( + trino.dbapi.connect( + host=os.getenv("TRINO_HOST", "trino"), + port=int(os.getenv("TRINO_PORT", "8080")), + user="airflow", + catalog="iceberg", + ) as conn, + conn.cursor() as cur, + ): + cur.execute( + f""" + SELECT symbol, + DATE_DIFF('second', MAX(exchange_event_ts), NOW()) AS staleness_s + FROM normalized.book_ticker + GROUP BY symbol + HAVING DATE_DIFF('second', MAX(exchange_event_ts), NOW()) > {STALENESS_THRESHOLD_S} + ORDER BY staleness_s DESC + """ + ) + stale = cur.fetchall() + if stale: + details = ", ".join(f"{row[0]}={row[1]}s" for row in stale) + raise AirflowFailException(f"SLA breach — stale symbols: {details}") + + +with DAG( + dag_id="freshness_sla_check", + start_date=datetime(2026, 5, 1), + schedule="*/5 * * * *", + catchup=False, + default_args=default_args, + tags=["sla", "monitoring"], + doc_md="Runs every 5 min. Fails (alerts) if any symbol has no data within 35 s.", +) as dag: + PythonOperator(task_id="check_freshness", python_callable=check_freshness) diff --git a/airflow/dags/run_dbt_models.py b/airflow/dags/run_dbt_models.py new file mode 100644 index 0000000..37e6f09 --- /dev/null +++ b/airflow/dags/run_dbt_models.py @@ -0,0 +1,41 @@ +"""run_dbt_models — hourly dbt run + test against Iceberg silver tables.""" + +from __future__ import annotations + +from datetime import datetime, timedelta + +from airflow.operators.bash import BashOperator + +from airflow import DAG + +_DBT = "dbt --no-send-anonymous-usage-stats" +_OPTS = "--profiles-dir /opt/airflow/dbt --project-dir /opt/airflow/dbt" + +default_args = { + "retries": 3, + "retry_delay": timedelta(minutes=2), + "retry_exponential_backoff": True, + "max_retry_delay": timedelta(minutes=30), +} + +with DAG( + dag_id="run_dbt_models", + start_date=datetime(2026, 5, 1), + schedule="0 * * * *", + catchup=False, + default_args=default_args, + tags=["dbt", "analytics"], + doc_md="Hourly dbt run + test. Rebuilds staging views and mart tables over Iceberg.", +) as dag: + dbt_run = BashOperator( + task_id="dbt_run", + bash_command=f"{_DBT} run {_OPTS}", + env={"TRINO_HOST": "trino", "TRINO_PORT": "8080"}, + ) + dbt_test = BashOperator( + task_id="dbt_test", + bash_command=f"{_DBT} test {_OPTS}", + env={"TRINO_HOST": "trino", "TRINO_PORT": "8080"}, + ) + + dbt_run >> dbt_test diff --git a/airflow/dags/run_great_expectations.py b/airflow/dags/run_great_expectations.py new file mode 100644 index 0000000..343f4fc --- /dev/null +++ b/airflow/dags/run_great_expectations.py @@ -0,0 +1,107 @@ +"""run_great_expectations — data quality checks on Iceberg silver tables.""" + +from __future__ import annotations + +import os +from datetime import datetime, timedelta + +import pandas as pd +import trino +from airflow.exceptions import AirflowFailException +from airflow.operators.python import PythonOperator + +from airflow import DAG + +default_args = { + "retries": 2, + "retry_delay": timedelta(minutes=5), + "retry_exponential_backoff": True, + "max_retry_delay": timedelta(minutes=30), +} + +# Expectations applied to every table (column → (min, max|None)) +_PRICE_COLS = {"best_bid_price": (0, None), "best_ask_price": (0, None)} +_REQUIRED_COLS = ["exchange", "symbol", "exchange_event_ts"] + + +def _trino_conn() -> trino.dbapi.Connection: + return trino.dbapi.connect( + host=os.getenv("TRINO_HOST", "trino"), + port=int(os.getenv("TRINO_PORT", "8080")), + user="airflow", + catalog="iceberg", + ) + + +def _fetch_sample(table: str, minutes: int = 10) -> pd.DataFrame: + """Fetch a recent sample from the given Iceberg table via Trino.""" + with _trino_conn() as conn, conn.cursor() as cur: + cur.execute( + f""" + SELECT * + FROM iceberg.{table} + WHERE exchange_event_ts > NOW() - INTERVAL '{minutes}' MINUTE + LIMIT 2000 + """ + ) + rows = cur.fetchall() + cols = [d[0] for d in cur.description] + return pd.DataFrame(rows, columns=cols) + + +def _run_suite(df: pd.DataFrame, table: str) -> list[str]: + """Run GX expectations on a pandas DataFrame. Returns list of failure messages.""" + import great_expectations as gx + + ctx = gx.get_context(mode="ephemeral") + ds = ctx.sources.add_pandas(name="trino_sample") + asset = ds.add_dataframe_asset(name=table) + batch_req = asset.build_batch_request(dataframe=df) + + ctx.add_or_update_expectation_suite(f"{table}_suite") + validator = ctx.get_validator(batch_request=batch_req, expectation_suite_name=f"{table}_suite") + + for col in _REQUIRED_COLS: + if col in df.columns: + validator.expect_column_values_to_not_be_null(col) + + for col, (lo, hi) in _PRICE_COLS.items(): + if col in df.columns: + validator.expect_column_values_to_be_between(col, min_value=lo, max_value=hi) + + results = validator.validate() + return [ + r["expectation_config"]["expectation_type"] for r in results.results if not r["success"] + ] + + +def run_gx_checks(**_: object) -> None: + """Validate recent data in book_ticker and ohlcv_1m; fail if any expectation fails.""" + failures: dict[str, list[str]] = {} + + for table in ("normalized.book_ticker", "normalized.ohlcv_1m"): + df = _fetch_sample(table) + if df.empty: + failures[table] = ["no_recent_data"] + continue + failed = _run_suite(df, table) + if failed: + failures[table] = failed + + if failures: + raise AirflowFailException(f"GX validation failed: {failures}") + + +with DAG( + dag_id="run_great_expectations", + start_date=datetime(2026, 5, 1), + schedule="30 * * * *", + catchup=False, + default_args=default_args, + tags=["quality", "great-expectations"], + doc_md=( + "Runs at :30 each hour. Fetches a recent sample from Iceberg via Trino " + "and validates null constraints and price bounds with Great Expectations." + ), +) as dag: + PythonOperator(task_id="gx_checks", python_callable=run_gx_checks) diff --git a/airflow/tests/conftest.py b/airflow/tests/conftest.py new file mode 100644 index 0000000..d08ff22 --- /dev/null +++ b/airflow/tests/conftest.py @@ -0,0 +1,6 @@ +"""Add airflow/dags to sys.path so DAG modules are importable by name.""" + +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent.parent / "dags")) diff --git a/airflow/tests/test_dag_integrity.py b/airflow/tests/test_dag_integrity.py new file mode 100644 index 0000000..49c7279 --- /dev/null +++ b/airflow/tests/test_dag_integrity.py @@ -0,0 +1,47 @@ +"""DAG integrity tests — verify all 6 DAGs load without import errors.""" + +from __future__ import annotations + +import pytest + +pytest.importorskip( + "airflow.operators.python", reason="apache-airflow not installed; skipping DAG tests" +) + +from airflow.models import DagBag # noqa: E402 + +EXPECTED_DAG_IDS = { + "backfill_ohlcv_1m", + "compact_iceberg_tables", + "expire_iceberg_snapshots", + "run_dbt_models", + "run_great_expectations", + "freshness_sla_check", +} + + +@pytest.fixture(scope="module") +def dag_bag() -> DagBag: + return DagBag(dag_folder="airflow/dags", include_examples=False) + + +def test_no_import_errors(dag_bag: DagBag) -> None: + assert dag_bag.import_errors == {}, f"Import errors: {dag_bag.import_errors}" + + +def test_all_dags_present(dag_bag: DagBag) -> None: + assert set(dag_bag.dag_ids) == EXPECTED_DAG_IDS + + +def test_all_dags_have_tags(dag_bag: DagBag) -> None: + for dag_id, dag in dag_bag.dags.items(): + assert dag.tags, f"DAG '{dag_id}' has no tags" + + +def test_all_dags_have_retries(dag_bag: DagBag) -> None: + # schedule=None (manual) DAGs skip this check + for dag_id, dag in dag_bag.dags.items(): + if dag.schedule_interval is None: + continue + for task in dag.tasks: + assert task.retries > 0, f"{dag_id}.{task.task_id} has retries=0" diff --git a/airflow/tests/test_freshness_check.py b/airflow/tests/test_freshness_check.py new file mode 100644 index 0000000..6858183 --- /dev/null +++ b/airflow/tests/test_freshness_check.py @@ -0,0 +1,49 @@ +"""Unit tests for check_freshness — Trino connection mocked out.""" + +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +import pytest + +pytest.importorskip("airflow.operators.python", reason="apache-airflow not installed; skipping") + +from airflow.exceptions import AirflowFailException # noqa: E402 +from freshness_sla_check import check_freshness # noqa: E402 + + +def _mock_conn(rows: list[tuple]) -> MagicMock: + cur = MagicMock() + cur.fetchall.return_value = rows + cur.__enter__ = lambda s: s + cur.__exit__ = MagicMock(return_value=False) + conn = MagicMock() + conn.cursor.return_value = cur + conn.__enter__ = lambda s: s + conn.__exit__ = MagicMock(return_value=False) + return conn + + +def test_passes_when_all_fresh() -> None: + with patch("freshness_sla_check.trino.dbapi.connect", return_value=_mock_conn([])): + check_freshness() # must not raise + + +def test_raises_when_symbol_stale() -> None: + stale_rows = [("BTCUSDT", 120), ("ETHUSDT", 45)] + with ( + patch("freshness_sla_check.trino.dbapi.connect", return_value=_mock_conn(stale_rows)), + pytest.raises(AirflowFailException, match="SLA breach"), + ): + check_freshness() + + +def test_failure_message_contains_symbol() -> None: + with ( + patch( + "freshness_sla_check.trino.dbapi.connect", + return_value=_mock_conn([("SOLUSDT", 99)]), + ), + pytest.raises(AirflowFailException, match="SOLUSDT"), + ): + check_freshness() diff --git a/flink/jobs/cdc_symbol_config.py b/flink/jobs/cdc_symbol_config.py index 1657fcd..4280f93 100644 --- a/flink/jobs/cdc_symbol_config.py +++ b/flink/jobs/cdc_symbol_config.py @@ -56,14 +56,14 @@ from datetime import datetime, timezone from pathlib import Path -from lib.config import Config -from lib.kafka_schema import ( +from flink_lib.config import Config +from flink_lib.kafka_schema import ( OFFSET_IDX, PARTITION_IDX, PAYLOAD_IDX, KafkaRecordDeserializer, ) -from lib.sql_runner import add_inserts_from_file, execute_sql_file +from flink_lib.sql_runner import add_inserts_from_file, execute_sql_file from pyflink.common import Row, WatermarkStrategy from pyflink.common.restart_strategy import RestartStrategies from pyflink.common.typeinfo import Types diff --git a/flink/jobs/lib/__init__.py b/flink/jobs/flink_lib/__init__.py similarity index 100% rename from flink/jobs/lib/__init__.py rename to flink/jobs/flink_lib/__init__.py diff --git a/flink/jobs/lib/config.py b/flink/jobs/flink_lib/config.py similarity index 100% rename from flink/jobs/lib/config.py rename to flink/jobs/flink_lib/config.py diff --git a/flink/jobs/lib/kafka_schema.py b/flink/jobs/flink_lib/kafka_schema.py similarity index 100% rename from flink/jobs/lib/kafka_schema.py rename to flink/jobs/flink_lib/kafka_schema.py diff --git a/flink/jobs/lib/logic.py b/flink/jobs/flink_lib/logic.py similarity index 100% rename from flink/jobs/lib/logic.py rename to flink/jobs/flink_lib/logic.py diff --git a/flink/jobs/lib/sql_runner.py b/flink/jobs/flink_lib/sql_runner.py similarity index 100% rename from flink/jobs/lib/sql_runner.py rename to flink/jobs/flink_lib/sql_runner.py diff --git a/flink/jobs/normalize.py b/flink/jobs/normalize.py index 22bf5a1..87b9e95 100644 --- a/flink/jobs/normalize.py +++ b/flink/jobs/normalize.py @@ -31,14 +31,14 @@ from pathlib import Path from typing import Any -from lib.config import Config -from lib.kafka_schema import ( +from flink_lib.config import Config +from flink_lib.kafka_schema import ( OFFSET_IDX, PARTITION_IDX, PAYLOAD_IDX, KafkaRecordDeserializer, ) -from lib.logic import ( +from flink_lib.logic import ( apply_depth_diff, best_ask, best_bid, @@ -46,7 +46,7 @@ compute_mid_price, compute_spread, ) -from lib.sql_runner import add_inserts_from_file, execute_sql_file +from flink_lib.sql_runner import add_inserts_from_file, execute_sql_file from pyflink.common import Row, WatermarkStrategy from pyflink.common.restart_strategy import RestartStrategies from pyflink.common.typeinfo import Types diff --git a/flink/jobs/ohlcv_1m.py b/flink/jobs/ohlcv_1m.py index 909271c..a0ea2c2 100644 --- a/flink/jobs/ohlcv_1m.py +++ b/flink/jobs/ohlcv_1m.py @@ -21,8 +21,8 @@ import logging from pathlib import Path -from lib.config import Config -from lib.sql_runner import add_inserts_from_file, execute_sql_file +from flink_lib.config import Config +from flink_lib.sql_runner import add_inserts_from_file, execute_sql_file from pyflink.common.restart_strategy import RestartStrategies from pyflink.datastream import ( CheckpointingMode, diff --git a/flink/tests/unit/test_config.py b/flink/tests/unit/test_config.py index b148c93..c18c873 100644 --- a/flink/tests/unit/test_config.py +++ b/flink/tests/unit/test_config.py @@ -2,7 +2,7 @@ from __future__ import annotations -from lib.config import Config +from flink_lib.config import Config class TestConfig: diff --git a/flink/tests/unit/test_kafka_schema.py b/flink/tests/unit/test_kafka_schema.py index 444fa41..c591a77 100644 --- a/flink/tests/unit/test_kafka_schema.py +++ b/flink/tests/unit/test_kafka_schema.py @@ -2,7 +2,7 @@ from unittest.mock import MagicMock -from lib.kafka_schema import OFFSET_IDX, PARTITION_IDX, PAYLOAD_IDX, parse_kafka_record +from flink_lib.kafka_schema import OFFSET_IDX, PARTITION_IDX, PAYLOAD_IDX, parse_kafka_record def _make_record( diff --git a/flink/tests/unit/test_logic.py b/flink/tests/unit/test_logic.py index 7c5fb0a..9799838 100644 --- a/flink/tests/unit/test_logic.py +++ b/flink/tests/unit/test_logic.py @@ -1,7 +1,7 @@ """Unit tests for flink/jobs/lib/logic.py — pure Python, no PyFlink dependency.""" import pytest -from lib.logic import ( +from flink_lib.logic import ( OHLCVState, apply_depth_diff, best_ask, diff --git a/flink/tests/unit/test_sql_runner.py b/flink/tests/unit/test_sql_runner.py index ce192c0..8654871 100644 --- a/flink/tests/unit/test_sql_runner.py +++ b/flink/tests/unit/test_sql_runner.py @@ -4,7 +4,7 @@ from unittest.mock import MagicMock import pytest -from lib.sql_runner import ( +from flink_lib.sql_runner import ( add_inserts_from_file, execute_sql_file, load_sql, diff --git a/pyproject.toml b/pyproject.toml index 4215f33..b7eb552 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,7 +40,7 @@ plugins = ["pydantic.mypy"] [tool.pytest.ini_options] asyncio_mode = "auto" -testpaths = ["ingest/tests", "api/tests", "replay/tests", "flink/tests"] +testpaths = ["ingest/tests", "api/tests", "replay/tests", "flink/tests", "airflow/tests", "spark/tests"] addopts = "--tb=short" [tool.coverage.run] diff --git a/spark/Dockerfile b/spark/Dockerfile new file mode 100644 index 0000000..6c4f08b --- /dev/null +++ b/spark/Dockerfile @@ -0,0 +1,16 @@ +FROM bitnami/spark:3.5 + +USER root + +# Iceberg Spark runtime (Spark 3.5 / Scala 2.12) + AWS bundle for S3FileIO. +# Same Iceberg 1.6.0 as the Flink and Airflow images — consistent catalog format. +RUN curl -fL \ + "https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.0/iceberg-spark-runtime-3.5_2.12-1.6.0.jar" \ + -o /opt/bitnami/spark/jars/iceberg-spark-runtime-3.5_2.12-1.6.0.jar && \ + curl -fL \ + "https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/1.6.0/iceberg-aws-bundle-1.6.0.jar" \ + -o /opt/bitnami/spark/jars/iceberg-aws-bundle-1.6.0.jar + +COPY jobs/ /opt/spark/jobs/ + +USER 1001 diff --git a/spark/jobs/__init__.py b/spark/jobs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/spark/jobs/backfill_ohlcv.py b/spark/jobs/backfill_ohlcv.py new file mode 100644 index 0000000..f75d9de --- /dev/null +++ b/spark/jobs/backfill_ohlcv.py @@ -0,0 +1,67 @@ +""" +backfill_ohlcv.py — Recompute normalized.ohlcv_1m for a date range. + +Reads from normalized.book_ticker (silver), aggregates to 1-minute OHLCV, +and writes with dynamic partition overwrite — identical reruns are idempotent. + +Usage (via spark-submit or Airflow SparkSubmitOperator): + spark-submit backfill_ohlcv.py --start-date 2026-05-01 --end-date 2026-05-02 +""" + +from __future__ import annotations + +import argparse +import logging +from datetime import date, timedelta + +log = logging.getLogger(__name__) + + +def parse_args(argv: list[str] | None = None) -> argparse.Namespace: + p = argparse.ArgumentParser(description="Backfill 1-minute OHLCV from book_ticker") + p.add_argument( + "--start-date", + required=True, + type=date.fromisoformat, + help="Inclusive start date (YYYY-MM-DD)", + ) + p.add_argument( + "--end-date", required=True, type=date.fromisoformat, help="Exclusive end date (YYYY-MM-DD)" + ) + return p.parse_args(argv) + + +def validate_date_range(start: date, end: date) -> None: + if end <= start: + raise ValueError(f"end_date ({end}) must be after start_date ({start})") + if (end - start) > timedelta(days=90): + raise ValueError("Range exceeds 90-day safety limit; split into smaller chunks") + + +def main() -> None: + from spark_lib.config import build_spark_session + from spark_lib.sql_runner import load_sql + + args = parse_args() + validate_date_range(args.start_date, args.end_date) + + spark = build_spark_session("ticksense-backfill-ohlcv") + spark.sparkContext.setLogLevel("WARN") + + sql = load_sql( + "backfill_ohlcv.sql", + start_ts=f"{args.start_date}T00:00:00", + end_ts=f"{args.end_date}T00:00:00", + ) + + try: + df = spark.sql(sql) + df.writeTo("iceberg.normalized.ohlcv_1m").overwritePartitions() + log.info("Backfill complete: %s → %s", args.start_date, args.end_date) + finally: + spark.stop() + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + main() diff --git a/spark/jobs/compact_tables.py b/spark/jobs/compact_tables.py new file mode 100644 index 0000000..6103464 --- /dev/null +++ b/spark/jobs/compact_tables.py @@ -0,0 +1,38 @@ +""" +compact_tables.py — Nightly Iceberg rewrite_data_files for all silver tables. + +Streaming writers (Flink) produce many small Parquet files per checkpoint. +This job merges them into larger files (~128 MB) so Trino scans are faster. +Safe to rerun — rewrite_data_files is idempotent. + +Usage: + spark-submit compact_tables.py +""" + +from __future__ import annotations + +import logging + +log = logging.getLogger(__name__) + + +def main() -> None: + from spark_lib.config import TABLES_TO_COMPACT, build_spark_session + from spark_lib.sql_runner import load_sql + + spark = build_spark_session("ticksense-compact-tables") + spark.sparkContext.setLogLevel("WARN") + + try: + for table in TABLES_TO_COMPACT: + sql = load_sql("compact_table.sql", table=table) + log.info("Compacting %s ...", table) + spark.sql(sql).show(truncate=False) + log.info("Done: %s", table) + finally: + spark.stop() + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + main() diff --git a/spark/jobs/spark_lib/__init__.py b/spark/jobs/spark_lib/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/spark/jobs/spark_lib/config.py b/spark/jobs/spark_lib/config.py new file mode 100644 index 0000000..65703c4 --- /dev/null +++ b/spark/jobs/spark_lib/config.py @@ -0,0 +1,45 @@ +"""Shared SparkSession builder and Iceberg catalog configuration.""" + +from __future__ import annotations + +import os + +TABLES_TO_COMPACT: list[str] = [ + "normalized.book_ticker", + "normalized.ohlcv_1m", + "normalized.symbol_config", +] + + +def build_spark_session(app_name: str): # type: ignore[return] + """ + Build a SparkSession pointing at the Iceberg REST catalog on MinIO. + All coordinates are read from environment variables so the same job runs + locally (local[*]) and on the Spark standalone cluster. + """ + from pyspark import SparkConf + from pyspark.sql import SparkSession + + iceberg_conf: dict[str, str] = { + "spark.sql.extensions": ( + "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" + ), + "spark.sql.catalog.iceberg": "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.iceberg.type": "rest", + "spark.sql.catalog.iceberg.uri": os.getenv("ICEBERG_REST_URI", "http://iceberg-rest:8181"), + "spark.sql.catalog.iceberg.warehouse": os.getenv( + "ICEBERG_WAREHOUSE", "s3://ticksense/warehouse" + ), + "spark.sql.catalog.iceberg.io-impl": "org.apache.iceberg.aws.s3.S3FileIO", + "spark.sql.catalog.iceberg.s3.endpoint": os.getenv("S3_ENDPOINT", "http://minio:9000"), + "spark.sql.catalog.iceberg.s3.path-style-access": "true", + "spark.sql.catalog.iceberg.s3.access-key-id": os.getenv("AWS_ACCESS_KEY_ID", "minioadmin"), + "spark.sql.catalog.iceberg.s3.secret-access-key": os.getenv( + "AWS_SECRET_ACCESS_KEY", "minioadmin" + ), + "spark.sql.sources.partitionOverwriteMode": "dynamic", + } + + master = os.getenv("SPARK_MASTER_URL", "local[*]") + conf = SparkConf().setMaster(master).setAppName(app_name).setAll(list(iceberg_conf.items())) + return SparkSession.builder.config(conf=conf).getOrCreate() diff --git a/spark/jobs/spark_lib/sql_runner.py b/spark/jobs/spark_lib/sql_runner.py new file mode 100644 index 0000000..3d0237d --- /dev/null +++ b/spark/jobs/spark_lib/sql_runner.py @@ -0,0 +1,13 @@ +"""Read SQL files from spark/jobs/sql/ and substitute named parameters.""" + +from __future__ import annotations + +from pathlib import Path + +_SQL_DIR = Path(__file__).parent.parent / "sql" + + +def load_sql(filename: str, **params: str) -> str: + """Return the contents of sql/, with {params} substituted.""" + text = (_SQL_DIR / filename).read_text() + return text.format(**params) if params else text diff --git a/spark/jobs/sql/backfill_ohlcv.sql b/spark/jobs/sql/backfill_ohlcv.sql new file mode 100644 index 0000000..bd5a5aa --- /dev/null +++ b/spark/jobs/sql/backfill_ohlcv.sql @@ -0,0 +1,46 @@ +-- backfill_ohlcv.sql +-- Recompute 1-minute OHLCV from normalized.book_ticker for a date range. +-- +-- Parameters (substituted by Python .format()): +-- {start_ts} — inclusive lower bound, ISO timestamp string +-- {end_ts} — exclusive upper bound, ISO timestamp string +-- +-- open/close correctness: ROW_NUMBER picks the first and last tick by time +-- within each minute window, avoiding the undefined ordering of first()/last(). + +WITH ranked AS ( + SELECT + exchange, + symbol, + mid_price, + DATE_TRUNC('MINUTE', exchange_event_ts) AS window_start, + ROW_NUMBER() OVER ( + PARTITION BY exchange, symbol, + DATE_TRUNC('MINUTE', exchange_event_ts) + ORDER BY exchange_event_ts ASC + ) AS rn_asc, + ROW_NUMBER() OVER ( + PARTITION BY exchange, symbol, + DATE_TRUNC('MINUTE', exchange_event_ts) + ORDER BY exchange_event_ts DESC + ) AS rn_desc + FROM iceberg.normalized.book_ticker + WHERE exchange_event_ts >= TIMESTAMP '{start_ts}' + AND exchange_event_ts < TIMESTAMP '{end_ts}' +) +SELECT + exchange, + symbol, + window_start, + window_start + INTERVAL 1 MINUTES AS window_end, + MAX(CASE WHEN rn_asc = 1 THEN mid_price END) AS open, + MAX(mid_price) AS high, + MIN(mid_price) AS low, + MAX(CASE WHEN rn_desc = 1 THEN mid_price END) AS close, + AVG(mid_price) AS vwap, + COUNT(*) AS trade_count, + CAST(window_start AS DATE) AS event_date, + CURRENT_TIMESTAMP() AS ingest_ts +FROM ranked +GROUP BY exchange, symbol, window_start +ORDER BY exchange, symbol, window_start diff --git a/spark/jobs/sql/compact_table.sql b/spark/jobs/sql/compact_table.sql new file mode 100644 index 0000000..35c83d1 --- /dev/null +++ b/spark/jobs/sql/compact_table.sql @@ -0,0 +1,9 @@ +-- compact_table.sql +-- Trigger Iceberg rewrite_data_files for one table. +-- Parameter: {table} — fully qualified table name within the iceberg catalog +-- e.g. 'normalized.book_ticker' +-- +-- rewrite_data_files merges small files produced by streaming writers into +-- larger files sized for efficient batch reads (target ~128 MB). + +CALL iceberg.system.rewrite_data_files(table => '{table}') diff --git a/spark/tests/conftest.py b/spark/tests/conftest.py new file mode 100644 index 0000000..af4fe17 --- /dev/null +++ b/spark/tests/conftest.py @@ -0,0 +1,6 @@ +"""Add spark/jobs to sys.path so job modules and lib are importable by name.""" + +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent.parent / "jobs")) diff --git a/spark/tests/test_spark_jobs.py b/spark/tests/test_spark_jobs.py new file mode 100644 index 0000000..28864fc --- /dev/null +++ b/spark/tests/test_spark_jobs.py @@ -0,0 +1,93 @@ +"""Unit tests for Spark job logic — no SparkSession required.""" + +from __future__ import annotations + +from datetime import date, timedelta +from pathlib import Path + +import pytest + +# ── lib.sql_runner ──────────────────────────────────────────────────────────── + + +def test_load_sql_substitutes_params() -> None: + from spark_lib.sql_runner import load_sql + + sql = load_sql( + "backfill_ohlcv.sql", start_ts="2026-05-01T00:00:00", end_ts="2026-05-02T00:00:00" + ) + assert "2026-05-01T00:00:00" in sql + assert "2026-05-02T00:00:00" in sql + assert "{start_ts}" not in sql + assert "{end_ts}" not in sql + + +def test_load_sql_no_params_returns_raw() -> None: + from spark_lib.sql_runner import load_sql + + sql = load_sql("compact_table.sql") + assert "{table}" in sql # unsubstituted — caller must pass param + + +def test_sql_files_exist() -> None: + sql_dir = Path(__file__).parent.parent / "jobs" / "sql" + assert (sql_dir / "backfill_ohlcv.sql").exists() + assert (sql_dir / "compact_table.sql").exists() + + +# ── lib.config ──────────────────────────────────────────────────────────────── + + +def test_tables_to_compact_nonempty() -> None: + from spark_lib.config import TABLES_TO_COMPACT + + assert len(TABLES_TO_COMPACT) > 0 + assert all("." in t for t in TABLES_TO_COMPACT), "each entry must be schema.table" + + +# ── backfill_ohlcv.parse_args ───────────────────────────────────────────────── + + +def test_parse_args_valid() -> None: + from backfill_ohlcv import parse_args + + args = parse_args(["--start-date", "2026-05-01", "--end-date", "2026-05-02"]) + assert args.start_date == date(2026, 5, 1) + assert args.end_date == date(2026, 5, 2) + + +def test_parse_args_missing_required(monkeypatch: pytest.MonkeyPatch) -> None: + from backfill_ohlcv import parse_args + + with pytest.raises(SystemExit): + parse_args(["--start-date", "2026-05-01"]) # end-date missing + + +# ── backfill_ohlcv.validate_date_range ─────────────────────────────────────── + + +def test_validate_date_range_valid() -> None: + from backfill_ohlcv import validate_date_range + + validate_date_range(date(2026, 5, 1), date(2026, 5, 2)) # no exception + + +def test_validate_date_range_end_before_start() -> None: + from backfill_ohlcv import validate_date_range + + with pytest.raises(ValueError, match="must be after"): + validate_date_range(date(2026, 5, 2), date(2026, 5, 1)) + + +def test_validate_date_range_same_day() -> None: + from backfill_ohlcv import validate_date_range + + with pytest.raises(ValueError, match="must be after"): + validate_date_range(date(2026, 5, 1), date(2026, 5, 1)) + + +def test_validate_date_range_exceeds_limit() -> None: + from backfill_ohlcv import validate_date_range + + with pytest.raises(ValueError, match="90-day"): + validate_date_range(date(2026, 1, 1), date(2026, 1, 1) + timedelta(days=91))