Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 33 additions & 9 deletions app/celery/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from notifications_utils.recipients import RecipientCSV
from sqlalchemy.exc import SQLAlchemyError

from app import create_random_identifier, create_uuid, notify_celery, signing
from app import create_random_identifier, create_uuid, document_download_client, notify_celery, signing
from app.aws import s3
from app.celery import letters_pdf_tasks, provider_tasks
from app.celery.service_callback_tasks import create_returned_letter_callback_data, send_returned_letter_to_service
Expand Down Expand Up @@ -46,15 +46,15 @@
from app.dao.service_email_reply_to_dao import dao_get_reply_to_by_id
from app.dao.service_sms_sender_dao import dao_get_service_sms_senders_by_id
from app.dao.templates_dao import dao_get_template_by_id
from app.notifications.process_notifications import add_email_file_links_to_personalisation, persist_notification
from app.notifications.process_notifications import persist_notification
from app.notifications.validators import (
check_service_over_daily_message_limit,
validate_and_format_recipient,
)
from app.report_requests.process_notifications_report import ReportRequestProcessor
from app.serialised_models import SerialisedService, SerialisedTemplate
from app.service.utils import service_allowed_to_send_to
from app.utils import batched
from app.utils import batched, try_download_template_email_file_from_s3
from app.v2.errors import TooManyRequestsError

DEFAULT_SHATTER_JOB_ROWS_BATCH_SIZE = 32
Expand Down Expand Up @@ -103,6 +103,7 @@ def process_job(self, job_id, sender_id=None, shatter_batch_size=DEFAULT_SHATTER
return

recipient_csv, template, sender_id = get_recipient_csv_and_template_and_sender_id(job)
email_file_uploads = upload_email_files_for_job(template=template, recipients=recipient_csv)

current_app.logger.info(
"Starting job %s processing %s notifications",
Expand All @@ -113,7 +114,9 @@ def process_job(self, job_id, sender_id=None, shatter_batch_size=DEFAULT_SHATTER

for shatter_batch in batched(recipient_csv.get_rows(), n=shatter_batch_size):
batch_args_kwargs = [
get_id_task_args_kwargs_for_job_row(row, template, job, service, sender_id=sender_id)[1]
get_id_task_args_kwargs_for_job_row(
row, template, job, service, sender_id=sender_id, email_file_uploads=email_file_uploads
)[1]
for row in shatter_batch
]
_shatter_job_rows_with_subdivision(template.template_type, batch_args_kwargs, self.message_group_id)
Expand Down Expand Up @@ -217,15 +220,15 @@ def get_recipient_csv_and_template_and_sender_id(job):
return recipient_csv, template, meta_data.get("sender_id")


def get_id_task_args_kwargs_for_job_row(row, template, job, service, sender_id=None):
def get_id_task_args_kwargs_for_job_row(row, template, job, service, sender_id=None, email_file_uploads=None):
encoded = signing.encode(
{
"template": str(template.id),
"template_version": job.template_version,
"job": str(job.id),
"to": row.recipient,
"row_number": row.index,
"personalisation": dict(row.personalisation),
"personalisation": {**dict(row.personalisation), **email_file_uploads},
# row.recipient_and_personalisation gets all columns for the row, even those not in template placeholders
"client_reference": dict(row.recipient_and_personalisation).get("reference", None),
}
Expand Down Expand Up @@ -397,9 +400,7 @@ def save_email(self, service_id, notification_id, encoded_notification, sender_i
version=notification["template_version"],
)

personalisation = add_email_file_links_to_personalisation(
template=template, personalisation=notification.get("personalisation", {}), recipient=notification["to"]
)
personalisation = notification.get("personalisation", {})

if sender_id:
reply_to_text = dao_get_reply_to_by_id(reply_to_id=sender_id, service_id=service_id).email_address
Expand Down Expand Up @@ -519,6 +520,29 @@ def save_letter(
handle_exception(self, notification, notification_id, e)


def upload_email_files_for_job(template, recipients_csv):
email_files_personalisation = {}
for email_file in template.email_file_objects:
template_email_file_from_s3 = try_download_template_email_file_from_s3(template.service, email_file.id)
doc_download_link = document_download_client.upload_document(
template.service,
template_email_file_from_s3,
confirmation_email=None,
retention_period=f"{email_file.retention_period} weeks",
filename=email_file.filename,
from_job=True,
validation_emails_csv=[row["email_address"] for row in recipients_csv]
if email_file.validate_users_email
else None,
)
if email_file.link_text:
email_files_personalisation[email_file.filename] = f"[{email_file.link_text}]({doc_download_link})"
else:
email_files_personalisation[email_file.filename] = doc_download_link

return email_files_personalisation


def handle_exception(task, notification, notification_id, exc):
if not get_notification_by_id(notification_id):
extra = {
Expand Down
8 changes: 8 additions & 0 deletions app/clients/document_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ def upload_document(
confirmation_email: str | None = None,
retention_period: str | None = None,
filename: str | None = None,
from_job: bool | None = None,
validation_emails_csv: list | None = None,
):
try:
data = {
Expand All @@ -61,6 +63,12 @@ def upload_document(
if filename:
data["filename"] = filename

if from_job:
data["from_job"] = from_job

if validation_emails_csv:
data["validation_emails_csv"] = validation_emails_csv

headers = {"Authorization": f"Bearer {self.auth_token}"}
if has_request_context() and hasattr(request, "get_onwards_request_headers"):
headers.update(request.get_onwards_request_headers())
Expand Down
Loading