diff --git a/dags/aps/aps_process_file.py b/dags/aps/aps_process_file.py index 5e7f5f8b..cdc6ad61 100644 --- a/dags/aps/aps_process_file.py +++ b/dags/aps/aps_process_file.py @@ -9,6 +9,7 @@ from common.enhancer import Enhancer from common.enricher import Enricher from common.exceptions import EmptyOutputFromPreviousTask +from common.notification_service import FailedDagNotifier from common.scoap3_s3 import Scoap3Repository from common.utils import create_or_update_article, upload_json_to_s3 from inspire_utils.record import get_value @@ -47,6 +48,7 @@ def add_data_availability(parsed_json, parsed_xml): @dag( + on_failure_callback=FailedDagNotifier(), schedule=None, start_date=pendulum.today("UTC").add(days=-1), tags=["process", "aps"], diff --git a/dags/aps/aps_pull_api.py b/dags/aps/aps_pull_api.py index 889d4497..a197b303 100644 --- a/dags/aps/aps_pull_api.py +++ b/dags/aps/aps_pull_api.py @@ -9,6 +9,7 @@ from aps.aps_params import APSParams from aps.repository import APSRepository from aps.utils import save_file_in_s3, split_json +from common.notification_service import FailedDagNotifier from common.utils import set_harvesting_interval APS_REPO = APSRepository() @@ -16,6 +17,7 @@ @dag( + on_failure_callback=FailedDagNotifier(), start_date=pendulum.today("UTC").add(days=-1), schedule="0 */6 * * *", tags=["pull", "aps"], diff --git a/dags/aps/aps_reharvest.py b/dags/aps/aps_reharvest.py index f38485c3..d7fedc28 100644 --- a/dags/aps/aps_reharvest.py +++ b/dags/aps/aps_reharvest.py @@ -7,6 +7,7 @@ from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator from airflow.sdk import Param, dag, task from aps.repository import APSRepository +from common.notification_service import FailedDagNotifier logger = logging.getLogger("airflow.task") APS_REPO = APSRepository() @@ -100,6 +101,7 @@ def _resolve_file_keys(file_keys, all_snapshot_keys): @dag( + on_failure_callback=FailedDagNotifier(), start_date=pendulum.today("UTC").add(days=-1), 
import logging
import os
from urllib.parse import quote

import requests
from airflow.sdk import BaseNotifier

logger = logging.getLogger("airflow.task")


def send_zulip_message(message: str) -> None:
    """Post *message* to the configured Zulip stream/topic.

    Configuration is read from the environment: ZULIP_SITE, ZULIP_BOT_EMAIL,
    ZULIP_BOT_API_KEY, ZULIP_STREAM and ZULIP_TOPIC. When the site or bot
    credentials are missing the message is skipped with a warning instead of
    raising, so an unconfigured deployment never breaks a callback.

    Args:
        message: Zulip-Markdown message body.

    Raises:
        requests.HTTPError: when the Zulip API returns an error status.
        requests.RequestException: on network failure or timeout.
    """
    site = os.getenv("ZULIP_SITE", "")
    bot_email = os.getenv("ZULIP_BOT_EMAIL", "")
    api_key = os.getenv("ZULIP_BOT_API_KEY", "")
    stream = os.getenv("ZULIP_STREAM", "")
    topic = os.getenv("ZULIP_TOPIC", "")
    if not all([site, bot_email, api_key]):
        logger.warning(
            "Zulip notification skipped: ZULIP_SITE, ZULIP_BOT_EMAIL, or ZULIP_BOT_API_KEY not configured"
        )
        return

    url = f"{site.rstrip('/')}/api/v1/messages"
    response = requests.post(
        url,
        auth=(bot_email, api_key),
        data={
            "type": "stream",
            "to": stream,
            "topic": topic,
            "content": message,
        },
        timeout=10,
    )
    response.raise_for_status()


def dag_failure_callback(context: dict) -> None:
    """Airflow on-failure callback: announce the failed DAG run on Zulip.

    Never raises — notification errors are logged so the callback cannot mask
    the original task failure.

    Args:
        context: Airflow callback context; ``dag`` and ``run_id`` are read,
            both falling back to ``"unknown"`` when absent.
    """
    dag = context.get("dag")
    dag_id = dag.dag_id if dag else "unknown"
    run_id = context.get("run_id", "unknown")

    base_url = os.getenv("AIRFLOW_BASE_URL", "localhost:8080")
    if not base_url.startswith(("http://", "https://")):
        base_url = f"https://{base_url}"
    # Percent-encode the path segments: run IDs such as
    # "scheduled__2024-01-01T00:00:00+00:00" (or arbitrary manual run IDs
    # containing spaces) would otherwise produce a broken Markdown link.
    run_url = (
        f"{base_url.rstrip('/')}/dags/{quote(dag_id, safe='')}"
        f"/runs/{quote(run_id, safe='')}/?state=failed"
    )

    message = (
        f":love_letter: **DAG failed**: `{dag_id}`\n"
        f"- **Run ID**: `{run_id}`\n"
        f"- **Failed tasks**: [view in Airflow]({run_url})"
    )
    try:
        send_zulip_message(message)
    except Exception:
        logger.exception("Failed to send Zulip notification for DAG %s", dag_id)


class FailedDagNotifier(BaseNotifier):
    """Airflow notifier that forwards DAG failures to Zulip.

    Intended for use as ``on_failure_callback=FailedDagNotifier()`` in the
    ``@dag`` decorator.
    """

    def notify(self, context):
        # Delegate to the plain-function callback so the behavior is shared
        # with any non-notifier usage and stays unit-testable.
        dag_failure_callback(context)
a/dags/elsevier/elsevier_file_processing.py +++ b/dags/elsevier/elsevier_file_processing.py @@ -5,6 +5,7 @@ from common.enhancer import Enhancer from common.enricher import Enricher from common.exceptions import EmptyOutputFromPreviousTask +from common.notification_service import FailedDagNotifier from common.scoap3_s3 import Scoap3Repository from common.utils import ( create_or_update_article, @@ -40,6 +41,7 @@ def enrich_elsevier(enhanced_file): @dag( + on_failure_callback=FailedDagNotifier(), schedule=None, start_date=pendulum.today("UTC").add(days=-1), tags=["process", "elsevier"], diff --git a/dags/elsevier/elsevier_pull_sftp.py b/dags/elsevier/elsevier_pull_sftp.py index 7582c7d6..80dd1b51 100644 --- a/dags/elsevier/elsevier_pull_sftp.py +++ b/dags/elsevier/elsevier_pull_sftp.py @@ -3,6 +3,7 @@ import pendulum from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator from airflow.sdk import dag, task +from common.notification_service import FailedDagNotifier from common.pull_ftp import migrate_from_ftp as migrate_from_ftp_common from common.pull_ftp import reprocess_files from elsevier.repository import ElsevierRepository @@ -16,6 +17,7 @@ @dag( + on_failure_callback=FailedDagNotifier(), start_date=pendulum.today("UTC").add(days=-1), schedule="5 */6 * * *", tags=["pull", "elsevier"], diff --git a/dags/hindawi/hindawi_file_processing.py b/dags/hindawi/hindawi_file_processing.py index 0011e618..ea12dcc6 100644 --- a/dags/hindawi/hindawi_file_processing.py +++ b/dags/hindawi/hindawi_file_processing.py @@ -6,6 +6,7 @@ from common.enhancer import Enhancer from common.enricher import Enricher from common.exceptions import EmptyOutputFromPreviousTask +from common.notification_service import FailedDagNotifier from common.scoap3_s3 import Scoap3Repository from common.utils import create_or_update_article, upload_json_to_s3 from hindawi.parser import HindawiParser @@ -29,6 +30,7 @@ def enrich_hindawi(enhanced_file): @dag( + 
on_failure_callback=FailedDagNotifier(), schedule=None, start_date=pendulum.today("UTC").add(days=-1), tags=["process", "hindawi"], diff --git a/dags/hindawi/hindawi_pull_api.py b/dags/hindawi/hindawi_pull_api.py index fa78d398..086df08a 100644 --- a/dags/hindawi/hindawi_pull_api.py +++ b/dags/hindawi/hindawi_pull_api.py @@ -4,6 +4,7 @@ import pendulum from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator from airflow.sdk import dag, task +from common.notification_service import FailedDagNotifier from common.utils import set_harvesting_interval from hindawi.hindawi_api_client import HindawiApiClient from hindawi.hindawi_params import HindawiParams @@ -15,6 +16,7 @@ @dag( + on_failure_callback=FailedDagNotifier(), start_date=pendulum.today("UTC").add(days=-1), schedule="15 */6 * * *", tags=["pull", "hindawi"], diff --git a/dags/iop/iop_process_file.py b/dags/iop/iop_process_file.py index fe795e98..9dabbfd3 100644 --- a/dags/iop/iop_process_file.py +++ b/dags/iop/iop_process_file.py @@ -12,6 +12,7 @@ from common.enhancer import Enhancer from common.enricher import Enricher from common.exceptions import EmptyOutputFromPreviousTask +from common.notification_service import FailedDagNotifier from common.scoap3_s3 import Scoap3Repository from common.utils import create_or_update_article, upload_json_to_s3 from inspire_utils.record import get_value @@ -66,6 +67,7 @@ def iop_enrich_file(enhanced_file): @dag( + on_failure_callback=FailedDagNotifier(), schedule=None, start_date=pendulum.today("UTC").add(days=-1), tags=["process", "iop"], diff --git a/dags/iop/iop_pull_sftp.py b/dags/iop/iop_pull_sftp.py index 06f26613..f7908727 100644 --- a/dags/iop/iop_pull_sftp.py +++ b/dags/iop/iop_pull_sftp.py @@ -4,6 +4,7 @@ import pendulum from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator from airflow.sdk import dag, task +from common.notification_service import FailedDagNotifier from iop.repository import IOPRepository 
from iop.sftp_service import IOPSFTPService @@ -14,6 +15,7 @@ @dag( + on_failure_callback=FailedDagNotifier(), start_date=pendulum.today("UTC").add(days=-1), schedule="25 */6 * * *", tags=["pull", "iop"], diff --git a/dags/iop/iop_reharvest.py b/dags/iop/iop_reharvest.py index cfeea3a3..794523a2 100644 --- a/dags/iop/iop_reharvest.py +++ b/dags/iop/iop_reharvest.py @@ -6,6 +6,7 @@ import pendulum from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator from airflow.sdk import Param, dag, task +from common.notification_service import FailedDagNotifier from common.utils import parse_without_names_spaces from iop.repository import IOPRepository @@ -122,6 +123,7 @@ def _enforce_limit(records, limit): @dag( + on_failure_callback=FailedDagNotifier(), start_date=pendulum.today("UTC").add(days=-1), schedule=None, catchup=False, diff --git a/dags/jagiellonian/jagiellonian_process_file.py b/dags/jagiellonian/jagiellonian_process_file.py index 241ce7c0..8b53720d 100644 --- a/dags/jagiellonian/jagiellonian_process_file.py +++ b/dags/jagiellonian/jagiellonian_process_file.py @@ -9,6 +9,7 @@ from airflow.sdk import dag, task from common.enhancer import Enhancer from common.enricher import Enricher +from common.notification_service import FailedDagNotifier from common.utils import create_or_update_article from jagiellonian.parser import JagiellonianParser @@ -28,6 +29,7 @@ def update_filename_extension(filename, type): @dag( + on_failure_callback=FailedDagNotifier(), schedule=None, start_date=pendulum.today("UTC").add(days=-1), tags=["process", "jagiellonian"], diff --git a/dags/jagiellonian/jagiellonian_pull_api.py b/dags/jagiellonian/jagiellonian_pull_api.py index 8be4c98d..88c8f598 100644 --- a/dags/jagiellonian/jagiellonian_pull_api.py +++ b/dags/jagiellonian/jagiellonian_pull_api.py @@ -7,6 +7,7 @@ from airflow.providers.http.hooks.http import HttpHook from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator from 
airflow.sdk import dag, task +from common.notification_service import FailedDagNotifier logger = logging.getLogger("airflow.task") @@ -42,6 +43,7 @@ def get_latest_s3_file(): @dag( + on_failure_callback=FailedDagNotifier(), default_args=default_args, description="Transfer Crossref journal data to S3", schedule="35 */6 * * *", diff --git a/dags/log_cleanup.py b/dags/log_cleanup.py index 9e728a0e..491ff88c 100644 --- a/dags/log_cleanup.py +++ b/dags/log_cleanup.py @@ -5,11 +5,13 @@ from airflow.sdk import dag, task from airflow.sdk.definitions.param import Param +from common.notification_service import FailedDagNotifier logger = logging.getLogger("airflow.task") @dag( + on_failure_callback=FailedDagNotifier(), schedule="0 */12 * * *", catchup=False, tags=["service", "log_cleanup"], diff --git a/dags/oup/oup_process_file.py b/dags/oup/oup_process_file.py index 5b5a2a50..6c75071e 100644 --- a/dags/oup/oup_process_file.py +++ b/dags/oup/oup_process_file.py @@ -7,6 +7,7 @@ from common.enhancer import Enhancer from common.enricher import Enricher from common.exceptions import EmptyOutputFromPreviousTask +from common.notification_service import FailedDagNotifier from common.scoap3_s3 import Scoap3Repository from common.utils import ( create_or_update_article, @@ -50,6 +51,7 @@ def oup_validate_record(enriched_file): @dag( + on_failure_callback=FailedDagNotifier(), schedule=None, tags=["process", "oup"], start_date=pendulum.today("UTC").add(days=-1), diff --git a/dags/oup/oup_pull_ftp.py b/dags/oup/oup_pull_ftp.py index ea649050..12966705 100644 --- a/dags/oup/oup_pull_ftp.py +++ b/dags/oup/oup_pull_ftp.py @@ -4,6 +4,7 @@ import pendulum from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator from airflow.sdk import dag, task +from common.notification_service import FailedDagNotifier from oup.ftp_service import OUPSFTPService from oup.repository import OUPRepository @@ -14,6 +15,7 @@ @dag( + on_failure_callback=FailedDagNotifier(), 
start_date=pendulum.today("UTC").add(days=-1), schedule="45 */6 * * *", tags=["pull", "oup"], diff --git a/dags/oup/oup_reharvest.py b/dags/oup/oup_reharvest.py index 61fe1347..90b28652 100644 --- a/dags/oup/oup_reharvest.py +++ b/dags/oup/oup_reharvest.py @@ -7,6 +7,7 @@ import pendulum from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator from airflow.sdk import Param, dag, task +from common.notification_service import FailedDagNotifier from common.utils import parse_without_names_spaces from oup.repository import OUPRepository @@ -151,6 +152,7 @@ def _enforce_limit(records, limit): @dag( + on_failure_callback=FailedDagNotifier(), start_date=pendulum.today("UTC").add(days=-1), schedule=None, catchup=False, diff --git a/dags/springer/springer_process_file.py b/dags/springer/springer_process_file.py index 02685ac1..5c1dca12 100644 --- a/dags/springer/springer_process_file.py +++ b/dags/springer/springer_process_file.py @@ -18,6 +18,7 @@ from common.enhancer import Enhancer from common.enricher import Enricher from common.exceptions import EmptyOutputFromPreviousTask +from common.notification_service import FailedDagNotifier from common.scoap3_s3 import Scoap3Repository from common.utils import ( create_or_update_article, @@ -101,6 +102,7 @@ def springer_validate_record(enriched_file): @dag( + on_failure_callback=FailedDagNotifier(), schedule=None, tags=["process", "springer"], start_date=pendulum.today("UTC").add(days=-1), diff --git a/dags/springer/springer_pull_sftp.py b/dags/springer/springer_pull_sftp.py index d605c9cb..788b25a3 100644 --- a/dags/springer/springer_pull_sftp.py +++ b/dags/springer/springer_pull_sftp.py @@ -4,6 +4,7 @@ import pendulum from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator from airflow.sdk import dag, task +from common.notification_service import FailedDagNotifier from springer.repository import SpringerRepository from springer.sftp_service import SpringerSFTPService 
from unittest.mock import MagicMock, patch

import pytest
import requests
from common.notification_service import (
    FailedDagNotifier,
    dag_failure_callback,
    send_zulip_message,
)

# Environment expected by the happy-path tests; applied via the fixture below.
_ZULIP_ENV = {
    "ZULIP_SITE": "https://zulip.example.com",
    "ZULIP_BOT_EMAIL": "bot@example.com",
    "ZULIP_BOT_API_KEY": "secret",
    "ZULIP_STREAM": "alerts",
    "ZULIP_TOPIC": "DAG failures",
}


def _context_with(dag_id="test_dag", run_id="run_123"):
    """Build a minimal Airflow-style callback context around a mocked DAG."""
    mock_dag = MagicMock()
    mock_dag.dag_id = dag_id
    return {"dag": mock_dag, "run_id": run_id}


@pytest.fixture
def zulip_env(monkeypatch):
    """Populate a fully configured Zulip environment."""
    for key, value in _ZULIP_ENV.items():
        monkeypatch.setenv(key, value)


class TestSendZulipMessage:
    def test_sends_post_to_zulip_api(self, zulip_env):
        with patch("common.notification_service.requests.post") as mock_post:
            mock_post.return_value = MagicMock(status_code=200)
            send_zulip_message("test message")

        mock_post.assert_called_once_with(
            "https://zulip.example.com/api/v1/messages",
            auth=("bot@example.com", "secret"),
            data={
                "type": "stream",
                "to": "alerts",
                "topic": "DAG failures",
                "content": "test message",
            },
            timeout=10,
        )

    def test_skips_when_site_empty(self, monkeypatch):
        # Site blank -> message must be silently dropped, no HTTP call.
        monkeypatch.setenv("ZULIP_SITE", "")
        monkeypatch.setenv("ZULIP_BOT_EMAIL", "bot@example.com")
        monkeypatch.setenv("ZULIP_BOT_API_KEY", "secret")

        with patch("common.notification_service.requests.post") as mock_post:
            send_zulip_message("test message")

        mock_post.assert_not_called()

    def test_skips_when_credentials_empty(self, monkeypatch):
        # Missing bot credentials -> message must be silently dropped.
        monkeypatch.setenv("ZULIP_SITE", "https://zulip.example.com")
        monkeypatch.setenv("ZULIP_BOT_EMAIL", "")
        monkeypatch.setenv("ZULIP_BOT_API_KEY", "")

        with patch("common.notification_service.requests.post") as mock_post:
            send_zulip_message("test message")

        mock_post.assert_not_called()

    def test_raises_on_http_error(self, zulip_env):
        # HTTP errors from Zulip must propagate via raise_for_status().
        failing_response = MagicMock(status_code=403)
        failing_response.raise_for_status.side_effect = requests.HTTPError("403")

        with patch("common.notification_service.requests.post") as mock_post:
            mock_post.return_value = failing_response
            with pytest.raises(requests.HTTPError):
                send_zulip_message("test message")


class TestDagFailureCallback:
    def test_sends_notification_with_dag_info(self, zulip_env, monkeypatch):
        monkeypatch.setenv("AIRFLOW_BASE_URL", "http://airflow.example.com")

        with patch("common.notification_service.requests.post") as mock_post:
            mock_post.return_value = MagicMock(status_code=200)
            dag_failure_callback(_context_with(dag_id="aps_pull_api"))

        mock_post.assert_called_once()
        message = mock_post.call_args[1]["data"]["content"]
        assert "aps_pull_api" in message
        assert "run_123" in message
        assert (
            "http://airflow.example.com/dags/aps_pull_api/runs/run_123/?state=failed"
            in message
        )

    def test_base_url_without_scheme(self, zulip_env, monkeypatch):
        # A bare host must be upgraded to https:// in the generated link.
        monkeypatch.setenv("AIRFLOW_BASE_URL", "scoap3-dev.siscern.org")

        with patch("common.notification_service.requests.post") as mock_post:
            mock_post.return_value = MagicMock(status_code=200)
            dag_failure_callback(_context_with(dag_id="aps_pull_api"))

        message = mock_post.call_args[1]["data"]["content"]
        assert (
            "https://scoap3-dev.siscern.org/dags/aps_pull_api/runs/run_123/?state=failed"
            in message
        )

    def test_does_not_raise_when_zulip_fails(self, zulip_env):
        # Notification failures must never escape the failure callback.
        with patch(
            "common.notification_service.requests.post",
            side_effect=Exception("network error"),
        ):
            dag_failure_callback(_context_with())


class TestFailedDagNotifier:
    def test_notify_sends_zulip_message(self, zulip_env, monkeypatch):
        monkeypatch.setenv("AIRFLOW_BASE_URL", "http://airflow.example.com")

        with patch("common.notification_service.requests.post") as mock_post:
            mock_post.return_value = MagicMock(status_code=200)
            FailedDagNotifier().notify(_context_with(dag_id="aps_pull_api"))

        mock_post.assert_called_once()
        message = mock_post.call_args[1]["data"]["content"]
        assert "aps_pull_api" in message

    def test_notify_does_not_raise_when_zulip_fails(self, zulip_env):
        # The notifier delegates to the callback, which swallows errors.
        with patch(
            "common.notification_service.requests.post",
            side_effect=Exception("network error"),
        ):
            FailedDagNotifier().notify(_context_with())