diff --git a/canine/localization/file_handlers.py b/canine/localization/file_handlers.py index d38f578..6d5c4f8 100644 --- a/canine/localization/file_handlers.py +++ b/canine/localization/file_handlers.py @@ -307,6 +307,8 @@ def __init__(self, path, **kwargs): """ super().__init__(path, **kwargs) + self.check_md5 = self.extra_args.get("check_md5", False) + # remove any trailing slashes, in case path refers to a directory self.path = path.rstrip("/") @@ -371,9 +373,26 @@ def __init__(self, path, **kwargs): raise ValueError(f"Error accessing S3 file:\n{head_resp.stderr.decode()}") elif head_resp.returncode != 0: raise ValueError(f"Unknown AWS S3 error occurred:\n{head_resp.stderr.decode()}") - + self.headers = json.loads(head_resp.stdout) + # Check for multiple parts to enable local hash calculation + if self.headers.get("PartsCount", 1) > 1: + part1_head_resp = subprocess.run( + "{env} aws s3api {extra_args} head-object --bucket {bucket} --key {obj} --part-number 1".format( + env = self.command_env_str, + extra_args = self.s3_extra_args_str, + bucket = bucket, + obj = obj + ), + shell = True, + capture_output = True + ) + if part1_head_resp.returncode == 254: + raise ValueError(f"Error accessing S3 file:\n{part1_head_resp.stderr.decode()}") + if part1_head_resp.returncode != 0: + raise ValueError(f"Unknown AWS S3 error occurred:\n{part1_head_resp.stderr.decode()}") + self.headers["PartLength"] = json.loads(part1_head_resp.stdout)["ContentLength"] def _get_hash(self): return self.headers["ETag"].replace('"', '') @@ -385,11 +404,11 @@ def localization_command(self, dest): dest_file = shlex.quote(os.path.basename(dest)) self.localized_path = os.path.join(dest_dir, dest_file) - return "\n".join([ + cmd = [ f"[ ! -d {dest_dir} ] && mkdir -p {dest_dir} || :", f"[ -f {self.localized_path} ] && SZ=$(stat --printf '%s' {self.localized_path}) || SZ=0", f"if [ $SZ != {self.size} ]; then", - "{env} aws s3api {extra_args} get-object --bucket {bucket} --key {file} --range \"bytes=$SZ-\" >(cat >> {dest}) > /dev/null".format( + '{env} aws s3api {extra_args} get-object --bucket {bucket} --key {file} --range "bytes=$SZ-" >(cat >> {dest}) > /dev/null'.format( env = self.command_env_str, extra_args = self.s3_extra_args_str, bucket = self.path.split("/")[2], @@ -397,8 +416,35 @@ def localization_command(self, dest): dest = self.localized_path ), "fi" - ]) - + ] + if self.check_md5: + if "PartLength" in self.headers: + chunk_size = self.headers["PartLength"] + chunks = self.headers["PartsCount"] + cmd += [ + "md5hash=$(python3 << CODE", + "import hashlib, multiprocessing", + "def hash_chunk(args):", + " i, cs, f = args", + " fh = open(f, 'rb')", + " fh.seek(i * cs)", + " return hashlib.md5(fh.read(cs)).digest()", + f"chunk_size = {chunk_size}", + f"chunks = {chunks}", + f"fp = '{self.localized_path}'", + "pool = multiprocessing.Pool()", + "results = pool.map(hash_chunk, [(i, chunk_size, fp) for i in range(chunks)])", + "pool.close()", + "print(hashlib.md5(b''.join(results)).hexdigest() + '-' + str(chunks))", + "CODE", + ")" + ] + else: + cmd += [f"md5hash=$(md5sum {self.localized_path} | cut -d ' ' -f 1)"] + cmd += [f'[[ "$md5hash" == "{self.hash}" ]] || {{ echo "deleting corrupted file" 1>&2 ; rm -f {self.localized_path} ; exit 1 ; }}'] + + return "\n".join(cmd) + class HandleAWSURLStream(HandleAWSURL): localization_mode = "stream" diff --git a/canine/orchestrator.py b/canine/orchestrator.py index d1b5e6a..c3a14c7 100644 --- a/canine/orchestrator.py +++ b/canine/orchestrator.py @@ -17,7 +17,7 @@ from operator import itemgetter from itertools import groupby -version = '0.17.0' +version = '0.17.1' ADAPTERS = { 'Manual': ManualAdapter,