Skip to content

[WIP] Add Service Stats (Wal2JSON implementation)#4873

Open
DilwoarH wants to merge 4 commits into
add-ft-service-stats-db-updatesfrom
add-ft-service-stats
Open

[WIP] Add Service Stats (Wal2JSON implementation)#4873
DilwoarH wants to merge 4 commits into
add-ft-service-stats-db-updatesfrom
add-ft-service-stats

Conversation

@DilwoarH

@DilwoarH DilwoarH commented Jun 4, 2026

Copy link
Copy Markdown
Contributor

What

  • Adds Replication Slot processing (Wal2JSON)
  • Adds ft_service_stats table to hold the count of service notifications by type
  • Adds celery task to process replication slot logs

Why

This is part of the initial implementation to improve the performance of GOVUK Notify Dashboards. The initial stage will be to collect the stats in the background and monitor to ensure the stats are being collected in a reliable way. If it's all good, then dashboard will be switched over to read from the ft_service_stats table.

Untitled-2026-04-23-1037

Commentary on PR

Branch implementation summary

This branch introduces a WAL-driven service stats pipeline:

  1. logical replication slot using wal2json
  2. high-frequency Celery polling of logical changes
  3. transformation into dimension deltas
  4. upsert/decrement into ft_service_stats

Relevant changed files:

  • app/dao/replication_slot_changes_dao.py
  • app/dao/fact_service_stats_dao.py
  • app/celery/scheduled_tasks.py
  • app/config.py
  • app/models.py
  • migrations/versions/0552_create_replacation_slot.py
  • migrations/versions/0553_notifications_id_status_idx.py
  • migrations/versions/0554_create_service_stats.py

Current implementation in this repository

1) Migration 0552 creates a wal2json logical replication slot

Migration migrations/versions/0552_create_replacation_slot.py creates a logical replication slot named notify_dashboard_replication_slot with output plugin wal2json.

It runs:

SELECT * FROM pg_create_logical_replication_slot('notify_dashboard_replication_slot', 'wal2json');

2) Migration 0553 sets replica identity to support old row keys on updates

Migration migrations/versions/0553_notifications_id_status_idx.py does three things on notifications:

  • sets notification_status to NOT NULL
  • creates unique index ix_notifications_id_notification_status
  • sets REPLICA IDENTITY USING INDEX ix_notifications_id_notification_status

This is important because update messages need key material (oldkeys) for correct decrement behavior in aggregates.

3) Migration 0554 creates the target aggregate table

Migration migrations/versions/0554_create_service_stats.py creates ft_service_stats with:

  • dimensions: service_id, template_id, notification_type, notification_status
  • metric: count
  • uniqueness on dimensions (uix_ft_service_stats_dimensions)
  • supporting read indexes

4) Polling runs as a Celery periodic task every second

Celery beat schedules task process-replication-slot-changes every 1 second in app/config.py.

The task is wired in app/celery/scheduled_tasks.py and calls dao_process_replication_slot_changes().

5) DAO reads logical changes via wal2json

Core logic is in app/dao/replication_slot_changes_dao.py.

Key behavior:

  • Uses advisory locking (pg_try_advisory_lock) to prevent overlapping workers.
  • Reads changes with pg_logical_slot_peek_changes (not get), with wal2json options:
    • add-tables: notifications,notification_history
    • include-lsn: true
    • format-version: 1
    • include-types: false
    • include-typmod: false
  • Parses wal2json payload and accepts table changes for:
    • notifications
    • notification_history
  • Builds counters for service stats deltas and applies them via apply_service_stats_change(...).
  • Commits DB work first, then advances slot using:
SELECT pg_replication_slot_advance(:slot_name, :lsn)

The "peek then advance" pattern means changes are only acknowledged after successful processing and commit.

6) Aggregate writes are handled via UPSERT/decrement DAO

Core write path is in app/dao/fact_service_stats_dao.py:

  • positive deltas: INSERT ... ON CONFLICT ... DO UPDATE against uix_ft_service_stats_dimensions
  • negative deltas: bounded decrement with greatest(count + change_count, 0)

It also adds a read helper (dao_fetch_stats_for_service) for one-service lookups.

End-to-end runtime flow

  1. Celery beat triggers process-replication-slot-changes every second.
  2. Worker acquires advisory lock.
  3. Worker peeks wal2json changes for notifications and notification_history.
  4. Worker computes delta counts for dimension tuples.
  5. Worker applies deltas to ft_service_stats.
  6. Worker commits.
  7. Worker advances replication slot to last processed LSN.

