def stream_to_s3(
    bucket_name,
    object_key,
    copy_command,
    cursor,
    multipart_threshold=1024 * 1024 * 10,  # 10MB
    max_concurrency=10,
):
    """Run a COPY command on ``cursor`` and upload its output to S3.

    The COPY output is collected in an in-memory buffer that begins with the
    encoded ``"\\ufeff"`` byte-order mark, and the whole buffer is then handed
    to ``upload_fileobj``.

    Args:
        bucket_name: Destination S3 bucket.
        object_key: Destination key inside the bucket.
        copy_command: SQL passed verbatim to ``cursor.copy_expert``.
        cursor: Database cursor exposing ``copy_expert(sql, file)``.
        multipart_threshold: Forwarded to ``TransferConfig``.
        max_concurrency: Forwarded to ``TransferConfig``.
    """
    s3_client = client("s3", current_app.config["AWS_REGION"])
    transfer_config = TransferConfig(
        multipart_threshold=multipart_threshold,
        max_concurrency=max_concurrency,
    )

    # Seed the payload with the byte-order mark, then move to the end so the
    # COPY output is appended after it instead of overwriting it.
    payload = BytesIO("\ufeff".encode())
    payload.seek(0, 2)

    cursor.copy_expert(copy_command, payload)

    # Rewind so the upload reads from the first byte.
    payload.seek(0)

    upload_kwargs = {
        "Fileobj": payload,
        "Bucket": bucket_name,
        "Key": object_key,
        "Config": transfer_config,
        "ExtraArgs": {"ServerSideEncryption": "AES256"},
    }
    s3_client.upload_fileobj(**upload_kwargs)
def build_notifications_copy_query(
    service_id,
    notification_type,
    notification_statuses=None,
    limit_days=7,
    chunk_size=None,
    older_than_id=None,
):
    """Build the notifications report query and return it as a SQL string.

    The statement is compiled with ``literal_binds`` so it can be embedded in
    a ``COPY (...) TO STDOUT`` command. Rows are ordered by
    ``(created_at DESC, id DESC)``; the last two selected columns are the
    notification id and ``created_at``, which ``execute_copy_to_bytes`` relies
    on to find the id of the last row.

    Args:
        service_id: Service whose notifications are selected.
        notification_type: Notification type to select; ``"email"`` and
            ``"sms"`` get their statuses mapped to display labels.
        notification_statuses: Optional list of statuses; expanded through
            ``Notification.substitute_status``. Empty/None means no filter.
        limit_days: Only notifications created within this many days.
        chunk_size: Optional ``LIMIT`` applied to the query.
        older_than_id: Optional notification id; only rows sorting strictly
            after that notification (older) are returned.

    Returns:
        The compiled SQL text.

    Raises:
        ValueError / TypeError: if ``limit_days`` is not an integer-like value
            or ``older_than_id`` is not a valid UUID.
    """
    from uuid import UUID  # local import: only needed to validate ``older_than_id``

    db_for_scalar = app.dao.notifications_dao.db

    # ``limit_days`` and ``older_than_id`` end up inside raw SQL fragments below,
    # so force them into a safe shape before any interpolation happens.
    limit_days = int(limit_days)
    older_than_id_literal = str(UUID(str(older_than_id))) if older_than_id else None

    if notification_statuses is None:
        notification_statuses = []
    notifications = aliased(Notification, name="notifications")
    templates_history = aliased(TemplateHistory, name="templates_history")
    jobs = aliased(Job, name="jobs")
    users = aliased(User, name="users")

    query_filters = [
        notifications.service_id == service_id,
        notifications.notification_type == notification_type,
        notifications.created_at >= func.now() - text(f"interval '{limit_days} days'"),
        notifications.key_type != "test",
    ]

    if notification_statuses:
        statuses = Notification.substitute_status(notification_statuses)
        query_filters.append(notifications.status.in_(statuses))

    if older_than_id:
        older_than_created_at = (
            db_for_scalar.session.query(Notification.created_at).filter(Notification.id == older_than_id).scalar()
        )
        if older_than_created_at:
            # Row-value comparison matching the (created_at DESC, id DESC) ordering.
            # The timestamp comes from the database and the id was validated above.
            query_filters.append(
                text(
                    "(notifications.created_at, notifications.id) < "
                    f"('{older_than_created_at}', '{older_than_id_literal}')"
                )
            )

    if notification_type == "email":
        email_status_cases = [(notifications.status == k, v) for k, v in EMAIL_STATUS_FORMATTED.items()]
        status_expr = case(*email_status_cases, else_=notifications.status)
    elif notification_type == "sms":
        sms_status_cases = [(notifications.status == k, v) for k, v in SMS_STATUS_FORMATTED.items()]
        status_expr = case(*sms_status_cases, else_=notifications.status)
    else:
        status_expr = notifications.status

    query_columns = [
        notifications.to.label("Recipient"),
        notifications.reference.label("Reference"),
        templates_history.name.label("Template"),
        notifications.notification_type.cast(real_db.String).label("Type"),
        func.coalesce(users.name, "").label("Sent by"),
        func.coalesce(users.email_address, "").label("Sent by email"),
        func.coalesce(jobs.original_file_name, "").label("Job"),
        status_expr.label("Status"),
        func.to_char(
            func.timezone("America/Toronto", func.timezone("UTC", notifications.created_at)), "YYYY-MM-DD HH24:MI:SS"
        ).label("Time"),
        notifications.api_key_id.label("API key name"),
        # Keep id and created_at as the final two columns: execute_copy_to_bytes
        # reads the id positionally as rows[-1][-2].
        notifications.id,
        notifications.created_at,
    ]

    query = (
        real_db.session.query(*query_columns)
        .join(
            templates_history,
            (templates_history.id == notifications.template_id)
            & (templates_history.version == notifications.template_version),
        )
        .outerjoin(jobs, jobs.id == notifications.job_id)
        .outerjoin(users, users.id == notifications.created_by_id)
        .filter(*query_filters)
        .order_by(notifications.created_at.desc(), notifications.id.desc())
    )

    if chunk_size:
        query = query.limit(chunk_size)

    compiled = query.statement.compile(dialect=real_db.engine.dialect, compile_kwargs={"literal_binds": True})
    return str(compiled)


