Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
afb4f56
Add migration to create logical replication slot for notifications
DilwoarH May 14, 2026
81ea3e9
Add task to check replication slot changes and update configuration
DilwoarH May 15, 2026
0ed22f6
Add replication blueprint and endpoint to process slot changes
DilwoarH May 15, 2026
829d0fd
Add endpoint to check replication slot changes and corresponding tests
DilwoarH May 18, 2026
96ba58a
Refactor replication blueprint to allow unauthenticated access and en…
DilwoarH May 18, 2026
c364ffa
Implement replication change processing and corresponding tests
DilwoarH May 18, 2026
c19570e
Refactor replication slot change handling to utilize utility function…
DilwoarH May 18, 2026
84fa039
Add migration for notifications_id_status index creation
DilwoarH May 19, 2026
d7f653c
Fix typo in get_replication_changes function and update test to mock …
DilwoarH May 19, 2026
ed048b7
Add migration to create service_stats table for aggregated notificati…
DilwoarH May 19, 2026
3ccd4e6
Refactor type annotations in replication_changes_utils and update tes…
DilwoarH May 19, 2026
527dac6
Implement service stats tracking for notification changes and enhance…
DilwoarH May 19, 2026
1379cc8
Add migration to set REPLICA IDENTITY on notifications table using id…
DilwoarH May 19, 2026
58c8f2c
Enhance replication slot change processing by adding logic for handli…
DilwoarH May 19, 2026
4f96328
Add simulate notification load endpoint with validation and processin…
DilwoarH May 19, 2026
d5ce799
Enhance simulate notification load by adding randomization for termin…
DilwoarH May 20, 2026
e49d992
Add endpoint to fetch service stats and implement data retrieval logic
DilwoarH May 20, 2026
a8b5052
Increase MAX_NOTIFICATION_COUNT to allow processing of larger notific…
DilwoarH May 20, 2026
017cc92
Refactor notification statuses in simulation load to simplify termina…
DilwoarH May 22, 2026
af6607a
Refactor service stats handling and add performance testing
DilwoarH May 22, 2026
0e427b1
Refactor service stats update logic to consolidate increment and decr…
DilwoarH May 22, 2026
4e5f376
Remove obsolete test files for service stats and replication slot cha…
DilwoarH May 22, 2026
d656a38
Add docs
DilwoarH May 22, 2026
b6ceff0
Rename service_stats to ft_service_stats and update related references
DilwoarH May 22, 2026
a46f457
Enhance documentation for replication change handling functions
DilwoarH May 22, 2026
aeb7541
Refactor service_stats migration: rename table to ft_service_stats an…
DilwoarH May 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ def register_blueprint(application):
from app.provider_details.rest import (
provider_details as provider_details_blueprint,
)
from app.replication.rest import replication_blueprint
from app.service.callback_rest import service_callback_blueprint
from app.service.rest import service_blueprint
from app.service_invite.rest import (
Expand Down Expand Up @@ -374,6 +375,9 @@ def ensure_user_id_attribute_before_request():
one_click_unsubscribe_blueprint.before_request(requires_no_auth)
application.register_blueprint(one_click_unsubscribe_blueprint)

replication_blueprint.before_request(requires_no_auth)
application.register_blueprint(replication_blueprint)

if application.config["REGISTER_FUNCTIONAL_TESTING_BLUEPRINT"]:
test_blueprint.before_request(requires_functional_test_auth)
application.register_blueprint(test_blueprint)
Expand Down
272 changes: 272 additions & 0 deletions app/celery/process_replication_slot_changes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
from collections import Counter
from datetime import date, datetime
from uuid import UUID

from notifications_utils.timezones import convert_utc_to_bst
from sqlalchemy import text

from app import current_app, db, notify_celery
from app.cronitor import cronitor
from app.dao.fact_service_stats_dao import ServiceStatsDimensions, apply_service_stats_delta
from app.replication.replication_changes_utils import (
ParsedRow,
RowData,
get_notification_status,
get_replication_changes,
get_str_value,
)

REPLICATION_SLOT_NAME = "notify_dashboard_replication_slot"
REPLICATION_SLOT_UPTO_NCHANGES = 10_000
REPLICATION_ADVISORY_LOCK_ID = 4_009_881
NIL_UUID = UUID("00000000-0000-0000-0000-000000000000")

type FullDimensions = tuple[date, UUID, UUID, UUID, str, str, str]
type ServiceStatsDimensionsKey = tuple[UUID, UUID, str, str]


# 1. Task entrypoint for consuming replication slot changes and applying aggregate deltas
# into the service stats table. This task is lock-guarded so only one worker processes
# and advances the slot at a time.
@notify_celery.task(bind=True, name="check-replication-slot-changes")
@cronitor("check-replication-slot-changes")
def check_replication_slot_changes(self):
lock_acquired = False
try:
with current_app.app_context():
lock_acquired = _try_advisory_lock(REPLICATION_ADVISORY_LOCK_ID)
if not lock_acquired:
current_app.logger.info(
"Replication slot lock not acquired",
extra={"celery_task": "check-replication-slot-changes"},
)
return

changes = get_replication_changes(
peek=True,
slot_name=REPLICATION_SLOT_NAME,
upto_nchanges=REPLICATION_SLOT_UPTO_NCHANGES,
table_names=("public.notifications", "public.notification_history"),
include_lsn=True,
format_version=1,
include_types=False,
include_typmod=False,
)
fetched_changes = len(changes)

if fetched_changes == 0:
current_app.logger.info(
"No replication slot changes found",
extra={"celery_task": "check-replication-slot-changes", "changes_count": 0},
)
return

counter, processed_changes, ignored_changes, last_nextlsn = _build_counter_from_changes(changes)
service_stats_deltas = _roll_up_service_stats_deltas(counter)

for service_stats_key, delta in service_stats_deltas.items():
if delta == 0:
continue

service_id, template_id, notification_type, notification_status = service_stats_key
dimensions: ServiceStatsDimensions = {
"service_id": service_id,
"template_id": template_id,
"notification_type": notification_type,
"notification_status": notification_status,
}
apply_service_stats_delta(dimensions, delta)

db.session.commit()
if last_nextlsn:
_advance_replication_slot(last_nextlsn)

current_app.logger.info(
"Replication slot changes processed",
extra={
"celery_task": "check-replication-slot-changes",
"changes_count": fetched_changes,
"processed_changes": processed_changes,
"ignored_changes": ignored_changes,
"service_stats_delta_buckets": len(service_stats_deltas),
},
)
except Exception as exc:
db.session.rollback()
retry_count = self.request.retries
if retry_count < 3:
raise self.retry(exc=exc, countdown=2**retry_count) from exc

current_app.logger.error(
"Replication slot query failed after 3 retries",
exc_info=True,
extra={"celery_task": "check-replication-slot-changes"},
)
raise
finally:
if lock_acquired:
try:
_advisory_unlock(REPLICATION_ADVISORY_LOCK_ID)
except Exception:
current_app.logger.exception(
"Failed to release advisory lock",
extra={"celery_task": "check-replication-slot-changes"},
)


# 2. Attempt to acquire a Postgres advisory lock used to serialize replication processing.
# Returns True when this worker owns the lock and can proceed safely.
def _try_advisory_lock(lock_id: int) -> bool:
return bool(db.session.execute(text("SELECT pg_try_advisory_lock(:lock_id)"), {"lock_id": lock_id}).scalar())


# 3. Parse a list of WAL-derived row changes into fine-grained stats deltas keyed by full
# dimensions. Also tracks processing counters and the latest LSN that was seen.
def _build_counter_from_changes(changes: list[ParsedRow]) -> tuple[Counter[FullDimensions], int, int, str | None]:
counter: Counter[FullDimensions] = Counter()
processed_changes = 0
ignored_changes = 0
last_nextlsn: str | None = None

for change in changes:
table_name = change["table"]
change_type = change["type"]
if change["nextlsn"]:
last_nextlsn = change["nextlsn"]

if table_name not in {"notifications", "notification_history"}:
ignored_changes += 1
continue

if change_type == "insert":
dimensions = _build_dimensions(change, use_previous_row=False)
if not dimensions:
ignored_changes += 1
continue
counter[dimensions] += 1
processed_changes += 1
continue

if change_type == "update":
updated = False
new_dimensions = _build_dimensions(change, use_previous_row=False)
if new_dimensions:
counter[new_dimensions] += 1
updated = True

old_dimensions = _build_dimensions(change, use_previous_row=True)
if old_dimensions:
counter[old_dimensions] -= 1
updated = True

if not updated:
ignored_changes += 1
continue

processed_changes += 1
continue

if change_type == "delete":
if table_name == "notification_history":
ignored_changes += 1
continue

dimensions = _build_dimensions(change, use_previous_row=True)
if not dimensions:
ignored_changes += 1
continue

counter[dimensions] -= 1
processed_changes += 1
continue

ignored_changes += 1

return counter, processed_changes, ignored_changes, last_nextlsn


# 4. Build the complete per-notification dimensions tuple from either the current row or
# previous row image, with fallback across both when one side lacks a value.
def _build_dimensions(change: ParsedRow, *, use_previous_row: bool) -> FullDimensions | None:
if use_previous_row:
row_data = change["previous_row_data"]
fallback_data = change["current_row_data"]
else:
row_data = change["current_row_data"]
fallback_data = change["previous_row_data"]

service_id = _parse_uuid_value(row_data, "service_id") or _parse_uuid_value(fallback_data, "service_id")
template_id = _parse_uuid_value(row_data, "template_id") or _parse_uuid_value(fallback_data, "template_id")
notification_type = get_str_value(row_data, "notification_type") or get_str_value(
fallback_data, "notification_type"
)
job_id = _parse_uuid_value(row_data, "job_id") or _parse_uuid_value(fallback_data, "job_id") or NIL_UUID
key_type = get_str_value(row_data, "key_type") or get_str_value(fallback_data, "key_type")
notification_status = get_notification_status(row_data) or get_notification_status(fallback_data)
created_at = _parse_datetime_value(row_data, "created_at") or _parse_datetime_value(fallback_data, "created_at")

if not service_id or not template_id or not notification_type or not key_type or not notification_status or not created_at:
return None

return (
convert_utc_to_bst(created_at).date(),
template_id,
service_id,
job_id,
notification_type,
key_type,
notification_status,
)


# 5. Safely parse a UUID field from row data. Invalid or missing values are treated as None
# so malformed entries do not break replication processing.
def _parse_uuid_value(row_data: RowData, key: str) -> UUID | None:
raw_value = get_str_value(row_data, key)
if not raw_value:
return None

try:
return UUID(raw_value)
except ValueError:
return None


# 6. Parse ISO-like datetime values from replication payloads, normalizing trailing "Z"
# to a UTC offset format accepted by datetime.fromisoformat.
def _parse_datetime_value(row_data: RowData, key: str) -> datetime | None:
raw_value = get_str_value(row_data, key)
if not raw_value:
return None

normalized = raw_value.replace("Z", "+00:00")
try:
return datetime.fromisoformat(normalized)
except ValueError:
return None


# 7. Collapse full-dimension deltas down to service stats dimensions used by
# app.dao.fact_service_stats_dao.apply_service_stats_delta.
def _roll_up_service_stats_deltas(counter: Counter[FullDimensions]) -> Counter[ServiceStatsDimensionsKey]:
deltas: Counter[ServiceStatsDimensionsKey] = Counter()
for dimensions, delta in counter.items():
_, template_id, service_id, _, notification_type, _, notification_status = dimensions
deltas[(service_id, template_id, notification_type, notification_status)] += delta

return deltas


# 8. Advance the logical replication slot after successful commit so processed changes are
# acknowledged and are not replayed on the next task run.
def _advance_replication_slot(lsn: str) -> None:
db.session.execute(
text("SELECT pg_replication_slot_advance(:slot_name, :lsn)"),
{"slot_name": REPLICATION_SLOT_NAME, "lsn": lsn},
)


# 9. Release the advisory lock acquired at task start. This is called in finally to avoid
# lock leaks when errors or retries occur.
def _advisory_unlock(lock_id: int) -> None:
db.session.execute(text("SELECT pg_advisory_unlock(:lock_id)"), {"lock_id": lock_id})
5 changes: 5 additions & 0 deletions app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,11 @@ class Config:
"schedule": crontab(hour=9, minute=0, day_of_week="wed", day_of_month="2-8"),
"options": {"queue": QueueNames.PERIODIC},
},
"check-replication-slot-changes": {
"task": "check-replication-slot-changes",
"schedule": timedelta(seconds=1),
"options": {"queue": QueueNames.PERIODIC},
},
},
}