AWS setup required

1) Enable logical decoding on the RDS instance

Logical decoding must be enabled in the DB parameter group used by the instance.

Set/verify:

  • rds.logical_replication = 1
  • max_replication_slots to at least 1 (higher if other slots are used)
  • max_wal_senders high enough for your replication workloads

Apply the parameter group and reboot the instance if required.

2) Ensure wal2json is available for your RDS engine version

The slot creation migration depends on output plugin wal2json.

Before deploy, verify on the target DB:

SELECT *
FROM pg_available_extensions
WHERE name = 'wal2json';

If this returns no row, confirm engine version support and extension availability for your RDS PostgreSQL version.

Also verify the slot plugin resolves correctly after migration:

SELECT slot_name, plugin, slot_type
FROM pg_replication_slots
WHERE slot_name = 'notify_dashboard_replication_slot';

3) Permissions and execution role

The identity executing migration and polling must be able to:

  • create/drop logical replication slots
  • read logical slot changes
  • advance replication slots

On RDS, this is typically your master/app role setup with appropriate replication capabilities. Validate in non-prod first.

4) Deploy order

Recommended order:

  1. Apply parameter group changes in AWS and reboot if needed.
  2. Run DB migrations 0552, 0553, 0554 in order.
  3. Deploy app workers with Celery beat/periodic worker enabled.
  4. Validate task logs and replication lag metrics.

5) Post-deploy verification checklist

Run these checks against the DB:

-- Slot exists and is logical/wal2json
SELECT slot_name, plugin, slot_type, active, restart_lsn, confirmed_flush_lsn
FROM pg_replication_slots
WHERE slot_name = 'notify_dashboard_replication_slot';

-- Read a sample from the slot (non-destructive)
SELECT data
FROM pg_logical_slot_peek_changes(
  'notify_dashboard_replication_slot',
  NULL,
  10,
  'add-tables', 'notifications,notification_history',
  'include-lsn', 'true',
  'format-version', '1',
  'include-types', 'false',
  'include-typmod', 'false'
);

-- Target aggregate table is populated
SELECT service_id, template_id, notification_type, notification_status, count
FROM ft_service_stats
ORDER BY count DESC
LIMIT 20;

Also verify app logs show:

  • "Replication slot changes processed"
  • non-zero processed_changes during write traffic

6) Monitoring and alerting in AWS

Create alerts for replication and WAL growth risk, for example:

  • replication slot lag / retained WAL growth
  • TransactionLogsDiskUsage growth
  • DB free storage threshold

If the consumer stops, WAL can accumulate because logical slots retain WAL until acknowledged.

7) Operational runbook notes

  • If slot processing fails repeatedly, fix consumer before manually advancing or dropping slot.
  • Advancing/dropping a slot can skip data and cause permanent stats divergence.
  • Keep only one active consumer for this slot (the advisory lock protects at app level, but infra should still avoid duplicate schedulers where possible).

Current gaps to close

  • There are currently no dedicated tests in this branch for app/dao/replication_slot_changes_dao.py.
  • There are currently no dedicated tests in this branch for app/dao/fact_service_stats_dao.py behavior (upsert/decrement boundaries).

Recommended next step: add focused DAO tests for payload parsing, update transitions, and slot advancement semantics.

Known implementation constants

From app/dao/replication_slot_changes_dao.py:

  • Slot name: notify_dashboard_replication_slot
  • Included tables: notifications, notification_history
  • Batch size per poll: 10_000 changes
  • Advisory lock id: 4009881

@DilwoarH DilwoarH changed the title Add Service Stats (Wal2JSON implementation) [WIP] Add Service Stats (Wal2JSON implementation) Jun 4, 2026
@DilwoarH

DilwoarH commented Jun 4, 2026

Copy link
Copy Markdown
Contributor Author

Tests to be added

@DilwoarH DilwoarH force-pushed the add-ft-service-stats branch from 05ebd0b to 3066614 Compare June 14, 2026 21:35
@DilwoarH DilwoarH changed the base branch from main to add-ft-service-stats-db-updates June 14, 2026 21:41
@DilwoarH DilwoarH force-pushed the add-ft-service-stats-db-updates branch 5 times, most recently from b4b9050 to 71e9afa Compare June 15, 2026 00:24
@DilwoarH DilwoarH force-pushed the add-ft-service-stats branch from 3066614 to ca795e3 Compare June 15, 2026 00:47
@DilwoarH

Copy link
Copy Markdown
Contributor Author

Needs this merged in first: #4881

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant