Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 70 additions & 3 deletions nerve/cron/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,73 @@ def _parse_interval(interval: str) -> int:
return total or 7200 # Default 2h


# Unix crontab day-of-week numbering is 0=Sun..6=Sat (7 also means Sun).
# APScheduler's numeric day_of_week is 0=Mon..6=Sun, and CronTrigger.from_crontab
# does NOT remap, so a numeric DOW like "1" (Unix Monday) gets read as APScheduler
# 1 = Tuesday, i.e. every numeric-DOW cron fires one weekday late. APScheduler does
# accept unambiguous three-letter day names, so we translate the numbers to names.
_UNIX_DOW_TO_NAME = {
0: "sun", 1: "mon", 2: "tue", 3: "wed",
4: "thu", 5: "fri", 6: "sat", 7: "sun",
}


def _remap_dow_value(value: str) -> str:
"""Map a single Unix DOW number to an APScheduler day name.

Non-numeric atoms (already a name like ``mon``, or ``*``) and numbers
outside 0-7 pass through unchanged so APScheduler can validate them.
"""
v = value.strip()
if v.isdigit() and int(v) in _UNIX_DOW_TO_NAME:
return _UNIX_DOW_TO_NAME[int(v)]
return v


def _remap_dow_atom(atom: str) -> str:
"""Remap one comma-separated DOW atom, preserving range and step syntax.

Handles ``*``, single values (``1``), ranges (``1-5``), and any of those
with a step suffix (``*/2``, ``1-5/2``). Only the numeric components are
translated; everything else is left intact.
"""
base, sep, step = atom.partition("/")
if base in ("*", ""):
remapped = base
elif "-" in base:
lo, _, hi = base.partition("-")
remapped = f"{_remap_dow_value(lo)}-{_remap_dow_value(hi)}"
else:
remapped = _remap_dow_value(base)
return f"{remapped}{sep}{step}" if sep else remapped


def _crontab_to_trigger(schedule: str) -> CronTrigger:
"""Build a CronTrigger from a 5-field crontab string with Unix DOW semantics.

Drop-in replacement for ``CronTrigger.from_crontab`` that fixes the
day-of-week off-by-one (see ``_UNIX_DOW_TO_NAME``). Only the DOW field is
treated differently; the other four fields and the no-explicit-timezone
behaviour are identical to ``from_crontab``. Raises ``ValueError`` for
anything that is not a 5-field expression, so interval strings like ``4h``
keep falling through to the IntervalTrigger path.
"""
fields = schedule.split()
if len(fields) != 5:
raise ValueError(f"Not a 5-field crontab expression: {schedule!r}")
minute, hour, day, month, day_of_week = fields
remapped_dow = ",".join(
_remap_dow_atom(atom) for atom in day_of_week.split(",")
)
return CronTrigger(
minute=minute,
hour=hour,
day=day,
month=month,
day_of_week=remapped_dow,
)


def _parse_timestamp(ts: str) -> datetime:
"""Parse a UTC timestamp string from the database into an aware datetime."""
if "T" not in ts:
Expand Down Expand Up @@ -119,7 +186,7 @@ async def start(self) -> None:
schedule_str = getattr(source_config, "schedule", "*/15 * * * *")

