From 6c4c3542e3eeb54c89f594d328f18944802285d8 Mon Sep 17 00:00:00 2001 From: Andrew Chen Date: Sun, 17 May 2026 19:30:23 -0500 Subject: [PATCH] Load testing with new Kafka offset logic and update documentation --- .dockerignore | 4 + docs/DEBUGGING_PHASE5.md | 227 ++++++++++++++++++ docs/ROADMAP.md | 51 +++- flink/.dockerignore | 4 + flink/jobs/cdc_symbol_config.py | 26 +- flink/jobs/flink_lib/kafka_schema.py | 46 +--- flink/jobs/lib/kafka_schema.py | 73 ------ flink/jobs/normalize.py | 31 +-- .../sql/cdc_symbol_config/source_bronze.sql | 20 ++ flink/jobs/sql/normalize/source_raw.sql | 21 ++ 10 files changed, 348 insertions(+), 155 deletions(-) create mode 100644 .dockerignore create mode 100644 docs/DEBUGGING_PHASE5.md create mode 100644 flink/.dockerignore delete mode 100644 flink/jobs/lib/kafka_schema.py create mode 100644 flink/jobs/sql/cdc_symbol_config/source_bronze.sql create mode 100644 flink/jobs/sql/normalize/source_raw.sql diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..43dfdf6 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,4 @@ +**/__pycache__ +**/*.pyc +**/*.pyo +.git/ diff --git a/docs/DEBUGGING_PHASE5.md b/docs/DEBUGGING_PHASE5.md new file mode 100644 index 0000000..667970f --- /dev/null +++ b/docs/DEBUGGING_PHASE5.md @@ -0,0 +1,227 @@ +# TickSense — Phase 5 Debug Runbook (Airflow + Spark + Kafka Offset Fix) + +--- + +## Flink + +### `ImportError: cannot import name 'KafkaRecordDeserializer' from 'flink_lib.kafka_schema'` + +**Symptom:** + +``` +=== Submitting normalize job === +Traceback (most recent call last): + File "/opt/flink/jobs/normalize.py", line 35, in <module> + from flink_lib.kafka_schema import ( +ImportError: cannot import name 'KafkaRecordDeserializer' from 'flink_lib.kafka_schema' +org.apache.flink.client.program.ProgramAbortException: ... + Python process exits with code: 1 +``` + +`flink-init` exits with code 31. 
This cascades: `dbt-runner` depends on `flink-init: service_completed_successfully`, so it never starts. `api` depends on `dbt-runner`, `prometheus` depends on `api`, `grafana` depends on `prometheus` — all four end up in Docker "Created" (never started) state. + +**Root cause (two-layer):** + +1. `KafkaRecordDeserializer` inherits from `KafkaRecordDeserializationSchema`. That class exists in the Flink Java API but **is not exposed in PyFlink 1.18 Python bindings**. The `try/except ImportError: pass` in `kafka_schema.py` silently swallows the import failure, so `KafkaRecordDeserializer` is never defined. + +2. Even if it were importable, `KafkaSourceBuilder` in PyFlink 1.18 only exposes `set_value_only_deserializer()` — there is no `set_deserializer()` method at all. + +**Verification:** + +```bash +docker run --rm ticksense-flink-jobmanager python3 -c \ + "from pyflink.datastream.connectors.kafka import KafkaRecordDeserializationSchema" +# ImportError: cannot import name 'KafkaRecordDeserializationSchema' ... + +docker run --rm ticksense-flink-jobmanager python3 -c \ + "from pyflink.datastream.connectors.kafka import KafkaSource; \ + print([m for m in dir(KafkaSource.builder()) if not m.startswith('_')])" +# [..., 'set_value_only_deserializer'] ← no set_deserializer +``` + +**Fix:** Replace the `KafkaSource.builder()` DataStream approach with a Flink SQL Table API source that exposes Kafka metadata columns directly. Real partition and offset are captured via `METADATA FROM 'partition' VIRTUAL` and `METADATA FROM 'offset' VIRTUAL` columns, then bridged to DataStream via `t_env.to_append_stream()`. + +New SQL source table pattern (used in `normalize/source_raw.sql` and `cdc_symbol_config/source_bronze.sql`): + +```sql +CREATE TEMPORARY TABLE raw_orderbook_stream ( + payload STRING, + kafka_partition INT METADATA FROM 'partition' VIRTUAL, + kafka_offset BIGINT METADATA FROM 'offset' VIRTUAL +) WITH ( + 'connector' = 'kafka', + 'topic' = '{topic_raw}', + ... 
+ 'format' = 'raw' +) +``` + +In Python: + +```python +execute_sql_file(t_env, SQL / "normalize" / "source_raw.sql", **cfg.as_dict()) +raw_stream = t_env.to_append_stream( + t_env.sql_query( + "SELECT payload, kafka_partition, kafka_offset FROM raw_orderbook_stream" + ), + KAFKA_RECORD_TYPE, +) +``` + +`KafkaRecordDeserializer` class removed from `flink_lib/kafka_schema.py` entirely. + +--- + +### Why `METADATA FROM 'partition' VIRTUAL` and not just `METADATA FROM 'partition'` + +`VIRTUAL` marks the column as **read-only metadata**. Without it, Flink treats the column as both readable and writable. On a source-only table this doesn't matter functionally, but omitting `VIRTUAL` on a metadata column that Kafka controls (the offset is assigned by the broker and the partition is chosen by the producer's partitioner, not taken from a row value) is misleading and would cause errors if the table were ever used as a sink. + +The value is populated automatically by the Flink Kafka connector: when Flink reads each `ConsumerRecord`, the connector extracts `record.partition()` and `record.offset()` from the Kafka metadata and fills the declared columns. No user code required. + +**Mental model:** Same as PostgreSQL's `ctid` or a database `ROWID` — the system fills it, you can read it, you cannot write it. + +--- + +### `format = 'raw'` vs `format = 'json'` for the DataStream source + +The existing `normalize/source.sql` uses `format = 'json'` and declares `bids` and `asks` as nested `ARRAY<...>` columns. In PyFlink 1.18, these nested `ARRAY<...>` types cannot be bridged through `to_append_stream()` — they trigger a `LegacyTypeInformationType` cast error (documented in Phase 2 bugs). + +`format = 'raw'` reads the entire Kafka message value as a single `STRING`. The table must have exactly one non-metadata column of type `STRING` or `BYTES`. The resulting DataStream emits `Row(payload, kafka_partition, kafka_offset)` with no complex nested types. + +The `OrderBookProcessor` already parses the JSON payload in Python, so it does not need the structured fields from the SQL layer. 
The `source.sql` table (with full JSON schema) is kept for potential future Table-API-only paths but is not used in the DataStream job. + +--- + +### Phase 5 lib namespace conflict — `lib/kafka_schema.py` not deleted after rename + +**Symptom:** Even after Phase 5 renamed `flink/jobs/lib/` to `flink/jobs/flink_lib/`, a stale `flink/jobs/lib/kafka_schema.py` survived in the git tree. + +**Root cause:** The merge commit (`bb45f4f`) handled the rename correctly for `__init__.py`, `config.py`, `logic.py`, and `sql_runner.py` (shown as `{lib => flink_lib}/file` in the diff), but `kafka_schema.py` was added as a new file to `flink_lib/` **without deleting** the old `lib/kafka_schema.py`. Running `git ls-files flink/jobs/lib/` confirmed the file was still tracked. + +**Effect:** Two copies of the module existed in the Docker image: +- `/opt/flink/jobs/flink_lib/kafka_schema.py` (correct, used) +- `/opt/flink/jobs/lib/kafka_schema.py` (stale, harmless on its own but confusing) + +**Fix:** `git rm flink/jobs/lib/kafka_schema.py`. + +--- + +### Container cascade from flink-init failure + +When `flink-init` exits with a non-zero code, Docker Compose does not start any service whose `depends_on` specifies `condition: service_completed_successfully` for `flink-init`. The full cascade in this project: + +``` +flink-init (Exit 31) + └─ dbt-runner (Created — never started) + └─ api (Created — never started) + └─ prometheus (Created — never started) + └─ grafana (Created — never started) +``` + +Diagnosis: look for exit code in `docker ps -a`. "Created" (not "Exited") on downstream containers is the tell. + +**Rule:** Fix the flink job submission error first. Everything else unblocks automatically. + +--- + +## Docker + +### `__pycache__` directories copied into Docker images + +**Symptom:** After `make build`, running `docker run --rm <image> find /opt/flink/jobs -name "__pycache__"` shows bytecode directories from the host machine. + +**Root cause:** No `.dockerignore` file. 
The `COPY jobs/ /opt/flink/jobs/` instruction copies everything, including `__pycache__/` directories containing `.pyc` files compiled by the host Python (3.13). The Flink container runs Python 3.10. Python detects the version mismatch and ignores the `.pyc` files, but they bloat the image and are confusing. + +Similarly, the `ingest` and `api` images use `context: .` (project root) with `COPY ingest/ ./ingest/` and `COPY api/ ./api/`, pulling in test `__pycache__` directories. + +**Fix:** + +`flink/.dockerignore`: +``` +**/__pycache__ +**/*.pyc +**/*.pyo +tests/ +``` + +`.dockerignore` (root, for `ingest` and `api` builds): +``` +**/__pycache__ +**/*.pyc +**/*.pyo +.git/ +``` + +**Verification:** +```bash +docker run --rm ticksense-flink-init find /opt/flink/jobs -name "__pycache__" -type d +# (no output — clean) +``` + +**Note:** Each build context needs its own `.dockerignore`. The root `.dockerignore` is read when the build context is `.`. The `flink/.dockerignore` is read when the build context is `./flink`. They do not inherit from each other. + +--- + +## Airflow (WIP — pending E2E test) + +The Airflow DAGs and Docker image are implemented but have not yet been run end-to-end against the live stack. Notes below are based on code review only. + +### DAG structure + +| DAG | Schedule | Purpose | +|---|---|---| +| `backfill_ohlcv_1m` | manual / on-demand | Spark: recompute OHLCV for a date range | +| `compact_iceberg_tables` | `0 2 * * *` | Spark: `rewrite_data_files` on bronze + normalized tables | +| `expire_iceberg_snapshots` | `0 3 * * *` | Retain last 7 days; older snapshots deleted | +| `run_dbt_models` | `0 4 * * *` | `dbt run --full-refresh` on mart models | +| `run_great_expectations` | `0 5 * * *` | Row count, null check, price range validation | +| `freshness_sla_check` | `*/5 * * * *` | Alert if any symbol > 35s stale | + +### Known risks to test + +- `backfill_ohlcv_1m` passes date-range params to Spark via `SparkSubmitOperator`. 
Verify param passing and idempotency (re-running same date range should produce same result, not duplicates). +- `compact_iceberg_tables` uses `rewrite_data_files` with a `min_file_size_bytes` threshold. Verify it doesn't compact files that are actively being written by Flink. +- `freshness_sla_check` queries Trino. Verify the Trino connection string works from inside the Airflow container network. + +--- + +## Spark (WIP — pending E2E test) + +The Spark jobs are implemented but have not yet been submitted against the live stack. + +### Job structure + +| Job | Trigger | Purpose | +|---|---|---| +| `backfill_ohlcv.py` | Airflow / CLI | Recompute 1m OHLCV from bronze for a given date range | +| `compact_tables.py` | Airflow | `rewrite_data_files` on Iceberg tables | + +### `spark_lib/` package + +- `config.py`: reads env vars (`ICEBERG_REST_URI`, `S3_ENDPOINT`, `AWS_*`, date range params) +- `sql_runner.py`: minimal SQL file execution helper (mirrors `flink_lib/sql_runner.py` pattern) + +### Why `spark_lib/` not `lib/` + +Phase 5 originally used a bare `lib/` directory in both `flink/jobs/` and `spark/jobs/`, which created a Python namespace package collision when both were on `sys.path`. Renamed to `flink_lib/` and `spark_lib/` respectively to give each service a unique package name. + +### Known risks to test + +- Spark job uses `IcebergSparkSessionExtensions` and `SparkCatalog`. Verify catalog config matches the REST catalog endpoint and S3 credentials. +- `backfill_ohlcv.py` writes to `normalized.ohlcv_1m` which Flink also writes to. Test for concurrent write conflicts (Iceberg uses optimistic concurrency — conflicts result in retries, not data loss, but worth verifying). +- SQL files (`backfill_ohlcv.sql`, `compact_table.sql`) use Iceberg `CALL` procedures. Verify these are supported by the installed `iceberg-spark-runtime` JAR version. 
+ +--- + +## Port reference + +| Service | Host port | +|---|---| +| FastAPI | 8000 | +| Prometheus | 9090 | +| Grafana | 3000 | +| Trino | 8082 | +| MinIO UI | 9001 | +| Flink UI | 8081 | +| Redpanda Console | 8080 | +| Debezium | 8083 | diff --git a/docs/ROADMAP.md b/docs/ROADMAP.md index 6d6a89a..d3738dd 100644 --- a/docs/ROADMAP.md +++ b/docs/ROADMAP.md @@ -239,11 +239,34 @@ GET /symbols active trading pairs from CDC symbol_config --- -## Phase 5 — Ops + Observability +## Phase 5 — Ops + Observability (partial ✅ — Airflow + Spark pending E2E) -**Goal:** Airflow orchestration, data quality checks, freshness SLA alerts. +**Goal:** Airflow orchestration, Spark backfill/compaction, data quality checks, freshness SLA alerts. -### Airflow DAGs +**Status (2026-05-17):** Code complete. Flink kafka-offset fix shipped and verified. Airflow and Spark implemented but not yet E2E tested against the live stack. + +### Kafka offset fix ✅ + +The `kafka_partition` and `kafka_offset` columns were previously hardcoded to `-1` in `normalize.py` and `cdc_symbol_config.py` because the `KafkaRecordDeserializationSchema` class is not exposed in PyFlink 1.18 Python bindings, and `KafkaSourceBuilder.set_deserializer()` does not exist in Python. + +**Fix:** Replaced the DataStream `KafkaSource.builder()` approach with a Flink SQL Table API source using `METADATA FROM 'partition' VIRTUAL` and `METADATA FROM 'offset' VIRTUAL` columns. Real partition and offset are now stored in Iceberg. The `to_append_stream()` bridge works cleanly because the source schema uses only `STRING`, `INT`, and `BIGINT` — no complex types. + +Files changed: `flink_lib/kafka_schema.py` (removed `KafkaRecordDeserializer`), `normalize.py`, `cdc_symbol_config.py`, new `sql/normalize/source_raw.sql`, new `sql/cdc_symbol_config/source_bronze.sql`. + +See `docs/DEBUGGING_PHASE5.md` for full explanation including the `VIRTUAL` keyword semantics. 
+ +### Docker `.dockerignore` ✅ + +Added `.dockerignore` files to exclude `__pycache__` and `.pyc` files from all Docker build contexts. Host Python 3.13 bytecodes were previously being copied into Python 3.10 Flink containers (harmless but wasteful and confusing). + +- `flink/.dockerignore`: covers `flink-jobmanager`, `flink-taskmanager`, `flink-init` +- `.dockerignore` (root): covers `ingest` and `api` (both use `context: .`) + +### lib namespace conflict fix ✅ + +`flink/jobs/lib/` was renamed to `flink/jobs/flink_lib/` and `spark/jobs/lib/` to `spark/jobs/spark_lib/` to eliminate a Python namespace package collision when both service packages are on `sys.path`. The Phase 5 merge commit accidentally left `flink/jobs/lib/kafka_schema.py` undeleted — removed in the fix commit. + +### Airflow DAGs (implemented, pending E2E test) ``` backfill_ohlcv_1m Spark: recompute OHLCV for a date range (idempotent) @@ -261,12 +284,24 @@ freshness_sla_check Alert if any symbol > 35s stale - Row count within expected range per `(exchange, symbol, date)` - Freshness: `max(exchange_event_ts) > now() - 35s` +### Spark jobs (implemented, pending E2E test) + +- `backfill_ohlcv.py`: recompute `normalized.ohlcv_1m` from bronze for a given date range +- `compact_tables.py`: `rewrite_data_files` on Iceberg bronze + normalized tables + **Deliverables:** -- [ ] 6 Airflow DAGs, all idempotent, all with exponential-backoff retries -- [ ] Spark compaction job for `normalized.book_ticker` -- [ ] Great Expectations suite with checkpoint -- [ ] Freshness alert fires when a symbol goes stale (can simulate by stopping ingest) -- [ ] `make test` passes +- [x] `flink_lib/kafka_schema.py`: real partition/offset via SQL metadata columns +- [x] `normalize.py` + `cdc_symbol_config.py`: use `to_append_stream()` from SQL source +- [x] `sql/normalize/source_raw.sql` + `sql/cdc_symbol_config/source_bronze.sql` +- [x] `flink/.dockerignore` + root `.dockerignore` +- [x] lib namespace conflict resolved 
(`flink_lib/`, `spark_lib/`) +- [x] 6 Airflow DAGs (`airflow/dags/`) +- [x] 2 Spark jobs (`spark/jobs/`) +- [x] `airflow/tests/` + `spark/tests/` with unit tests; all pass in `make test` +- [ ] Airflow DAGs E2E verified against live stack +- [ ] Spark compaction job E2E verified (no conflict with live Flink writes) +- [ ] `backfill_ohlcv_1m` idempotency verified (same date range → same row count) +- [ ] Freshness alert fires when ingest is stopped **Done when:** Airflow UI shows all DAGs green on a nightly run. diff --git a/flink/.dockerignore b/flink/.dockerignore new file mode 100644 index 0000000..b8f9a2d --- /dev/null +++ b/flink/.dockerignore @@ -0,0 +1,4 @@ +**/__pycache__ +**/*.pyc +**/*.pyo +tests/ diff --git a/flink/jobs/cdc_symbol_config.py b/flink/jobs/cdc_symbol_config.py index 4280f93..ecd7b93 100644 --- a/flink/jobs/cdc_symbol_config.py +++ b/flink/jobs/cdc_symbol_config.py @@ -58,23 +58,19 @@ from flink_lib.config import Config from flink_lib.kafka_schema import ( + KAFKA_RECORD_TYPE, OFFSET_IDX, PARTITION_IDX, PAYLOAD_IDX, - KafkaRecordDeserializer, ) from flink_lib.sql_runner import add_inserts_from_file, execute_sql_file -from pyflink.common import Row, WatermarkStrategy +from pyflink.common import Row from pyflink.common.restart_strategy import RestartStrategies from pyflink.common.typeinfo import Types from pyflink.datastream import ( CheckpointingMode, StreamExecutionEnvironment, ) -from pyflink.datastream.connectors.kafka import ( - KafkaOffsetsInitializer, - KafkaSource, -) from pyflink.datastream.functions import FlatMapFunction from pyflink.table import DataTypes, Schema, StreamTableEnvironment @@ -264,18 +260,12 @@ def main() -> None: log.info("cdc_ddl source cdc_symbol_config_source (debezium-json) registered") # ── DataStream: bronze path (raw JSON → CdcEnvelopeProcessor) ──────────── - # Uses a separate consumer group (flink-cdc-bronze) so it progresses - # independently from the Table API normalized path. 
- raw_stream = env.from_source( - KafkaSource.builder() - .set_bootstrap_servers(cfg.kafka_brokers) - .set_topics(cfg.TOPIC_CDC) - .set_group_id(cfg.GROUP_CDC_BRONZE) - .set_starting_offsets(KafkaOffsetsInitializer.earliest()) - .set_deserializer(KafkaRecordDeserializer()) - .build(), - WatermarkStrategy.no_watermarks(), - "cdc-raw-source", + # Separate consumer group from the normalized Table API path so both progress + # independently. Bridged from the SQL source table to get real partition/offset. + execute_sql_file(t_env, SQL / "cdc_symbol_config" / "source_bronze.sql", **cfg.as_dict()) + raw_stream = t_env.to_append_stream( + t_env.sql_query("SELECT payload, kafka_partition, kafka_offset FROM cdc_bronze_stream"), + KAFKA_RECORD_TYPE, ) log.info( "cdc_source DataStream Kafka source created topic=%s group=%s", diff --git a/flink/jobs/flink_lib/kafka_schema.py b/flink/jobs/flink_lib/kafka_schema.py index ba8990e..8cb26c9 100644 --- a/flink/jobs/flink_lib/kafka_schema.py +++ b/flink/jobs/flink_lib/kafka_schema.py @@ -1,25 +1,19 @@ """ -Kafka record deserialization helpers. +Kafka record helpers shared across Flink jobs and unit tests. -The pure function parse_kafka_record is importable without PyFlink (used in -unit tests running on Python 3.12 in the root workspace). The -KafkaRecordDeserializer class and KAFKA_RECORD_TYPE are only defined when -PyFlink is present; they live under a try/except so the module remains -importable in the test environment. +parse_kafka_record is importable without PyFlink (used in unit tests). +KAFKA_RECORD_TYPE is only defined when PyFlink is present; the try/except +keeps the module importable in the test environment. -Why this exists ---------------- -SimpleStringSchema is a value-only deserializer: it discards all Kafka record -metadata (partition, offset, timestamp, key). 
Switching to -KafkaRecordDeserializationSchema lets downstream Flink jobs store the real -kafka_partition and kafka_offset in Iceberg instead of -1, enabling precise -offset-based seek during replay. +Real partition and offset are captured via Flink SQL Kafka source metadata +columns ('partition' VIRTUAL, 'offset' VIRTUAL) and bridged to DataStream +with to_append_stream(). This is the supported path in PyFlink 1.18; the +Python bindings do not expose KafkaRecordDeserializationSchema. """ from __future__ import annotations -# Positional indices into the Row emitted by KafkaRecordDeserializer. -# Use these constants instead of magic numbers in job code. +# Positional indices into Row(payload, kafka_partition, kafka_offset). PAYLOAD_IDX = 0 PARTITION_IDX = 1 OFFSET_IDX = 2 @@ -29,7 +23,7 @@ def parse_kafka_record(record: object) -> tuple[str, int, int] | None: """Extract (payload_str, partition, offset) from a Kafka ConsumerRecord. Duck-typed: any object with .value(), .partition(), .offset() methods works, - which makes this function testable without PyFlink. + making this testable without PyFlink. Returns None for tombstone records (null value bytes). """ @@ -43,31 +37,11 @@ def parse_kafka_record(record: object) -> tuple[str, int, int] | None: try: - from pyflink.common import Row from pyflink.common.typeinfo import Types - from pyflink.datastream.connectors.kafka import KafkaRecordDeserializationSchema KAFKA_RECORD_TYPE = Types.ROW_NAMED( ["payload", "kafka_partition", "kafka_offset"], [Types.STRING(), Types.INT(), Types.LONG()], ) - - class KafkaRecordDeserializer(KafkaRecordDeserializationSchema): - """Replaces SimpleStringSchema to capture real kafka_partition and kafka_offset. - - Emits Row(payload: str, kafka_partition: int, kafka_offset: long). - Tombstone records (null value) are suppressed here so downstream - processors never receive None payloads. 
- """ - - def deserialize(self, record: object, collector: object) -> None: - result = parse_kafka_record(record) - if result is not None: - payload, partition, offset = result - collector.collect(Row(payload, partition, offset)) # type: ignore[union-attr] - - def get_produced_type(self) -> object: - return KAFKA_RECORD_TYPE - except ImportError: pass diff --git a/flink/jobs/lib/kafka_schema.py b/flink/jobs/lib/kafka_schema.py deleted file mode 100644 index ba8990e..0000000 --- a/flink/jobs/lib/kafka_schema.py +++ /dev/null @@ -1,73 +0,0 @@ -""" -Kafka record deserialization helpers. - -The pure function parse_kafka_record is importable without PyFlink (used in -unit tests running on Python 3.12 in the root workspace). The -KafkaRecordDeserializer class and KAFKA_RECORD_TYPE are only defined when -PyFlink is present; they live under a try/except so the module remains -importable in the test environment. - -Why this exists ---------------- -SimpleStringSchema is a value-only deserializer: it discards all Kafka record -metadata (partition, offset, timestamp, key). Switching to -KafkaRecordDeserializationSchema lets downstream Flink jobs store the real -kafka_partition and kafka_offset in Iceberg instead of -1, enabling precise -offset-based seek during replay. -""" - -from __future__ import annotations - -# Positional indices into the Row emitted by KafkaRecordDeserializer. -# Use these constants instead of magic numbers in job code. -PAYLOAD_IDX = 0 -PARTITION_IDX = 1 -OFFSET_IDX = 2 - - -def parse_kafka_record(record: object) -> tuple[str, int, int] | None: - """Extract (payload_str, partition, offset) from a Kafka ConsumerRecord. - - Duck-typed: any object with .value(), .partition(), .offset() methods works, - which makes this function testable without PyFlink. - - Returns None for tombstone records (null value bytes). 
- """ - value_bytes = record.value() # type: ignore[union-attr] - if value_bytes is None: - return None - payload = bytes(value_bytes).decode("utf-8") - partition: int = record.partition() # type: ignore[union-attr] - offset: int = record.offset() # type: ignore[union-attr] - return (payload, partition, offset) - - -try: - from pyflink.common import Row - from pyflink.common.typeinfo import Types - from pyflink.datastream.connectors.kafka import KafkaRecordDeserializationSchema - - KAFKA_RECORD_TYPE = Types.ROW_NAMED( - ["payload", "kafka_partition", "kafka_offset"], - [Types.STRING(), Types.INT(), Types.LONG()], - ) - - class KafkaRecordDeserializer(KafkaRecordDeserializationSchema): - """Replaces SimpleStringSchema to capture real kafka_partition and kafka_offset. - - Emits Row(payload: str, kafka_partition: int, kafka_offset: long). - Tombstone records (null value) are suppressed here so downstream - processors never receive None payloads. - """ - - def deserialize(self, record: object, collector: object) -> None: - result = parse_kafka_record(record) - if result is not None: - payload, partition, offset = result - collector.collect(Row(payload, partition, offset)) # type: ignore[union-attr] - - def get_produced_type(self) -> object: - return KAFKA_RECORD_TYPE - -except ImportError: - pass diff --git a/flink/jobs/normalize.py b/flink/jobs/normalize.py index 87b9e95..32aae82 100644 --- a/flink/jobs/normalize.py +++ b/flink/jobs/normalize.py @@ -33,10 +33,10 @@ from flink_lib.config import Config from flink_lib.kafka_schema import ( + KAFKA_RECORD_TYPE, OFFSET_IDX, PARTITION_IDX, PAYLOAD_IDX, - KafkaRecordDeserializer, ) from flink_lib.logic import ( apply_depth_diff, @@ -47,17 +47,13 @@ compute_spread, ) from flink_lib.sql_runner import add_inserts_from_file, execute_sql_file -from pyflink.common import Row, WatermarkStrategy +from pyflink.common import Row from pyflink.common.restart_strategy import RestartStrategies from pyflink.common.typeinfo import Types 
from pyflink.datastream import ( CheckpointingMode, StreamExecutionEnvironment, ) -from pyflink.datastream.connectors.kafka import ( - KafkaOffsetsInitializer, - KafkaSource, -) from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext from pyflink.datastream.state import ValueStateDescriptor from pyflink.table import DataTypes, Schema, StreamTableEnvironment @@ -285,24 +281,19 @@ def main() -> None: t_env = StreamTableEnvironment.create(env) t_env.get_config().set("table.exec.state.ttl", str(cfg.STATE_TTL_MS)) - # ── DDL: catalog and sinks ──────────────────────────────────────────────── - # Source is read directly via DataStream KafkaSource (avoids ARRAY type - # serialization issues with ARRAY> in the Table→DataStream bridge). + # ── DDL: catalog, source, and sinks ────────────────────────────────────── execute_sql_file(t_env, SQL / "catalogs.sql", **cfg.as_dict()) + execute_sql_file(t_env, SQL / "normalize" / "source_raw.sql", **cfg.as_dict()) execute_sql_file(t_env, SQL / "normalize" / "sink_iceberg.sql") execute_sql_file(t_env, SQL / "normalize" / "sink_kafka.sql", **cfg.as_dict()) - # ── DataStream Kafka source: raw JSON strings ───────────────────────────── - raw_stream = env.from_source( - KafkaSource.builder() - .set_bootstrap_servers(cfg.kafka_brokers) - .set_topics(cfg.TOPIC_RAW) - .set_group_id(cfg.GROUP_NORMALIZE) - .set_starting_offsets(KafkaOffsetsInitializer.earliest()) - .set_deserializer(KafkaRecordDeserializer()) - .build(), - WatermarkStrategy.no_watermarks(), - "raw-orderbook-source", + # ── DataStream Kafka source: raw payload + real partition/offset ────────── + # Bridged from the SQL source table so metadata columns (partition, offset) + # are populated by the Kafka connector without needing the unavailable + # KafkaRecordDeserializationSchema Python binding. 
+ raw_stream = t_env.to_append_stream( + t_env.sql_query("SELECT payload, kafka_partition, kafka_offset FROM raw_orderbook_stream"), + KAFKA_RECORD_TYPE, ) # ── Stateful DataStream processing ──────────────────────────────────────── diff --git a/flink/jobs/sql/cdc_symbol_config/source_bronze.sql b/flink/jobs/sql/cdc_symbol_config/source_bronze.sql new file mode 100644 index 0000000..39fb7b9 --- /dev/null +++ b/flink/jobs/sql/cdc_symbol_config/source_bronze.sql @@ -0,0 +1,20 @@ +-- cdc_symbol_config/source_bronze.sql: Raw-format Kafka source for the bronze DataStream path. +-- +-- Each message value is read as a raw STRING (full Debezium JSON envelope preserved). +-- Separate consumer group from source_normalized.sql so both paths progress independently +-- and can be reset independently. +-- +-- Metadata columns supply real partition and offset for Iceberg lineage. + +CREATE TEMPORARY TABLE cdc_bronze_stream ( + payload STRING, + kafka_partition INT METADATA FROM 'partition' VIRTUAL, + kafka_offset BIGINT METADATA FROM 'offset' VIRTUAL +) WITH ( + 'connector' = 'kafka', + 'topic' = '{topic_cdc}', + 'properties.bootstrap.servers' = '{kafka_brokers}', + 'properties.group.id' = '{group_cdc_bronze}', + 'scan.startup.mode' = 'earliest-offset', + 'format' = 'raw' +) diff --git a/flink/jobs/sql/normalize/source_raw.sql b/flink/jobs/sql/normalize/source_raw.sql new file mode 100644 index 0000000..7eb6c1c --- /dev/null +++ b/flink/jobs/sql/normalize/source_raw.sql @@ -0,0 +1,21 @@ +-- normalize/source_raw.sql: Minimal Kafka source for DataStream consumption. +-- +-- Uses 'format'='raw' so each message value arrives as a single STRING (the raw +-- JSON payload). No ARRAY/MAP columns means the Table->DataStream bridge via +-- to_append_stream() works without type-mapping issues. +-- +-- Metadata columns supply the real Kafka partition and offset so Iceberg rows +-- preserve full lineage for offset-based replay. 
+ +CREATE TEMPORARY TABLE raw_orderbook_stream ( + payload STRING, + kafka_partition INT METADATA FROM 'partition' VIRTUAL, + kafka_offset BIGINT METADATA FROM 'offset' VIRTUAL +) WITH ( + 'connector' = 'kafka', + 'topic' = '{topic_raw}', + 'properties.bootstrap.servers' = '{kafka_brokers}', + 'properties.group.id' = '{group_normalize}', + 'scan.startup.mode' = 'earliest-offset', + 'format' = 'raw' +)