def execute_copy_to_bytes(query, include_header=True):
    """Run ``query`` and return its CSV rendering plus paging information.

    Args:
        query: SQL text, as produced by ``build_notifications_copy_query``.
        include_header: When true, ``HEADER`` is appended to the COPY command.

    Returns:
        A ``(csv_bytes, last_id, row_count)`` tuple. ``last_id`` is the
        second-to-last column of the final row, or ``None`` when the query
        returns no rows.
    """
    from io import BytesIO

    from app.db_copy_utils import db as current_db

    copy_command = f"COPY ({query}) TO STDOUT WITH CSV"
    if include_header:
        copy_command += " HEADER"

    buffer = BytesIO()
    conn = current_db.engine.raw_connection()
    try:
        cursor = conn.cursor()
        try:
            # NOTE(review): the query is executed twice, once here to count rows
            # and find the last id, and once more inside COPY. Presumably both
            # runs see the same data; confirm that holds under concurrent writes.
            cursor.execute(query)
            rows = cursor.fetchall()
            row_count = len(rows)
            last_id = rows[-1][-2] if rows else None

            cursor.copy_expert(copy_command, buffer)
        finally:
            # Release the cursor even when the query or the COPY fails.
            cursor.close()

        # getvalue() returns the whole buffer regardless of the stream position.
        return buffer.getvalue(), last_id, row_count
    finally:
        conn.close()
+ chunk_size, + older_than_id=None, + include_header=True, +): + notification_statuses = [] if notification_status_filter == "all" else [notification_status_filter] + query = build_notifications_copy_query( + service_id=service_id, + notification_type=notification_type, + notification_statuses=notification_statuses, + limit_days=limit_days, + chunk_size=chunk_size, + older_than_id=older_than_id, + ) + return execute_copy_to_bytes(query, include_header=include_header) diff --git a/app/report_requests/process_notifications_report.py b/app/report_requests/process_notifications_report.py index 3e1bee6195..08d8a6b53a 100644 --- a/app/report_requests/process_notifications_report.py +++ b/app/report_requests/process_notifications_report.py @@ -1,21 +1,14 @@ -import csv -from io import StringIO -from typing import Any from uuid import UUID from flask import current_app -from notifications_utils.s3 import ( - S3_MULTIPART_UPLOAD_MIN_PART_SIZE, - s3_multipart_upload_abort, - s3_multipart_upload_complete, - s3_multipart_upload_create, - s3_multipart_upload_part, -) -from app.constants import NOTIFICATION_REPORT_REQUEST_MAPPING -from app.dao.notifications_dao import get_notifications_for_service from app.dao.report_requests_dao import dao_get_report_request_by_id from app.dao.service_data_retention_dao import fetch_service_data_retention_by_notification_type +from app.report_requests.utils import ( + build_notifications_query, + compile_query_for_copy, + stream_query_to_s3, +) class ReportRequestProcessor: @@ -25,169 +18,39 @@ def __init__(self, service_id: UUID, report_request_id: UUID): self.report_request = dao_get_report_request_by_id(service_id, report_request_id) self.notification_type = self.report_request.parameter["notification_type"] self.notification_status = self.report_request.parameter["notification_status"] - self.page_size = current_app.config.get("REPORT_REQUEST_NOTIFICATIONS_CSV_BATCH_SIZE") self.s3_bucket = 
def process(self) -> None:
    """Generate the report and upload it; log and re-raise any failure.

    Method of ``ReportRequestProcessor``. Delegates all work to
    ``_stream_notifications_to_s3``.

    Raises:
        Exception: whatever the streaming step raised, after it was logged.
    """
    try:
        self._stream_notifications_to_s3()
    except Exception as e:
        current_app.logger.exception("Error occurred while processing the report: %s", e)
        # Bare ``raise`` re-raises the active exception with its original traceback.
        raise


