Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions dags/aps/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@


class APSRepository(IRepository):
PARSED_DIR = "parsed/"

def __init__(self) -> None:
super().__init__()
self.s3_bucket = S3Service(os.getenv("APS_BUCKET_NAME", "aps"))

def find_all(self):
files = []
for obj in self.s3_bucket.objects.all():
if obj.key.startswith(self.PARSED_DIR):
continue
file_name = os.path.basename(obj.key)
files.append(file_name)
return files
Expand All @@ -32,5 +36,8 @@ def find_the_last_uploaded_file_date(self):
def save(self, key, obj):
self.s3_bucket.upload_fileobj(obj, key)

def save_parsed(self, key, obj):
self.s3_bucket.upload_fileobj(obj, f"{self.PARSED_DIR}{key}")

def delete_all(self):
self.s3_bucket.objects.all().delete()
4 changes: 2 additions & 2 deletions dags/common/pull_ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def migrate_files(
repo.save(archive_name, file_bytes)
else:
extracted_or_downloaded_filenames.append(
os.path.join(repo.ZIPED_DIR, archive_name)
os.path.join(repo.RAW_DIR, archive_name)
)
repo.save(archive_name, file_bytes)

Expand Down Expand Up @@ -113,7 +113,7 @@ def _filenames_pull(
def _find_files_in_zip(filenames, repo):
extracted_filenames = []
for zipped_filename in filenames:
zipped_file = repo.get_by_id(f"{repo.ZIPED_DIR}{zipped_filename}")
zipped_file = repo.get_by_id(f"{repo.RAW_DIR}{zipped_filename}")
with zipfile.ZipFile(zipped_file) as zip:
for zip_filename in zip.namelist():
if repo.is_meta(zip_filename):
Expand Down
3 changes: 3 additions & 0 deletions dags/common/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ def find_the_last_uploaded_file_date(self):
def save(self, filename, obj):
raise NotImplementedError

def save_parsed(self, filename, obj):
raise NotImplementedError

def delete_all(self):
raise NotImplementedError

Expand Down
4 changes: 2 additions & 2 deletions dags/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,6 @@ def upload_json_to_s3(json_record, repo):
current_date_and_time_str = current_date.strftime("%Y-%m-%d_%H:%M:%S")
doi = get_value(json_record, "dois.value[0]")
file_key = os.path.join(
"parsed", current_date_str, f"{doi}__{current_date_and_time_str}.json"
current_date_str, f"{doi}__{current_date_and_time_str}.json"
)
repo.save(file_key, file_in_bytes)
repo.save_parsed(file_key, file_in_bytes)
Loading
Loading