Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/ROADMAP.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ ingest/src/ingest/

### Known limitations

- `kafka_partition` and `kafka_offset` hardcoded to `-1` in normalize output — `SimpleStringSchema` doesn't expose Kafka record metadata. Fix: implement `KafkaRecordDeserializationSchema`.
- ~~`kafka_partition` and `kafka_offset` hardcoded to `-1` in normalize output~~ **Fixed (2026-05-17):** replaced `SimpleStringSchema` with `KafkaRecordDeserializer` (`lib/kafka_schema.py`) in both `normalize.py` and `cdc_symbol_config.py`. Real partition and offset are now stored in Iceberg; `iceberg_reader.py` precise-seek path is active for rows written after the fix.

---

Expand Down
33 changes: 16 additions & 17 deletions flink/jobs/cdc_symbol_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,15 @@
from pathlib import Path

from lib.config import Config
from lib.kafka_schema import (
OFFSET_IDX,
PARTITION_IDX,
PAYLOAD_IDX,
KafkaRecordDeserializer,
)
from lib.sql_runner import add_inserts_from_file, execute_sql_file
from pyflink.common import Row, WatermarkStrategy
from pyflink.common.restart_strategy import RestartStrategies
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import (
CheckpointingMode,
Expand Down Expand Up @@ -133,19 +138,17 @@ class CdcEnvelopeProcessor(FlatMapFunction):
- Missing payload (both before and after null) → log warning, skip
"""

def flat_map(self, value: str) -> None: # type: ignore[override]
def flat_map(self, value: Row) -> None: # type: ignore[override]
now_ms = int(datetime.now(UTC).timestamp() * 1000)

# Tombstone guard: null Kafka value (should not arrive; tombstones.on.delete=false)
if value is None:
log.warning("cdc_tombstone_received: null value skipped (connector misconfigured?)")
return
kafka_partition: int = value[PARTITION_IDX]
kafka_offset: int = value[OFFSET_IDX]
raw_json: str = value[PAYLOAD_IDX]

# ── Parse Debezium envelope ────────────────────────────────────────────
try:
envelope = json.loads(value)
envelope = json.loads(raw_json)
except (json.JSONDecodeError, ValueError) as exc:
log.warning("cdc_parse_error error=%s msg_preview=%.200s", exc, value)
log.warning("cdc_parse_error error=%s msg_preview=%.200s", exc, raw_json)
return

op: str = envelope.get("op", "")
Expand Down Expand Up @@ -186,10 +189,6 @@ def flat_map(self, value: str) -> None: # type: ignore[override]
)

# ── Emit bronze row ────────────────────────────────────────────────────
# kafka_partition and kafka_offset are -1 because SimpleStringSchema
# doesn't expose Kafka record metadata (same limitation as normalize.py).
# The source_lsn from Debezium's source block is the canonical ordering key
# for replay purposes.
yield Row(
op,
symbol,
Expand All @@ -199,9 +198,9 @@ def flat_map(self, value: str) -> None: # type: ignore[override]
source_ts_ms,
tx_id,
debezium_ts_ms,
Config.TOPIC_CDC, # kafka_topic
-1, # kafka_partition (not available via SimpleStringSchema)
-1, # kafka_offset (not available via SimpleStringSchema)
Config.TOPIC_CDC,
kafka_partition,
kafka_offset,
now_ms,
)

Expand Down Expand Up @@ -273,7 +272,7 @@ def main() -> None:
.set_topics(cfg.TOPIC_CDC)
.set_group_id(cfg.GROUP_CDC_BRONZE)
.set_starting_offsets(KafkaOffsetsInitializer.earliest())
.set_value_only_deserializer(SimpleStringSchema())
.set_deserializer(KafkaRecordDeserializer())
.build(),
WatermarkStrategy.no_watermarks(),
"cdc-raw-source",
Expand Down
73 changes: 73 additions & 0 deletions flink/jobs/lib/kafka_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""
Kafka record deserialization helpers.

The pure function parse_kafka_record is importable without PyFlink (used in
unit tests running on Python 3.12 in the root workspace). The
KafkaRecordDeserializer class and KAFKA_RECORD_TYPE are only defined when
PyFlink is present; they live under a try/except so the module remains
importable in the test environment.

Why this exists
---------------
SimpleStringSchema is a value-only deserializer: it discards all Kafka record
metadata (partition, offset, timestamp, key). Switching to
KafkaRecordDeserializationSchema lets downstream Flink jobs store the real
kafka_partition and kafka_offset in Iceberg instead of -1, enabling precise
offset-based seek during replay.
"""

from __future__ import annotations

# Positional indices into the Row emitted by KafkaRecordDeserializer.
# Use these constants instead of magic numbers in job code.
# The order matches the (payload, partition, offset) tuple returned by
# parse_kafka_record, so the same constants index both.
PAYLOAD_IDX = 0  # decoded message value (str)
PARTITION_IDX = 1  # Kafka partition the record was read from
OFFSET_IDX = 2  # Kafka offset of the record


def parse_kafka_record(record: object) -> tuple[str, int, int] | None:
    """Extract (payload_str, partition, offset) from a Kafka ConsumerRecord.

    Duck-typed: any object with .value(), .partition(), .offset() methods works,
    which makes this function testable without PyFlink.

    Args:
        record: Object exposing ``value()``, ``partition()`` and ``offset()``
            accessor methods.

    Returns:
        ``(payload, partition, offset)``, or ``None`` for tombstone records
        (null value bytes).
    """
    value_bytes = record.value()  # type: ignore[union-attr]
    if value_bytes is None:
        return None

    # Decode leniently: malformed UTF-8 becomes U+FFFD instead of raising
    # UnicodeDecodeError out of the deserializer. Downstream JSON parsing
    # then decides what to do with the (possibly damaged) payload.
    payload = bytes(value_bytes).decode("utf-8", errors="replace")
    partition: int = record.partition()  # type: ignore[union-attr]
    offset: int = record.offset()  # type: ignore[union-attr]
    return (payload, partition, offset)


# PyFlink-dependent definitions. They are wrapped in try/except ImportError so
# the module (and parse_kafka_record) stays importable where PyFlink is absent.
# NOTE(review): confirm that the deployed PyFlink version actually exports
# KafkaRecordDeserializationSchema from pyflink.datastream.connectors.kafka.
# If it does not, the ImportError is swallowed below and jobs importing
# KafkaRecordDeserializer will fail later with a less obvious "cannot import
# name" error.
try:
    from pyflink.common import Row
    from pyflink.common.typeinfo import Types
    from pyflink.datastream.connectors.kafka import KafkaRecordDeserializationSchema

    # Named row type; field order follows PAYLOAD_IDX / PARTITION_IDX / OFFSET_IDX.
    KAFKA_RECORD_TYPE = Types.ROW_NAMED(
        ["payload", "kafka_partition", "kafka_offset"],
        [Types.STRING(), Types.INT(), Types.LONG()],
    )

    class KafkaRecordDeserializer(KafkaRecordDeserializationSchema):
        """Replaces SimpleStringSchema to capture real kafka_partition and kafka_offset.

        Emits Row(payload: str, kafka_partition: int, kafka_offset: long).
        Tombstone records (null value) are suppressed here so downstream
        processors never receive None payloads.
        """

        def deserialize(self, record: object, collector: object) -> None:
            """Collect one Row for ``record`` unless parse_kafka_record returns None."""
            result = parse_kafka_record(record)
            if result is not None:
                payload, partition, offset = result
                collector.collect(Row(payload, partition, offset))  # type: ignore[union-attr]

        def get_produced_type(self) -> object:
            """Return KAFKA_RECORD_TYPE, the row type of the emitted elements."""
            return KAFKA_RECORD_TYPE

except ImportError:
    # Deliberate best-effort: without PyFlink only the pure helpers are defined.
    pass
25 changes: 16 additions & 9 deletions flink/jobs/normalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@
from typing import Any

from lib.config import Config
from lib.kafka_schema import (
OFFSET_IDX,
PARTITION_IDX,
PAYLOAD_IDX,
KafkaRecordDeserializer,
)
from lib.logic import (
apply_depth_diff,
best_ask,
Expand All @@ -43,7 +49,6 @@
from lib.sql_runner import add_inserts_from_file, execute_sql_file
from pyflink.common import Row, WatermarkStrategy
from pyflink.common.restart_strategy import RestartStrategies
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import (
CheckpointingMode,
Expand Down Expand Up @@ -125,9 +130,9 @@ def _to_epoch_ms(ts: Any) -> int:
return int(datetime.now(UTC).timestamp() * 1000)


def _kafka_key(msg: str) -> str:
"""Extract routing key from raw JSON message (called by key_by)."""
data = json.loads(msg)
def _kafka_key(record: Row) -> str:
"""Extract routing key from Kafka record Row(payload, partition, offset)."""
data = json.loads(record[PAYLOAD_IDX])
return f"{data['exchange']}#{data['symbol']}"


Expand Down Expand Up @@ -155,9 +160,11 @@ def open(self, runtime_context: RuntimeContext) -> None: # type: ignore[overrid
)

def process_element( # type: ignore[override]
self, value: str, ctx: KeyedProcessFunction.Context
self, value: Row, ctx: KeyedProcessFunction.Context
) -> None:
msg = json.loads(value)
kafka_partition: int = value[PARTITION_IDX]
kafka_offset: int = value[OFFSET_IDX]
msg = json.loads(value[PAYLOAD_IDX])
last_update_id: int = int(msg["last_update_id"])

# ── 1. Dedup ──────────────────────────────────────────────────────────
Expand Down Expand Up @@ -222,8 +229,8 @@ def process_element( # type: ignore[override]
compute_mid_price(bb_price, ba_price),
compute_imbalance(book["bids"], book["asks"]),
Config.TOPIC_RAW,
-1, # partition not available via SimpleStringSchema
-1, # offset not available via SimpleStringSchema
kafka_partition,
kafka_offset,
)


Expand Down Expand Up @@ -292,7 +299,7 @@ def main() -> None:
.set_topics(cfg.TOPIC_RAW)
.set_group_id(cfg.GROUP_NORMALIZE)
.set_starting_offsets(KafkaOffsetsInitializer.earliest())
.set_value_only_deserializer(SimpleStringSchema())
.set_deserializer(KafkaRecordDeserializer())
.build(),
WatermarkStrategy.no_watermarks(),
"raw-orderbook-source",
Expand Down
61 changes: 61 additions & 0 deletions flink/tests/unit/test_kafka_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""Unit tests for lib/kafka_schema.py — no PyFlink dependency."""

from unittest.mock import MagicMock

from lib.kafka_schema import OFFSET_IDX, PARTITION_IDX, PAYLOAD_IDX, parse_kafka_record


def _make_record(
value_bytes: bytes | None,
partition: int = 0,
offset: int = 0,
) -> MagicMock:
record = MagicMock()
record.value.return_value = value_bytes
record.partition.return_value = partition
record.offset.return_value = offset
return record


class TestParseKafkaRecord:
    """Checks for parse_kafka_record driven by mocked Kafka records."""

    def test_returns_payload_partition_offset(self) -> None:
        """Payload, partition and offset each land at their index constant."""
        parsed = parse_kafka_record(
            _make_record(b'{"symbol": "BTCUSDT"}', partition=2, offset=12345)
        )
        assert parsed is not None
        assert parsed[PAYLOAD_IDX] == '{"symbol": "BTCUSDT"}'
        assert parsed[PARTITION_IDX] == 2
        assert parsed[OFFSET_IDX] == 12345

    def test_tombstone_returns_none(self) -> None:
        """A record with a null value yields None."""
        tombstone = _make_record(None, partition=0, offset=99)
        assert parse_kafka_record(tombstone) is None

    def test_partition_zero_and_offset_zero_are_valid(self) -> None:
        """Zero partition/offset must not be mistaken for a missing record."""
        parsed = parse_kafka_record(_make_record(b"{}", partition=0, offset=0))
        assert parsed is not None
        assert parsed[PARTITION_IDX] == 0
        assert parsed[OFFSET_IDX] == 0

    def test_utf8_payload_decoded_correctly(self) -> None:
        """UTF-8 encoded bytes round-trip back to the original string."""
        payload = '{"exchange": "binance", "symbol": "ETHUSDT"}'
        encoded = payload.encode("utf-8")
        parsed = parse_kafka_record(_make_record(encoded, partition=1, offset=500))
        assert parsed is not None
        assert parsed[PAYLOAD_IDX] == payload

    def test_large_offset_handled(self) -> None:
        """Offsets beyond 32 bits are returned unchanged."""
        large_offset = 2**40
        parsed = parse_kafka_record(
            _make_record(b"{}", partition=0, offset=large_offset)
        )
        assert parsed is not None
        assert parsed[OFFSET_IDX] == large_offset

    def test_high_partition_number(self) -> None:
        """A high partition number is returned unchanged."""
        parsed = parse_kafka_record(_make_record(b"{}", partition=47, offset=1))
        assert parsed is not None
        assert parsed[PARTITION_IDX] == 47

    def test_index_constants_are_distinct(self) -> None:
        """The three index constants never collide."""
        indices = {PAYLOAD_IDX, PARTITION_IDX, OFFSET_IDX}
        assert len(indices) == 3
13 changes: 4 additions & 9 deletions replay/src/replay/iceberg_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,10 @@
time window.
4. Return min/max (partition, offset) per partition.

Known limitation: normalize.py currently sets kafka_partition=-1 and
kafka_offset=-1 because SimpleStringSchema doesn't expose Kafka record metadata.
When offsets are -1, this reader falls back to Kafka's offsets_for_times() API,
which finds the first offset whose timestamp is >= the requested time. The
Kafka path is always tried as a verification step even when Iceberg offsets are
available.

See the Known Limitations section in docs/DEBUGGING_PHASE3.md for the fix
(implement KafkaRecordDeserializationSchema in normalize.py).
Rows written before the KafkaRecordDeserializer fix (see lib/kafka_schema.py)
will have kafka_partition=-1 and kafka_offset=-1. When all rows in the scan
window carry -1, this reader falls back to Kafka's offsets_for_times() API.
New rows written after the fix carry real offsets and take the precise path.
"""

from __future__ import annotations
Expand Down
2 changes: 1 addition & 1 deletion replay/src/replay/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
1. Determine which Kafka offsets correspond to the requested time window:
a. Try Iceberg reader (reads kafka_offset from normalized.book_ticker).
b. Fall back to Kafka's offsets_for_times() when Iceberg returns no ranges
(the current state due to kafka_offset=-1 in normalize.py).
(e.g. rows written before the KafkaRecordDeserializer fix still have -1).
2. For each partition in the offset range:
a. Seek a temporary consumer to start_offset.
b. Read records up to end_offset (or max_events limit).
Expand Down
Loading