From d2f3b112590d33795b838fbdba0f5102973f523a Mon Sep 17 00:00:00 2001 From: PascalEgn Date: Wed, 1 Apr 2026 14:13:50 +0200 Subject: [PATCH] feat: add Springer reharvest DAG for reprocessing articles --- dags/springer/repository.py | 21 +- dags/springer/springer_reharvest.py | 357 ++++++++++++++++++ .../test_springer_dag_process_file.py | 21 +- .../springer/test_springer_dag_pull_sftp.py | 65 ++-- .../springer/test_springer_repository.py | 34 +- .../springer/test_unit_springer_reharvest.py | 202 ++++++++++ 6 files changed, 641 insertions(+), 59 deletions(-) create mode 100644 dags/springer/springer_reharvest.py create mode 100644 tests/units/springer/test_unit_springer_reharvest.py diff --git a/dags/springer/repository.py b/dags/springer/repository.py index 925e560e..47496443 100644 --- a/dags/springer/repository.py +++ b/dags/springer/repository.py @@ -19,28 +19,27 @@ def __init__(self) -> None: def get_all_raw_filenames(self): return [ - f.key.removeprefix("raw/") + f.key.removeprefix(self.ZIPED_DIR) for f in self.s3.objects.filter(Prefix=self.ZIPED_DIR).all() ] def find_all(self, filenames_to_process=None): - ret_dict = {} - filenames = [] + grouped_files = {} filenames = ( filenames_to_process if filenames_to_process else self.__find_all_extracted_files() ) + if not filenames: + return [] for file in filenames: - file_parts = file.split("/") - last_part = file_parts[-1] + last_part = os.path.basename(file) filename_without_extension = last_part.split(".")[0] - if filename_without_extension not in ret_dict: - ret_dict[filename_without_extension] = dict() - ret_dict[filename_without_extension][ - "xml" if self.is_meta(last_part) else "pdf" - ] = file - return list(ret_dict.values()) + if filename_without_extension not in grouped_files: + grouped_files[filename_without_extension] = {} + extension = "xml" if self.is_meta(last_part) else "pdf" + grouped_files[filename_without_extension][extension] = file + return list(grouped_files.values()) def get_by_id(self, id): 
import base64
import fnmatch
import logging
import re
from datetime import date, datetime, timedelta

import pendulum
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sdk import Param, dag, task
from common.notification_service import FailedDagNotifier
from common.utils import parse_without_names_spaces
from springer.repository import SpringerRepository

logger = logging.getLogger("airflow.task")
SPRINGER_REPO = SpringerRepository()
# Archive folders embed the SFTP drop timestamp, e.g. "ftp_PUB_26-03-29_20-00-46":
# group 1 is the date (yy-mm-dd), group 2 the time (HH-MM-SS).
_ARCHIVE_DT_PATTERN = re.compile(r"ftp_PUB_(\d{2}-\d{2}-\d{2})_(\d{2}-\d{2}-\d{2})")


def _parse_date(value):
    """Parse a YYYY-MM-DD string into a ``date``; return None for empty input."""
    if not value:
        return None
    return datetime.strptime(value, "%Y-%m-%d").date()


def _normalize_dois(dois):
    """Return a set of stripped, lower-cased DOIs, dropping blanks and non-strings."""
    if not dois:
        return set()
    return {doi.strip().lower() for doi in dois if isinstance(doi, str) and doi.strip()}


def _normalize_file_keys(file_keys):
    """Strip whitespace from file keys and drop blank or non-string entries."""
    return [k.strip() for k in file_keys if isinstance(k, str) and k.strip()]


def _is_glob_pattern(value):
    """True when *value* contains any fnmatch glob metacharacter."""
    return any(c in value for c in ("*", "?", "["))


def _extract_archive_datetime_from_key(key):
    """Extract the archive drop datetime encoded in an S3 key.

    Returns ``datetime.min`` when the key carries no parseable
    ``ftp_PUB_<date>_<time>`` segment, so such keys sort last when ordering
    newest-first.
    """
    if not key:
        return datetime.min

    match = _ARCHIVE_DT_PATTERN.search(key)
    if not match:
        return datetime.min

    try:
        return datetime.strptime(
            f"{match.group(1)}_{match.group(2)}", "%y-%m-%d_%H-%M-%S"
        )
    except ValueError:
        return datetime.min


