From f73cfec997328009f6069ae653a3f557d4995de8 Mon Sep 17 00:00:00 2001
From: PascalEgn
Date: Mon, 30 Mar 2026 17:33:50 +0200
Subject: [PATCH 1/2] feat: add Hindawi reharvest DAG for reprocessing articles

---
 dags/hindawi/hindawi_reharvest.py             | 289 ++++++++++++++++++
 .../hindawi/test_unit_hindawi_reharvest.py    | 220 +++++++++++++
 2 files changed, 509 insertions(+)
 create mode 100644 dags/hindawi/hindawi_reharvest.py
 create mode 100644 tests/units/hindawi/test_unit_hindawi_reharvest.py

diff --git a/dags/hindawi/hindawi_reharvest.py b/dags/hindawi/hindawi_reharvest.py
new file mode 100644
index 00000000..4526feb3
--- /dev/null
+++ b/dags/hindawi/hindawi_reharvest.py
@@ -0,0 +1,289 @@
+import fnmatch
+import logging
+import re
+from datetime import date, datetime, timedelta
+
+import pendulum
+from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
+from airflow.sdk import Param, dag, task
+from hindawi.repository import HindawiRepository
+from hindawi.utils import split_xmls
+
+logger = logging.getLogger("airflow.task")
+HINDAWI_REPO = HindawiRepository()
+
+
+def _parse_date(value):
+    if not value:
+        return None
+    return datetime.strptime(value, "%Y-%m-%d").date()
+
+
+def _normalize_dois(dois):
+    if not dois:
+        return set()
+    return {doi.strip().lower() for doi in dois if isinstance(doi, str) and doi.strip()}
+
+
+def _extract_key_date(s3_key):
+    try:
+        return _parse_date(s3_key.split("/")[0])
+    except ValueError:
+        # keys whose prefix is not a YYYY-MM-DD date are skipped by callers
+        return None
+
+
+def _extract_key_datetime(s3_key):
+    try:
+        key_ts = s3_key.split("/")[1].removesuffix(".xml")
+        return datetime.strptime(key_ts, "%Y-%m-%dT%H:%M")
+    except (IndexError, ValueError):
+        return datetime.min
+
+
+def _extract_doi_from_record(record_xml):
+    match = re.search(r"10\.1155/[^\s<]+", record_xml, flags=re.IGNORECASE)
+    if not match:
+        return None
+    return match.group(0).strip().lower()
+
+
+def _enforce_limit(records, limit):
+    if limit is None:
+        return
+    if len(records) > limit:
+        raise ValueError(
+            f"Matched {len(records)} records, which is above limit={limit}. "
+            "Please reduce the date range or increase the limit."
+        )
+
+
+def _is_glob_pattern(value):
+    return any(c in value for c in ("*", "?", "["))
+
+
+def _resolve_file_keys(file_keys, all_snapshot_keys):
+    all_snapshot_keys_set = set(all_snapshot_keys)
+
+    exact_keys = [k for k in file_keys if not _is_glob_pattern(k)]
+    glob_patterns = [k for k in file_keys if _is_glob_pattern(k)]
+
+    missing_keys = [k for k in exact_keys if k not in all_snapshot_keys_set]
+    if missing_keys:
+        raise ValueError(f"Some requested file_keys do not exist: {missing_keys}")
+
+    resolved = list(exact_keys)
+
+    for pattern in glob_patterns:
+        matched = [key for key in all_snapshot_keys if fnmatch.fnmatch(key, pattern)]
+        if not matched:
+            logger.warning("Pattern %r matched no Hindawi snapshot keys", pattern)
+        resolved.extend(matched)
+
+    seen = set()
+    deduped = []
+    for key in resolved:
+        if key not in seen:
+            seen.add(key)
+            deduped.append(key)
+    return deduped
+
+
+@dag(
+    start_date=pendulum.today("UTC").add(days=-1),
+    schedule=None,
+    catchup=False,
+    tags=["reharvest", "hindawi"],
+    params={
+        "date_from": Param(
+            default=None,
+            type=["string", "null"],
+            description="Start date in YYYY-MM-DD format",
+            title="Date from",
+        ),
+        "date_to": Param(
+            default=None,
+            type=["string", "null"],
+            description="End date in YYYY-MM-DD format",
+            title="Date to",
+        ),
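+        # file_keys and dois are mutually exclusive; a date range may be
+        # combined with dois but not with file_keys (the combinations are
+        # validated in collect_records below).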
+        "file_keys": Param(
+            default=[],
+            type=["array", "null"],
+            description=(
+                "List of Hindawi snapshot keys to process. Supports glob patterns "
+                "(e.g. '*' or '2022-09-30/*')."
+            ),
+            title="File keys",
+        ),
+        "dois": Param(
+            default=[],
+            type=["array", "null"],
+            description="List of DOIs to process. If date_from/date_to are not provided, only records from the last 3 years are searched.",
+            title="DOIs",
+        ),
+        "limit": Param(
+            default=1000,
+            type=["integer"],
+            description="Maximum number of records to process",
+            title="Limit",
+        ),
+        "dry_run": Param(
+            default=True,
+            type=["boolean", "null"],
+            description="Whether to perform a dry run. If true, no downstream DAGs will be triggered.",
+            title="Dry run",
+        ),
+    },
+)
+def hindawi_reharvest():
+    @task()
+    def collect_records(repo=HINDAWI_REPO, **kwargs):
+        params = kwargs.get("params", {})
+
+        date_from = _parse_date(params.get("date_from"))
+        date_to = _parse_date(params.get("date_to"))
+        file_keys = params.get("file_keys") or []
+        dois = _normalize_dois(params.get("dois") or [])
+        limit = params.get("limit")
+
+        if limit is not None and (not isinstance(limit, int) or limit <= 0):
+            raise ValueError("limit must be a positive integer when provided")
+
+        if bool(file_keys) and bool(dois):
+            raise ValueError(
+                "Invalid parameters: file_keys and dois cannot be used together"
+            )
+
+        if bool(file_keys) and (date_from or date_to):
+            raise ValueError(
+                "Invalid parameters: date_from/date_to cannot be used together with file_keys"
+            )
+
+        if (date_from and not date_to) or (date_to and not date_from):
+            raise ValueError("Both date_from and date_to must be provided together")
+
+        all_snapshot_keys = [
+            obj.key
+            for obj in repo.s3_bucket.objects.all()
+            if obj.key.endswith(".xml")
+            and "/" in obj.key
+            and not obj.key.startswith("parsed/")
+        ]
+        logger.info(
+            "Found %s total Hindawi snapshot file(s) in storage",
+            len(all_snapshot_keys),
+        )
+
+        selected_keys = []
+        target_dois = set(dois)
+
+        if file_keys:
+            logger.info(
+                "Selecting Hindawi snapshot files from file_keys (with glob expansion)"
+            )
+            selected_keys = _resolve_file_keys(file_keys, all_snapshot_keys)
+
+        elif dois:
+            if not date_from and not date_to:
+                date_to = date.today()
+                date_from = date_to - timedelta(days=365 * 3)
+
+            logger.info(
+                "Selecting Hindawi snapshot files for DOI search in date range %s to %s",
+                date_from,
+                date_to,
+            )
+            keys_in_range = []
+            for key in all_snapshot_keys:
+                key_date = _extract_key_date(key)
+                if not key_date:
+                    continue
+                if date_from <= key_date <= date_to:
+                    keys_in_range.append(key)
+
+            selected_keys = sorted(
+                keys_in_range, key=_extract_key_datetime, reverse=True
+            )
+
+        elif date_from and date_to:
+            logger.info(
+                "Selecting Hindawi snapshot files in date range %s to %s",
+                date_from,
+                date_to,
+            )
+            for key in all_snapshot_keys:
+                key_date = _extract_key_date(key)
+                if not key_date:
+                    continue
+                if date_from <= key_date <= date_to:
+                    selected_keys.append(key)
+
+        else:
+            raise ValueError(
+                "Invalid parameters: provide either date_from+date_to, file_keys, or dois"
+            )
+
+        logger.info("Selected %s Hindawi snapshot key(s)", len(selected_keys))
+        logger.info("Selected Hindawi snapshot keys: %s", selected_keys)
+
+        deduped_records = {}
+        found_dois = set()
+
+        for key in selected_keys:
+            snapshot_dt = _extract_key_datetime(key)
+            for record in split_xmls(repo, key):
+                identifier = _extract_doi_from_record(record)
+                if not identifier:
+                    logger.warning(
+                        "Skipping Hindawi record with missing DOI in file %s", key
+                    )
+                    continue
+
+                if target_dois and identifier not in target_dois:
+                    continue
+
+                if target_dois:
+                    found_dois.add(identifier)
+
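+                # keep only the copy of a DOI that comes from the newest
+                # snapshot file; older duplicates are discarded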
+                existing = deduped_records.get(identifier)
+                if not existing or snapshot_dt > existing["snapshot_dt"]:
+                    deduped_records[identifier] = {
+                        "record": record,
+                        "snapshot_dt": snapshot_dt,
+                    }
+
+        records = [entry["record"] for entry in deduped_records.values()]
+
+        if target_dois:
+            missing_dois = sorted(target_dois - found_dois)
+            if missing_dois:
+                logger.warning("Some requested DOIs were not found: %s", missing_dois)
+
+        _enforce_limit(records, limit)
+
+        logger.info("Collected %s deduplicated Hindawi record(s)", len(records))
+        return records
+
+    @task()
+    def prepare_trigger_conf(records, **kwargs):
+        dry_run = bool(kwargs.get("params", {}).get("dry_run", False))
+        if dry_run:
+            logger.info(
+                "Dry run enabled. %s record(s) matched. No downstream runs will be triggered.",
+                len(records),
+            )
+            return []
+
+        confs = [{"record": record} for record in records]
+        logger.info("Prepared %s downstream trigger conf(s)", len(confs))
+        return confs
+
+    records = collect_records()
+    trigger_confs = prepare_trigger_conf(records)
+
+    TriggerDagRunOperator.partial(
+        task_id="hindawi_reharvest_trigger_file_processing",
+        trigger_dag_id="hindawi_file_processing",
+        reset_dag_run=True,
+    ).expand(conf=trigger_confs)
+
+
+hindawi_reharvest_dag = hindawi_reharvest()
diff --git a/tests/units/hindawi/test_unit_hindawi_reharvest.py b/tests/units/hindawi/test_unit_hindawi_reharvest.py
new file mode 100644
index 00000000..84db23c8
--- /dev/null
+++ b/tests/units/hindawi/test_unit_hindawi_reharvest.py
@@ -0,0 +1,220 @@
+from io import BytesIO
+from unittest import mock
+
+import pytest
+from airflow.models import DagBag
+from freezegun import freeze_time
+
+
+class _S3Object:
+    def __init__(self, key):
+        self.key = key
+
+
+def _record_xml(doi):
+    # minimal stand-in for a Hindawi snapshot record; only the embedded
+    # DOI matters for these tests, the exact markup does not
+    return f"""
+<record>
+    <metadata>
+        <articleDOI>
+            {doi}
+        </articleDOI>
+    </metadata>
+</record>
+""".strip()
+
+
+def _snapshot_xml(*records):
+    # wrap the records in a minimal envelope for split_xmls to split apart
+    records_xml = "".join(records)
+    return f"<records>{records_xml}</records>".encode()
+
+
+@pytest.fixture(scope="class")
+def dagbag():
+    return DagBag(dag_folder="dags/", include_examples=False)
+
+
+@pytest.fixture(scope="class")
+def dag(dagbag):
+    return dagbag.dags.get("hindawi_reharvest")
+
+
+class TestUnitHindawiReharvest:
+    def _build_repo(self, payload_by_key):
+        mock_repo = mock.MagicMock()
+        keys = list(payload_by_key.keys())
+        mock_repo.s3_bucket.objects.all.return_value = [_S3Object(key) for key in keys]
+
+        def _get_by_id(key):
+            return BytesIO(payload_by_key[key])
+
+        mock_repo.get_by_id.side_effect = _get_by_id
+        return mock_repo
+
+    def test_dag_structure(self, dag):
+        assert "hindawi_reharvest_trigger_file_processing" in dag.task_ids
+        task = dag.get_task("hindawi_reharvest_trigger_file_processing")
+        assert "MappedOperator" in str(type(task)) or task.is_mapped
+        assert task.partial_kwargs["trigger_dag_id"] == "hindawi_file_processing"
+        assert task.partial_kwargs["reset_dag_run"] is True
+
+    def test_collect_records_invalid_combination_file_keys_and_dois(self, dag):
+        task = dag.get_task("collect_records")
+        function_to_unit_test = task.python_callable
+
+        mock_repo = self._build_repo({})
+
+        with pytest.raises(ValueError, match="file_keys and dois"):
+            function_to_unit_test(
+                repo=mock_repo,
+                params={
+                    "file_keys": ["2022-09-30/2022-09-30T12:00.xml"],
+                    "dois": ["10.1155/2022/1234567"],
+                },
+            )
+
+    def test_collect_records_date_range_dedup_keeps_newest(self, dag):
+        task = dag.get_task("collect_records")
+        function_to_unit_test = task.python_callable
+
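+        # two snapshots from the same day: the 12:00 file holds two records,
+        # the 13:00 file re-delivers one of them and should win the dedup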
_record_xml("10.1155/2022/2222222"), + ), + "2022-09-30/2022-09-30T13:00.xml": _snapshot_xml( + _record_xml("10.1155/2022/1111111") + ), + } + mock_repo = self._build_repo(payload_by_key) + + result = function_to_unit_test( + repo=mock_repo, + params={ + "date_from": "2022-09-30", + "date_to": "2022-09-30", + }, + ) + + assert len(result) == 2 + doi_hits = sorted(["10.1155/2022/1111111" in record for record in result]) + assert doi_hits == [False, True] + latest_key_calls = [call.args[0] for call in mock_repo.get_by_id.call_args_list] + assert "2022-09-30/2022-09-30T13:00.xml" in latest_key_calls + + def test_collect_records_limit_exceeded_raises(self, dag): + task = dag.get_task("collect_records") + function_to_unit_test = task.python_callable + + payload_by_key = { + "2022-09-30/2022-09-30T12:00.xml": _snapshot_xml( + _record_xml("10.1155/2022/1111111"), + _record_xml("10.1155/2022/2222222"), + ) + } + mock_repo = self._build_repo(payload_by_key) + + with pytest.raises(ValueError, match="above limit"): + function_to_unit_test( + repo=mock_repo, + params={ + "date_from": "2022-09-30", + "date_to": "2022-09-30", + "limit": 1, + }, + ) + + def test_collect_records_file_keys_glob_patterns(self, dag): + task = dag.get_task("collect_records") + function_to_unit_test = task.python_callable + + payload_by_key = { + "2022-09-30/2022-09-30T12:00.xml": _snapshot_xml( + _record_xml("10.1155/2022/1111111") + ), + "2022-09-30/2022-09-30T13:00.xml": _snapshot_xml( + _record_xml("10.1155/2022/2222222") + ), + "2022-10-01/2022-10-01T12:00.xml": _snapshot_xml( + _record_xml("10.1155/2022/3333333") + ), + } + mock_repo = self._build_repo(payload_by_key) + + result = function_to_unit_test( + repo=mock_repo, + params={"file_keys": ["2022-09-30/*"]}, + ) + + dois = sorted( + [ + next( + part + for part in [record] + if "10.1155/2022/1111111" in part or "10.1155/2022/2222222" in part + ) + for record in result + ] + ) + assert len(dois) == 2 + assert any("10.1155/2022/1111111" in record for record in result) + assert any("10.1155/2022/2222222" in record for record in result) + + @freeze_time("2022-10-02") + def test_collect_records_dois_without_dates_defaults_last_year(self, dag): + task = dag.get_task("collect_records") + function_to_unit_test = task.python_callable + + payload_by_key = { + "2020-01-01/2020-01-01T10:00.xml": _snapshot_xml( + _record_xml("10.1155/2020/9999999") + ), + "2022-09-30/2022-09-30T12:00.xml": _snapshot_xml( + _record_xml("10.1155/2022/1234567") + ), + } + mock_repo = self._build_repo(payload_by_key) + + result = function_to_unit_test( + repo=mock_repo, + params={ + "dois": ["10.1155/2022/1234567"], + }, + ) + + assert len(result) == 1 + assert "10.1155/2022/1234567" in result[0] + + def test_collect_records_dois_range_too_large_raises(self, dag): + task = dag.get_task("collect_records") + function_to_unit_test = task.python_callable + + mock_repo = self._build_repo({}) + + with pytest.raises(ValueError, match="must not exceed one year"): + function_to_unit_test( + repo=mock_repo, + params={ + "dois": ["10.1155/2022/1234567"], + "date_from": "2020-01-01", + "date_to": "2022-01-01", + }, + ) + + def test_prepare_trigger_conf_dry_run_and_normal(self, dag): + task = dag.get_task("prepare_trigger_conf") + function_to_unit_test = task.python_callable + + records = [_record_xml("10.1155/2022/1234567")] + + dry_run_result = function_to_unit_test( + records=records, params={"dry_run": True} + ) + assert dry_run_result == [] + + normal_result = function_to_unit_test( + records=records, 
params={"dry_run": False} + ) + assert len(normal_result) == 1 + assert normal_result[0]["record"] == records[0] From 6d6072a6161b1a4e75bd80e8765e8837a1955a3a Mon Sep 17 00:00:00 2001 From: PascalEgn Date: Mon, 30 Mar 2026 17:34:10 +0200 Subject: [PATCH 2/2] reharvest: improve doi search --- dags/aps/aps_reharvest.py | 9 ++------- dags/hindawi/hindawi_reharvest.py | 2 ++ dags/iop/iop_reharvest.py | 2 +- dags/oup/oup_reharvest.py | 4 ++-- tests/units/aps/test_unit_aps_reharvest.py | 16 ---------------- .../units/hindawi/test_unit_hindawi_reharvest.py | 16 ---------------- 6 files changed, 7 insertions(+), 42 deletions(-) diff --git a/dags/aps/aps_reharvest.py b/dags/aps/aps_reharvest.py index d7fedc28..dcaa203e 100644 --- a/dags/aps/aps_reharvest.py +++ b/dags/aps/aps_reharvest.py @@ -131,7 +131,7 @@ def _resolve_file_keys(file_keys, all_snapshot_keys): "dois": Param( default=[], type=["array", "null"], - description="List of DOIs to process", + description="List of DOIs to process. This will only search for records from the last 3 years, if date_from/date_to are not provided.", title="DOIs", ), "limit": Param( @@ -197,13 +197,8 @@ def collect_articles(repo=APS_REPO, **kwargs): elif dois: if not date_from and not date_to: date_to = date.today() - date_from = date_to - timedelta(days=365) + date_from = date_to - timedelta(days=365 * 3) - if (date_to - date_from).days > 366: - raise ValueError( - "For DOI search the date range must not exceed one year. " - "Please use a smaller range." - ) logger.info( "Selecting APS snapshot files for DOI search in date range %s to %s", date_from, diff --git a/dags/hindawi/hindawi_reharvest.py b/dags/hindawi/hindawi_reharvest.py index 4526feb3..7890568d 100644 --- a/dags/hindawi/hindawi_reharvest.py +++ b/dags/hindawi/hindawi_reharvest.py @@ -6,6 +6,7 @@ import pendulum from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator from airflow.sdk import Param, dag, task +from common.notification_service import FailedDagNotifier from hindawi.repository import HindawiRepository from hindawi.utils import split_xmls @@ -87,6 +88,7 @@ def _resolve_file_keys(file_keys, all_snapshot_keys): @dag( + on_failure_callback=FailedDagNotifier(), start_date=pendulum.today("UTC").add(days=-1), schedule=None, catchup=False, diff --git a/dags/iop/iop_reharvest.py b/dags/iop/iop_reharvest.py index 794523a2..ee45f2ce 100644 --- a/dags/iop/iop_reharvest.py +++ b/dags/iop/iop_reharvest.py @@ -157,7 +157,7 @@ def _enforce_limit(records, limit): type=["array", "null"], description=( "List of DOIs to process. Can be used with date_from/date_to or " - "together with file_keys to filter the resolved key set." + "together with file_keys to filter the resolved key set. The maximum date range is 1 Year." ), title="DOIs", ), diff --git a/dags/oup/oup_reharvest.py b/dags/oup/oup_reharvest.py index 90b28652..e85b4b3c 100644 --- a/dags/oup/oup_reharvest.py +++ b/dags/oup/oup_reharvest.py @@ -185,8 +185,8 @@ def _enforce_limit(records, limit): default=[], type=["array", "null"], description=( - "List of DOIs to process. Can be used with date_from/date_to or together " - "with file_keys to filter the resolved key set, as the date search is limited to naming patterns that include a date." + "List of DOIs to process. Can be used with date_from/date_to or " + "together with file_keys to filter the resolved key set. The maximum date range is 1 Year." 
             ),
             title="DOIs",
         ),
diff --git a/tests/units/aps/test_unit_aps_reharvest.py b/tests/units/aps/test_unit_aps_reharvest.py
index 6518bd92..b28f826b 100644
--- a/tests/units/aps/test_unit_aps_reharvest.py
+++ b/tests/units/aps/test_unit_aps_reharvest.py
@@ -212,22 +212,6 @@ def test_collect_articles_dois_without_dates_defaults_last_year(self, dag):
         assert len(result) == 1
         assert result[0]["identifiers"]["doi"] == "10.1103/found"
 
-    def test_collect_articles_dois_range_too_large_raises(self, dag):
-        task = dag.get_task("collect_articles")
-        function_to_unit_test = task.python_callable
-
-        mock_repo = self._build_repo({})
-
-        with pytest.raises(ValueError, match="must not exceed one year"):
-            function_to_unit_test(
-                repo=mock_repo,
-                params={
-                    "dois": ["10.1103/x"],
-                    "date_from": "2024-01-01",
-                    "date_to": "2026-01-01",
-                },
-            )
-
     def test_prepare_trigger_conf_dry_run_and_normal(self, dag):
         task = dag.get_task("prepare_trigger_conf")
         function_to_unit_test = task.python_callable
diff --git a/tests/units/hindawi/test_unit_hindawi_reharvest.py b/tests/units/hindawi/test_unit_hindawi_reharvest.py
index 84db23c8..1641644c 100644
--- a/tests/units/hindawi/test_unit_hindawi_reharvest.py
+++ b/tests/units/hindawi/test_unit_hindawi_reharvest.py
@@ -186,22 +186,6 @@ def test_collect_records_dois_without_dates_defaults_last_three_years(self, dag):
         assert len(result) == 1
         assert "10.1155/2022/1234567" in result[0]
 
-    def test_collect_records_dois_range_too_large_raises(self, dag):
-        task = dag.get_task("collect_records")
-        function_to_unit_test = task.python_callable
-
-        mock_repo = self._build_repo({})
-
-        with pytest.raises(ValueError, match="must not exceed one year"):
-            function_to_unit_test(
-                repo=mock_repo,
-                params={
-                    "dois": ["10.1155/2022/1234567"],
-                    "date_from": "2020-01-01",
-                    "date_to": "2022-01-01",
-                },
-            )
-
     def test_prepare_trigger_conf_dry_run_and_normal(self, dag):