[DO NOT MERGE] Wal2JSON proof of concept #4855
Closed
DilwoarH wants to merge 26 commits into
a6ed862 to
2d13025
Compare
…hance slot change data parsing
…s for improved clarity and maintainability
…database execution
…t imports for consistency
… replication slot processing
…ng previous notification statuses and updating service stats accordingly
…al statuses and updating status breakdown in response
…l status handling and update test assertions accordingly
- Consolidated service stats operations into a single function `apply_service_stats_delta` to handle both increments and decrements.
- Removed deprecated functions for inserting, deleting, and updating service stats.
- Introduced a new performance testing endpoint to simulate notification load, allowing for bulk inserts and updates of notifications.
- Updated replication change utilities to include `nextlsn` in parsed rows and adjusted related tests accordingly.
- Enhanced tests for service stats DAO to validate the new delta application logic.
- Refactored replication slot change processing tests to align with the new structure and ensure proper handling of changes.
…ement operations into a single function
…d update related indices; remove obsolete migrations for notifications replica identity
7e45faf to
aeb7541
Compare
Superseded by #4873
WAL2JSON Replication Implementation
Overview
This document describes the end-to-end implementation of a PostgreSQL logical replication pipeline using the `wal2json` output plugin. The system reads Write-Ahead Log (WAL) changes from the `notifications` and `notification_history` tables and incrementally maintains an aggregate statistics table (`ft_service_stats`) without ever querying the source tables directly.
The design solves a specific performance problem: computing per-service notification counts across potentially hundreds of millions of rows is expensive with ad-hoc `COUNT` queries. Instead, this pipeline maintains a running tally in near-real-time by consuming CDC (Change Data Capture) events from the database's own replication machinery.
Architecture at a Glance
A Postgres advisory lock ensures only one Celery worker processes the slot at any given moment, preventing race conditions and double-counting.
Step-by-Step Implementation
Step 1 — Create the logical replication slot
Migration: `0552_create_replacation_slot.py`
Before any CDC can happen, PostgreSQL must be told to start capturing WAL changes through a named slot using the `wal2json` logical decoding plugin.
What this does:

- Creates the slot `notify_dashboard_replication_slot`
- Uses `wal2json`, which formats changes as JSON rather than the raw binary WAL format

Downgrade drops the slot cleanly, which also releases any held WAL storage.
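The migration body itself is not reproduced in this document. As a minimal sketch of the two statements such a migration might run, assuming they are passed to something like Alembic's `op.execute` (the helper names below are hypothetical, not taken from the PR):

```python
SLOT_NAME = "notify_dashboard_replication_slot"
PLUGIN = "wal2json"


def create_slot_sql(slot_name: str = SLOT_NAME, plugin: str = PLUGIN) -> str:
    """SQL an upgrade step could run to create the logical slot."""
    # pg_create_logical_replication_slot(slot_name, plugin) is a built-in
    # Postgres function; it fails if the output plugin is not installed.
    return f"SELECT pg_create_logical_replication_slot('{slot_name}', '{plugin}')"


def drop_slot_sql(slot_name: str = SLOT_NAME) -> str:
    """SQL a downgrade step could run; dropping the slot frees retained WAL."""
    return f"SELECT pg_drop_replication_slot('{slot_name}')"


print(create_slot_sql())
print(drop_slot_sql())
```

Keeping the slot name in one constant makes it harder for the upgrade and downgrade paths to drift apart.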
Step 2 — Add a composite index on notifications
Migration: `0553_notifications_id_status_idx.py`
The index is created concurrently (no table lock) as a prerequisite for the REPLICA IDENTITY change in the next step. It also benefits read queries filtering by notification ID and status.
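The migration's SQL is not shown here. A rough sketch of the statements it might issue follows, using the index name `ix_notifications_id_notification_status` and the uniqueness requirement mentioned in the FAQ further down. The helper names are hypothetical, and the use of `IF NOT EXISTS` / `IF EXISTS` is an assumption:

```python
INDEX_NAME = "ix_notifications_id_notification_status"


def create_index_sql(index_name: str = INDEX_NAME) -> str:
    """Build the concurrent unique index on (id, notification_status)."""
    # CONCURRENTLY avoids blocking writes, but Postgres refuses to run it
    # inside a transaction block, so the migration has to execute it in
    # autocommit mode (Alembic offers an autocommit block for this).
    return (
        f"CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS {index_name} "
        "ON notifications (id, notification_status)"
    )


def drop_index_sql(index_name: str = INDEX_NAME) -> str:
    """Downgrade counterpart, also non-blocking."""
    return f"DROP INDEX CONCURRENTLY IF EXISTS {index_name}"


print(create_index_sql())
```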
Step 3 — Create the aggregate statistics table
Migration: `0554_create_service_stats.py` (final table name: `ft_service_stats`)
Columns: `id`, `service_id`, `template_id`, `notification_type`, `notification_status`, `count`.
The unique constraint `uix_ft_service_stats_dimensions` on `(service_id, template_id, notification_type, notification_status)` is the heart of the UPSERT logic: it is the conflict target when incrementing counts.
Three supporting indexes are created for common query patterns:

- `(service_id, notification_type, notification_status)`: dashboard lookups by service
- `(template_id, notification_type, notification_status)`: template-level reporting
- `(service_id, template_id)`: join/filter by both

Step 4 — Set REPLICA IDENTITY on notifications
Migration: `0555_notifications_replica_identity.py`
This is the most nuanced migration in the chain.
By default, Postgres only writes the primary key into `oldkeys` in the WAL for UPDATE and DELETE events. But to correctly track status transitions, the pipeline needs to know the previous `notification_status` when a row is updated, so it can decrement the old status bucket and increment the new one.
The solution is `REPLICA IDENTITY USING INDEX`, which writes the index columns into `oldkeys` instead.
Why not `REPLICA IDENTITY FULL`?
`FULL` writes the entire old row to WAL on every UPDATE, which is very expensive for a wide, high-write table like `notifications`. The index-based approach writes only the two columns actually needed (`id` + `notification_status`), keeping WAL amplification minimal.
After this migration, every UPDATE to a notification will carry the previous `notification_status` in `oldkeys`, which is exactly what the change parser needs.
Step 5 — The SQLAlchemy model
File: `app/models.py` (`FactServiceStats`)
The four dimension columns and the unique constraint directly mirror what the DAO uses as the `ON CONFLICT` target.
Step 6 — The replication slot reader utility
File: `app/replication/replication_changes_utils.py`
This module is responsible for talking to Postgres and normalising raw wal2json output into a consistent internal shape.
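As a rough illustration of that normalisation, the sketch below turns one wal2json document into rows with `current_row_data` and `previous_row_data`. The function names match those described later in this section, but the bodies are an assumption about how they might work, not the PR's actual code:

```python
import json
from typing import Any, Optional, TypedDict


class ParsedRow(TypedDict):
    type: str
    table: str
    nextlsn: Optional[str]
    current_row_data: dict[str, Any]
    previous_row_data: dict[str, Any]


def parse_row_data(change: dict[str, Any], nextlsn: Optional[str]) -> ParsedRow:
    """Zip wal2json's parallel name/value arrays into plain dicts."""
    old = change.get("oldkeys") or {}
    return {
        "type": change["kind"],
        "table": change["table"],
        "nextlsn": nextlsn,
        "current_row_data": dict(
            zip(change.get("columnnames", []), change.get("columnvalues", []))
        ),
        "previous_row_data": dict(
            zip(old.get("keynames", []), old.get("keyvalues", []))
        ),
    }


def parse_change_data(data: str) -> list[ParsedRow]:
    """Parse one `data` value returned by the slot into ParsedRow dicts."""
    payload = json.loads(data)
    nextlsn = payload.get("nextlsn")
    return [parse_row_data(change, nextlsn) for change in payload.get("change", [])]


sample = json.dumps({
    "nextlsn": "0/1A2B3C4",
    "change": [{
        "kind": "update",
        "table": "notifications",
        "columnnames": ["id", "notification_status"],
        "columnvalues": ["abc-123", "delivered"],
        "oldkeys": {"keynames": ["id", "notification_status"],
                    "keyvalues": ["abc-123", "sending"]},
    }],
})
rows = parse_change_data(sample)
```

Insert events carry no `oldkeys`, so `previous_row_data` is simply empty for them in this sketch.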
Fetching changes
`get_replication_changes()` builds and executes one of two Postgres functions depending on the `peek` flag:

- `peek=True`: `pg_logical_slot_peek_changes`, which returns changes without consuming them
- `peek=False`: `pg_logical_slot_get_changes`, which returns changes and consumes them

The pipeline always uses `peek=True` during processing and advances the slot manually only after a successful DB commit. This prevents data loss if the task crashes mid-flight.
Of the wal2json options passed through, `add-tables` is the key filter: without it, every WAL change across the entire database would be decoded and returned.
Parsing the JSON payload
Each row returned by Postgres has a `data` field containing a wal2json JSON document:

    {
      "nextlsn": "0/1A2B3C4",
      "change": [
        {
          "kind": "update",
          "table": "notifications",
          "columnnames": ["id", "notification_status", "updated_at"],
          "columnvalues": ["abc-123", "delivered", "2026-05-22T10:00:00"],
          "oldkeys": {
            "keynames": ["id", "notification_status"],
            "keyvalues": ["abc-123", "sending"]
          }
        }
      ]
    }

`parse_change_data()` extracts the `nextlsn` (used later to advance the slot) and iterates over each item in the `change` array. `parse_row_data()` converts each change item into the internal `ParsedRow` shape:

    {
        "type": "update",
        "table": "notifications",
        "nextlsn": "0/1A2B3C4",
        "current_row_data": {   # columnnames zipped with columnvalues
            "id": "abc-123",
            "notification_status": "delivered",
            "updated_at": "2026-05-22T10:00:00",
        },
        "previous_row_data": {  # oldkeys.keynames zipped with oldkeys.keyvalues
            "id": "abc-123",
            "notification_status": "sending",
        }
    }

Two helper functions handle safe field extraction from these dicts:

- `get_str_value(row_data, key)`: returns the value only if it is a `str`, otherwise `None`
- `get_notification_status(row_data)`: checks `notification_status` first, then falls back to `status` to handle both legacy and current column names

Step 7 — The DAO layer
File: `app/dao/fact_service_stats_dao.py`
Two public functions, `apply_service_stats_delta` and `dao_fetch_stats_for_service`, sit on top of one internal helper:

- `apply_service_stats_delta(dimensions, delta)` routes to the internal update function. It's the only write interface callers need.
- `_update_service_stats_count(dimensions, delta)` applies the delta with different strategies for positive and negative values.

For positive deltas (new inserts or the new status on an update), a PostgreSQL `INSERT ... ON CONFLICT DO UPDATE` atomically creates or increments the row.
For negative deltas (deletes or the old status on an update), a bounded `UPDATE` prevents the count from ever going below zero.
The `GREATEST(..., 0)` guard makes the system self-healing: if the slot is created after some notifications already exist, or a transition appears out of order, counts degrade gracefully to zero rather than going negative and corrupting the aggregate.
`dao_fetch_stats_for_service(service_id)` is the read path, returning all `FactServiceStats` rows for a given service, with a guard against an empty `service_id`.
Step 8 — The Celery task
File: `app/celery/process_replication_slot_changes.py`
This is the main processing loop, scheduled to run every 1 second via Celery Beat.
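The overall ordering of one run (peek, apply, commit, and only then advance) can be sketched with injected callables. This is an illustrative skeleton under assumed names (`process_once` and its parameters are hypothetical), not the task's real code:

```python
from typing import Callable, Sequence


def process_once(
    peek: Callable[[], Sequence[dict]],
    apply: Callable[[Sequence[dict]], None],
    commit: Callable[[], None],
    rollback: Callable[[], None],
    advance: Callable[[str], None],
) -> int:
    """Run one peek -> apply -> commit -> advance cycle; return rows handled."""
    changes = peek()  # reads the slot without consuming it
    if not changes:
        return 0
    try:
        apply(changes)
        commit()
    except Exception:
        rollback()  # nothing was persisted, so the slot must stay where it is
        raise
    # Only reached after a successful commit.
    advance(changes[-1]["nextlsn"])
    return len(changes)


calls: list = []
handled = process_once(
    peek=lambda: [{"nextlsn": "0/1A2B3C4"}],
    apply=lambda changes: calls.append("apply"),
    commit=lambda: calls.append("commit"),
    rollback=lambda: calls.append("rollback"),
    advance=lambda lsn: calls.append(("advance", lsn)),
)
```

Because `advance` sits after the `try` block, a failed commit leaves the slot position untouched and the same changes are seen again on the next run.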
Advisory lock
Uses `pg_try_advisory_lock`, which is non-blocking. If another worker already holds the lock, this invocation logs and exits immediately. This is more reliable than an external distributed lock (e.g. Redis) because the advisory lock lives in the same Postgres connection scope and is automatically released if the connection drops.
Change classification (`_build_counter_from_changes`)
Iterates over all parsed rows and accumulates signed deltas into a `Counter` keyed by a 7-tuple of full dimensions.
The classification rules:

- `insert`: `counter[new_dimensions] += 1`
- `update`: `counter[new_dimensions] += 1` and `counter[old_dimensions] -= 1`
- `delete` from `notifications`: `counter[old_dimensions] -= 1`
- `delete` from `notification_history`: ignored

`_build_dimensions()` handles the asymmetry: for inserts only `current_row_data` is populated; for deletes only `previous_row_data` is populated; for updates both sides are present. The function takes a `use_previous_row` flag and falls back across both sides when a value is absent from the primary side.
Roll-up
`_roll_up_service_stats_deltas()` collapses the 7-tuple dimensions down to the 4-tuple that `ft_service_stats` tracks. `date`, `job_id`, and `key_type` are intentionally dropped, because `ft_service_stats` aggregates across all dates and job types.
Commit and advance
The slot is only advanced after the commit succeeds. If the commit throws, the exception handler rolls back and the task retries up to 3 times with exponential backoff (`2^retry_count` seconds). On retry the same changes are re-read from the slot; this is safe because the failed transaction was rolled back, so none of its deltas were applied. Note that signed deltas are not idempotent by themselves: if the commit succeeded but the slot advance then failed, re-reading would apply the same changes a second time.
Lock release
The advisory lock is always released in `finally`, with its own nested `try`/`except` so a release failure doesn't mask the original exception.
Step 9 — Celery Beat schedule
File: `app/config.py`
The 1-second cadence keeps `ft_service_stats` close to real-time. The advisory lock means there is no risk from multiple workers being scheduled concurrently: only one will do work at a time.
Data Flow Example
A single notification being delivered produces this sequence of WAL events and resulting counter deltas:
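The original table of events is not reproduced here. As a hypothetical worked example, assume the notification is inserted as `created`, then updated to `sending`, then to `delivered`. The `created` status, the field order of the 7-tuple, and the sample IDs are all assumptions made for illustration. Classification and roll-up can then be sketched as:

```python
from collections import Counter


def dims(status: str) -> tuple:
    # Assumed 7-tuple order: service_id, template_id, notification_type,
    # notification_status, date, job_id, key_type.
    return ("service-1", "template-1", "sms", status, "2026-05-22", None, "normal")


def build_counter(changes) -> Counter:
    """Accumulate signed deltas per 7-tuple, following the rules above."""
    counter: Counter = Counter()
    for kind, table, new_status, old_status in changes:
        if kind == "insert":
            counter[dims(new_status)] += 1
        elif kind == "update":
            counter[dims(new_status)] += 1
            counter[dims(old_status)] -= 1
        elif kind == "delete" and table == "notifications":
            counter[dims(old_status)] -= 1
        # deletes from notification_history are ignored
    return counter


def roll_up(counter: Counter) -> dict:
    """Collapse to the 4-tuple and drop zero net deltas (an assumption)."""
    rolled: Counter = Counter()
    for key, delta in counter.items():
        rolled[key[:4]] += delta
    return {key: delta for key, delta in rolled.items() if delta != 0}


changes = [
    ("insert", "notifications", "created", None),
    ("update", "notifications", "sending", "created"),
    ("update", "notifications", "delivered", "sending"),
]
net = roll_up(build_counter(changes))
```

The `created` and `sending` buckets each receive +1 and then -1, so within a single batch only the `delivered` bucket ends up with a net +1.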
After roll-up, the net deltas are applied to `ft_service_stats`. The final state in `ft_service_stats` correctly reflects one delivered notification for that service/template combination.
Key Design Decisions
Peek + manual advance instead of `get`
Using `pg_logical_slot_get_changes` would consume and advance the slot atomically: if the application crashed after consuming but before committing to `ft_service_stats`, those changes would be lost permanently. The peek + commit + advance pattern guarantees at-least-once processing. Reprocessing after a failed commit is harmless because the rollback discarded its deltas; the deltas themselves are not idempotent, so a failure between the commit and the slot advance would still apply them twice.
Advisory lock instead of external distributed locking
The advisory lock is Postgres-native, colocated with the work it protects, and automatically released on connection drop. It avoids introducing an external dependency (e.g. Redis) for a coordination problem that is already scoped to a single database.
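A minimal sketch of the try-lock and release pattern follows, written against a generic DB-API cursor that uses `%s` placeholders. The lock key and the helper name `slot_lock` are hypothetical; `pg_try_advisory_lock` and `pg_advisory_unlock` are the built-in Postgres functions:

```python
import logging
from contextlib import contextmanager

LOCK_KEY = 4855  # hypothetical key; any agreed bigint works


@contextmanager
def slot_lock(cursor, key: int = LOCK_KEY):
    """Yield True if this session got the advisory lock, False otherwise."""
    cursor.execute("SELECT pg_try_advisory_lock(%s)", (key,))
    acquired = bool(cursor.fetchone()[0])
    try:
        yield acquired
    finally:
        if acquired:
            try:
                cursor.execute("SELECT pg_advisory_unlock(%s)", (key,))
            except Exception:
                # A failed unlock must not hide an error raised in the body;
                # Postgres drops the lock anyway when the session ends.
                logging.exception("could not release advisory lock %s", key)
```

A caller would wrap the whole run in `with slot_lock(cursor) as acquired:` and return immediately when `acquired` is false.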
`GREATEST(count + delta, 0)` for decrements
Under normal operation counts should never go negative. But the slot is created at a point in time: notifications that existed before the slot was created have no corresponding insert event. The guard prevents any ordering anomaly or slot creation timing issue from producing permanently corrupted (negative) counts.
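The two write strategies can be sketched as SQL text plus a small in-memory model of their effect. The statements are an assumption about what the DAO might issue (named `:param` placeholders, and an `id` column filled by a server-side default); only the table, constraint and column names come from this document:

```python
UPSERT_SQL = """
INSERT INTO ft_service_stats
    (service_id, template_id, notification_type, notification_status, count)
VALUES
    (:service_id, :template_id, :notification_type, :notification_status, :delta)
ON CONFLICT ON CONSTRAINT uix_ft_service_stats_dimensions
DO UPDATE SET count = ft_service_stats.count + EXCLUDED.count
"""

DECREMENT_SQL = """
UPDATE ft_service_stats
SET count = GREATEST(count + :delta, 0)
WHERE service_id = :service_id
  AND template_id = :template_id
  AND notification_type = :notification_type
  AND notification_status = :notification_status
"""


def apply_delta(counts: dict, dimensions: tuple, delta: int) -> None:
    """In-memory model of the two statements above."""
    if delta > 0:
        # UPSERT: create the row or add to the existing count.
        counts[dimensions] = counts.get(dimensions, 0) + delta
    elif delta < 0 and dimensions in counts:
        # Bounded UPDATE: a missing row matches nothing; the floor is zero.
        counts[dimensions] = max(counts[dimensions] + delta, 0)


counts: dict = {}
key = ("service-1", "template-1", "sms", "delivered")
apply_delta(counts, key, -1)  # no row yet, so nothing happens
apply_delta(counts, key, 2)   # row created with count 2
apply_delta(counts, key, -5)  # floored at 0 rather than -3
```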
`REPLICA IDENTITY USING INDEX` over `FULL`
`REPLICA IDENTITY FULL` writes the entire old row to WAL on every UPDATE, which is very expensive for a wide, high-throughput table. The index-based identity writes only `id` and `notification_status`, the minimum needed to compute status transition deltas, keeping WAL overhead proportional to what is actually consumed.
Frequently Asked Questions
General
Q: Why not just run a nightly `COUNT` job instead?
A nightly batch job produces stale data for most of the day. The replication pipeline keeps `ft_service_stats` within ~1 second of the live `notifications` table, which is necessary for any dashboard or API that consumers expect to reflect recent activity. Batch jobs also place a large read load on the database at query time, whereas this pipeline distributes that cost incrementally across every write.
Q: Does this replace `ft_notification_status`?
No. `ft_notification_status` is a date-partitioned historical aggregate populated separately. `ft_service_stats` is a live running counter without date bucketing, designed for low-latency lookups. They serve different query patterns and coexist.
Q: What happens if `wal2json` is not installed on the database?
The migration that creates the replication slot (`0552`) will fail with a Postgres error indicating the output plugin is not found. `wal2json` must be installed before running migrations. On RDS and Aurora Postgres it is available by default; on self-hosted instances it must be compiled and installed separately.
Q: Can this pipeline be used with read replicas?
No. The slot is created on the primary and can only be read from the primary; calling `pg_logical_slot_peek_changes` for it on a read replica will not work. The Celery task must connect to the primary.
Replication Slot
Q: What happens to the replication slot if the Celery task is down for an extended period?
Postgres will keep accumulating WAL for the slot indefinitely. If the task is down long enough, this can fill the disk and cause the database to stop accepting writes entirely. This is the most operationally dangerous failure mode. Monitoring the slot's lag in bytes (for example `pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)` computed from `pg_replication_slots`) or the staleness of `confirmed_flush_lsn` is strongly recommended, with an alert threshold well before disk saturation.
Q: What does "advancing the slot" actually mean?
Calling `pg_replication_slot_advance(slot_name, lsn)` tells Postgres that all WAL up to and including the given LSN has been successfully processed and can be discarded. Until this call is made, Postgres must retain that WAL. The pipeline advances the slot only after a successful commit to `ft_service_stats`, ensuring no changes are lost if the task crashes mid-flight.
Q: Can two replication slots be created for the same purpose?
Yes, but each slot independently tracks its own position and holds its own WAL. Running two consumers from two slots would result in double-counting in `ft_service_stats`. A single named slot combined with the advisory lock is the pattern that ensures each change is consumed by only one worker at a time.
Q: What is the `upto_nchanges` cap for?
It bounds how much work a single task invocation does. At 10,000 changes per run on a 1-second schedule, the pipeline can sustain ~10,000 changes/second under steady state. If backlog builds up after an outage, successive runs drain it in batches of 10,000 rather than attempting to process the entire backlog in one potentially very slow transaction.
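To make the cap concrete, here is a sketch of how the peek query and the later slot advance might be built. The `:name` placeholder style, the `public` schema, and the exact wal2json options are assumptions. The Postgres functions and their argument order (`slot_name`, `upto_lsn`, `upto_nchanges`, then option name/value pairs) are standard:

```python
def build_changes_query(peek: bool = True) -> str:
    """Return SQL that reads up to :upto_nchanges changes from the slot."""
    func = "pg_logical_slot_peek_changes" if peek else "pg_logical_slot_get_changes"
    return (
        f"SELECT lsn, xid, data FROM {func}("
        ":slot_name, NULL, :upto_nchanges, "
        "'include-lsn', '1', 'add-tables', :tables)"
    )


def build_advance_query() -> str:
    """Return SQL that moves the slot forward to an already processed LSN."""
    return "SELECT pg_replication_slot_advance(:slot_name, CAST(:lsn AS pg_lsn))"


params = {
    "slot_name": "notify_dashboard_replication_slot",
    "upto_nchanges": 10_000,
    # Assumed schema-qualified table list for wal2json's add-tables filter.
    "tables": "public.notifications,public.notification_history",
}
query = build_changes_query(peek=True)
```

Passing `NULL` for `upto_lsn` leaves the batch bounded only by `upto_nchanges`.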
Q: Why is the slot created in a migration rather than application startup code?
Migrations run exactly once and are tracked in the alembic revision chain. Creating the slot in application startup code would risk creating it on every deploy or requiring idempotency guards. A migration is simpler and safer.
REPLICA IDENTITY
Q: Why does `REPLICA IDENTITY USING INDEX` require the index to be `UNIQUE`?
Postgres enforces this because a non-unique index cannot uniquely identify a row. REPLICA IDENTITY is used by logical replication to locate the corresponding row when applying changes, so it must resolve to exactly one row. Without uniqueness, the identity would be ambiguous.
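For reference, the statements involved might look like the sketch below. It assumes the unique index `ix_notifications_id_notification_status` already exists; the helper names are hypothetical and the real migration may order or phrase things differently:

```python
INDEX_NAME = "ix_notifications_id_notification_status"


def replica_identity_statements(
    table: str = "notifications", index: str = INDEX_NAME
) -> list[str]:
    """Statements that switch the table to an index-based replica identity."""
    return [
        # Every column of the identity index must be NOT NULL.
        f"ALTER TABLE {table} ALTER COLUMN notification_status SET NOT NULL",
        # The index must be unique, non-partial and non-deferrable.
        f"ALTER TABLE {table} REPLICA IDENTITY USING INDEX {index}",
    ]


def revert_statement(table: str = "notifications") -> str:
    """Statement that restores the primary-key-based default."""
    return f"ALTER TABLE {table} REPLICA IDENTITY DEFAULT"


statements = replica_identity_statements()
```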
Q: What does the `oldkeys` field actually contain after this migration?
Before the migration, `oldkeys` in wal2json UPDATE/DELETE events contains only `{"keynames": ["id"], "keyvalues": ["<uuid>"]}`. After setting `REPLICA IDENTITY USING INDEX ix_notifications_id_notification_status`, it contains `{"keynames": ["id", "notification_status"], "keyvalues": ["<uuid>", "sending"]}`. This is what enables the pipeline to know which status bucket to decrement on a status transition.
Q: Does `REPLICA IDENTITY USING INDEX` affect INSERT events?
No. INSERT events always carry the full new row in `columnnames`/`columnvalues` regardless of the replica identity setting. The replica identity only controls what appears in `oldkeys` for UPDATE and DELETE events.
Q: Could `notification_status` be nullable in production data before this migration?
The migration explicitly sets `NOT NULL` on `notification_status` before creating the unique index, because PostgreSQL does not allow `REPLICA IDENTITY USING INDEX` on an index that includes nullable columns. If any existing rows had a NULL status, the `ALTER TABLE ... SET NOT NULL` would fail, so a data audit or backfill would be needed first.
The Celery Task and Processing Logic
Q: What happens if the advisory lock is never released, for example if the Postgres connection dies?
`pg_try_advisory_lock` acquires a session-level advisory lock. Session-level locks are automatically released when the database connection closes. If the worker process crashes hard, Postgres releases the lock as soon as the connection drops, allowing the next task invocation to acquire it and proceed.
Q: Why use exponential backoff with only 3 retries?
Three retries cover transient failures (brief DB unavailability, connection blip) without queuing excessive retry work. The exponential backoff (1s, 2s, 4s) gives the database time to recover. After three failures the error is logged and re-raised, marking the Celery task as failed and surfacing it in monitoring. The next scheduled run (1 second later) will attempt again fresh regardless.
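The delay schedule follows directly from `2^retry_count`. A tiny sketch, in which the helper name and the zero-based retry counter are assumptions:

```python
MAX_RETRIES = 3


def backoff_seconds(retry_count: int) -> int:
    """Delay before the next attempt, given how many retries already ran."""
    return 2 ** retry_count


# In a Celery task this value would typically be passed as the `countdown`
# argument of `self.retry(...)`.
delays = [backoff_seconds(n) for n in range(MAX_RETRIES)]
print(delays)  # [1, 2, 4]
```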
Q: What if an UPDATE event is missing both `current_row_data` status and `previous_row_data` status?
`_build_dimensions()` returns `None` if any required field (including `notification_status`) is absent from both sides of the change. The change is counted as `ignored_changes` and skipped. This is a safe degradation: the counts for that notification will be slightly off, but no exception is thrown and the slot continues advancing.
Q: Can deletes from `notifications` result in a negative count in `ft_service_stats`?
No. In theory, if a notification is deleted and there was no corresponding insert event in the slot (e.g. the slot was created after the notification was inserted), the delta would be -1 applied against a row that might not exist. The `GREATEST(count + delta, 0)` guard handles this: if no row exists the `UPDATE` matches zero rows and is a no-op; if a row exists with count 0 it stays at 0.
Q: Why are deletes from `notification_history` ignored?
`notification_history` is an archive table. Rows are moved there from `notifications`. When that happens, the DELETE from `notifications` is processed (decrement the old status) and the INSERT into `notification_history` is ignored to avoid double-counting. Deletes from `notification_history` represent permanent purges of historical data and should not affect live aggregate counts.
Q: The task runs every second. Won't that create a lot of empty runs?
Yes, when the system is idle most runs will find zero changes and exit immediately after a single `pg_logical_slot_peek_changes` call that returns nothing. This is fast (sub-millisecond) and logged as `"No replication slot changes found"`. The 1-second cadence is intentional for near-real-time behaviour; the cost of empty runs is negligible.
The DAO and `ft_service_stats`
Q: Why split positive and negative deltas into two different SQL strategies?
A single `INSERT ... ON CONFLICT DO UPDATE` could apply `GREATEST(count + delta, 0)` in its conflict clause, but when no row exists it would insert a new row with a negative initial count. Splitting the logic is cleaner: UPSERTs handle the "create or increment" case; bounded UPDATEs handle the "decrement but floor at zero" case. A row with a negative initial count is never meaningful, so the insert path for negative deltas is correctly omitted.
Q: Is the UPSERT truly atomic? Could two concurrent workers both insert and end up with the count doubled?
Yes, it is atomic at the database level. The advisory lock prevents two workers from processing the slot simultaneously, so concurrent UPSERT races from the replication task itself cannot occur. If other code paths also call `apply_service_stats_delta` concurrently, the `ON CONFLICT DO UPDATE` serialises conflicting writes correctly and applies both increments.
Q: Why does the table use a surrogate UUID primary key rather than a composite PK on the four dimensions?
The unique constraint on the four dimensions already enforces deduplication and serves as the conflict target for UPSERT. A surrogate PK is simpler for ORM usage and any tooling that assumes a single-column primary key. The tradeoff is one extra index, which is acceptable given how small the table stays.
Q: How large will `ft_service_stats` grow?
The number of rows is bounded by the cardinality of `(service_id, template_id, notification_type, notification_status)`. With a finite number of services, templates, notification types (3), and statuses (~12), the table stays small regardless of notification volume. Even at thousands of services with hundreds of templates each, the row count remains in the low millions at most, which is trivially small compared to `notifications`.
Operations and Monitoring
Q: How do I know if the pipeline has fallen behind?
Query `pg_replication_slots` for the slot's `confirmed_flush_lsn` and compare it to `pg_current_wal_lsn()`. A growing gap indicates the consumer is behind. The Celery task also logs `changes_count` on every successful run; a sustained count approaching 10,000 (the `upto_nchanges` cap) on consecutive runs suggests the pipeline is working through backlog.
Q: How do I reset `ft_service_stats` if the counts become corrupted?
Truncate `ft_service_stats`, drop and recreate the replication slot, then let the pipeline rebuild from current writes. This produces counts reflecting only notifications created or updated after the slot is recreated. For a full historical backfill, seed `ft_service_stats` with a one-time `COUNT` query against `notifications` before restarting the pipeline.