diff --git a/nerve/cron/service.py b/nerve/cron/service.py index 259e2bb..a1b849a 100644 --- a/nerve/cron/service.py +++ b/nerve/cron/service.py @@ -7,8 +7,9 @@ import asyncio import logging -from datetime import datetime, timezone +from datetime import datetime, timezone, tzinfo from typing import TYPE_CHECKING +from zoneinfo import ZoneInfo from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger @@ -100,7 +101,9 @@ def _remap_dow_atom(atom: str) -> str: return f"{remapped}{sep}{step}" if sep else remapped -def _crontab_to_trigger(schedule: str) -> CronTrigger: +def _crontab_to_trigger( + schedule: str, timezone: tzinfo | None = None, +) -> CronTrigger: """Build a CronTrigger from a 5-field crontab string with Unix DOW semantics. Drop-in replacement for ``CronTrigger.from_crontab`` that fixes the @@ -123,6 +126,7 @@ def _crontab_to_trigger(schedule: str) -> CronTrigger: day=day, month=month, day_of_week=remapped_dow, + timezone=timezone, ) @@ -142,7 +146,8 @@ def __init__(self, config: NerveConfig, engine: AgentEngine, db: Database): self.config = config self.engine = engine self.db = db - self.scheduler = AsyncIOScheduler() + self.timezone = ZoneInfo(config.timezone) + self.scheduler = AsyncIOScheduler(timezone=self.timezone) self._jobs: list[CronJob] = [] self._source_runners: list[SourceRunner] = [] self._job_locks: dict[str, asyncio.Lock] = {} @@ -193,10 +198,14 @@ async def start(self) -> None: schedule_str = getattr(source_config, "schedule", "*/15 * * * *") try: - trigger = _crontab_to_trigger(schedule_str) + trigger = _crontab_to_trigger( + schedule_str, timezone=self.timezone, + ) except ValueError: seconds = _parse_interval(schedule_str) - trigger = IntervalTrigger(seconds=seconds) + trigger = IntervalTrigger( + seconds=seconds, timezone=self.timezone, + ) self.scheduler.add_job( self._run_source_wrapper, @@ -213,7 +222,7 @@ async def start(self) -> None: # Daily cleanup of expired messages and consumer cursors self.scheduler.add_job( self._cleanup_expired, - CronTrigger(hour=3, minute=0), + CronTrigger(hour=3, minute=0, timezone=self.timezone), id="cleanup", name="Cleanup expired data", replace_existing=True, @@ -223,7 +232,9 @@ async def start(self) -> None: # scheduler is disabled; Nerve owns wakeup timing here. self.scheduler.add_job( self._sweep_wakeups, - IntervalTrigger(seconds=_WAKEUP_SWEEP_SECONDS), + IntervalTrigger( + seconds=_WAKEUP_SWEEP_SECONDS, timezone=self.timezone, + ), id="wakeup_sweep", name="Fire due session wakeups", replace_existing=True, @@ -252,7 +263,7 @@ async def _make_trigger(self, job: CronJob) -> CronTrigger | IntervalTrigger: the cadence survives restarts (persistent timer). """ try: - return _crontab_to_trigger(job.schedule) + return _crontab_to_trigger(job.schedule, timezone=self.timezone) except ValueError: pass @@ -263,8 +274,12 @@ async def _make_trigger(self, job: CronJob) -> CronTrigger | IntervalTrigger: logger.debug( "Aligning interval for %s: start_date=%s", job.id, start_date, ) - return IntervalTrigger(seconds=seconds, start_date=start_date) - return IntervalTrigger(seconds=seconds) + return IntervalTrigger( + seconds=seconds, + start_date=start_date, + timezone=self.timezone, + ) + return IntervalTrigger(seconds=seconds, timezone=self.timezone) async def _catchup_missed_jobs(self) -> None: """Fire jobs that should have run while the server was down. @@ -284,7 +299,7 @@ async def _catchup_missed_jobs(self) -> None: continue # first-ever run — no catch-up last_time = _parse_timestamp(last_run["finished_at"]) - if self._is_overdue(job, last_time, now): + if self._is_overdue(job, last_time, now, self.timezone): overdue.append(job) if not overdue: @@ -299,10 +314,17 @@ async def _catchup_missed_jobs(self) -> None: ) @staticmethod - def _is_overdue(job: CronJob, last_run: datetime, now: datetime) -> bool: + def _is_overdue( + job: CronJob, + last_run: datetime, + now: datetime, + trigger_timezone: tzinfo | None = None, + ) -> bool: """Check if a job should have fired between *last_run* and *now*.""" try: - trigger = _crontab_to_trigger(job.schedule) + trigger = _crontab_to_trigger( + job.schedule, timezone=trigger_timezone or timezone.utc, + ) next_fire = trigger.get_next_fire_time(last_run, last_run) return next_fire is not None and next_fire < now except ValueError: @@ -363,8 +385,7 @@ async def _maybe_rotate_context( logger.warning("Invalid context_rotate_at: %s", rotate_at) return False - local_tz = datetime.now().astimezone().tzinfo - today_rotate = datetime.now(local_tz).replace( + today_rotate = now.astimezone(self.timezone).replace( hour=hour, minute=minute, second=0, microsecond=0, ) today_rotate_utc = today_rotate.astimezone(timezone.utc) diff --git a/tests/test_cron.py b/tests/test_cron.py index dd18169..5b1a5c1 100644 --- a/tests/test_cron.py +++ b/tests/test_cron.py @@ -5,6 +5,7 @@ import asyncio from datetime import datetime, timedelta, timezone from unittest.mock import AsyncMock, MagicMock, patch +from zoneinfo import ZoneInfo import pytest import pytest_asyncio @@ -52,10 +53,10 @@ def _make_cron_log(finished_at: str) -> dict: return {"job_id": "test-job", "finished_at": finished_at, "status": "success"} -@pytest_asyncio.fixture -async def cron_service(): +def _make_cron_service(timezone_name: str = "UTC") -> CronService: """Minimal CronService with mocked dependencies.""" config = MagicMock() + config.timezone = timezone_name config.cron.system_file = MagicMock() config.cron.jobs_file = MagicMock() config.agent.cron_model = "test-model" @@ -70,8 +71,13 @@ async def cron_service(): db.log_cron_finish = AsyncMock() db.get_last_successful_cron_run = AsyncMock(return_value=None) - svc = CronService(config, engine, db) - return svc + return CronService(config, engine, db) + + +@pytest_asyncio.fixture +async def cron_service(): + """Minimal CronService with mocked dependencies.""" + return _make_cron_service() # --------------------------------------------------------------------------- @@ -123,6 +129,18 @@ def test_default_on_garbage(self): assert _parse_interval("???") == 7200 +# --------------------------------------------------------------------------- +# Configured timezone +# --------------------------------------------------------------------------- + +class TestConfiguredTimezone: + def test_scheduler_uses_configured_timezone(self): + svc = _make_cron_service("America/New_York") + + assert str(svc.timezone) == "America/New_York" + assert str(svc.scheduler.timezone) == "America/New_York" + + # --------------------------------------------------------------------------- # _crontab_to_trigger: Unix day-of-week semantics # --------------------------------------------------------------------------- @@ -238,6 +256,20 @@ def test_interval_multiple_missed(self): last_run = _utc_now() - timedelta(hours=10) assert CronService._is_overdue(job, last_run, _utc_now()) is True + def test_crontab_uses_configured_timezone(self): + """Catch-up checks crontab fires in the configured timezone.""" + job = _make_job(schedule="0 9 * * *") + last_run = datetime(2026, 1, 1, 13, 30, tzinfo=timezone.utc) + now = datetime(2026, 1, 1, 14, 30, tzinfo=timezone.utc) + + assert ( + CronService._is_overdue( + job, last_run, now, ZoneInfo("America/New_York"), + ) + is True + ) + assert CronService._is_overdue(job, last_run, now, timezone.utc) is False + def test_weekly_overdue_after_exactly_one_week(self): """A weekly Monday job is overdue one week later, not 6 or 8 days.""" job = _make_job(schedule="0 13 * * 1") # Mondays 13:00 UTC @@ -302,6 +334,26 @@ async def test_crontab_unchanged(self, cron_service): from apscheduler.triggers.cron import CronTrigger assert isinstance(trigger, CronTrigger) + @pytest.mark.asyncio + async def test_crontab_uses_configured_timezone(self): + svc = _make_cron_service("America/Los_Angeles") + + trigger = await svc._make_trigger(_make_job(schedule="30 11 * * *")) + + from apscheduler.triggers.cron import CronTrigger + assert isinstance(trigger, CronTrigger) + assert str(trigger.timezone) == "America/Los_Angeles" + + @pytest.mark.asyncio + async def test_interval_uses_configured_timezone(self): + svc = _make_cron_service("America/Los_Angeles") + + trigger = await svc._make_trigger(_make_job(schedule="4h")) + + from apscheduler.triggers.interval import IntervalTrigger + assert isinstance(trigger, IntervalTrigger) + assert str(trigger.timezone) == "America/Los_Angeles" + # --------------------------------------------------------------------------- # _catchup_missed_jobs @@ -573,6 +625,33 @@ async def test_no_rotation_no_memorize(self, cron_service): assert rotated is False cron_service.engine.schedule_memorize.assert_not_awaited() + @pytest.mark.asyncio + async def test_rotate_at_uses_configured_timezone(self): + """Daily rotate_at uses config timezone, not the server timezone.""" + svc = _make_cron_service("America/New_York") + svc.db.get_session = AsyncMock(return_value={ + "connected_at": "2026-01-01T13:59:00+00:00", + }) + + class FixedDateTime(datetime): + @classmethod + def now(cls, tz=None): + fixed = datetime(2026, 1, 1, 15, 0, tzinfo=timezone.utc) + if tz is None: + return fixed.replace(tzinfo=None) + return fixed.astimezone(tz) + + with patch("nerve.cron.service.datetime", FixedDateTime): + rotated = await svc._maybe_rotate_context( + "cron:pers", rotate_hours=0, rotate_at="09:00", + ) + + assert rotated is True + svc.engine.schedule_memorize.assert_awaited_once_with("cron:pers") + svc.engine.sessions.mark_idle.assert_awaited_once_with( + "cron:pers", preserve_sdk_id=False, + ) + @pytest.mark.asyncio async def test_manual_rotation_forces_disabled_rotation_window(self, cron_service): """Manual rotation clears context even when scheduled rotation is disabled.""" diff --git a/tests/test_wakeups.py b/tests/test_wakeups.py index 0d619a1..26b62af 100644 --- a/tests/test_wakeups.py +++ b/tests/test_wakeups.py @@ -142,6 +142,7 @@ class TestWakeupSweep: async def svc(self, db): await db.create_session("s1", source="web") config = MagicMock() + config.timezone = "UTC" engine = AsyncMock() # is_running is synchronous — default to "not running". engine.sessions = MagicMock()