diff --git a/software/control/core/job_processing.py b/software/control/core/job_processing.py index bcf183df0..8a72615d2 100644 --- a/software/control/core/job_processing.py +++ b/software/control/core/job_processing.py @@ -1,12 +1,14 @@ import abc import multiprocessing +import multiprocessing.connection import queue import os +import threading import time import json from datetime import datetime from contextlib import contextmanager -from typing import ClassVar, Dict, Generic, List, Optional, Set, Tuple, TypeVar, Union +from typing import Callable, ClassVar, Dict, Generic, List, Optional, Set, Tuple, TypeVar, Union from uuid import uuid4 from dataclasses import dataclass, field @@ -1064,6 +1066,13 @@ def __init__( self._bp_pending_bytes = bp_pending_bytes self._bp_capacity_event = bp_capacity_event + self._on_unexpected_exit: Optional[Callable[[Optional[int]], None]] = None + self._watchdog: Optional[threading.Thread] = None + # Set by kill()/terminate()/shutdown() so the watchdog can distinguish + # intentional exit from segfault/OOM. Separate from _shutdown_event, which + # is a multiprocessing primitive that shutdown() nulls during cleanup. + self._intentional_exit = False + # Clean up stale metadata files from previous crashed acquisitions # Only run when explicitly requested (i.e., when OME-TIFF saving is being used) if cleanup_stale_ome_files: @@ -1071,6 +1080,49 @@ def __init__( if removed: self._log.info(f"Cleaned up {len(removed)} stale OME-TIFF metadata files") + def set_unexpected_exit_handler(self, handler: Optional[Callable[[Optional[int]], None]]) -> None: + """Register a callback to invoke if the subprocess dies without a clean shutdown. + + The handler is called from the watchdog thread with the subprocess exitcode + (which may be None, a positive int, or a negative signal number on POSIX). + """ + self._on_unexpected_exit = handler + + def start(self): + super().start() + self._watchdog = threading.Thread( + target=self._watch_subprocess, + daemon=True, + name=f"JobRunner-watchdog[{self.pid}]", + ) + self._watchdog.start() + + def kill(self): + self._intentional_exit = True + super().kill() + + def terminate(self): + self._intentional_exit = True + super().terminate() + + def _watch_subprocess(self) -> None: + """Block until the subprocess exits, then distinguish expected vs. unexpected death.""" + pid = self.pid + multiprocessing.connection.wait([self.sentinel]) + exitcode = self.exitcode + if self._intentional_exit: + self._log.info(f"JobRunner PID={pid} exited cleanly (exitcode={exitcode})") + return + self._log.error( + f"JobRunner PID={pid} died UNEXPECTEDLY (exitcode={exitcode}). Pending save jobs will not complete." + ) + handler = self._on_unexpected_exit + if handler is not None: + try: + handler(exitcode) + except Exception: + self._log.exception("JobRunner unexpected-exit handler raised") + def dispatch(self, job: Job): # Inject acquisition_info into SaveOMETiffJob instances before serialization. # The job object is pickled when placed in the queue, so injection must happen here. @@ -1174,6 +1226,7 @@ def shutdown(self, timeout_s=1.0): # Guard against double shutdown if self._shutdown_event is None: return + self._intentional_exit = True self._shutdown_event.set() # Send sentinel to wake up worker blocked on queue.get() try: diff --git a/software/control/core/memory_profiler.py b/software/control/core/memory_profiler.py index 8d6ab5421..92dc20212 100644 --- a/software/control/core/memory_profiler.py +++ b/software/control/core/memory_profiler.py @@ -259,7 +259,9 @@ def _get_linux_pss_mb(pid: int) -> float: pss_total_kb += int(parts[1]) return pss_total_kb / 1024 - except (FileNotFoundError, PermissionError, ValueError): + except (FileNotFoundError, PermissionError, ValueError, ProcessLookupError): + # ProcessLookupError (errno ESRCH) can occur when the process exited between + # PID enumeration and the smaps_rollup read, or when the PID is a zombie. pass return 0.0 diff --git a/software/control/core/multi_point_worker.py b/software/control/core/multi_point_worker.py index fa5fabb71..3df033581 100644 --- a/software/control/core/multi_point_worker.py +++ b/software/control/core/multi_point_worker.py @@ -332,24 +332,26 @@ def __init__( if Acquisition.USE_MULTIPROCESSING: # Try to use pre-warmed runner for the first job class if can_use_prewarmed and not used_prewarmed: - if prewarmed_job_runner.is_ready(): + # is_alive() must be checked alongside is_ready(): the subprocess sets + # _ready_event early in run() and the Event survives in shared memory + # after death, so is_ready() alone can't detect a corpse. + if prewarmed_job_runner.is_alive() and prewarmed_job_runner.is_ready(): self._log.info(f"Using pre-warmed job runner for {job_class.__name__} jobs") job_runner = prewarmed_job_runner - # Configure it with current acquisition settings + job_runner.set_unexpected_exit_handler(self._on_job_runner_died) job_runner.set_acquisition_info(self.acquisition_info) if zarr_writer_info: job_runner.set_zarr_writer_info(zarr_writer_info) used_prewarmed = True else: self._log.warning( - f"Pre-warmed job runner not ready (possibly hung during warmup), " + f"Pre-warmed job runner unavailable (died or hung during warmup); " f"shutting it down and creating new one for {job_class.__name__}" ) - # Shutdown the hung pre-warmed runner to avoid resource leak try: prewarmed_job_runner.shutdown(timeout_s=1.0) except Exception as e: - self._log.error(f"Error shutting down hung pre-warmed runner: {e}") + self._log.error(f"Error shutting down unusable pre-warmed runner: {e}") # Don't try to use pre-warmed runner again for subsequent job classes can_use_prewarmed = False @@ -366,6 +368,8 @@ def __init__( # Pass zarr writer info for ZARR_V3 format zarr_writer_info=zarr_writer_info, ) + # Must precede start() so the watchdog covers warmup-time deaths. + job_runner.set_unexpected_exit_handler(self._on_job_runner_died) job_runner.start() # Subprocess starts warming up in background - don't block here @@ -373,6 +377,12 @@ def __init__( self._abort_on_failed_job = abort_on_failed_jobs self._first_job_dispatched = False # Track if we've waited for subprocess warmup + def _on_job_runner_died(self, exitcode: Optional[int]) -> None: + """Invoked by JobRunner's watchdog when a subprocess dies unexpectedly.""" + self._log.error(f"JobRunner subprocess died unexpectedly (exitcode={exitcode}); aborting acquisition.") + self._acquisition_error_count += 1 + self.request_abort_fn() + def update_use_piezo(self, value): self.use_piezo = value self._log.info(f"MultiPointWorker: updated use_piezo to {value}") diff --git a/software/tests/control/core/test_job_runner_watchdog.py b/software/tests/control/core/test_job_runner_watchdog.py new file mode 100644 index 000000000..cde9dacb3 --- /dev/null +++ b/software/tests/control/core/test_job_runner_watchdog.py @@ -0,0 +1,147 @@ +"""Tests for JobRunner watchdog (unexpected subprocess death detection). + +These tests cover the watchdog thread that distinguishes intentional shutdown +from unexpected subprocess death (segfault, SIGKILL, OOM kill) and invokes a +registered handler so an acquisition can abort instead of silently rotting. +""" + +import os +import signal +import threading +import time + +import pytest + +from control.core.job_processing import JobRunner + + +@pytest.fixture +def runner(): + """Provide an unstarted JobRunner; ensure cleanup even if the test crashes mid-run.""" + r = JobRunner() + r.daemon = True + yield r + if r.is_alive(): + try: + r.kill() + r.join(timeout=2.0) + except Exception: + pass + + +# Watchdog runs in a daemon thread; allow it to finish after the sentinel fires. +_WATCHDOG_GRACE_S = 0.3 + + +class TestWatchdogUnexpectedDeath: + """Verify the watchdog detects unexpected subprocess death and invokes the handler.""" + + def test_sigkill_fires_handler_with_negative_exitcode(self, runner): + handler_fired = threading.Event() + received_exitcode = [] + + def handler(exitcode): + received_exitcode.append(exitcode) + handler_fired.set() + + runner.set_unexpected_exit_handler(handler) + runner.start() + assert runner.wait_ready(timeout_s=5.0) + + os.kill(runner.pid, signal.SIGKILL) + + assert handler_fired.wait(timeout=5.0), "Watchdog handler did not fire after SIGKILL" + assert received_exitcode == [-signal.SIGKILL] + + +class TestWatchdogIntentionalExit: + """Verify intentional stop paths (kill/terminate/shutdown) do NOT fire the handler.""" + + def test_kill_does_not_fire_handler(self, runner): + handler_fired = threading.Event() + runner.set_unexpected_exit_handler(lambda ec: handler_fired.set()) + runner.start() + assert runner.wait_ready(timeout_s=5.0) + + runner.kill() + runner.join(timeout=2.0) + time.sleep(_WATCHDOG_GRACE_S) + + assert not handler_fired.is_set(), "Handler fired despite intentional kill()" + + def test_terminate_does_not_fire_handler(self, runner): + handler_fired = threading.Event() + runner.set_unexpected_exit_handler(lambda ec: handler_fired.set()) + runner.start() + assert runner.wait_ready(timeout_s=5.0) + + runner.terminate() + runner.join(timeout=2.0) + time.sleep(_WATCHDOG_GRACE_S) + + assert not handler_fired.is_set(), "Handler fired despite intentional terminate()" + + def test_shutdown_does_not_fire_handler(self, runner): + handler_fired = threading.Event() + runner.set_unexpected_exit_handler(lambda ec: handler_fired.set()) + runner.start() + assert runner.wait_ready(timeout_s=5.0) + + runner.shutdown(timeout_s=2.0) + time.sleep(_WATCHDOG_GRACE_S) + + assert not handler_fired.is_set(), "Handler fired despite intentional shutdown()" + + +class TestWatchdogResilience: + """Verify the watchdog is robust to handler misbehavior and shutdown ordering.""" + + def test_handler_exception_does_not_propagate(self, runner): + # The watchdog daemon thread must catch handler exceptions (it logs them). + # If propagation happened, the test process would not reach the post-join asserts. + runner.set_unexpected_exit_handler(lambda ec: (_ for _ in ()).throw(RuntimeError("boom"))) + runner.start() + assert runner.wait_ready(timeout_s=5.0) + + os.kill(runner.pid, signal.SIGKILL) + runner.join(timeout=5.0) + time.sleep(_WATCHDOG_GRACE_S) + + assert not runner.is_alive() + + def test_intentional_exit_survives_shutdown_cleanup(self, runner): + """Regression: shutdown() nulls _shutdown_event during cleanup. The intent flag + must be a separate attribute that survives that nullification, or the watchdog + could read None and misclassify intentional shutdown as unexpected death. + """ + handler_fired = threading.Event() + runner.set_unexpected_exit_handler(lambda ec: handler_fired.set()) + runner.start() + assert runner.wait_ready(timeout_s=5.0) + + runner.shutdown(timeout_s=2.0) + + assert runner._intentional_exit is True + assert runner._shutdown_event is None + + time.sleep(_WATCHDOG_GRACE_S) + assert not handler_fired.is_set() + + +class TestPreWarmedAdoption: + """Document the load-bearing assumption behind the is_alive() check at adoption.""" + + def test_is_ready_returns_true_for_dead_subprocess(self, runner): + """is_ready() reads a multiprocessing.Event the subprocess sets early in run(). + After SIGKILL the Event remains set in shared memory, so is_ready() alone cannot + distinguish a live runner from a corpse. is_alive() must also be checked before + adopting a pre-warmed runner. + """ + runner.start() + assert runner.wait_ready(timeout_s=5.0) + + os.kill(runner.pid, signal.SIGKILL) + runner.join(timeout=5.0) + + assert runner.is_ready() is True, "is_ready() should still report True even after death" + assert runner.is_alive() is False, "is_alive() should report False after death"