def _extract_archive_date_from_key(key):
    """Date part of the archive drop datetime, or None when not parseable."""
    dt_value = _extract_archive_datetime_from_key(key)
    if dt_value == datetime.min:
        return None
    return dt_value.date()


def _extract_doi_from_xml(xml_bytes):
    """Best-effort DOI extraction from a Springer article XML payload.

    Accepts bytes or str; returns the lower-cased DOI text, or None when the
    document cannot be parsed or has no ArticleDOI element.
    """
    try:
        if isinstance(xml_bytes, bytes):
            xml_text = xml_bytes.decode("utf-8")
        else:
            xml_text = xml_bytes
        xml = parse_without_names_spaces(xml_text)
        doi_element = xml.find("./Journal/Volume/Issue/Article/ArticleInfo/ArticleDOI")
        if doi_element is not None and doi_element.text:
            return doi_element.text.strip().lower()
    except Exception:
        # A single malformed file must not abort the whole collection run;
        # callers log a warning for keys that yield no DOI.
        pass
    return None


def _resolve_file_keys(file_keys, all_xml_keys):
    """Expand user-supplied exact keys and glob patterns against the known XML keys.

    Exact keys must all exist (raises ValueError otherwise); glob patterns are
    matched relative to the extracted prefix and may match nothing (warning
    only). Returns the resolved keys de-duplicated, preserving order.
    """
    all_xml_keys_set = set(all_xml_keys)
    prefix = SPRINGER_REPO.EXTRACTED_DIR

    exact_keys = [k for k in file_keys if not _is_glob_pattern(k)]
    glob_patterns = [k for k in file_keys if _is_glob_pattern(k)]

    missing = [k for k in exact_keys if k not in all_xml_keys_set]
    if missing:
        raise ValueError(f"Some requested file_keys do not exist: {missing}")

    resolved = list(exact_keys)

    for pattern in glob_patterns:
        matched = [
            key
            for key in all_xml_keys
            if fnmatch.fnmatch(
                key[len(prefix) :] if key.startswith(prefix) else key, pattern
            )
        ]
        if not matched:
            logger.warning("Pattern %r matched no Springer extracted XML keys", pattern)
        resolved.extend(matched)

    # De-duplicate while keeping first-seen order (a key may be named exactly
    # AND matched by a pattern).
    seen = set()
    deduped = []
    for key in resolved:
        if key not in seen:
            seen.add(key)
            deduped.append(key)
    return deduped


def _filter_keys_by_dois(keys, target_dois, repo):
    """Keep only keys whose XML carries one of *target_dois*, one key per DOI.

    *keys* is expected to be ordered newest-first so the first (newest) file
    per DOI wins. Warns about DOIs that were requested but never found.
    """
    deduped = {}
    found_dois = set()

    for key in keys:
        file_obj = repo.get_by_id(key)
        doi = _extract_doi_from_xml(file_obj.getvalue())

        if not doi:
            logger.warning("Could not extract DOI from file: %s", key)
            continue

        if doi not in target_dois:
            continue

        found_dois.add(doi)
        if doi not in deduped:
            deduped[doi] = key

    missing_dois = sorted(target_dois - found_dois)
    if missing_dois:
        logger.warning("Some requested DOIs were not found: %s", missing_dois)

    return list(deduped.values())


def _enforce_limit(records, limit):
    """Raise ValueError when more than *limit* records matched (None = unlimited)."""
    if limit is None:
        return
    if len(records) > limit:
        raise ValueError(
            f"Matched {len(records)} records, which is above limit={limit}. "
            "Please reduce the date range or increase the limit."
        )


