diff --git a/dags/elsevier/elsevier_reharvest.py b/dags/elsevier/elsevier_reharvest.py
new file mode 100644
index 00000000..343e48bf
--- /dev/null
+++ b/dags/elsevier/elsevier_reharvest.py
@@ -0,0 +1,437 @@
+import fnmatch
+import logging
+import os
+from datetime import date, datetime, timedelta
+from io import BytesIO
+
+import pendulum
+from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
+from airflow.sdk import Param, dag, task
+from common.notification_service import FailedDagNotifier
+from common.utils import parse_without_names_spaces
+from elsevier.repository import ElsevierRepository
+from elsevier.trigger_file_processing import trigger_file_processing_elsevier
+
+logger = logging.getLogger("airflow.task")
+ELSEVIER_REPO = ElsevierRepository()
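+# Hard cap on how many main.xml files a single run may download and parse when
+# extracting dates/DOIs; the scan-threshold helper below raises once a scan
+# would exceed it.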
+MAX_XML_SCAN_FILES = 10_000
+
+
+def _parse_date(value):
+ if not value:
+ return None
+ return datetime.strptime(value, "%Y-%m-%d").date()
+
+
+def _normalize_dois(dois):
+ if not dois:
+ return set()
+ return {doi.strip().lower() for doi in dois if isinstance(doi, str) and doi.strip()}
+
+
+def _normalize_file_keys(file_keys):
+ return [key.strip() for key in file_keys if isinstance(key, str) and key.strip()]
+
+
+def _is_glob_pattern(value):
+ return any(char in value for char in ("*", "?", "["))
+
+
+def _is_elsevier_main_xml_key(key, repo):
+ return key.startswith(repo.EXTRACTED_DIR) and os.path.basename(key) == "main.xml"
+
+
+def _extract_doi_from_xml(xml_bytes):
+ try:
+ xml = parse_without_names_spaces(
+ BytesIO(xml_bytes) if isinstance(xml_bytes, bytes) else xml_bytes
+ )
+ doi_element = xml.find("item-info/doi")
+ if doi_element is not None and doi_element.text:
+ return doi_element.text.strip().lower()
+ except Exception:
+ pass
+ return None
+
+
+def _extract_received_date_from_xml(xml_bytes):
+ try:
+ xml = parse_without_names_spaces(
+ BytesIO(xml_bytes) if isinstance(xml_bytes, bytes) else xml_bytes
+ )
+ date_element = xml.find("head/date-received")
+ if date_element is None:
+ return None
+
+ day_text = date_element.get("day")
+ month_text = date_element.get("month")
+ year_text = date_element.get("year")
+ if not (day_text and month_text and year_text):
+ return None
+
+ return date(int(year_text), int(month_text), int(day_text))
+ except Exception:
+ return None
+
+
+def _enforce_limit(records, limit):
+ if limit is None:
+ return
+ if len(records) > limit:
+ raise ValueError(
+ f"Matched {len(records)} records, which is above limit={limit}. "
+ "Please reduce the date range or increase the limit."
+ )
+
+
+def _log_if_xml_scan_exceeds_threshold(total_files, scan_reason):
+ if total_files > MAX_XML_SCAN_FILES:
+ logger.error(
+ "XML scan threshold exceeded for %s: %s files to parse (> %s). "
+ "Consider narrowing filters.",
+ scan_reason,
+ total_files,
+ MAX_XML_SCAN_FILES,
+ )
+ raise ValueError(
+ f"XML scan threshold exceeded for {scan_reason}: {total_files} files to parse (> {MAX_XML_SCAN_FILES}). Consider narrowing filters."
+ )
+
+
+def _resolve_file_keys(file_keys, all_xml_keys, repo):
+ """Resolve a mix of exact S3 keys and glob patterns.
+
+ Patterns are matched against the path after the leading repo.EXTRACTED_DIR prefix,
+ so '*' matches every Elsevier main.xml key under that extracted folder.
+ """
+ all_xml_keys_set = set(all_xml_keys)
+ prefix = repo.EXTRACTED_DIR
+
+ exact_keys = [key for key in file_keys if not _is_glob_pattern(key)]
+ glob_patterns = [key for key in file_keys if _is_glob_pattern(key)]
+
+ missing = [key for key in exact_keys if key not in all_xml_keys_set]
+ if missing:
+ raise ValueError(f"Some requested file_keys do not exist: {missing}")
+
+ resolved = list(exact_keys)
+
+ for pattern in glob_patterns:
+ matched = [
+ key
+ for key in all_xml_keys
+ if fnmatch.fnmatch(
+ key[len(prefix) :] if key.startswith(prefix) else key,
+ pattern,
+ )
+ ]
+ if not matched:
+ logger.warning("Pattern %r matched no Elsevier main.xml keys", pattern)
+ resolved.extend(matched)
+
+ seen = set()
+ deduped = []
+ for key in resolved:
+ if key not in seen:
+ seen.add(key)
+ deduped.append(key)
+ return deduped
+
+
+def _find_dataset_key_for_xml_key(xml_key, dataset_keys, repo):
+ relative_key = (
+ xml_key[len(repo.EXTRACTED_DIR) :]
+ if xml_key.startswith(repo.EXTRACTED_DIR)
+ else xml_key
+ )
+ parts = relative_key.split("/")
+
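+    # Walk up the directory hierarchy of the main.xml key and return the closest
+    # enclosing dataset.xml key, if any.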
+ for depth in range(len(parts) - 1, 0, -1):
+ candidate = f"{repo.EXTRACTED_DIR}{'/'.join(parts[:depth])}/dataset.xml"
+ if candidate in dataset_keys:
+ return candidate
+
+ return None
+
+
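+# Selection modes: an explicit ce:date-received range (date_from/date_to), explicit
+# file_keys (exact keys or glob patterns), or a list of DOIs. DOIs can be combined
+# with either of the other two and default to the last year on their own;
+# date_from/date_to and file_keys are mutually exclusive.
+# Example trigger conf (illustrative values only):
+#   {"date_from": "2025-01-01", "date_to": "2025-01-31", "dry_run": true}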
+@dag(
+ on_failure_callback=FailedDagNotifier(),
+ start_date=pendulum.today("UTC").add(days=-1),
+ schedule=None,
+ catchup=False,
+ tags=["reharvest", "elsevier"],
+ params={
+ "date_from": Param(
+ default=None,
+ type=["string", "null"],
+ description="Start ce:date-received (from XML) in YYYY-MM-DD format",
+ title="Date from",
+ ),
+ "date_to": Param(
+ default=None,
+ type=["string", "null"],
+ description="End ce:date-received (from XML) in YYYY-MM-DD format",
+ title="Date to",
+ ),
+ "file_keys": Param(
+ default=[],
+ type=["array", "null"],
+ description=(
+ "Exact Elsevier extracted main.xml keys or glob patterns matched relative "
+ f"to '{ELSEVIER_REPO.EXTRACTED_DIR}'. Examples: '*' or "
+ "'CERNAB00000010772A/*/*/*/main.xml'."
+ ),
+ title="File keys",
+ ),
+ "dois": Param(
+ default=[],
+ type=["array", "null"],
+ description=(
+ "List of DOIs to process. Can be used with date_from/date_to or together "
+ "with file_keys to filter the resolved key set. The maximum date range is 1 Year."
+ ),
+ title="DOIs",
+ ),
+ "limit": Param(
+ 1000,
+ type=["integer"],
+ description="Maximum number of records to process",
+ title="Limit",
+ ),
+ "dry_run": Param(
+ default=True,
+ type=["boolean", "null"],
+ description="Whether to perform a dry run. If true, no downstream DAGs will be triggered",
+ title="Dry run",
+ ),
+ },
+)
+def elsevier_reharvest():
+ @task()
+ def collect_records(repo=ELSEVIER_REPO, **kwargs):
+ params = kwargs.get("params", {})
+
+ date_from = _parse_date(params.get("date_from"))
+ date_to = _parse_date(params.get("date_to"))
+ file_keys = _normalize_file_keys(params.get("file_keys") or [])
+ target_dois = _normalize_dois(params.get("dois") or [])
+ limit = params.get("limit")
+
+ if limit is not None and (not isinstance(limit, int) or limit <= 0):
+ raise ValueError("limit must be a positive integer when provided")
+
+        if file_keys and (date_from or date_to):
+ raise ValueError(
+ "Invalid parameters: date_from/date_to cannot be used together with file_keys"
+ )
+
+ if (date_from and not date_to) or (date_to and not date_from):
+ raise ValueError("Both date_from and date_to must be provided together")
+
+ all_xml_keys = [
+ obj.key
+ for obj in repo.s3.objects.filter(Prefix=repo.EXTRACTED_DIR).all()
+ if _is_elsevier_main_xml_key(obj.key, repo)
+ ]
+ logger.info(
+ "Found %s Elsevier extracted main.xml file(s) in %s",
+ len(all_xml_keys),
+ repo.EXTRACTED_DIR,
+ )
+
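+        # Cache downloaded XML bodies so each S3 object is fetched at most once
+        # across date sorting and DOI extraction.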
+ xml_bytes_cache = {}
+
+ def _get_xml_bytes(key):
+ if key not in xml_bytes_cache:
+ xml_bytes_cache[key] = repo.get_by_id(key).getvalue()
+ return xml_bytes_cache[key]
+
+ def _sort_key(key):
+ record_date = _extract_received_date_from_xml(_get_xml_bytes(key))
+ return (record_date or date.min, key)
+
+ if file_keys:
+ resolved_keys = _resolve_file_keys(file_keys, all_xml_keys, repo)
+
+ if target_dois:
+ _log_if_xml_scan_exceeds_threshold(
+ len(resolved_keys),
+ "date extraction for sorting file_keys before DOI filtering",
+ )
+ resolved_keys = sorted(resolved_keys, key=_sort_key, reverse=True)
+ _log_if_xml_scan_exceeds_threshold(
+ len(resolved_keys),
+ "DOI extraction from file_keys candidates",
+ )
+ found_dois = set()
+ deduped_by_doi = {}
+
+ for key in resolved_keys:
+ doi = _extract_doi_from_xml(_get_xml_bytes(key))
+ if not doi:
+ logger.warning("Could not extract DOI from file: %s", key)
+ continue
+
+ if doi not in target_dois:
+ continue
+
+ found_dois.add(doi)
+ if doi not in deduped_by_doi:
+ deduped_by_doi[doi] = key
+
+ missing_dois = sorted(target_dois - found_dois)
+ if missing_dois:
+ logger.warning(
+ "Some requested DOIs were not found: %s", missing_dois
+ )
+
+ selected_keys = list(deduped_by_doi.values())
+ else:
+ selected_keys = resolved_keys
+
+ _enforce_limit(selected_keys, limit)
+ logger.info(
+ "Selected %s Elsevier main.xml key(s) from file_keys (with glob expansion)",
+ len(selected_keys),
+ )
+ return selected_keys
+
+ if target_dois and not (date_from and date_to):
+ date_to = date.today()
+ date_from = date_to - timedelta(days=365)
+
+ if not (date_from and date_to):
+ raise ValueError(
+ "Invalid parameters: provide either date_from+date_to, file_keys, or dois"
+ )
+
+ if target_dois and (date_to - date_from).days > 366:
+ raise ValueError(
+ "For DOI search the date range must not exceed one year. "
+ "Please use a smaller range."
+ )
+
+ logger.info(
+ "Selecting Elsevier main.xml files by XML ce:date-received range %s to %s",
+ date_from,
+ date_to,
+ )
+
+ selected_records = []
+ _log_if_xml_scan_exceeds_threshold(
+ len(all_xml_keys),
+ "ce:date-received extraction from all main.xml candidates",
+ )
+ for key in all_xml_keys:
+ record_date = _extract_received_date_from_xml(_get_xml_bytes(key))
+ if record_date is None:
+ continue
+ if date_from <= record_date <= date_to:
+ selected_records.append({"key": key, "record_date": record_date})
+
+ selected_records.sort(
+ key=lambda record: (record["record_date"], record["key"]),
+ reverse=True,
+ )
+
+ logger.info(
+ "Found %s Elsevier main.xml file(s) in the requested date range",
+ len(selected_records),
+ )
+
+ if not target_dois:
+ selected_keys = [record["key"] for record in selected_records]
+ _enforce_limit(selected_keys, limit)
+ logger.info("Selected %s Elsevier main.xml key(s)", len(selected_keys))
+ return selected_keys
+
+ found_dois = set()
+ deduped_by_doi = {}
+
+ _log_if_xml_scan_exceeds_threshold(
+ len(selected_records),
+ "DOI extraction after date-range filtering",
+ )
+
+ for record in selected_records:
+ key = record["key"]
+ doi = _extract_doi_from_xml(_get_xml_bytes(key))
+
+ if not doi:
+ logger.warning("Could not extract DOI from file: %s", key)
+ continue
+
+ if doi not in target_dois:
+ continue
+
+ found_dois.add(doi)
+ if doi not in deduped_by_doi:
+ deduped_by_doi[doi] = key
+
+ missing_dois = sorted(target_dois - found_dois)
+ if missing_dois:
+ logger.warning("Some requested DOIs were not found: %s", missing_dois)
+
+ selected_keys = list(deduped_by_doi.values())
+ _enforce_limit(selected_keys, limit)
+
+ logger.info(
+ "Selected %s Elsevier main.xml key(s) after DOI filtering",
+ len(selected_keys),
+ )
+ return selected_keys
+
+ @task()
+ def prepare_trigger_conf(file_keys, repo=ELSEVIER_REPO, **kwargs):
+ dry_run = bool(kwargs.get("params", {}).get("dry_run", False))
+ if dry_run:
+ logger.info(
+ "Dry run enabled. %s record(s) matched. No downstream runs will be triggered.",
+ len(file_keys),
+ )
+ return []
+
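+        # The downstream conf builder works on dataset.xml files, so map every
+        # selected main.xml key back to its enclosing dataset.xml first.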
+ dataset_keys = {
+ obj.key
+ for obj in repo.s3.objects.filter(Prefix=repo.EXTRACTED_DIR).all()
+ if os.path.basename(obj.key) == "dataset.xml"
+ }
+ requested_file_keys = set(file_keys)
+ resolved_dataset_keys = []
+
+ for key in file_keys:
+ dataset_key = _find_dataset_key_for_xml_key(key, dataset_keys, repo)
+ if not dataset_key:
+ raise ValueError(f"Could not find sibling dataset.xml for file: {key}")
+ if dataset_key not in resolved_dataset_keys:
+ resolved_dataset_keys.append(dataset_key)
+
+ all_confs = trigger_file_processing_elsevier(
+ publisher="elsevier",
+ repo=repo,
+ logger=logger,
+ filenames=resolved_dataset_keys,
+ )
+ confs = [conf for conf in all_confs if conf["file_name"] in requested_file_keys]
+
+ prepared_file_keys = {conf["file_name"] for conf in confs}
+ missing_file_keys = sorted(requested_file_keys - prepared_file_keys)
+ if missing_file_keys:
+ raise ValueError(
+ f"Could not build Elsevier metadata for file(s): {missing_file_keys}"
+ )
+
+ logger.info("Prepared %s downstream trigger conf(s)", len(confs))
+ return confs
+
+ file_keys = collect_records()
+ trigger_confs = prepare_trigger_conf(file_keys)
+
+ TriggerDagRunOperator.partial(
+ task_id="elsevier_reharvest_trigger_file_processing",
+ trigger_dag_id="elsevier_process_file",
+ reset_dag_run=True,
+ ).expand(conf=trigger_confs)
+
+
+elsevier_reharvest_dag = elsevier_reharvest()
diff --git a/dags/elsevier/repository.py b/dags/elsevier/repository.py
index 89f77015..3c4aa159 100644
--- a/dags/elsevier/repository.py
+++ b/dags/elsevier/repository.py
@@ -21,7 +21,7 @@ def __init__(self):
def get_all_raw_filenames(self):
return [
- f.key.removeprefix("raw/")
+ f.key.removeprefix(self.ZIPED_DIR)
for f in self.s3.objects.filter(Prefix=self.ZIPED_DIR).all()
]
diff --git a/dags/jagiellonian/jagiellonian_process_file.py b/dags/jagiellonian/jagiellonian_process_file.py
index 8b53720d..cc3630a6 100644
--- a/dags/jagiellonian/jagiellonian_process_file.py
+++ b/dags/jagiellonian/jagiellonian_process_file.py
@@ -2,9 +2,9 @@
import logging
import os
from datetime import UTC, datetime
+from io import BytesIO
import pendulum
-from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.transfers.http_to_s3 import HttpToS3Operator
from airflow.sdk import dag, task
from common.enhancer import Enhancer
@@ -12,8 +12,10 @@
from common.notification_service import FailedDagNotifier
from common.utils import create_or_update_article
from jagiellonian.parser import JagiellonianParser
+from jagiellonian.repository import JagiellonianRepository
logger = logging.getLogger("airflow.task")
+JAGIELLONIAN_REPO = JagiellonianRepository()
FILE_EXTENSIONS = {"pdf": ".pdf", "xml": ".xml", "pdfa": ".pdf"}
@@ -39,6 +41,16 @@ def jagiellonian_process_file():
def parse(**kwargs):
if "params" in kwargs and "article" in kwargs["params"]:
article = kwargs["params"]["article"]
+
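+            # Records coming from jagiellonian_reharvest are already parsed
+            # (they carry dois[].value), so pass them through unchanged.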
+ if (
+ isinstance(article, dict)
+ and isinstance(article.get("dois"), list)
+ and article.get("dois")
+ and isinstance(article["dois"][0], dict)
+ and article["dois"][0].get("value")
+ ):
+ return article
+
parsed = JagiellonianParser().parse(article)
if "dois" not in parsed:
@@ -105,21 +117,11 @@ def enrich(enhanced_file):
return Enricher()(enhanced_file)
@task(task_id="jagiellonian-save-to-s3")
- def save_to_s3(enriched_file):
- s3_bucket = os.getenv("JAGIELLONIAN_BUCKET_NAME", "jagiellonian")
- aws_conn_id = os.getenv("AWS_CONN_ID", "aws_s3_minio")
-
+ def save_to_s3(enriched_file, repo=JAGIELLONIAN_REPO):
doi = enriched_file["dois"][0]["value"]
key = f"{doi}_metadata_{(datetime.now(UTC))}.json"
- s3_hook = S3Hook(aws_conn_id=aws_conn_id)
-
- s3_hook.load_string(
- string_data=json.dumps(enriched_file, indent=2),
- key=key,
- bucket_name=s3_bucket,
- replace=True,
- )
+ repo.save(key=key, obj=BytesIO(json.dumps(enriched_file, indent=2).encode()))
@task()
def create_or_update(enriched_file):
diff --git a/dags/jagiellonian/jagiellonian_pull_api.py b/dags/jagiellonian/jagiellonian_pull_api.py
index 88c8f598..35067ed6 100644
--- a/dags/jagiellonian/jagiellonian_pull_api.py
+++ b/dags/jagiellonian/jagiellonian_pull_api.py
@@ -3,33 +3,14 @@
from datetime import timedelta
import pendulum
-from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sdk import dag, task
from common.notification_service import FailedDagNotifier
+from jagiellonian.repository import JagiellonianRepository
logger = logging.getLogger("airflow.task")
-
-
-def get_latest_s3_file():
- s3_bucket = os.getenv("JAGIELLONIAN_BUCKET_NAME", "jagiellonian")
- aws_conn_id = os.getenv("AWS_CONN_ID", "aws_s3_minio")
-
- s3_hook = S3Hook(aws_conn_id=aws_conn_id)
- objects = s3_hook.list_keys(bucket_name=s3_bucket)
- files = [obj for obj in objects if not obj.endswith("/")]
- if not files:
- return None
-
- file_timestamps = []
- for file_key in files:
- object_info = s3_hook.get_key(key=file_key, bucket_name=s3_bucket)
- file_timestamps.append((file_key, object_info.last_modified))
-
- file_timestamps.sort(key=lambda x: x[1], reverse=True)
- latest_timestamp = file_timestamps[0][1].strftime("%Y-%m-%d")
- return latest_timestamp
+JAGIELLONIAN_REPO = JagiellonianRepository()
default_args = {
@@ -53,7 +34,7 @@ def get_latest_s3_file():
)
def jagiellonian_pull_api(from_date=None):
@task(task_id="jagiellonian_fetch_crossref_api")
- def fetch_crossref_api(from_date_param=None):
+ def fetch_crossref_api(from_date_param=None, repo=JAGIELLONIAN_REPO):
http_conn_id = os.getenv("HTTP_CONN_ID", "crossref_api")
endpoint_filter = ""
@@ -61,7 +42,7 @@ def fetch_crossref_api(from_date_param=None):
logger.info("Using provided from_date: %s", from_date_param)
endpoint_filter = f"from-created-date:{from_date_param}"
else:
- latest_s3_file = get_latest_s3_file()
+ latest_s3_file = repo.find_the_last_uploaded_file_date()
if latest_s3_file:
logger.info("Using latest S3 file date: %s", latest_s3_file)
endpoint_filter = f"from-created-date:{latest_s3_file}"
diff --git a/dags/jagiellonian/jagiellonian_reharvest.py b/dags/jagiellonian/jagiellonian_reharvest.py
new file mode 100644
index 00000000..c4effd0b
--- /dev/null
+++ b/dags/jagiellonian/jagiellonian_reharvest.py
@@ -0,0 +1,311 @@
+import fnmatch
+import json
+import logging
+import re
+from datetime import date, datetime, timedelta
+
+import pendulum
+from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
+from airflow.sdk import Param, dag, task
+from common.notification_service import FailedDagNotifier
+from jagiellonian.repository import JagiellonianRepository
+
+logger = logging.getLogger("airflow.task")
+JAGIELLONIAN_REPO = JagiellonianRepository()
+
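+# Metadata keys are written by jagiellonian_process_file as
+# "<doi>_metadata_<timestamp>.json"; these patterns recover the date and full
+# timestamp embedded in such keys.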
+_FILENAME_DATE_PATTERN = re.compile(r"_metadata_(\d{4}-\d{2}-\d{2})")
+_FILENAME_DATETIME_PATTERN = re.compile(
+ r"_metadata_(\d{4}-\d{2}-\d{2})[ T](\d{2})[:_](\d{2})[:_](\d{2})"
+)
+
+
+def _parse_date(value):
+ if not value:
+ return None
+ return datetime.strptime(value, "%Y-%m-%d").date()
+
+
+def _normalize_dois(dois):
+ if not dois:
+ return set()
+ return {doi.strip().lower() for doi in dois if isinstance(doi, str) and doi.strip()}
+
+
+def _normalize_file_keys(file_keys):
+ return [k.strip() for k in file_keys if isinstance(k, str) and k.strip()]
+
+
+def _extract_date_from_key(key):
+ if not key:
+ return None
+
+ match = _FILENAME_DATE_PATTERN.search(key)
+ if not match:
+ return None
+
+ try:
+ return datetime.strptime(match.group(1), "%Y-%m-%d").date()
+ except ValueError:
+ return None
+
+
+def _extract_datetime_from_key(key):
+ if not key:
+ return datetime.min
+
+ match = _FILENAME_DATETIME_PATTERN.search(key)
+ if not match:
+ return datetime.min
+
+ try:
+ return datetime.strptime(
+ f"{match.group(1)} {match.group(2)}:{match.group(3)}:{match.group(4)}",
+ "%Y-%m-%d %H:%M:%S",
+ )
+ except ValueError:
+ return datetime.min
+
+
+def _extract_doi_from_json(article_json):
+ if not isinstance(article_json, dict):
+ return None
+
+ doi_entries = article_json.get("dois") or []
+ if isinstance(doi_entries, list):
+ for doi_entry in doi_entries:
+ if isinstance(doi_entry, dict):
+ value = doi_entry.get("value")
+ if isinstance(value, str) and value.strip():
+ return value.strip().lower()
+ elif isinstance(doi_entry, str) and doi_entry.strip():
+ return doi_entry.strip().lower()
+
+ raw_doi = article_json.get("DOI")
+ if isinstance(raw_doi, str) and raw_doi.strip():
+ return raw_doi.strip().lower()
+
+ return None
+
+
+def _is_glob_pattern(value):
+ return any(c in value for c in ("*", "?", "["))
+
+
+def _resolve_file_keys(file_keys, all_json_keys):
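+    """Resolve exact metadata JSON keys and glob patterns, deduplicated in order."""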
+ all_json_keys_set = set(all_json_keys)
+
+ exact_keys = [k for k in file_keys if not _is_glob_pattern(k)]
+ glob_patterns = [k for k in file_keys if _is_glob_pattern(k)]
+
+ missing = [k for k in exact_keys if k not in all_json_keys_set]
+ if missing:
+ raise ValueError(f"Some requested file_keys do not exist: {missing}")
+
+ resolved = list(exact_keys)
+
+ for pattern in glob_patterns:
+ matched = [key for key in all_json_keys if fnmatch.fnmatch(key, pattern)]
+ if not matched:
+ logger.warning(
+ "Pattern %r matched no Jagiellonian metadata JSON keys", pattern
+ )
+ resolved.extend(matched)
+
+ seen = set()
+ deduped = []
+ for key in resolved:
+ if key not in seen:
+ seen.add(key)
+ deduped.append(key)
+ return deduped
+
+
+def _enforce_limit(records, limit):
+ if limit is None:
+ return
+ if len(records) > limit:
+ raise ValueError(
+ f"Matched {len(records)} records, which is above limit={limit}. "
+ "Please reduce the date range or increase the limit."
+ )
+
+
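+# Selection modes: an explicit date range taken from the metadata filename
+# (date_from/date_to), explicit file_keys (exact keys or glob patterns), or a list
+# of DOIs; DOIs alone default to searching the last three years.
+# Example trigger conf (illustrative values only):
+#   {"dois": ["10.5506/example.doi"], "dry_run": true}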
+@dag(
+ on_failure_callback=FailedDagNotifier(),
+ start_date=pendulum.today("UTC").add(days=-1),
+ schedule=None,
+ catchup=False,
+ tags=["reharvest", "jagiellonian"],
+ params={
+ "date_from": Param(
+ default=None,
+ type=["string", "null"],
+ description="Start date in YYYY-MM-DD format (from metadata filename)",
+ title="Date from",
+ ),
+ "date_to": Param(
+ default=None,
+ type=["string", "null"],
+ description="End date in YYYY-MM-DD format (from metadata filename)",
+ title="Date to",
+ ),
+ "file_keys": Param(
+ default=[],
+ type=["array", "null"],
+ description=(
+ "Exact Jagiellonian metadata JSON keys or glob patterns. "
+ "Example: '*' or '10.5506/*_metadata_2026-04-09*.json'."
+ ),
+ title="File keys",
+ ),
+ "dois": Param(
+ default=[],
+ type=["array", "null"],
+ description=(
+ "List of DOIs to process. DOI is extracted from metadata JSON at "
+ "dois[].value. If date_from/date_to are not provided, this will search "
+ "only records from the last 3 years."
+ ),
+ title="DOIs",
+ ),
+ "limit": Param(
+ 1000,
+ type=["integer"],
+ description="Maximum number of records to process",
+ title="Limit",
+ ),
+ "dry_run": Param(
+ default=True,
+ type=["boolean", "null"],
+ description="Whether to perform a dry run. If true, no downstream DAGs will be triggered",
+ title="Dry run",
+ ),
+ },
+)
+def jagiellonian_reharvest():
+ @task()
+ def collect_records(repo=JAGIELLONIAN_REPO, **kwargs):
+ params = kwargs.get("params", {})
+
+ date_from = _parse_date(params.get("date_from"))
+ date_to = _parse_date(params.get("date_to"))
+ file_keys = _normalize_file_keys(params.get("file_keys") or [])
+        target_dois = _normalize_dois(params.get("dois") or [])
+ limit = params.get("limit")
+
+ if limit is not None and (not isinstance(limit, int) or limit <= 0):
+ raise ValueError("limit must be a positive integer when provided")
+
+        if file_keys and (date_from or date_to):
+ raise ValueError(
+ "Invalid parameters: date_from/date_to cannot be used together with file_keys"
+ )
+
+ if (date_from and not date_to) or (date_to and not date_from):
+ raise ValueError("Both date_from and date_to must be provided together")
+
+ all_json_keys = [
+ obj.key for obj in repo.s3_bucket.objects.all() if obj.key.endswith(".json")
+ ]
+
+ logger.info(
+ "Found %s Jagiellonian metadata JSON file(s) in storage", len(all_json_keys)
+ )
+
+ if file_keys:
+ resolved_keys = _resolve_file_keys(file_keys, all_json_keys)
+ else:
+ if target_dois and not (date_from and date_to):
+ date_to = date.today()
+ date_from = date_to - timedelta(days=365 * 3)
+
+ if not (date_from and date_to):
+ raise ValueError(
+ "Invalid parameters: provide either date_from+date_to, file_keys, or dois"
+ )
+
+ logger.info(
+ "Selecting Jagiellonian metadata JSON in date range %s to %s",
+ date_from,
+ date_to,
+ )
+
+ resolved_keys = []
+ for key in all_json_keys:
+ key_date = _extract_date_from_key(key)
+ if key_date is None:
+ continue
+ if date_from <= key_date <= date_to:
+ resolved_keys.append(key)
+
+ resolved_keys = sorted(
+ resolved_keys, key=_extract_datetime_from_key, reverse=True
+ )
+
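+        # Keep only the newest metadata file per DOI; records whose DOI cannot be
+        # read are still included when no explicit DOI filter was given.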
+ deduped_by_doi = {}
+ keys_without_doi = []
+ found_dois = set()
+
+ for key in resolved_keys:
+ article_json = json.loads(repo.get_by_id(key).getvalue().decode("utf-8"))
+ doi = _extract_doi_from_json(article_json)
+
+ if not doi:
+ logger.warning("Could not extract DOI from metadata JSON: %s", key)
+ if not target_dois:
+ keys_without_doi.append({"key": key, "article": article_json})
+ continue
+
+ if target_dois and doi not in target_dois:
+ continue
+
+ if target_dois:
+ found_dois.add(doi)
+
+ if doi not in deduped_by_doi:
+ deduped_by_doi[doi] = {"key": key, "article": article_json}
+
+ if target_dois:
+ missing_dois = sorted(target_dois - found_dois)
+ if missing_dois:
+ logger.warning("Some requested DOIs were not found: %s", missing_dois)
+
+ selected_records = list(deduped_by_doi.values()) + keys_without_doi
+ _enforce_limit(selected_records, limit)
+
+ logger.info(
+ "Collected %s deduplicated Jagiellonian record(s)", len(selected_records)
+ )
+ logger.info(
+ "Selected Jagiellonian metadata keys: %s",
+ [r["key"] for r in selected_records],
+ )
+
+ return [record["article"] for record in selected_records]
+
+ @task()
+ def prepare_trigger_conf(records, **kwargs):
+ dry_run = bool(kwargs.get("params", {}).get("dry_run", False))
+ if dry_run:
+ logger.info(
+ "Dry run enabled. %s record(s) matched. No downstream runs will be triggered.",
+ len(records),
+ )
+ return []
+
+ confs = [{"article": record} for record in records]
+ logger.info("Prepared %s downstream trigger conf(s)", len(confs))
+ return confs
+
+ records = collect_records()
+ trigger_confs = prepare_trigger_conf(records)
+
+ TriggerDagRunOperator.partial(
+ task_id="jagiellonian_reharvest_trigger_file_processing",
+ trigger_dag_id="jagiellonian_process_file",
+ reset_dag_run=True,
+ ).expand(conf=trigger_confs)
+
+
+jagiellonian_reharvest_dag = jagiellonian_reharvest()
diff --git a/dags/jagiellonian/repository.py b/dags/jagiellonian/repository.py
new file mode 100644
index 00000000..398ebd1a
--- /dev/null
+++ b/dags/jagiellonian/repository.py
@@ -0,0 +1,38 @@
+import io
+import os
+
+from common.repository import IRepository
+from common.s3_service import S3Service
+
+
+class JagiellonianRepository(IRepository):
+ def __init__(self) -> None:
+ super().__init__()
+ self.s3_bucket = S3Service(
+ os.getenv("JAGIELLONIAN_BUCKET_NAME", "jagiellonian")
+ )
+
+ def find_all(self):
+ files = []
+ for obj in self.s3_bucket.objects.all():
+ file_name = os.path.basename(obj.key)
+ files.append(file_name)
+ return files
+
+ def get_by_id(self, id):
+ retfile = io.BytesIO()
+ self.s3_bucket.download_fileobj(id, retfile)
+ return retfile
+
+ def find_the_last_uploaded_file_date(self):
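+        """Return the newest last-modified date (YYYY-MM-DD), or None if empty."""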
+ objects = list(self.s3_bucket.objects.all())
+ if not objects:
+ return
+ dates = [obj.last_modified.strftime("%Y-%m-%d") for obj in objects]
+ return max(dates)
+
+ def save(self, key, obj):
+ self.s3_bucket.upload_fileobj(obj, key)
+
+ def delete_all(self):
+ self.s3_bucket.objects.all().delete()
diff --git a/dags/springer/repository.py b/dags/springer/repository.py
index 925e560e..47496443 100644
--- a/dags/springer/repository.py
+++ b/dags/springer/repository.py
@@ -19,28 +19,27 @@ def __init__(self) -> None:
def get_all_raw_filenames(self):
return [
- f.key.removeprefix("raw/")
+ f.key.removeprefix(self.ZIPED_DIR)
for f in self.s3.objects.filter(Prefix=self.ZIPED_DIR).all()
]
def find_all(self, filenames_to_process=None):
- ret_dict = {}
- filenames = []
+ grouped_files = {}
filenames = (
filenames_to_process
if filenames_to_process
else self.__find_all_extracted_files()
)
+ if not filenames:
+ return []
for file in filenames:
- file_parts = file.split("/")
- last_part = file_parts[-1]
+ last_part = os.path.basename(file)
filename_without_extension = last_part.split(".")[0]
- if filename_without_extension not in ret_dict:
- ret_dict[filename_without_extension] = dict()
- ret_dict[filename_without_extension][
- "xml" if self.is_meta(last_part) else "pdf"
- ] = file
- return list(ret_dict.values())
+ if filename_without_extension not in grouped_files:
+ grouped_files[filename_without_extension] = {}
+ extension = "xml" if self.is_meta(last_part) else "pdf"
+ grouped_files[filename_without_extension][extension] = file
+ return list(grouped_files.values())
def get_by_id(self, id):
retfile = BytesIO()
diff --git a/dags/springer/springer_reharvest.py b/dags/springer/springer_reharvest.py
new file mode 100644
index 00000000..610e1c2c
--- /dev/null
+++ b/dags/springer/springer_reharvest.py
@@ -0,0 +1,357 @@
+import base64
+import fnmatch
+import logging
+import re
+from datetime import date, datetime, timedelta
+
+import pendulum
+from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
+from airflow.sdk import Param, dag, task
+from common.notification_service import FailedDagNotifier
+from common.utils import parse_without_names_spaces
+from springer.repository import SpringerRepository
+
+logger = logging.getLogger("airflow.task")
+SPRINGER_REPO = SpringerRepository()
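+# Extracted Springer keys contain the source archive name
+# (e.g. "ftp_PUB_26-01-26_08-01-27_..."), whose embedded timestamp is used as the
+# archive date for range filtering and sorting.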
+_ARCHIVE_DT_PATTERN = re.compile(r"ftp_PUB_(\d{2}-\d{2}-\d{2})_(\d{2}-\d{2}-\d{2})")
+
+
+def _parse_date(value):
+ if not value:
+ return None
+ return datetime.strptime(value, "%Y-%m-%d").date()
+
+
+def _normalize_dois(dois):
+ if not dois:
+ return set()
+ return {doi.strip().lower() for doi in dois if isinstance(doi, str) and doi.strip()}
+
+
+def _normalize_file_keys(file_keys):
+ return [k.strip() for k in file_keys if isinstance(k, str) and k.strip()]
+
+
+def _is_glob_pattern(value):
+ return any(c in value for c in ("*", "?", "["))
+
+
+def _extract_archive_datetime_from_key(key):
+ if not key:
+ return datetime.min
+
+ match = _ARCHIVE_DT_PATTERN.search(key)
+ if not match:
+ return datetime.min
+
+ try:
+ return datetime.strptime(
+ f"{match.group(1)}_{match.group(2)}", "%y-%m-%d_%H-%M-%S"
+ )
+ except ValueError:
+ return datetime.min
+
+
+def _extract_archive_date_from_key(key):
+ dt_value = _extract_archive_datetime_from_key(key)
+ if dt_value == datetime.min:
+ return None
+ return dt_value.date()
+
+
+def _extract_doi_from_xml(xml_bytes):
+ try:
+ if isinstance(xml_bytes, bytes):
+ xml_text = xml_bytes.decode("utf-8")
+ else:
+ xml_text = xml_bytes
+ xml = parse_without_names_spaces(xml_text)
+ doi_element = xml.find("./Journal/Volume/Issue/Article/ArticleInfo/ArticleDOI")
+ if doi_element is not None and doi_element.text:
+ return doi_element.text.strip().lower()
+ except Exception:
+ pass
+ return None
+
+
+def _resolve_file_keys(file_keys, all_xml_keys):
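+    """Resolve exact extracted XML keys and glob patterns (matched relative to
+    EXTRACTED_DIR), deduplicated in order."""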
+ all_xml_keys_set = set(all_xml_keys)
+ prefix = SPRINGER_REPO.EXTRACTED_DIR
+
+ exact_keys = [k for k in file_keys if not _is_glob_pattern(k)]
+ glob_patterns = [k for k in file_keys if _is_glob_pattern(k)]
+
+ missing = [k for k in exact_keys if k not in all_xml_keys_set]
+ if missing:
+ raise ValueError(f"Some requested file_keys do not exist: {missing}")
+
+ resolved = list(exact_keys)
+
+ for pattern in glob_patterns:
+ matched = [
+ key
+ for key in all_xml_keys
+ if fnmatch.fnmatch(
+ key[len(prefix) :] if key.startswith(prefix) else key, pattern
+ )
+ ]
+ if not matched:
+ logger.warning("Pattern %r matched no Springer extracted XML keys", pattern)
+ resolved.extend(matched)
+
+ seen = set()
+ deduped = []
+ for key in resolved:
+ if key not in seen:
+ seen.add(key)
+ deduped.append(key)
+ return deduped
+
+
+def _filter_keys_by_dois(keys, target_dois, repo):
+ deduped = {}
+ found_dois = set()
+
+ for key in keys:
+ file_obj = repo.get_by_id(key)
+ doi = _extract_doi_from_xml(file_obj.getvalue())
+
+ if not doi:
+ logger.warning("Could not extract DOI from file: %s", key)
+ continue
+
+ if doi not in target_dois:
+ continue
+
+ found_dois.add(doi)
+ if doi not in deduped:
+ deduped[doi] = key
+
+ missing_dois = sorted(target_dois - found_dois)
+ if missing_dois:
+ logger.warning("Some requested DOIs were not found: %s", missing_dois)
+
+ return list(deduped.values())
+
+
+def _enforce_limit(records, limit):
+ if limit is None:
+ return
+ if len(records) > limit:
+ raise ValueError(
+ f"Matched {len(records)} records, which is above limit={limit}. "
+ "Please reduce the date range or increase the limit."
+ )
+
+
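+# Selection modes: an explicit date range taken from the extracted archive folder
+# name (date_from/date_to), explicit file_keys (exact keys or glob patterns), or a
+# list of DOIs; DOIs alone default to searching the last year.
+# Example trigger conf (illustrative values only):
+#   {"date_from": "2026-01-01", "date_to": "2026-02-01", "dry_run": true}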
+@dag(
+ on_failure_callback=FailedDagNotifier(),
+ start_date=pendulum.today("UTC").add(days=-1),
+ schedule=None,
+ catchup=False,
+ tags=["reharvest", "springer"],
+ params={
+ "date_from": Param(
+ default=None,
+ type=["string", "null"],
+ description="Start date from extracted archive folder in YYYY-MM-DD format",
+ title="Date from",
+ ),
+ "date_to": Param(
+ default=None,
+ type=["string", "null"],
+ description="End date from extracted archive folder in YYYY-MM-DD format",
+ title="Date to",
+ ),
+ "file_keys": Param(
+ default=[],
+ type=["array", "null"],
+ description=(
+ "Exact Springer extracted XML keys or glob patterns matched relative "
+ f"to '{SPRINGER_REPO.EXTRACTED_DIR}'. Examples: '*' or "
+ f"'EPJC/ftp_PUB_26-03-29_20-00-46/*'."
+ ),
+ title="File keys",
+ ),
+ "dois": Param(
+ default=[],
+ type=["array", "null"],
+ description=(
+ "List of DOIs to process. Can be used with date_from/date_to or together "
+ "with file_keys to filter the resolved key set. The maximum date range is 1 Year."
+ ),
+ title="DOIs",
+ ),
+ "limit": Param(
+ 1000,
+ type=["integer"],
+ description="Maximum number of records to process",
+ title="Limit",
+ ),
+ "dry_run": Param(
+ default=True,
+ type=["boolean", "null"],
+ description="Whether to perform a dry run. If true, no downstream DAGs will be triggered",
+ title="Dry run",
+ ),
+ },
+)
+def springer_reharvest():
+ @task()
+ def collect_records(repo=SPRINGER_REPO, **kwargs):
+ params = kwargs.get("params", {})
+
+ date_from = _parse_date(params.get("date_from"))
+ date_to = _parse_date(params.get("date_to"))
+ file_keys = _normalize_file_keys(params.get("file_keys") or [])
+        target_dois = _normalize_dois(params.get("dois") or [])
+ limit = params.get("limit")
+
+ if limit is not None and (not isinstance(limit, int) or limit <= 0):
+ raise ValueError("limit must be a positive integer when provided")
+
+        if file_keys and (date_from or date_to):
+ raise ValueError(
+ "Invalid parameters: date_from/date_to cannot be used together with file_keys"
+ )
+
+ if (date_from and not date_to) or (date_to and not date_from):
+ raise ValueError("Both date_from and date_to must be provided together")
+
+ all_xml_keys = [
+ obj.key
+ for obj in repo.s3.objects.filter(Prefix=repo.EXTRACTED_DIR).all()
+ if repo.is_meta(obj.key)
+ ]
+ logger.info(
+ "Found %s Springer extracted XML file(s) in %s",
+ len(all_xml_keys),
+ repo.EXTRACTED_DIR,
+ )
+
+ if file_keys:
+ resolved_keys = _resolve_file_keys(file_keys, all_xml_keys)
+
+ if target_dois:
+ resolved_keys = sorted(
+ resolved_keys,
+ key=_extract_archive_datetime_from_key,
+ reverse=True,
+ )
+ selected_keys = _filter_keys_by_dois(
+ keys=resolved_keys,
+ target_dois=target_dois,
+ repo=repo,
+ )
+ else:
+ selected_keys = resolved_keys
+
+ _enforce_limit(selected_keys, limit)
+ logger.info(
+ "Selected %s Springer XML key(s) from file_keys (with glob expansion)",
+ len(selected_keys),
+ )
+ return selected_keys
+
+        if target_dois and not (date_from and date_to):
+ date_to = date.today()
+ date_from = date_to - timedelta(days=365)
+
+ if not (date_from and date_to):
+ raise ValueError(
+ "Invalid parameters: provide either date_from+date_to, file_keys, or dois"
+ )
+
+        if target_dois and (date_to - date_from).days > 366:
+ raise ValueError(
+ "For DOI search the date range must not exceed one year. "
+ "Please use a smaller range."
+ )
+
+ logger.info(
+ "Selecting Springer extracted XML files in date range %s to %s",
+ date_from,
+ date_to,
+ )
+
+ keys_in_range = []
+ for key in all_xml_keys:
+ key_date = _extract_archive_date_from_key(key)
+ if key_date is None:
+ continue
+ if date_from <= key_date <= date_to:
+ keys_in_range.append(key)
+
+ keys_in_range.sort(key=_extract_archive_datetime_from_key, reverse=True)
+
+ logger.info(
+ "Found %s Springer extracted XML file(s) in the requested date range",
+ len(keys_in_range),
+ )
+
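+        # Keep the newest archive's XML per DOI; files whose DOI cannot be read
+        # are still included when no explicit DOI filter was given.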
+ deduped = {}
+ keys_without_doi = []
+ found_dois = set()
+
+ for key in keys_in_range:
+ file_obj = repo.get_by_id(key)
+ doi = _extract_doi_from_xml(file_obj.getvalue())
+
+ if not doi:
+ logger.warning("Could not extract DOI from file: %s", key)
+ if not target_dois:
+ keys_without_doi.append(key)
+ continue
+
+ if target_dois and doi not in target_dois:
+ continue
+
+ if target_dois:
+ found_dois.add(doi)
+
+ if doi not in deduped:
+ deduped[doi] = key
+
+ if target_dois:
+ missing_dois = sorted(target_dois - found_dois)
+ if missing_dois:
+ logger.warning("Some requested DOIs were not found: %s", missing_dois)
+
+ selected_keys = list(deduped.values()) + keys_without_doi
+
+ _enforce_limit(selected_keys, limit)
+ logger.info("Collected %s deduplicated Springer XML key(s)", len(selected_keys))
+ logger.info("Selected Springer XML keys: %s", selected_keys)
+ return selected_keys
+
+ @task()
+ def prepare_trigger_conf(file_keys, repo=SPRINGER_REPO, **kwargs):
+ dry_run = bool(kwargs.get("params", {}).get("dry_run", False))
+ if dry_run:
+ logger.info(
+ "Dry run enabled. %s record(s) matched. No downstream runs will be triggered.",
+ len(file_keys),
+ )
+ return []
+
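+        # The triggered DAG expects the XML payload base64-encoded in its conf.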
+ confs = []
+ for key in file_keys:
+ file_obj = repo.get_by_id(key)
+ encoded_xml = base64.b64encode(file_obj.getvalue()).decode("utf-8")
+ confs.append({"file": encoded_xml, "file_name": key})
+
+ logger.info("Prepared %s downstream trigger conf(s)", len(confs))
+ return confs
+
+ file_keys = collect_records()
+ trigger_confs = prepare_trigger_conf(file_keys)
+
+ TriggerDagRunOperator.partial(
+ task_id="springer_reharvest_trigger_file_processing",
+ trigger_dag_id="springer_process_file",
+ reset_dag_run=True,
+ ).expand(conf=trigger_confs)
+
+
+springer_reharvest_dag = springer_reharvest()
diff --git a/tests/integration/elsevier/cassettes/test_dag_enrich_file.yaml b/tests/integration/elsevier/cassettes/test_dag_enrich_file.yaml
new file mode 100644
index 00000000..26925bd7
--- /dev/null
+++ b/tests/integration/elsevier/cassettes/test_dag_enrich_file.yaml
@@ -0,0 +1,83 @@
+interactions:
+- request:
+ body: null
+ headers:
+ Accept:
+ - '*/*'
+ Accept-Encoding:
+ - gzip, deflate
+ Connection:
+ - keep-alive
+ User-Agent:
+ - python-requests/2.32.5
+ method: GET
+ uri: https://oaipmh.arxiv.org/oai?verb=GetRecord&identifier=oai:arXiv.org:2112.01211&metadataPrefix=arXiv
+ response:
+ body:
+ string: "\n\n 2026-04-16T11:06:33Z\n
+ \ http://oaipmh.arxiv.org/oai\n\n\n \n
+ \ \n \n oai:arXiv.org:2112.01211\n
+ \ 2022-10-25\n physics:hep-th\n
+ \ physics:hep-ph\n \n\n \n
+ \ \n
+ \ 2112.01211\n 2022-10-23\n 2022-10-25\n
+ \ \n \n Antoniadis\n
+ \ Ignatios\n \n
+ \ \n Nanopoulos\n
+ \ Dimitri V.\n \n
+ \ \n Rizos\n John\n
+ \ \n \n Particle physics
+ and cosmology of the string derived no-scale flipped $SU(5)$\n hep-th
+ hep-ph\n 47 pages, no figures. v2: published
+ version, few minor changes\n ACT-05-21, MI-HET-768\n
+ \ 10.1140/epjc/s10052-022-10353-6\n http://creativecommons.org/licenses/by/4.0/\n
+ \ In a recent paper, we identified a cosmological sector
+ of a flipped $SU(5)$ model derived in the free fermionic formulation of the
+ heterotic superstring, containing the inflaton and the goldstino superfields
+ with a superpotential leading to Starobinsky type inflation, while $SU(5){\\times}U(1)$
+ is still unbroken. Here, we study the properties and phenomenology of the
+ vacuum after the end of inflation, where the gauge group is broken to the
+ Standard Model. We identify a set of vacuum expectation values, triggered
+ by the breaking of an anomalous $U(1)_A$ gauge symmetry at roughly an order
+ of magnitude below the string scale, that solve the F and D-flatness supersymmetric
+ conditions up to 6th order in the superpotential which is explicitly computed,
+ leading to a successful particle phenomenology. In particular, all extra colour
+ triplets become superheavy guaranteeing observable proton stability, while
+ the Higgs doublet mass matrix has a massless pair eigenstate with realistic
+ hierarchical Yukawa couplings to quarks and leptons. The supersymmetry breaking
+ scale is constrained to be high, consistent with the non observation of supersymmetric
+ signals at the LHC.\n \n\n \n
+ \ \n\n \n"
+ headers:
+ Accept-Ranges:
+ - bytes
+ Age:
+ - '0'
+ Connection:
+ - keep-alive
+ Content-Length:
+ - '3311'
+ Date:
+ - Thu, 16 Apr 2026 11:06:33 GMT
+ X-Cache:
+ - MISS, MISS, MISS
+ X-Served-By:
+ - cache-lga21993-LGA, cache-lga21973-LGA, cache-fra-eddf8230078-FRA
+ X-Timer:
+ - S1776337593.348975,VS0,VE220
+ content-type:
+ - text/xml
+ server:
+ - Google Frontend
+ via:
+ - 1.1 google, 1.1 varnish, 1.1 varnish, 1.1 varnish
+ x-cloud-trace-context:
+ - 663ac66e9488896105c2df6ae226a91d
+ status:
+ code: 200
+ message: OK
+version: 1
diff --git a/tests/integration/elsevier/test_elsevier_dag_process_file.py b/tests/integration/elsevier/test_elsevier_dag_process_file.py
new file mode 100644
index 00000000..3b0ed7de
--- /dev/null
+++ b/tests/integration/elsevier/test_elsevier_dag_process_file.py
@@ -0,0 +1,132 @@
+import tarfile
+
+import pytest
+from airflow.models import DagBag
+from common.utils import parse_without_names_spaces
+from elsevier.elsevier_file_processing import enhance_elsevier, enrich_elsevier
+from elsevier.parser import ElsevierParser
+from freezegun import freeze_time
+
+DAG_NAME = "elsevier_process_file"
+
+
+@pytest.fixture
+def dag():
+ dagbag = DagBag(dag_folder="dags/", include_examples=False)
+ assert dagbag.import_errors.get(f"dags/{DAG_NAME}.py") is None
+ return dagbag.get_dag(dag_id=DAG_NAME)
+
+
+@pytest.fixture
+def article():
+ data_dir = "./data/elsevier/"
+ test_file = "CERNQ000000010669A.tar"
+
+ with tarfile.open(data_dir + test_file, "r") as tar:
+ xml_members = [m for m in tar.getmembers() if m.name.endswith("main.xml")]
+ xml_bytes = tar.extractfile(xml_members[0]).read()
+
+ xml = parse_without_names_spaces(xml_bytes)
+ parser = ElsevierParser()
+ return parser.parse(xml)
+
+
+def test_dag_loaded(dag):
+ assert dag is not None
+ assert "parse" in dag.task_ids
+ assert "enhance" in dag.task_ids
+ assert "populate_files" in dag.task_ids
+ assert "enrich" in dag.task_ids
+ assert "save_to_s3" in dag.task_ids
+ assert "create_or_update" in dag.task_ids
+
+
+publisher = "Elsevier"
+
+generic_pseudo_parser_output = {
+ "abstract": "this is abstracts",
+ "copyright_holder": "copyright_holder",
+ "copyright_year": "2020",
+ "copyright_statement": "copyright_statement",
+ "copyright_material": "copyright_material",
+ "date_published": "2022-05-20",
+ "title": "title",
+ "subtitle": "subtitle",
+}
+
+expected_output = {
+ "abstracts": [{"value": "this is abstracts", "source": publisher}],
+ "acquisition_source": {
+ "source": publisher,
+ "method": publisher,
+ "date": "2022-05-20T00:00:00",
+ },
+ "copyright": [
+ {
+ "holder": "copyright_holder",
+ "year": "2020",
+ "statement": "copyright_statement",
+ "material": "copyright_material",
+ }
+ ],
+ "imprints": [{"date": "2022-05-20", "publisher": publisher}],
+ "record_creation_date": "2022-05-20T00:00:00",
+ "titles": [{"title": "title", "subtitle": "subtitle", "source": publisher}],
+}
+
+empty_generic_pseudo_parser_output = {
+ "abstract": "",
+ "copyright_holder": "",
+ "copyright_year": "",
+ "copyright_statement": "",
+ "copyright_material": "",
+ "date_published": "",
+ "title": "",
+ "subtitle": "",
+}
+
+expected_output_from_empty_input = {
+ "abstracts": [{"value": "", "source": publisher}],
+ "acquisition_source": {
+ "source": publisher,
+ "method": publisher,
+ "date": "2022-05-20T00:00:00",
+ },
+ "copyright": [{"holder": "", "year": "", "statement": "", "material": ""}],
+ "imprints": [{"date": "", "publisher": publisher}],
+ "record_creation_date": "2022-05-20T00:00:00",
+ "titles": [{"title": "", "subtitle": "", "source": publisher}],
+}
+
+
+@pytest.mark.parametrize(
+ ("test_input", "expected", "publisher"),
+ [
+ pytest.param(generic_pseudo_parser_output, expected_output, publisher),
+ pytest.param(
+ empty_generic_pseudo_parser_output,
+ expected_output_from_empty_input,
+ publisher,
+ ),
+ ],
+)
+@freeze_time("2022-05-20")
+def test_dag_enhance_file(test_input, expected, publisher):
+ assert expected == enhance_elsevier(test_input)
+
+
+@pytest.mark.vcr
+def test_dag_enrich_file(assertListEqual):
+ input_article = {
+ "arxiv_eprints": [{"value": "2112.01211"}],
+ "curated": "Test Value",
+ "citeable": "Test Value",
+ }
+ assertListEqual(
+ {
+ "arxiv_eprints": [
+ {"value": "2112.01211", "categories": list(set(["hep-th", "hep-ph"]))}
+ ],
+ },
+ enrich_elsevier(input_article),
+ )
diff --git a/tests/integration/jagiellonian/test_integration_jagiellonian_pull_api.py b/tests/integration/jagiellonian/test_integration_jagiellonian_pull_api.py
index a4cf846b..7291960b 100644
--- a/tests/integration/jagiellonian/test_integration_jagiellonian_pull_api.py
+++ b/tests/integration/jagiellonian/test_integration_jagiellonian_pull_api.py
@@ -1,144 +1,52 @@
-import contextlib
-import os
-from datetime import UTC, datetime
from unittest import mock
from unittest.mock import MagicMock
-from urllib.parse import urlparse
-import boto3
import pytest
-from airflow.models import Connection, DagBag
-from airflow.utils.session import create_session
-from botocore.client import Config
+from airflow.models import DagBag
+from jagiellonian.repository import JagiellonianRepository
-endpoint = os.getenv("S3_ENDPOINT", "s3")
-parsed = urlparse(endpoint if "://" in endpoint else f"http://{endpoint}")
-MINIO_HOST = parsed.hostname or "s3"
+DAG_NAME = "jagiellonian_pull_api"
-@pytest.fixture(scope="class")
-def dagbag():
- return DagBag(dag_folder="dags/", include_examples=False)
+@pytest.fixture
+def dag():
+ dagbag = DagBag(dag_folder="dags/", include_examples=False)
+ assert dagbag.import_errors.get(f"dags/{DAG_NAME}.py") is None
+ return dagbag.get_dag(dag_id=DAG_NAME)
-@pytest.mark.usefixtures("dagbag")
-class TestIntegrationJagiellonianPullApi:
- def setup_method(self):
- os.environ["AIRFLOW_CONN_AWS_S3_MINIO_TEST"] = (
- f"aws://airflow:Airflow01@{MINIO_HOST}:9000"
- f"?endpoint_url=http%3A%2F%2F{MINIO_HOST}%3A9000"
-            "&region_name=us-east-1&verify=false"
- )
+@pytest.fixture
+def repo():
+ r = JagiellonianRepository()
+ for obj in r.s3_bucket.objects.all():
+ obj.delete()
+ r.s3_bucket.put_object(Key="test.json", Body=b"")
+ yield r
+ for obj in r.s3_bucket.objects.all():
+ obj.delete()
- with create_session() as session:
- session.query(Connection).filter(
- Connection.conn_id == "crossref_api_test"
- ).delete()
- conn = Connection(
- conn_id="crossref_api_test",
- conn_type="http",
- host="https://api.production.crossref.org",
- )
- session.add(conn)
- session.commit()
+def test_dag_loaded(dag):
+ assert dag is not None
- self.dag_id = "jagiellonian_pull_api"
- self.execution_date = datetime.now(UTC)
- self.dag = DagBag(dag_folder="dags/", include_examples=False).get_dag(
- self.dag_id
- )
- assert self.dag is not None, f"DAG {self.dag_id} failed to load"
+@mock.patch("airflow.providers.http.hooks.http.HttpHook.run")
+def test_fetch_crossref_api(mock_http_run, dag, repo):
+ mock_http_response = MagicMock()
+ mock_http_response.json.return_value = {
+ "message": {"items": [], "total-results": 0}
+ }
+ mock_http_response.raise_for_status = MagicMock()
+ mock_http_run.return_value = mock_http_response
- s3 = boto3.client(
- "s3",
- endpoint_url=f"http://{MINIO_HOST}:9000",
- aws_access_key_id="airflow",
- aws_secret_access_key="Airflow01",
- region_name="us-east-1",
- config=Config(signature_version="s3v4"),
- verify=False,
- )
+ task = dag.get_task("jagiellonian_fetch_crossref_api")
+ function_to_unit_test = task.python_callable
- with contextlib.suppress(s3.exceptions.BucketAlreadyOwnedByYou):
- s3.create_bucket(Bucket="jagiellonian-test", ACL="public-read-write")
- response = s3.list_objects_v2(Bucket="jagiellonian-test")
+ results = function_to_unit_test(repo=repo)
- if "Contents" in response:
- objects_to_delete = [{"Key": obj["Key"]} for obj in response["Contents"]]
+ assert mock_http_run.called
- if objects_to_delete:
- s3.delete_objects(
- Bucket="jagiellonian", Delete={"Objects": objects_to_delete}
- )
+ called_endpoint = mock_http_run.call_args[1]["data"]["filter"]
+ assert "from-created-date:" in called_endpoint
- def teardown_method(self):
- if "AIRFLOW_CONN_AWS_S3_MINIO_TEST" in os.environ:
- del os.environ["AIRFLOW_CONN_AWS_S3_MINIO_TEST"]
-
- s3 = boto3.client(
- "s3",
- endpoint_url=f"http://{MINIO_HOST}:9000",
- aws_access_key_id="airflow",
- aws_secret_access_key="Airflow01",
- region_name="us-east-1",
- config=Config(signature_version="s3v4"),
- verify=False,
- )
- response = s3.list_objects_v2(Bucket="jagiellonian-test")
-
- if "Contents" in response:
- objects_to_delete = [{"Key": obj["Key"]} for obj in response["Contents"]]
-
- if objects_to_delete:
- s3.delete_objects(
- Bucket="jagiellonian-test", Delete={"Objects": objects_to_delete}
- )
-
- s3.delete_bucket(Bucket="jagiellonian-test")
- with create_session() as session:
- session.query(Connection).filter(
- Connection.conn_id == "crossref_api_test"
- ).delete()
- session.commit()
-
- @mock.patch.dict(
- os.environ,
- {
- "JAGIELLONIAN_BUCKET_NAME": "jagiellonian-test",
- "AWS_CONN_ID": "aws_s3_minio_test",
- "HTTP_CONN_ID": "crossref_api_test",
- },
- )
- @mock.patch("airflow.providers.http.hooks.http.HttpHook.run")
- def test_fetch_crossref_api(self, mock_http_run):
- mock_http_response = MagicMock()
- mock_http_response.json.return_value = {
- "message": {"items": [], "total-results": 0}
- }
- mock_http_response.raise_for_status = MagicMock()
- mock_http_run.return_value = mock_http_response
-
- s3 = boto3.client(
- "s3",
- endpoint_url=f"http://{MINIO_HOST}:9000",
- aws_access_key_id="airflow",
- aws_secret_access_key="Airflow01",
- region_name="us-east-1",
- config=Config(signature_version="s3v4"),
- verify=False,
- )
- s3.put_object(Bucket="jagiellonian-test", Key="test.json", Body=b"")
-
- task = self.dag.get_task("jagiellonian_fetch_crossref_api")
- function_to_unit_test = task.python_callable
-
- results = function_to_unit_test()
-
- assert mock_http_run.called
-
- called_endpoint = mock_http_run.call_args[1]["data"]["filter"]
- assert "from-created-date:" in called_endpoint
-
- assert results == []
+ assert results == []
diff --git a/tests/integration/jagiellonian/test_jagiellonian_process_file.py b/tests/integration/jagiellonian/test_jagiellonian_process_file.py
index 331835eb..6bce9f83 100644
--- a/tests/integration/jagiellonian/test_jagiellonian_process_file.py
+++ b/tests/integration/jagiellonian/test_jagiellonian_process_file.py
@@ -1,116 +1,45 @@
-import os
-from datetime import UTC, datetime
-from unittest import mock
-from urllib.parse import urlparse
-
-import boto3
import pytest
from airflow.models import DagBag
-from botocore.client import Config
-
-endpoint = os.getenv("S3_ENDPOINT", "s3")
-parsed = urlparse(endpoint if "://" in endpoint else f"http://{endpoint}")
-MINIO_HOST = parsed.hostname or "s3"
-
-
-@pytest.fixture(scope="class")
-def dagbag():
- return DagBag(dag_folder="dags/", include_examples=False)
-
-
-@pytest.mark.usefixtures("dagbag")
-class TestJagiellonianProcessFile:
- def setup_method(self):
- os.environ["AIRFLOW_CONN_AWS_S3_MINIO_TEST"] = (
- f"aws://airflow:Airflow01@{MINIO_HOST}:9000"
- f"?endpoint_url=http%3A%2F%2F{MINIO_HOST}%3A9000"
-            "&region_name=us-east-1&verify=false"
- )
-
- self.dag_id = "jagiellonian_process_file"
- self.execution_date = datetime.now(UTC)
-
- self.dag = DagBag(dag_folder="dags/", include_examples=False).get_dag(
- self.dag_id
- )
- assert self.dag is not None, f"DAG {self.dag_id} failed to load"
-
- s3 = boto3.client(
- "s3",
- endpoint_url=f"http://{MINIO_HOST}:9000",
- aws_access_key_id="airflow",
- aws_secret_access_key="Airflow01",
- region_name="us-east-1",
- config=Config(signature_version="s3v4"),
- verify=False,
- )
-
- s3.create_bucket(Bucket="jagiellonian-test", ACL="public-read-write")
- response = s3.list_objects_v2(Bucket="jagiellonian-test")
+from jagiellonian.repository import JagiellonianRepository
- if "Contents" in response:
- objects_to_delete = [{"Key": obj["Key"]} for obj in response["Contents"]]
+DAG_NAME = "jagiellonian_process_file"
- if objects_to_delete:
- s3.delete_objects(
- Bucket="jagiellonian", Delete={"Objects": objects_to_delete}
- )
- def teardown_method(self):
- if "AIRFLOW_CONN_AWS_S3_MINIO_TEST" in os.environ:
- del os.environ["AIRFLOW_CONN_AWS_S3_MINIO_TEST"]
- s3 = boto3.client(
- "s3",
- endpoint_url=f"http://{MINIO_HOST}:9000",
- aws_access_key_id="airflow",
- aws_secret_access_key="Airflow01",
- region_name="us-east-1",
- config=Config(signature_version="s3v4"),
- verify=False,
- )
- response = s3.list_objects_v2(Bucket="jagiellonian-test")
+@pytest.fixture
+def dag():
+ dagbag = DagBag(dag_folder="dags/", include_examples=False)
+ assert dagbag.import_errors.get(f"dags/{DAG_NAME}.py") is None
+ return dagbag.get_dag(dag_id=DAG_NAME)
- if "Contents" in response:
- objects_to_delete = [{"Key": obj["Key"]} for obj in response["Contents"]]
- if objects_to_delete:
- s3.delete_objects(
- Bucket="jagiellonian-test", Delete={"Objects": objects_to_delete}
- )
+@pytest.fixture
+def repo():
+ r = JagiellonianRepository()
+ for obj in r.s3_bucket.objects.all():
+ obj.delete()
+ yield r
+ for obj in r.s3_bucket.objects.all():
+ obj.delete()
- s3.delete_bucket(Bucket="jagiellonian-test")
- @mock.patch.dict(
- os.environ,
- {
- "JAGIELLONIAN_BUCKET_NAME": "jagiellonian-test",
- "AWS_CONN_ID": "aws_s3_minio_test",
- },
- )
- def test_save_to_s3(self):
- sample_article = {
- "title": "Test Article",
- "authors": ["Author 1", "Author 2"],
- "abstract": "This is a test abstract",
- "dois": [{"value": "10.1234/test.123"}],
- "files": [],
- }
+def test_dag_loaded(dag):
+ assert dag is not None
- task = self.dag.get_task("jagiellonian-save-to-s3")
- function_to_unit_test = task.python_callable
- function_to_unit_test(sample_article)
+def test_save_to_s3(dag, repo):
+ sample_article = {
+ "title": "Test Article",
+ "authors": ["Author 1", "Author 2"],
+ "abstract": "This is a test abstract",
+ "dois": [{"value": "10.1234/test.123"}],
+ "files": [],
+ }
- s3 = boto3.client(
- "s3",
- endpoint_url=f"http://{MINIO_HOST}:9000",
- aws_access_key_id="airflow",
- aws_secret_access_key="Airflow01",
- region_name="us-east-1",
- config=Config(signature_version="s3v4"),
- verify=False,
- )
+ task = dag.get_task("jagiellonian-save-to-s3")
+ function_to_unit_test = task.python_callable
- response = s3.list_objects_v2(Bucket="jagiellonian-test")
+ function_to_unit_test(sample_article, repo=repo)
- assert "10.1234/test.123" in response["Contents"][0]["Key"]
+ objects = list(repo.s3_bucket.objects.all())
+ assert len(objects) == 1
+ assert "10.1234/test.123" in objects[0].key
diff --git a/tests/integration/springer/test_springer_dag_process_file.py b/tests/integration/springer/test_springer_dag_process_file.py
index 441a8a25..7972cae0 100644
--- a/tests/integration/springer/test_springer_dag_process_file.py
+++ b/tests/integration/springer/test_springer_dag_process_file.py
@@ -91,11 +91,11 @@ def springer_data_files_in_s3():
s3_files = {obj.key for obj in repo.s3.objects.all()}
assert (
- "extracted/EPJC/ftp_PUB_26-01-26_08-01-27_data/JOU=10052/VOL=2026.86/ISU=1/ART=15241/BodyRef/PDF/10052_2025_Article_15241.pdf"
+ f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_26-01-26_08-01-27_data/JOU=10052/VOL=2026.86/ISU=1/ART=15241/BodyRef/PDF/10052_2025_Article_15241.pdf"
in s3_files
)
assert (
- "extracted/JHEP/ftp_PUB_26-02-19_08-01-28_data/JOU=13130/VOL=2026.2026/ISU=2/ART=28203/BodyRef/PDF/13130_2026_Article_28203.pdf"
+ f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_26-02-19_08-01-28_data/JOU=13130/VOL=2026.2026/ISU=2/ART=28203/BodyRef/PDF/13130_2026_Article_28203.pdf"
in s3_files
)
@@ -105,7 +105,13 @@ def springer_data_files_in_s3():
def test_dag_loaded(dag):
assert dag is not None
- assert len(dag.tasks) == 7
+ assert "parse_file" in dag.task_ids
+ assert "add_data_availability" in dag.task_ids
+ assert "enhance_file" in dag.task_ids
+ assert "populate_files" in dag.task_ids
+ assert "enrich_file" in dag.task_ids
+ assert "save_to_s3" in dag.task_ids
+ assert "create_or_update" in dag.task_ids
publisher = "Springer"
@@ -310,6 +316,7 @@ def test_dag_process_file_no_input_file(article):
def test_extract_data_availability_data(
dag, epjc_data_article, springer_data_files_in_s3
):
+ repo = SpringerRepository()
expected = {
"statement": "This manuscript has associated data in a data repository. [Authors' comment: The public release of data supporting the findings of this article will follow the CERN Open Data Policy [124]. Inquiries about plots and tables associated with this article can be addressed to atlas.publications@cern.ch.]\nThismanuscripthasassociatedcode/software in a data repository. [Authors' comment: The ATLAS Collaboration's Athena software, including the configuration of the event generators, is open source (https://gitlab.cern.ch/atlas/athena).]",
"urls": [
@@ -319,7 +326,7 @@ def test_extract_data_availability_data(
result = dag.test(
run_conf={
"file": base64.b64encode(ET.tostring(epjc_data_article)).decode(),
- "file_name": "extracted/EPJC/ftp_PUB_26-01-26_08-01-27_data/JOU=10052/VOL=2026.86/ISU=1/ART=15241/10052_2025_Article_15241.xml.Meta",
+ "file_name": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_26-01-26_08-01-27_data/JOU=10052/VOL=2026.86/ISU=1/ART=15241/10052_2025_Article_15241.xml.Meta",
"parse_pdf": True,
},
mark_success_pattern="save_to_s3|create_or_update",
@@ -332,13 +339,14 @@ def test_extract_data_availability_data(
def test_extract_data_availability_no_data(
dag, jhep_data_article, springer_data_files_in_s3
):
+ repo = SpringerRepository()
expected = {
"statement": "This article has no associated data or the data will not be deposited.\nThis article has no associated code or the code will not be deposited.",
}
result = dag.test(
run_conf={
"file": base64.b64encode(ET.tostring(jhep_data_article)).decode(),
- "file_name": "extracted/JHEP/ftp_PUB_26-02-19_08-01-28_data/JOU=13130/VOL=2026.2026/ISU=2/ART=28203/13130_2026_Article_28203.xml.scoap",
+ "file_name": f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_26-02-19_08-01-28_data/JOU=13130/VOL=2026.2026/ISU=2/ART=28203/13130_2026_Article_28203.xml.scoap",
"parse_pdf": True,
},
mark_success_pattern="save_to_s3|create_or_update",
@@ -351,10 +359,11 @@ def test_extract_data_availability_no_data(
def test_extract_data_availability_disabled_by_default(
dag, epjc_data_article, springer_data_files_in_s3
):
+ repo = SpringerRepository()
result = dag.test(
run_conf={
"file": base64.b64encode(ET.tostring(epjc_data_article)).decode(),
- "file_name": "extracted/EPJC/ftp_PUB_26-01-26_08-01-27_data/JOU=10052/VOL=2026.86/ISU=1/ART=15241/10052_2025_Article_15241.xml.Meta",
+ "file_name": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_26-01-26_08-01-27_data/JOU=10052/VOL=2026.86/ISU=1/ART=15241/10052_2025_Article_15241.xml.Meta",
},
mark_success_pattern="save_to_s3|create_or_update",
)
diff --git a/tests/integration/springer/test_springer_dag_pull_sftp.py b/tests/integration/springer/test_springer_dag_pull_sftp.py
index 180d2ef9..c1db1e5d 100644
--- a/tests/integration/springer/test_springer_dag_pull_sftp.py
+++ b/tests/integration/springer/test_springer_dag_pull_sftp.py
@@ -17,22 +17,31 @@ def dag():
def test_dag_loaded(dag):
assert dag is not None
- assert len(dag.tasks) == 3
+ assert "migrate_from_ftp" in dag.task_ids
+ assert "prepare_trigger_conf" in dag.task_ids
+ assert "springer_trigger_file_processing" in dag.task_ids
def test_dag_run(dag):
repo = SpringerRepository()
repo.delete_all()
- dag.test()
+ dag.test(
+ run_conf={
+ "filenames_pull": {
+ "enabled": True,
+ "filenames": ["JHEP/ftp_PUB_19-01-29_20-02-10_JHEP.zip"],
+ "force_from_ftp": True,
+ },
+ "force_pull": False,
+ "excluded_directories": [],
+ },
+ mark_success_pattern="springer_trigger_file_processing",
+ )
expected_subset = [
{
- "xml": "extracted/EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/10052_2019_Article_6572.xml.Meta",
- "pdf": "extracted/EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/BodyRef/PDF/10052_2019_Article_6572.pdf",
- },
- {
- "xml": "extracted/JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/13130_2019_Article_9848.xml.scoap",
- "pdf": "extracted/JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/BodyRef/PDF/13130_2019_Article_9848.pdf",
+ "xml": f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/13130_2019_Article_9848.xml.scoap",
+ "pdf": f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/BodyRef/PDF/13130_2019_Article_9848.pdf",
},
]
found_files = repo.find_all()
@@ -77,16 +86,16 @@ def test_force_pull_from_sftp():
)
expected_files = [
{
- "pdf": "extracted/EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/BodyRef/PDF/10052_2019_Article_6572.pdf",
- "xml": "extracted/EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/10052_2019_Article_6572.xml.Meta",
+ "pdf": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/BodyRef/PDF/10052_2019_Article_6572.pdf",
+ "xml": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/10052_2019_Article_6572.xml.Meta",
},
{
- "pdf": "extracted/EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/BodyRef/PDF/10052_2019_Article_6540.pdf",
- "xml": "extracted/EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/10052_2019_Article_6540.xml.Meta",
+ "pdf": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/BodyRef/PDF/10052_2019_Article_6540.pdf",
+ "xml": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/10052_2019_Article_6540.xml.Meta",
},
{
- "pdf": "extracted/JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/BodyRef/PDF/13130_2019_Article_9848.pdf",
- "xml": "extracted/JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/13130_2019_Article_9848.xml.scoap",
+ "pdf": f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/BodyRef/PDF/13130_2019_Article_9848.pdf",
+ "xml": f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/13130_2019_Article_9848.xml.scoap",
},
]
assert repo.find_all() == expected_files
@@ -117,8 +126,8 @@ def test_force_pull_from_sftp_with_excluded_folder():
)
expected_files = [
{
- "xml": "extracted/JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/13130_2019_Article_9848.xml.scoap",
- "pdf": "extracted/JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/BodyRef/PDF/13130_2019_Article_9848.pdf",
+ "xml": f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/13130_2019_Article_9848.xml.scoap",
+ "pdf": f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/BodyRef/PDF/13130_2019_Article_9848.pdf",
}
]
assert repo.find_all() == expected_files
@@ -149,16 +158,16 @@ def test_pull_from_sftp_and_reprocess():
)
excepted_files = [
{
- "pdf": "extracted/EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/BodyRef/PDF/10052_2019_Article_6572.pdf",
- "xml": "extracted/EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/10052_2019_Article_6572.xml.Meta",
+ "pdf": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/BodyRef/PDF/10052_2019_Article_6572.pdf",
+ "xml": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/10052_2019_Article_6572.xml.Meta",
},
{
- "pdf": "extracted/EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/BodyRef/PDF/10052_2019_Article_6540.pdf",
- "xml": "extracted/EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/10052_2019_Article_6540.xml.Meta",
+ "pdf": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/BodyRef/PDF/10052_2019_Article_6540.pdf",
+ "xml": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/10052_2019_Article_6540.xml.Meta",
},
{
- "pdf": "extracted/JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/BodyRef/PDF/13130_2019_Article_9848.pdf",
- "xml": "extracted/JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/13130_2019_Article_9848.xml.scoap",
+ "pdf": f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/BodyRef/PDF/13130_2019_Article_9848.pdf",
+ "xml": f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/13130_2019_Article_9848.xml.scoap",
},
]
assert repo.find_all() == excepted_files
@@ -180,16 +189,16 @@ def test_pull_from_sftp_and_reprocess():
)
excepted_files = [
{
- "pdf": "extracted/EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/BodyRef/PDF/10052_2019_Article_6572.pdf",
- "xml": "extracted/EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/10052_2019_Article_6572.xml.Meta",
+ "pdf": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/BodyRef/PDF/10052_2019_Article_6572.pdf",
+ "xml": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-01-29_20-02-10_EPJC/JOU=10052/VOL=2019.79/ISU=1/ART=6572/10052_2019_Article_6572.xml.Meta",
},
{
- "pdf": "extracted/EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/BodyRef/PDF/10052_2019_Article_6540.pdf",
- "xml": "extracted/EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/10052_2019_Article_6540.xml.Meta",
+ "pdf": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/BodyRef/PDF/10052_2019_Article_6540.pdf",
+ "xml": f"{repo.EXTRACTED_DIR}EPJC/ftp_PUB_19-02-06_16-01-13_EPJC_stripped/JOU=10052/VOL=2019.79/ISU=2/ART=6540/10052_2019_Article_6540.xml.Meta",
},
{
- "pdf": "extracted/JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/BodyRef/PDF/13130_2019_Article_9848.pdf",
- "xml": "extracted/JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/13130_2019_Article_9848.xml.scoap",
+ "pdf": f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/BodyRef/PDF/13130_2019_Article_9848.pdf",
+ "xml": f"{repo.EXTRACTED_DIR}JHEP/ftp_PUB_19-01-29_20-02-10_JHEP/JOU=13130/VOL=2019.2019/ISU=1/ART=9848/13130_2019_Article_9848.xml.scoap",
},
]
assert repo.find_all() == excepted_files
diff --git a/tests/units/elsevier/test_unit_elsevier_reharvest.py b/tests/units/elsevier/test_unit_elsevier_reharvest.py
new file mode 100644
index 00000000..d4243185
--- /dev/null
+++ b/tests/units/elsevier/test_unit_elsevier_reharvest.py
@@ -0,0 +1,269 @@
+from datetime import date
+from io import BytesIO
+from unittest import mock
+
+import pytest
+from airflow.models import DagBag
+
+
+class _S3Object:
+ def __init__(self, key):
+ self.key = key
+
+
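+# Minimal main.xml stub exposing only the fields the reharvest DAG parses
+# (item-info/doi and head/date-received); the wrapping <article> root is illustrative.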
+def _main_xml_with_doi_and_received_date(doi, year, month, day):
+ return f'''
+<article>
+ <item-info>
+ <doi>{doi}</doi>
+ </item-info>
+ <head>
+ <date-received day="{day}" month="{month}" year="{year}"/>
+ </head>
+</article>
+'''.encode()
+
+
+def _dataset_xml():
+ return b""
+
+
+@pytest.fixture(scope="class")
+def dagbag():
+ return DagBag(dag_folder="dags/", include_examples=False)
+
+
+@pytest.mark.usefixtures("dagbag")
+class TestUnitElsevierReharvest:
+ def setup_method(self):
+ self.dag_id = "elsevier_reharvest"
+ dagbag = DagBag(dag_folder="dags/", include_examples=False)
+ self.dag = dagbag.dags.get(self.dag_id)
+ assert self.dag is not None, f"DAG {self.dag_id} failed to load"
+
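+ # Fakes the Elsevier repository: the s3 listing yields the given keys and
+ # get_by_id streams the matching payload, so collect_records runs without S3.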
+ def _build_repo(self, payload_by_key):
+ mock_repo = mock.MagicMock()
+ mock_repo.EXTRACTED_DIR = "extracted/"
+ mock_repo.s3.objects.filter.return_value.all.return_value = [
+ _S3Object(key) for key in payload_by_key
+ ]
+
+ def _get_by_id(key):
+ return BytesIO(payload_by_key[key])
+
+ mock_repo.get_by_id.side_effect = _get_by_id
+ return mock_repo
+
+ def test_dag_structure(self):
+ assert "elsevier_reharvest_trigger_file_processing" in self.dag.task_ids
+ task = self.dag.get_task("elsevier_reharvest_trigger_file_processing")
+ assert "MappedOperator" in str(type(task)) or task.is_mapped
+ assert task.partial_kwargs["trigger_dag_id"] == "elsevier_process_file"
+ assert task.partial_kwargs["reset_dag_run"] is True
+
+ def test_collect_records_file_keys_and_dois_filters_resolved_keys(self):
+ task = self.dag.get_task("collect_records")
+ function_to_unit_test = task.python_callable
+
+ payload_by_key = {
+ "extracted/CERNAB00000010772A/CERNAB00000010772/older/article/main.xml": _main_xml_with_doi_and_received_date(
+ "10.1016/one", 2025, 1, 10
+ ),
+ "extracted/CERNAB00000010772A/CERNAB00000010772/newer/article/main.xml": _main_xml_with_doi_and_received_date(
+ "10.1016/one", 2025, 2, 10
+ ),
+ "extracted/CERNAB00000010772A/CERNAB00000010772/other/article/main.xml": _main_xml_with_doi_and_received_date(
+ "10.1016/two", 2025, 3, 10
+ ),
+ }
+ mock_repo = self._build_repo(payload_by_key)
+
+ result = function_to_unit_test(
+ repo=mock_repo,
+ params={
+ "file_keys": ["*"],
+ "dois": ["10.1016/one"],
+ },
+ )
+
+ assert result == [
+ "extracted/CERNAB00000010772A/CERNAB00000010772/newer/article/main.xml"
+ ]
+
+ def test_collect_records_date_range_uses_received_date_from_xml(self):
+ task = self.dag.get_task("collect_records")
+ function_to_unit_test = task.python_callable
+
+ payload_by_key = {
+ "extracted/CERNAB00000010772A/CERNAB00000010772/in-range/main.xml": _main_xml_with_doi_and_received_date(
+ "10.1016/one", 2025, 9, 5
+ ),
+ "extracted/CERNAB00000010772A/CERNAB00000010772/out-of-range/main.xml": _main_xml_with_doi_and_received_date(
+ "10.1016/two", 2025, 6, 1
+ ),
+ }
+ mock_repo = self._build_repo(payload_by_key)
+
+ result = function_to_unit_test(
+ repo=mock_repo,
+ params={
+ "date_from": "2025-09-01",
+ "date_to": "2025-09-30",
+ },
+ )
+
+ assert result == [
+ "extracted/CERNAB00000010772A/CERNAB00000010772/in-range/main.xml"
+ ]
+
+ def test_collect_records_limit_exceeded_raises(self):
+ task = self.dag.get_task("collect_records")
+ function_to_unit_test = task.python_callable
+
+ payload_by_key = {
+ "extracted/CERNAB00000010772A/CERNAB00000010772/one/main.xml": _main_xml_with_doi_and_received_date(
+ "10.1016/one", 2025, 7, 1
+ ),
+ "extracted/CERNAB00000010772A/CERNAB00000010772/two/main.xml": _main_xml_with_doi_and_received_date(
+ "10.1016/two", 2025, 7, 2
+ ),
+ }
+ mock_repo = self._build_repo(payload_by_key)
+
+ with pytest.raises(ValueError, match="above limit"):
+ function_to_unit_test(
+ repo=mock_repo,
+ params={
+ "file_keys": ["*"],
+ "limit": 1,
+ },
+ )
+
+ def test_collect_records_logs_and_raises_when_date_scan_is_over_10k(self):
+ task = self.dag.get_task("collect_records")
+ function_to_unit_test = task.python_callable
+
+ mock_repo = mock.MagicMock()
+ mock_repo.EXTRACTED_DIR = "extracted/"
+ mock_repo.s3.objects.filter.return_value.all.return_value = [
+ _S3Object(f"extracted/collection/{i}/main.xml") for i in range(10001)
+ ]
+ mock_repo.get_by_id.return_value = BytesIO(b"")
+
+ mocked_logger = mock.MagicMock()
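+ # Patching the callable's __globals__ swaps the module-level helpers
+ # (logger, _extract_received_date_from_xml) the task function looks up at call time.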
+ with (
+ mock.patch.dict(
+ function_to_unit_test.__globals__,
+ {
+ "logger": mocked_logger,
+ "_extract_received_date_from_xml": lambda _: date(2025, 9, 5),
+ },
+ ),
+ pytest.raises(ValueError, match="XML scan threshold exceeded"),
+ ):
+ function_to_unit_test(
+ repo=mock_repo,
+ params={
+ "date_from": "2025-09-01",
+ "date_to": "2025-09-30",
+ },
+ )
+
+ mocked_logger.error.assert_any_call(
+ "XML scan threshold exceeded for %s: %s files to parse (> %s). "
+ "Consider narrowing filters.",
+ "ce:date-received extraction from all main.xml candidates",
+ 10001,
+ 10000,
+ )
+
+ def test_collect_records_logs_and_raises_when_file_keys_doi_scan_is_over_10k(self):
+ task = self.dag.get_task("collect_records")
+ function_to_unit_test = task.python_callable
+
+ mock_repo = mock.MagicMock()
+ mock_repo.EXTRACTED_DIR = "extracted/"
+ mock_repo.s3.objects.filter.return_value.all.return_value = [
+ _S3Object(f"extracted/collection/{i}/main.xml") for i in range(10001)
+ ]
+ mock_repo.get_by_id.return_value = BytesIO(b"")
+
+ mocked_logger = mock.MagicMock()
+ with (
+ mock.patch.dict(
+ function_to_unit_test.__globals__,
+ {
+ "logger": mocked_logger,
+ "_extract_received_date_from_xml": lambda _: date(2025, 9, 5),
+ "_extract_doi_from_xml": lambda _: "10.1016/one",
+ },
+ ),
+ pytest.raises(ValueError, match="XML scan threshold exceeded"),
+ ):
+ function_to_unit_test(
+ repo=mock_repo,
+ params={
+ "file_keys": ["*"],
+ "dois": ["10.1016/one"],
+ },
+ )
+
+ mocked_logger.error.assert_any_call(
+ "XML scan threshold exceeded for %s: %s files to parse (> %s). "
+ "Consider narrowing filters.",
+ "date extraction for sorting file_keys before DOI filtering",
+ 10001,
+ 10000,
+ )
+
+ def test_prepare_trigger_conf_dry_run_and_normal(self):
+ task = self.dag.get_task("prepare_trigger_conf")
+ function_to_unit_test = task.python_callable
+
+ file_key = (
+ "extracted/CERNAB00000010772A/"
+ "CERNAB00000010772/03702693/v846sC/S0370269322005871/main.xml"
+ )
+ dataset_key = "extracted/CERNAB00000010772A/CERNAB00000010772/dataset.xml"
+
+ mock_repo = mock.MagicMock()
+ mock_repo.EXTRACTED_DIR = "extracted/"
+ mock_repo.s3.objects.filter.return_value.all.return_value = [
+ _S3Object(file_key),
+ _S3Object(dataset_key),
+ ]
+ mock_repo.get_by_id.return_value = BytesIO(_dataset_xml())
+
+ dry_run_result = function_to_unit_test(
+ file_keys=[file_key],
+ repo=mock_repo,
+ params={"dry_run": True},
+ )
+ assert dry_run_result == []
+
+ mocked_trigger = mock.MagicMock(
+ return_value=[
+ {
+ "file_name": file_key,
+ "metadata": {"files": {"xml": file_key}, "meta": "data1"},
+ }
+ ]
+ )
+
+ with mock.patch.dict(
+ function_to_unit_test.__globals__,
+ {"trigger_file_processing_elsevier": mocked_trigger},
+ ):
+ normal_result = function_to_unit_test(
+ file_keys=[file_key],
+ repo=mock_repo,
+ params={"dry_run": False},
+ )
+
+ assert normal_result == [
+ {
+ "file_name": file_key,
+ "metadata": {"files": {"xml": file_key}, "meta": "data1"},
+ }
+ ]
diff --git a/tests/units/jagiellonian/test_unit_jagiellonian_reharvest.py b/tests/units/jagiellonian/test_unit_jagiellonian_reharvest.py
new file mode 100644
index 00000000..e63428b9
--- /dev/null
+++ b/tests/units/jagiellonian/test_unit_jagiellonian_reharvest.py
@@ -0,0 +1,182 @@
+import json
+from io import BytesIO
+from unittest import mock
+
+import pytest
+from airflow.models import DagBag
+from freezegun import freeze_time
+
+
+class _S3Object:
+ def __init__(self, key):
+ self.key = key
+
+
+@pytest.fixture(scope="class")
+def dagbag():
+ return DagBag(dag_folder="dags/", include_examples=False)
+
+
+@pytest.fixture(scope="class")
+def dag(dagbag):
+ return dagbag.dags.get("jagiellonian_reharvest")
+
+
+class TestUnitJagiellonianReharvest:
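+ # Payloads are JSON article records keyed by S3 object key; get_by_id returns
+ # them serialized to JSON bytes, mirroring how the records are read here.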
+ def _build_repo(self, payload_by_key):
+ mock_repo = mock.MagicMock()
+ keys = list(payload_by_key.keys())
+ mock_repo.s3_bucket.objects.all.return_value = [_S3Object(key) for key in keys]
+
+ def _get_by_id(key):
+ return BytesIO(json.dumps(payload_by_key[key]).encode("utf-8"))
+
+ mock_repo.get_by_id.side_effect = _get_by_id
+ return mock_repo
+
+ def test_dag_structure(self, dag):
+ assert "jagiellonian_reharvest_trigger_file_processing" in dag.task_ids
+ task = dag.get_task("jagiellonian_reharvest_trigger_file_processing")
+ assert "MappedOperator" in str(type(task)) or task.is_mapped
+ assert task.partial_kwargs["trigger_dag_id"] == "jagiellonian_process_file"
+ assert task.partial_kwargs["reset_dag_run"] is True
+
+ def test_collect_records_invalid_combination_dates_with_file_keys(self, dag):
+ task = dag.get_task("collect_records")
+ function_to_unit_test = task.python_callable
+
+ mock_repo = self._build_repo({})
+
+ with pytest.raises(
+ ValueError, match="date_from/date_to cannot be used together with file_keys"
+ ):
+ function_to_unit_test(
+ repo=mock_repo,
+ params={
+ "file_keys": ["10.5506/a_metadata_2026-04-09 00:35:13+00:00.json"],
+ "date_from": "2026-04-01",
+ "date_to": "2026-04-09",
+ },
+ )
+
+ def test_collect_records_date_range_dedup_keeps_newest(self, dag):
+ task = dag.get_task("collect_records")
+ function_to_unit_test = task.python_callable
+
+ payload_by_key = {
+ "10.5506/aphyspolb.57.4-a4_metadata_2026-04-09 00_35_13.599324+00_00.json": {
+ "dois": [{"value": "10.5506/aphyspolb.57.4-a4"}],
+ "titles": [{"title": "new"}],
+ },
+ "10.5506/aphyspolb.57.4-a4_metadata_2026-04-09 00_10_13.599324+00_00.json": {
+ "dois": [{"value": "10.5506/aphyspolb.57.4-a4"}],
+ "titles": [{"title": "old"}],
+ },
+ "10.5506/aphyspolb.57.4-a5_metadata_2026-04-09 00_20_00.000000+00_00.json": {
+ "dois": [{"value": "10.5506/aphyspolb.57.4-a5"}],
+ },
+ }
+ mock_repo = self._build_repo(payload_by_key)
+
+ result = function_to_unit_test(
+ repo=mock_repo,
+ params={"date_from": "2026-04-09", "date_to": "2026-04-09"},
+ )
+
+ assert len(result) == 2
+ doi_to_title = {
+ article["dois"][0]["value"]: (article.get("titles") or [{"title": None}])[
+ 0
+ ]["title"]
+ for article in result
+ }
+ assert doi_to_title["10.5506/aphyspolb.57.4-a4"] == "new"
+
+ def test_collect_records_file_keys_glob_patterns(self, dag):
+ task = dag.get_task("collect_records")
+ function_to_unit_test = task.python_callable
+
+ payload_by_key = {
+ "10.5506/aphyspolb.57.4-a4_metadata_2026-04-09 00_35_13.599324+00_00.json": {
+ "dois": [{"value": "10.5506/aphyspolb.57.4-a4"}],
+ },
+ "10.5506/aphyspolb.57.4-a5_metadata_2026-04-09 00_35_13.599324+00_00.json": {
+ "dois": [{"value": "10.5506/aphyspolb.57.4-a5"}],
+ },
+ "10.5506/aphyspolb.57.4-a6_metadata_2026-04-10 00_35_13.599324+00_00.json": {
+ "dois": [{"value": "10.5506/aphyspolb.57.4-a6"}],
+ },
+ }
+ mock_repo = self._build_repo(payload_by_key)
+
+ result = function_to_unit_test(
+ repo=mock_repo,
+ params={"file_keys": ["10.5506/*_metadata_2026-04-09*.json"]},
+ )
+
+ dois = sorted([article["dois"][0]["value"] for article in result])
+ assert dois == ["10.5506/aphyspolb.57.4-a4", "10.5506/aphyspolb.57.4-a5"]
+
+ @freeze_time("2026-04-20")
+ def test_collect_records_dois_without_dates_defaults_last_3_years(self, dag):
+ task = dag.get_task("collect_records")
+ function_to_unit_test = task.python_callable
+
+ payload_by_key = {
+ "10.5506/old_metadata_2020-01-01 00_00_00.000000+00_00.json": {
+ "dois": [{"value": "10.5506/old"}],
+ },
+ "10.5506/new_metadata_2026-04-09 00_35_13.599324+00_00.json": {
+ "dois": [{"value": "10.5506/new"}],
+ },
+ }
+ mock_repo = self._build_repo(payload_by_key)
+
+ result = function_to_unit_test(
+ repo=mock_repo,
+ params={"dois": ["10.5506/new"]},
+ )
+
+ assert len(result) == 1
+ assert result[0]["dois"][0]["value"] == "10.5506/new"
+
+ def test_collect_records_limit_exceeded_raises(self, dag):
+ task = dag.get_task("collect_records")
+ function_to_unit_test = task.python_callable
+
+ payload_by_key = {
+ "10.5506/a_metadata_2026-04-09 00_00_00.000000+00_00.json": {
+ "dois": [{"value": "10.5506/a"}],
+ },
+ "10.5506/b_metadata_2026-04-09 00_00_01.000000+00_00.json": {
+ "dois": [{"value": "10.5506/b"}],
+ },
+ }
+ mock_repo = self._build_repo(payload_by_key)
+
+ with pytest.raises(ValueError, match="above limit"):
+ function_to_unit_test(
+ repo=mock_repo,
+ params={
+ "date_from": "2026-04-09",
+ "date_to": "2026-04-09",
+ "limit": 1,
+ },
+ )
+
+ def test_prepare_trigger_conf_dry_run_and_normal(self, dag):
+ task = dag.get_task("prepare_trigger_conf")
+ function_to_unit_test = task.python_callable
+
+ records = [{"dois": [{"value": "10.5506/aphyspolb.57.4-a4"}]}]
+
+ dry_run_result = function_to_unit_test(
+ records=records, params={"dry_run": True}
+ )
+ assert dry_run_result == []
+
+ normal_result = function_to_unit_test(
+ records=records, params={"dry_run": False}
+ )
+ assert len(normal_result) == 1
+ assert normal_result[0]["article"] == records[0]
diff --git a/tests/units/springer/test_springer_repository.py b/tests/units/springer/test_springer_repository.py
index c5b559ba..a4bbca91 100644
--- a/tests/units/springer/test_springer_repository.py
+++ b/tests/units/springer/test_springer_repository.py
@@ -11,22 +11,28 @@ def __init__(self, key) -> None:
S3_RETURNED_VALUES = [
- "file1.txt",
- "file1.scoap",
- "file1.pdf",
- "file2.txt",
- "file2.Meta",
- "file2.pdf",
+ "extracted/file1.txt",
+ "extracted/file1.scoap",
+ "extracted/file1.pdf",
+ "extracted/file2.txt",
+ "extracted/file2.Meta",
+ "extracted/file2.pdf",
]
FIND_ALL_EXPECTED_VALUES = [
- {"xml": "file1.scoap", "pdf": "file1.pdf"},
- {"xml": "file2.Meta", "pdf": "file2.pdf"},
+ {
+ "xml": "extracted/file1.scoap",
+ "pdf": "extracted/file1.pdf",
+ },
+ {
+ "xml": "extracted/file2.Meta",
+ "pdf": "extracted/file2.pdf",
+ },
]
FIND_ALL_EXTRACTED_FILES_EXPECTED_VALUES = [
- "file1.scoap",
- "file1.pdf",
- "file2.Meta",
- "file2.pdf",
+ "extracted/file1.scoap",
+ "extracted/file1.pdf",
+ "extracted/file2.Meta",
+ "extracted/file2.pdf",
]
@@ -58,7 +64,7 @@ def test_save_zip_file(boto3_fixture):
filename = "test.zip"
repo = SpringerRepository()
repo.save(filename, file)
- upload_mock.assert_called_with(file, "raw/" + filename)
+ upload_mock.assert_called_with(file, repo.ZIPED_DIR + filename)
def test_save_file(boto3_fixture):
@@ -67,7 +73,7 @@ def test_save_file(boto3_fixture):
filename = "test.pdf"
repo = SpringerRepository()
repo.save(filename, file)
- upload_mock.assert_called_with(file, "extracted/" + filename)
+ upload_mock.assert_called_with(file, repo.EXTRACTED_DIR + filename)
def test_file_is_meta():
diff --git a/tests/units/springer/test_unit_springer_reharvest.py b/tests/units/springer/test_unit_springer_reharvest.py
new file mode 100644
index 00000000..7898bf5e
--- /dev/null
+++ b/tests/units/springer/test_unit_springer_reharvest.py
@@ -0,0 +1,202 @@
+import base64
+from io import BytesIO
+from unittest import mock
+
+import pytest
+from airflow.models import DagBag
+
+
+class _S3Object:
+ def __init__(self, key):
+ self.key = key
+
+
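+# Minimal Springer A++-style metadata stub; the Journal/Volume/Issue/Article/ArticleInfo
+# nesting around ArticleDOI is assumed for the mock.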
+def _springer_xml_with_doi(doi):
+ return f"""
+<Publisher>
+ <Journal>
+ <Volume>
+ <Issue>
+ <Article>
+ <ArticleInfo>
+ <ArticleDOI>{doi}</ArticleDOI>
+ </ArticleInfo>
+ </Article>
+ </Issue>
+ </Volume>
+ </Journal>
+</Publisher>
+""".encode()
+
+
+@pytest.fixture(scope="class")
+def dagbag():
+ return DagBag(dag_folder="dags/", include_examples=False)
+
+
+@pytest.mark.usefixtures("dagbag")
+class TestUnitSpringerReharvest:
+ def setup_method(self):
+ self.dag_id = "springer_reharvest"
+ dagbag = DagBag(dag_folder="dags/", include_examples=False)
+ self.dag = dagbag.dags.get(self.dag_id)
+ assert self.dag is not None, f"DAG {self.dag_id} failed to load"
+
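+ # Fakes the Springer repository: the s3 listing yields the given keys, is_meta
+ # mimics the .Meta/.scoap check, and get_by_id streams the matching payload.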
+ def _build_repo(self, payload_by_key):
+ mock_repo = mock.MagicMock()
+ mock_repo.EXTRACTED_DIR = "extracted/"
+ mock_repo.s3.objects.filter.return_value.all.return_value = [
+ _S3Object(key) for key in payload_by_key
+ ]
+ mock_repo.is_meta.side_effect = lambda key: key.endswith(
+ ".Meta"
+ ) or key.endswith(".scoap")
+
+ def _get_by_id(key):
+ return BytesIO(payload_by_key[key])
+
+ mock_repo.get_by_id.side_effect = _get_by_id
+ return mock_repo
+
+ def test_dag_structure(self):
+ assert "springer_reharvest_trigger_file_processing" in self.dag.task_ids
+ task = self.dag.get_task("springer_reharvest_trigger_file_processing")
+ assert "MappedOperator" in str(type(task)) or task.is_mapped
+ assert task.partial_kwargs["trigger_dag_id"] == "springer_process_file"
+ assert task.partial_kwargs["reset_dag_run"] is True
+
+ def test_collect_records_file_keys_and_dois_filters_resolved_keys(self):
+ task = self.dag.get_task("collect_records")
+ function_to_unit_test = task.python_callable
+
+ payload_by_key = {
+ "extracted/EPJC/ftp_PUB_25-03-01_00-00-00/article.Meta": _springer_xml_with_doi(
+ "10.1007/one"
+ ),
+ "extracted/EPJC/ftp_PUB_26-03-29_20-00-46/article.Meta": _springer_xml_with_doi(
+ "10.1007/one"
+ ),
+ "extracted/JHEP/ftp_PUB_26-03-30_20-00-50/other.Meta": _springer_xml_with_doi(
+ "10.1007/two"
+ ),
+ }
+ mock_repo = self._build_repo(payload_by_key)
+
+ result = function_to_unit_test(
+ repo=mock_repo,
+ params={
+ "file_keys": ["*"],
+ "dois": ["10.1007/one"],
+ },
+ )
+
+ assert result == ["extracted/EPJC/ftp_PUB_26-03-29_20-00-46/article.Meta"]
+
+ def test_collect_records_date_range_dedup_keeps_newest(self):
+ task = self.dag.get_task("collect_records")
+ function_to_unit_test = task.python_callable
+
+ payload_by_key = {
+ "extracted/EPJC/ftp_PUB_25-08-15_00-00-00/article.Meta": _springer_xml_with_doi(
+ "10.1007/one"
+ ),
+ "extracted/JHEP/ftp_PUB_26-01-30_20-00-50/article.Meta": _springer_xml_with_doi(
+ "10.1007/one"
+ ),
+ "extracted/JHEP/ftp_PUB_25-12-19_20-00-50/other.scoap": _springer_xml_with_doi(
+ "10.1007/two"
+ ),
+ "extracted/JHEP/not-a-drop/ignored.Meta": _springer_xml_with_doi(
+ "10.1007/three"
+ ),
+ }
+ mock_repo = self._build_repo(payload_by_key)
+
+ result = function_to_unit_test(
+ repo=mock_repo,
+ params={
+ "date_from": "2025-08-15",
+ "date_to": "2026-01-30",
+ },
+ )
+
+ assert len(result) == 2
+ assert "extracted/JHEP/ftp_PUB_26-01-30_20-00-50/article.Meta" in result
+ assert "extracted/JHEP/ftp_PUB_25-12-19_20-00-50/other.scoap" in result
+
+ def test_collect_records_glob_patterns(self):
+ task = self.dag.get_task("collect_records")
+ function_to_unit_test = task.python_callable
+
+ payload_by_key = {
+ "extracted/EPJC/ftp_PUB_26-03-29_20-00-46/article.Meta": _springer_xml_with_doi(
+ "10.1007/one"
+ ),
+ "extracted/JHEP/ftp_PUB_26-03-30_20-00-50/other.scoap": _springer_xml_with_doi(
+ "10.1007/two"
+ ),
+ }
+ mock_repo = self._build_repo(payload_by_key)
+
+ result_all = function_to_unit_test(
+ repo=mock_repo,
+ params={"file_keys": ["*"]},
+ )
+ assert set(result_all) == set(payload_by_key.keys())
+
+ result_epjc = function_to_unit_test(
+ repo=mock_repo,
+ params={"file_keys": ["EPJC/*"]},
+ )
+ assert result_epjc == ["extracted/EPJC/ftp_PUB_26-03-29_20-00-46/article.Meta"]
+
+ def test_collect_records_limit_exceeded_raises(self):
+ task = self.dag.get_task("collect_records")
+ function_to_unit_test = task.python_callable
+
+ payload_by_key = {
+ "extracted/EPJC/ftp_PUB_26-03-29_20-00-46/article.Meta": _springer_xml_with_doi(
+ "10.1007/one"
+ ),
+ "extracted/JHEP/ftp_PUB_26-03-30_20-00-50/other.scoap": _springer_xml_with_doi(
+ "10.1007/two"
+ ),
+ }
+ mock_repo = self._build_repo(payload_by_key)
+
+ with pytest.raises(ValueError, match="above limit"):
+ function_to_unit_test(
+ repo=mock_repo,
+ params={
+ "file_keys": ["*"],
+ "limit": 1,
+ },
+ )
+
+ def test_prepare_trigger_conf_dry_run_and_normal(self):
+ task = self.dag.get_task("prepare_trigger_conf")
+ function_to_unit_test = task.python_callable
+
+ file_key = "extracted/EPJC/ftp_PUB_26-03-29_20-00-46/article.Meta"
+ xml_bytes = _springer_xml_with_doi("10.1007/one")
+
+ mock_repo = mock.MagicMock()
+ mock_repo.get_by_id.return_value = BytesIO(xml_bytes)
+
+ dry_run_result = function_to_unit_test(
+ file_keys=[file_key],
+ repo=mock_repo,
+ params={"dry_run": True},
+ )
+ assert dry_run_result == []
+
+ normal_result = function_to_unit_test(
+ file_keys=[file_key],
+ repo=mock_repo,
+ params={"dry_run": False},
+ )
+ assert len(normal_result) == 1
+ assert normal_result[0]["file_name"] == file_key
+
+ decoded_xml = base64.b64decode(normal_result[0]["file"]).decode("utf-8")
+ assert "10.1007/one" in decoded_xml