Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 69 additions & 5 deletions dags/common/scoap3_s3.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import hashlib
import os
from uuid import uuid4

import requests
from botocore.exceptions import ClientError
from common.repository import IRepository
from common.s3_service import S3Service
from structlog import get_logger
Expand All @@ -21,6 +23,11 @@ def update_filename_extension(filename, type):
return f"{filename}{extension}"


def get_file_checksum(data):
"""Calculate MD5 checksum of file data"""
return hashlib.md5(data).hexdigest()


class Scoap3Repository(IRepository):
def __init__(self):
super().__init__()
Expand All @@ -30,6 +37,31 @@ def __init__(self):
self.s3 = S3Service(self.bucket)
self.client = self.s3.meta.client

def file_exists_with_same_checksum(self, bucket, key, data=None):
"""Check if a file exists at the destination and has the same checksum"""
try:
if data:
# Calculate checksum of data
data_checksum = get_file_checksum(data)

# Get destination file if it exists
try:
dest_response = self.client.head_object(Bucket=bucket, Key=key)
dest_checksum = dest_response.get("ETag", "").strip('"')

# Compare checksums
return data_checksum == dest_checksum
except ClientError as e:
if e.response["Error"]["Code"] == "404":
# File doesn't exist at destination
return False
raise

return False
except Exception as e:
logger.error("Error checking file existence", error=str(e))
return False

def copy_file(self, source_bucket, source_key, prefix=None, type=None):
if not self.upload_enabled:
return ""
Expand Down Expand Up @@ -113,7 +145,7 @@ def download_files_for_aps(self, files, prefix=None):

def download_and_upload_to_s3(self, url, prefix=None, headers=None, type=None):
if not self.upload_enabled:
return ""
return {"path": "", "version_id": ""}

if not prefix:
prefix = str(uuid4())
Expand All @@ -127,11 +159,37 @@ def download_and_upload_to_s3(self, url, prefix=None, headers=None, type=None):
response.raise_for_status()
except requests.exceptions.HTTPError as e:
logger.error("Failed to download file", error=str(e), url=url)
return
return {"path": "", "version_id": ""}

if self.file_exists_with_same_checksum(
self.bucket, destination_key, data=response.content
):
logger.info(
"File already exists with the same checksum, skipping upload",
url=url,
destination=f"{self.bucket}/{destination_key}",
)
try:
# Get the VersionId using head_object
head_response = self.client.head_object(
Bucket=self.bucket, Key=destination_key
)
version_id = head_response.get("VersionId", "")
return {
"path": f"{self.bucket}/{destination_key}",
"version_id": version_id,
}
except Exception as e:
logger.error(
"Failed to get version ID for existing file",
error=str(e),
bucket=self.bucket,
key=destination_key,
)
return {"path": f"{self.bucket}/{destination_key}", "version_id": ""}

try:
# Upload the file to S3
self.client.put_object(
put_response = self.client.put_object(
Body=response.content,
Bucket=self.bucket,
Key=destination_key,
Expand All @@ -140,11 +198,17 @@ def download_and_upload_to_s3(self, url, prefix=None, headers=None, type=None):
},
ACL="public-read",
)
return f"{self.bucket}/{destination_key}"
# Extract VersionId from the response
version_id = put_response.get("VersionId", "")
return {
"path": f"{self.bucket}/{destination_key}",
"version_id": version_id,
}
except Exception as e:
logger.error(
"Failed to upload file",
error=str(e),
bucket=self.bucket,
key=destination_key,
)
return {"path": "", "version_id": ""}
Loading