diff --git a/docs/ROADMAP.md b/docs/ROADMAP.md index 5a956f5..6d6a89a 100644 --- a/docs/ROADMAP.md +++ b/docs/ROADMAP.md @@ -132,7 +132,7 @@ ingest/src/ingest/ ### Known limitations -- `kafka_partition` and `kafka_offset` hardcoded to `-1` in normalize output — `SimpleStringSchema` doesn't expose Kafka record metadata. Fix: implement `KafkaRecordDeserializationSchema`. +~~`kafka_partition` and `kafka_offset` hardcoded to `-1` in normalize output~~ — **Fixed (2026-05-17):** replaced `SimpleStringSchema` with `KafkaRecordDeserializer` (`lib/kafka_schema.py`) in both `normalize.py` and `cdc_symbol_config.py`. Real partition and offset are now stored in Iceberg; `iceberg_reader.py` precise-seek path is active for rows written after the fix. --- diff --git a/flink/jobs/cdc_symbol_config.py b/flink/jobs/cdc_symbol_config.py index cd5c586..1657fcd 100644 --- a/flink/jobs/cdc_symbol_config.py +++ b/flink/jobs/cdc_symbol_config.py @@ -57,10 +57,15 @@ from pathlib import Path from lib.config import Config +from lib.kafka_schema import ( + OFFSET_IDX, + PARTITION_IDX, + PAYLOAD_IDX, + KafkaRecordDeserializer, +) from lib.sql_runner import add_inserts_from_file, execute_sql_file from pyflink.common import Row, WatermarkStrategy from pyflink.common.restart_strategy import RestartStrategies -from pyflink.common.serialization import SimpleStringSchema from pyflink.common.typeinfo import Types from pyflink.datastream import ( CheckpointingMode, @@ -133,19 +138,17 @@ class CdcEnvelopeProcessor(FlatMapFunction): - Missing payload (both before and after null) → log warning, skip """ - def flat_map(self, value: str) -> None: # type: ignore[override] + def flat_map(self, value: Row) -> None: # type: ignore[override] now_ms = int(datetime.now(UTC).timestamp() * 1000) - - # Tombstone guard: null Kafka value (should not arrive; tombstones.on.delete=false) - if value is None: - log.warning("cdc_tombstone_received: null value skipped (connector misconfigured?)") - return + kafka_partition: int = value[PARTITION_IDX] + kafka_offset: int = value[OFFSET_IDX] + raw_json: str = value[PAYLOAD_IDX] # ── Parse Debezium envelope ──────────────────────────────────────────── try: - envelope = json.loads(value) + envelope = json.loads(raw_json) except (json.JSONDecodeError, ValueError) as exc: - log.warning("cdc_parse_error error=%s msg_preview=%.200s", exc, value) + log.warning("cdc_parse_error error=%s msg_preview=%.200s", exc, raw_json) return op: str = envelope.get("op", "") @@ -186,10 +189,6 @@ def flat_map(self, value: str) -> None: # type: ignore[override] ) # ── Emit bronze row ──────────────────────────────────────────────────── - # kafka_partition and kafka_offset are -1 because SimpleStringSchema - # doesn't expose Kafka record metadata (same limitation as normalize.py). - # The source_lsn from Debezium's source block is the canonical ordering key - # for replay purposes. yield Row( op, symbol, @@ -199,9 +198,9 @@ def flat_map(self, value: str) -> None: # type: ignore[override] source_ts_ms, tx_id, debezium_ts_ms, - Config.TOPIC_CDC, # kafka_topic - -1, # kafka_partition (not available via SimpleStringSchema) - -1, # kafka_offset (not available via SimpleStringSchema) + Config.TOPIC_CDC, + kafka_partition, + kafka_offset, now_ms, ) @@ -273,7 +272,7 @@ def main() -> None: .set_topics(cfg.TOPIC_CDC) .set_group_id(cfg.GROUP_CDC_BRONZE) .set_starting_offsets(KafkaOffsetsInitializer.earliest()) - .set_value_only_deserializer(SimpleStringSchema()) + .set_deserializer(KafkaRecordDeserializer()) .build(), WatermarkStrategy.no_watermarks(), "cdc-raw-source", diff --git a/flink/jobs/lib/kafka_schema.py b/flink/jobs/lib/kafka_schema.py new file mode 100644 index 0000000..ba8990e --- /dev/null +++ b/flink/jobs/lib/kafka_schema.py @@ -0,0 +1,73 @@ +""" +Kafka record deserialization helpers. + +The pure function parse_kafka_record is importable without PyFlink (used in +unit tests running on Python 3.12 in the root workspace). The +KafkaRecordDeserializer class and KAFKA_RECORD_TYPE are only defined when +PyFlink is present; they live under a try/except so the module remains +importable in the test environment. + +Why this exists +--------------- +SimpleStringSchema is a value-only deserializer: it discards all Kafka record +metadata (partition, offset, timestamp, key). Switching to +KafkaRecordDeserializationSchema lets downstream Flink jobs store the real +kafka_partition and kafka_offset in Iceberg instead of -1, enabling precise +offset-based seek during replay. +""" + +from __future__ import annotations + +# Positional indices into the Row emitted by KafkaRecordDeserializer. +# Use these constants instead of magic numbers in job code. +PAYLOAD_IDX = 0 +PARTITION_IDX = 1 +OFFSET_IDX = 2 + + +def parse_kafka_record(record: object) -> tuple[str, int, int] | None: + """Extract (payload_str, partition, offset) from a Kafka ConsumerRecord. + + Duck-typed: any object with .value(), .partition(), .offset() methods works, + which makes this function testable without PyFlink. + + Returns None for tombstone records (null value bytes). + """ + value_bytes = record.value() # type: ignore[union-attr] + if value_bytes is None: + return None + payload = bytes(value_bytes).decode("utf-8") + partition: int = record.partition() # type: ignore[union-attr] + offset: int = record.offset() # type: ignore[union-attr] + return (payload, partition, offset) + + +try: + from pyflink.common import Row + from pyflink.common.typeinfo import Types + from pyflink.datastream.connectors.kafka import KafkaRecordDeserializationSchema + + KAFKA_RECORD_TYPE = Types.ROW_NAMED( + ["payload", "kafka_partition", "kafka_offset"], + [Types.STRING(), Types.INT(), Types.LONG()], + ) + + class KafkaRecordDeserializer(KafkaRecordDeserializationSchema): + """Replaces SimpleStringSchema to capture real kafka_partition and kafka_offset. + + Emits Row(payload: str, kafka_partition: int, kafka_offset: long). + Tombstone records (null value) are suppressed here so downstream + processors never receive None payloads. + """ + + def deserialize(self, record: object, collector: object) -> None: + result = parse_kafka_record(record) + if result is not None: + payload, partition, offset = result + collector.collect(Row(payload, partition, offset)) # type: ignore[union-attr] + + def get_produced_type(self) -> object: + return KAFKA_RECORD_TYPE + +except ImportError: + pass diff --git a/flink/jobs/normalize.py b/flink/jobs/normalize.py index 496f5cc..22bf5a1 100644 --- a/flink/jobs/normalize.py +++ b/flink/jobs/normalize.py @@ -32,6 +32,12 @@ from typing import Any from lib.config import Config +from lib.kafka_schema import ( + OFFSET_IDX, + PARTITION_IDX, + PAYLOAD_IDX, + KafkaRecordDeserializer, +) from lib.logic import ( apply_depth_diff, best_ask, @@ -43,7 +49,6 @@ from lib.sql_runner import add_inserts_from_file, execute_sql_file from pyflink.common import Row, WatermarkStrategy from pyflink.common.restart_strategy import RestartStrategies -from pyflink.common.serialization import SimpleStringSchema from pyflink.common.typeinfo import Types from pyflink.datastream import ( CheckpointingMode, @@ -125,9 +130,9 @@ def _to_epoch_ms(ts: Any) -> int: return int(datetime.now(UTC).timestamp() * 1000) -def _kafka_key(msg: str) -> str: - """Extract routing key from raw JSON message (called by key_by).""" - data = json.loads(msg) +def _kafka_key(record: Row) -> str: + """Extract routing key from Kafka record Row(payload, partition, offset).""" + data = json.loads(record[PAYLOAD_IDX]) return f"{data['exchange']}#{data['symbol']}" @@ -155,9 +160,11 @@ def open(self, runtime_context: RuntimeContext) -> None: # type: ignore[overrid ) def process_element( # type: ignore[override] - self, value: str, ctx: KeyedProcessFunction.Context + self, value: Row, ctx: KeyedProcessFunction.Context ) -> None: - msg = json.loads(value) + kafka_partition: int = value[PARTITION_IDX] + kafka_offset: int = value[OFFSET_IDX] + msg = json.loads(value[PAYLOAD_IDX]) last_update_id: int = int(msg["last_update_id"]) # ── 1. Dedup ────────────────────────────────────────────────────────── @@ -222,8 +229,8 @@ def process_element( # type: ignore[override] compute_mid_price(bb_price, ba_price), compute_imbalance(book["bids"], book["asks"]), Config.TOPIC_RAW, - -1, # partition not available via SimpleStringSchema - -1, # offset not available via SimpleStringSchema + kafka_partition, + kafka_offset, ) @@ -292,7 +299,7 @@ def main() -> None: .set_topics(cfg.TOPIC_RAW) .set_group_id(cfg.GROUP_NORMALIZE) .set_starting_offsets(KafkaOffsetsInitializer.earliest()) - .set_value_only_deserializer(SimpleStringSchema()) + .set_deserializer(KafkaRecordDeserializer()) .build(), WatermarkStrategy.no_watermarks(), "raw-orderbook-source", diff --git a/flink/tests/unit/test_kafka_schema.py b/flink/tests/unit/test_kafka_schema.py new file mode 100644 index 0000000..444fa41 --- /dev/null +++ b/flink/tests/unit/test_kafka_schema.py @@ -0,0 +1,61 @@ +"""Unit tests for lib/kafka_schema.py — no PyFlink dependency.""" + +from unittest.mock import MagicMock + +from lib.kafka_schema import OFFSET_IDX, PARTITION_IDX, PAYLOAD_IDX, parse_kafka_record + + +def _make_record( + value_bytes: bytes | None, + partition: int = 0, + offset: int = 0, +) -> MagicMock: + record = MagicMock() + record.value.return_value = value_bytes + record.partition.return_value = partition + record.offset.return_value = offset + return record + + +class TestParseKafkaRecord: + def test_returns_payload_partition_offset(self) -> None: + record = _make_record(b'{"symbol": "BTCUSDT"}', partition=2, offset=12345) + result = parse_kafka_record(record) + assert result is not None + assert result[PAYLOAD_IDX] == '{"symbol": "BTCUSDT"}' + assert result[PARTITION_IDX] == 2 + assert result[OFFSET_IDX] == 12345 + + def test_tombstone_returns_none(self) -> None: + record = _make_record(None, partition=0, offset=99) + assert parse_kafka_record(record) is None + + def test_partition_zero_and_offset_zero_are_valid(self) -> None: + record = _make_record(b"{}", partition=0, offset=0) + result = parse_kafka_record(record) + assert result is not None + assert result[PARTITION_IDX] == 0 + assert result[OFFSET_IDX] == 0 + + def test_utf8_payload_decoded_correctly(self) -> None: + payload = '{"exchange": "binance", "symbol": "ETHUSDT"}' + record = _make_record(payload.encode("utf-8"), partition=1, offset=500) + result = parse_kafka_record(record) + assert result is not None + assert result[PAYLOAD_IDX] == payload + + def test_large_offset_handled(self) -> None: + large_offset = 2**40 + record = _make_record(b"{}", partition=0, offset=large_offset) + result = parse_kafka_record(record) + assert result is not None + assert result[OFFSET_IDX] == large_offset + + def test_high_partition_number(self) -> None: + record = _make_record(b"{}", partition=47, offset=1) + result = parse_kafka_record(record) + assert result is not None + assert result[PARTITION_IDX] == 47 + + def test_index_constants_are_distinct(self) -> None: + assert len({PAYLOAD_IDX, PARTITION_IDX, OFFSET_IDX}) == 3 diff --git a/replay/src/replay/iceberg_reader.py b/replay/src/replay/iceberg_reader.py index 85dca2a..a0dbb2b 100644 --- a/replay/src/replay/iceberg_reader.py +++ b/replay/src/replay/iceberg_reader.py @@ -10,15 +10,10 @@ time window. 4. Return min/max (partition, offset) per partition. -Known limitation: normalize.py currently sets kafka_partition=-1 and -kafka_offset=-1 because SimpleStringSchema doesn't expose Kafka record metadata. -When offsets are -1, this reader falls back to Kafka's offsets_for_times() API, -which finds the first offset whose timestamp is >= the requested time. The -Kafka path is always tried as a verification step even when Iceberg offsets are -available. - -See the Known Limitations section in docs/DEBUGGING_PHASE3.md for the fix -(implement KafkaRecordDeserializationSchema in normalize.py). +Rows written before the KafkaRecordDeserializer fix (see lib/kafka_schema.py) +will have kafka_partition=-1 and kafka_offset=-1. When all rows in the scan +window carry -1, this reader falls back to Kafka's offsets_for_times() API. +New rows written after the fix carry real offsets and take the precise path. """ from __future__ import annotations diff --git a/replay/src/replay/producer.py b/replay/src/replay/producer.py index 29aaf16..c1a2a46 100644 --- a/replay/src/replay/producer.py +++ b/replay/src/replay/producer.py @@ -6,7 +6,7 @@ 1. Determine which Kafka offsets correspond to the requested time window: a. Try Iceberg reader (reads kafka_offset from normalized.book_ticker). b. Fall back to Kafka's offsets_for_times() when Iceberg returns no ranges - (the current state due to kafka_offset=-1 in normalize.py). + (e.g. rows written before the KafkaRecordDeserializer fix still have -1). 2. For each partition in the offset range: a. Seek a temporary consumer to start_offset. b. Read records up to end_offset (or max_events limit).