diff --git a/app/__init__.py b/app/__init__.py index 0c1a19a58e..4223fd33e0 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -248,6 +248,7 @@ def register_blueprint(application): from app.provider_details.rest import ( provider_details as provider_details_blueprint, ) + from app.replication.rest import replication_blueprint from app.service.callback_rest import service_callback_blueprint from app.service.rest import service_blueprint from app.service_invite.rest import ( @@ -374,6 +375,9 @@ def ensure_user_id_attribute_before_request(): one_click_unsubscribe_blueprint.before_request(requires_no_auth) application.register_blueprint(one_click_unsubscribe_blueprint) + replication_blueprint.before_request(requires_no_auth) + application.register_blueprint(replication_blueprint) + if application.config["REGISTER_FUNCTIONAL_TESTING_BLUEPRINT"]: test_blueprint.before_request(requires_functional_test_auth) application.register_blueprint(test_blueprint) diff --git a/app/celery/process_replication_slot_changes.py b/app/celery/process_replication_slot_changes.py new file mode 100644 index 0000000000..357c86b745 --- /dev/null +++ b/app/celery/process_replication_slot_changes.py @@ -0,0 +1,272 @@ +from collections import Counter +from datetime import date, datetime +from uuid import UUID + +from notifications_utils.timezones import convert_utc_to_bst +from sqlalchemy import text + +from app import current_app, db, notify_celery +from app.cronitor import cronitor +from app.dao.fact_service_stats_dao import ServiceStatsDimensions, apply_service_stats_delta +from app.replication.replication_changes_utils import ( + ParsedRow, + RowData, + get_notification_status, + get_replication_changes, + get_str_value, +) + +REPLICATION_SLOT_NAME = "notify_dashboard_replication_slot" +REPLICATION_SLOT_UPTO_NCHANGES = 10_000 +REPLICATION_ADVISORY_LOCK_ID = 4_009_881 +NIL_UUID = UUID("00000000-0000-0000-0000-000000000000") + +type FullDimensions = tuple[date, UUID, UUID, UUID, str, str, str] +type ServiceStatsDimensionsKey = tuple[UUID, UUID, str, str] + + +# 1. Task entrypoint for consuming replication slot changes and applying aggregate deltas +# into the service stats table. This task is lock-guarded so only one worker processes +# and advances the slot at a time. +@notify_celery.task(bind=True, name="check-replication-slot-changes") +@cronitor("check-replication-slot-changes") +def check_replication_slot_changes(self): + lock_acquired = False + try: + with current_app.app_context(): + lock_acquired = _try_advisory_lock(REPLICATION_ADVISORY_LOCK_ID) + if not lock_acquired: + current_app.logger.info( + "Replication slot lock not acquired", + extra={"celery_task": "check-replication-slot-changes"}, + ) + return + + changes = get_replication_changes( + peek=True, + slot_name=REPLICATION_SLOT_NAME, + upto_nchanges=REPLICATION_SLOT_UPTO_NCHANGES, + table_names=("public.notifications", "public.notification_history"), + include_lsn=True, + format_version=1, + include_types=False, + include_typmod=False, + ) + fetched_changes = len(changes) + + if fetched_changes == 0: + current_app.logger.info( + "No replication slot changes found", + extra={"celery_task": "check-replication-slot-changes", "changes_count": 0}, + ) + return + + counter, processed_changes, ignored_changes, last_nextlsn = _build_counter_from_changes(changes) + service_stats_deltas = _roll_up_service_stats_deltas(counter) + + for service_stats_key, delta in service_stats_deltas.items(): + if delta == 0: + continue + + service_id, template_id, notification_type, notification_status = service_stats_key + dimensions: ServiceStatsDimensions = { + "service_id": service_id, + "template_id": template_id, + "notification_type": notification_type, + "notification_status": notification_status, + } + apply_service_stats_delta(dimensions, delta) + + db.session.commit() + if last_nextlsn: + _advance_replication_slot(last_nextlsn) + + current_app.logger.info( + "Replication slot changes processed", + extra={ + "celery_task": "check-replication-slot-changes", + "changes_count": fetched_changes, + "processed_changes": processed_changes, + "ignored_changes": ignored_changes, + "service_stats_delta_buckets": len(service_stats_deltas), + }, + ) + except Exception as exc: + db.session.rollback() + retry_count = self.request.retries + if retry_count < 3: + raise self.retry(exc=exc, countdown=2**retry_count) from exc + + current_app.logger.error( + "Replication slot query failed after 3 retries", + exc_info=True, + extra={"celery_task": "check-replication-slot-changes"}, + ) + raise + finally: + if lock_acquired: + try: + _advisory_unlock(REPLICATION_ADVISORY_LOCK_ID) + except Exception: + current_app.logger.exception( + "Failed to release advisory lock", + extra={"celery_task": "check-replication-slot-changes"}, + ) + + +# 2. Attempt to acquire a Postgres advisory lock used to serialize replication processing. +# Returns True when this worker owns the lock and can proceed safely. +def _try_advisory_lock(lock_id: int) -> bool: + return bool(db.session.execute(text("SELECT pg_try_advisory_lock(:lock_id)"), {"lock_id": lock_id}).scalar()) + + +# 3. Parse a list of WAL-derived row changes into fine-grained stats deltas keyed by full +# dimensions. Also tracks processing counters and the latest LSN that was seen. +def _build_counter_from_changes(changes: list[ParsedRow]) -> tuple[Counter[FullDimensions], int, int, str | None]: + counter: Counter[FullDimensions] = Counter() + processed_changes = 0 + ignored_changes = 0 + last_nextlsn: str | None = None + + for change in changes: + table_name = change["table"] + change_type = change["type"] + if change["nextlsn"]: + last_nextlsn = change["nextlsn"] + + if table_name not in {"notifications", "notification_history"}: + ignored_changes += 1 + continue + + if change_type == "insert": + dimensions = _build_dimensions(change, use_previous_row=False) + if not dimensions: + ignored_changes += 1 + continue + counter[dimensions] += 1 + processed_changes += 1 + continue + + if change_type == "update": + updated = False + new_dimensions = _build_dimensions(change, use_previous_row=False) + if new_dimensions: + counter[new_dimensions] += 1 + updated = True + + old_dimensions = _build_dimensions(change, use_previous_row=True) + if old_dimensions: + counter[old_dimensions] -= 1 + updated = True + + if not updated: + ignored_changes += 1 + continue + + processed_changes += 1 + continue + + if change_type == "delete": + if table_name == "notification_history": + ignored_changes += 1 + continue + + dimensions = _build_dimensions(change, use_previous_row=True) + if not dimensions: + ignored_changes += 1 + continue + + counter[dimensions] -= 1 + processed_changes += 1 + continue + + ignored_changes += 1 + + return counter, processed_changes, ignored_changes, last_nextlsn + + +# 4. Build the complete per-notification dimensions tuple from either the current row or +# previous row image, with fallback across both when one side lacks a value. +def _build_dimensions(change: ParsedRow, *, use_previous_row: bool) -> FullDimensions | None: + if use_previous_row: + row_data = change["previous_row_data"] + fallback_data = change["current_row_data"] + else: + row_data = change["current_row_data"] + fallback_data = change["previous_row_data"] + + service_id = _parse_uuid_value(row_data, "service_id") or _parse_uuid_value(fallback_data, "service_id") + template_id = _parse_uuid_value(row_data, "template_id") or _parse_uuid_value(fallback_data, "template_id") + notification_type = get_str_value(row_data, "notification_type") or get_str_value( + fallback_data, "notification_type" + ) + job_id = _parse_uuid_value(row_data, "job_id") or _parse_uuid_value(fallback_data, "job_id") or NIL_UUID + key_type = get_str_value(row_data, "key_type") or get_str_value(fallback_data, "key_type") + notification_status = get_notification_status(row_data) or get_notification_status(fallback_data) + created_at = _parse_datetime_value(row_data, "created_at") or _parse_datetime_value(fallback_data, "created_at") + + if not service_id or not template_id or not notification_type or not key_type or not notification_status or not created_at: + return None + + return ( + convert_utc_to_bst(created_at).date(), + template_id, + service_id, + job_id, + notification_type, + key_type, + notification_status, + ) + + +# 5. Safely parse a UUID field from row data. Invalid or missing values are treated as None +# so malformed entries do not break replication processing. +def _parse_uuid_value(row_data: RowData, key: str) -> UUID | None: + raw_value = get_str_value(row_data, key) + if not raw_value: + return None + + try: + return UUID(raw_value) + except ValueError: + return None + + +# 6. Parse ISO-like datetime values from replication payloads, normalizing trailing "Z" +# to a UTC offset format accepted by datetime.fromisoformat. +def _parse_datetime_value(row_data: RowData, key: str) -> datetime | None: + raw_value = get_str_value(row_data, key) + if not raw_value: + return None + + normalized = raw_value.replace("Z", "+00:00") + try: + return datetime.fromisoformat(normalized) + except ValueError: + return None + + +# 7. Collapse full-dimension deltas down to service stats dimensions used by +# app.dao.fact_service_stats_dao.apply_service_stats_delta. +def _roll_up_service_stats_deltas(counter: Counter[FullDimensions]) -> Counter[ServiceStatsDimensionsKey]: + deltas: Counter[ServiceStatsDimensionsKey] = Counter() + for dimensions, delta in counter.items(): + _, template_id, service_id, _, notification_type, _, notification_status = dimensions + deltas[(service_id, template_id, notification_type, notification_status)] += delta + + return deltas + + +# 8. Advance the logical replication slot after successful commit so processed changes are +# acknowledged and are not replayed on the next task run. +def _advance_replication_slot(lsn: str) -> None: + db.session.execute( + text("SELECT pg_replication_slot_advance(:slot_name, :lsn)"), + {"slot_name": REPLICATION_SLOT_NAME, "lsn": lsn}, + ) + + +# 9. Release the advisory lock acquired at task start. This is called in finally to avoid +# lock leaks when errors or retries occur. +def _advisory_unlock(lock_id: int) -> None: + db.session.execute(text("SELECT pg_advisory_unlock(:lock_id)"), {"lock_id": lock_id}) diff --git a/app/config.py b/app/config.py index 6c84d483f0..7857630697 100644 --- a/app/config.py +++ b/app/config.py @@ -492,6 +492,11 @@ class Config: "schedule": crontab(hour=9, minute=0, day_of_week="wed", day_of_month="2-8"), "options": {"queue": QueueNames.PERIODIC}, }, + "check-replication-slot-changes": { + "task": "check-replication-slot-changes", + "schedule": timedelta(seconds=1), + "options": {"queue": QueueNames.PERIODIC}, + }, }, } diff --git a/app/dao/fact_service_stats_dao.py b/app/dao/fact_service_stats_dao.py new file mode 100644 index 0000000000..1fe3086996 --- /dev/null +++ b/app/dao/fact_service_stats_dao.py @@ -0,0 +1,85 @@ +import uuid +from typing import TypedDict +from uuid import UUID + +from sqlalchemy import func +from sqlalchemy.dialects.postgresql import insert + +from app import db +from app.models import FactServiceStats + + +class ServiceStatsDimensions(TypedDict): + service_id: UUID + template_id: UUID + notification_type: str + notification_status: str + + +# 1. Public write API used by callers to apply a single aggregated delta into +# service statistics for a specific dimensions tuple. +def apply_service_stats_delta(dimensions: ServiceStatsDimensions, delta: int) -> None: + _update_service_stats_count(dimensions, delta) + + +# 2. Internal persistence routine that applies the delta with UPSERT behavior for +# positive changes and bounded decrement behavior for negative changes. +def _update_service_stats_count(dimensions: ServiceStatsDimensions, delta: int) -> None: + if delta == 0: + return + + dimension_values = { + "service_id": dimensions["service_id"], + "template_id": dimensions["template_id"], + "notification_type": dimensions["notification_type"], + "notification_status": dimensions["notification_status"], + } + filters = ( + FactServiceStats.service_id == dimension_values["service_id"], + FactServiceStats.template_id == dimension_values["template_id"], + FactServiceStats.notification_type == dimension_values["notification_type"], + FactServiceStats.notification_status == dimension_values["notification_status"], + ) + + if delta > 0: + stmt = insert(FactServiceStats).values( + id=uuid.uuid4(), + **dimension_values, + count=delta, + ) + stmt = stmt.on_conflict_do_update( + constraint="uix_ft_service_stats_dimensions", + set_={ + "count": FactServiceStats.count + delta, + }, + ) + db.session.execute(stmt) + else: + ( + db.session.query(FactServiceStats) + .filter(*filters) + .update( + { + "count": func.greatest(FactServiceStats.count + delta, 0), + }, + synchronize_session=False, + ) + ) + + +# 3. Public read API that returns all stats rows for a single service, with a +# defensive guard to avoid querying when no service id is provided. +def dao_fetch_stats_for_service(service_id: UUID) -> list[FactServiceStats]: + """ + Fetch service stats for a specific service. + + Args: + service_id: UUID of the service to fetch stats for + + Returns: + List of FactServiceStats records for the specified service + """ + if not service_id: + return [] + + return db.session.query(FactServiceStats).filter(FactServiceStats.service_id == service_id).all() diff --git a/app/dao/service_stats_dao.py b/app/dao/service_stats_dao.py new file mode 100644 index 0000000000..15f5601ea6 --- /dev/null +++ b/app/dao/service_stats_dao.py @@ -0,0 +1,85 @@ +import uuid +from typing import TypedDict +from uuid import UUID + +from sqlalchemy import func +from sqlalchemy.dialects.postgresql import insert + +from app import db +from app.models import ServiceStats + + +class ServiceStatsDimensions(TypedDict): + service_id: UUID + template_id: UUID + notification_type: str + notification_status: str + + +# 1. Public write API used by callers to apply a single aggregated delta into +# service statistics for a specific dimensions tuple. +def apply_service_stats_delta(dimensions: ServiceStatsDimensions, delta: int) -> None: + _update_service_stats_count(dimensions, delta) + + +# 2. Internal persistence routine that applies the delta with UPSERT behavior for +# positive changes and bounded decrement behavior for negative changes. +def _update_service_stats_count(dimensions: ServiceStatsDimensions, delta: int) -> None: + if delta == 0: + return + + dimension_values = { + "service_id": dimensions["service_id"], + "template_id": dimensions["template_id"], + "notification_type": dimensions["notification_type"], + "notification_status": dimensions["notification_status"], + } + filters = ( + ServiceStats.service_id == dimension_values["service_id"], + ServiceStats.template_id == dimension_values["template_id"], + ServiceStats.notification_type == dimension_values["notification_type"], + ServiceStats.notification_status == dimension_values["notification_status"], + ) + + if delta > 0: + stmt = insert(ServiceStats).values( + id=uuid.uuid4(), + **dimension_values, + count=delta, + ) + stmt = stmt.on_conflict_do_update( + constraint="uix_ft_service_stats_dimensions", + set_={ + "count": ServiceStats.count + delta, + }, + ) + db.session.execute(stmt) + else: + ( + db.session.query(ServiceStats) + .filter(*filters) + .update( + { + "count": func.greatest(ServiceStats.count + delta, 0), + }, + synchronize_session=False, + ) + ) + + +# 3. Public read API that returns all stats rows for a single service, with a +# defensive guard to avoid querying when no service id is provided. +def dao_fetch_stats_for_service(service_id: UUID) -> list[ServiceStats]: + """ + Fetch service stats for a specific service. + + Args: + service_id: UUID of the service to fetch stats for + + Returns: + List of ServiceStats records for the specified service + """ + if not service_id: + return [] + + return db.session.query(ServiceStats).filter(ServiceStats.service_id == service_id).all() \ No newline at end of file diff --git a/app/models.py b/app/models.py index e4f670d236..e1bd965a33 100644 --- a/app/models.py +++ b/app/models.py @@ -2414,6 +2414,44 @@ class FactNotificationStatus(db.Model): ) +class FactServiceStats(db.Model): + __tablename__ = "ft_service_stats" + + id = db.Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + service_id = db.Column(UUID(as_uuid=True), db.ForeignKey("services.id"), nullable=False) + template_id = db.Column(UUID(as_uuid=True), db.ForeignKey("templates.id"), nullable=False) + notification_type = db.Column(notification_types, nullable=False) + notification_status = db.Column(db.Text, db.ForeignKey("notification_status_types.name"), nullable=False) + count = db.Column(db.Integer(), nullable=False, default=0, server_default=db.text("0")) + + __table_args__ = ( + UniqueConstraint( + "service_id", + "template_id", + "notification_type", + "notification_status", + name="uix_ft_service_stats_dimensions", + ), + Index( + "ix_ft_svc_stats_svc_ntype_nstatus", + "service_id", + "notification_type", + "notification_status", + ), + Index( + "ix_ft_svc_stats_tmpl_ntype_nstatus", + "template_id", + "notification_type", + "notification_status", + ), + Index( + "ix_ft_service_stats_service_id_template_id", + "service_id", + "template_id", + ), + ) + + class FactProcessingTime(db.Model): __tablename__ = "ft_processing_time" diff --git a/app/replication/performance_test.py b/app/replication/performance_test.py new file mode 100644 index 0000000000..fde3bcad8d --- /dev/null +++ b/app/replication/performance_test.py @@ -0,0 +1,199 @@ +from datetime import datetime +import random +from uuid import UUID, uuid4 + +from flask import jsonify, request + +from app import db +from app.constants import ( + EMAIL_TYPE, + KEY_TYPE_NORMAL, + LETTER_TYPE, + NOTIFICATION_DELIVERED, + NOTIFICATION_SENDING, + NOTIFICATION_SENT, + SMS_TYPE, +) +from app.models import Notification, Template + +MAX_NOTIFICATION_COUNT = 50000 +MAX_UPDATES_PER_NOTIFICATION = 5 +DEFAULT_NOTIFICATION_COUNT = 1000 +DEFAULT_UPDATES_PER_NOTIFICATION = 2 +DEFAULT_RETURNED_IDS = 20 + + +def simulate_notification_load(): + payload = request.get_json(silent=True) or {} + + notification_count, error = _parse_positive_int( + payload.get("notification_count", DEFAULT_NOTIFICATION_COUNT), + "notification_count", + max_value=MAX_NOTIFICATION_COUNT, + ) + if error: + return jsonify({"message": error}), 400 + + updates_per_notification, error = _parse_positive_int( + payload.get("updates_per_notification", DEFAULT_UPDATES_PER_NOTIFICATION), + "updates_per_notification", + max_value=MAX_UPDATES_PER_NOTIFICATION, + ) + if error: + return jsonify({"message": error}), 400 + + template, error = _resolve_template(payload) + if error: + return jsonify({"message": error}), 400 + + random_seed = payload.get("random_seed") + randomizer = random.Random(random_seed) if random_seed is not None else random.Random() + + notification_ids, status_breakdown = _insert_and_update_notifications( + template=template, + notification_count=notification_count, + updates_per_notification=updates_per_notification, + randomizer=randomizer, + ) + returned_id_count = min(len(notification_ids), DEFAULT_RETURNED_IDS) + total_updates = notification_count * updates_per_notification + + return ( + jsonify( + { + "message": "notification send/update load inserted into notifications table", + "notification_count": notification_count, + "updates_per_notification": updates_per_notification, + "inserted_count": notification_count, + "updated_count": total_updates, + "service_id": str(template.service_id), + "template_id": str(template.id), + "template_version": template.version, + "status_breakdown": status_breakdown, + "inserted_notification_ids": notification_ids[:returned_id_count], + } + ), + 200, + ) + + +def _parse_positive_int(value, field_name, max_value): + try: + parsed = int(value) + except (TypeError, ValueError): + return None, f"{field_name} must be an integer" + + if parsed < 1: + return None, f"{field_name} must be greater than 0" + + if parsed > max_value: + return None, f"{field_name} must be less than or equal to {max_value}" + + return parsed, None + + +def _resolve_template(payload): + template_id = payload.get("template_id") + service_id = payload.get("service_id") + + query = Template.query + if template_id: + try: + query = query.filter(Template.id == UUID(str(template_id))) + except (ValueError, TypeError): + return None, "template_id must be a valid UUID" + + if service_id: + try: + query = query.filter(Template.service_id == UUID(str(service_id))) + except (ValueError, TypeError): + return None, "service_id must be a valid UUID" + + template = query.order_by(Template.created_at.asc()).first() + if not template: + return None, "No template found for the provided service/template filters" + + return template, None + + +def _insert_and_update_notifications(template, notification_count, updates_per_notification, randomizer): + inserted_notifications = [] + status_breakdown = {} + now = datetime.utcnow() + + for index in range(notification_count): + terminal_status = _pick_terminal_status(template.template_type, randomizer) + status_path = _build_status_path(terminal_status, updates_per_notification) + + notification = Notification( + id=uuid4(), + to=_build_recipient(template.template_type, index), + service_id=template.service_id, + template_id=template.id, + template_version=template.version, + key_type=KEY_TYPE_NORMAL, + notification_type=template.template_type, + created_at=now, + status=NOTIFICATION_SENDING, + billable_units=1, + postage="second" if template.template_type == LETTER_TYPE else None, + rate_multiplier=1 if template.template_type == SMS_TYPE else None, + ) + db.session.add(notification) + inserted_notifications.append((notification, status_path)) + + db.session.flush() + + for notification, status_path in inserted_notifications: + for status in status_path: + notification.status = status + notification.updated_at = datetime.utcnow() + if status in {NOTIFICATION_SENT, NOTIFICATION_DELIVERED}: + notification.sent_at = datetime.utcnow() + + status_breakdown[notification.status] = status_breakdown.get(notification.status, 0) + 1 + + db.session.commit() + + return [str(notification.id) for notification, _ in inserted_notifications], status_breakdown + + +def _pick_terminal_status(template_type, randomizer): + if template_type == LETTER_TYPE: + statuses = [NOTIFICATION_DELIVERED, NOTIFICATION_SENT] + weights = [85, 15] + else: + statuses = [NOTIFICATION_DELIVERED, NOTIFICATION_SENT] + weights = [80, 20] + + return randomizer.choices(statuses, weights=weights, k=1)[0] + + +def _build_status_path(terminal_status, updates_per_notification): + if updates_per_notification <= 0: + return [] + + if updates_per_notification == 1: + return [terminal_status] + + if terminal_status == NOTIFICATION_DELIVERED: + base_path = [NOTIFICATION_SENT, NOTIFICATION_DELIVERED] + elif terminal_status == NOTIFICATION_SENT: + base_path = [NOTIFICATION_SENT] + else: + base_path = [NOTIFICATION_SENT, terminal_status] + + if len(base_path) >= updates_per_notification: + return base_path[:updates_per_notification] + + return base_path + [base_path[-1]] * (updates_per_notification - len(base_path)) + + +def _build_recipient(template_type, index): + if template_type == SMS_TYPE: + return f"+447700900{index % 1000:03d}" + if template_type == EMAIL_TYPE: + return f"load-test-{index}@example.com" + if template_type == LETTER_TYPE: + return "Load Test User\n1 Test Street\nTest City\nSW1A 1AA" + return f"load-test-{index}@example.com" \ No newline at end of file diff --git a/app/replication/replication_changes_utils.py b/app/replication/replication_changes_utils.py new file mode 100644 index 0000000000..d5bd5107af --- /dev/null +++ b/app/replication/replication_changes_utils.py @@ -0,0 +1,221 @@ +import json +from typing import TypedDict, cast + +from sqlalchemy import text + +from app import db + +type JsonPrimitive = str | int | float | bool | None +type JsonValue = JsonPrimitive | dict[str, "JsonValue"] | list["JsonValue"] +type RowData = dict[str, JsonValue] + + +class ReplicationChangeRow(TypedDict): + lsn: str + data: str + + +class OldKeys(TypedDict, total=False): + keynames: list[str] + keyvalues: list[JsonValue] + + +class ReplicationJsonRow(TypedDict, total=False): + kind: str + table: str + columnnames: list[str] + columnvalues: list[JsonValue] + oldkeys: OldKeys + + +class ChangePayload(TypedDict, total=False): + nextlsn: str + change: list[ReplicationJsonRow] + + +class ParsedRow(TypedDict): + type: str + table: str + nextlsn: str | None + current_row_data: RowData + previous_row_data: RowData + + +DEFAULT_CHANGE_ROWS: list[ReplicationJsonRow] = [{}] + + +def _quote_sql_literal(value: str) -> str: + """ + Escape single quotes for safe embedding into SQL string literals. + + This helper performs the standard PostgreSQL escaping rule for single + quotes by doubling them (e.g. `O'Reilly` -> `O''Reilly`). It is used when + constructing the replication function call via a SQL text string. + """ + return value.replace("'", "''") + + +def get_replication_changes( + *, + peek: bool = True, + slot_name: str = "notify_dashboard_replication_slot", + upto_nchanges: int | None = None, + table_names: tuple[str, ...] = ("public.notifications",), + include_lsn: bool | None = None, + format_version: int | None = None, + include_types: bool | None = None, + include_typmod: bool | None = None, +) -> list[ParsedRow]: + """ + Fetch logical replication changes and normalize them into parsed rows. + + The query calls either: + - `pg_logical_slot_peek_changes` (non-destructive read), or + - `pg_logical_slot_get_changes` (consumes changes from the slot) + + Option flags are passed using `wal2json`-compatible key/value arguments. + Each returned change row is expected to contain a JSON payload in `data` + and an optional `lsn` fallback. + + Args: + peek: If True, read changes without advancing the replication slot. + If False, consume and advance the slot. + slot_name: Logical replication slot name to read from. + upto_nchanges: Optional cap for number of changes returned by PostgreSQL. + `None` maps to SQL `NULL` (database default behavior). + table_names: Optional tuple of fully-qualified tables to include. + Passed through `add-tables` option. + include_lsn: Whether to include LSN in output payload (wal2json option). + format_version: wal2json format version. + include_types: Whether to include PostgreSQL type names in output. + include_typmod: Whether to include typmod metadata in output. + + Returns: + Flat list of parsed rows produced from all JSON `change` entries. + """ + options: list[str] = ["pretty-print", "on"] + + if table_names: + options.extend(["add-tables", ",".join(table_names)]) + if include_lsn is not None: + options.extend(["include-lsn", "on" if include_lsn else "off"]) + if format_version is not None: + options.extend(["format-version", str(format_version)]) + if include_types is not None: + options.extend(["include-types", "on" if include_types else "off"]) + if include_typmod is not None: + options.extend(["include-typmod", "on" if include_typmod else "off"]) + + options_sql = "" + if options: + # Build SQL literal list: 'key', 'value', 'key', 'value', ... + options_sql = ",\n " + ",\n ".join( + f"'{_quote_sql_literal(option)}'" for option in options + ) + + # Choose peek/get behavior based on whether changes should be consumed. + function_name = "pg_logical_slot_peek_changes" if peek else "pg_logical_slot_get_changes" + query = f""" + SELECT * FROM {function_name}( + '{_quote_sql_literal(slot_name)}', + NULL, + {upto_nchanges if upto_nchanges is not None else 'NULL'}{options_sql} + ); + """ + + result = db.session.execute(text(query)) + changes = [dict(change) for change in result.mappings().all()] + + # Flatten per-change payload arrays into one list of normalized rows. + parsed_data = [row for change in changes for row in (parse_change_data(cast(ReplicationChangeRow, change)) or [])] + + return parsed_data + + +def parse_change_data(change: ReplicationChangeRow) -> list[ParsedRow] | None: + """ + Parse a single replication result row into normalized parsed rows. + + The database returns a row containing: + - `data`: a JSON document produced by wal2json + - `lsn`: replication position (used as fallback when `nextlsn` is absent) + + If the payload has no `change` entries, `None` is returned so callers can + skip this item naturally in flattening logic. + + Args: + change: Raw row from PostgreSQL logical decoding query. + + Returns: + A list of parsed row dictionaries, or `None` when no change entries + are present. + """ + payload = cast(ChangePayload, json.loads(change["data"])) + nextlsn = payload.get("nextlsn") or change.get("lsn") + raw_changes = payload.get("change", DEFAULT_CHANGE_ROWS) + + if len(raw_changes) == 0: + return None + + return [parse_row_data(row, nextlsn=nextlsn) for row in raw_changes] + + +def parse_row_data(row: ReplicationJsonRow, nextlsn: str | None = None) -> ParsedRow: + """ + Convert one wal2json change item into the internal `ParsedRow` shape. + + For UPDATE/DELETE events, wal2json may include `oldkeys`, which represent + key columns from the previous row state. Those are mapped into + `previous_row_data`. + + Args: + row: One item from wal2json `change` list. + nextlsn: Replication position associated with this change item. + + Returns: + ParsedRow containing event type, table name, LSN, current row values, + and previous key values. + """ + column_names = row.get("columnnames", []) + column_values = row.get("columnvalues", []) + + old_column_names = row.get("oldkeys", {}).get("keynames", []) + old_column_values = row.get("oldkeys", {}).get("keyvalues", []) + + return { + "type": row.get("kind", ""), + "table": row.get("table", ""), + "nextlsn": nextlsn, + "current_row_data": dict(zip(column_names, column_values, strict=False)), + "previous_row_data": dict(zip(old_column_names, old_column_values, strict=False)), + } + + +def get_str_value(row_data: RowData, key: str) -> str | None: + """ + Safely read a string value from a row data mapping. + + Args: + row_data: Parsed row dictionary containing JSON-compatible values. + key: Field name to read. + + Returns: + The string value for `key` when present and of type `str`; otherwise + `None`. + """ + value = row_data.get(key) + if isinstance(value, str): + return value + return None + + +def get_notification_status(row_data: RowData) -> str | None: + """ + Extract notification status from known status fields. + + This helper supports both legacy/current key names and returns the first + available string value in this order: + 1. `notification_status` + 2. `status` + """ + return get_str_value(row_data, "notification_status") or get_str_value(row_data, "status") diff --git a/app/replication/rest.py b/app/replication/rest.py new file mode 100644 index 0000000000..ae62c2293d --- /dev/null +++ b/app/replication/rest.py @@ -0,0 +1,71 @@ +from flask import Blueprint, jsonify + +from app.celery.process_replication_slot_changes import check_replication_slot_changes +from app.dao.fact_service_stats_dao import dao_fetch_stats_for_service +from app.replication.performance_test import simulate_notification_load as performance_test_simulate_notification_load +from app.replication.replication_changes_utils import get_replication_changes +from app.v2.errors import register_errors + +replication_blueprint = Blueprint("replication", __name__, url_prefix="/replication") +register_errors(replication_blueprint) + + +@replication_blueprint.route("/process-slot-changes", methods=["POST"]) +# Trigger immediate processing of pending replication slot changes via the Celery task. +# This endpoint is mainly operational, allowing manual execution for verification +# or recovery scenarios when scheduled processing needs to be nudged. +def trigger_process_replication_slot_changes(): + check_replication_slot_changes() + return jsonify({"message": "check-replication-slot-changes task executed"}), 201 + + +@replication_blueprint.route("/check-slot-changes", methods=["GET"]) +# Inspect replication slot changes in peek mode so callers can review pending records +# without consuming or advancing the slot position. Useful for debugging parser logic +# and validating what data would be processed by the worker task. +def trigger_check_replication_slot_changes(): + changes = get_replication_changes(peek=True) + + return jsonify({"changes": changes}), 200 + + +@replication_blueprint.route("/simulate-notification-load", methods=["POST"]) +# Run a controlled notification-load simulation used for replication performance testing. +# The underlying helper encapsulates test data generation and returns a response payload +# describing the simulated operation outcome. +def simulate_notification_load(): + return performance_test_simulate_notification_load() + + +@replication_blueprint.route("/stats/", methods=["GET"]) +# Return aggregated service statistics grouped by template, notification type, and +# notification status for the requested service id, formatted for API consumers. +def get_service_stats(service_id): + """ + Get service stats for a specific service. + + Path parameter: + - service_id: UUID of the service + + Returns: + - List of stats entries with: + - template_id: UUID of the template + - notification_type: Type of notification (email, sms, letter) + - notification_status: Status of the notification + - count: Number of notifications with this status + """ + stats = dao_fetch_stats_for_service(service_id) + + result = [] + for stat in stats: + result.append({ + "template_id": str(stat.template_id), + "notification_type": stat.notification_type, + "notification_status": stat.notification_status, + "count": stat.count, + }) + + return jsonify({"stats": result}), 200 + + + diff --git a/migrations/.current-alembic-head b/migrations/.current-alembic-head index e306e24ed0..74c92585a5 100644 --- a/migrations/.current-alembic-head +++ b/migrations/.current-alembic-head @@ -1 +1 @@ -0551_drop_ntfcns_failed_idx +0554_create_service_stats diff --git a/migrations/versions/0552_create_replacation_slot.py b/migrations/versions/0552_create_replacation_slot.py new file mode 100644 index 0000000000..f340ecacd9 --- /dev/null +++ b/migrations/versions/0552_create_replacation_slot.py @@ -0,0 +1,22 @@ +""" +Create Date: 2026-05-14T16:39:58 +""" + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +revision = "0552_create_replacation_slot" +down_revision = "0551_drop_ntfcns_failed_idx" + + +def upgrade(): + op.execute( + "SELECT * FROM pg_create_logical_replication_slot('notify_dashboard_replication_slot', 'wal2json');" + ) + + +def downgrade(): + op.execute( + "SELECT * FROM pg_drop_replication_slot('notify_dashboard_replication_slot');" + ) diff --git a/migrations/versions/0553_notifications_id_status_idx.py b/migrations/versions/0553_notifications_id_status_idx.py new file mode 100644 index 0000000000..5c21fefe66 --- /dev/null +++ b/migrations/versions/0553_notifications_id_status_idx.py @@ -0,0 +1,43 @@ +""" +Create Date: 2026-05-19T00:00:00 +""" + +from alembic import op + +revision = "0553_notifications_id_status_idx" +down_revision = "0552_create_replacation_slot" + + +def upgrade(): + op.execute("ALTER TABLE notifications ALTER COLUMN notification_status SET NOT NULL;") + + with op.get_context().autocommit_block(): + op.create_index( + "ix_notifications_id_notification_status", + "notifications", + ["id", "notification_status"], + unique=True, + postgresql_concurrently=True, + ) + + op.execute("ALTER TABLE notifications REPLICA IDENTITY USING INDEX ix_notifications_id_notification_status;") + + +def downgrade(): + op.execute("ALTER TABLE notifications REPLICA IDENTITY DEFAULT;") + + op.execute("ALTER TABLE notifications ALTER COLUMN notification_status DROP NOT NULL;") + + with op.get_context().autocommit_block(): + op.drop_index( + "ix_notifications_id_notification_status", + table_name="notifications", + postgresql_concurrently=True, + ) + op.create_index( + "ix_notifications_id_notification_status", + "notifications", + ["id", "notification_status"], + unique=False, + postgresql_concurrently=True, + ) diff --git a/migrations/versions/0554_create_service_stats.py b/migrations/versions/0554_create_service_stats.py new file mode 100644 index 0000000000..7b698802e8 --- /dev/null +++ b/migrations/versions/0554_create_service_stats.py @@ -0,0 +1,69 @@ +""" +Create service_stats table for aggregated notification status counts. + +Revision ID: 0554_create_service_stats +Revises: 0553_notifications_id_status_idx +Create Date: 2026-05-19 00:00:00 +""" + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import postgresql + +revision = "0554_create_service_stats" +down_revision = "0553_notifications_id_status_idx" + + +def upgrade(): + op.create_table( + "ft_service_stats", + sa.Column("id", postgresql.UUID(as_uuid=True), nullable=False), + sa.Column("service_id", postgresql.UUID(as_uuid=True), nullable=False), + sa.Column("template_id", postgresql.UUID(as_uuid=True), nullable=False), + sa.Column("notification_type", postgresql.ENUM(name="notification_type", create_type=False), nullable=False), + sa.Column("notification_status", sa.Text(), nullable=False), + sa.Column("count", sa.Integer(), nullable=False, server_default=sa.text("0")), + sa.ForeignKeyConstraint(["service_id"], ["services.id"]), + sa.ForeignKeyConstraint(["template_id"], ["templates.id"]), + sa.ForeignKeyConstraint(["notification_status"], ["notification_status_types.name"]), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint( + "service_id", + "template_id", + "notification_type", + "notification_status", + name="uix_ft_service_stats_dimensions", + ), + ) + + op.create_index( + "ix_ft_svc_stats_svc_ntype_nstatus", + "ft_service_stats", + ["service_id", "notification_type", "notification_status"], + unique=False, + ) + op.create_index( + "ix_ft_svc_stats_tmpl_ntype_nstatus", + "ft_service_stats", + ["template_id", "notification_type", "notification_status"], + unique=False, + ) + op.create_index( + "ix_ft_service_stats_service_id_template_id", + "ft_service_stats", + ["service_id", "template_id"], + unique=False, + ) + + +def downgrade(): + op.drop_index("ix_ft_service_stats_service_id_template_id", table_name="ft_service_stats") + op.drop_index( + "ix_ft_svc_stats_tmpl_ntype_nstatus", + table_name="ft_service_stats", + ) + op.drop_index( + "ix_ft_svc_stats_svc_ntype_nstatus", + table_name="ft_service_stats", + ) + op.drop_table("ft_service_stats")