Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dags/aps/aps_process_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from common.enhancer import Enhancer
from common.enricher import Enricher
from common.exceptions import EmptyOutputFromPreviousTask
from common.notification_service import FailedDagNotifier
from common.scoap3_s3 import Scoap3Repository
from common.utils import create_or_update_article, upload_json_to_s3
from inspire_utils.record import get_value
Expand Down Expand Up @@ -47,6 +48,7 @@ def add_data_availability(parsed_json, parsed_xml):


@dag(
on_failure_callback=FailedDagNotifier(),
schedule=None,
start_date=pendulum.today("UTC").add(days=-1),
tags=["process", "aps"],
Expand Down
2 changes: 2 additions & 0 deletions dags/aps/aps_pull_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@
from aps.aps_params import APSParams
from aps.repository import APSRepository
from aps.utils import save_file_in_s3, split_json
from common.notification_service import FailedDagNotifier
from common.utils import set_harvesting_interval

APS_REPO = APSRepository()
logger = logging.getLogger("airflow.task")


@dag(
on_failure_callback=FailedDagNotifier(),
start_date=pendulum.today("UTC").add(days=-1),
schedule="0 */6 * * *",
tags=["pull", "aps"],
Expand Down
2 changes: 2 additions & 0 deletions dags/aps/aps_reharvest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sdk import Param, dag, task
from aps.repository import APSRepository
from common.notification_service import FailedDagNotifier

logger = logging.getLogger("airflow.task")
APS_REPO = APSRepository()
Expand Down Expand Up @@ -100,6 +101,7 @@ def _resolve_file_keys(file_keys, all_snapshot_keys):


@dag(
on_failure_callback=FailedDagNotifier(),
start_date=pendulum.today("UTC").add(days=-1),
schedule=None,
catchup=False,
Expand Down
60 changes: 60 additions & 0 deletions dags/common/notification_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import logging
import os

import requests
from airflow.sdk import BaseNotifier

logger = logging.getLogger("airflow.task")


def send_zulip_message(message: str) -> None:
    """Post ``message`` to the configured Zulip stream and topic.

    Connection settings are read from the environment: ``ZULIP_SITE``,
    ``ZULIP_BOT_EMAIL``, ``ZULIP_BOT_API_KEY``, ``ZULIP_STREAM`` and
    ``ZULIP_TOPIC``.  If any of them is missing the notification is
    skipped with a warning instead of raising, so an incomplete setup
    never masks the original DAG failure being reported.

    Raises:
        requests.HTTPError: if the Zulip API rejects the request.
    """
    site = os.getenv("ZULIP_SITE", "")
    bot_email = os.getenv("ZULIP_BOT_EMAIL", "")
    api_key = os.getenv("ZULIP_BOT_API_KEY", "")
    stream = os.getenv("ZULIP_STREAM", "")
    topic = os.getenv("ZULIP_TOPIC", "")
    # Stream messages require a target stream and topic in addition to
    # credentials; posting with empty "to"/"topic" values makes the API
    # return HTTP 400, so treat all five settings as required and skip
    # early rather than raising on a misconfiguration.
    if not all([site, bot_email, api_key, stream, topic]):
        logger.warning(
            "Zulip notification skipped: ZULIP_SITE, ZULIP_BOT_EMAIL, "
            "ZULIP_BOT_API_KEY, ZULIP_STREAM, or ZULIP_TOPIC not configured"
        )
        return

    url = f"{site.rstrip('/')}/api/v1/messages"
    response = requests.post(
        url,
        auth=(bot_email, api_key),  # Zulip uses HTTP basic auth: bot email + API key
        data={
            "type": "stream",
            "to": stream,
            "topic": topic,
            "content": message,
        },
        timeout=10,  # never let a slow Zulip server hang the failure callback
    )
    response.raise_for_status()


def dag_failure_callback(context: dict) -> None:
    """Report a failed DAG run to Zulip.

    Extracts the DAG id and run id from the Airflow ``context``, builds a
    deep link to the failed run in the Airflow UI, and posts the message
    via :func:`send_zulip_message`.  Any error while sending is logged and
    suppressed so the notification path itself can never raise.
    """
    dag_obj = context.get("dag")
    dag_id = dag_obj.dag_id if dag_obj else "unknown"
    run_id = context.get("run_id", "unknown")

    base = os.getenv("AIRFLOW_BASE_URL", "localhost:8080")
    # The configured value may omit the scheme; assume HTTPS in that case.
    if not base.startswith(("http://", "https://")):
        base = f"https://{base}"
    run_url = f"{base.rstrip('/')}/dags/{dag_id}/runs/{run_id}/?state=failed"

    body_lines = [
        f":love_letter: **DAG failed**: `{dag_id}`",
        f"- **Run ID**: `{run_id}`",
        f"- **Failed tasks**: [view in Airflow]({run_url})",
    ]
    try:
        send_zulip_message("\n".join(body_lines))
    except Exception:
        # Never propagate: a broken notifier must not obscure the DAG failure.
        logger.exception("Failed to send Zulip notification for DAG %s", dag_id)


class FailedDagNotifier(BaseNotifier):
    """Airflow notifier that posts a Zulip message when a DAG run fails.

    Attach via ``on_failure_callback=FailedDagNotifier()`` on a ``@dag``.
    """

    def notify(self, context):
        # Delegate to the module-level callback, which formats the message
        # and logs (rather than raises) any notification errors itself.
        dag_failure_callback(context)
2 changes: 2 additions & 0 deletions dags/elsevier/elsevier_file_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from common.enhancer import Enhancer
from common.enricher import Enricher
from common.exceptions import EmptyOutputFromPreviousTask
from common.notification_service import FailedDagNotifier
from common.scoap3_s3 import Scoap3Repository
from common.utils import (
create_or_update_article,
Expand Down Expand Up @@ -40,6 +41,7 @@ def enrich_elsevier(enhanced_file):


@dag(
on_failure_callback=FailedDagNotifier(),
schedule=None,
start_date=pendulum.today("UTC").add(days=-1),
tags=["process", "elsevier"],
Expand Down
2 changes: 2 additions & 0 deletions dags/elsevier/elsevier_pull_sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pendulum
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sdk import dag, task
from common.notification_service import FailedDagNotifier
from common.pull_ftp import migrate_from_ftp as migrate_from_ftp_common
from common.pull_ftp import reprocess_files
from elsevier.repository import ElsevierRepository
Expand All @@ -16,6 +17,7 @@


@dag(
on_failure_callback=FailedDagNotifier(),
start_date=pendulum.today("UTC").add(days=-1),
schedule="5 */6 * * *",
tags=["pull", "elsevier"],
Expand Down
2 changes: 2 additions & 0 deletions dags/hindawi/hindawi_file_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from common.enhancer import Enhancer
from common.enricher import Enricher
from common.exceptions import EmptyOutputFromPreviousTask
from common.notification_service import FailedDagNotifier
from common.scoap3_s3 import Scoap3Repository
from common.utils import create_or_update_article, upload_json_to_s3
from hindawi.parser import HindawiParser
Expand All @@ -29,6 +30,7 @@ def enrich_hindawi(enhanced_file):


@dag(
on_failure_callback=FailedDagNotifier(),
schedule=None,
start_date=pendulum.today("UTC").add(days=-1),
tags=["process", "hindawi"],
Expand Down
2 changes: 2 additions & 0 deletions dags/hindawi/hindawi_pull_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pendulum
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sdk import dag, task
from common.notification_service import FailedDagNotifier
from common.utils import set_harvesting_interval
from hindawi.hindawi_api_client import HindawiApiClient
from hindawi.hindawi_params import HindawiParams
Expand All @@ -15,6 +16,7 @@


@dag(
on_failure_callback=FailedDagNotifier(),
start_date=pendulum.today("UTC").add(days=-1),
schedule="15 */6 * * *",
tags=["pull", "hindawi"],
Expand Down
2 changes: 2 additions & 0 deletions dags/iop/iop_process_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from common.enhancer import Enhancer
from common.enricher import Enricher
from common.exceptions import EmptyOutputFromPreviousTask
from common.notification_service import FailedDagNotifier
from common.scoap3_s3 import Scoap3Repository
from common.utils import create_or_update_article, upload_json_to_s3
from inspire_utils.record import get_value
Expand Down Expand Up @@ -66,6 +67,7 @@ def iop_enrich_file(enhanced_file):


@dag(
on_failure_callback=FailedDagNotifier(),
schedule=None,
start_date=pendulum.today("UTC").add(days=-1),
tags=["process", "iop"],
Expand Down
2 changes: 2 additions & 0 deletions dags/iop/iop_pull_sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pendulum
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sdk import dag, task
from common.notification_service import FailedDagNotifier
from iop.repository import IOPRepository
from iop.sftp_service import IOPSFTPService

Expand All @@ -14,6 +15,7 @@


@dag(
on_failure_callback=FailedDagNotifier(),
start_date=pendulum.today("UTC").add(days=-1),
schedule="25 */6 * * *",
tags=["pull", "iop"],
Expand Down
2 changes: 2 additions & 0 deletions dags/iop/iop_reharvest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pendulum
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sdk import Param, dag, task
from common.notification_service import FailedDagNotifier
from common.utils import parse_without_names_spaces
from iop.repository import IOPRepository

Expand Down Expand Up @@ -122,6 +123,7 @@ def _enforce_limit(records, limit):


@dag(
on_failure_callback=FailedDagNotifier(),
start_date=pendulum.today("UTC").add(days=-1),
schedule=None,
catchup=False,
Expand Down
2 changes: 2 additions & 0 deletions dags/jagiellonian/jagiellonian_process_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from airflow.sdk import dag, task
from common.enhancer import Enhancer
from common.enricher import Enricher
from common.notification_service import FailedDagNotifier
from common.utils import create_or_update_article
from jagiellonian.parser import JagiellonianParser

Expand All @@ -28,6 +29,7 @@ def update_filename_extension(filename, type):


@dag(
on_failure_callback=FailedDagNotifier(),
schedule=None,
start_date=pendulum.today("UTC").add(days=-1),
tags=["process", "jagiellonian"],
Expand Down
2 changes: 2 additions & 0 deletions dags/jagiellonian/jagiellonian_pull_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sdk import dag, task
from common.notification_service import FailedDagNotifier

logger = logging.getLogger("airflow.task")

Expand Down Expand Up @@ -42,6 +43,7 @@ def get_latest_s3_file():


@dag(
on_failure_callback=FailedDagNotifier(),
default_args=default_args,
description="Transfer Crossref journal data to S3",
schedule="35 */6 * * *",
Expand Down
2 changes: 2 additions & 0 deletions dags/log_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@

from airflow.sdk import dag, task
from airflow.sdk.definitions.param import Param
from common.notification_service import FailedDagNotifier

logger = logging.getLogger("airflow.task")


@dag(
on_failure_callback=FailedDagNotifier(),
schedule="0 */12 * * *",
catchup=False,
tags=["service", "log_cleanup"],
Expand Down
2 changes: 2 additions & 0 deletions dags/oup/oup_process_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from common.enhancer import Enhancer
from common.enricher import Enricher
from common.exceptions import EmptyOutputFromPreviousTask
from common.notification_service import FailedDagNotifier
from common.scoap3_s3 import Scoap3Repository
from common.utils import (
create_or_update_article,
Expand Down Expand Up @@ -50,6 +51,7 @@ def oup_validate_record(enriched_file):


@dag(
on_failure_callback=FailedDagNotifier(),
schedule=None,
tags=["process", "oup"],
start_date=pendulum.today("UTC").add(days=-1),
Expand Down
2 changes: 2 additions & 0 deletions dags/oup/oup_pull_ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pendulum
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sdk import dag, task
from common.notification_service import FailedDagNotifier
from oup.ftp_service import OUPSFTPService
from oup.repository import OUPRepository

Expand All @@ -14,6 +15,7 @@


@dag(
on_failure_callback=FailedDagNotifier(),
start_date=pendulum.today("UTC").add(days=-1),
schedule="45 */6 * * *",
tags=["pull", "oup"],
Expand Down
2 changes: 2 additions & 0 deletions dags/oup/oup_reharvest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pendulum
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sdk import Param, dag, task
from common.notification_service import FailedDagNotifier
from common.utils import parse_without_names_spaces
from oup.repository import OUPRepository

Expand Down Expand Up @@ -151,6 +152,7 @@ def _enforce_limit(records, limit):


@dag(
on_failure_callback=FailedDagNotifier(),
start_date=pendulum.today("UTC").add(days=-1),
schedule=None,
catchup=False,
Expand Down
2 changes: 2 additions & 0 deletions dags/springer/springer_process_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from common.enhancer import Enhancer
from common.enricher import Enricher
from common.exceptions import EmptyOutputFromPreviousTask
from common.notification_service import FailedDagNotifier
from common.scoap3_s3 import Scoap3Repository
from common.utils import (
create_or_update_article,
Expand Down Expand Up @@ -101,6 +102,7 @@ def springer_validate_record(enriched_file):


@dag(
on_failure_callback=FailedDagNotifier(),
schedule=None,
tags=["process", "springer"],
start_date=pendulum.today("UTC").add(days=-1),
Expand Down
2 changes: 2 additions & 0 deletions dags/springer/springer_pull_sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pendulum
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sdk import dag, task
from common.notification_service import FailedDagNotifier
from springer.repository import SpringerRepository
from springer.sftp_service import SpringerSFTPService

Expand All @@ -14,6 +15,7 @@


@dag(
on_failure_callback=FailedDagNotifier(),
start_date=pendulum.today("UTC").add(days=-1),
schedule="55 */6 * * *",
tags=["pull", "springer"],
Expand Down
Loading
Loading