Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 36 additions & 15 deletions nerve/cron/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@

import asyncio
import logging
from datetime import datetime, timezone
from datetime import datetime, timezone, tzinfo
from typing import TYPE_CHECKING
from zoneinfo import ZoneInfo

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
Expand Down Expand Up @@ -100,7 +101,9 @@ def _remap_dow_atom(atom: str) -> str:
return f"{remapped}{sep}{step}" if sep else remapped


def _crontab_to_trigger(schedule: str) -> CronTrigger:
def _crontab_to_trigger(
schedule: str, timezone: tzinfo | None = None,
) -> CronTrigger:
"""Build a CronTrigger from a 5-field crontab string with Unix DOW semantics.

Drop-in replacement for ``CronTrigger.from_crontab`` that fixes the
Expand All @@ -123,6 +126,7 @@ def _crontab_to_trigger(schedule: str) -> CronTrigger:
day=day,
month=month,
day_of_week=remapped_dow,
timezone=timezone,
)


Expand All @@ -142,7 +146,8 @@ def __init__(self, config: NerveConfig, engine: AgentEngine, db: Database):
self.config = config
self.engine = engine
self.db = db
self.scheduler = AsyncIOScheduler()
self.timezone = ZoneInfo(config.timezone)
self.scheduler = AsyncIOScheduler(timezone=self.timezone)
self._jobs: list[CronJob] = []
self._source_runners: list[SourceRunner] = []
self._job_locks: dict[str, asyncio.Lock] = {}
Expand Down Expand Up @@ -193,10 +198,14 @@ async def start(self) -> None:
schedule_str = getattr(source_config, "schedule", "*/15 * * * *")

try:
trigger = _crontab_to_trigger(schedule_str)
trigger = _crontab_to_trigger(
schedule_str, timezone=self.timezone,
)
except ValueError:
seconds = _parse_interval(schedule_str)
trigger = IntervalTrigger(seconds=seconds)
trigger = IntervalTrigger(
seconds=seconds, timezone=self.timezone,
)

self.scheduler.add_job(
self._run_source_wrapper,
Expand All @@ -213,7 +222,7 @@ async def start(self) -> None:
# Daily cleanup of expired messages and consumer cursors
self.scheduler.add_job(
self._cleanup_expired,
CronTrigger(hour=3, minute=0),
CronTrigger(hour=3, minute=0, timezone=self.timezone),
id="cleanup",
name="Cleanup expired data",
replace_existing=True,
Expand All @@ -223,7 +232,9 @@ async def start(self) -> None:
# scheduler is disabled; Nerve owns wakeup timing here.
self.scheduler.add_job(
self._sweep_wakeups,
IntervalTrigger(seconds=_WAKEUP_SWEEP_SECONDS),
IntervalTrigger(
seconds=_WAKEUP_SWEEP_SECONDS, timezone=self.timezone,
),
id="wakeup_sweep",
name="Fire due session wakeups",
replace_existing=True,
Expand Down Expand Up @@ -252,7 +263,7 @@ async def _make_trigger(self, job: CronJob) -> CronTrigger | IntervalTrigger:
the cadence survives restarts (persistent timer).
"""
try:
return _crontab_to_trigger(job.schedule)
return _crontab_to_trigger(job.schedule, timezone=self.timezone)
except ValueError:
pass

Expand All @@ -263,8 +274,12 @@ async def _make_trigger(self, job: CronJob) -> CronTrigger | IntervalTrigger:
logger.debug(
"Aligning interval for %s: start_date=%s", job.id, start_date,
)
return IntervalTrigger(seconds=seconds, start_date=start_date)
return IntervalTrigger(seconds=seconds)
return IntervalTrigger(
seconds=seconds,
start_date=start_date,
timezone=self.timezone,
)
return IntervalTrigger(seconds=seconds, timezone=self.timezone)

