Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions app/celery/scheduled_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
dao_adjust_provider_priority_back_to_resting_points,
dao_reduce_sms_provider_priority,
)
from app.dao.replication_slot_changes_dao import dao_process_replication_slot_changes
from app.dao.services_dao import (
dao_fetch_service_by_id,
dao_find_services_sending_to_tv_numbers,
Expand Down Expand Up @@ -821,3 +822,8 @@ def populate_annual_billing(year, missing_services_only):
def run_populate_annual_billing():
year = get_current_financial_year_start_year()
populate_annual_billing(year=year, missing_services_only=True)


@notify_celery.task(name="process-replication-slot-changes")
def process_replication_slot_changes():
dao_process_replication_slot_changes()
5 changes: 5 additions & 0 deletions app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,11 @@ class Config:
"schedule": crontab(hour=9, minute=0, day_of_week="wed", day_of_month="2-8"),
"options": {"queue": QueueNames.PERIODIC},
},
"process-replication-slot-changes": {
"task": "process-replication-slot-changes",
"schedule": timedelta(seconds=1),
"options": {"queue": QueueNames.PERIODIC},
},
},
}

Expand Down
85 changes: 85 additions & 0 deletions app/dao/fact_service_stats_dao.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import uuid
from typing import TypedDict
from uuid import UUID

from sqlalchemy import func
from sqlalchemy.dialects.postgresql import insert

from app import db
from app.models import FactServiceStats


class ServiceStatsDimensions(TypedDict):
service_id: UUID
template_id: UUID
notification_type: str
notification_status: str


# 1. Public write API used by callers to apply a single aggregated count change into
# service statistics for a specific dimensions tuple.
def apply_service_stats_change(dimensions: ServiceStatsDimensions, change_count: int) -> None:
_update_service_stats_count(dimensions, change_count)


# 2. Internal persistence routine that applies the count change with UPSERT behavior for
# positive changes and bounded decrement behavior for negative changes.
def _update_service_stats_count(dimensions: ServiceStatsDimensions, change_count: int) -> None:
if change_count == 0:
return

dimension_values = {
"service_id": dimensions["service_id"],
"template_id": dimensions["template_id"],
"notification_type": dimensions["notification_type"],
"notification_status": dimensions["notification_status"],
}
filters = (
FactServiceStats.service_id == dimension_values["service_id"],
FactServiceStats.template_id == dimension_values["template_id"],
FactServiceStats.notification_type == dimension_values["notification_type"],
FactServiceStats.notification_status == dimension_values["notification_status"],
)

if change_count > 0:
stmt = insert(FactServiceStats).values(
id=uuid.uuid4(),
**dimension_values,
count=change_count,
)
stmt = stmt.on_conflict_do_update(
constraint="uix_ft_service_stats_dimensions",
set_={
"count": FactServiceStats.count + change_count,
},
)
db.session.execute(stmt)
else:
(
db.session.query(FactServiceStats)
.filter(*filters)
.update(
{
"count": func.greatest(FactServiceStats.count + change_count, 0),
},
synchronize_session=False,
)
)


# 3. Public read API that returns all stats rows for a single service, with a
# defensive guard to avoid querying when no service id is provided.
def dao_fetch_stats_for_service(service_id: UUID) -> list[FactServiceStats]:
"""
Fetch service stats for a specific service.

Args:
service_id: UUID of the service to fetch stats for

Returns:
List of FactServiceStats records for the specified service
"""
if not service_id:
return []

return db.session.query(FactServiceStats).filter(FactServiceStats.service_id == service_id).all()
Loading
Loading