[DO NOT MERGE] Wal2JSON proof of concept#4855

Closed
DilwoarH wants to merge 26 commits into main from wal2json-proof-of-concept

@DilwoarH DilwoarH commented May 20, 2026

WAL2JSON Replication Implementation

Overview

This document describes the end-to-end implementation of a PostgreSQL logical replication pipeline using the wal2json output plugin. The system reads Write-Ahead Log (WAL) changes from the notifications and notification_history tables and incrementally maintains an aggregate statistics table (ft_service_stats) — without ever querying the source tables directly.

The design solves a specific performance problem: computing per-service notification counts across potentially hundreds of millions of rows is expensive with ad-hoc COUNT queries. Instead, this pipeline maintains a running tally in near-real-time by consuming CDC (Change Data Capture) events from the database's own replication machinery.


Architecture at a Glance

PostgreSQL WAL
     │
     ▼
notify_dashboard_replication_slot  (wal2json logical decoding)
     │
     ▼
Celery periodic task (every 1s)
     │
     ├─ peek changes (non-destructive read)
     ├─ parse + classify (insert / update / delete)
     ├─ build dimension counter
     ├─ roll up to service stats dimensions
     ├─ UPSERT deltas into ft_service_stats
     ├─ commit
     └─ advance slot (acknowledge consumed LSNs)

A Postgres advisory lock ensures only one Celery worker processes the slot at any given moment, preventing race conditions and double-counting.


Step-by-Step Implementation

Step 1 — Create the logical replication slot

Migration: 0552_create_replacation_slot.py

op.execute(
    "SELECT * FROM pg_create_logical_replication_slot('notify_dashboard_replication_slot', 'wal2json');"
)

Before any CDC can happen, PostgreSQL must be told to start capturing WAL changes through a named slot using the wal2json logical decoding plugin.

What this does:

  • Creates a persistent slot named notify_dashboard_replication_slot
  • Tells Postgres to decode WAL using wal2json, which formats changes as JSON rather than the raw binary WAL format
  • From this point on, Postgres begins buffering changes for this slot — it won't discard WAL until the slot is advanced

Important: An unadvanced slot causes WAL accumulation on disk. The consumer task must run regularly and advance the slot after successful processing.

Downgrade drops the slot cleanly, which also releases any held WAL storage.


Step 2 — Add a composite index on notifications

Migration: 0553_notifications_id_status_idx.py

op.create_index(
    "ix_notifications_id_notification_status",
    "notifications",
    ["id", "notification_status"],
    unique=False,
    postgresql_concurrently=True,
)

Created concurrently (so writes to the table are not blocked during the build) as a prerequisite for the REPLICA IDENTITY change in Step 4. Also benefits read queries filtering by notification ID and status.


Step 3 — Create the aggregate statistics table

Migration: 0554_create_service_stats.py (final table name: ft_service_stats)

| Column | Type | Notes |
| --- | --- | --- |
| id | UUID | Primary key |
| service_id | UUID | FK → services |
| template_id | UUID | FK → templates |
| notification_type | ENUM | email / sms / letter |
| notification_status | TEXT | FK → notification_status_types |
| count | INTEGER | Running aggregate, default 0 |

The unique constraint uix_ft_service_stats_dimensions on (service_id, template_id, notification_type, notification_status) is the heart of the UPSERT logic — it's the conflict target when incrementing counts.

Three supporting indexes are created for common query patterns:

  • (service_id, notification_type, notification_status) — dashboard lookups by service
  • (template_id, notification_type, notification_status) — template-level reporting
  • (service_id, template_id) — join/filter by both

Step 4 — Set REPLICA IDENTITY on notifications

Migration: 0555_notifications_replica_identity.py

This is the most nuanced migration in the chain.

By default, Postgres only writes the primary key into oldkeys in the WAL for UPDATE and DELETE events. But to correctly track status transitions, the pipeline needs to know the previous notification_status when a row is updated — so it can decrement the old status bucket and increment the new one.

The solution is REPLICA IDENTITY USING INDEX, which writes the index columns into oldkeys instead:

-- Step 1: Make notification_status NOT NULL
--         (an index used as replica identity may only cover NOT NULL columns)
ALTER TABLE notifications ALTER COLUMN notification_status SET NOT NULL;

-- Step 2: Recreate the index as UNIQUE (required by PG for REPLICA IDENTITY USING INDEX)
--         Done concurrently to avoid blocking writes. The non-unique index of the
--         same name must be dropped first, otherwise the name clashes.
CREATE UNIQUE INDEX CONCURRENTLY ix_notifications_id_notification_status
    ON notifications (id, notification_status);

-- Step 3: Declare this index as the replica identity source
ALTER TABLE notifications
    REPLICA IDENTITY USING INDEX ix_notifications_id_notification_status;

Why not REPLICA IDENTITY FULL?
FULL writes the entire old row to WAL on every UPDATE — very expensive for a wide, high-write table like notifications. The index-based approach writes only the two columns actually needed (id + notification_status), keeping WAL amplification minimal.

After this migration, every UPDATE to a notification will carry the previous notification_status in oldkeys, which is exactly what the change parser needs.


Step 5 — The SQLAlchemy model

File: app/models.py (FactServiceStats)

class FactServiceStats(db.Model):
    __tablename__ = "ft_service_stats"

    id = db.Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    service_id = db.Column(UUID(as_uuid=True), db.ForeignKey("services.id"), nullable=False)
    template_id = db.Column(UUID(as_uuid=True), db.ForeignKey("templates.id"), nullable=False)
    notification_type = db.Column(notification_types, nullable=False)
    notification_status = db.Column(db.Text, db.ForeignKey("notification_status_types.name"), nullable=False)
    count = db.Column(db.Integer(), nullable=False, default=0, server_default=db.text("0"))

    __table_args__ = (
        UniqueConstraint(
            "service_id", "template_id", "notification_type", "notification_status",
            name="uix_ft_service_stats_dimensions",
        ),
        Index("ix_ft_svc_stats_svc_ntype_nstatus", "service_id", "notification_type", "notification_status"),
        Index("ix_ft_svc_stats_tmpl_ntype_nstatus", "template_id", "notification_type", "notification_status"),
        Index("ix_ft_service_stats_service_id_template_id", "service_id", "template_id"),
    )

The four dimension columns and the unique constraint directly mirror what the DAO uses as the ON CONFLICT target.


Step 6 — The replication slot reader utility

File: app/replication/replication_changes_utils.py

This module is responsible for talking to Postgres and normalising raw wal2json output into a consistent internal shape.

Fetching changes

get_replication_changes() builds and executes one of two Postgres functions depending on the peek flag:

| Function | Behaviour |
| --- | --- |
| pg_logical_slot_peek_changes | Read-only; does not advance the slot |
| pg_logical_slot_get_changes | Consumes changes and advances the slot |

The pipeline always uses peek=True during processing and advances the slot manually only after a successful DB commit. This prevents data loss if the task crashes mid-flight.

wal2json options passed through:

pretty-print      on
add-tables        public.notifications, public.notification_history
include-lsn       on
format-version    1
include-types     off
include-typmod    off

add-tables is the key filter — without it, every WAL change across the entire database would be decoded and returned.
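
As a rough illustration of how the function choice and the options above could be assembled into one call, here is a minimal sketch. `build_changes_query` is a hypothetical helper, not the PR's `get_replication_changes()`; the `%s` placeholders assume a psycopg2-style driver, and the 10,000 default comes from the `upto_nchanges` cap mentioned in the FAQ.

```python
SLOT_NAME = "notify_dashboard_replication_slot"

# Option names and values as listed above. The slot functions accept them as
# alternating name/value text arguments (VARIADIC options text[]).
WAL2JSON_OPTIONS = {
    "pretty-print": "on",
    "add-tables": "public.notifications, public.notification_history",
    "include-lsn": "on",
    "format-version": "1",
    "include-types": "off",
    "include-typmod": "off",
}


def build_changes_query(peek=True, upto_nchanges=10_000):
    """Return (sql, params) for reading the slot. Hypothetical helper."""
    function = "pg_logical_slot_peek_changes" if peek else "pg_logical_slot_get_changes"
    params = [SLOT_NAME, upto_nchanges]
    for name, value in WAL2JSON_OPTIONS.items():
        params.extend([name, value])
    option_placeholders = ", ".join(["%s"] * (2 * len(WAL2JSON_OPTIONS)))
    # upto_lsn is NULL: read whatever is available, bounded by upto_nchanges.
    sql = f"SELECT lsn, xid, data FROM {function}(%s, NULL, %s, {option_placeholders})"
    return sql, params
```

The function name is interpolated from a fixed pair of literals, never from user input; everything else travels as bound parameters.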

Parsing the JSON payload

Each row returned by Postgres has a data field containing a wal2json JSON document:

{
  "nextlsn": "0/1A2B3C4",
  "change": [
    {
      "kind": "update",
      "table": "notifications",
      "columnnames": ["id", "notification_status", "updated_at"],
      "columnvalues": ["abc-123", "delivered", "2026-05-22T10:00:00"],
      "oldkeys": {
        "keynames": ["id", "notification_status"],
        "keyvalues": ["abc-123", "sending"]
      }
    }
  ]
}

parse_change_data() extracts the nextlsn (used later to advance the slot) and iterates over each item in the change array.

parse_row_data() converts each change item into the internal ParsedRow shape:

{
    "type": "update",
    "table": "notifications",
    "nextlsn": "0/1A2B3C4",
    "current_row_data": {           # columnnames zipped with columnvalues
        "id": "abc-123",
        "notification_status": "delivered",
        "updated_at": "2026-05-22T10:00:00",
    },
    "previous_row_data": {          # oldkeys.keynames zipped with oldkeys.keyvalues
        "id": "abc-123",
        "notification_status": "sending",
    }
}
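
The zipping described above can be sketched as follows. The function names match the ones mentioned in this description, but the signatures and the `TypedDict` are assumptions made for illustration, not the PR's actual code.

```python
import json
from typing import Any, Optional, TypedDict


class ParsedRow(TypedDict):
    type: str
    table: str
    nextlsn: Optional[str]
    current_row_data: dict
    previous_row_data: dict


def parse_row_data(change: dict, nextlsn: Optional[str]) -> ParsedRow:
    """Zip wal2json's parallel name/value arrays into plain dicts."""
    old_keys = change.get("oldkeys") or {}
    return {
        "type": change["kind"],
        "table": change["table"],
        "nextlsn": nextlsn,
        # columnnames/columnvalues are absent on deletes, hence the defaults.
        "current_row_data": dict(
            zip(change.get("columnnames", []), change.get("columnvalues", []))
        ),
        # oldkeys is absent on inserts.
        "previous_row_data": dict(
            zip(old_keys.get("keynames", []), old_keys.get("keyvalues", []))
        ),
    }


def parse_change_data(data: str) -> list:
    """Parse one `data` value from the slot into a list of ParsedRow dicts."""
    document: dict[str, Any] = json.loads(data)
    nextlsn = document.get("nextlsn")
    return [parse_row_data(change, nextlsn) for change in document.get("change", [])]
```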

Two helper functions handle safe field extraction from these dicts:

  • get_str_value(row_data, key) — returns the value only if it's a str, otherwise None
  • get_notification_status(row_data) — checks notification_status first, then falls back to status to handle both legacy and current column names
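
A minimal sketch of what these two helpers might look like, based only on the behaviour described in the bullets; the real implementations may differ in detail.

```python
from typing import Any, Optional


def get_str_value(row_data: dict, key: str) -> Optional[str]:
    """Return row_data[key] only when it is a str, otherwise None."""
    value: Any = row_data.get(key)
    return value if isinstance(value, str) else None


def get_notification_status(row_data: dict) -> Optional[str]:
    """Prefer the current column name, then fall back to the legacy one."""
    status = get_str_value(row_data, "notification_status")
    if status is None:
        status = get_str_value(row_data, "status")
    return status
```

The explicit `is None` check (rather than `or`) means an empty string in `notification_status` is returned as-is instead of silently falling through to `status`; whether that matches the PR's behaviour is not stated.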

Step 7 — The DAO layer

File: app/dao/fact_service_stats_dao.py

Two public functions (apply_service_stats_delta and dao_fetch_stats_for_service) and one internal helper:

apply_service_stats_delta(dimensions, delta) routes to the internal update function. It's the only write interface callers need.

_update_service_stats_count(dimensions, delta) applies the delta with different strategies for positive and negative values:

For positive deltas (new inserts or new status on an update), a PostgreSQL INSERT ... ON CONFLICT DO UPDATE atomically creates or increments the row:

stmt = insert(FactServiceStats).values(id=uuid4(), **dimension_values, count=delta)
stmt = stmt.on_conflict_do_update(
    constraint="uix_ft_service_stats_dimensions",
    set_={"count": FactServiceStats.count + delta},
)
db.session.execute(stmt)

For negative deltas (deletes or the old status on an update), a bounded UPDATE prevents the count ever going below zero:

db.session.query(FactServiceStats).filter(*filters).update(
    {"count": func.greatest(FactServiceStats.count + delta, 0)},
    synchronize_session=False,
)

The GREATEST(..., 0) guard makes the system self-healing: if the slot is created after some notifications already exist, or a transition appears out of order, counts degrade gracefully to zero rather than going negative and corrupting the aggregate.
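
To make the two write strategies concrete, the following toy model mimics their semantics with a plain dict. It is an in-memory stand-in written for illustration (class and method names are hypothetical), not the DAO itself.

```python
class InMemoryServiceStats:
    """Toy stand-in for ft_service_stats keyed by the four dimensions."""

    def __init__(self):
        self.counts = {}

    def apply_delta(self, dimensions, delta):
        if delta > 0:
            # Mirrors INSERT ... ON CONFLICT DO UPDATE: create or increment.
            self.counts[dimensions] = self.counts.get(dimensions, 0) + delta
        elif delta < 0 and dimensions in self.counts:
            # Mirrors the bounded UPDATE: floor at zero, never insert a row.
            self.counts[dimensions] = max(self.counts[dimensions] + delta, 0)


stats = InMemoryServiceStats()
key = ("S", "T", "email", "sending")
stats.apply_delta(key, -1)   # no row yet: the UPDATE matches nothing
stats.apply_delta(key, 2)    # row created with count 2
stats.apply_delta(key, -5)   # clamped at 0 rather than -3
```

Note that a clamped decrement is simply discarded, so the guard trades a negative count for a possible small overcount later.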

dao_fetch_stats_for_service(service_id) is the read path, returning all FactServiceStats rows for a given service, with a guard against empty service_id.


Step 8 — The Celery task

File: app/celery/process_replication_slot_changes.py

This is the main processing loop, scheduled to run every 1 second via Celery Beat.

Advisory lock

lock_acquired = _try_advisory_lock(REPLICATION_ADVISORY_LOCK_ID)

Uses pg_try_advisory_lock — non-blocking. If another worker already holds the lock, this invocation logs and exits immediately. This is more reliable than an external distributed lock (e.g. Redis) because the advisory lock lives in the same Postgres connection scope and is automatically released if the connection drops.

Change classification (_build_counter_from_changes)

Iterates over all parsed rows and accumulates signed deltas into a Counter keyed by a 7-tuple of full dimensions:

type FullDimensions = tuple[date, UUID, UUID, UUID, str, str, str]
# (bst_date, template_id, service_id, job_id, notification_type, key_type, notification_status)

The classification rules:

| Change type | Action |
| --- | --- |
| insert | counter[new_dimensions] += 1 |
| update | counter[new_dimensions] += 1 and counter[old_dimensions] -= 1 |
| delete from notifications | counter[old_dimensions] -= 1 |
| delete from notification_history | ignored; history rows are not decremented |

_build_dimensions() handles the asymmetry: for inserts only current_row_data is populated; for deletes only previous_row_data is populated; for updates both sides are present. The function takes a use_previous_row flag and falls back across both sides when a value is absent from the primary side.
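
The classification rules and the two-sided fallback can be sketched like this. For brevity the sketch keys the counter by four dimensions instead of the full 7-tuple, and the function bodies are assumptions reconstructed from the description rather than the PR's code.

```python
from collections import Counter

REQUIRED = ("service_id", "template_id", "notification_type", "notification_status")


def build_dimensions(row, use_previous_row):
    """Read each field from the primary side, falling back to the other side."""
    primary = row["previous_row_data"] if use_previous_row else row["current_row_data"]
    fallback = row["current_row_data"] if use_previous_row else row["previous_row_data"]
    values = []
    for name in REQUIRED:
        value = primary.get(name)
        values.append(fallback.get(name) if value is None else value)
    return None if any(v is None for v in values) else tuple(values)


def build_counter_from_changes(rows):
    counter = Counter()
    for row in rows:
        kind, table = row["type"], row["table"]
        if kind == "insert":
            deltas = [(build_dimensions(row, False), +1)]
        elif kind == "update":
            deltas = [(build_dimensions(row, False), +1), (build_dimensions(row, True), -1)]
        elif kind == "delete" and table == "notifications":
            deltas = [(build_dimensions(row, True), -1)]
        else:
            continue  # e.g. deletes from notification_history
        if any(dims is None for dims, _ in deltas):
            continue  # incomplete change: skip it entirely
        for dims, delta in deltas:
            counter[dims] += delta
    return counter
```

For an update, `oldkeys` only carries `id` and `notification_status`, so the old dimensions take their status from the previous side and everything else from the current side.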

Roll-up

_roll_up_service_stats_deltas() collapses the 7-tuple dimensions down to the 4-tuple that ft_service_stats tracks:

(service_id, template_id, notification_type, notification_status)

date, job_id, and key_type are intentionally dropped — ft_service_stats aggregates across all dates and job types.
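
A minimal sketch of the roll-up, assuming the 7-tuple ordering given above and using strings in place of UUIDs. Dropping entries whose deltas cancel to zero is an assumption added here to avoid no-op writes; the PR does not say whether it does this.

```python
from collections import Counter
from datetime import date

# (bst_date, template_id, service_id, job_id, notification_type, key_type, notification_status)
FullDimensions = tuple[date, str, str, str, str, str, str]


def roll_up_service_stats_deltas(full_counter):
    """Collapse 7-tuple deltas to (service_id, template_id, type, status)."""
    rolled = Counter()
    for dims, delta in full_counter.items():
        _bst_date, template_id, service_id, _job_id, ntype, _key_type, status = dims
        rolled[(service_id, template_id, ntype, status)] += delta
    # Keep only dimensions with a non-zero net delta.
    return {dims: delta for dims, delta in rolled.items() if delta != 0}
```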

Commit and advance

db.session.commit()
if last_nextlsn:
    _advance_replication_slot(last_nextlsn)

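The slot is only advanced after the commit succeeds. If the commit throws, the exception handler rolls back and the task retries up to 3 times with exponential backoff (2^retry_count seconds). On retry the same changes are re-read from the slot. This is safe because the rollback discards every delta from the failed attempt, so nothing is applied twice. The deltas themselves are additive (count + delta), not idempotent: if the commit succeeds but the slot advance then fails, the next run re-reads the same changes and applies them a second time.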

Lock release

The advisory lock is always released in finally, with its own nested try/except so a release failure doesn't mask the original exception.
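
Putting the lock, commit, advance and release steps together, one invocation could be structured roughly as below. This is a sketch under stated assumptions: `conn` is any DB-API connection using psycopg2-style `%s` placeholders, `process_batch` is a hypothetical callable that peeks, applies deltas on `conn` without committing and returns the last `nextlsn` (or None), and the lock ID value is a placeholder because the real constant is not shown here.

```python
REPLICATION_ADVISORY_LOCK_ID = 1  # placeholder; the real value is not shown in the PR
SLOT_NAME = "notify_dashboard_replication_slot"


def run_once(conn, process_batch):
    """One task invocation: lock, process, commit, advance, unlock."""
    cur = conn.cursor()
    cur.execute("SELECT pg_try_advisory_lock(%s)", (REPLICATION_ADVISORY_LOCK_ID,))
    if not cur.fetchone()[0]:
        return "skipped"  # another worker holds the lock
    try:
        last_nextlsn = process_batch(conn)
        conn.commit()
        if last_nextlsn:
            # Acknowledge only after the aggregate writes are durable.
            cur.execute(
                "SELECT pg_replication_slot_advance(%s, %s)", (SLOT_NAME, last_nextlsn)
            )
            conn.commit()
        return "processed"
    except Exception:
        conn.rollback()
        raise
    finally:
        try:
            cur.execute("SELECT pg_advisory_unlock(%s)", (REPLICATION_ADVISORY_LOCK_ID,))
        except Exception:
            pass  # a failed unlock must not mask the original exception
```

Because the lock is session-level, the rollback in the error path does not release it; only the explicit unlock or a dropped connection does.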


Step 9 — Celery Beat schedule

File: app/config.py

"check-replication-slot-changes": {
    "task": "check-replication-slot-changes",
    "schedule": timedelta(seconds=1),
    "options": {"queue": QueueNames.PERIODIC},
},

The 1-second cadence keeps ft_service_stats close to real-time. The advisory lock means there is no risk from multiple workers being scheduled concurrently — only one will do work at a time.


Data Flow Example

A single notification being delivered produces this sequence of WAL events and resulting counter deltas:

INSERT notifications (id=X, status=sending, ...)
  → counter[(S, T, email, normal, sending)] += 1

UPDATE notifications SET status='sent' WHERE id=X
  → counter[(S, T, email, normal, sent)]    += 1
  → counter[(S, T, email, normal, sending)] -= 1

UPDATE notifications SET status='delivered' WHERE id=X
  → counter[(S, T, email, normal, delivered)] += 1
  → counter[(S, T, email, normal, sent)]      -= 1

After roll-up, the net deltas applied to ft_service_stats:

| service_id | template_id | type | status | net delta |
| --- | --- | --- | --- | --- |
| S | T | email | sending | 0 (incremented then decremented) |
| S | T | email | sent | 0 (incremented then decremented) |
| S | T | email | delivered | +1 |

The final state in ft_service_stats correctly reflects one delivered notification for that service/template combination.


Key Design Decisions

Peek + manual advance instead of get
Using pg_logical_slot_get_changes would consume and advance the slot atomically. If the application crashed after consuming but before committing to ft_service_stats, those changes would be lost permanently. The peek + commit + advance pattern guarantees at-least-once processing: a failed commit rolls back cleanly and the same changes are re-read, while a crash between the commit and the slot advance causes that one batch to be applied a second time, because count + delta is additive rather than idempotent.

Advisory lock instead of external distributed locking
The advisory lock is Postgres-native, colocated with the work it protects, and automatically released on connection drop. It avoids introducing an external dependency (e.g. Redis) for a coordination problem that is already scoped to a single database.

GREATEST(count + delta, 0) for decrements
Under normal operation counts should never go negative. But the slot is created at a point in time — notifications that existed before the slot was created have no corresponding insert event. The guard prevents any ordering anomaly or slot creation timing issue from producing permanently corrupted (negative) counts.

REPLICA IDENTITY USING INDEX over FULL
REPLICA IDENTITY FULL writes the entire old row to WAL on every UPDATE — very expensive for a wide, high-throughput table. The index-based identity writes only id and notification_status, the minimum needed to compute status transition deltas, keeping WAL overhead proportional to what is actually consumed.


Frequently Asked Questions

General

Q: Why not just run a nightly COUNT job instead?
A nightly batch job produces stale data for most of the day. The replication pipeline keeps ft_service_stats within ~1 second of the live notifications table, which is necessary for any dashboard or API that consumers expect to reflect recent activity. Batch jobs also place a large read load on the database at query time, whereas this pipeline distributes that cost incrementally across every write.

Q: Does this replace ft_notification_status?
No. ft_notification_status is a date-partitioned historical aggregate populated separately. ft_service_stats is a live running counter without date bucketing, designed for low-latency lookups. They serve different query patterns and coexist.

Q: What happens if wal2json is not installed on the database?
The migration that creates the replication slot (0552) will fail with a Postgres error indicating the output plugin is not found. wal2json must be installed before running migrations, and the server must run with wal_level = logical, since logical slots cannot be created otherwise. On RDS and Aurora Postgres it is available by default; on self-hosted instances it must be compiled and installed separately.

Q: Can this pipeline be used with read replicas?
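Not as implemented. On PostgreSQL 15 and earlier, logical decoding is only supported on the primary, and calling pg_logical_slot_peek_changes on a read replica raises an error. PostgreSQL 16 added logical decoding on standbys, but this pipeline creates and reads its slot on the primary, so the Celery task must connect to the primary.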


Replication Slot

Q: What happens to the replication slot if the Celery task is down for an extended period?
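Postgres will keep accumulating WAL for the slot indefinitely (unless max_slot_wal_keep_size is set, available from PostgreSQL 13, in which case the slot is invalidated once the limit is exceeded). If the task is down long enough, this can fill the disk and cause the database to stop accepting writes entirely. This is the most operationally dangerous failure mode. pg_replication_slots has no ready-made lag column, so monitor the byte gap between pg_current_wal_lsn() and the slot's restart_lsn or confirmed_flush_lsn (for example with pg_wal_lsn_diff), with an alert threshold well before disk saturation.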

Q: What does "advancing the slot" actually mean?
Calling pg_replication_slot_advance(slot_name, lsn) tells Postgres that all WAL up to and including the given LSN has been successfully processed and can be discarded. Until this call is made, Postgres must retain that WAL. The pipeline advances the slot only after a successful commit to ft_service_stats, ensuring no changes are lost if the task crashes mid-flight.

Q: Can two replication slots be created for the same purpose?
Yes, but each slot independently tracks its own position and holds its own WAL. Running two consumers from two slots would result in double-counting in ft_service_stats. A single named slot combined with the advisory lock ensures each change has exactly one consumer at a time; delivery is still at-least-once, as described under Key Design Decisions.

Q: What is the upto_nchanges cap for?
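It bounds how much work a single task invocation does. With a cap of 10,000 changes per run on a 1-second schedule, the pipeline can sustain at most ~10,000 changes/second, and only if each run finishes within its 1-second window. The cap is soft: Postgres checks it only at transaction boundaries, so a run can return somewhat more rows than the limit. If backlog builds up after an outage, successive runs drain it in batches of roughly 10,000 rather than attempting to process the entire backlog in one potentially very slow transaction.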
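
As a back-of-the-envelope aid, the arithmetic behind draining a backlog can be written down as follows. The model is deliberately simplified and entirely an assumption: every run processes exactly the cap, and new changes arrive at a constant rate per run.

```python
import math
from typing import Optional


def runs_to_drain(backlog: int, inflow_per_run: int, cap_per_run: int = 10_000) -> Optional[int]:
    """Runs needed to clear `backlog` while new changes keep arriving."""
    headroom = cap_per_run - inflow_per_run
    if headroom <= 0:
        return None  # inflow matches or exceeds the cap: the backlog never shrinks
    return math.ceil(backlog / headroom)


# A 600,000-change backlog with 4,000 new changes per run leaves 6,000
# changes of headroom per run.
estimate = runs_to_drain(600_000, 4_000)
```

On a 1-second schedule the result is also a rough lower bound on catch-up time in seconds.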

Q: Why is the slot created in a migration rather than application startup code?
Migrations run exactly once and are tracked in the alembic revision chain. Creating the slot in application startup code would risk creating it on every deploy or requiring idempotency guards. A migration is simpler and safer.


REPLICA IDENTITY

Q: Why does REPLICA IDENTITY USING INDEX require the index to be UNIQUE?
Postgres enforces this because a non-unique index cannot uniquely identify a row. REPLICA IDENTITY is used by logical replication to locate the corresponding row when applying changes — it must resolve to exactly one row. Without uniqueness, the identity would be ambiguous.

Q: What does the oldkeys field actually contain after this migration?
Before the migration, oldkeys in wal2json UPDATE/DELETE events contains only {"keynames": ["id"], "keyvalues": ["<uuid>"]}. After setting REPLICA IDENTITY USING INDEX ix_notifications_id_notification_status, it contains {"keynames": ["id", "notification_status"], "keyvalues": ["<uuid>", "sending"]}. This is what enables the pipeline to know which status bucket to decrement on a status transition.

Q: Does REPLICA IDENTITY USING INDEX affect INSERT events?
No. INSERT events always carry the full new row in columnnames/columnvalues regardless of replica identity setting. The replica identity only controls what appears in oldkeys for UPDATE and DELETE events.

Q: Could notification_status be nullable in production data before this migration?
The migration explicitly sets NOT NULL on notification_status before creating the unique index, because PostgreSQL does not allow REPLICA IDENTITY USING INDEX on an index that includes nullable columns. If any existing rows had a NULL status, the ALTER TABLE ... SET NOT NULL would fail — a data audit or backfill would be needed first.


The Celery Task and Processing Logic

Q: What happens if the advisory lock is never released — for example if the Postgres connection dies?
pg_try_advisory_lock acquires a session-level advisory lock. Session-level locks are automatically released when the database connection closes. If the worker process crashes hard, Postgres releases the lock as soon as the connection drops, allowing the next task invocation to acquire it and proceed.

Q: Why use exponential backoff with only 3 retries?
Three retries covers transient failures (brief DB unavailability, connection blip) without queuing excessive retry work. The exponential backoff (1s, 2s, 4s) gives the database time to recover. After three failures the error is logged and re-raised, marking the Celery task as failed and surfacing it in monitoring. The next scheduled run (1 second later) will attempt again fresh regardless.

Q: What if an UPDATE event is missing both current_row_data status and previous_row_data status?
_build_dimensions() returns None if any required field (including notification_status) is absent from both sides of the change. The change is counted as ignored_changes and skipped. This is a safe degradation — the counts for that notification will be slightly off but no exception is thrown and the slot continues advancing.

Q: Can deletes from notifications result in a negative count in ft_service_stats?
In theory, if a notification is deleted and there was no corresponding insert event in the slot (e.g. the slot was created after the notification was inserted), the delta would be -1 applied against a row that might not exist. The GREATEST(count + delta, 0) guard handles this: if no row exists the UPDATE matches zero rows and is a no-op; if a row exists with count 0 it stays at 0.

Q: Why are deletes from notification_history ignored?
notification_history is an archive table. Rows are moved there from notifications — when that happens, the DELETE from notifications is processed (decrement the old status) and the INSERT into notification_history is ignored to avoid double-counting. Deletes from notification_history represent permanent purges of historical data and should not affect live aggregate counts.

Q: The task runs every second — won't that create a lot of empty runs?
Yes, when the system is idle most runs will find zero changes and exit immediately after a single pg_logical_slot_peek_changes call that returns nothing. This is typically fast and is logged as "No replication slot changes found". The 1-second cadence is intentional for near-real-time behaviour; the cost of empty runs is expected to be negligible.


The DAO and ft_service_stats

Q: Why split positive and negative deltas into two different SQL strategies?
A single INSERT ... ON CONFLICT DO UPDATE could apply GREATEST(count + delta, 0) in its DO UPDATE clause, but when no row exists yet its INSERT branch would create a row with a negative initial count. Splitting the logic is cleaner: UPSERTs handle the "create or increment" case; bounded UPDATEs handle the "decrement but floor at zero" case. A row with a negative initial count is never meaningful, so the insert path for negative deltas is correctly omitted.

Q: Is the UPSERT truly atomic? Could two concurrent workers both insert and end up with the count doubled?
Yes, it is atomic at the database level. The advisory lock prevents two workers from processing the slot simultaneously, so concurrent UPSERT races from the replication task itself cannot occur. If other code paths also call apply_service_stats_delta concurrently, the ON CONFLICT DO UPDATE serialises conflicting writes correctly and applies both increments.

Q: Why does the table use a surrogate UUID primary key rather than a composite PK on the four dimensions?
The unique constraint on the four dimensions already enforces deduplication and serves as the conflict target for UPSERT. A surrogate PK is simpler for ORM usage and any tooling that assumes a single-column primary key. The tradeoff is one extra index, which is acceptable given how small the table stays.

Q: How large will ft_service_stats grow?
The number of rows is bounded by the cardinality of (service_id, template_id, notification_type, notification_status). With a finite number of services, templates, notification types (3), and statuses (~12), the table stays small regardless of notification volume. Even at thousands of services with hundreds of templates each, the row count remains in the low millions at most — trivially small compared to notifications.


Operations and Monitoring

Q: How do I know if the pipeline has fallen behind?
Query pg_replication_slots for the slot's confirmed_flush_lsn and compare it to pg_current_wal_lsn(). A growing gap indicates the consumer is behind. The Celery task also logs changes_count on every successful run; a sustained count approaching 10,000 (the upto_nchanges cap) on consecutive runs suggests the pipeline is working through backlog.
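
The comparison can be done in SQL with pg_wal_lsn_diff, or client-side by converting the textual LSNs to byte positions. The sketch below shows both; the helper names and the query constant are illustrative, not part of the PR.

```python
LAG_QUERY = """
SELECT slot_name,
       pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots
WHERE slot_name = 'notify_dashboard_replication_slot'
"""


def lsn_to_int(lsn: str) -> int:
    """Convert a textual LSN such as '0/1A2B3C4' to a 64-bit byte position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) + int(low, 16)


def slot_lag_bytes(current_wal_lsn: str, confirmed_flush_lsn: str) -> int:
    """Bytes of WAL written but not yet acknowledged by the consumer."""
    return lsn_to_int(current_wal_lsn) - lsn_to_int(confirmed_flush_lsn)
```

An LSN is printed as two hexadecimal numbers separated by a slash, the high and low 32 bits of the position, which is why the conversion is a shift and an add.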

Q: How do I reset ft_service_stats if the counts become corrupted?
Truncate ft_service_stats, drop and recreate the replication slot, then let the pipeline rebuild from current writes. This produces counts reflecting only notifications created or updated after the slot is recreated. For a full historical backfill, seed ft_service_stats with a one-time COUNT query against notifications before restarting the pipeline.

@quis quis marked this pull request as draft May 20, 2026 13:30
@DilwoarH DilwoarH force-pushed the wal2json-proof-of-concept branch 2 times, most recently from a6ed862 to 2d13025 May 22, 2026 10:30
@DilwoarH DilwoarH changed the title from "[WIP] Wal2JSON proof of concept" to "[DO NOT MERGE] Wal2JSON proof of concept" May 25, 2026
DilwoarH added 26 commits May 29, 2026 15:01
…ng previous notification statuses and updating service stats accordingly
…al statuses and updating status breakdown in response
…l status handling and update test assertions accordingly
- Consolidated service stats operations into a single function `apply_service_stats_delta` to handle both increments and decrements.
- Removed deprecated functions for inserting, deleting, and updating service stats.
- Introduced a new performance testing endpoint to simulate notification load, allowing for bulk inserts and updates of notifications.
- Updated replication change utilities to include `nextlsn` in parsed rows and adjusted related tests accordingly.
- Enhanced tests for service stats DAO to validate the new delta application logic.
- Refactored replication slot change processing tests to align with the new structure and ensure proper handling of changes.
…d update related indices; remove obsolete migrations for notifications replica identity
@DilwoarH DilwoarH force-pushed the wal2json-proof-of-concept branch from 7e45faf to aeb7541 May 29, 2026 14:07
DilwoarH commented Jun 4, 2026


Superseded by #4873

@DilwoarH DilwoarH closed this Jun 4, 2026