From d2f3b112590d33795b838fbdba0f5102973f523a Mon Sep 17 00:00:00 2001 From: PascalEgn Date: Wed, 1 Apr 2026 14:13:50 +0200 Subject: [PATCH 1/3] feat: add Springer reharvest DAG for reprocessing articles --- dags/springer/repository.py | 21 +- dags/springer/springer_reharvest.py | 357 ++++++++++++++++++ .../test_springer_dag_process_file.py | 21 +- .../springer/test_springer_dag_pull_sftp.py | 65 ++-- .../springer/test_springer_repository.py | 34 +- .../springer/test_unit_springer_reharvest.py | 202 ++++++++++ 6 files changed, 641 insertions(+), 59 deletions(-) create mode 100644 dags/springer/springer_reharvest.py create mode 100644 tests/units/springer/test_unit_springer_reharvest.py diff --git a/dags/springer/repository.py b/dags/springer/repository.py index 925e560e..47496443 100644 --- a/dags/springer/repository.py +++ b/dags/springer/repository.py @@ -19,28 +19,27 @@ def __init__(self) -> None: def get_all_raw_filenames(self): return [ - f.key.removeprefix("raw/") + f.key.removeprefix(self.ZIPED_DIR) for f in self.s3.objects.filter(Prefix=self.ZIPED_DIR).all() ] def find_all(self, filenames_to_process=None): - ret_dict = {} - filenames = [] + grouped_files = {} filenames = ( filenames_to_process if filenames_to_process else self.__find_all_extracted_files() ) + if not filenames: + return [] for file in filenames: - file_parts = file.split("/") - last_part = file_parts[-1] + last_part = os.path.basename(file) filename_without_extension = last_part.split(".")[0] - if filename_without_extension not in ret_dict: - ret_dict[filename_without_extension] = dict() - ret_dict[filename_without_extension][ - "xml" if self.is_meta(last_part) else "pdf" - ] = file - return list(ret_dict.values()) + if filename_without_extension not in grouped_files: + grouped_files[filename_without_extension] = {} + extension = "xml" if self.is_meta(last_part) else "pdf" + grouped_files[filename_without_extension][extension] = file + return list(grouped_files.values()) def get_by_id(self, id): retfile = BytesIO() diff --git a/dags/springer/springer_reharvest.py b/dags/springer/springer_reharvest.py new file mode 100644 index 00000000..610e1c2c --- /dev/null +++ b/dags/springer/springer_reharvest.py @@ -0,0 +1,357 @@ +import base64 +import fnmatch +import logging +import re +from datetime import date, datetime, timedelta + +import pendulum +from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator +from airflow.sdk import Param, dag, task +from common.notification_service import FailedDagNotifier +from common.utils import parse_without_names_spaces +from springer.repository import SpringerRepository + +logger = logging.getLogger("airflow.task") +SPRINGER_REPO = SpringerRepository() +_ARCHIVE_DT_PATTERN = re.compile(r"ftp_PUB_(\d{2}-\d{2}-\d{2})_(\d{2}-\d{2}-\d{2})") + + +def _parse_date(value): + if not value: + return None + return datetime.strptime(value, "%Y-%m-%d").date() + + +def _normalize_dois(dois): + if not dois: + return set() + return {doi.strip().lower() for doi in dois if isinstance(doi, str) and doi.strip()} + + +def _normalize_file_keys(file_keys): + return [k.strip() for k in file_keys if isinstance(k, str) and k.strip()] + + +def _is_glob_pattern(value): + return any(c in value for c in ("*", "?", "[")) + + +def _extract_archive_datetime_from_key(key): + if not key: + return datetime.min + + match = _ARCHIVE_DT_PATTERN.search(key) + if not match: + return datetime.min + + try: + return datetime.strptime( + f"{match.group(1)}_{match.group(2)}", "%y-%m-%d_%H-%M-%S" + ) + 
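+    # A timestamp that fails to parse falls back to datetime.min, so malformed
+    # archive names sort last when keys are ordered newest-first.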
except ValueError: + return datetime.min + + +def _extract_archive_date_from_key(key): + dt_value = _extract_archive_datetime_from_key(key) + if dt_value == datetime.min: + return None + return dt_value.date() + + +def _extract_doi_from_xml(xml_bytes): + try: + if isinstance(xml_bytes, bytes): + xml_text = xml_bytes.decode("utf-8") + else: + xml_text = xml_bytes + xml = parse_without_names_spaces(xml_text) + doi_element = xml.find("./Journal/Volume/Issue/Article/ArticleInfo/ArticleDOI") + if doi_element is not None and doi_element.text: + return doi_element.text.strip().lower() + except Exception: + pass + return None + + +def _resolve_file_keys(file_keys, all_xml_keys): + all_xml_keys_set = set(all_xml_keys) + prefix = SPRINGER_REPO.EXTRACTED_DIR + + exact_keys = [k for k in file_keys if not _is_glob_pattern(k)] + glob_patterns = [k for k in file_keys if _is_glob_pattern(k)] + + missing = [k for k in exact_keys if k not in all_xml_keys_set] + if missing: + raise ValueError(f"Some requested file_keys do not exist: {missing}") + + resolved = list(exact_keys) + + for pattern in glob_patterns: + matched = [ + key + for key in all_xml_keys + if fnmatch.fnmatch( + key[len(prefix) :] if key.startswith(prefix) else key, pattern + ) + ] + if not matched: + logger.warning("Pattern %r matched no Springer extracted XML keys", pattern) + resolved.extend(matched) + + seen = set() + deduped = [] + for key in resolved: + if key not in seen: + seen.add(key) + deduped.append(key) + return deduped + + +def _filter_keys_by_dois(keys, target_dois, repo): + deduped = {} + found_dois = set() + + for key in keys: + file_obj = repo.get_by_id(key) + doi = _extract_doi_from_xml(file_obj.getvalue()) + + if not doi: + logger.warning("Could not extract DOI from file: %s", key) + continue + + if doi not in target_dois: + continue + + found_dois.add(doi) + if doi not in deduped: + deduped[doi] = key + + missing_dois = sorted(target_dois - found_dois) + if missing_dois: + logger.warning("Some requested DOIs were not found: %s", missing_dois) + + return list(deduped.values()) + + +def _enforce_limit(records, limit): + if limit is None: + return + if len(records) > limit: + raise ValueError( + f"Matched {len(records)} records, which is above limit={limit}. " + "Please reduce the date range or increase the limit." + ) + + +@dag( + on_failure_callback=FailedDagNotifier(), + start_date=pendulum.today("UTC").add(days=-1), + schedule=None, + catchup=False, + tags=["reharvest", "springer"], + params={ + "date_from": Param( + default=None, + type=["string", "null"], + description="Start date from extracted archive folder in YYYY-MM-DD format", + title="Date from", + ), + "date_to": Param( + default=None, + type=["string", "null"], + description="End date from extracted archive folder in YYYY-MM-DD format", + title="Date to", + ), + "file_keys": Param( + default=[], + type=["array", "null"], + description=( + "Exact Springer extracted XML keys or glob patterns matched relative " + f"to '{SPRINGER_REPO.EXTRACTED_DIR}'. Examples: '*' or " + f"'EPJC/ftp_PUB_26-03-29_20-00-46/*'." + ), + title="File keys", + ), + "dois": Param( + default=[], + type=["array", "null"], + description=( + "List of DOIs to process. Can be used with date_from/date_to or together " + "with file_keys to filter the resolved key set. The maximum date range is 1 Year." 
+ ), + title="DOIs", + ), + "limit": Param( + 1000, + type=["integer"], + description="Maximum number of records to process", + title="Limit", + ), + "dry_run": Param( + default=True, + type=["boolean", "null"], + description="Whether to perform a dry run. If true, no downstream DAGs will be triggered", + title="Dry run", + ), + }, +) +def springer_reharvest(): + @task() + def collect_records(repo=SPRINGER_REPO, **kwargs): + params = kwargs.get("params", {}) + + date_from = _parse_date(params.get("date_from")) + date_to = _parse_date(params.get("date_to")) + file_keys = _normalize_file_keys(params.get("file_keys") or []) + dois = _normalize_dois(params.get("dois") or []) + target_dois = set(dois) + limit = params.get("limit") + + if limit is not None and (not isinstance(limit, int) or limit <= 0): + raise ValueError("limit must be a positive integer when provided") + + if bool(file_keys) and (date_from or date_to): + raise ValueError( + "Invalid parameters: date_from/date_to cannot be used together with file_keys" + ) + + if (date_from and not date_to) or (date_to and not date_from): + raise ValueError("Both date_from and date_to must be provided together") + + all_xml_keys = [ + obj.key + for obj in repo.s3.objects.filter(Prefix=repo.EXTRACTED_DIR).all() + if repo.is_meta(obj.key) + ] + logger.info( + "Found %s Springer extracted XML file(s) in %s", + len(all_xml_keys), + repo.EXTRACTED_DIR, + ) + + if file_keys: + resolved_keys = _resolve_file_keys(file_keys, all_xml_keys) + + if target_dois: + resolved_keys = sorted( + resolved_keys, + key=_extract_archive_datetime_from_key, + reverse=True, + ) + selected_keys = _filter_keys_by_dois( + keys=resolved_keys, + target_dois=target_dois, + repo=repo, + ) + else: + selected_keys = resolved_keys + + _enforce_limit(selected_keys, limit) + logger.info( + "Selected %s Springer XML key(s) from file_keys (with glob expansion)", + len(selected_keys), + ) + return selected_keys + + if dois and not (date_from and date_to): + date_to = date.today() + date_from = date_to - timedelta(days=365) + + if not (date_from and date_to): + raise ValueError( + "Invalid parameters: provide either date_from+date_to, file_keys, or dois" + ) + + if dois and (date_to - date_from).days > 366: + raise ValueError( + "For DOI search the date range must not exceed one year. " + "Please use a smaller range." 
+ ) + + logger.info( + "Selecting Springer extracted XML files in date range %s to %s", + date_from, + date_to, + ) + + keys_in_range = [] + for key in all_xml_keys: + key_date = _extract_archive_date_from_key(key) + if key_date is None: + continue + if date_from <= key_date <= date_to: + keys_in_range.append(key) + + keys_in_range.sort(key=_extract_archive_datetime_from_key, reverse=True) + + logger.info( + "Found %s Springer extracted XML file(s) in the requested date range", + len(keys_in_range), + ) + + deduped = {} + keys_without_doi = [] + found_dois = set() + + for key in keys_in_range: + file_obj = repo.get_by_id(key) + doi = _extract_doi_from_xml(file_obj.getvalue()) + + if not doi: + logger.warning("Could not extract DOI from file: %s", key) + if not target_dois: + keys_without_doi.append(key) + continue + + if target_dois and doi not in target_dois: + continue + + if target_dois: + found_dois.add(doi) + + if doi not in deduped: + deduped[doi] = key + + if target_dois: + missing_dois = sorted(target_dois - found_dois) + if missing_dois: + logger.warning("Some requested DOIs were not found: %s", missing_dois) + + selected_keys = list(deduped.values()) + keys_without_doi + + _enforce_limit(selected_keys, limit) + logger.info("Collected %s deduplicated Springer XML key(s)", len(selected_keys)) + logger.info("Selected Springer XML keys: %s", selected_keys) + return selected_keys + + @task() + def prepare_trigger_conf(file_keys, repo=SPRINGER_REPO, **kwargs): + dry_run = bool(kwargs.get("params", {}).get("dry_run", False)) + if dry_run: + logger.info( + "Dry run enabled. %s record(s) matched. No downstream runs will be triggered.", + len(file_keys), + ) + return [] + + confs = [] + for key in file_keys: + file_obj = repo.get_by_id(key) + encoded_xml = base64.b64encode(file_obj.getvalue()).decode("utf-8") + confs.append({"file": encoded_xml, "file_name": key}) + + logger.info("Prepared %s downstream trigger conf(s)", len(confs)) + return confs + + file_keys = collect_records() + trigger_confs = prepare_trigger_conf(file_keys) + + TriggerDagRunOperator.partial( + task_id="springer_reharvest_trigger_file_processing", + trigger_dag_id="springer_process_file", + reset_dag_run=True, + ).expand(conf=trigger_confs) + + +springer_reharvest_dag = springer_reharvest() diff --git a/tests/integration/springer/test_springer_dag_process_file.py b/tests/integration/springer/test_springer_dag_process_file.py index 441a8a25..7972cae0 100644 --- a/tests/integration/springer/test_springer_dag_process_file.py +++ b/tests/integration/springer/test_springer_dag_process_file.py @@ -91,11 +91,11 @@ def springer_data_files_in_s3(): s3_files = {obj.key for obj in repo.s3.objects.all()} assert ( - "extracted/EPJC/ftp_PUB_26-01-26_08-01-27_data/JOU=10052/VOL=2026.86/ISU=1/ART=15241/BodyRef/PDF/10052_2025_Article_15241.pdf" + f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_26-01-26_08-01-27_data/JOU=10052/VOL=2026.86/ISU=1/ART=15241/BodyRef/PDF/10052_2025_Article_15241.pdf" in s3_files ) assert ( - "extracted/JHEP/ftp_PUB_26-02-19_08-01-28_data/JOU=13130/VOL=2026.2026/ISU=2/ART=28203/BodyRef/PDF/13130_2026_Article_28203.pdf" + f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_26-02-19_08-01-28_data/JOU=13130/VOL=2026.2026/ISU=2/ART=28203/BodyRef/PDF/13130_2026_Article_28203.pdf" in s3_files ) @@ -105,7 +105,13 @@ def springer_data_files_in_s3(): def test_dag_loaded(dag): assert dag is not None - assert len(dag.tasks) == 7 + assert "parse_file" in dag.task_ids + assert "add_data_availability" in dag.task_ids + assert "enhance_file" in 
dag.task_ids + assert "populate_files" in dag.task_ids + assert "enrich_file" in dag.task_ids + assert "save_to_s3" in dag.task_ids + assert "create_or_update" in dag.task_ids publisher = "Springer" @@ -310,6 +316,7 @@ def test_dag_process_file_no_input_file(article): def test_extract_data_availability_data( dag, epjc_data_article, springer_data_files_in_s3 ): + repo = SpringerRepository() expected = { "statement": "This manuscript has associated data in a data repository. [Authors' comment: The public release of data supporting the findings of this article will follow the CERN Open Data Policy [124]. Inquiries about plots and tables associated with this article can be addressed to atlas.publications@cern.ch.]\nThismanuscripthasassociatedcode/software in a data repository. [Authors' comment: The ATLAS Collaboration's Athena software, including the configuration of the event generators, is open source (https://gitlab.cern.ch/atlas/athena).]", "urls": [ @@ -319,7 +326,7 @@ def test_extract_data_availability_data( result = dag.test( run_conf={ "file": base64.b64encode(ET.tostring(epjc_data_article)).decode(), - "file_name": "extracted/EPJC/ftp_PUB_26-01-26_08-01-27_data/JOU=10052/VOL=2026.86/ISU=1/ART=15241/10052_2025_Article_15241.xml.Meta", + "file_name": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_26-01-26_08-01-27_data/JOU=10052/VOL=2026.86/ISU=1/ART=15241/10052_2025_Article_15241.xml.Meta", "parse_pdf": True, }, mark_success_pattern="save_to_s3|create_or_update", @@ -332,13 +339,14 @@ def test_extract_data_availability_data( def test_extract_data_availability_no_data( dag, jhep_data_article, springer_data_files_in_s3 ): + repo = SpringerRepository() expected = { "statement": "This article has no associated data or the data will not be deposited.\nThis article has no associated code or the code will not be deposited.", } result = dag.test( run_conf={ "file": base64.b64encode(ET.tostring(jhep_data_article)).decode(), - "file_name": "extracted/JHEP/ftp_PUB_26-02-19_08-01-28_data/JOU=13130/VOL=2026.2026/ISU=2/ART=28203/13130_2026_Article_28203.xml.scoap", + "file_name": f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_26-02-19_08-01-28_data/JOU=13130/VOL=2026.2026/ISU=2/ART=28203/13130_2026_Article_28203.xml.scoap", "parse_pdf": True, }, mark_success_pattern="save_to_s3|create_or_update", @@ -351,10 +359,11 @@ def test_extract_data_availability_no_data( def test_extract_data_availability_disabled_by_default( dag, epjc_data_article, springer_data_files_in_s3 ): + repo = SpringerRepository() result = dag.test( run_conf={ "file": base64.b64encode(ET.tostring(epjc_data_article)).decode(), - "file_name": "extracted/EPJC/ftp_PUB_26-01-26_08-01-27_data/JOU=10052/VOL=2026.86/ISU=1/ART=15241/10052_2025_Article_15241.xml.Meta", + "file_name": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_26-01-26_08-01-27_data/JOU=10052/VOL=2026.86/ISU=1/ART=15241/10052_2025_Article_15241.xml.Meta", }, mark_success_pattern="save_to_s3|create_or_update", ) diff --git a/tests/integration/springer/test_springer_dag_pull_sftp.py b/tests/integration/springer/test_springer_dag_pull_sftp.py index 180d2ef9..c1db1e5d 100644 --- a/tests/integration/springer/test_springer_dag_pull_sftp.py +++ b/tests/integration/springer/test_springer_dag_pull_sftp.py @@ -17,22 +17,31 @@ def dag(): def test_dag_loaded(dag): assert dag is not None - assert len(dag.tasks) == 3 + assert "migrate_from_ftp" in dag.task_ids + assert "prepare_trigger_conf" in dag.task_ids + assert "springer_trigger_file_processing" in dag.task_ids def test_dag_run(dag): repo = SpringerRepository() 
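+    # Start from an empty bucket so find_all() below only returns files pulled by this run.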
repo.delete_all() - dag.test() + dag.test( + run_conf={ + "filenames_pull": { + "enabled": True, + "filenames": ["JHEP/ftp_PUB_19-01-29_20-02-10_JHEP.zip"], + "force_from_ftp": True, + }, + "force_pull": False, + "excluded_directories": [], + }, + mark_success_pattern="springer_trigger_file_processing", + ) expected_subset = [ { - "xml": "extracted/EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/10052_2019_Article_6572.xml.Meta", - "pdf": "extracted/EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/BodyRef/PDF/10052_2019_Article_6572.pdf", - }, - { - "xml": "extracted/JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/13130_2019_Article_9848.xml.scoap", - "pdf": "extracted/JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/BodyRef/PDF/13130_2019_Article_9848.pdf", + "xml": f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/13130_2019_Article_9848.xml.scoap", + "pdf": f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/BodyRef/PDF/13130_2019_Article_9848.pdf", }, ] found_files = repo.find_all() @@ -77,16 +86,16 @@ def test_force_pull_from_sftp(): ) expected_files = [ { - "pdf": "extracted/EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/BodyRef/PDF/10052_2019_Article_6572.pdf", - "xml": "extracted/EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/10052_2019_Article_6572.xml.Meta", + "pdf": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/BodyRef/PDF/10052_2019_Article_6572.pdf", + "xml": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/10052_2019_Article_6572.xml.Meta", }, { - "pdf": "extracted/EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/BodyRef/PDF/10052_2019_Article_6540.pdf", - "xml": "extracted/EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/10052_2019_Article_6540.xml.Meta", + "pdf": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/BodyRef/PDF/10052_2019_Article_6540.pdf", + "xml": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/10052_2019_Article_6540.xml.Meta", }, { - "pdf": "extracted/JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/BodyRef/PDF/13130_2019_Article_9848.pdf", - "xml": "extracted/JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/13130_2019_Article_9848.xml.scoap", + "pdf": f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/BodyRef/PDF/13130_2019_Article_9848.pdf", + "xml": f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/13130_2019_Article_9848.xml.scoap", }, ] assert repo.find_all() == expected_files @@ -117,8 +126,8 @@ def test_force_pull_from_sftp_with_excluded_folder(): ) expected_files = [ { - "xml": "extracted/JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/13130_2019_Article_9848.xml.scoap", - "pdf": "extracted/JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/BodyRef/PDF/13130_2019_Article_9848.pdf", + "xml": f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/13130_2019_Article_9848.xml.scoap", + "pdf": 
f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/BodyRef/PDF/13130_2019_Article_9848.pdf", } ] assert repo.find_all() == expected_files @@ -149,16 +158,16 @@ def test_pull_from_sftp_and_reprocess(): ) excepted_files = [ { - "pdf": "extracted/EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/BodyRef/PDF/10052_2019_Article_6572.pdf", - "xml": "extracted/EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/10052_2019_Article_6572.xml.Meta", + "pdf": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/BodyRef/PDF/10052_2019_Article_6572.pdf", + "xml": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/10052_2019_Article_6572.xml.Meta", }, { - "pdf": "extracted/EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/BodyRef/PDF/10052_2019_Article_6540.pdf", - "xml": "extracted/EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/10052_2019_Article_6540.xml.Meta", + "pdf": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/BodyRef/PDF/10052_2019_Article_6540.pdf", + "xml": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/10052_2019_Article_6540.xml.Meta", }, { - "pdf": "extracted/JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/BodyRef/PDF/13130_2019_Article_9848.pdf", - "xml": "extracted/JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/13130_2019_Article_9848.xml.scoap", + "pdf": f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/BodyRef/PDF/13130_2019_Article_9848.pdf", + "xml": f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/13130_2019_Article_9848.xml.scoap", }, ] assert repo.find_all() == excepted_files @@ -180,16 +189,16 @@ def test_pull_from_sftp_and_reprocess(): ) excepted_files = [ { - "pdf": "extracted/EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/BodyRef/PDF/10052_2019_Article_6572.pdf", - "xml": "extracted/EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/10052_2019_Article_6572.xml.Meta", + "pdf": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/BodyRef/PDF/10052_2019_Article_6572.pdf", + "xml": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/10052_2019_Article_6572.xml.Meta", }, { - "pdf": "extracted/EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/BodyRef/PDF/10052_2019_Article_6540.pdf", - "xml": "extracted/EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/10052_2019_Article_6540.xml.Meta", + "pdf": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/BodyRef/PDF/10052_2019_Article_6540.pdf", + "xml": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/10052_2019_Article_6540.xml.Meta", }, { - "pdf": "extracted/JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/BodyRef/PDF/13130_2019_Article_9848.pdf", - "xml": "extracted/JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/13130_2019_Article_9848.xml.scoap", + "pdf": 
f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/BodyRef/PDF/13130_2019_Article_9848.pdf", + "xml": f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/13130_2019_Article_9848.xml.scoap", }, ] assert repo.find_all() == excepted_files diff --git a/tests/units/springer/test_springer_repository.py b/tests/units/springer/test_springer_repository.py index c5b559ba..a4bbca91 100644 --- a/tests/units/springer/test_springer_repository.py +++ b/tests/units/springer/test_springer_repository.py @@ -11,22 +11,28 @@ def __init__(self, key) -> None: S3_RETURNED_VALUES = [ - "file1.txt", - "file1.scoap", - "file1.pdf", - "file2.txt", - "file2.Meta", - "file2.pdf", + "extracted/file1.txt", + "extracted/file1.scoap", + "extracted/file1.pdf", + "extracted/file2.txt", + "extracted/file2.Meta", + "extracted/file2.pdf", ] FIND_ALL_EXPECTED_VALUES = [ - {"xml": "file1.scoap", "pdf": "file1.pdf"}, - {"xml": "file2.Meta", "pdf": "file2.pdf"}, + { + "xml": "extracted/file1.scoap", + "pdf": "extracted/file1.pdf", + }, + { + "xml": "extracted/file2.Meta", + "pdf": "extracted/file2.pdf", + }, ] FIND_ALL_EXTRACTED_FILES_EXPECTED_VALUES = [ - "file1.scoap", - "file1.pdf", - "file2.Meta", - "file2.pdf", + "extracted/file1.scoap", + "extracted/file1.pdf", + "extracted/file2.Meta", + "extracted/file2.pdf", ] @@ -58,7 +64,7 @@ def test_save_zip_file(boto3_fixture): filename = "test.zip" repo = SpringerRepository() repo.save(filename, file) - upload_mock.assert_called_with(file, "raw/" + filename) + upload_mock.assert_called_with(file, repo.ZIPED_DIR + filename) def test_save_file(boto3_fixture): @@ -67,7 +73,7 @@ def test_save_file(boto3_fixture): filename = "test.pdf" repo = SpringerRepository() repo.save(filename, file) - upload_mock.assert_called_with(file, "extracted/" + filename) + upload_mock.assert_called_with(file, repo.EXTRACTED_DIR + filename) def test_file_is_meta(): diff --git a/tests/units/springer/test_unit_springer_reharvest.py b/tests/units/springer/test_unit_springer_reharvest.py new file mode 100644 index 00000000..7898bf5e --- /dev/null +++ b/tests/units/springer/test_unit_springer_reharvest.py @@ -0,0 +1,202 @@ +import base64 +from io import BytesIO +from unittest import mock + +import pytest +from airflow.models import DagBag + + +class _S3Object: + def __init__(self, key): + self.key = key + + +def _springer_xml_with_doi(doi): + return f""" + + + + +
+<Publisher>
+  <Journal>
+    <Volume>
+      <Issue>
+        <Article>
+          <ArticleInfo>
+            <ArticleDOI>{doi}</ArticleDOI>
+          </ArticleInfo>
+        </Article>
+      </Issue>
+    </Volume>
+  </Journal>
+</Publisher>
+""".encode() + + +@pytest.fixture(scope="class") +def dagbag(): + return DagBag(dag_folder="dags/", include_examples=False) + + +@pytest.mark.usefixtures("dagbag") +class TestUnitSpringerReharvest: + def setup_method(self): + self.dag_id = "springer_reharvest" + dagbag = DagBag(dag_folder="dags/", include_examples=False) + self.dag = dagbag.dags.get(self.dag_id) + assert self.dag is not None, f"DAG {self.dag_id} failed to load" + + def _build_repo(self, payload_by_key): + mock_repo = mock.MagicMock() + mock_repo.EXTRACTED_DIR = "extracted/" + mock_repo.s3.objects.filter.return_value.all.return_value = [ + _S3Object(key) for key in payload_by_key + ] + mock_repo.is_meta.side_effect = lambda key: key.endswith( + ".Meta" + ) or key.endswith(".scoap") + + def _get_by_id(key): + return BytesIO(payload_by_key[key]) + + mock_repo.get_by_id.side_effect = _get_by_id + return mock_repo + + def test_dag_structure(self): + assert "springer_reharvest_trigger_file_processing" in self.dag.task_ids + task = self.dag.get_task("springer_reharvest_trigger_file_processing") + assert "MappedOperator" in str(type(task)) or task.is_mapped + assert task.partial_kwargs["trigger_dag_id"] == "springer_process_file" + assert task.partial_kwargs["reset_dag_run"] is True + + def test_collect_records_file_keys_and_dois_filters_resolved_keys(self): + task = self.dag.get_task("collect_records") + function_to_unit_test = task.python_callable + + payload_by_key = { + "extracted/EPJC/ftp_PUB_25-03-01_00-00-00/article.Meta": _springer_xml_with_doi( + "10.1007/one" + ), + "extracted/EPJC/ftp_PUB_26-03-29_20-00-46/article.Meta": _springer_xml_with_doi( + "10.1007/one" + ), + "extracted/JHEP/ftp_PUB_26-03-30_20-00-50/other.Meta": _springer_xml_with_doi( + "10.1007/two" + ), + } + mock_repo = self._build_repo(payload_by_key) + + result = function_to_unit_test( + repo=mock_repo, + params={ + "file_keys": ["*"], + "dois": ["10.1007/one"], + }, + ) + + assert result == ["extracted/EPJC/ftp_PUB_26-03-29_20-00-46/article.Meta"] + + def test_collect_records_date_range_dedup_keeps_newest(self): + task = self.dag.get_task("collect_records") + function_to_unit_test = task.python_callable + + payload_by_key = { + "extracted/EPJC/ftp_PUB_25-08-15_00-00-00/article.Meta": _springer_xml_with_doi( + "10.1007/one" + ), + "extracted/JHEP/ftp_PUB_26-01-30_20-00-50/article.Meta": _springer_xml_with_doi( + "10.1007/one" + ), + "extracted/JHEP/ftp_PUB_25-12-19_20-00-50/other.scoap": _springer_xml_with_doi( + "10.1007/two" + ), + "extracted/JHEP/not-a-drop/ignored.Meta": _springer_xml_with_doi( + "10.1007/three" + ), + } + mock_repo = self._build_repo(payload_by_key) + + result = function_to_unit_test( + repo=mock_repo, + params={ + "date_from": "2025-08-15", + "date_to": "2026-01-30", + }, + ) + + assert len(result) == 2 + assert "extracted/JHEP/ftp_PUB_26-01-30_20-00-50/article.Meta" in result + assert "extracted/JHEP/ftp_PUB_25-12-19_20-00-50/other.scoap" in result + + def test_collect_records_glob_patterns(self): + task = self.dag.get_task("collect_records") + function_to_unit_test = task.python_callable + + payload_by_key = { + "extracted/EPJC/ftp_PUB_26-03-29_20-00-46/article.Meta": _springer_xml_with_doi( + "10.1007/one" + ), + "extracted/JHEP/ftp_PUB_26-03-30_20-00-50/other.scoap": _springer_xml_with_doi( + "10.1007/two" + ), + } + mock_repo = self._build_repo(payload_by_key) + + result_all = function_to_unit_test( + repo=mock_repo, + params={"file_keys": ["*"]}, + ) + assert set(result_all) == set(payload_by_key.keys()) + + result_epjc = 
function_to_unit_test( + repo=mock_repo, + params={"file_keys": ["EPJC/*"]}, + ) + assert result_epjc == ["extracted/EPJC/ftp_PUB_26-03-29_20-00-46/article.Meta"] + + def test_collect_records_limit_exceeded_raises(self): + task = self.dag.get_task("collect_records") + function_to_unit_test = task.python_callable + + payload_by_key = { + "extracted/EPJC/ftp_PUB_26-03-29_20-00-46/article.Meta": _springer_xml_with_doi( + "10.1007/one" + ), + "extracted/JHEP/ftp_PUB_26-03-30_20-00-50/other.scoap": _springer_xml_with_doi( + "10.1007/two" + ), + } + mock_repo = self._build_repo(payload_by_key) + + with pytest.raises(ValueError, match="above limit"): + function_to_unit_test( + repo=mock_repo, + params={ + "file_keys": ["*"], + "limit": 1, + }, + ) + + def test_prepare_trigger_conf_dry_run_and_normal(self): + task = self.dag.get_task("prepare_trigger_conf") + function_to_unit_test = task.python_callable + + file_key = "extracted/EPJC/ftp_PUB_26-03-29_20-00-46/article.Meta" + xml_bytes = _springer_xml_with_doi("10.1007/one") + + mock_repo = mock.MagicMock() + mock_repo.get_by_id.return_value = BytesIO(xml_bytes) + + dry_run_result = function_to_unit_test( + file_keys=[file_key], + repo=mock_repo, + params={"dry_run": True}, + ) + assert dry_run_result == [] + + normal_result = function_to_unit_test( + file_keys=[file_key], + repo=mock_repo, + params={"dry_run": False}, + ) + assert len(normal_result) == 1 + assert normal_result[0]["file_name"] == file_key + + decoded_xml = base64.b64decode(normal_result[0]["file"]).decode("utf-8") + assert "10.1007/one" in decoded_xml From 1bc6f52b681c01c77178144f9ca1a492ed12ce38 Mon Sep 17 00:00:00 2001 From: PascalEgn Date: Wed, 15 Apr 2026 18:31:59 +0200 Subject: [PATCH 2/3] feat: add Elsevier reharvest DAG for reprocessing articles --- dags/elsevier/elsevier_reharvest.py | 437 ++++++++++++++++++ dags/elsevier/repository.py | 2 +- .../cassettes/test_dag_enrich_file.yaml | 83 ++++ .../test_elsevier_dag_process_file.py | 132 ++++++ .../elsevier/test_unit_elsevier_reharvest.py | 269 +++++++++++ 5 files changed, 922 insertions(+), 1 deletion(-) create mode 100644 dags/elsevier/elsevier_reharvest.py create mode 100644 tests/integration/elsevier/cassettes/test_dag_enrich_file.yaml create mode 100644 tests/integration/elsevier/test_elsevier_dag_process_file.py create mode 100644 tests/units/elsevier/test_unit_elsevier_reharvest.py diff --git a/dags/elsevier/elsevier_reharvest.py b/dags/elsevier/elsevier_reharvest.py new file mode 100644 index 00000000..343e48bf --- /dev/null +++ b/dags/elsevier/elsevier_reharvest.py @@ -0,0 +1,437 @@ +import fnmatch +import logging +import os +from datetime import date, datetime, timedelta +from io import BytesIO + +import pendulum +from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator +from airflow.sdk import Param, dag, task +from common.notification_service import FailedDagNotifier +from common.utils import parse_without_names_spaces +from elsevier.repository import ElsevierRepository +from elsevier.trigger_file_processing import trigger_file_processing_elsevier + +logger = logging.getLogger("airflow.task") +ELSEVIER_REPO = ElsevierRepository() +MAX_XML_SCAN_FILES = 10_000 + + +def _parse_date(value): + if not value: + return None + return datetime.strptime(value, "%Y-%m-%d").date() + + +def _normalize_dois(dois): + if not dois: + return set() + return {doi.strip().lower() for doi in dois if isinstance(doi, str) and doi.strip()} + + +def _normalize_file_keys(file_keys): + return [key.strip() for key 
in file_keys if isinstance(key, str) and key.strip()] + + +def _is_glob_pattern(value): + return any(char in value for char in ("*", "?", "[")) + + +def _is_elsevier_main_xml_key(key, repo): + return key.startswith(repo.EXTRACTED_DIR) and os.path.basename(key) == "main.xml" + + +def _extract_doi_from_xml(xml_bytes): + try: + xml = parse_without_names_spaces( + BytesIO(xml_bytes) if isinstance(xml_bytes, bytes) else xml_bytes + ) + doi_element = xml.find("item-info/doi") + if doi_element is not None and doi_element.text: + return doi_element.text.strip().lower() + except Exception: + pass + return None + + +def _extract_received_date_from_xml(xml_bytes): + try: + xml = parse_without_names_spaces( + BytesIO(xml_bytes) if isinstance(xml_bytes, bytes) else xml_bytes + ) + date_element = xml.find("head/date-received") + if date_element is None: + return None + + day_text = date_element.get("day") + month_text = date_element.get("month") + year_text = date_element.get("year") + if not (day_text and month_text and year_text): + return None + + return date(int(year_text), int(month_text), int(day_text)) + except Exception: + return None + + +def _enforce_limit(records, limit): + if limit is None: + return + if len(records) > limit: + raise ValueError( + f"Matched {len(records)} records, which is above limit={limit}. " + "Please reduce the date range or increase the limit." + ) + + +def _log_if_xml_scan_exceeds_threshold(total_files, scan_reason): + if total_files > MAX_XML_SCAN_FILES: + logger.error( + "XML scan threshold exceeded for %s: %s files to parse (> %s). " + "Consider narrowing filters.", + scan_reason, + total_files, + MAX_XML_SCAN_FILES, + ) + raise ValueError( + f"XML scan threshold exceeded for {scan_reason}: {total_files} files to parse (> {MAX_XML_SCAN_FILES}). Consider narrowing filters." + ) + + +def _resolve_file_keys(file_keys, all_xml_keys, repo): + """Resolve a mix of exact S3 keys and glob patterns. + + Patterns are matched against the path after the leading repo.EXTRACTED_DIR prefix, + so '*' matches every Elsevier main.xml key under that extracted folder. 
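+    For example, 'CERNAB00000010772A/*/*/*/main.xml' selects only the main.xml keys
+    under that dataset folder.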
+ """ + all_xml_keys_set = set(all_xml_keys) + prefix = repo.EXTRACTED_DIR + + exact_keys = [key for key in file_keys if not _is_glob_pattern(key)] + glob_patterns = [key for key in file_keys if _is_glob_pattern(key)] + + missing = [key for key in exact_keys if key not in all_xml_keys_set] + if missing: + raise ValueError(f"Some requested file_keys do not exist: {missing}") + + resolved = list(exact_keys) + + for pattern in glob_patterns: + matched = [ + key + for key in all_xml_keys + if fnmatch.fnmatch( + key[len(prefix) :] if key.startswith(prefix) else key, + pattern, + ) + ] + if not matched: + logger.warning("Pattern %r matched no Elsevier main.xml keys", pattern) + resolved.extend(matched) + + seen = set() + deduped = [] + for key in resolved: + if key not in seen: + seen.add(key) + deduped.append(key) + return deduped + + +def _find_dataset_key_for_xml_key(xml_key, dataset_keys, repo): + relative_key = ( + xml_key[len(repo.EXTRACTED_DIR) :] + if xml_key.startswith(repo.EXTRACTED_DIR) + else xml_key + ) + parts = relative_key.split("/") + + for depth in range(len(parts) - 1, 0, -1): + candidate = f"{repo.EXTRACTED_DIR}{'/'.join(parts[:depth])}/dataset.xml" + if candidate in dataset_keys: + return candidate + + return None + + +@dag( + on_failure_callback=FailedDagNotifier(), + start_date=pendulum.today("UTC").add(days=-1), + schedule=None, + catchup=False, + tags=["reharvest", "elsevier"], + params={ + "date_from": Param( + default=None, + type=["string", "null"], + description="Start ce:date-received (from XML) in YYYY-MM-DD format", + title="Date from", + ), + "date_to": Param( + default=None, + type=["string", "null"], + description="End ce:date-received (from XML) in YYYY-MM-DD format", + title="Date to", + ), + "file_keys": Param( + default=[], + type=["array", "null"], + description=( + "Exact Elsevier extracted main.xml keys or glob patterns matched relative " + f"to '{ELSEVIER_REPO.EXTRACTED_DIR}'. Examples: '*' or " + "'CERNAB00000010772A/*/*/*/main.xml'." + ), + title="File keys", + ), + "dois": Param( + default=[], + type=["array", "null"], + description=( + "List of DOIs to process. Can be used with date_from/date_to or together " + "with file_keys to filter the resolved key set. The maximum date range is 1 Year." + ), + title="DOIs", + ), + "limit": Param( + 1000, + type=["integer"], + description="Maximum number of records to process", + title="Limit", + ), + "dry_run": Param( + default=True, + type=["boolean", "null"], + description="Whether to perform a dry run. 
If true, no downstream DAGs will be triggered", + title="Dry run", + ), + }, +) +def elsevier_reharvest(): + @task() + def collect_records(repo=ELSEVIER_REPO, **kwargs): + params = kwargs.get("params", {}) + + date_from = _parse_date(params.get("date_from")) + date_to = _parse_date(params.get("date_to")) + file_keys = _normalize_file_keys(params.get("file_keys") or []) + target_dois = _normalize_dois(params.get("dois") or []) + limit = params.get("limit") + + if limit is not None and (not isinstance(limit, int) or limit <= 0): + raise ValueError("limit must be a positive integer when provided") + + if bool(file_keys) and (date_from or date_to): + raise ValueError( + "Invalid parameters: date_from/date_to cannot be used together with file_keys" + ) + + if (date_from and not date_to) or (date_to and not date_from): + raise ValueError("Both date_from and date_to must be provided together") + + all_xml_keys = [ + obj.key + for obj in repo.s3.objects.filter(Prefix=repo.EXTRACTED_DIR).all() + if _is_elsevier_main_xml_key(obj.key, repo) + ] + logger.info( + "Found %s Elsevier extracted main.xml file(s) in %s", + len(all_xml_keys), + repo.EXTRACTED_DIR, + ) + + xml_bytes_cache = {} + + def _get_xml_bytes(key): + if key not in xml_bytes_cache: + xml_bytes_cache[key] = repo.get_by_id(key).getvalue() + return xml_bytes_cache[key] + + def _sort_key(key): + record_date = _extract_received_date_from_xml(_get_xml_bytes(key)) + return (record_date or date.min, key) + + if file_keys: + resolved_keys = _resolve_file_keys(file_keys, all_xml_keys, repo) + + if target_dois: + _log_if_xml_scan_exceeds_threshold( + len(resolved_keys), + "date extraction for sorting file_keys before DOI filtering", + ) + resolved_keys = sorted(resolved_keys, key=_sort_key, reverse=True) + _log_if_xml_scan_exceeds_threshold( + len(resolved_keys), + "DOI extraction from file_keys candidates", + ) + found_dois = set() + deduped_by_doi = {} + + for key in resolved_keys: + doi = _extract_doi_from_xml(_get_xml_bytes(key)) + if not doi: + logger.warning("Could not extract DOI from file: %s", key) + continue + + if doi not in target_dois: + continue + + found_dois.add(doi) + if doi not in deduped_by_doi: + deduped_by_doi[doi] = key + + missing_dois = sorted(target_dois - found_dois) + if missing_dois: + logger.warning( + "Some requested DOIs were not found: %s", missing_dois + ) + + selected_keys = list(deduped_by_doi.values()) + else: + selected_keys = resolved_keys + + _enforce_limit(selected_keys, limit) + logger.info( + "Selected %s Elsevier main.xml key(s) from file_keys (with glob expansion)", + len(selected_keys), + ) + return selected_keys + + if target_dois and not (date_from and date_to): + date_to = date.today() + date_from = date_to - timedelta(days=365) + + if not (date_from and date_to): + raise ValueError( + "Invalid parameters: provide either date_from+date_to, file_keys, or dois" + ) + + if target_dois and (date_to - date_from).days > 366: + raise ValueError( + "For DOI search the date range must not exceed one year. " + "Please use a smaller range." 
+ ) + + logger.info( + "Selecting Elsevier main.xml files by XML ce:date-received range %s to %s", + date_from, + date_to, + ) + + selected_records = [] + _log_if_xml_scan_exceeds_threshold( + len(all_xml_keys), + "ce:date-received extraction from all main.xml candidates", + ) + for key in all_xml_keys: + record_date = _extract_received_date_from_xml(_get_xml_bytes(key)) + if record_date is None: + continue + if date_from <= record_date <= date_to: + selected_records.append({"key": key, "record_date": record_date}) + + selected_records.sort( + key=lambda record: (record["record_date"], record["key"]), + reverse=True, + ) + + logger.info( + "Found %s Elsevier main.xml file(s) in the requested date range", + len(selected_records), + ) + + if not target_dois: + selected_keys = [record["key"] for record in selected_records] + _enforce_limit(selected_keys, limit) + logger.info("Selected %s Elsevier main.xml key(s)", len(selected_keys)) + return selected_keys + + found_dois = set() + deduped_by_doi = {} + + _log_if_xml_scan_exceeds_threshold( + len(selected_records), + "DOI extraction after date-range filtering", + ) + + for record in selected_records: + key = record["key"] + doi = _extract_doi_from_xml(_get_xml_bytes(key)) + + if not doi: + logger.warning("Could not extract DOI from file: %s", key) + continue + + if doi not in target_dois: + continue + + found_dois.add(doi) + if doi not in deduped_by_doi: + deduped_by_doi[doi] = key + + missing_dois = sorted(target_dois - found_dois) + if missing_dois: + logger.warning("Some requested DOIs were not found: %s", missing_dois) + + selected_keys = list(deduped_by_doi.values()) + _enforce_limit(selected_keys, limit) + + logger.info( + "Selected %s Elsevier main.xml key(s) after DOI filtering", + len(selected_keys), + ) + return selected_keys + + @task() + def prepare_trigger_conf(file_keys, repo=ELSEVIER_REPO, **kwargs): + dry_run = bool(kwargs.get("params", {}).get("dry_run", False)) + if dry_run: + logger.info( + "Dry run enabled. %s record(s) matched. 
No downstream runs will be triggered.", + len(file_keys), + ) + return [] + + dataset_keys = { + obj.key + for obj in repo.s3.objects.filter(Prefix=repo.EXTRACTED_DIR).all() + if os.path.basename(obj.key) == "dataset.xml" + } + requested_file_keys = set(file_keys) + resolved_dataset_keys = [] + + for key in file_keys: + dataset_key = _find_dataset_key_for_xml_key(key, dataset_keys, repo) + if not dataset_key: + raise ValueError(f"Could not find sibling dataset.xml for file: {key}") + if dataset_key not in resolved_dataset_keys: + resolved_dataset_keys.append(dataset_key) + + all_confs = trigger_file_processing_elsevier( + publisher="elsevier", + repo=repo, + logger=logger, + filenames=resolved_dataset_keys, + ) + confs = [conf for conf in all_confs if conf["file_name"] in requested_file_keys] + + prepared_file_keys = {conf["file_name"] for conf in confs} + missing_file_keys = sorted(requested_file_keys - prepared_file_keys) + if missing_file_keys: + raise ValueError( + f"Could not build Elsevier metadata for file(s): {missing_file_keys}" + ) + + logger.info("Prepared %s downstream trigger conf(s)", len(confs)) + return confs + + file_keys = collect_records() + trigger_confs = prepare_trigger_conf(file_keys) + + TriggerDagRunOperator.partial( + task_id="elsevier_reharvest_trigger_file_processing", + trigger_dag_id="elsevier_process_file", + reset_dag_run=True, + ).expand(conf=trigger_confs) + + +elsevier_reharvest_dag = elsevier_reharvest() diff --git a/dags/elsevier/repository.py b/dags/elsevier/repository.py index 89f77015..3c4aa159 100644 --- a/dags/elsevier/repository.py +++ b/dags/elsevier/repository.py @@ -21,7 +21,7 @@ def __init__(self): def get_all_raw_filenames(self): return [ - f.key.removeprefix("raw/") + f.key.removeprefix(self.ZIPED_DIR) for f in self.s3.objects.filter(Prefix=self.ZIPED_DIR).all() ] diff --git a/tests/integration/elsevier/cassettes/test_dag_enrich_file.yaml b/tests/integration/elsevier/cassettes/test_dag_enrich_file.yaml new file mode 100644 index 00000000..26925bd7 --- /dev/null +++ b/tests/integration/elsevier/cassettes/test_dag_enrich_file.yaml @@ -0,0 +1,83 @@ +interactions: +- request: + body: null + headers: + Accept: + - '*/*' + Accept-Encoding: + - gzip, deflate + Connection: + - keep-alive + User-Agent: + - python-requests/2.32.5 + method: GET + uri: https://oaipmh.arxiv.org/oai?verb=GetRecord&identifier=oai:arXiv.org:2112.01211&metadataPrefix=arXiv + response: + body: + string: "\n\n 2026-04-16T11:06:33Z\n + \ http://oaipmh.arxiv.org/oai\n\n\n \n + \ \n
+      \ <identifier>oai:arXiv.org:2112.01211</identifier>\n    <datestamp>2022-10-25</datestamp>\n    <setSpec>physics:hep-th</setSpec>\n
+      \ <setSpec>physics:hep-ph</setSpec>\n    </header>\n    <metadata>\n    <arXiv xmlns=\"http://arxiv.org/OAI/arXiv/\">\n
+      \ <id>2112.01211</id>\n    <created>2022-10-23</created>\n    <updated>2022-10-25</updated>\n
+      \ <authors>\n    <author>\n    <keyname>Antoniadis</keyname>\n
+      \ <forenames>Ignatios</forenames>\n    </author>\n
+      \ <author>\n    <keyname>Nanopoulos</keyname>\n
+      \ <forenames>Dimitri V.</forenames>\n    </author>\n
+      \ <author>\n    <keyname>Rizos</keyname>\n    <forenames>John</forenames>\n
+      \ </author>\n    </authors>\n    <title>Particle physics
+        and cosmology of the string derived no-scale flipped $SU(5)$</title>\n    <categories>hep-th
+        hep-ph</categories>\n    <comments>47 pages, no figures. v2: published
+        version, few minor changes</comments>\n    <report-no>ACT-05-21, MI-HET-768</report-no>\n
+      \ <doi>10.1140/epjc/s10052-022-10353-6</doi>\n    <license>http://creativecommons.org/licenses/by/4.0/</license>\n
+      \ <abstract>In a recent paper, we identified a cosmological sector
+        of a flipped $SU(5)$ model derived in the free fermionic formulation of the
+        heterotic superstring, containing the inflaton and the goldstino superfields
+        with a superpotential leading to Starobinsky type inflation, while $SU(5){\\times}U(1)$
+        is still unbroken. Here, we study the properties and phenomenology of the
+        vacuum after the end of inflation, where the gauge group is broken to the
+        Standard Model. We identify a set of vacuum expectation values, triggered
+        by the breaking of an anomalous $U(1)_A$ gauge symmetry at roughly an order
+        of magnitude below the string scale, that solve the F and D-flatness supersymmetric
+        conditions up to 6th order in the superpotential which is explicitly computed,
+        leading to a successful particle phenomenology. In particular, all extra colour
+        triplets become superheavy guaranteeing observable proton stability, while
+        the Higgs doublet mass matrix has a massless pair eigenstate with realistic
+        hierarchical Yukawa couplings to quarks and leptons. The supersymmetry breaking
+        scale is constrained to be high, consistent with the non observation of supersymmetric
+        signals at the LHC.</abstract>\n    </arXiv>\n    </metadata>\n
+      \ </record>\n    </GetRecord>\n</OAI-PMH>\n
" + headers: + Accept-Ranges: + - bytes + Age: + - '0' + Connection: + - keep-alive + Content-Length: + - '3311' + Date: + - Thu, 16 Apr 2026 11:06:33 GMT + X-Cache: + - MISS, MISS, MISS + X-Served-By: + - cache-lga21993-LGA, cache-lga21973-LGA, cache-fra-eddf8230078-FRA + X-Timer: + - S1776337593.348975,VS0,VE220 + content-type: + - text/xml + server: + - Google Frontend + via: + - 1.1 google, 1.1 varnish, 1.1 varnish, 1.1 varnish + x-cloud-trace-context: + - 663ac66e9488896105c2df6ae226a91d + status: + code: 200 + message: OK +version: 1 diff --git a/tests/integration/elsevier/test_elsevier_dag_process_file.py b/tests/integration/elsevier/test_elsevier_dag_process_file.py new file mode 100644 index 00000000..3b0ed7de --- /dev/null +++ b/tests/integration/elsevier/test_elsevier_dag_process_file.py @@ -0,0 +1,132 @@ +import tarfile + +import pytest +from airflow.models import DagBag +from common.utils import parse_without_names_spaces +from elsevier.elsevier_file_processing import enhance_elsevier, enrich_elsevier +from elsevier.parser import ElsevierParser +from freezegun import freeze_time + +DAG_NAME = "elsevier_process_file" + + +@pytest.fixture +def dag(): + dagbag = DagBag(dag_folder="dags/", include_examples=False) + assert dagbag.import_errors.get(f"dags/{DAG_NAME}.py") is None + return dagbag.get_dag(dag_id=DAG_NAME) + + +@pytest.fixture +def article(): + data_dir = "./data/elsevier/" + test_file = "CERNQ000000010669A.tar" + + with tarfile.open(data_dir + test_file, "r") as tar: + xml_members = [m for m in tar.getmembers() if m.name.endswith("main.xml")] + xml_bytes = tar.extractfile(xml_members[0]).read() + + xml = parse_without_names_spaces(xml_bytes) + parser = ElsevierParser() + return parser.parse(xml) + + +def test_dag_loaded(dag): + assert dag is not None + assert "parse" in dag.task_ids + assert "enhance" in dag.task_ids + assert "populate_files" in dag.task_ids + assert "enrich" in dag.task_ids + assert "save_to_s3" in dag.task_ids + assert "create_or_update" in dag.task_ids + + +publisher = "Elsevier" + +generic_pseudo_parser_output = { + "abstract": "this is abstracts", + "copyright_holder": "copyright_holder", + "copyright_year": "2020", + "copyright_statement": "copyright_statement", + "copyright_material": "copyright_material", + "date_published": "2022-05-20", + "title": "title", + "subtitle": "subtitle", +} + +expected_output = { + "abstracts": [{"value": "this is abstracts", "source": publisher}], + "acquisition_source": { + "source": publisher, + "method": publisher, + "date": "2022-05-20T00:00:00", + }, + "copyright": [ + { + "holder": "copyright_holder", + "year": "2020", + "statement": "copyright_statement", + "material": "copyright_material", + } + ], + "imprints": [{"date": "2022-05-20", "publisher": publisher}], + "record_creation_date": "2022-05-20T00:00:00", + "titles": [{"title": "title", "subtitle": "subtitle", "source": publisher}], +} + +empty_generic_pseudo_parser_output = { + "abstract": "", + "copyright_holder": "", + "copyright_year": "", + "copyright_statement": "", + "copyright_material": "", + "date_published": "", + "title": "", + "subtitle": "", +} + +expected_output_from_empty_input = { + "abstracts": [{"value": "", "source": publisher}], + "acquisition_source": { + "source": publisher, + "method": publisher, + "date": "2022-05-20T00:00:00", + }, + "copyright": [{"holder": "", "year": "", "statement": "", "material": ""}], + "imprints": [{"date": "", "publisher": publisher}], + "record_creation_date": "2022-05-20T00:00:00", + "titles": 
[{"title": "", "subtitle": "", "source": publisher}], +} + + +@pytest.mark.parametrize( + ("test_input", "expected", "publisher"), + [ + pytest.param(generic_pseudo_parser_output, expected_output, publisher), + pytest.param( + empty_generic_pseudo_parser_output, + expected_output_from_empty_input, + publisher, + ), + ], +) +@freeze_time("2022-05-20") +def test_dag_enhance_file(test_input, expected, publisher): + assert expected == enhance_elsevier(test_input) + + +@pytest.mark.vcr +def test_dag_enrich_file(assertListEqual): + input_article = { + "arxiv_eprints": [{"value": "2112.01211"}], + "curated": "Test Value", + "citeable": "Test Value", + } + assertListEqual( + { + "arxiv_eprints": [ + {"value": "2112.01211", "categories": list(set(["hep-th", "hep-ph"]))} + ], + }, + enrich_elsevier(input_article), + ) diff --git a/tests/units/elsevier/test_unit_elsevier_reharvest.py b/tests/units/elsevier/test_unit_elsevier_reharvest.py new file mode 100644 index 00000000..d4243185 --- /dev/null +++ b/tests/units/elsevier/test_unit_elsevier_reharvest.py @@ -0,0 +1,269 @@ +from datetime import date +from io import BytesIO +from unittest import mock + +import pytest +from airflow.models import DagBag + + +class _S3Object: + def __init__(self, key): + self.key = key + + +def _main_xml_with_doi_and_received_date(doi, year, month, day): + return f''' +
+<article>
+    <item-info>
+        <doi>{doi}</doi>
+    </item-info>
+    <head>
+        <date-received day="{day}" month="{month}" year="{year}"/>
+    </head>
+</article>
+'''.encode() + + +def _dataset_xml(): + return b"" + + +@pytest.fixture(scope="class") +def dagbag(): + return DagBag(dag_folder="dags/", include_examples=False) + + +@pytest.mark.usefixtures("dagbag") +class TestUnitElsevierReharvest: + def setup_method(self): + self.dag_id = "elsevier_reharvest" + dagbag = DagBag(dag_folder="dags/", include_examples=False) + self.dag = dagbag.dags.get(self.dag_id) + assert self.dag is not None, f"DAG {self.dag_id} failed to load" + + def _build_repo(self, payload_by_key): + mock_repo = mock.MagicMock() + mock_repo.EXTRACTED_DIR = "extracted/" + mock_repo.s3.objects.filter.return_value.all.return_value = [ + _S3Object(key) for key in payload_by_key + ] + + def _get_by_id(key): + return BytesIO(payload_by_key[key]) + + mock_repo.get_by_id.side_effect = _get_by_id + return mock_repo + + def test_dag_structure(self): + assert "elsevier_reharvest_trigger_file_processing" in self.dag.task_ids + task = self.dag.get_task("elsevier_reharvest_trigger_file_processing") + assert "MappedOperator" in str(type(task)) or task.is_mapped + assert task.partial_kwargs["trigger_dag_id"] == "elsevier_process_file" + assert task.partial_kwargs["reset_dag_run"] is True + + def test_collect_records_file_keys_and_dois_filters_resolved_keys(self): + task = self.dag.get_task("collect_records") + function_to_unit_test = task.python_callable + + payload_by_key = { + "extracted/CERNAB00000010772A/CERNAB00000010772/older/article/main.xml": _main_xml_with_doi_and_received_date( + "10.1016/one", 2025, 1, 10 + ), + "extracted/CERNAB00000010772A/CERNAB00000010772/newer/article/main.xml": _main_xml_with_doi_and_received_date( + "10.1016/one", 2025, 2, 10 + ), + "extracted/CERNAB00000010772A/CERNAB00000010772/other/article/main.xml": _main_xml_with_doi_and_received_date( + "10.1016/two", 2025, 3, 10 + ), + } + mock_repo = self._build_repo(payload_by_key) + + result = function_to_unit_test( + repo=mock_repo, + params={ + "file_keys": ["*"], + "dois": ["10.1016/one"], + }, + ) + + assert result == [ + "extracted/CERNAB00000010772A/CERNAB00000010772/newer/article/main.xml" + ] + + def test_collect_records_date_range_uses_received_date_from_xml(self): + task = self.dag.get_task("collect_records") + function_to_unit_test = task.python_callable + + payload_by_key = { + "extracted/CERNAB00000010772A/CERNAB00000010772/in-range/main.xml": _main_xml_with_doi_and_received_date( + "10.1016/one", 2025, 9, 5 + ), + "extracted/CERNAB00000010772A/CERNAB00000010772/out-of-range/main.xml": _main_xml_with_doi_and_received_date( + "10.1016/two", 2025, 6, 1 + ), + } + mock_repo = self._build_repo(payload_by_key) + + result = function_to_unit_test( + repo=mock_repo, + params={ + "date_from": "2025-09-01", + "date_to": "2025-09-30", + }, + ) + + assert result == [ + "extracted/CERNAB00000010772A/CERNAB00000010772/in-range/main.xml" + ] + + def test_collect_records_limit_exceeded_raises(self): + task = self.dag.get_task("collect_records") + function_to_unit_test = task.python_callable + + payload_by_key = { + "extracted/CERNAB00000010772A/CERNAB00000010772/one/main.xml": _main_xml_with_doi_and_received_date( + "10.1016/one", 2025, 7, 1 + ), + "extracted/CERNAB00000010772A/CERNAB00000010772/two/main.xml": _main_xml_with_doi_and_received_date( + "10.1016/two", 2025, 7, 2 + ), + } + mock_repo = self._build_repo(payload_by_key) + + with pytest.raises(ValueError, match="above limit"): + function_to_unit_test( + repo=mock_repo, + params={ + "file_keys": ["*"], + "limit": 1, + }, + ) + + def 
test_collect_records_logs_and_raises_when_date_scan_is_over_10k(self): + task = self.dag.get_task("collect_records") + function_to_unit_test = task.python_callable + + mock_repo = mock.MagicMock() + mock_repo.EXTRACTED_DIR = "extracted/" + mock_repo.s3.objects.filter.return_value.all.return_value = [ + _S3Object(f"extracted/collection/{i}/main.xml") for i in range(10001) + ] + mock_repo.get_by_id.return_value = BytesIO(b"
") + + mocked_logger = mock.MagicMock() + with ( + mock.patch.dict( + function_to_unit_test.__globals__, + { + "logger": mocked_logger, + "_extract_received_date_from_xml": lambda _: date(2025, 9, 5), + }, + ), + pytest.raises(ValueError, match="XML scan threshold exceeded"), + ): + function_to_unit_test( + repo=mock_repo, + params={ + "date_from": "2025-09-01", + "date_to": "2025-09-30", + }, + ) + + mocked_logger.error.assert_any_call( + "XML scan threshold exceeded for %s: %s files to parse (> %s). " + "Consider narrowing filters.", + "ce:date-received extraction from all main.xml candidates", + 10001, + 10000, + ) + + def test_collect_records_logs_and_raises_when_file_keys_doi_scan_is_over_10k(self): + task = self.dag.get_task("collect_records") + function_to_unit_test = task.python_callable + + mock_repo = mock.MagicMock() + mock_repo.EXTRACTED_DIR = "extracted/" + mock_repo.s3.objects.filter.return_value.all.return_value = [ + _S3Object(f"extracted/collection/{i}/main.xml") for i in range(10001) + ] + mock_repo.get_by_id.return_value = BytesIO(b"
") + + mocked_logger = mock.MagicMock() + with ( + mock.patch.dict( + function_to_unit_test.__globals__, + { + "logger": mocked_logger, + "_extract_received_date_from_xml": lambda _: date(2025, 9, 5), + "_extract_doi_from_xml": lambda _: "10.1016/one", + }, + ), + pytest.raises(ValueError, match="XML scan threshold exceeded"), + ): + function_to_unit_test( + repo=mock_repo, + params={ + "file_keys": ["*"], + "dois": ["10.1016/one"], + }, + ) + + mocked_logger.error.assert_any_call( + "XML scan threshold exceeded for %s: %s files to parse (> %s). " + "Consider narrowing filters.", + "date extraction for sorting file_keys before DOI filtering", + 10001, + 10000, + ) + + def test_prepare_trigger_conf_dry_run_and_normal(self): + task = self.dag.get_task("prepare_trigger_conf") + function_to_unit_test = task.python_callable + + file_key = ( + "extracted/CERNAB00000010772A/" + "CERNAB00000010772/03702693/v846sC/S0370269322005871/main.xml" + ) + dataset_key = "extracted/CERNAB00000010772A/CERNAB00000010772/dataset.xml" + + mock_repo = mock.MagicMock() + mock_repo.EXTRACTED_DIR = "extracted/" + mock_repo.s3.objects.filter.return_value.all.return_value = [ + _S3Object(file_key), + _S3Object(dataset_key), + ] + mock_repo.get_by_id.return_value = BytesIO(_dataset_xml()) + + dry_run_result = function_to_unit_test( + file_keys=[file_key], + repo=mock_repo, + params={"dry_run": True}, + ) + assert dry_run_result == [] + + mocked_trigger = mock.MagicMock( + return_value=[ + { + "file_name": file_key, + "metadata": {"files": {"xml": file_key}, "meta": "data1"}, + } + ] + ) + + with mock.patch.dict( + function_to_unit_test.__globals__, + {"trigger_file_processing_elsevier": mocked_trigger}, + ): + normal_result = function_to_unit_test( + file_keys=[file_key], + repo=mock_repo, + params={"dry_run": False}, + ) + + assert normal_result == [ + { + "file_name": file_key, + "metadata": {"files": {"xml": file_key}, "meta": "data1"}, + } + ] From 67de23348a271e50b7a39cab20a49d460612f8d8 Mon Sep 17 00:00:00 2001 From: PascalEgn Date: Mon, 20 Apr 2026 10:51:28 +0200 Subject: [PATCH 3/3] feat: add jagiellonian reharvest DAG for reprocessing articles --- .../jagiellonian/jagiellonian_process_file.py | 28 +- dags/jagiellonian/jagiellonian_pull_api.py | 27 +- dags/jagiellonian/jagiellonian_reharvest.py | 311 ++++++++++++++++++ dags/jagiellonian/repository.py | 38 +++ .../test_integration_jagiellonian_pull_api.py | 160 ++------- .../test_jagiellonian_process_file.py | 133 ++------ .../test_unit_jagiellonian_reharvest.py | 182 ++++++++++ 7 files changed, 615 insertions(+), 264 deletions(-) create mode 100644 dags/jagiellonian/jagiellonian_reharvest.py create mode 100644 dags/jagiellonian/repository.py create mode 100644 tests/units/jagiellonian/test_unit_jagiellonian_reharvest.py diff --git a/dags/jagiellonian/jagiellonian_process_file.py b/dags/jagiellonian/jagiellonian_process_file.py index 8b53720d..cc3630a6 100644 --- a/dags/jagiellonian/jagiellonian_process_file.py +++ b/dags/jagiellonian/jagiellonian_process_file.py @@ -2,9 +2,9 @@ import logging import os from datetime import UTC, datetime +from io import BytesIO import pendulum -from airflow.providers.amazon.aws.hooks.s3 import S3Hook from airflow.providers.amazon.aws.transfers.http_to_s3 import HttpToS3Operator from airflow.sdk import dag, task from common.enhancer import Enhancer @@ -12,8 +12,10 @@ from common.notification_service import FailedDagNotifier from common.utils import create_or_update_article from jagiellonian.parser import JagiellonianParser +from 
 
 logger = logging.getLogger("airflow.task")
+JAGIELLONIAN_REPO = JagiellonianRepository()
 
 FILE_EXTENSIONS = {"pdf": ".pdf", "xml": ".xml", "pdfa": ".pdf"}
 
@@ -39,6 +41,16 @@ def jagiellonian_process_file():
     def parse(**kwargs):
         if "params" in kwargs and "article" in kwargs["params"]:
             article = kwargs["params"]["article"]
+
+            if (
+                isinstance(article, dict)
+                and isinstance(article.get("dois"), list)
+                and article.get("dois")
+                and isinstance(article["dois"][0], dict)
+                and article["dois"][0].get("value")
+            ):
+                return article
+
             parsed = JagiellonianParser().parse(article)
 
             if "dois" not in parsed:
@@ -105,21 +117,11 @@ def enrich(enhanced_file):
         return Enricher()(enhanced_file)
 
     @task(task_id="jagiellonian-save-to-s3")
-    def save_to_s3(enriched_file):
-        s3_bucket = os.getenv("JAGIELLONIAN_BUCKET_NAME", "jagiellonian")
-        aws_conn_id = os.getenv("AWS_CONN_ID", "aws_s3_minio")
-
+    def save_to_s3(enriched_file, repo=JAGIELLONIAN_REPO):
         doi = enriched_file["dois"][0]["value"]
         key = f"{doi}_metadata_{(datetime.now(UTC))}.json"
 
-        s3_hook = S3Hook(aws_conn_id=aws_conn_id)
-
-        s3_hook.load_string(
-            string_data=json.dumps(enriched_file, indent=2),
-            key=key,
-            bucket_name=s3_bucket,
-            replace=True,
-        )
+        repo.save(key=key, obj=BytesIO(json.dumps(enriched_file, indent=2).encode()))
 
     @task()
     def create_or_update(enriched_file):
diff --git a/dags/jagiellonian/jagiellonian_pull_api.py b/dags/jagiellonian/jagiellonian_pull_api.py
index 88c8f598..35067ed6 100644
--- a/dags/jagiellonian/jagiellonian_pull_api.py
+++ b/dags/jagiellonian/jagiellonian_pull_api.py
@@ -3,33 +3,14 @@
 from datetime import timedelta
 
 import pendulum
-from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.http.hooks.http import HttpHook
 from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
 from airflow.sdk import dag, task
 from common.notification_service import FailedDagNotifier
+from jagiellonian.repository import JagiellonianRepository
 
 logger = logging.getLogger("airflow.task")
-
-
-def get_latest_s3_file():
-    s3_bucket = os.getenv("JAGIELLONIAN_BUCKET_NAME", "jagiellonian")
-    aws_conn_id = os.getenv("AWS_CONN_ID", "aws_s3_minio")
-
-    s3_hook = S3Hook(aws_conn_id=aws_conn_id)
-    objects = s3_hook.list_keys(bucket_name=s3_bucket)
-    files = [obj for obj in objects if not obj.endswith("/")]
-    if not files:
-        return None
-
-    file_timestamps = []
-    for file_key in files:
-        object_info = s3_hook.get_key(key=file_key, bucket_name=s3_bucket)
-        file_timestamps.append((file_key, object_info.last_modified))
-
-    file_timestamps.sort(key=lambda x: x[1], reverse=True)
-    latest_timestamp = file_timestamps[0][1].strftime("%Y-%m-%d")
-    return latest_timestamp
+JAGIELLONIAN_REPO = JagiellonianRepository()
 
 
 default_args = {
@@ -53,7 +34,7 @@ def get_latest_s3_file():
 )
 def jagiellonian_pull_api(from_date=None):
     @task(task_id="jagiellonian_fetch_crossref_api")
-    def fetch_crossref_api(from_date_param=None):
+    def fetch_crossref_api(from_date_param=None, repo=JAGIELLONIAN_REPO):
         http_conn_id = os.getenv("HTTP_CONN_ID", "crossref_api")
 
         endpoint_filter = ""
@@ -61,7 +42,7 @@ def fetch_crossref_api(from_date_param=None):
             logger.info("Using provided from_date: %s", from_date_param)
             endpoint_filter = f"from-created-date:{from_date_param}"
         else:
-            latest_s3_file = get_latest_s3_file()
+            latest_s3_file = repo.find_the_last_uploaded_file_date()
             if latest_s3_file:
                 logger.info("Using latest S3 file date: %s", latest_s3_file)
                 endpoint_filter = f"from-created-date:{latest_s3_file}"
f"from-created-date:{latest_s3_file}" diff --git a/dags/jagiellonian/jagiellonian_reharvest.py b/dags/jagiellonian/jagiellonian_reharvest.py new file mode 100644 index 00000000..c4effd0b --- /dev/null +++ b/dags/jagiellonian/jagiellonian_reharvest.py @@ -0,0 +1,311 @@ +import fnmatch +import json +import logging +import re +from datetime import date, datetime, timedelta + +import pendulum +from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator +from airflow.sdk import Param, dag, task +from common.notification_service import FailedDagNotifier +from jagiellonian.repository import JagiellonianRepository + +logger = logging.getLogger("airflow.task") +JAGIELLONIAN_REPO = JagiellonianRepository() + +_FILENAME_DATE_PATTERN = re.compile(r"_metadata_(\d{4}-\d{2}-\d{2})") +_FILENAME_DATETIME_PATTERN = re.compile( + r"_metadata_(\d{4}-\d{2}-\d{2})[ T](\d{2})[:_](\d{2})[:_](\d{2})" +) + + +def _parse_date(value): + if not value: + return None + return datetime.strptime(value, "%Y-%m-%d").date() + + +def _normalize_dois(dois): + if not dois: + return set() + return {doi.strip().lower() for doi in dois if isinstance(doi, str) and doi.strip()} + + +def _normalize_file_keys(file_keys): + return [k.strip() for k in file_keys if isinstance(k, str) and k.strip()] + + +def _extract_date_from_key(key): + if not key: + return None + + match = _FILENAME_DATE_PATTERN.search(key) + if not match: + return None + + try: + return datetime.strptime(match.group(1), "%Y-%m-%d").date() + except ValueError: + return None + + +def _extract_datetime_from_key(key): + if not key: + return datetime.min + + match = _FILENAME_DATETIME_PATTERN.search(key) + if not match: + return datetime.min + + try: + return datetime.strptime( + f"{match.group(1)} {match.group(2)}:{match.group(3)}:{match.group(4)}", + "%Y-%m-%d %H:%M:%S", + ) + except ValueError: + return datetime.min + + +def _extract_doi_from_json(article_json): + if not isinstance(article_json, dict): + return None + + doi_entries = article_json.get("dois") or [] + if isinstance(doi_entries, list): + for doi_entry in doi_entries: + if isinstance(doi_entry, dict): + value = doi_entry.get("value") + if isinstance(value, str) and value.strip(): + return value.strip().lower() + elif isinstance(doi_entry, str) and doi_entry.strip(): + return doi_entry.strip().lower() + + raw_doi = article_json.get("DOI") + if isinstance(raw_doi, str) and raw_doi.strip(): + return raw_doi.strip().lower() + + return None + + +def _is_glob_pattern(value): + return any(c in value for c in ("*", "?", "[")) + + +def _resolve_file_keys(file_keys, all_json_keys): + all_json_keys_set = set(all_json_keys) + + exact_keys = [k for k in file_keys if not _is_glob_pattern(k)] + glob_patterns = [k for k in file_keys if _is_glob_pattern(k)] + + missing = [k for k in exact_keys if k not in all_json_keys_set] + if missing: + raise ValueError(f"Some requested file_keys do not exist: {missing}") + + resolved = list(exact_keys) + + for pattern in glob_patterns: + matched = [key for key in all_json_keys if fnmatch.fnmatch(key, pattern)] + if not matched: + logger.warning( + "Pattern %r matched no Jagiellonian metadata JSON keys", pattern + ) + resolved.extend(matched) + + seen = set() + deduped = [] + for key in resolved: + if key not in seen: + seen.add(key) + deduped.append(key) + return deduped + + +def _enforce_limit(records, limit): + if limit is None: + return + if len(records) > limit: + raise ValueError( + f"Matched {len(records)} records, which is above limit={limit}. 
" + "Please reduce the date range or increase the limit." + ) + + +@dag( + on_failure_callback=FailedDagNotifier(), + start_date=pendulum.today("UTC").add(days=-1), + schedule=None, + catchup=False, + tags=["reharvest", "jagiellonian"], + params={ + "date_from": Param( + default=None, + type=["string", "null"], + description="Start date in YYYY-MM-DD format (from metadata filename)", + title="Date from", + ), + "date_to": Param( + default=None, + type=["string", "null"], + description="End date in YYYY-MM-DD format (from metadata filename)", + title="Date to", + ), + "file_keys": Param( + default=[], + type=["array", "null"], + description=( + "Exact Jagiellonian metadata JSON keys or glob patterns. " + "Example: '*' or '10.5506/*_metadata_2026-04-09*.json'." + ), + title="File keys", + ), + "dois": Param( + default=[], + type=["array", "null"], + description=( + "List of DOIs to process. DOI is extracted from metadata JSON at " + "dois[].value. If date_from/date_to are not provided, this will search " + "only records from the last 3 years." + ), + title="DOIs", + ), + "limit": Param( + 1000, + type=["integer"], + description="Maximum number of records to process", + title="Limit", + ), + "dry_run": Param( + default=True, + type=["boolean", "null"], + description="Whether to perform a dry run. If true, no downstream DAGs will be triggered", + title="Dry run", + ), + }, +) +def jagiellonian_reharvest(): + @task() + def collect_records(repo=JAGIELLONIAN_REPO, **kwargs): + params = kwargs.get("params", {}) + + date_from = _parse_date(params.get("date_from")) + date_to = _parse_date(params.get("date_to")) + file_keys = _normalize_file_keys(params.get("file_keys") or []) + dois = _normalize_dois(params.get("dois") or []) + target_dois = set(dois) + limit = params.get("limit") + + if limit is not None and (not isinstance(limit, int) or limit <= 0): + raise ValueError("limit must be a positive integer when provided") + + if bool(file_keys) and (date_from or date_to): + raise ValueError( + "Invalid parameters: date_from/date_to cannot be used together with file_keys" + ) + + if (date_from and not date_to) or (date_to and not date_from): + raise ValueError("Both date_from and date_to must be provided together") + + all_json_keys = [ + obj.key for obj in repo.s3_bucket.objects.all() if obj.key.endswith(".json") + ] + + logger.info( + "Found %s Jagiellonian metadata JSON file(s) in storage", len(all_json_keys) + ) + + if file_keys: + resolved_keys = _resolve_file_keys(file_keys, all_json_keys) + else: + if target_dois and not (date_from and date_to): + date_to = date.today() + date_from = date_to - timedelta(days=365 * 3) + + if not (date_from and date_to): + raise ValueError( + "Invalid parameters: provide either date_from+date_to, file_keys, or dois" + ) + + logger.info( + "Selecting Jagiellonian metadata JSON in date range %s to %s", + date_from, + date_to, + ) + + resolved_keys = [] + for key in all_json_keys: + key_date = _extract_date_from_key(key) + if key_date is None: + continue + if date_from <= key_date <= date_to: + resolved_keys.append(key) + + resolved_keys = sorted( + resolved_keys, key=_extract_datetime_from_key, reverse=True + ) + + deduped_by_doi = {} + keys_without_doi = [] + found_dois = set() + + for key in resolved_keys: + article_json = json.loads(repo.get_by_id(key).getvalue().decode("utf-8")) + doi = _extract_doi_from_json(article_json) + + if not doi: + logger.warning("Could not extract DOI from metadata JSON: %s", key) + if not target_dois: + keys_without_doi.append({"key": 
key, "article": article_json}) + continue + + if target_dois and doi not in target_dois: + continue + + if target_dois: + found_dois.add(doi) + + if doi not in deduped_by_doi: + deduped_by_doi[doi] = {"key": key, "article": article_json} + + if target_dois: + missing_dois = sorted(target_dois - found_dois) + if missing_dois: + logger.warning("Some requested DOIs were not found: %s", missing_dois) + + selected_records = list(deduped_by_doi.values()) + keys_without_doi + _enforce_limit(selected_records, limit) + + logger.info( + "Collected %s deduplicated Jagiellonian record(s)", len(selected_records) + ) + logger.info( + "Selected Jagiellonian metadata keys: %s", + [r["key"] for r in selected_records], + ) + + return [record["article"] for record in selected_records] + + @task() + def prepare_trigger_conf(records, **kwargs): + dry_run = bool(kwargs.get("params", {}).get("dry_run", False)) + if dry_run: + logger.info( + "Dry run enabled. %s record(s) matched. No downstream runs will be triggered.", + len(records), + ) + return [] + + confs = [{"article": record} for record in records] + logger.info("Prepared %s downstream trigger conf(s)", len(confs)) + return confs + + records = collect_records() + trigger_confs = prepare_trigger_conf(records) + + TriggerDagRunOperator.partial( + task_id="jagiellonian_reharvest_trigger_file_processing", + trigger_dag_id="jagiellonian_process_file", + reset_dag_run=True, + ).expand(conf=trigger_confs) + + +jagiellonian_reharvest_dag = jagiellonian_reharvest() diff --git a/dags/jagiellonian/repository.py b/dags/jagiellonian/repository.py new file mode 100644 index 00000000..398ebd1a --- /dev/null +++ b/dags/jagiellonian/repository.py @@ -0,0 +1,38 @@ +import io +import os + +from common.repository import IRepository +from common.s3_service import S3Service + + +class JagiellonianRepository(IRepository): + def __init__(self) -> None: + super().__init__() + self.s3_bucket = S3Service( + os.getenv("JAGIELLONIAN_BUCKET_NAME", "jagiellonian") + ) + + def find_all(self): + files = [] + for obj in self.s3_bucket.objects.all(): + file_name = os.path.basename(obj.key) + files.append(file_name) + return files + + def get_by_id(self, id): + retfile = io.BytesIO() + self.s3_bucket.download_fileobj(id, retfile) + return retfile + + def find_the_last_uploaded_file_date(self): + objects = list(self.s3_bucket.objects.all()) + if not objects: + return + dates = [obj.last_modified.strftime("%Y-%m-%d") for obj in objects] + return max(dates) + + def save(self, key, obj): + self.s3_bucket.upload_fileobj(obj, key) + + def delete_all(self): + self.s3_bucket.objects.all().delete() diff --git a/tests/integration/jagiellonian/test_integration_jagiellonian_pull_api.py b/tests/integration/jagiellonian/test_integration_jagiellonian_pull_api.py index a4cf846b..7291960b 100644 --- a/tests/integration/jagiellonian/test_integration_jagiellonian_pull_api.py +++ b/tests/integration/jagiellonian/test_integration_jagiellonian_pull_api.py @@ -1,144 +1,52 @@ -import contextlib -import os -from datetime import UTC, datetime from unittest import mock from unittest.mock import MagicMock -from urllib.parse import urlparse -import boto3 import pytest -from airflow.models import Connection, DagBag -from airflow.utils.session import create_session -from botocore.client import Config +from airflow.models import DagBag +from jagiellonian.repository import JagiellonianRepository -endpoint = os.getenv("S3_ENDPOINT", "s3") -parsed = urlparse(endpoint if "://" in endpoint else f"http://{endpoint}") 
-MINIO_HOST = parsed.hostname or "s3"
+DAG_NAME = "jagiellonian_pull_api"
 
 
-@pytest.fixture(scope="class")
-def dagbag():
-    return DagBag(dag_folder="dags/", include_examples=False)
+@pytest.fixture
+def dag():
+    dagbag = DagBag(dag_folder="dags/", include_examples=False)
+    assert dagbag.import_errors.get(f"dags/{DAG_NAME}.py") is None
+    return dagbag.get_dag(dag_id=DAG_NAME)
 
 
-@pytest.mark.usefixtures("dagbag")
-class TestIntegrationJagiellonianPullApi:
-    def setup_method(self):
-        os.environ["AIRFLOW_CONN_AWS_S3_MINIO_TEST"] = (
-            f"aws://airflow:Airflow01@{MINIO_HOST}:9000"
-            f"?endpoint_url=http%3A%2F%2F{MINIO_HOST}%3A9000"
-            "&region_name=us-east-1&verify=false"
-        )
+@pytest.fixture
+def repo():
+    r = JagiellonianRepository()
+    for obj in r.s3_bucket.objects.all():
+        obj.delete()
+    r.s3_bucket.put_object(Key="test.json", Body=b"")
+    yield r
+    for obj in r.s3_bucket.objects.all():
+        obj.delete()
 
-        with create_session() as session:
-            session.query(Connection).filter(
-                Connection.conn_id == "crossref_api_test"
-            ).delete()
-            conn = Connection(
-                conn_id="crossref_api_test",
-                conn_type="http",
-                host="https://api.production.crossref.org",
-            )
-            session.add(conn)
-            session.commit()
 
+def test_dag_loaded(dag):
+    assert dag is not None
 
-        self.dag_id = "jagiellonian_pull_api"
-        self.execution_date = datetime.now(UTC)
-        self.dag = DagBag(dag_folder="dags/", include_examples=False).get_dag(
-            self.dag_id
-        )
-        assert self.dag is not None, f"DAG {self.dag_id} failed to load"
 
+@mock.patch("airflow.providers.http.hooks.http.HttpHook.run")
+def test_fetch_crossref_api(mock_http_run, dag, repo):
+    mock_http_response = MagicMock()
+    mock_http_response.json.return_value = {
+        "message": {"items": [], "total-results": 0}
+    }
+    mock_http_response.raise_for_status = MagicMock()
+    mock_http_run.return_value = mock_http_response
 
-        s3 = boto3.client(
-            "s3",
-            endpoint_url=f"http://{MINIO_HOST}:9000",
-            aws_access_key_id="airflow",
-            aws_secret_access_key="Airflow01",
-            region_name="us-east-1",
-            config=Config(signature_version="s3v4"),
-            verify=False,
-        )
+    task = dag.get_task("jagiellonian_fetch_crossref_api")
+    function_to_unit_test = task.python_callable
 
-        with contextlib.suppress(s3.exceptions.BucketAlreadyOwnedByYou):
-            s3.create_bucket(Bucket="jagiellonian-test", ACL="public-read-write")
-        response = s3.list_objects_v2(Bucket="jagiellonian-test")
+    results = function_to_unit_test(repo=repo)
 
-        if "Contents" in response:
-            objects_to_delete = [{"Key": obj["Key"]} for obj in response["Contents"]]
+    assert mock_http_run.called
 
-            if objects_to_delete:
-                s3.delete_objects(
-                    Bucket="jagiellonian", Delete={"Objects": objects_to_delete}
-                )
+    called_endpoint = mock_http_run.call_args[1]["data"]["filter"]
+    assert "from-created-date:" in called_endpoint
 
-    def teardown_method(self):
-        if "AIRFLOW_CONN_AWS_S3_MINIO_TEST" in os.environ:
-            del os.environ["AIRFLOW_CONN_AWS_S3_MINIO_TEST"]
-
-        s3 = boto3.client(
-            "s3",
-            endpoint_url=f"http://{MINIO_HOST}:9000",
-            aws_access_key_id="airflow",
-            aws_secret_access_key="Airflow01",
-            region_name="us-east-1",
-            config=Config(signature_version="s3v4"),
-            verify=False,
-        )
-        response = s3.list_objects_v2(Bucket="jagiellonian-test")
-
-        if "Contents" in response:
-            objects_to_delete = [{"Key": obj["Key"]} for obj in response["Contents"]]
-
-            if objects_to_delete:
-                s3.delete_objects(
-                    Bucket="jagiellonian-test", Delete={"Objects": objects_to_delete}
-                )
-
-        s3.delete_bucket(Bucket="jagiellonian-test")
-        with create_session() as session:
-            session.query(Connection).filter(
-                Connection.conn_id == "crossref_api_test"
-            ).delete()
-            session.commit()
-
-    @mock.patch.dict(
-        os.environ,
-        {
-            "JAGIELLONIAN_BUCKET_NAME": "jagiellonian-test",
-            "AWS_CONN_ID": "aws_s3_minio_test",
-            "HTTP_CONN_ID": "crossref_api_test",
-        },
-    )
-    @mock.patch("airflow.providers.http.hooks.http.HttpHook.run")
-    def test_fetch_crossref_api(self, mock_http_run):
-        mock_http_response = MagicMock()
-        mock_http_response.json.return_value = {
-            "message": {"items": [], "total-results": 0}
-        }
-        mock_http_response.raise_for_status = MagicMock()
-        mock_http_run.return_value = mock_http_response
-
-        s3 = boto3.client(
-            "s3",
-            endpoint_url=f"http://{MINIO_HOST}:9000",
-            aws_access_key_id="airflow",
-            aws_secret_access_key="Airflow01",
-            region_name="us-east-1",
-            config=Config(signature_version="s3v4"),
-            verify=False,
-        )
-        s3.put_object(Bucket="jagiellonian-test", Key="test.json", Body=b"")
-
-        task = self.dag.get_task("jagiellonian_fetch_crossref_api")
-        function_to_unit_test = task.python_callable
-
-        results = function_to_unit_test()
-
-        assert mock_http_run.called
-
-        called_endpoint = mock_http_run.call_args[1]["data"]["filter"]
-        assert "from-created-date:" in called_endpoint
-
-        assert results == []
+    assert results == []
diff --git a/tests/integration/jagiellonian/test_jagiellonian_process_file.py b/tests/integration/jagiellonian/test_jagiellonian_process_file.py
index 331835eb..6bce9f83 100644
--- a/tests/integration/jagiellonian/test_jagiellonian_process_file.py
+++ b/tests/integration/jagiellonian/test_jagiellonian_process_file.py
@@ -1,116 +1,45 @@
-import os
-from datetime import UTC, datetime
-from unittest import mock
-from urllib.parse import urlparse
-
-import boto3
 import pytest
 from airflow.models import DagBag
-from botocore.client import Config
-
-endpoint = os.getenv("S3_ENDPOINT", "s3")
-parsed = urlparse(endpoint if "://" in endpoint else f"http://{endpoint}")
-MINIO_HOST = parsed.hostname or "s3"
-
-
-@pytest.fixture(scope="class")
-def dagbag():
-    return DagBag(dag_folder="dags/", include_examples=False)
-
-
-@pytest.mark.usefixtures("dagbag")
-class TestJagiellonianProcessFile:
-    def setup_method(self):
-        os.environ["AIRFLOW_CONN_AWS_S3_MINIO_TEST"] = (
-            f"aws://airflow:Airflow01@{MINIO_HOST}:9000"
-            f"?endpoint_url=http%3A%2F%2F{MINIO_HOST}%3A9000"
-            "&region_name=us-east-1&verify=false"
-        )
-
-        self.dag_id = "jagiellonian_process_file"
-        self.execution_date = datetime.now(UTC)
-
-        self.dag = DagBag(dag_folder="dags/", include_examples=False).get_dag(
-            self.dag_id
-        )
-        assert self.dag is not None, f"DAG {self.dag_id} failed to load"
-
-        s3 = boto3.client(
-            "s3",
-            endpoint_url=f"http://{MINIO_HOST}:9000",
-            aws_access_key_id="airflow",
-            aws_secret_access_key="Airflow01",
-            region_name="us-east-1",
-            config=Config(signature_version="s3v4"),
-            verify=False,
-        )
-
-        s3.create_bucket(Bucket="jagiellonian-test", ACL="public-read-write")
-        response = s3.list_objects_v2(Bucket="jagiellonian-test")
+from jagiellonian.repository import JagiellonianRepository
 
-        if "Contents" in response:
-            objects_to_delete = [{"Key": obj["Key"]} for obj in response["Contents"]]
+DAG_NAME = "jagiellonian_process_file"
 
-            if objects_to_delete:
-                s3.delete_objects(
-                    Bucket="jagiellonian", Delete={"Objects": objects_to_delete}
-                )
 
-    def teardown_method(self):
-        if "AIRFLOW_CONN_AWS_S3_MINIO_TEST" in os.environ:
-            del os.environ["AIRFLOW_CONN_AWS_S3_MINIO_TEST"]
-        s3 = boto3.client(
-            "s3",
-            endpoint_url=f"http://{MINIO_HOST}:9000",
-            aws_access_key_id="airflow",
aws_secret_access_key="Airflow01", - region_name="us-east-1", - config=Config(signature_version="s3v4"), - verify=False, - ) - response = s3.list_objects_v2(Bucket="jagiellonian-test") +@pytest.fixture +def dag(): + dagbag = DagBag(dag_folder="dags/", include_examples=False) + assert dagbag.import_errors.get(f"dags/{DAG_NAME}.py") is None + return dagbag.get_dag(dag_id=DAG_NAME) - if "Contents" in response: - objects_to_delete = [{"Key": obj["Key"]} for obj in response["Contents"]] - if objects_to_delete: - s3.delete_objects( - Bucket="jagiellonian-test", Delete={"Objects": objects_to_delete} - ) +@pytest.fixture +def repo(): + r = JagiellonianRepository() + for obj in r.s3_bucket.objects.all(): + obj.delete() + yield r + for obj in r.s3_bucket.objects.all(): + obj.delete() - s3.delete_bucket(Bucket="jagiellonian-test") - @mock.patch.dict( - os.environ, - { - "JAGIELLONIAN_BUCKET_NAME": "jagiellonian-test", - "AWS_CONN_ID": "aws_s3_minio_test", - }, - ) - def test_save_to_s3(self): - sample_article = { - "title": "Test Article", - "authors": ["Author 1", "Author 2"], - "abstract": "This is a test abstract", - "dois": [{"value": "10.1234/test.123"}], - "files": [], - } +def test_dag_loaded(dag): + assert dag is not None - task = self.dag.get_task("jagiellonian-save-to-s3") - function_to_unit_test = task.python_callable - function_to_unit_test(sample_article) +def test_save_to_s3(dag, repo): + sample_article = { + "title": "Test Article", + "authors": ["Author 1", "Author 2"], + "abstract": "This is a test abstract", + "dois": [{"value": "10.1234/test.123"}], + "files": [], + } - s3 = boto3.client( - "s3", - endpoint_url=f"http://{MINIO_HOST}:9000", - aws_access_key_id="airflow", - aws_secret_access_key="Airflow01", - region_name="us-east-1", - config=Config(signature_version="s3v4"), - verify=False, - ) + task = dag.get_task("jagiellonian-save-to-s3") + function_to_unit_test = task.python_callable - response = s3.list_objects_v2(Bucket="jagiellonian-test") + function_to_unit_test(sample_article, repo=repo) - assert "10.1234/test.123" in response["Contents"][0]["Key"] + objects = list(repo.s3_bucket.objects.all()) + assert len(objects) == 1 + assert "10.1234/test.123" in objects[0].key diff --git a/tests/units/jagiellonian/test_unit_jagiellonian_reharvest.py b/tests/units/jagiellonian/test_unit_jagiellonian_reharvest.py new file mode 100644 index 00000000..e63428b9 --- /dev/null +++ b/tests/units/jagiellonian/test_unit_jagiellonian_reharvest.py @@ -0,0 +1,182 @@ +import json +from io import BytesIO +from unittest import mock + +import pytest +from airflow.models import DagBag +from freezegun import freeze_time + + +class _S3Object: + def __init__(self, key): + self.key = key + + +@pytest.fixture(scope="class") +def dagbag(): + return DagBag(dag_folder="dags/", include_examples=False) + + +@pytest.fixture(scope="class") +def dag(dagbag): + return dagbag.dags.get("jagiellonian_reharvest") + + +class TestUnitJagiellonianReharvest: + def _build_repo(self, payload_by_key): + mock_repo = mock.MagicMock() + keys = list(payload_by_key.keys()) + mock_repo.s3_bucket.objects.all.return_value = [_S3Object(key) for key in keys] + + def _get_by_id(key): + return BytesIO(json.dumps(payload_by_key[key]).encode("utf-8")) + + mock_repo.get_by_id.side_effect = _get_by_id + return mock_repo + + def test_dag_structure(self, dag): + assert "jagiellonian_reharvest_trigger_file_processing" in dag.task_ids + task = dag.get_task("jagiellonian_reharvest_trigger_file_processing") + assert "MappedOperator" in 
+        assert task.partial_kwargs["trigger_dag_id"] == "jagiellonian_process_file"
+        assert task.partial_kwargs["reset_dag_run"] is True
+
+    def test_collect_records_invalid_combination_dates_with_file_keys(self, dag):
+        task = dag.get_task("collect_records")
+        function_to_unit_test = task.python_callable
+
+        mock_repo = self._build_repo({})
+
+        with pytest.raises(
+            ValueError, match="date_from/date_to cannot be used together with file_keys"
+        ):
+            function_to_unit_test(
+                repo=mock_repo,
+                params={
+                    "file_keys": ["10.5506/a_metadata_2026-04-09 00:35:13+00:00.json"],
+                    "date_from": "2026-04-01",
+                    "date_to": "2026-04-09",
+                },
+            )
+
+    def test_collect_records_date_range_dedup_keeps_newest(self, dag):
+        task = dag.get_task("collect_records")
+        function_to_unit_test = task.python_callable
+
+        payload_by_key = {
+            "10.5506/aphyspolb.57.4-a4_metadata_2026-04-09 00_35_13.599324+00_00.json": {
+                "dois": [{"value": "10.5506/aphyspolb.57.4-a4"}],
+                "titles": [{"title": "new"}],
+            },
+            "10.5506/aphyspolb.57.4-a4_metadata_2026-04-09 00_10_13.599324+00_00.json": {
+                "dois": [{"value": "10.5506/aphyspolb.57.4-a4"}],
+                "titles": [{"title": "old"}],
+            },
+            "10.5506/aphyspolb.57.4-a5_metadata_2026-04-09 00_20_00.000000+00_00.json": {
+                "dois": [{"value": "10.5506/aphyspolb.57.4-a5"}],
+            },
+        }
+        mock_repo = self._build_repo(payload_by_key)
+
+        result = function_to_unit_test(
+            repo=mock_repo,
+            params={"date_from": "2026-04-09", "date_to": "2026-04-09"},
+        )
+
+        assert len(result) == 2
+        doi_to_title = {
+            article["dois"][0]["value"]: (article.get("titles") or [{"title": None}])[
+                0
+            ]["title"]
+            for article in result
+        }
+        assert doi_to_title["10.5506/aphyspolb.57.4-a4"] == "new"
+
+    def test_collect_records_file_keys_glob_patterns(self, dag):
+        task = dag.get_task("collect_records")
+        function_to_unit_test = task.python_callable
+
+        payload_by_key = {
+            "10.5506/aphyspolb.57.4-a4_metadata_2026-04-09 00_35_13.599324+00_00.json": {
+                "dois": [{"value": "10.5506/aphyspolb.57.4-a4"}],
+            },
+            "10.5506/aphyspolb.57.4-a5_metadata_2026-04-09 00_35_13.599324+00_00.json": {
+                "dois": [{"value": "10.5506/aphyspolb.57.4-a5"}],
+            },
+            "10.5506/aphyspolb.57.4-a6_metadata_2026-04-10 00_35_13.599324+00_00.json": {
+                "dois": [{"value": "10.5506/aphyspolb.57.4-a6"}],
+            },
+        }
+        mock_repo = self._build_repo(payload_by_key)
+
+        result = function_to_unit_test(
+            repo=mock_repo,
+            params={"file_keys": ["10.5506/*_metadata_2026-04-09*.json"]},
+        )
+
+        dois = sorted([article["dois"][0]["value"] for article in result])
+        assert dois == ["10.5506/aphyspolb.57.4-a4", "10.5506/aphyspolb.57.4-a5"]
+
+    @freeze_time("2026-04-20")
+    def test_collect_records_dois_without_dates_defaults_last_3_years(self, dag):
+        task = dag.get_task("collect_records")
+        function_to_unit_test = task.python_callable
+
+        payload_by_key = {
+            "10.5506/old_metadata_2020-01-01 00_00_00.000000+00_00.json": {
+                "dois": [{"value": "10.5506/old"}],
+            },
+            "10.5506/new_metadata_2026-04-09 00_35_13.599324+00_00.json": {
+                "dois": [{"value": "10.5506/new"}],
+            },
+        }
+        mock_repo = self._build_repo(payload_by_key)
+
+        result = function_to_unit_test(
+            repo=mock_repo,
+            params={"dois": ["10.5506/new"]},
+        )
+
+        assert len(result) == 1
+        assert result[0]["dois"][0]["value"] == "10.5506/new"
+
+    def test_collect_records_limit_exceeded_raises(self, dag):
+        task = dag.get_task("collect_records")
+        function_to_unit_test = task.python_callable
+
+        payload_by_key = {
+            "10.5506/a_metadata_2026-04-09 00_00_00.000000+00_00.json": {
+                "dois": [{"value": "10.5506/a"}],
[{"value": "10.5506/a"}], + }, + "10.5506/b_metadata_2026-04-09 00_00_01.000000+00_00.json": { + "dois": [{"value": "10.5506/b"}], + }, + } + mock_repo = self._build_repo(payload_by_key) + + with pytest.raises(ValueError, match="above limit"): + function_to_unit_test( + repo=mock_repo, + params={ + "date_from": "2026-04-09", + "date_to": "2026-04-09", + "limit": 1, + }, + ) + + def test_prepare_trigger_conf_dry_run_and_normal(self, dag): + task = dag.get_task("prepare_trigger_conf") + function_to_unit_test = task.python_callable + + records = [{"dois": [{"value": "10.5506/aphyspolb.57.4-a4"}]}] + + dry_run_result = function_to_unit_test( + records=records, params={"dry_run": True} + ) + assert dry_run_result == [] + + normal_result = function_to_unit_test( + records=records, params={"dry_run": False} + ) + assert len(normal_result) == 1 + assert normal_result[0]["article"] == records[0]