Expand Down
85 changes: 85 additions & 0 deletions app/dao/fact_service_stats_dao.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import uuid
from typing import TypedDict
from uuid import UUID

from sqlalchemy import func
from sqlalchemy.dialects.postgresql import insert

from app import db
from app.models import FactServiceStats


class ServiceStatsDimensions(TypedDict):
service_id: UUID
template_id: UUID
notification_type: str
notification_status: str


# 1. Public write API used by callers to apply a single aggregated delta into
# service statistics for a specific dimensions tuple.
def apply_service_stats_delta(dimensions: ServiceStatsDimensions, delta: int) -> None:
_update_service_stats_count(dimensions, delta)


# 2. Internal persistence routine that applies the delta with UPSERT behavior for
# positive changes and bounded decrement behavior for negative changes.
def _update_service_stats_count(dimensions: ServiceStatsDimensions, delta: int) -> None:
if delta == 0:
return

dimension_values = {
"service_id": dimensions["service_id"],
"template_id": dimensions["template_id"],
"notification_type": dimensions["notification_type"],
"notification_status": dimensions["notification_status"],
}
filters = (
FactServiceStats.service_id == dimension_values["service_id"],
FactServiceStats.template_id == dimension_values["template_id"],
FactServiceStats.notification_type == dimension_values["notification_type"],
FactServiceStats.notification_status == dimension_values["notification_status"],
)

if delta > 0:
stmt = insert(FactServiceStats).values(
id=uuid.uuid4(),
**dimension_values,
count=delta,
)
stmt = stmt.on_conflict_do_update(
constraint="uix_ft_service_stats_dimensions",
set_={
"count": FactServiceStats.count + delta,
},
)
db.session.execute(stmt)
else:
(
db.session.query(FactServiceStats)
.filter(*filters)
.update(
{
"count": func.greatest(FactServiceStats.count + delta, 0),
},
synchronize_session=False,
)
)


# 3. Public read API that returns all stats rows for a single service, with a
# defensive guard to avoid querying when no service id is provided.
def dao_fetch_stats_for_service(service_id: UUID) -> list[FactServiceStats]:
"""
Fetch service stats for a specific service.

Args:
service_id: UUID of the service to fetch stats for

Returns:
List of FactServiceStats records for the specified service
"""
if not service_id:
return []

return db.session.query(FactServiceStats).filter(FactServiceStats.service_id == service_id).all()
Loading
Loading