From 36f3603bfe30d30871455f9c5f5a8f79696b1d03 Mon Sep 17 00:00:00 2001 From: David Heiman Date: Tue, 10 Mar 2026 11:17:31 -0400 Subject: [PATCH 1/4] Enable check_md5 argument for AWS --- canine/localization/file_handlers.py | 46 +++++++++++++++++++++++++--- canine/orchestrator.py | 2 +- 2 files changed, 42 insertions(+), 6 deletions(-) diff --git a/canine/localization/file_handlers.py b/canine/localization/file_handlers.py index d38f578..eff267f 100644 --- a/canine/localization/file_handlers.py +++ b/canine/localization/file_handlers.py @@ -307,6 +307,8 @@ def __init__(self, path, **kwargs): """ super().__init__(path, **kwargs) + self.check_md5 = self.extra_args.get("check_md5", False) + # remove any trailing slashes, in case path refers to a directory self.path = path.rstrip("/") @@ -371,9 +373,26 @@ def __init__(self, path, **kwargs): raise ValueError(f"Error accessing S3 file:\n{head_resp.stderr.decode()}") elif head_resp.returncode != 0: raise ValueError(f"Unknown AWS S3 error occurred:\n{head_resp.stderr.decode()}") - + self.headers = json.loads(head_resp.stdout) + # Check for multiple parts to enable local hash calculation + if self.headers.get("PartsCount", 1) > 1: + part1_head_resp = subprocess.run( + "{env} aws s3api {extra_args} head-object --bucket {bucket} --key {obj} --part-number 1".format( + env = self.command_env_str, + extra_args = self.s3_extra_args_str, + bucket = bucket, + obj = obj + ), + shell = True, + capture_output = True + ) + if head_resp.returncode == 254: + raise ValueError(f"Error accessing S3 file:\n{head_resp.stderr.decode()}") + if head_resp.returncode != 0: + raise ValueError(f"Unknown AWS S3 error occurred:\n{head_resp.stderr.decode()}") + self.headers["PartLength"] = json.loads(part1_head_resp.stdout)["ContentLength"] def _get_hash(self): return self.headers["ETag"].replace('"', '') @@ -385,11 +404,11 @@ def localization_command(self, dest): dest_file = shlex.quote(os.path.basename(dest)) self.localized_path = os.path.join(dest_dir, dest_file) - return "\n".join([ + cmd = [ f"[ ! -d {dest_dir} ] && mkdir -p {dest_dir} || :", f"[ -f {self.localized_path} ] && SZ=$(stat --printf '%s' {self.localized_path}) || SZ=0", f"if [ $SZ != {self.size} ]; then", - "{env} aws s3api {extra_args} get-object --bucket {bucket} --key {file} --range \"bytes=$SZ-\" >(cat >> {dest}) > /dev/null".format( + '{env} aws s3api {extra_args} get-object --bucket {bucket} --key {file} --range "bytes=$SZ-" >(cat >> {dest}) > /dev/null'.format( env = self.command_env_str, extra_args = self.s3_extra_args_str, bucket = self.path.split("/")[2], @@ -397,8 +416,25 @@ def localization_command(self, dest): dest = self.localized_path ), "fi" - ]) - + ] + if self.check_md5: + if "PartLength" in self.headers: + chunk_size_mb = int(self.headers["PartLength"] / (1024 * 1024)) + chunks = self.headers["PartsCount"] + cmd += [ + "CHECKSUMS=$(mktemp)", + """trap 'rm -f "$CHECKSUMS"' EXIT""", + f"for i in {{0..{chunks - 1}}}; do", + f" dd bs=1M count={chunk_size_mb} skip=$((i * {chunk_size_mb})) if={self.localized_path} | md5sum | cut -d ' ' -f 1 >> $CHECKSUMS", + "done", + f"""md5hash=$(python3 -c "import sys, binascii; sys.stdout.buffer.write(binascii.unhexlify(''.join(sys.stdin.read().split())))" < $CHECKSUMS | md5sum | cut -d ' ' -f 1)-{chunks}""", + ] + else: + cmd += [f"md5hash=$(md5sum {self.localized_path} | cut -d ' ' -f 1)"] + cmd += [f'[[ "$md5hash" == "{self.hash}" ]] || {{ echo "deleting corrupted file" 1>&2 ; rm -f {self.localized_path} ; exit 1 ; }}'] + + return "\n".join(cmd) + class HandleAWSURLStream(HandleAWSURL): localization_mode = "stream" diff --git a/canine/orchestrator.py b/canine/orchestrator.py index d1b5e6a..c3a14c7 100644 --- a/canine/orchestrator.py +++ b/canine/orchestrator.py @@ -17,7 +17,7 @@ from operator import itemgetter from itertools import groupby -version = '0.17.0' +version = '0.17.1' ADAPTERS = { 'Manual': ManualAdapter, From cbf6756aecdf6af2d940fb80a9bfa43be82a6ac2 Mon Sep 17 00:00:00 2001 From: David Heiman Date: Tue, 10 Mar 2026 16:05:00 -0400 Subject: [PATCH 2/4] Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- canine/localization/file_handlers.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/canine/localization/file_handlers.py b/canine/localization/file_handlers.py index eff267f..4aad589 100644 --- a/canine/localization/file_handlers.py +++ b/canine/localization/file_handlers.py @@ -388,10 +388,10 @@ def __init__(self, path, **kwargs): shell = True, capture_output = True ) - if head_resp.returncode == 254: - raise ValueError(f"Error accessing S3 file:\n{head_resp.stderr.decode()}") - if head_resp.returncode != 0: - raise ValueError(f"Unknown AWS S3 error occurred:\n{head_resp.stderr.decode()}") + if part1_head_resp.returncode == 254: + raise ValueError(f"Error accessing S3 file:\n{part1_head_resp.stderr.decode()}") + if part1_head_resp.returncode != 0: + raise ValueError(f"Unknown AWS S3 error occurred:\n{part1_head_resp.stderr.decode()}") self.headers["PartLength"] = json.loads(part1_head_resp.stdout)["ContentLength"] def _get_hash(self): return self.headers["ETag"].replace('"', '') From 25db9e5b1c55b78f86da177d1f5c6ea42a3f61e3 Mon Sep 17 00:00:00 2001 From: David Heiman Date: Thu, 12 Mar 2026 14:43:08 -0400 Subject: [PATCH 3/4] Speed up the md5 hash check with multithreading. --- canine/localization/file_handlers.py | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/canine/localization/file_handlers.py b/canine/localization/file_handlers.py index eff267f..76ec713 100644 --- a/canine/localization/file_handlers.py +++ b/canine/localization/file_handlers.py @@ -419,15 +419,24 @@ def localization_command(self, dest): ] if self.check_md5: if "PartLength" in self.headers: - chunk_size_mb = int(self.headers["PartLength"] / (1024 * 1024)) + chunk_size = self.headers["PartLength"] chunks = self.headers["PartsCount"] cmd += [ - "CHECKSUMS=$(mktemp)", - """trap 'rm -f "$CHECKSUMS"' EXIT""", - f"for i in {{0..{chunks - 1}}}; do", - f" dd bs=1M count={chunk_size_mb} skip=$((i * {chunk_size_mb})) if={self.localized_path} | md5sum | cut -d ' ' -f 1 >> $CHECKSUMS", - "done", - f"""md5hash=$(python3 -c "import sys, binascii; sys.stdout.buffer.write(binascii.unhexlify(''.join(sys.stdin.read().split())))" < $CHECKSUMS | md5sum | cut -d ' ' -f 1)-{chunks}""", + "md5hash=$(python3 << CODE", + "import hashlib", + "from concurrent.futures import ThreadPoolExecutor", + "def hash_chunk(i, cs, f):", + " fh = open(f, 'rb')", + " fh.seek(i * cs)", + " return hashlib.md5(fh.read(cs)).digest()", + f"chunk_size = {chunk_size}", + f"chunks = {chunks}", + f"fp = '{self.localized_path}'", + "with ThreadPoolExecutor() as pool:", + " results = pool.map(lambda i: hash_chunk(i, chunk_size, fp), range(chunks))", + "print(hashlib.md5(b''.join(results)).hexdigest() + '-' + str(chunks))", + "CODE", + ")" ] else: cmd += [f"md5hash=$(md5sum {self.localized_path} | cut -d ' ' -f 1)"] From 791406a215a115aec5d08bac75382dfd04adb5c3 Mon Sep 17 00:00:00 2001 From: David Heiman Date: Thu, 12 Mar 2026 19:33:12 -0400 Subject: [PATCH 4/4] multiprocessing uses less memory than multithreading --- canine/localization/file_handlers.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/canine/localization/file_handlers.py b/canine/localization/file_handlers.py index 5ab52e1..6d5c4f8 100644 --- a/canine/localization/file_handlers.py +++ b/canine/localization/file_handlers.py @@ -423,17 +423,18 @@ def localization_command(self, dest): chunks = self.headers["PartsCount"] cmd += [ "md5hash=$(python3 << CODE", - "import hashlib", - "from concurrent.futures import ThreadPoolExecutor", - "def hash_chunk(i, cs, f):", + "import hashlib, multiprocessing", + "def hash_chunk(args):", + " i, cs, f = args", " fh = open(f, 'rb')", " fh.seek(i * cs)", " return hashlib.md5(fh.read(cs)).digest()", f"chunk_size = {chunk_size}", f"chunks = {chunks}", f"fp = '{self.localized_path}'", - "with ThreadPoolExecutor() as pool:", - " results = pool.map(lambda i: hash_chunk(i, chunk_size, fp), range(chunks))", + "pool = multiprocessing.Pool()", + "results = pool.map(hash_chunk, [(i, chunk_size, fp) for i in range(chunks)])", + "pool.close()", "print(hashlib.md5(b''.join(results)).hexdigest() + '-' + str(chunks))", "CODE", ")"