try:
trigger = CronTrigger.from_crontab(schedule_str)
trigger = _crontab_to_trigger(schedule_str)
except ValueError:
seconds = _parse_interval(schedule_str)
trigger = IntervalTrigger(seconds=seconds)
Expand Down Expand Up @@ -178,7 +245,7 @@ async def _make_trigger(self, job: CronJob) -> CronTrigger | IntervalTrigger:
the cadence survives restarts (persistent timer).
"""
try:
return CronTrigger.from_crontab(job.schedule)
return _crontab_to_trigger(job.schedule)
except ValueError:
pass

Expand Down Expand Up @@ -228,7 +295,7 @@ async def _catchup_missed_jobs(self) -> None:
def _is_overdue(job: CronJob, last_run: datetime, now: datetime) -> bool:
"""Check if a job should have fired between *last_run* and *now*."""
try:
trigger = CronTrigger.from_crontab(job.schedule)
trigger = _crontab_to_trigger(job.schedule)
next_fire = trigger.get_next_fire_time(last_run, last_run)
return next_fire is not None and next_fire < now
except ValueError:
Expand Down
96 changes: 95 additions & 1 deletion tests/test_cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@
import pytest_asyncio

from nerve.cron.jobs import CronJob
from nerve.cron.service import CronService, _parse_interval, _parse_timestamp
from nerve.cron.service import (
CronService,
_crontab_to_trigger,
_parse_interval,
_parse_timestamp,
)


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -118,6 +123,82 @@ def test_default_on_garbage(self):
assert _parse_interval("???") == 7200


# ---------------------------------------------------------------------------
# _crontab_to_trigger: Unix day-of-week semantics
# ---------------------------------------------------------------------------

# A Saturday, so "next fire" lands on a distinct weekday for any DOW value.
_DOW_BASE = datetime(2026, 6, 20, 0, 0, tzinfo=timezone.utc)


def _fire_weekdays(schedule: str) -> set[str]:
"""Collect the weekday abbreviations a crontab fires on within one week."""
trigger = _crontab_to_trigger(schedule)
end = _DOW_BASE + timedelta(days=8)
days: set[str] = set()
prev = None
cur = _DOW_BASE
while True:
fire = trigger.get_next_fire_time(prev, cur)
if fire is None or fire > end:
break
days.add(fire.strftime("%a"))
prev = fire
# Jump to the start of the next day so per-minute schedules don't loop.
cur = fire.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
return days


class TestCrontabToTrigger:
def test_numeric_monday_fires_monday(self):
"""The bug: Unix DOW 1 (Monday) was firing Tuesday via from_crontab."""
fire = _crontab_to_trigger("0 13 * * 1").get_next_fire_time(None, _DOW_BASE)
assert fire.strftime("%A") == "Monday"
assert (fire.hour, fire.minute) == (13, 0)

def test_numeric_sunday_fires_sunday(self):
fire = _crontab_to_trigger("0 13 * * 0").get_next_fire_time(None, _DOW_BASE)
assert fire.strftime("%A") == "Sunday"

def test_seven_also_means_sunday(self):
fire = _crontab_to_trigger("0 13 * * 7").get_next_fire_time(None, _DOW_BASE)
assert fire.strftime("%A") == "Sunday"

def test_range_weekdays_only(self):
assert _fire_weekdays("* * * * 1-5") == {"Mon", "Tue", "Wed", "Thu", "Fri"}

def test_list_monday_and_thursday(self):
assert _fire_weekdays("0 9 * * 1,4") == {"Mon", "Thu"}

def test_star_dow_fires_every_day(self):
assert _fire_weekdays("0 9 * * *") == {
"Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun",
}

def test_day_name_passthrough(self):
"""Already-named DOW values are left intact (and stay correct)."""
fire = _crontab_to_trigger("0 13 * * mon").get_next_fire_time(None, _DOW_BASE)
assert fire.strftime("%A") == "Monday"

@pytest.mark.parametrize(
"schedule",
["0 5 * * *", "*/30 * * * *", "0 */4 * * *", "17 */4 * * *", "13 13 * * *"],
)
def test_non_dow_fields_match_from_crontab(self, schedule):
"""Schedules without a numeric DOW behave exactly like from_crontab."""
from apscheduler.triggers.cron import CronTrigger

ours = _crontab_to_trigger(schedule).get_next_fire_time(None, _DOW_BASE)
ref = CronTrigger.from_crontab(schedule).get_next_fire_time(None, _DOW_BASE)
assert ours == ref

@pytest.mark.parametrize("schedule", ["4h", "30m", "1h30m", "???", ""])
def test_non_crontab_raises_value_error(self, schedule):
"""Interval strings must still raise so the IntervalTrigger path runs."""
with pytest.raises(ValueError):
_crontab_to_trigger(schedule)


# ---------------------------------------------------------------------------
# _is_overdue
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -157,6 +238,19 @@ def test_interval_multiple_missed(self):
last_run = _utc_now() - timedelta(hours=10)
assert CronService._is_overdue(job, last_run, _utc_now()) is True

def test_weekly_overdue_after_exactly_one_week(self):
"""A weekly Monday job is overdue one week later, not 6 or 8 days."""
job = _make_job(schedule="0 13 * * 1") # Mondays 13:00 UTC
last_run = datetime(2026, 6, 15, 13, 0, tzinfo=timezone.utc) # a Monday
# Six days later (Sunday): the next Monday fire has not arrived yet.
assert CronService._is_overdue(
job, last_run, last_run + timedelta(days=6),
) is False
# Just past the next Monday fire, so now overdue.
assert CronService._is_overdue(
job, last_run, last_run + timedelta(days=7, minutes=1),
) is True


# ---------------------------------------------------------------------------
# _make_trigger (interval alignment)
Expand Down