Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 103 additions & 9 deletions nerve/cron/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,73 @@ def _parse_interval(interval: str) -> int:
return total or 7200 # Default 2h


# Unix crontab day-of-week numbering is 0=Sun..6=Sat (7 also means Sun).
# APScheduler's numeric day_of_week is 0=Mon..6=Sun, and CronTrigger.from_crontab
# does NOT remap, so a numeric DOW like "1" (Unix Monday) gets read as APScheduler
# 1 = Tuesday, i.e. every numeric-DOW cron fires one weekday late. APScheduler does
# accept unambiguous three-letter day names, so we translate the numbers to names.
_UNIX_DOW_TO_NAME = {
0: "sun", 1: "mon", 2: "tue", 3: "wed",
4: "thu", 5: "fri", 6: "sat", 7: "sun",
}


def _remap_dow_value(value: str) -> str:
"""Map a single Unix DOW number to an APScheduler day name.

Non-numeric atoms (already a name like ``mon``, or ``*``) and numbers
outside 0-7 pass through unchanged so APScheduler can validate them.
"""
v = value.strip()
if v.isdigit() and int(v) in _UNIX_DOW_TO_NAME:
return _UNIX_DOW_TO_NAME[int(v)]
return v


def _remap_dow_atom(atom: str) -> str:
"""Remap one comma-separated DOW atom, preserving range and step syntax.

Handles ``*``, single values (``1``), ranges (``1-5``), and any of those
with a step suffix (``*/2``, ``1-5/2``). Only the numeric components are
translated; everything else is left intact.
"""
base, sep, step = atom.partition("/")
if base in ("*", ""):
remapped = base
elif "-" in base:
lo, _, hi = base.partition("-")
remapped = f"{_remap_dow_value(lo)}-{_remap_dow_value(hi)}"
else:
remapped = _remap_dow_value(base)
return f"{remapped}{sep}{step}" if sep else remapped


def _crontab_to_trigger(schedule: str) -> CronTrigger:
    """Create a CronTrigger from a 5-field crontab string, Unix DOW semantics.

    Intended as a stand-in for ``CronTrigger.from_crontab``: the first four
    fields (minute, hour, day, month) are handed to ``CronTrigger`` verbatim
    and no timezone is passed. Only the fifth field is rewritten, atom by
    atom, through ``_remap_dow_atom`` so that Unix day-of-week numbers are
    not misread (see ``_UNIX_DOW_TO_NAME``).

    Raises:
        ValueError: when *schedule* does not split into exactly five
            whitespace-separated fields. Callers rely on this so interval
            strings such as ``4h`` fall through to the IntervalTrigger path.
    """
    parts = schedule.split()
    if len(parts) != 5:
        raise ValueError(f"Not a 5-field crontab expression: {schedule!r}")

    # zip() stops after the four named fields; the fifth is handled below.
    keywords = dict(zip(("minute", "hour", "day", "month"), parts))
    dow_atoms = [_remap_dow_atom(piece) for piece in parts[4].split(",")]
    keywords["day_of_week"] = ",".join(dow_atoms)
    return CronTrigger(**keywords)


def _parse_timestamp(ts: str) -> datetime:
"""Parse a UTC timestamp string from the database into an aware datetime."""
if "T" not in ts:
Expand All @@ -79,9 +146,13 @@ def __init__(self, config: NerveConfig, engine: AgentEngine, db: Database):
self._jobs: list[CronJob] = []
self._source_runners: list[SourceRunner] = []
self._job_locks: dict[str, asyncio.Lock] = {}
# Set when the scheduler starts; used as the catch-up baseline for a
# job that has no run history yet (so it is not fired spuriously).
self._server_start_time: datetime | None = None

async def start(self) -> None:
"""Load jobs and start the scheduler."""
self._server_start_time = datetime.now(timezone.utc)
# Load job definitions from both files
self._jobs = self._load_merged_jobs()

Expand Down Expand Up @@ -119,7 +190,7 @@ async def start(self) -> None:
schedule_str = getattr(source_config, "schedule", "*/15 * * * *")

