diff --git a/dags/aps/aps_reharvest.py b/dags/aps/aps_reharvest.py
index d7fedc28..dcaa203e 100644
--- a/dags/aps/aps_reharvest.py
+++ b/dags/aps/aps_reharvest.py
@@ -131,7 +131,7 @@ def _resolve_file_keys(file_keys, all_snapshot_keys):
"dois": Param(
default=[],
type=["array", "null"],
- description="List of DOIs to process",
+ description="List of DOIs to process. This will only search for records from the last 3 years, if date_from/date_to are not provided.",
title="DOIs",
),
"limit": Param(
@@ -197,13 +197,8 @@ def collect_articles(repo=APS_REPO, **kwargs):
elif dois:
if not date_from and not date_to:
date_to = date.today()
-        date_from = date_to - timedelta(days=365)
-        if (date_to - date_from).days > 366:
-            raise ValueError(
-                "For DOI search the date range must not exceed one year. "
-                "Please use a smaller range."
-            )
+        date_from = date_to - timedelta(days=365 * 3)
 
logger.info(
"Selecting APS snapshot files for DOI search in date range %s to %s",
date_from,
diff --git a/dags/hindawi/hindawi_reharvest.py b/dags/hindawi/hindawi_reharvest.py
new file mode 100644
index 00000000..7890568d
--- /dev/null
+++ b/dags/hindawi/hindawi_reharvest.py
@@ -0,0 +1,301 @@
+import fnmatch
+import logging
+import re
+from datetime import date, datetime, timedelta
+
+import pendulum
+from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
+from airflow.sdk import Param, dag, task
+from common.notification_service import FailedDagNotifier
+from hindawi.repository import HindawiRepository
+from hindawi.utils import split_xmls
+
+logger = logging.getLogger("airflow.task")
+HINDAWI_REPO = HindawiRepository()
+
+
+def _parse_date(value):
+ if not value:
+ return None
+ return datetime.strptime(value, "%Y-%m-%d").date()
+
+
+def _normalize_dois(dois):
+ if not dois:
+ return set()
+ return {doi.strip().lower() for doi in dois if isinstance(doi, str) and doi.strip()}
+
+
+def _extract_key_date(s3_key):
+ key_date = s3_key.split("/")[0]
+ return _parse_date(key_date)
+
+
+def _extract_key_datetime(s3_key):
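+    # Keys look like "2022-09-30/2022-09-30T13:00.xml"; unparsable keys fall back to datetime.min.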
+ try:
+ key_ts = s3_key.split("/")[1].removesuffix(".xml")
+ return datetime.strptime(key_ts, "%Y-%m-%dT%H:%M")
+ except (IndexError, ValueError):
+ return datetime.min
+
+
+def _extract_doi_from_record(record_xml):
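+    # All Hindawi DOIs share the 10.1155 prefix; take the first match in the raw XML.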
+ match = re.search(r"10\.1155/[^\s<]+", record_xml, flags=re.IGNORECASE)
+ if not match:
+ return None
+ return match.group(0).strip().lower()
+
+
+def _enforce_limit(records, limit):
+ if limit is None:
+ return
+ if len(records) > limit:
+ raise ValueError(
+ f"Matched {len(records)} records, which is above limit={limit}. "
+ "Please reduce the date range or increase the limit."
+ )
+
+
+def _is_glob_pattern(value):
+ return any(c in value for c in ("*", "?", "["))
+
+
+def _resolve_file_keys(file_keys, all_snapshot_keys):
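+    # Exact keys must all exist; glob patterns may legitimately match nothing
+    # (logged as a warning). Order is preserved and duplicates are dropped.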
+ all_snapshot_keys_set = set(all_snapshot_keys)
+
+ exact_keys = [k for k in file_keys if not _is_glob_pattern(k)]
+ glob_patterns = [k for k in file_keys if _is_glob_pattern(k)]
+
+ missing_keys = [k for k in exact_keys if k not in all_snapshot_keys_set]
+ if missing_keys:
+ raise ValueError(f"Some requested file_keys do not exist: {missing_keys}")
+
+ resolved = list(exact_keys)
+
+ for pattern in glob_patterns:
+ matched = [key for key in all_snapshot_keys if fnmatch.fnmatch(key, pattern)]
+ if not matched:
+ logger.warning("Pattern %r matched no Hindawi snapshot keys", pattern)
+ resolved.extend(matched)
+
+ seen = set()
+ deduped = []
+ for key in resolved:
+ if key not in seen:
+ seen.add(key)
+ deduped.append(key)
+ return deduped
+
+
+@dag(
+ on_failure_callback=FailedDagNotifier(),
+ start_date=pendulum.today("UTC").add(days=-1),
+ schedule=None,
+ catchup=False,
+ tags=["reharvest", "hindawi"],
+ params={
+ "date_from": Param(
+ default=None,
+ type=["string", "null"],
+ description="Start date in YYYY-MM-DD format",
+ title="Date from",
+ ),
+ "date_to": Param(
+ default=None,
+ type=["string", "null"],
+ description="End date in YYYY-MM-DD format",
+ title="Date to",
+ ),
+ "file_keys": Param(
+ default=[],
+ type=["array", "null"],
+ description=(
+ "List of Hindawi snapshot keys to process. Supports glob patterns "
+ "(e.g. '*' or '2022-09-30/*')."
+ ),
+ title="File keys",
+ ),
+ "dois": Param(
+ default=[],
+ type=["array", "null"],
+ description="List of DOIs to process. This will only search for records from the last 3 years, if date_from/date_to are not provided.",
+ title="DOIs",
+ ),
+ "limit": Param(
+            default=1000,
+ type=["integer"],
+ description="Maximum number of records to process",
+ title="Limit",
+ ),
+ "dry_run": Param(
+ default=True,
+ type=["boolean", "null"],
+ description="Whether to perform a dry run. If true, no downstream DAGs will be triggered",
+ title="Dry run",
+ ),
+ },
+)
+def hindawi_reharvest():
+ @task()
+ def collect_records(repo=HINDAWI_REPO, **kwargs):
+ params = kwargs.get("params", {})
+
+ date_from = _parse_date(params.get("date_from"))
+ date_to = _parse_date(params.get("date_to"))
+ file_keys = params.get("file_keys") or []
+ dois = _normalize_dois(params.get("dois") or [])
+ limit = params.get("limit")
+
+ if limit is not None and (not isinstance(limit, int) or limit <= 0):
+ raise ValueError("limit must be a positive integer when provided")
+
+ if bool(file_keys) and bool(dois):
+ raise ValueError(
+ "Invalid parameters: file_keys and dois cannot be used together"
+ )
+
+ if bool(file_keys) and (date_from or date_to):
+ raise ValueError(
+ "Invalid parameters: date_from/date_to cannot be used together with file_keys"
+ )
+
+ if (date_from and not date_to) or (date_to and not date_from):
+ raise ValueError("Both date_from and date_to must be provided together")
+
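+        # Only date-prefixed .xml keys count as snapshot files; anything under
+        # the "parsed/" prefix is ignored.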
+ all_snapshot_keys = [
+ obj.key
+ for obj in repo.s3_bucket.objects.all()
+ if obj.key.endswith(".xml")
+ and "/" in obj.key
+ and not obj.key.startswith("parsed/")
+ ]
+ logger.info(
+ "Found %s total Hindawi snapshot file(s) in storage",
+ len(all_snapshot_keys),
+ )
+
+ selected_keys = []
+ target_dois = set(dois)
+
+ if file_keys:
+ logger.info(
+ "Selecting Hindawi snapshot files from file_keys (with glob expansion)"
+ )
+ selected_keys = _resolve_file_keys(file_keys, all_snapshot_keys)
+
+ elif dois:
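+            # Without an explicit range, default to scanning the last three years of snapshots.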
+ if not date_from and not date_to:
+ date_to = date.today()
+ date_from = date_to - timedelta(days=365 * 3)
+
+ logger.info(
+ "Selecting Hindawi snapshot files for DOI search in date range %s to %s",
+ date_from,
+ date_to,
+ )
+ keys_in_range = []
+ for key in all_snapshot_keys:
+ key_date = _extract_key_date(key)
+ if not key_date:
+ continue
+ if date_from <= key_date <= date_to:
+ keys_in_range.append(key)
+
+ selected_keys = sorted(
+ keys_in_range, key=_extract_key_datetime, reverse=True
+ )
+
+ elif date_from and date_to:
+ logger.info(
+ "Selecting Hindawi snapshot files in date range %s to %s",
+ date_from,
+ date_to,
+ )
+ for key in all_snapshot_keys:
+ key_date = _extract_key_date(key)
+ if not key_date:
+ continue
+ if date_from <= key_date <= date_to:
+ selected_keys.append(key)
+
+ else:
+ raise ValueError(
+ "Invalid parameters: provide either date_from+date_to, file_keys, or dois"
+ )
+
+ logger.info("Selected %s Hindawi snapshot key(s)", len(selected_keys))
+ logger.info("Selected Hindawi snapshot keys: %s", selected_keys)
+
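+        # The same DOI can appear in several snapshots; keep only the version
+        # from the most recent snapshot.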
+ deduped_records = {}
+ found_dois = set()
+
+ for key in selected_keys:
+ snapshot_dt = _extract_key_datetime(key)
+ for record in split_xmls(repo, key):
+ identifier = _extract_doi_from_record(record)
+ if not identifier:
+ logger.warning(
+ "Skipping Hindawi record with missing DOI in file %s", key
+ )
+ continue
+
+ if target_dois and identifier not in target_dois:
+ continue
+
+ if target_dois:
+ found_dois.add(identifier)
+
+ existing = deduped_records.get(identifier)
+ if not existing or snapshot_dt > existing["snapshot_dt"]:
+ deduped_records[identifier] = {
+ "record": record,
+ "snapshot_dt": snapshot_dt,
+ }
+
+ records = [entry["record"] for entry in deduped_records.values()]
+
+ if target_dois:
+ missing_dois = sorted(target_dois - found_dois)
+ if missing_dois:
+ logger.warning("Some requested DOIs were not found: %s", missing_dois)
+
+ _enforce_limit(records, limit)
+
+ logger.info("Collected %s deduplicated Hindawi record(s)", len(records))
+ return records
+
+ @task()
+ def prepare_trigger_conf(records, **kwargs):
+ dry_run = bool(kwargs.get("params", {}).get("dry_run", False))
+ if dry_run:
+ logger.info(
+ "Dry run enabled. %s record(s) matched. No downstream runs will be triggered.",
+ len(records),
+ )
+ return []
+
+ confs = [{"record": record} for record in records]
+ logger.info("Prepared %s downstream trigger conf(s)", len(confs))
+ return confs
+
+ records = collect_records()
+ trigger_confs = prepare_trigger_conf(records)
+
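+    # Fan out one hindawi_file_processing run per matched record.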
+ TriggerDagRunOperator.partial(
+ task_id="hindawi_reharvest_trigger_file_processing",
+ trigger_dag_id="hindawi_file_processing",
+ reset_dag_run=True,
+ ).expand(conf=trigger_confs)
+
+
+hindawi_reharvest_dag = hindawi_reharvest()
diff --git a/dags/iop/iop_reharvest.py b/dags/iop/iop_reharvest.py
index 794523a2..ee45f2ce 100644
--- a/dags/iop/iop_reharvest.py
+++ b/dags/iop/iop_reharvest.py
@@ -157,7 +157,7 @@ def _enforce_limit(records, limit):
type=["array", "null"],
description=(
"List of DOIs to process. Can be used with date_from/date_to or "
- "together with file_keys to filter the resolved key set."
+ "together with file_keys to filter the resolved key set. The maximum date range is 1 Year."
),
title="DOIs",
),
diff --git a/dags/oup/oup_reharvest.py b/dags/oup/oup_reharvest.py
index 90b28652..e85b4b3c 100644
--- a/dags/oup/oup_reharvest.py
+++ b/dags/oup/oup_reharvest.py
@@ -185,8 +185,8 @@ def _enforce_limit(records, limit):
default=[],
type=["array", "null"],
description=(
- "List of DOIs to process. Can be used with date_from/date_to or together "
- "with file_keys to filter the resolved key set, as the date search is limited to naming patterns that include a date."
+ "List of DOIs to process. Can be used with date_from/date_to or "
+ "together with file_keys to filter the resolved key set. The maximum date range is 1 Year."
),
title="DOIs",
),
diff --git a/tests/units/aps/test_unit_aps_reharvest.py b/tests/units/aps/test_unit_aps_reharvest.py
index 6518bd92..b28f826b 100644
--- a/tests/units/aps/test_unit_aps_reharvest.py
+++ b/tests/units/aps/test_unit_aps_reharvest.py
@@ -212,22 +212,6 @@ def test_collect_articles_dois_without_dates_defaults_last_year(self, dag):
assert len(result) == 1
assert result[0]["identifiers"]["doi"] == "10.1103/found"
 
- def test_collect_articles_dois_range_too_large_raises(self, dag):
- task = dag.get_task("collect_articles")
- function_to_unit_test = task.python_callable
-
- mock_repo = self._build_repo({})
-
- with pytest.raises(ValueError, match="must not exceed one year"):
- function_to_unit_test(
- repo=mock_repo,
- params={
- "dois": ["10.1103/x"],
- "date_from": "2024-01-01",
- "date_to": "2026-01-01",
- },
- )
-
def test_prepare_trigger_conf_dry_run_and_normal(self, dag):
task = dag.get_task("prepare_trigger_conf")
function_to_unit_test = task.python_callable
diff --git a/tests/units/hindawi/test_unit_hindawi_reharvest.py b/tests/units/hindawi/test_unit_hindawi_reharvest.py
new file mode 100644
index 00000000..1641644c
--- /dev/null
+++ b/tests/units/hindawi/test_unit_hindawi_reharvest.py
@@ -0,0 +1,198 @@
+from io import BytesIO
+from unittest import mock
+
+import pytest
+from airflow.models import DagBag
+from freezegun import freeze_time
+
+
+class _S3Object:
+ def __init__(self, key):
+ self.key = key
+
+
+def _record_xml(doi):
+    # Minimal record skeleton; the DAG's regex extraction only needs a
+    # 10.1155 DOI to appear somewhere in the raw record XML.
+    return f"""
+<record>
+    <metadata>
+        <article>
+            <doi>{doi}</doi>
+        </article>
+    </metadata>
+</record>
+""".strip()
+
+
+def _snapshot_xml(*records):
+    # Wrap the records in one root element so each snapshot is a single XML document.
+    records_xml = "".join(records)
+    return f"<records>{records_xml}</records>".encode()
+
+
+@pytest.fixture(scope="class")
+def dagbag():
+ return DagBag(dag_folder="dags/", include_examples=False)
+
+
+@pytest.fixture(scope="class")
+def dag(dagbag):
+ return dagbag.dags.get("hindawi_reharvest")
+
+
+class TestUnitHindawiReharvest:
+ def _build_repo(self, payload_by_key):
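+        # Stub just enough of the repository: a listable set of snapshot keys,
+        # plus get_by_id returning each key's raw XML payload.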
+ mock_repo = mock.MagicMock()
+ keys = list(payload_by_key.keys())
+ mock_repo.s3_bucket.objects.all.return_value = [_S3Object(key) for key in keys]
+
+ def _get_by_id(key):
+ return BytesIO(payload_by_key[key])
+
+ mock_repo.get_by_id.side_effect = _get_by_id
+ return mock_repo
+
+ def test_dag_structure(self, dag):
+ assert "hindawi_reharvest_trigger_file_processing" in dag.task_ids
+ task = dag.get_task("hindawi_reharvest_trigger_file_processing")
+ assert "MappedOperator" in str(type(task)) or task.is_mapped
+ assert task.partial_kwargs["trigger_dag_id"] == "hindawi_file_processing"
+ assert task.partial_kwargs["reset_dag_run"] is True
+
+ def test_collect_records_invalid_combination_file_keys_and_dois(self, dag):
+ task = dag.get_task("collect_records")
+ function_to_unit_test = task.python_callable
+
+ mock_repo = self._build_repo({})
+
+ with pytest.raises(ValueError, match="file_keys and dois"):
+ function_to_unit_test(
+ repo=mock_repo,
+ params={
+ "file_keys": ["2022-09-30/2022-09-30T12:00.xml"],
+ "dois": ["10.1155/2022/1234567"],
+ },
+ )
+
+ def test_collect_records_date_range_dedup_keeps_newest(self, dag):
+ task = dag.get_task("collect_records")
+ function_to_unit_test = task.python_callable
+
+ payload_by_key = {
+ "2022-09-30/2022-09-30T12:00.xml": _snapshot_xml(
+ _record_xml("10.1155/2022/1111111"),
+ _record_xml("10.1155/2022/2222222"),
+ ),
+ "2022-09-30/2022-09-30T13:00.xml": _snapshot_xml(
+ _record_xml("10.1155/2022/1111111")
+ ),
+ }
+ mock_repo = self._build_repo(payload_by_key)
+
+ result = function_to_unit_test(
+ repo=mock_repo,
+ params={
+ "date_from": "2022-09-30",
+ "date_to": "2022-09-30",
+ },
+ )
+
+ assert len(result) == 2
+ doi_hits = sorted(["10.1155/2022/1111111" in record for record in result])
+ assert doi_hits == [False, True]
+        read_keys = [call.args[0] for call in mock_repo.get_by_id.call_args_list]
+        assert "2022-09-30/2022-09-30T13:00.xml" in read_keys
+
+ def test_collect_records_limit_exceeded_raises(self, dag):
+ task = dag.get_task("collect_records")
+ function_to_unit_test = task.python_callable
+
+ payload_by_key = {
+ "2022-09-30/2022-09-30T12:00.xml": _snapshot_xml(
+ _record_xml("10.1155/2022/1111111"),
+ _record_xml("10.1155/2022/2222222"),
+ )
+ }
+ mock_repo = self._build_repo(payload_by_key)
+
+ with pytest.raises(ValueError, match="above limit"):
+ function_to_unit_test(
+ repo=mock_repo,
+ params={
+ "date_from": "2022-09-30",
+ "date_to": "2022-09-30",
+ "limit": 1,
+ },
+ )
+
+ def test_collect_records_file_keys_glob_patterns(self, dag):
+ task = dag.get_task("collect_records")
+ function_to_unit_test = task.python_callable
+
+ payload_by_key = {
+ "2022-09-30/2022-09-30T12:00.xml": _snapshot_xml(
+ _record_xml("10.1155/2022/1111111")
+ ),
+ "2022-09-30/2022-09-30T13:00.xml": _snapshot_xml(
+ _record_xml("10.1155/2022/2222222")
+ ),
+ "2022-10-01/2022-10-01T12:00.xml": _snapshot_xml(
+ _record_xml("10.1155/2022/3333333")
+ ),
+ }
+ mock_repo = self._build_repo(payload_by_key)
+
+ result = function_to_unit_test(
+ repo=mock_repo,
+ params={"file_keys": ["2022-09-30/*"]},
+ )
+
+        assert len(result) == 2
+ assert any("10.1155/2022/1111111" in record for record in result)
+ assert any("10.1155/2022/2222222" in record for record in result)
+
+ @freeze_time("2022-10-02")
+    def test_collect_records_dois_without_dates_defaults_last_three_years(self, dag):
+ task = dag.get_task("collect_records")
+ function_to_unit_test = task.python_callable
+
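+        # Time is frozen at 2022-10-02, so the default window reaches back to 2019-10-03.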
+ payload_by_key = {
+ "2020-01-01/2020-01-01T10:00.xml": _snapshot_xml(
+ _record_xml("10.1155/2020/9999999")
+ ),
+ "2022-09-30/2022-09-30T12:00.xml": _snapshot_xml(
+ _record_xml("10.1155/2022/1234567")
+ ),
+ }
+ mock_repo = self._build_repo(payload_by_key)
+
+ result = function_to_unit_test(
+ repo=mock_repo,
+ params={
+ "dois": ["10.1155/2022/1234567"],
+ },
+ )
+
+ assert len(result) == 1
+ assert "10.1155/2022/1234567" in result[0]
+
+ def test_prepare_trigger_conf_dry_run_and_normal(self, dag):
+ task = dag.get_task("prepare_trigger_conf")
+ function_to_unit_test = task.python_callable
+
+ records = [_record_xml("10.1155/2022/1234567")]
+
+ dry_run_result = function_to_unit_test(
+ records=records, params={"dry_run": True}
+ )
+ assert dry_run_result == []
+
+ normal_result = function_to_unit_test(
+ records=records, params={"dry_run": False}
+ )
+ assert len(normal_result) == 1
+ assert normal_result[0]["record"] == records[0]