def _stream_notifications_to_s3(self) -> None:
    """Build the report query, turn it into a COPY command and stream it to S3.

    Method of ``ReportRequestProcessor``. The look-back window is the
    service's data retention for this notification type, or 7 days when no
    retention row exists.
    """
    service_retention = fetch_service_data_retention_by_notification_type(self.service_id, self.notification_type)
    limit_days = service_retention.days_of_retention if service_retention else 7

    query = build_notifications_query(
        service_id=self.service_id,
        notification_type=self.notification_type,
        language="en",
        # "all" means no status filter, expressed as an empty list.
        notification_statuses=[self.notification_status] if self.notification_status != "all" else [],
        days_limit=limit_days,
    )

    copy_command = compile_query_for_copy(query)
    stream_query_to_s3(copy_command, self.s3_bucket, self.filename)

    extra = {
        "report_request_id": self.report_request_id,
        "s3_bucket": self.s3_bucket,
        "s3_key": self.filename,
    }
    current_app.logger.info(
        "Upload complete for report request %(report_request_id)s to bucket %(s3_bucket)s "
        "with filename %(s3_key)s.",
        extra,
        extra=extra,
    )
class Translate:
    """Translate report labels into the configured language."""

    def __init__(self, language="en"):
        """Initialize the Translate class with a language."""
        self.language = language
        # Languages without an entry here are returned untranslated.
        self.translations = {
            "fr": FR_TRANSLATIONS,
        }

    def translate(self, x):
        """Return the translation of ``x`` for the set language, or ``x`` itself.

        ``x`` is returned unchanged when the language has no translation
        table or the table has no entry for ``x``.
        """
        table = self.translations.get(self.language)
        if not table:
            return x
        return table.get(x, x)