@dag(
    on_failure_callback=FailedDagNotifier(),
    start_date=pendulum.today("UTC").add(days=-1),
    schedule=None,
    catchup=False,
    tags=["reharvest", "springer"],
    params={
        "date_from": Param(
            default=None,
            type=["string", "null"],
            description="Start date from extracted archive folder in YYYY-MM-DD format",
            title="Date from",
        ),
        "date_to": Param(
            default=None,
            type=["string", "null"],
            description="End date from extracted archive folder in YYYY-MM-DD format",
            title="Date to",
        ),
        "file_keys": Param(
            default=[],
            type=["array", "null"],
            description=(
                "Exact Springer extracted XML keys or glob patterns matched relative "
                f"to '{SPRINGER_REPO.EXTRACTED_DIR}'. Examples: '*' or "
                f"'EPJC/ftp_PUB_26-03-29_20-00-46/*'."
            ),
            title="File keys",
        ),
        "dois": Param(
            default=[],
            type=["array", "null"],
            description=(
                "List of DOIs to process. Can be used with date_from/date_to or together "
                "with file_keys to filter the resolved key set. The maximum date range is 1 Year."
            ),
            title="DOIs",
        ),
        "limit": Param(
            1000,
            type=["integer"],
            description="Maximum number of records to process",
            title="Limit",
        ),
        "dry_run": Param(
            default=True,
            type=["boolean", "null"],
            description="Whether to perform a dry run. If true, no downstream DAGs will be triggered",
            title="Dry run",
        ),
    },
)
def springer_reharvest():
    """Reharvest Springer articles already present in the extracted S3 prefix.

    Selection modes (mutually exclusive on date range vs. file_keys):
      * file_keys (exact keys and/or globs), optionally filtered by DOIs;
      * date_from + date_to range over archive drop dates;
      * dois alone, which implies the last 365 days.
    Matched XML files are base64-encoded and fanned out to the
    ``springer_process_file`` DAG, unless dry_run is set.
    """

    @task()
    def collect_records(repo=SPRINGER_REPO, **kwargs):
        """Validate params and return the list of S3 XML keys to reprocess."""
        params = kwargs.get("params", {})

        date_from = _parse_date(params.get("date_from"))
        date_to = _parse_date(params.get("date_to"))
        file_keys = _normalize_file_keys(params.get("file_keys") or [])
        dois = _normalize_dois(params.get("dois") or [])
        target_dois = set(dois)
        limit = params.get("limit")

        if limit is not None and (not isinstance(limit, int) or limit <= 0):
            raise ValueError("limit must be a positive integer when provided")

        if bool(file_keys) and (date_from or date_to):
            raise ValueError(
                "Invalid parameters: date_from/date_to cannot be used together with file_keys"
            )

        if (date_from and not date_to) or (date_to and not date_from):
            raise ValueError("Both date_from and date_to must be provided together")

        # Inventory of every extracted XML (metadata) key once, up front.
        all_xml_keys = [
            obj.key
            for obj in repo.s3.objects.filter(Prefix=repo.EXTRACTED_DIR).all()
            if repo.is_meta(obj.key)
        ]
        logger.info(
            "Found %s Springer extracted XML file(s) in %s",
            len(all_xml_keys),
            repo.EXTRACTED_DIR,
        )

        if file_keys:
            resolved_keys = _resolve_file_keys(file_keys, all_xml_keys)

            if target_dois:
                # Newest archive first so the DOI filter keeps the latest copy.
                resolved_keys = sorted(
                    resolved_keys,
                    key=_extract_archive_datetime_from_key,
                    reverse=True,
                )
                selected_keys = _filter_keys_by_dois(
                    keys=resolved_keys,
                    target_dois=target_dois,
                    repo=repo,
                )
            else:
                selected_keys = resolved_keys

            _enforce_limit(selected_keys, limit)
            logger.info(
                "Selected %s Springer XML key(s) from file_keys (with glob expansion)",
                len(selected_keys),
            )
            return selected_keys

        # DOIs without an explicit range default to the last year.
        if dois and not (date_from and date_to):
            date_to = date.today()
            date_from = date_to - timedelta(days=365)

        if not (date_from and date_to):
            raise ValueError(
                "Invalid parameters: provide either date_from+date_to, file_keys, or dois"
            )

        # DOI search opens every candidate file, so cap the range (366 allows
        # for a leap year).
        if dois and (date_to - date_from).days > 366:
            raise ValueError(
                "For DOI search the date range must not exceed one year. "
                "Please use a smaller range."
            )

        logger.info(
            "Selecting Springer extracted XML files in date range %s to %s",
            date_from,
            date_to,
        )

        keys_in_range = []
        for key in all_xml_keys:
            key_date = _extract_archive_date_from_key(key)
            if key_date is None:
                continue
            if date_from <= key_date <= date_to:
                keys_in_range.append(key)

        # Newest first, so per-DOI de-duplication below keeps the latest copy.
        keys_in_range.sort(key=_extract_archive_datetime_from_key, reverse=True)

        logger.info(
            "Found %s Springer extracted XML file(s) in the requested date range",
            len(keys_in_range),
        )

        deduped = {}
        keys_without_doi = []
        found_dois = set()

        for key in keys_in_range:
            file_obj = repo.get_by_id(key)
            doi = _extract_doi_from_xml(file_obj.getvalue())

            if not doi:
                logger.warning("Could not extract DOI from file: %s", key)
                # Without a DOI filter we still reprocess unidentifiable files.
                if not target_dois:
                    keys_without_doi.append(key)
                continue

            if target_dois and doi not in target_dois:
                continue

            if target_dois:
                found_dois.add(doi)

            if doi not in deduped:
                deduped[doi] = key

        if target_dois:
            missing_dois = sorted(target_dois - found_dois)
            if missing_dois:
                logger.warning("Some requested DOIs were not found: %s", missing_dois)

        selected_keys = list(deduped.values()) + keys_without_doi

        _enforce_limit(selected_keys, limit)
        logger.info("Collected %s deduplicated Springer XML key(s)", len(selected_keys))
        logger.info("Selected Springer XML keys: %s", selected_keys)
        return selected_keys

    @task()
    def prepare_trigger_conf(file_keys, repo=SPRINGER_REPO, **kwargs):
        """Build one trigger conf per key; return [] on dry runs."""
        dry_run = bool(kwargs.get("params", {}).get("dry_run", False))
        if dry_run:
            logger.info(
                "Dry run enabled. %s record(s) matched. No downstream runs will be triggered.",
                len(file_keys),
            )
            return []

        confs = []
        for key in file_keys:
            file_obj = repo.get_by_id(key)
            # springer_process_file expects the XML base64-encoded in the conf.
            encoded_xml = base64.b64encode(file_obj.getvalue()).decode("utf-8")
            confs.append({"file": encoded_xml, "file_name": key})

        logger.info("Prepared %s downstream trigger conf(s)", len(confs))
        return confs

    file_keys = collect_records()
    trigger_confs = prepare_trigger_conf(file_keys)

    TriggerDagRunOperator.partial(
        task_id="springer_reharvest_trigger_file_processing",
        trigger_dag_id="springer_process_file",
        reset_dag_run=True,
    ).expand(conf=trigger_confs)


