Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 51 additions & 5 deletions canine/localization/file_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,8 @@ def __init__(self, path, **kwargs):
"""
super().__init__(path, **kwargs)

self.check_md5 = self.extra_args.get("check_md5", False)

# remove any trailing slashes, in case path refers to a directory
self.path = path.rstrip("/")

Expand Down Expand Up @@ -371,9 +373,26 @@ def __init__(self, path, **kwargs):
raise ValueError(f"Error accessing S3 file:\n{head_resp.stderr.decode()}")
elif head_resp.returncode != 0:
raise ValueError(f"Unknown AWS S3 error occurred:\n{head_resp.stderr.decode()}")

self.headers = json.loads(head_resp.stdout)

# Check for multiple parts to enable local hash calculation
if self.headers.get("PartsCount", 1) > 1:
part1_head_resp = subprocess.run(
"{env} aws s3api {extra_args} head-object --bucket {bucket} --key {obj} --part-number 1".format(
env = self.command_env_str,
extra_args = self.s3_extra_args_str,
bucket = bucket,
obj = obj
),
shell = True,
capture_output = True
)
Comment thread
dheiman marked this conversation as resolved.
if part1_head_resp.returncode == 254:
raise ValueError(f"Error accessing S3 file:\n{part1_head_resp.stderr.decode()}")
if part1_head_resp.returncode != 0:
raise ValueError(f"Unknown AWS S3 error occurred:\n{part1_head_resp.stderr.decode()}")
self.headers["PartLength"] = json.loads(part1_head_resp.stdout)["ContentLength"]
def _get_hash(self):
return self.headers["ETag"].replace('"', '')

Expand All @@ -385,20 +404,47 @@ def localization_command(self, dest):
dest_file = shlex.quote(os.path.basename(dest))
self.localized_path = os.path.join(dest_dir, dest_file)

return "\n".join([
cmd = [
f"[ ! -d {dest_dir} ] && mkdir -p {dest_dir} || :",
f"[ -f {self.localized_path} ] && SZ=$(stat --printf '%s' {self.localized_path}) || SZ=0",
f"if [ $SZ != {self.size} ]; then",
"{env} aws s3api {extra_args} get-object --bucket {bucket} --key {file} --range \"bytes=$SZ-\" >(cat >> {dest}) > /dev/null".format(
'{env} aws s3api {extra_args} get-object --bucket {bucket} --key {file} --range "bytes=$SZ-" >(cat >> {dest}) > /dev/null'.format(
env = self.command_env_str,
extra_args = self.s3_extra_args_str,
bucket = self.path.split("/")[2],
file = "/".join(self.path.split("/")[3:]),
dest = self.localized_path
),
"fi"
])

]
if self.check_md5:
if "PartLength" in self.headers:
chunk_size = self.headers["PartLength"]
chunks = self.headers["PartsCount"]
cmd += [
"md5hash=$(python3 << CODE",
"import hashlib, multiprocessing",
"def hash_chunk(args):",
" i, cs, f = args",
" fh = open(f, 'rb')",
" fh.seek(i * cs)",
" return hashlib.md5(fh.read(cs)).digest()",
f"chunk_size = {chunk_size}",
f"chunks = {chunks}",
f"fp = '{self.localized_path}'",
"pool = multiprocessing.Pool()",
"results = pool.map(hash_chunk, [(i, chunk_size, fp) for i in range(chunks)])",
"pool.close()",
"print(hashlib.md5(b''.join(results)).hexdigest() + '-' + str(chunks))",
"CODE",
")"
]
else:
cmd += [f"md5hash=$(md5sum {self.localized_path} | cut -d ' ' -f 1)"]
cmd += [f'[[ "$md5hash" == "{self.hash}" ]] || {{ echo "deleting corrupted file" 1>&2 ; rm -f {self.localized_path} ; exit 1 ; }}']

return "\n".join(cmd)

class HandleAWSURLStream(HandleAWSURL):
localization_mode = "stream"

Expand Down
2 changes: 1 addition & 1 deletion canine/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from operator import itemgetter
from itertools import groupby

version = '0.17.0'
version = '0.17.1'

ADAPTERS = {
'Manual': ManualAdapter,
Expand Down
Loading