From ac944fb8c8478972bc30e3bd48a89a848d88f365 Mon Sep 17 00:00:00 2001 From: Alex Fedotyev <61838744+alex-fedotyev@users.noreply.github.com> Date: Mon, 22 Jun 2026 16:53:57 +0000 Subject: [PATCH 1/2] cron: fix day-of-week off-by-one in crontab schedules APScheduler's numeric day_of_week is 0=Mon..6=Sun, while Unix crontab is 0=Sun..6=Sat (7 also means Sun). CronTrigger.from_crontab passes the number straight through without remapping, so every numeric day-of-week schedule fired one weekday late: "0 13 * * 1" (Monday) actually ran on Tuesday, and "0 3 * * 0" (Sunday) ran on Monday. Add _crontab_to_trigger, a from_crontab replacement that translates the day-of-week field to APScheduler's day-name aliases (preserving *, ranges like 1-5, lists like 1,4, and step suffixes) and leaves the other four fields and the timezone handling unchanged. Route the three trigger-construction sites (job scheduling, source scheduling, and the overdue check) through it. Tests cover the day-of-week mapping, parity with from_crontab for non-DOW schedules, the interval-string fallback, and a weekly _is_overdue case that is due after exactly one week. After a restart, every numeric day-of-week cron shifts to the day its schedule already specifies; that is the intended correction. --- nerve/cron/service.py | 73 ++++++++++++++++++++++++++++++-- tests/test_cron.py | 96 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 165 insertions(+), 4 deletions(-) diff --git a/nerve/cron/service.py b/nerve/cron/service.py index f8dfd02..a012a2d 100644 --- a/nerve/cron/service.py +++ b/nerve/cron/service.py @@ -59,6 +59,73 @@ def _parse_interval(interval: str) -> int: return total or 7200 # Default 2h +# Unix crontab day-of-week numbering is 0=Sun..6=Sat (7 also means Sun). +# APScheduler's numeric day_of_week is 0=Mon..6=Sun, and CronTrigger.from_crontab +# does NOT remap, so a numeric DOW like "1" (Unix Monday) gets read as APScheduler +# 1 = Tuesday, i.e. every numeric-DOW cron fires one weekday late. APScheduler does +# accept unambiguous three-letter day names, so we translate the numbers to names. +_UNIX_DOW_TO_NAME = { + 0: "sun", 1: "mon", 2: "tue", 3: "wed", + 4: "thu", 5: "fri", 6: "sat", 7: "sun", +} + + +def _remap_dow_value(value: str) -> str: + """Map a single Unix DOW number to an APScheduler day name. + + Non-numeric atoms (already a name like ``mon``, or ``*``) and numbers + outside 0-7 pass through unchanged so APScheduler can validate them. + """ + v = value.strip() + if v.isdigit() and int(v) in _UNIX_DOW_TO_NAME: + return _UNIX_DOW_TO_NAME[int(v)] + return v + + +def _remap_dow_atom(atom: str) -> str: + """Remap one comma-separated DOW atom, preserving range and step syntax. + + Handles ``*``, single values (``1``), ranges (``1-5``), and any of those + with a step suffix (``*/2``, ``1-5/2``). Only the numeric components are + translated; everything else is left intact. + """ + base, sep, step = atom.partition("/") + if base in ("*", ""): + remapped = base + elif "-" in base: + lo, _, hi = base.partition("-") + remapped = f"{_remap_dow_value(lo)}-{_remap_dow_value(hi)}" + else: + remapped = _remap_dow_value(base) + return f"{remapped}{sep}{step}" if sep else remapped + + +def _crontab_to_trigger(schedule: str) -> CronTrigger: + """Build a CronTrigger from a 5-field crontab string with Unix DOW semantics. + + Drop-in replacement for ``CronTrigger.from_crontab`` that fixes the + day-of-week off-by-one (see ``_UNIX_DOW_TO_NAME``). Only the DOW field is + treated differently; the other four fields and the no-explicit-timezone + behaviour are identical to ``from_crontab``. Raises ``ValueError`` for + anything that is not a 5-field expression, so interval strings like ``4h`` + keep falling through to the IntervalTrigger path. + """ + fields = schedule.split() + if len(fields) != 5: + raise ValueError(f"Not a 5-field crontab expression: {schedule!r}") + minute, hour, day, month, day_of_week = fields + remapped_dow = ",".join( + _remap_dow_atom(atom) for atom in day_of_week.split(",") + ) + return CronTrigger( + minute=minute, + hour=hour, + day=day, + month=month, + day_of_week=remapped_dow, + ) + + def _parse_timestamp(ts: str) -> datetime: """Parse a UTC timestamp string from the database into an aware datetime.""" if "T" not in ts: @@ -119,7 +186,7 @@ async def start(self) -> None: schedule_str = getattr(source_config, "schedule", "*/15 * * * *") try: - trigger = CronTrigger.from_crontab(schedule_str) + trigger = _crontab_to_trigger(schedule_str) except ValueError: seconds = _parse_interval(schedule_str) trigger = IntervalTrigger(seconds=seconds) @@ -178,7 +245,7 @@ async def _make_trigger(self, job: CronJob) -> CronTrigger | IntervalTrigger: the cadence survives restarts (persistent timer). """ try: - return CronTrigger.from_crontab(job.schedule) + return _crontab_to_trigger(job.schedule) except ValueError: pass @@ -228,7 +295,7 @@ async def _catchup_missed_jobs(self) -> None: def _is_overdue(job: CronJob, last_run: datetime, now: datetime) -> bool: """Check if a job should have fired between *last_run* and *now*.""" try: - trigger = CronTrigger.from_crontab(job.schedule) + trigger = _crontab_to_trigger(job.schedule) next_fire = trigger.get_next_fire_time(last_run, last_run) return next_fire is not None and next_fire < now except ValueError: diff --git a/tests/test_cron.py b/tests/test_cron.py index 0e3eaa3..655f2c5 100644 --- a/tests/test_cron.py +++ b/tests/test_cron.py @@ -10,7 +10,12 @@ import pytest_asyncio from nerve.cron.jobs import CronJob -from nerve.cron.service import CronService, _parse_interval, _parse_timestamp +from nerve.cron.service import ( + CronService, + _crontab_to_trigger, + _parse_interval, + _parse_timestamp, +) # --------------------------------------------------------------------------- @@ -118,6 +123,82 @@ def test_default_on_garbage(self): assert _parse_interval("???") == 7200 +# --------------------------------------------------------------------------- +# _crontab_to_trigger: Unix day-of-week semantics +# --------------------------------------------------------------------------- + +# A Saturday, so "next fire" lands on a distinct weekday for any DOW value. +_DOW_BASE = datetime(2026, 6, 20, 0, 0, tzinfo=timezone.utc) + + +def _fire_weekdays(schedule: str) -> set[str]: + """Collect the weekday abbreviations a crontab fires on within one week.""" + trigger = _crontab_to_trigger(schedule) + end = _DOW_BASE + timedelta(days=8) + days: set[str] = set() + prev = None + cur = _DOW_BASE + while True: + fire = trigger.get_next_fire_time(prev, cur) + if fire is None or fire > end: + break + days.add(fire.strftime("%a")) + prev = fire + # Jump to the start of the next day so per-minute schedules don't loop. + cur = fire.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1) + return days + + +class TestCrontabToTrigger: + def test_numeric_monday_fires_monday(self): + """The bug: Unix DOW 1 (Monday) was firing Tuesday via from_crontab.""" + fire = _crontab_to_trigger("0 13 * * 1").get_next_fire_time(None, _DOW_BASE) + assert fire.strftime("%A") == "Monday" + assert (fire.hour, fire.minute) == (13, 0) + + def test_numeric_sunday_fires_sunday(self): + fire = _crontab_to_trigger("0 13 * * 0").get_next_fire_time(None, _DOW_BASE) + assert fire.strftime("%A") == "Sunday" + + def test_seven_also_means_sunday(self): + fire = _crontab_to_trigger("0 13 * * 7").get_next_fire_time(None, _DOW_BASE) + assert fire.strftime("%A") == "Sunday" + + def test_range_weekdays_only(self): + assert _fire_weekdays("* * * * 1-5") == {"Mon", "Tue", "Wed", "Thu", "Fri"} + + def test_list_monday_and_thursday(self): + assert _fire_weekdays("0 9 * * 1,4") == {"Mon", "Thu"} + + def test_star_dow_fires_every_day(self): + assert _fire_weekdays("0 9 * * *") == { + "Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun", + } + + def test_day_name_passthrough(self): + """Already-named DOW values are left intact (and stay correct).""" + fire = _crontab_to_trigger("0 13 * * mon").get_next_fire_time(None, _DOW_BASE) + assert fire.strftime("%A") == "Monday" + + @pytest.mark.parametrize( + "schedule", + ["0 5 * * *", "*/30 * * * *", "0 */4 * * *", "17 */4 * * *", "13 13 * * *"], + ) + def test_non_dow_fields_match_from_crontab(self, schedule): + """Schedules without a numeric DOW behave exactly like from_crontab.""" + from apscheduler.triggers.cron import CronTrigger + + ours = _crontab_to_trigger(schedule).get_next_fire_time(None, _DOW_BASE) + ref = CronTrigger.from_crontab(schedule).get_next_fire_time(None, _DOW_BASE) + assert ours == ref + + @pytest.mark.parametrize("schedule", ["4h", "30m", "1h30m", "???", ""]) + def test_non_crontab_raises_value_error(self, schedule): + """Interval strings must still raise so the IntervalTrigger path runs.""" + with pytest.raises(ValueError): + _crontab_to_trigger(schedule) + + # --------------------------------------------------------------------------- # _is_overdue # --------------------------------------------------------------------------- @@ -157,6 +238,19 @@ def test_interval_multiple_missed(self): last_run = _utc_now() - timedelta(hours=10) assert CronService._is_overdue(job, last_run, _utc_now()) is True + def test_weekly_overdue_after_exactly_one_week(self): + """A weekly Monday job is overdue one week later, not 6 or 8 days.""" + job = _make_job(schedule="0 13 * * 1") # Mondays 13:00 UTC + last_run = datetime(2026, 6, 15, 13, 0, tzinfo=timezone.utc) # a Monday + # Six days later (Sunday): the next Monday fire has not arrived yet. + assert CronService._is_overdue( + job, last_run, last_run + timedelta(days=6), + ) is False + # Just past the next Monday fire, so now overdue. + assert CronService._is_overdue( + job, last_run, last_run + timedelta(days=7, minutes=1), + ) is True + # --------------------------------------------------------------------------- # _make_trigger (interval alignment) From 1153f5a6b2251d187513a57aafe70d72b2e96c92 Mon Sep 17 00:00:00 2001 From: Alex Fedotyev <61838744+alex-fedotyev@users.noreply.github.com> Date: Mon, 22 Jun 2026 17:32:55 +0000 Subject: [PATCH 2/2] cron: catch up jobs that have never recorded a successful run _catchup_missed_jobs skipped any job whose last successful run was missing, so a weekly job that missed its first fire during a daemon restart was starved: with no success row to measure against, it was never backfilled and waited a full week for the next scheduled fire. Replace the success-only check with _catchup_baseline, which measures a missed fire against the last successful run, then the most recent attempt of any status (so a job that has only ever errored is still retried), then the server start time. Anchoring a job with no history to server start keeps a brand-new, not-yet-due job from firing spuriously, since its first fire is still in the future. Add get_last_cron_run (most recent cron_logs row regardless of status) and record the server start time on startup. Tests cover the baseline selection (success, error-only, no-history) and end-to-end catch-up for a weekly job that only ever errored. --- nerve/cron/service.py | 39 +++++++++++++--- nerve/db/cron.py | 16 +++++++ tests/test_cron.py | 102 +++++++++++++++++++++++++++++++++++++++++- 3 files changed, 149 insertions(+), 8 deletions(-) diff --git a/nerve/cron/service.py b/nerve/cron/service.py index a012a2d..bd69628 100644 --- a/nerve/cron/service.py +++ b/nerve/cron/service.py @@ -146,9 +146,13 @@ def __init__(self, config: NerveConfig, engine: AgentEngine, db: Database): self._jobs: list[CronJob] = [] self._source_runners: list[SourceRunner] = [] self._job_locks: dict[str, asyncio.Lock] = {} + # Set when the scheduler starts; used as the catch-up baseline for a + # job that has no run history yet (so it is not fired spuriously). + self._server_start_time: datetime | None = None async def start(self) -> None: """Load jobs and start the scheduler.""" + self._server_start_time = datetime.now(timezone.utc) # Load job definitions from both files self._jobs = self._load_merged_jobs() @@ -272,12 +276,8 @@ async def _catchup_missed_jobs(self) -> None: if not job.enabled or not job.catchup: continue - last_run = await self.db.get_last_successful_cron_run(job.id) - if not last_run or not last_run.get("finished_at"): - continue # first-ever run — no catch-up - - last_time = _parse_timestamp(last_run["finished_at"]) - if self._is_overdue(job, last_time, now): + baseline = await self._catchup_baseline(job, now) + if self._is_overdue(job, baseline, now): overdue.append(job) if not overdue: @@ -291,6 +291,33 @@ async def _catchup_missed_jobs(self) -> None: *(self._run_job_wrapper(job) for job in overdue), ) + async def _catchup_baseline(self, job: CronJob, now: datetime) -> datetime: + """Reference time a missed fire is measured against for catch-up. + + Prefers the last successful run. If the job has never succeeded, it + falls back to its most recent attempt of any status (so a job that + has only ever errored is still retried after a missed fire), and + finally to the server start time. + + Anchoring a job with no history to the server start time means a + brand-new, not-yet-due job is never fired spuriously: its first fire + is still in the future, so it is not overdue. This is what restores + catch-up for a weekly job that missed its first fire during one of + the daemon restarts (previously such a job was skipped forever + because it had no successful run to measure against). + """ + last_success = await self.db.get_last_successful_cron_run(job.id) + if last_success and last_success.get("finished_at"): + return _parse_timestamp(last_success["finished_at"]) + + last_attempt = await self.db.get_last_cron_run(job.id) + if last_attempt: + stamp = last_attempt.get("finished_at") or last_attempt.get("started_at") + if stamp: + return _parse_timestamp(stamp) + + return self._server_start_time or now + @staticmethod def _is_overdue(job: CronJob, last_run: datetime, now: datetime) -> bool: """Check if a job should have fired between *last_run* and *now*.""" diff --git a/nerve/db/cron.py b/nerve/db/cron.py index 6a9d204..fcdd8c0 100644 --- a/nerve/db/cron.py +++ b/nerve/db/cron.py @@ -84,6 +84,22 @@ async def get_last_successful_cron_run(self, job_id: str) -> dict | None: row = await cursor.fetchone() return dict(row) if row else None + async def get_last_cron_run(self, job_id: str) -> dict | None: + """Get the most recent cron_logs entry for a job, regardless of status. + + Unlike get_last_successful_cron_run, this ignores status, so a job + that has only ever errored still has a reference point for catch-up. + Ordered by started_at (always set via the column default) so a row + whose run never finished still sorts correctly. + """ + async with self.db.execute( + "SELECT * FROM cron_logs WHERE job_id = ? " + "ORDER BY started_at DESC, id DESC LIMIT 1", + (job_id,), + ) as cursor: + row = await cursor.fetchone() + return dict(row) if row else None + async def get_latest_cron_session_id(self, job_id: str) -> str | None: """Return the most recently active session id for a cron job. diff --git a/tests/test_cron.py b/tests/test_cron.py index 655f2c5..8a654d2 100644 --- a/tests/test_cron.py +++ b/tests/test_cron.py @@ -48,8 +48,13 @@ def _hours_ago(h: float) -> str: return (_utc_now() - timedelta(hours=h)).isoformat() -def _make_cron_log(finished_at: str) -> dict: - return {"job_id": "test-job", "finished_at": finished_at, "status": "success"} +def _make_cron_log(finished_at: str, status: str = "success") -> dict: + return { + "job_id": "test-job", + "started_at": finished_at, + "finished_at": finished_at, + "status": status, + } @pytest_asyncio.fixture @@ -69,6 +74,7 @@ async def cron_service(): db.log_cron_start = AsyncMock(return_value=1) db.log_cron_finish = AsyncMock() db.get_last_successful_cron_run = AsyncMock(return_value=None) + db.get_last_cron_run = AsyncMock(return_value=None) svc = CronService(config, engine, db) return svc @@ -423,6 +429,98 @@ async def test_crontab_overdue_catches_up(self, cron_service): cron_service.db.log_cron_start.assert_called_once() + @pytest.mark.asyncio + async def test_never_succeeded_with_error_history_catches_up(self, cron_service): + """A weekly job that only ever errored still catches up after a miss. + + Previously a job with no successful run was skipped forever, so a + weekly job that missed its first fire across a restart was starved. + """ + job = _make_job(id="weekly-erroring", schedule="0 13 * * 1") + cron_service._jobs = [job] + cron_service.db.get_last_successful_cron_run.return_value = None + # An 8-day-old attempt of any status guarantees a missed weekly fire + # regardless of which weekday the test runs on. + cron_service.db.get_last_cron_run.return_value = ( + _make_cron_log(_hours_ago(8 * 24), status="error") + ) + + await cron_service._catchup_missed_jobs() + + cron_service.db.log_cron_start.assert_called_once_with("weekly-erroring") + cron_service.engine.run_cron.assert_called_once() + + @pytest.mark.asyncio + async def test_no_history_uses_server_start_and_does_not_fire(self, cron_service): + """A job with no history at all is anchored to server start, not fired.""" + job = _make_job(id="brand-new", schedule="0 13 * * 1") + cron_service._jobs = [job] + cron_service.db.get_last_successful_cron_run.return_value = None + cron_service.db.get_last_cron_run.return_value = None + cron_service._server_start_time = _utc_now() + + await cron_service._catchup_missed_jobs() + + cron_service.db.log_cron_start.assert_not_called() + cron_service.engine.run_cron.assert_not_called() + + +# --------------------------------------------------------------------------- +# _catchup_baseline: which reference time catch-up measures against +# --------------------------------------------------------------------------- + +class TestCatchupBaseline: + @pytest.mark.asyncio + async def test_prefers_last_successful_run(self, cron_service): + job = _make_job(schedule="0 13 * * 1") + ts = _hours_ago(50) + cron_service.db.get_last_successful_cron_run.return_value = _make_cron_log(ts) + cron_service.db.get_last_cron_run.return_value = _make_cron_log(_hours_ago(1)) + + now = _utc_now() + baseline = await cron_service._catchup_baseline(job, now) + + assert baseline == _parse_timestamp(ts) + # The success path must short-circuit before the any-status lookup. + cron_service.db.get_last_cron_run.assert_not_called() + + @pytest.mark.asyncio + async def test_falls_back_to_last_attempt_when_never_succeeded(self, cron_service): + job = _make_job(schedule="0 13 * * 1") + ts = _hours_ago(30) + cron_service.db.get_last_successful_cron_run.return_value = None + cron_service.db.get_last_cron_run.return_value = ( + _make_cron_log(ts, status="error") + ) + + baseline = await cron_service._catchup_baseline(job, _utc_now()) + + assert baseline == _parse_timestamp(ts) + + @pytest.mark.asyncio + async def test_falls_back_to_server_start_with_no_history(self, cron_service): + job = _make_job(schedule="0 13 * * 1") + cron_service.db.get_last_successful_cron_run.return_value = None + cron_service.db.get_last_cron_run.return_value = None + start = _utc_now() - timedelta(minutes=2) + cron_service._server_start_time = start + + baseline = await cron_service._catchup_baseline(job, _utc_now()) + + assert baseline == start + + @pytest.mark.asyncio + async def test_defaults_to_now_when_server_start_unset(self, cron_service): + job = _make_job(schedule="0 13 * * 1") + cron_service.db.get_last_successful_cron_run.return_value = None + cron_service.db.get_last_cron_run.return_value = None + cron_service._server_start_time = None + + now = _utc_now() + baseline = await cron_service._catchup_baseline(job, now) + + assert baseline == now + # --------------------------------------------------------------------------- # CronJob.catchup field