try:
trigger = CronTrigger.from_crontab(schedule_str)
trigger = _crontab_to_trigger(schedule_str)
except ValueError:
seconds = _parse_interval(schedule_str)
trigger = IntervalTrigger(seconds=seconds)
Expand Down Expand Up @@ -178,7 +249,7 @@ async def _make_trigger(self, job: CronJob) -> CronTrigger | IntervalTrigger:
the cadence survives restarts (persistent timer).
"""
try:
return CronTrigger.from_crontab(job.schedule)
return _crontab_to_trigger(job.schedule)
except ValueError:
pass

Expand All @@ -205,12 +276,8 @@ async def _catchup_missed_jobs(self) -> None:
if not job.enabled or not job.catchup:
continue

last_run = await self.db.get_last_successful_cron_run(job.id)
if not last_run or not last_run.get("finished_at"):
continue # first-ever run — no catch-up

last_time = _parse_timestamp(last_run["finished_at"])
if self._is_overdue(job, last_time, now):
baseline = await self._catchup_baseline(job, now)
if self._is_overdue(job, baseline, now):
overdue.append(job)

if not overdue:
Expand All @@ -224,11 +291,38 @@ async def _catchup_missed_jobs(self) -> None:
*(self._run_job_wrapper(job) for job in overdue),
)

async def _catchup_baseline(self, job: CronJob, now: datetime) -> datetime:
    """Pick the reference time against which a missed fire is measured.

    Candidates are tried in this order; the first one available wins:

    1. ``finished_at`` of the last successful run of the job.
    2. ``finished_at`` (or, failing that, ``started_at``) of the most recent
       run of any status, so a job that has only ever errored still gets
       a baseline and can be retried after a missed fire.
    3. The server start time recorded on this instance, or *now* when that
       has not been set.

    Using the server start time for a job with no history keeps a brand-new,
    not-yet-due job from being fired spuriously: its first fire is still in
    the future, so it is not considered overdue.
    """
    success = await self.db.get_last_successful_cron_run(job.id)
    success_stamp = success.get("finished_at") if success else None
    if success_stamp:
        return _parse_timestamp(success_stamp)

    # No usable success: fall back to the latest attempt of any status.
    attempt = await self.db.get_last_cron_run(job.id) or {}
    attempt_stamp = attempt.get("finished_at") or attempt.get("started_at")
    if attempt_stamp:
        return _parse_timestamp(attempt_stamp)

    # No history at all.
    if self._server_start_time:
        return self._server_start_time
    return now

@staticmethod
def _is_overdue(job: CronJob, last_run: datetime, now: datetime) -> bool:
"""Check if a job should have fired between *last_run* and *now*."""
try:
trigger = CronTrigger.from_crontab(job.schedule)
trigger = _crontab_to_trigger(job.schedule)
next_fire = trigger.get_next_fire_time(last_run, last_run)
return next_fire is not None and next_fire < now
except ValueError:
Expand Down
16 changes: 16 additions & 0 deletions nerve/db/cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,22 @@ async def get_last_successful_cron_run(self, job_id: str) -> dict | None:
row = await cursor.fetchone()
return dict(row) if row else None

async def get_last_cron_run(self, job_id: str) -> dict | None:
    """Fetch the newest ``cron_logs`` row for *job_id*, whatever its status.

    Counterpart to ``get_last_successful_cron_run`` without the status
    filter, so a job that has only ever errored still yields a row.
    Rows are ordered by ``started_at`` and then ``id``, both descending, so
    a run that never finished is still ranked (``started_at`` is expected to
    be populated by the column default; the schema is not visible here).

    Returns:
        The row converted to a ``dict``, or ``None`` when no row matches.
    """
    query = (
        "SELECT * FROM cron_logs WHERE job_id = ? "
        "ORDER BY started_at DESC, id DESC LIMIT 1"
    )
    async with self.db.execute(query, (job_id,)) as cursor:
        latest = await cursor.fetchone()
        if not latest:
            return None
        return dict(latest)

async def get_latest_cron_session_id(self, job_id: str) -> str | None:
"""Return the most recently active session id for a cron job.

Expand Down
Loading