[WIP] Add Service Stats (Wal2JSON implementation) #4873
Open
DilwoarH wants to merge 4 commits into
Tests to be added
Needs this merged in first: #4881
What
- `ft_service_stats` table to hold the count of service notifications by type
Why
This is part of the initial implementation to improve the performance of the GOV.UK Notify dashboards. The initial stage is to collect the stats in the background and monitor them to confirm they are being collected reliably. If all is well, the dashboard will be switched over to read from the `ft_service_stats` table.
Commentary on PR
Branch implementation summary
This branch introduces a WAL-driven service stats pipeline that writes to `ft_service_stats`.
Relevant changed files:
- app/dao/replication_slot_changes_dao.py
- app/dao/fact_service_stats_dao.py
- app/celery/scheduled_tasks.py
- app/config.py
- app/models.py
- migrations/versions/0552_create_replacation_slot.py
- migrations/versions/0553_notifications_id_status_idx.py
- migrations/versions/0554_create_service_stats.py
Current implementation in this repository
1) Migration 0552 creates a wal2json logical replication slot
Migration migrations/versions/0552_create_replacation_slot.py creates a logical replication slot named `notify_dashboard_replication_slot` with output plugin `wal2json`.
It runs:
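The statement itself is not reproduced in this description. As a minimal sketch, assuming the migration calls PostgreSQL's `pg_create_logical_replication_slot` on upgrade and `pg_drop_replication_slot` on downgrade, the SQL could be built as below. The helper names `create_slot_sql` and `drop_slot_sql` are illustrative and are not taken from the branch.

```python
# Hypothetical helpers: they only build SQL strings and do not touch a database.
SLOT_NAME = "notify_dashboard_replication_slot"
OUTPUT_PLUGIN = "wal2json"


def create_slot_sql(slot_name: str = SLOT_NAME, plugin: str = OUTPUT_PLUGIN) -> str:
    """SQL an upgrade step could execute to create the logical slot."""
    return f"SELECT pg_create_logical_replication_slot('{slot_name}', '{plugin}')"


def drop_slot_sql(slot_name: str = SLOT_NAME) -> str:
    """SQL a downgrade step could execute to remove the slot again."""
    return f"SELECT pg_drop_replication_slot('{slot_name}')"


if __name__ == "__main__":
    print(create_slot_sql())
    print(drop_slot_sql())
```

Dropping the slot on downgrade matters because an unused logical slot keeps retaining WAL.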
2) Migration 0553 sets replica identity to support old row keys on updates
Migration migrations/versions/0553_notifications_id_status_idx.py does three things on `notifications`:
- Sets `notification_status` to `NOT NULL`
- Creates the index `ix_notifications_id_notification_status`
- Sets `REPLICA IDENTITY USING INDEX ix_notifications_id_notification_status`
PostgreSQL only accepts a replica identity index that is unique and covers `NOT NULL` columns, which is why the first step is needed.
This is important because update messages need key material (`oldkeys`) for correct decrement behavior in aggregates.
3) Migration 0554 creates the target aggregate table
Migration migrations/versions/0554_create_service_stats.py creates `ft_service_stats` with:
- Dimension columns: `service_id`, `template_id`, `notification_type`, `notification_status`
- A `count` column
- A unique index (`uix_ft_service_stats_dimensions`)
4) Polling runs as a Celery periodic task every second
Celery beat schedules task `process-replication-slot-changes` every 1 second in app/config.py.
The task is wired in app/celery/scheduled_tasks.py and calls `dao_process_replication_slot_changes()`.
5) DAO reads logical changes via wal2json
Core logic is in app/dao/replication_slot_changes_dao.py.
Key behavior:
- Uses an advisory lock (`pg_try_advisory_lock`) to prevent overlapping workers.
- Reads changes with `pg_logical_slot_peek_changes` (the non-consuming variant, not `get`), with wal2json options:
  - `add-tables`: `notifications`, `notification_history`
  - `include-lsn`: `true`
  - `format-version`: `1`
  - `include-types`: `false`
  - `include-typmod`: `false`
- Handles changes for `notifications` and `notification_history`.
- Applies each change through `apply_service_stats_change(...)`.
- Advances the slot with `SELECT pg_replication_slot_advance(:slot_name, :lsn)`.
The "peek then advance" pattern means changes are only acknowledged after successful processing and commit.
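To make the decrement reasoning concrete, here is a minimal in-memory sketch of turning wal2json format-version 1 changes into count deltas. It assumes each peeked row is an `(lsn, data)` pair whose `data` is one JSON changeset, and that an update carries the previous `notification_status` in `oldkeys`. The functions `deltas_for_change`, `apply_deltas` and `process_batch` are hypothetical and do not reproduce the branch's `apply_service_stats_change`. Delete handling is left out because the description does not say how deletes are treated.

```python
import json

DIMENSIONS = ("service_id", "template_id", "notification_type")


def deltas_for_change(change: dict) -> list:
    """Return [(dimension_key, delta)] for one wal2json v1 change entry."""
    kind = change["kind"]
    if kind not in ("insert", "update"):
        return []  # deletes are intentionally ignored in this sketch
    new = dict(zip(change["columnnames"], change["columnvalues"]))

    def key(status):
        return tuple(new[d] for d in DIMENSIONS) + (status,)

    new_status = new["notification_status"]
    if kind == "insert":
        return [(key(new_status), 1)]
    # Updates: the old status is only available through the replica identity keys.
    old = dict(zip(change["oldkeys"]["keynames"], change["oldkeys"]["keyvalues"]))
    old_status = old["notification_status"]
    if old_status == new_status:
        return []
    return [(key(old_status), -1), (key(new_status), 1)]


def apply_deltas(stats: dict, deltas: list) -> None:
    """In-memory stand-in for the upsert; counts never drop below zero."""
    for dimension_key, delta in deltas:
        stats[dimension_key] = max(stats.get(dimension_key, 0) + delta, 0)


def process_batch(rows: list, stats: dict):
    """Apply every peeked row, then return the LSN the caller may advance to."""
    last_lsn = None
    for lsn, data in rows:
        for change in json.loads(data)["change"]:
            apply_deltas(stats, deltas_for_change(change))
        last_lsn = lsn
    return last_lsn
```

The LSN is returned only after the whole batch has been applied, so a caller that advances the slot afterwards follows the same peek-then-advance ordering. The `max(..., 0)` clamp mirrors the `greatest(count + change_count, 0)` expression used by the write path.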
6) Aggregate writes are handled via UPSERT/decrement DAO
Core write path is in app/dao/fact_service_stats_dao.py:
- Upserts with `INSERT ... ON CONFLICT ... DO UPDATE` against `uix_ft_service_stats_dimensions`
- Clamps the result at zero with `greatest(count + change_count, 0)`
It also adds a read helper (`dao_fetch_stats_for_service`) for one-service lookups.
End-to-end runtime flow
- Celery beat runs `process-replication-slot-changes` every second.
- The task reads wal2json changes for `notifications` and `notification_history`.
- Each change is applied to `ft_service_stats`.
AWS setup required
1) Enable logical decoding on the RDS instance
Logical decoding must be enabled in the DB parameter group used by the instance.
Set/verify:
- `rds.logical_replication = 1`
- `max_replication_slots` to at least 1 (higher if other slots are used)
- `max_wal_senders` high enough for your replication workloads
Apply the parameter group and reboot the instance if required.
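Once the parameter group is applied, the effective settings can be read back with `SHOW`. The sketch below assumes the values have already been fetched into a dictionary. The function `logical_decoding_problems` is a hypothetical helper that only illustrates what to look for. Setting `rds.logical_replication = 1` should surface as `wal_level = logical`.

```python
# Statements whose results would populate the settings dictionary.
SETTING_QUERIES = (
    "SHOW wal_level",
    "SHOW max_replication_slots",
    "SHOW max_wal_senders",
)


def logical_decoding_problems(settings: dict) -> list:
    """Return human-readable problems; an empty list means the basics look fine."""
    problems = []
    if settings.get("wal_level") != "logical":
        problems.append("wal_level is not 'logical'")
    if int(settings.get("max_replication_slots", 0)) < 1:
        problems.append("max_replication_slots must be at least 1")
    if int(settings.get("max_wal_senders", 0)) < 1:
        problems.append("max_wal_senders must be at least 1")
    return problems


if __name__ == "__main__":
    example = {"wal_level": "replica", "max_replication_slots": "10", "max_wal_senders": "10"}
    print(logical_decoding_problems(example))
```

The thresholds here are the bare minimum; an instance with other slots or replicas needs higher values.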
2) Ensure wal2json is available for your RDS engine version
The slot creation migration depends on output plugin `wal2json`.
Before deploy, verify on the target DB:
If this returns no row, confirm engine version support and extension availability for your RDS PostgreSQL version.
Also verify the slot plugin resolves correctly after migration:
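The query is not reproduced in this description. One way to run this check, assuming the standard `pg_replication_slots` view, is to fetch the slot's row and compare its `plugin` and `slot_type` columns. `SLOT_CHECK_SQL` and `slot_problems` below are hypothetical names, and the row is represented as a plain dictionary.

```python
# Query a caller could execute with :slot_name bound to the slot's name.
SLOT_CHECK_SQL = (
    "SELECT slot_name, plugin, slot_type, active "
    "FROM pg_replication_slots WHERE slot_name = :slot_name"
)


def slot_problems(row, expected_plugin: str = "wal2json") -> list:
    """Check one pg_replication_slots row (or None if the slot is missing)."""
    if row is None:
        return ["slot does not exist"]
    problems = []
    if row.get("plugin") != expected_plugin:
        problems.append(f"plugin is {row.get('plugin')!r}, expected {expected_plugin!r}")
    if row.get("slot_type") != "logical":
        problems.append(f"slot_type is {row.get('slot_type')!r}, expected 'logical'")
    return problems


if __name__ == "__main__":
    row = {"slot_name": "notify_dashboard_replication_slot", "plugin": "wal2json",
           "slot_type": "logical", "active": False}
    print(slot_problems(row))
```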
3) Permissions and execution role
The identity executing migration and polling must be able to:
On RDS, this is typically your master/app role setup with appropriate replication capabilities. Validate in non-prod first.
4) Deploy order
Recommended order:
- Run migrations `0552`, `0553`, `0554` in order.
5) Post-deploy verification checklist
Run these checks against the DB:
Also verify app logs show:
- `processed_changes` during write traffic
6) Monitoring and alerting in AWS
Create alerts for replication and WAL growth risk, for example:
- `TransactionLogsDiskUsage` growth
If the consumer stops, WAL can accumulate because logical slots retain WAL until acknowledged.
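As a rough illustration of what such an alert measures, the retained WAL for a slot is the distance between the server's current WAL position and the slot's `restart_lsn` in `pg_replication_slots`. PostgreSQL can compute this with `pg_wal_lsn_diff`. The sketch below does the same arithmetic in Python on textual LSNs. The function names and the 1 GiB threshold are assumptions chosen for the example, not values from the branch.

```python
ONE_GIB = 1024 ** 3


def lsn_to_int(lsn: str) -> int:
    """Convert a textual LSN such as '16/B374D848' to a byte position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) + int(low, 16)


def retained_wal_bytes(current_lsn: str, restart_lsn: str) -> int:
    """Bytes of WAL the slot is still holding back."""
    return lsn_to_int(current_lsn) - lsn_to_int(restart_lsn)


def should_alert(current_lsn: str, restart_lsn: str, threshold_bytes: int = ONE_GIB) -> bool:
    """True when the slot retains more WAL than the chosen threshold."""
    return retained_wal_bytes(current_lsn, restart_lsn) > threshold_bytes


if __name__ == "__main__":
    print(retained_wal_bytes("1/0", "0/FFFFFFF0"))
    print(should_alert("3/0", "1/0"))
```

A check like this complements the CloudWatch metric because it attributes WAL growth to a specific slot.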
7) Operational runbook notes
Current gaps to close
- Tests for app/dao/replication_slot_changes_dao.py.
- Tests for app/dao/fact_service_stats_dao.py behavior (upsert/decrement boundaries).
Recommended next step: add focused DAO tests for payload parsing, update transitions, and slot advancement semantics.
Known implementation constants
From app/dao/replication_slot_changes_dao.py:
- Slot name: `notify_dashboard_replication_slot`
- Tables: `notifications`, `notification_history`
- `10_000` changes
- `4009881`