def build_notifications_query(
    service_id, notification_type, language, notification_statuses=None, job_id=None, days_limit=7
):
    """Build the SQLAlchemy query behind a notifications CSV report.

    Args:
        service_id: Service whose notifications are selected.
        notification_type: Notification type to select; ``"email"`` and
            ``"sms"`` get their statuses mapped to display labels.
        language: Language used for column headers and labels (see ``Translate``).
        notification_statuses: Optional list of statuses; expanded through
            ``Notification.substitute_status``. Empty/None means no filter.
        job_id: When given, restricts to that job, adds a "Row number" column
            in place of the "Sent by" columns and sorts oldest first.
        days_limit: Only notifications created within this many days.

    Returns:
        A query whose column labels are the (translated) CSV headers.

    Raises:
        ValueError / TypeError: if ``days_limit`` is not an integer-like value.
    """
    if notification_statuses is None:
        notification_statuses = []
    # Create aliases for the tables to make the query more readable
    n = aliased(Notification)
    t = aliased(Template)
    j = aliased(Job)
    u = aliased(User)

    # ``days_limit`` is spliced into a raw SQL interval fragment, so make sure
    # it really is an integer before interpolating it.
    days_limit = int(days_limit)

    # Build the inner subquery (returns enum values, cast as text for notification_type)
    query_filters = [
        n.service_id == service_id,
        n.notification_type == notification_type,
        n.created_at > func.now() - text(f"interval '{days_limit} days'"),
    ]

    if notification_statuses:
        statuses = Notification.substitute_status(notification_statuses)
        query_filters.append(n.status.in_(statuses))

    if job_id:
        query_filters.append(n.job_id == job_id)

    inner_query = (
        db.session.query(
            n.to.label("to"),
            t.name.label("template_name"),
            n.notification_type.cast(db.String).label("notification_type"),
            u.name.label("user_name"),
            u.email_address.label("user_email"),
            j.original_file_name.label("job_name"),
            n.job_row_number.label("job_row_number"),
            n.status.label("status"),
            n.created_at.label("created_at"),
        )
        .join(t, t.id == n.template_id)
        .outerjoin(j, j.id == n.job_id)
        .outerjoin(u, u.id == n.created_by_id)
        .filter(*query_filters)
        .subquery()
    )

    # Map statuses for translation
    translate = Translate(language).translate

    # Only build the CASE expression for the notification type being reported.
    if notification_type == "email":
        email_status_cases = [(inner_query.c.status == k, translate(v)) for k, v in EMAIL_STATUS_FORMATTED.items()]
        status_expr = case(*email_status_cases, else_=inner_query.c.status)
    elif notification_type == "sms":
        sms_status_cases = [(inner_query.c.status == k, translate(v)) for k, v in SMS_STATUS_FORMATTED.items()]
        status_expr = case(*sms_status_cases, else_=inner_query.c.status)
    else:
        status_expr = inner_query.c.status

    if language == "fr":
        status_expr = func.coalesce(func.nullif(status_expr, ""), "").label(translate("Status"))
    else:
        status_expr = status_expr.label(translate("Status"))

    notification_type_translated = case(
        (inner_query.c.notification_type == "email", translate("email")),
        (inner_query.c.notification_type == "sms", translate("sms")),
        else_=inner_query.c.notification_type,
    ).label(translate("Type"))

    query_columns = [
        inner_query.c.to.label(translate("Recipient")),
        inner_query.c.template_name.label(translate("Template")),
        notification_type_translated,
    ]

    if job_id is None:
        query_columns.extend(
            [
                func.coalesce(inner_query.c.user_name, "").label(translate("Sent by")),
                func.coalesce(inner_query.c.user_email, "").label(translate("Sent by email")),
            ]
        )
    else:
        # job_row_number is incremented by one for display.
        query_columns.insert(0, (inner_query.c.job_row_number + 1).label(translate("Row number")))

    query_columns.extend(
        [
            func.coalesce(inner_query.c.job_name, "").label(translate("Job")),
            status_expr,
            func.to_char(
                func.timezone("America/Toronto", func.timezone("UTC", inner_query.c.created_at)),
                "YYYY-MM-DD HH24:MI:SS",
            ).label(translate("Sent Time")),
        ]
    )

    # Job reports read oldest first; service-wide reports newest first.
    return db.session.query(*query_columns).order_by(
        inner_query.c.created_at.asc() if job_id else inner_query.c.created_at.desc()
    )
def compile_query_for_copy(query):
    """Render ``query`` as a ``COPY (...) TO STDOUT WITH CSV HEADER`` command.

    The statement is compiled for the engine's dialect with ``literal_binds``,
    so parameter values are inlined into the SQL text.
    """
    compiled_query = query.statement.compile(dialect=db.engine.dialect, compile_kwargs={"literal_binds": True})
    return f"COPY ({compiled_query}) TO STDOUT WITH CSV HEADER"


def stream_query_to_s3(copy_command, s3_bucket, s3_key):
    """Execute ``copy_command`` on a raw DB connection and upload the output to S3.

    Args:
        copy_command: COPY statement, e.g. from ``compile_query_for_copy``.
        s3_bucket: Destination bucket name.
        s3_key: Destination object key.
    """
    conn = db.engine.raw_connection()
    try:
        cursor = conn.cursor()
        try:
            stream_to_s3(
                bucket_name=s3_bucket,
                object_key=s3_key,
                copy_command=copy_command,
                cursor=cursor,
            )
        finally:
            # Close the cursor explicitly rather than leaving it to connection teardown.
            cursor.close()
    finally:
        conn.close()