springer_reharvest_dag = springer_reharvest()
in dag.task_ids + assert "save_to_s3" in dag.task_ids + assert "create_or_update" in dag.task_ids publisher = "Springer" @@ -310,6 +316,7 @@ def test_dag_process_file_no_input_file(article): def test_extract_data_availability_data( dag, epjc_data_article, springer_data_files_in_s3 ): + repo = SpringerRepository() expected = { "statement": "This manuscript has associated data in a data repository. [Authors' comment: The public release of data supporting the findings of this article will follow the CERN Open Data Policy [124]. Inquiries about plots and tables associated with this article can be addressed to atlas.publications@cern.ch.]\nThismanuscripthasassociatedcode/software in a data repository. [Authors' comment: The ATLAS Collaboration's Athena software, including the configuration of the event generators, is open source (https://gitlab.cern.ch/atlas/athena).]", "urls": [ @@ -319,7 +326,7 @@ def test_extract_data_availability_data( result = dag.test( run_conf={ "file": base64.b64encode(ET.tostring(epjc_data_article)).decode(), - "file_name": "extracted/EPJC/ftp_PUB_26-01-26_08-01-27_data/JOU=10052/VOL=2026.86/ISU=1/ART=15241/10052_2025_Article_15241.xml.Meta", + "file_name": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_26-01-26_08-01-27_data/JOU=10052/VOL=2026.86/ISU=1/ART=15241/10052_2025_Article_15241.xml.Meta", "parse_pdf": True, }, mark_success_pattern="save_to_s3|create_or_update", @@ -332,13 +339,14 @@ def test_extract_data_availability_data( def test_extract_data_availability_no_data( dag, jhep_data_article, springer_data_files_in_s3 ): + repo = SpringerRepository() expected = { "statement": "This article has no associated data or the data will not be deposited.\nThis article has no associated code or the code will not be deposited.", } result = dag.test( run_conf={ "file": base64.b64encode(ET.tostring(jhep_data_article)).decode(), - "file_name": 
"extracted/JHEP/ftp_PUB_26-02-19_08-01-28_data/JOU=13130/VOL=2026.2026/ISU=2/ART=28203/13130_2026_Article_28203.xml.scoap", + "file_name": f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_26-02-19_08-01-28_data/JOU=13130/VOL=2026.2026/ISU=2/ART=28203/13130_2026_Article_28203.xml.scoap", "parse_pdf": True, }, mark_success_pattern="save_to_s3|create_or_update", @@ -351,10 +359,11 @@ def test_extract_data_availability_no_data( def test_extract_data_availability_disabled_by_default( dag, epjc_data_article, springer_data_files_in_s3 ): + repo = SpringerRepository() result = dag.test( run_conf={ "file": base64.b64encode(ET.tostring(epjc_data_article)).decode(), - "file_name": "extracted/EPJC/ftp_PUB_26-01-26_08-01-27_data/JOU=10052/VOL=2026.86/ISU=1/ART=15241/10052_2025_Article_15241.xml.Meta", + "file_name": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_26-01-26_08-01-27_data/JOU=10052/VOL=2026.86/ISU=1/ART=15241/10052_2025_Article_15241.xml.Meta", }, mark_success_pattern="save_to_s3|create_or_update", ) diff --git a/tests/integration/springer/test_springer_dag_pull_sftp.py b/tests/integration/springer/test_springer_dag_pull_sftp.py index 180d2ef9..c1db1e5d 100644 --- a/tests/integration/springer/test_springer_dag_pull_sftp.py +++ b/tests/integration/springer/test_springer_dag_pull_sftp.py @@ -17,22 +17,31 @@ def dag(): def test_dag_loaded(dag): assert dag is not None - assert len(dag.tasks) == 3 + assert "migrate_from_ftp" in dag.task_ids + assert "prepare_trigger_conf" in dag.task_ids + assert "springer_trigger_file_processing" in dag.task_ids def test_dag_run(dag): repo = SpringerRepository() repo.delete_all() - dag.test() + dag.test( + run_conf={ + "filenames_pull": { + "enabled": True, + "filenames": ["JHEP/ftp_PUB_19-01-29_20-02-10_JHEP.zip"], + "force_from_ftp": True, + }, + "force_pull": False, + "excluded_directories": [], + }, + mark_success_pattern="springer_trigger_file_processing", + ) expected_subset = [ { - "xml": 
"extracted/EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/10052_2019_Article_6572.xml.Meta", - "pdf": "extracted/EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/BodyRef/PDF/10052_2019_Article_6572.pdf", - }, - { - "xml": "extracted/JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/13130_2019_Article_9848.xml.scoap", - "pdf": "extracted/JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/BodyRef/PDF/13130_2019_Article_9848.pdf", + "xml": f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/13130_2019_Article_9848.xml.scoap", + "pdf": f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/BodyRef/PDF/13130_2019_Article_9848.pdf", }, ] found_files = repo.find_all() @@ -77,16 +86,16 @@ def test_force_pull_from_sftp(): ) expected_files = [ { - "pdf": "extracted/EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/BodyRef/PDF/10052_2019_Article_6572.pdf", - "xml": "extracted/EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/10052_2019_Article_6572.xml.Meta", + "pdf": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/BodyRef/PDF/10052_2019_Article_6572.pdf", + "xml": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/10052_2019_Article_6572.xml.Meta", }, { - "pdf": "extracted/EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/BodyRef/PDF/10052_2019_Article_6540.pdf", - "xml": "extracted/EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/10052_2019_Article_6540.xml.Meta", + "pdf": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/BodyRef/PDF/10052_2019_Article_6540.pdf", + "xml": 
f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/10052_2019_Article_6540.xml.Meta", }, { - "pdf": "extracted/JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/BodyRef/PDF/13130_2019_Article_9848.pdf", - "xml": "extracted/JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/13130_2019_Article_9848.xml.scoap", + "pdf": f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/BodyRef/PDF/13130_2019_Article_9848.pdf", + "xml": f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/13130_2019_Article_9848.xml.scoap", }, ] assert repo.find_all() == expected_files @@ -117,8 +126,8 @@ def test_force_pull_from_sftp_with_excluded_folder(): ) expected_files = [ { - "xml": "extracted/JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/13130_2019_Article_9848.xml.scoap", - "pdf": "extracted/JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/BodyRef/PDF/13130_2019_Article_9848.pdf", + "xml": f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/13130_2019_Article_9848.xml.scoap", + "pdf": f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/BodyRef/PDF/13130_2019_Article_9848.pdf", } ] assert repo.find_all() == expected_files @@ -149,16 +158,16 @@ def test_pull_from_sftp_and_reprocess(): ) excepted_files = [ { - "pdf": "extracted/EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/BodyRef/PDF/10052_2019_Article_6572.pdf", - "xml": "extracted/EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/10052_2019_Article_6572.xml.Meta", + "pdf": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/BodyRef/PDF/10052_2019_Article_6572.pdf", + "xml": 
f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/10052_2019_Article_6572.xml.Meta", }, { - "pdf": "extracted/EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/BodyRef/PDF/10052_2019_Article_6540.pdf", - "xml": "extracted/EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/10052_2019_Article_6540.xml.Meta", + "pdf": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/BodyRef/PDF/10052_2019_Article_6540.pdf", + "xml": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/10052_2019_Article_6540.xml.Meta", }, { - "pdf": "extracted/JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/BodyRef/PDF/13130_2019_Article_9848.pdf", - "xml": "extracted/JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/13130_2019_Article_9848.xml.scoap", + "pdf": f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/BodyRef/PDF/13130_2019_Article_9848.pdf", + "xml": f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/13130_2019_Article_9848.xml.scoap", }, ] assert repo.find_all() == excepted_files @@ -180,16 +189,16 @@ def test_pull_from_sftp_and_reprocess(): ) excepted_files = [ { - "pdf": "extracted/EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/BodyRef/PDF/10052_2019_Article_6572.pdf", - "xml": "extracted/EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/10052_2019_Article_6572.xml.Meta", + "pdf": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/BodyRef/PDF/10052_2019_Article_6572.pdf", + "xml": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/10052_2019_Article_6572.xml.Meta", }, { - "pdf": 
"extracted/EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/BodyRef/PDF/10052_2019_Article_6540.pdf", - "xml": "extracted/EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/10052_2019_Article_6540.xml.Meta", + "pdf": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/BodyRef/PDF/10052_2019_Article_6540.pdf", + "xml": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/10052_2019_Article_6540.xml.Meta", }, { - "pdf": "extracted/JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/BodyRef/PDF/13130_2019_Article_9848.pdf", - "xml": "extracted/JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/13130_2019_Article_9848.xml.scoap", + "pdf": f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/BodyRef/PDF/13130_2019_Article_9848.pdf", + "xml": f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/13130_2019_Article_9848.xml.scoap", }, ] assert repo.find_all() == excepted_files diff --git a/tests/units/springer/test_springer_repository.py b/tests/units/springer/test_springer_repository.py index c5b559ba..a4bbca91 100644 --- a/tests/units/springer/test_springer_repository.py +++ b/tests/units/springer/test_springer_repository.py @@ -11,22 +11,28 @@ def __init__(self, key) -> None: S3_RETURNED_VALUES = [ - "file1.txt", - "file1.scoap", - "file1.pdf", - "file2.txt", - "file2.Meta", - "file2.pdf", + "extracted/file1.txt", + "extracted/file1.scoap", + "extracted/file1.pdf", + "extracted/file2.txt", + "extracted/file2.Meta", + "extracted/file2.pdf", ] FIND_ALL_EXPECTED_VALUES = [ - {"xml": "file1.scoap", "pdf": "file1.pdf"}, - {"xml": "file2.Meta", "pdf": "file2.pdf"}, + { + "xml": "extracted/file1.scoap", + "pdf": "extracted/file1.pdf", + }, + { + "xml": "extracted/file2.Meta", + 
import base64
from io import BytesIO
from unittest import mock

import pytest
from airflow.models import DagBag


class _S3Object:
    """Minimal stand-in for a boto3 ObjectSummary exposing only ``.key``."""

    def __init__(self, key):
        self.key = key


def _springer_xml_with_doi(doi):
    """Return a minimal Springer article XML (bytes) carrying *doi*.

    NOTE(review): the original fixture markup was garbled in the patch; the
    element nesting below was reconstructed to match the
    ``./Journal/Volume/Issue/Article/ArticleInfo/ArticleDOI`` path the DAG
    parses — confirm against the real Springer A++ schema.
    """
    return f"""<?xml version="1.0" encoding="UTF-8"?>
<Publisher>
  <Journal>
    <Volume>
      <Issue>
        <Article>
          <ArticleInfo>
            <ArticleDOI>{doi}</ArticleDOI>
          </ArticleInfo>
        </Article>
      </Issue>
    </Volume>
  </Journal>
</Publisher>
""".encode()


@pytest.fixture(scope="class")
def dagbag():
    return DagBag(dag_folder="dags/", include_examples=False)


@pytest.mark.usefixtures("dagbag")
class TestUnitSpringerReharvest:
    def setup_method(self):
        self.dag_id = "springer_reharvest"
        dagbag = DagBag(dag_folder="dags/", include_examples=False)
        self.dag = dagbag.dags.get(self.dag_id)
        assert self.dag is not None, f"DAG {self.dag_id} failed to load"

    def _build_repo(self, payload_by_key):
        """Build a mock SpringerRepository backed by an in-memory key->bytes map."""
        mock_repo = mock.MagicMock()
        mock_repo.EXTRACTED_DIR = "extracted/"
        mock_repo.s3.objects.filter.return_value.all.return_value = [
            _S3Object(key) for key in payload_by_key
        ]
        mock_repo.is_meta.side_effect = lambda key: key.endswith(
            ".Meta"
        ) or key.endswith(".scoap")

        def _get_by_id(key):
            return BytesIO(payload_by_key[key])

        mock_repo.get_by_id.side_effect = _get_by_id
        return mock_repo

    def test_dag_structure(self):
        assert "springer_reharvest_trigger_file_processing" in self.dag.task_ids
        task = self.dag.get_task("springer_reharvest_trigger_file_processing")
        assert "MappedOperator" in str(type(task)) or task.is_mapped
        assert task.partial_kwargs["trigger_dag_id"] == "springer_process_file"
        assert task.partial_kwargs["reset_dag_run"] is True

    def test_collect_records_file_keys_and_dois_filters_resolved_keys(self):
        task = self.dag.get_task("collect_records")
        function_to_unit_test = task.python_callable

        payload_by_key = {
            "extracted/EPJC/ftp_PUB_25-03-01_00-00-00/article.Meta": _springer_xml_with_doi(
                "10.1007/one"
            ),
            "extracted/EPJC/ftp_PUB_26-03-29_20-00-46/article.Meta": _springer_xml_with_doi(
                "10.1007/one"
            ),
            "extracted/JHEP/ftp_PUB_26-03-30_20-00-50/other.Meta": _springer_xml_with_doi(
                "10.1007/two"
            ),
        }
        mock_repo = self._build_repo(payload_by_key)

        result = function_to_unit_test(
            repo=mock_repo,
            params={
                "file_keys": ["*"],
                "dois": ["10.1007/one"],
            },
        )

        # Newest archive copy of the requested DOI wins.
        assert result == ["extracted/EPJC/ftp_PUB_26-03-29_20-00-46/article.Meta"]

    def test_collect_records_date_range_dedup_keeps_newest(self):
        task = self.dag.get_task("collect_records")
        function_to_unit_test = task.python_callable

        payload_by_key = {
            "extracted/EPJC/ftp_PUB_25-08-15_00-00-00/article.Meta": _springer_xml_with_doi(
                "10.1007/one"
            ),
            "extracted/JHEP/ftp_PUB_26-01-30_20-00-50/article.Meta": _springer_xml_with_doi(
                "10.1007/one"
            ),
            "extracted/JHEP/ftp_PUB_25-12-19_20-00-50/other.scoap": _springer_xml_with_doi(
                "10.1007/two"
            ),
            "extracted/JHEP/not-a-drop/ignored.Meta": _springer_xml_with_doi(
                "10.1007/three"
            ),
        }
        mock_repo = self._build_repo(payload_by_key)

        result = function_to_unit_test(
            repo=mock_repo,
            params={
                "date_from": "2025-08-15",
                "date_to": "2026-01-30",
            },
        )

        assert len(result) == 2
        assert "extracted/JHEP/ftp_PUB_26-01-30_20-00-50/article.Meta" in result
        assert "extracted/JHEP/ftp_PUB_25-12-19_20-00-50/other.scoap" in result

    def test_collect_records_glob_patterns(self):
        task = self.dag.get_task("collect_records")
        function_to_unit_test = task.python_callable

        payload_by_key = {
            "extracted/EPJC/ftp_PUB_26-03-29_20-00-46/article.Meta": _springer_xml_with_doi(
                "10.1007/one"
            ),
            "extracted/JHEP/ftp_PUB_26-03-30_20-00-50/other.scoap": _springer_xml_with_doi(
                "10.1007/two"
            ),
        }
        mock_repo = self._build_repo(payload_by_key)

        result_all = function_to_unit_test(
            repo=mock_repo,
            params={"file_keys": ["*"]},
        )
        assert set(result_all) == set(payload_by_key.keys())

        result_epjc = function_to_unit_test(
            repo=mock_repo,
            params={"file_keys": ["EPJC/*"]},
        )
        assert result_epjc == ["extracted/EPJC/ftp_PUB_26-03-29_20-00-46/article.Meta"]

    def test_collect_records_limit_exceeded_raises(self):
        task = self.dag.get_task("collect_records")
        function_to_unit_test = task.python_callable

        payload_by_key = {
            "extracted/EPJC/ftp_PUB_26-03-29_20-00-46/article.Meta": _springer_xml_with_doi(
                "10.1007/one"
            ),
            "extracted/JHEP/ftp_PUB_26-03-30_20-00-50/other.scoap": _springer_xml_with_doi(
                "10.1007/two"
            ),
        }
        mock_repo = self._build_repo(payload_by_key)

        with pytest.raises(ValueError, match="above limit"):
            function_to_unit_test(
                repo=mock_repo,
                params={
                    "file_keys": ["*"],
                    "limit": 1,
                },
            )

    def test_prepare_trigger_conf_dry_run_and_normal(self):
        task = self.dag.get_task("prepare_trigger_conf")
        function_to_unit_test = task.python_callable

        file_key = "extracted/EPJC/ftp_PUB_26-03-29_20-00-46/article.Meta"
        xml_bytes = _springer_xml_with_doi("10.1007/one")

        mock_repo = mock.MagicMock()
        mock_repo.get_by_id.return_value = BytesIO(xml_bytes)

        dry_run_result = function_to_unit_test(
            file_keys=[file_key],
            repo=mock_repo,
            params={"dry_run": True},
        )
        assert dry_run_result == []

        normal_result = function_to_unit_test(
            file_keys=[file_key],
            repo=mock_repo,
            params={"dry_run": False},
        )
        assert len(normal_result) == 1
        assert normal_result[0]["file_name"] == file_key

        decoded_xml = base64.b64decode(normal_result[0]["file"]).decode("utf-8")
        assert "10.1007/one" in decoded_xml