async def _catchup_missed_jobs(self) -> None:
"""Fire jobs that should have run while the server was down.
Expand All @@ -284,7 +299,7 @@ async def _catchup_missed_jobs(self) -> None:
continue # first-ever run — no catch-up

last_time = _parse_timestamp(last_run["finished_at"])
if self._is_overdue(job, last_time, now):
if self._is_overdue(job, last_time, now, self.timezone):
overdue.append(job)

if not overdue:
Expand All @@ -299,10 +314,17 @@ async def _catchup_missed_jobs(self) -> None:
)

@staticmethod
def _is_overdue(job: CronJob, last_run: datetime, now: datetime) -> bool:
def _is_overdue(
job: CronJob,
last_run: datetime,
now: datetime,
trigger_timezone: tzinfo | None = None,
) -> bool:
"""Check if a job should have fired between *last_run* and *now*."""
try:
trigger = _crontab_to_trigger(job.schedule)
trigger = _crontab_to_trigger(
job.schedule, timezone=trigger_timezone or timezone.utc,
)
next_fire = trigger.get_next_fire_time(last_run, last_run)
return next_fire is not None and next_fire < now
except ValueError:
Expand Down Expand Up @@ -363,8 +385,7 @@ async def _maybe_rotate_context(
logger.warning("Invalid context_rotate_at: %s", rotate_at)
return False

local_tz = datetime.now().astimezone().tzinfo
today_rotate = datetime.now(local_tz).replace(
today_rotate = now.astimezone(self.timezone).replace(
hour=hour, minute=minute, second=0, microsecond=0,
)
today_rotate_utc = today_rotate.astimezone(timezone.utc)
Expand Down
87 changes: 83 additions & 4 deletions tests/test_cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import asyncio
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, MagicMock, patch
from zoneinfo import ZoneInfo

import pytest
import pytest_asyncio
Expand Down Expand Up @@ -52,10 +53,10 @@ def _make_cron_log(finished_at: str) -> dict:
return {"job_id": "test-job", "finished_at": finished_at, "status": "success"}


@pytest_asyncio.fixture
async def cron_service():
def _make_cron_service(timezone_name: str = "UTC") -> CronService:
"""Minimal CronService with mocked dependencies."""
config = MagicMock()
config.timezone = timezone_name
config.cron.system_file = MagicMock()
config.cron.jobs_file = MagicMock()
config.agent.cron_model = "test-model"
Expand All @@ -70,8 +71,13 @@ async def cron_service():
db.log_cron_finish = AsyncMock()
db.get_last_successful_cron_run = AsyncMock(return_value=None)

svc = CronService(config, engine, db)
return svc
return CronService(config, engine, db)


@pytest_asyncio.fixture
async def cron_service():
"""Minimal CronService with mocked dependencies."""
return _make_cron_service()


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -123,6 +129,18 @@ def test_default_on_garbage(self):
assert _parse_interval("???") == 7200


# ---------------------------------------------------------------------------
# Configured timezone
# ---------------------------------------------------------------------------

class TestConfiguredTimezone:
def test_scheduler_uses_configured_timezone(self):
svc = _make_cron_service("America/New_York")

assert str(svc.timezone) == "America/New_York"
assert str(svc.scheduler.timezone) == "America/New_York"


# ---------------------------------------------------------------------------
# _crontab_to_trigger: Unix day-of-week semantics
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -238,6 +256,20 @@ def test_interval_multiple_missed(self):
last_run = _utc_now() - timedelta(hours=10)
assert CronService._is_overdue(job, last_run, _utc_now()) is True

def test_crontab_uses_configured_timezone(self):
"""Catch-up checks crontab fires in the configured timezone."""
job = _make_job(schedule="0 9 * * *")
last_run = datetime(2026, 1, 1, 13, 30, tzinfo=timezone.utc)
now = datetime(2026, 1, 1, 14, 30, tzinfo=timezone.utc)

assert (
CronService._is_overdue(
job, last_run, now, ZoneInfo("America/New_York"),
)
is True
)
assert CronService._is_overdue(job, last_run, now, timezone.utc) is False

def test_weekly_overdue_after_exactly_one_week(self):
"""A weekly Monday job is overdue one week later, not 6 or 8 days."""
job = _make_job(schedule="0 13 * * 1") # Mondays 13:00 UTC
Expand Down Expand Up @@ -302,6 +334,26 @@ async def test_crontab_unchanged(self, cron_service):
from apscheduler.triggers.cron import CronTrigger
assert isinstance(trigger, CronTrigger)

@pytest.mark.asyncio
async def test_crontab_uses_configured_timezone(self):
svc = _make_cron_service("America/Los_Angeles")

trigger = await svc._make_trigger(_make_job(schedule="30 11 * * *"))

from apscheduler.triggers.cron import CronTrigger
assert isinstance(trigger, CronTrigger)
assert str(trigger.timezone) == "America/Los_Angeles"

@pytest.mark.asyncio
async def test_interval_uses_configured_timezone(self):
svc = _make_cron_service("America/Los_Angeles")

trigger = await svc._make_trigger(_make_job(schedule="4h"))

from apscheduler.triggers.interval import IntervalTrigger
assert isinstance(trigger, IntervalTrigger)
assert str(trigger.timezone) == "America/Los_Angeles"


# ---------------------------------------------------------------------------
# _catchup_missed_jobs
Expand Down Expand Up @@ -573,6 +625,33 @@ async def test_no_rotation_no_memorize(self, cron_service):
assert rotated is False
cron_service.engine.schedule_memorize.assert_not_awaited()

@pytest.mark.asyncio
async def test_rotate_at_uses_configured_timezone(self):
"""Daily rotate_at uses config timezone, not the server timezone."""
svc = _make_cron_service("America/New_York")
svc.db.get_session = AsyncMock(return_value={
"connected_at": "2026-01-01T13:59:00+00:00",
})

class FixedDateTime(datetime):
@classmethod
def now(cls, tz=None):
fixed = datetime(2026, 1, 1, 15, 0, tzinfo=timezone.utc)
if tz is None:
return fixed.replace(tzinfo=None)
return fixed.astimezone(tz)

with patch("nerve.cron.service.datetime", FixedDateTime):
rotated = await svc._maybe_rotate_context(
"cron:pers", rotate_hours=0, rotate_at="09:00",
)

assert rotated is True
svc.engine.schedule_memorize.assert_awaited_once_with("cron:pers")
svc.engine.sessions.mark_idle.assert_awaited_once_with(
"cron:pers", preserve_sdk_id=False,
)

@pytest.mark.asyncio
async def test_manual_rotation_forces_disabled_rotation_window(self, cron_service):
"""Manual rotation clears context even when scheduled rotation is disabled."""
Expand Down
1 change: 1 addition & 0 deletions tests/test_wakeups.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ class TestWakeupSweep:
async def svc(self, db):
await db.create_session("s1", source="web")
config = MagicMock()
config.timezone = "UTC"
engine = AsyncMock()
# is_running is synchronous — default to "not running".
engine.sessions = MagicMock()
Expand Down