@pytest.fixture @@ -54,82 +27,7 @@ def mock_service( sample_sms_template, ): service = create_service(check_if_service_exists=True) - - api_key = create_api_key(service=service, key_type=KEY_TYPE_NORMAL, id="8e33368c-3965-4ae1-ab55-4f9d3275f84d") - - for _i in range(10): - create_notification( - template=sample_email_template, - status=NOTIFICATION_SENDING, - api_key=api_key, - created_at=get_created_at_date_time(days_ago=1), - created_by_id=service.users[0].id, - ) - create_notification( - template=sample_email_template, - status=NOTIFICATION_PERMANENT_FAILURE, - api_key=api_key, - created_at=get_created_at_date_time(days_ago=2), - created_by_id=service.users[0].id, - ) - create_notification( - template=sample_email_template, - status=NOTIFICATION_DELIVERED, - api_key=api_key, - created_at=get_created_at_date_time(days_ago=3), - client_reference="email-reference", - created_by_id=service.users[0].id, - ) - create_notification( - template=sample_email_template, - status=NOTIFICATION_CREATED, - api_key=api_key, - created_at=get_created_at_date_time(days_ago=4), - client_reference="email-reference", - created_by_id=service.users[0].id, - ) - create_notification( - template=sample_email_template, - status=NOTIFICATION_SENT, - api_key=api_key, - created_at=get_created_at_date_time(days_ago=5), - created_by_id=service.users[0].id, - ) - create_notification( - template=sample_sms_template, - status=NOTIFICATION_SENDING, - api_key=api_key, - created_at=get_created_at_date_time(days_ago=4), - client_reference="sms-reference", - created_by_id=service.users[0].id, - ) - create_notification( - template=sample_sms_template, - status=NOTIFICATION_CREATED, - api_key=api_key, - created_at=get_created_at_date_time(days_ago=6), - created_by_id=service.users[0].id, - ) - create_notification( - template=sample_sms_template, - status=NOTIFICATION_TECHNICAL_FAILURE, - api_key=api_key, - created_at=get_created_at_date_time(days_ago=2), - client_reference="sms-reference", - 
created_by_id=service.users[0].id, - ) - create_notification( - template=sample_sms_template, - status=NOTIFICATION_DELIVERED, - api_key=api_key, - created_at=get_created_at_date_time(days_ago=0), - created_by_id=service.users[0].id, - ) - - report_bucket = current_app.config.get("S3_BUCKET_REPORT_REQUESTS_DOWNLOAD") - s3 = boto3.client("s3", region_name="eu-west-1") - s3.create_bucket(Bucket=report_bucket, CreateBucketConfiguration={"LocationConstraint": "eu-west-1"}) - + create_api_key(service=service, key_type=KEY_TYPE_NORMAL, id="8e33368c-3965-4ae1-ab55-4f9d3275f84d") return service @@ -151,457 +49,33 @@ def mock_processor(sample_user, mock_service): return ReportRequestProcessor(mock_service.id, report_request.id) -def test_initialize_csv_writes_headers(mock_processor, mock_service): - mock_processor._initialize_csv() - csv_contents = mock_processor.csv_buffer.getvalue() - headers = [ - "Recipient", - "Reference", - "Template", - "Type", - "Sent by", - "Sent by email", - "Job", - "Status", - "Time", - "API key name", - ] - for h in headers: - assert h in csv_contents - - -def test_start_multipart_upload_sets_upload_id(mocker, mock_processor): - mock_convert_notifications_to_csv = mocker.patch( - "app.report_requests.process_notifications_report.s3_multipart_upload_create", return_value={"UploadId": "1234"} - ) - mock_processor._start_multipart_upload() - assert mock_processor.upload_id == "1234" - mock_convert_notifications_to_csv.assert_called_once() - - -def test_fetch_and_upload_notifications(mocker, mock_processor): - mock_upload = mocker.patch.object(mock_processor, "_upload_csv_part_if_needed") - mock_upload_rem = mocker.patch.object(mock_processor, "_upload_remaining_data") - mock_processor._fetch_and_upload_notifications() - - assert mock_upload.call_count == 2 - mock_upload_rem.assert_called_once() - - -def test_upload_part_adds_to_parts(mocker, mock_processor): - mock_s3_upload_part = mocker.patch( - 
def test_process_calls_stream_notifications_to_s3(mocker, mock_processor):
    """process() builds the query, compiles it, and streams the COPY command to the report bucket."""
    build_query_patch = mocker.patch(
        "app.report_requests.process_notifications_report.build_notifications_query", return_value="query"
    )
    compile_patch = mocker.patch(
        "app.report_requests.process_notifications_report.compile_query_for_copy", return_value="COPY command"
    )
    stream_patch = mocker.patch("app.report_requests.process_notifications_report.stream_query_to_s3")

    mock_processor.process()

    # The compiled command must land in the configured bucket under the report's key.
    expected_bucket = current_app.config["S3_BUCKET_REPORT_REQUESTS_DOWNLOAD"]
    expected_key = f"notifications_report/{mock_processor.report_request_id}.csv"

    build_query_patch.assert_called_once()
    compile_patch.assert_called_once_with("query")
    stream_patch.assert_called_once_with("COPY command", expected_bucket, expected_key)
created_by_id=service.users[0].id, - ) - create_notification( - template=sample_sms_template, - status=NOTIFICATION_CREATED, - api_key=api_key, - created_at=get_created_at_date_time(days_ago=6), - client_reference=client_reference, - created_by_id=service.users[0].id, - ) - create_notification( - template=sample_sms_template, - status=NOTIFICATION_TECHNICAL_FAILURE, - api_key=api_key, - created_at=get_created_at_date_time(days_ago=2), - client_reference=client_reference, - created_by_id=service.users[0].id, - ) - create_notification( - template=sample_sms_template, - status=NOTIFICATION_DELIVERED, - api_key=api_key, - created_at=get_created_at_date_time(days_ago=0), - client_reference=client_reference, - created_by_id=service.users[0].id, - ) - - sample_parameter = { - "notification_type": notification_type, - "notification_status": notification_report_request_status, - } - - report_bucket = current_app.config.get("S3_BUCKET_REPORT_REQUESTS_DOWNLOAD") - s3 = boto3.client("s3", region_name="eu-west-1") - s3.create_bucket(Bucket=report_bucket, CreateBucketConfiguration={"LocationConstraint": "eu-west-1"}) - - report_request = ReportRequest( - user_id=sample_user.id, - service_id=service.id, - report_type=REPORT_REQUEST_NOTIFICATIONS, - parameter=sample_parameter, +def test_process_logs_error_on_exception(mocker, mock_processor): + mocker.patch( + "app.report_requests.process_notifications_report.stream_query_to_s3", side_effect=Exception("Test error") ) - dao_create_report_request(report_request) - - with set_config(notify_api, "REPORT_REQUEST_NOTIFICATIONS_CSV_BATCH_SIZE", page_size): - processor = ReportRequestProcessor(service.id, report_request.id) - processor.process() - - original_pdf_object = get_s3_object( - current_app.config["S3_BUCKET_REPORT_REQUESTS_DOWNLOAD"], f"notifications_report/{report_request.id}.csv" - ) - content = original_pdf_object.get()["Body"].read().decode("utf-8") - line_count = sum(1 for line in content.strip().splitlines() if 
line.strip()) - reader = csv.DictReader(io.StringIO(content)) + mock_logger = mocker.patch("app.report_requests.process_notifications_report.current_app.logger.exception") - headers = reader.fieldnames - assert ",".join(headers) == expected_headers - - status_counter = 0 - for row in reader: - created_at = row.get("Time") - assert created_at in [ - utc_string_to_bst_string(get_created_at_date_time(days_ago=0)), - utc_string_to_bst_string(get_created_at_date_time(days_ago=1)), - utc_string_to_bst_string(get_created_at_date_time(days_ago=2)), - utc_string_to_bst_string(get_created_at_date_time(days_ago=3)), - utc_string_to_bst_string(get_created_at_date_time(days_ago=4)), - utc_string_to_bst_string(get_created_at_date_time(days_ago=5)), - ] - - assert row.get("Type") == notification_type - assert row.get("Recipient") == expected_recipient - assert row.get("Reference") == client_reference - assert row.get("Template") == expected_template_type - assert row.get("Sent by") == service.users[0].name - assert row.get("Sent by email") == service.users[0].email_address - assert row.get("API key name") == api_key.name - - status = row.get("Status") - if status in expected_statuses: - status_counter += 1 - - assert line_count == expected_rows - assert status_counter == expected_rows - 1 # exclude header - - -@pytest.mark.parametrize( - ("page_size,notification_type,notification_report_request_status,"), - [ - ( - 2, - "email", - NOTIFICATION_REQUEST_REPORT_ALL, - ), - ], -) -@mock_aws -def test_process_report_request_should_contain_job_notification( - page_size, - notification_type, - sample_user, - notify_api, - sample_email_template, - notification_report_request_status, -): - expected_headers = "Recipient,Reference,Template,Type,Sent by,Sent by email,Job,Status,Time,API key name" - - service = create_service(check_if_service_exists=True) - service_retention = 5 - - create_service_data_retention( - service=service, notification_type=notification_type, 
days_of_retention=service_retention - ) - - api_key = create_api_key(service=service, key_type=KEY_TYPE_NORMAL, id="8e33368c-3965-4ae1-ab55-4f9d3275f84d") - - job = create_job(template=sample_email_template) - - # job notification - create_notification( - template=sample_email_template, - client_reference="job-reference", - status=NOTIFICATION_SENDING, - job=job, - job_row_number=2, - created_at=get_created_at_date_time(days_ago=1), - created_by_id=service.users[0].id, - ) - create_notification( - template=sample_email_template, - client_reference="no-job-reference", - api_key=api_key, - status=NOTIFICATION_SENDING, - job_row_number=2, - created_at=get_created_at_date_time(days_ago=1), - created_by_id=service.users[0].id, - ) - - sample_parameter = { - "notification_type": notification_type, - "notification_status": notification_report_request_status, - } - - report_bucket = current_app.config.get("S3_BUCKET_REPORT_REQUESTS_DOWNLOAD") - s3 = boto3.client("s3", region_name="eu-west-1") - s3.create_bucket(Bucket=report_bucket, CreateBucketConfiguration={"LocationConstraint": "eu-west-1"}) - - report_request = ReportRequest( - user_id=sample_user.id, - service_id=service.id, - report_type=REPORT_REQUEST_NOTIFICATIONS, - parameter=sample_parameter, - ) - dao_create_report_request(report_request) - - with set_config(notify_api, "REPORT_REQUEST_NOTIFICATIONS_CSV_BATCH_SIZE", page_size): - processor = ReportRequestProcessor(service.id, report_request.id) - processor.process() - - original_pdf_object = get_s3_object( - current_app.config["S3_BUCKET_REPORT_REQUESTS_DOWNLOAD"], f"notifications_report/{report_request.id}.csv" - ) - content = original_pdf_object.get()["Body"].read().decode("utf-8") - line_count = sum(1 for line in content.strip().splitlines() if line.strip()) - assert line_count == 3 # two rows + header - - reader = csv.DictReader(io.StringIO(content)) - - headers = reader.fieldnames - assert ",".join(headers) == expected_headers + with 
pytest.raises(Exception, match="Test error"): + mock_processor.process() - for row in reader: - if row.get("Job"): - assert row.get("Job") == "some.csv" - assert row.get("Reference") == "job-reference" - else: - assert row.get("Job") == "" - assert row.get("Reference") == "no-job-reference" + mock_logger.assert_called_once_with("Error occurred while processing the report: %s", mocker.ANY) diff --git a/tests/app/test_db_copy_utils.py b/tests/app/test_db_copy_utils.py new file mode 100644 index 0000000000..cb040360d8 --- /dev/null +++ b/tests/app/test_db_copy_utils.py @@ -0,0 +1,248 @@ +from unittest.mock import MagicMock, patch +from uuid import uuid4 + +from app.db_copy_utils import ( + build_notifications_copy_query, + execute_copy_to_bytes, + get_notifications_csv_chunk, +) + + +class TestBuildNotificationsCopyQuery: + def test_basic_query_structure(self): + service_id = uuid4() + query = build_notifications_copy_query( + service_id=service_id, + notification_type="sms", + notification_statuses=["delivered", "sent"], + limit_days=7, + chunk_size=1000, + ) + + assert "SELECT" in query + assert "FROM notifications" in query + assert "templates_history" in query + assert str(service_id) in query + assert "notification_type = 'sms'" in query + assert "delivered" in query + assert "sent" in query + assert "LIMIT 1000" in query + + @patch("app.dao.notifications_dao.db") + def test_query_with_older_than_id(self, mock_db): + service_id = uuid4() + older_than_id = uuid4() + + mock_db.session.query.return_value.filter.return_value.scalar.return_value = "2023-01-01 00:00:00" + + query = build_notifications_copy_query( + service_id=service_id, + notification_type="email", + notification_statuses=[], + limit_days=14, + chunk_size=500, + older_than_id=older_than_id, + ) + + assert "notifications.created_at, notifications.id" in query + assert "2023-01-01 00:00:00" in query + assert "notifications.created_at >=" in query + + def test_query_excludes_test_keys(self): + query = 
build_notifications_copy_query( + service_id=uuid4(), + notification_type="sms", + notification_statuses=[], + limit_days=7, + chunk_size=100, + ) + + assert "key_type != 'test'" in query + + def test_query_includes_all_columns(self): + query = build_notifications_copy_query( + service_id=uuid4(), + notification_type="email", + notification_statuses=[], + limit_days=7, + chunk_size=100, + ) + + expected_columns = [ + '"Recipient"', + '"Reference"', + '"Template"', + '"Type"', + '"Sent by"', + '"Sent by email"', + '"Job"', + '"Status"', + '"Time"', + '"API key name"', + "id", + "created_at", + ] + + for column in expected_columns: + assert column in query, f"Expected column {column} not found in query" + + +class TestExecuteCopyToBytes: + @patch("app.db_copy_utils.db") + def test_executes_query_and_copy_command(self, mock_db): + mock_cursor = MagicMock() + mock_conn = MagicMock() + mock_conn.cursor.return_value = mock_cursor + mock_db.engine.raw_connection.return_value = mock_conn + + sample_id = uuid4() + mock_cursor.fetchall.return_value = [ + ( + "recipient@example.com", + "ref123", + "Template", + "email", + "User", + "user@example.com", + "job.csv", + "delivered", + "2023-01-01 10:00:00", + "api_key", + sample_id, + "2023-01-01 10:00:00", + ), + ] + + def copy_expert_side_effect(sql, buffer): + csv_data = ( + b"Recipient,Reference,Template,Type,Sent by,Sent by email,Job,Status,Time,API key name\n" + b"recipient@example.com,ref123,Template,email,User,user@example.com,job.csv,delivered," + b"2023-01-01 10:00:00,api_key\n" + ) + buffer.write(csv_data) + + mock_cursor.copy_expert.side_effect = copy_expert_side_effect + + query = "SELECT * FROM notifications LIMIT 10" + csv_bytes, last_id, row_count = execute_copy_to_bytes(query, include_header=True) + + assert csv_bytes is not None + assert len(csv_bytes) > 0 + assert b"Recipient" in csv_bytes + assert b"recipient@example.com" in csv_bytes + assert last_id == sample_id + assert row_count == 1 + 
mock_conn.close.assert_called_once() + + @patch("app.db_copy_utils.db") + def test_handles_empty_result_set(self, mock_db): + mock_cursor = MagicMock() + mock_conn = MagicMock() + mock_conn.cursor.return_value = mock_cursor + mock_db.engine.raw_connection.return_value = mock_conn + + mock_cursor.fetchall.return_value = [] + + def copy_expert_side_effect(sql, buffer): + buffer.write(b"Recipient,Reference,Template,Type,Sent by,Sent by email,Job,Status,Time,API key name\n") + + mock_cursor.copy_expert.side_effect = copy_expert_side_effect + + query = "SELECT * FROM notifications WHERE 1=0" + csv_bytes, last_id, row_count = execute_copy_to_bytes(query, include_header=True) + + assert last_id is None + assert row_count == 0 + assert csv_bytes is not None + mock_conn.close.assert_called_once() + + @patch("app.db_copy_utils.db") + def test_excludes_header_when_requested(self, mock_db): + mock_cursor = MagicMock() + mock_conn = MagicMock() + mock_conn.cursor.return_value = mock_cursor + mock_db.engine.raw_connection.return_value = mock_conn + + sample_id = uuid4() + mock_cursor.fetchall.return_value = [ + ( + "test@example.com", + "", + "Template", + "email", + "", + "", + "", + "delivered", + "2023-01-01", + "", + sample_id, + "2023-01-01", + ), + ] + + def copy_expert_side_effect(sql, buffer): + assert "HEADER" not in sql + buffer.write(b"test@example.com,,Template,email,,,,delivered,2023-01-01,\n") + + mock_cursor.copy_expert.side_effect = copy_expert_side_effect + + query = "SELECT * FROM notifications LIMIT 1" + csv_bytes, last_id, row_count = execute_copy_to_bytes(query, include_header=False) + + assert b"Recipient" not in csv_bytes + assert b"test@example.com" in csv_bytes + mock_conn.close.assert_called_once() + + +class TestGetNotificationsCSVChunk: + @patch("app.db_copy_utils.execute_copy_to_bytes") + @patch("app.db_copy_utils.build_notifications_copy_query") + def test_calls_build_query_and_execute(self, mock_build_query, mock_execute): + service_id = uuid4() + 
sample_id = uuid4() + mock_build_query.return_value = "SELECT * FROM notifications" + mock_execute.return_value = (b"csv,data\n", sample_id, 100) + + csv_bytes, last_id, row_count = get_notifications_csv_chunk( + service_id=service_id, + notification_type="sms", + notification_status_filter="all", + limit_days=7, + chunk_size=1000, + older_than_id=None, + include_header=True, + ) + + mock_build_query.assert_called_once() + mock_execute.assert_called_once_with("SELECT * FROM notifications", include_header=True) + assert csv_bytes == b"csv,data\n" + assert last_id == sample_id + assert row_count == 100 + + @patch("app.db_copy_utils.execute_copy_to_bytes") + @patch("app.db_copy_utils.build_notifications_copy_query") + def test_passes_pagination_parameters(self, mock_build_query, mock_execute): + service_id = uuid4() + older_than_id = uuid4() + mock_build_query.return_value = "SELECT * FROM notifications" + mock_execute.return_value = (b"", None, 0) + + get_notifications_csv_chunk( + service_id=service_id, + notification_type="email", + notification_status_filter="delivered", + limit_days=14, + chunk_size=500, + older_than_id=older_than_id, + include_header=False, + ) + + call_args = mock_build_query.call_args + assert call_args[1]["service_id"] == service_id + assert call_args[1]["notification_type"] == "email" + assert call_args[1]["limit_days"] == 14 + assert call_args[1]["chunk_size"] == 500 + assert call_args[1]["older_than_id"] == older_than_id + + mock_execute.assert_called_once_with("SELECT * FROM notifications", include_header=False)