Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 54 additions & 1 deletion software/control/core/job_processing.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import abc
import multiprocessing
import multiprocessing.connection
import queue
import os
import threading
import time
import json
from datetime import datetime
from contextlib import contextmanager
from typing import ClassVar, Dict, Generic, List, Optional, Set, Tuple, TypeVar, Union
from typing import Callable, ClassVar, Dict, Generic, List, Optional, Set, Tuple, TypeVar, Union
from uuid import uuid4

from dataclasses import dataclass, field
Expand Down Expand Up @@ -1064,13 +1066,63 @@ def __init__(
self._bp_pending_bytes = bp_pending_bytes
self._bp_capacity_event = bp_capacity_event

self._on_unexpected_exit: Optional[Callable[[Optional[int]], None]] = None
self._watchdog: Optional[threading.Thread] = None
# Set by kill()/terminate()/shutdown() so the watchdog can distinguish
# intentional exit from segfault/OOM. Separate from _shutdown_event, which
# is a multiprocessing primitive that shutdown() nulls during cleanup.
self._intentional_exit = False

# Clean up stale metadata files from previous crashed acquisitions
# Only run when explicitly requested (i.e., when OME-TIFF saving is being used)
if cleanup_stale_ome_files:
removed = ome_tiff_writer.cleanup_stale_metadata_files()
if removed:
self._log.info(f"Cleaned up {len(removed)} stale OME-TIFF metadata files")

def set_unexpected_exit_handler(self, handler: Optional[Callable[[Optional[int]], None]]) -> None:
"""Register a callback to invoke if the subprocess dies without a clean shutdown.

The handler is called from the watchdog thread with the subprocess exitcode
(which may be None, a positive int, or a negative signal number on POSIX).
"""
self._on_unexpected_exit = handler

def start(self):
super().start()
self._watchdog = threading.Thread(
target=self._watch_subprocess,
daemon=True,
name=f"JobRunner-watchdog[{self.pid}]",
)
self._watchdog.start()

def kill(self):
self._intentional_exit = True
super().kill()

Copy link

Copilot AI Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The watchdog treats an exit as "expected" only when _shutdown_event is set. In this codebase JobRunner.terminate() is used during shutdown (e.g., MultiPointController.close), but terminate() does not set _shutdown_event, so the watchdog will log "died UNEXPECTEDLY" and invoke the handler during intentional termination. Consider overriding terminate() (and any other explicit-stop path you use) to set _shutdown_event the same way as kill()/shutdown() before signaling the process.

Suggested change
def terminate(self):
# Mark as expected so the watchdog treats the exit as intentional.
if self._shutdown_event is not None:
self._shutdown_event.set()
super().terminate()

Copilot uses AI. Check for mistakes.
def terminate(self):
self._intentional_exit = True
super().terminate()

def _watch_subprocess(self) -> None:
"""Block until the subprocess exits, then distinguish expected vs. unexpected death."""
pid = self.pid
multiprocessing.connection.wait([self.sentinel])
exitcode = self.exitcode
if self._intentional_exit:
self._log.info(f"JobRunner PID={pid} exited cleanly (exitcode={exitcode})")
return
self._log.error(
f"JobRunner PID={pid} died UNEXPECTEDLY (exitcode={exitcode}). Pending save jobs will not complete."
)
handler = self._on_unexpected_exit
if handler is not None:
try:
handler(exitcode)
except Exception:
self._log.exception("JobRunner unexpected-exit handler raised")

def dispatch(self, job: Job):
# Inject acquisition_info into SaveOMETiffJob instances before serialization.
# The job object is pickled when placed in the queue, so injection must happen here.
Expand Down Expand Up @@ -1174,6 +1226,7 @@ def shutdown(self, timeout_s=1.0):
# Guard against double shutdown
if self._shutdown_event is None:
return
self._intentional_exit = True
self._shutdown_event.set()
# Send sentinel to wake up worker blocked on queue.get()
try:
Expand Down
4 changes: 3 additions & 1 deletion software/control/core/memory_profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,9 @@ def _get_linux_pss_mb(pid: int) -> float:
pss_total_kb += int(parts[1])

return pss_total_kb / 1024
except (FileNotFoundError, PermissionError, ValueError):
except (FileNotFoundError, PermissionError, ValueError, ProcessLookupError):
# ProcessLookupError (errno ESRCH) can occur when the process exited between
# PID enumeration and the smaps_rollup read, or when the PID is a zombie.
pass
return 0.0

Expand Down
20 changes: 15 additions & 5 deletions software/control/core/multi_point_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,24 +332,26 @@ def __init__(
if Acquisition.USE_MULTIPROCESSING:
# Try to use pre-warmed runner for the first job class
if can_use_prewarmed and not used_prewarmed:
if prewarmed_job_runner.is_ready():
# is_alive() must be checked alongside is_ready(): the subprocess sets
# _ready_event early in run() and the Event survives in shared memory
# after death, so is_ready() alone can't detect a corpse.
if prewarmed_job_runner.is_alive() and prewarmed_job_runner.is_ready():
self._log.info(f"Using pre-warmed job runner for {job_class.__name__} jobs")
job_runner = prewarmed_job_runner
# Configure it with current acquisition settings
job_runner.set_unexpected_exit_handler(self._on_job_runner_died)
job_runner.set_acquisition_info(self.acquisition_info)
if zarr_writer_info:
job_runner.set_zarr_writer_info(zarr_writer_info)
used_prewarmed = True
else:
self._log.warning(
f"Pre-warmed job runner not ready (possibly hung during warmup), "
f"Pre-warmed job runner unavailable (died or hung during warmup); "
f"shutting it down and creating new one for {job_class.__name__}"
)
# Shutdown the hung pre-warmed runner to avoid resource leak
try:
prewarmed_job_runner.shutdown(timeout_s=1.0)
except Exception as e:
self._log.error(f"Error shutting down hung pre-warmed runner: {e}")
self._log.error(f"Error shutting down unusable pre-warmed runner: {e}")
# Don't try to use pre-warmed runner again for subsequent job classes
can_use_prewarmed = False

Expand All @@ -366,13 +368,21 @@ def __init__(
# Pass zarr writer info for ZARR_V3 format
zarr_writer_info=zarr_writer_info,
)
# Must precede start() so the watchdog covers warmup-time deaths.
job_runner.set_unexpected_exit_handler(self._on_job_runner_died)
job_runner.start()
# Subprocess starts warming up in background - don't block here

self._job_runners.append((job_class, job_runner))
self._abort_on_failed_job = abort_on_failed_jobs
self._first_job_dispatched = False # Track if we've waited for subprocess warmup

def _on_job_runner_died(self, exitcode: Optional[int]) -> None:
"""Invoked by JobRunner's watchdog when a subprocess dies unexpectedly."""
self._log.error(f"JobRunner subprocess died unexpectedly (exitcode={exitcode}); aborting acquisition.")
self._acquisition_error_count += 1
self.request_abort_fn()

def update_use_piezo(self, value):
self.use_piezo = value
self._log.info(f"MultiPointWorker: updated use_piezo to {value}")
Expand Down
147 changes: 147 additions & 0 deletions software/tests/control/core/test_job_runner_watchdog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
"""Tests for JobRunner watchdog (unexpected subprocess death detection).

These tests cover the watchdog thread that distinguishes intentional shutdown
from unexpected subprocess death (segfault, SIGKILL, OOM kill) and invokes a
registered handler so an acquisition can abort instead of silently rotting.
"""

import os
import signal
import threading
import time

import pytest

from control.core.job_processing import JobRunner


@pytest.fixture
def runner():
"""Provide an unstarted JobRunner; ensure cleanup even if the test crashes mid-run."""
r = JobRunner()
r.daemon = True
yield r
if r.is_alive():
try:
r.kill()
r.join(timeout=2.0)
except Exception:
pass


# Watchdog runs in a daemon thread; allow it to finish after the sentinel fires.
_WATCHDOG_GRACE_S = 0.3


class TestWatchdogUnexpectedDeath:
"""Verify the watchdog detects unexpected subprocess death and invokes the handler."""

def test_sigkill_fires_handler_with_negative_exitcode(self, runner):
handler_fired = threading.Event()
received_exitcode = []

def handler(exitcode):
received_exitcode.append(exitcode)
handler_fired.set()

runner.set_unexpected_exit_handler(handler)
runner.start()
assert runner.wait_ready(timeout_s=5.0)

os.kill(runner.pid, signal.SIGKILL)

assert handler_fired.wait(timeout=5.0), "Watchdog handler did not fire after SIGKILL"
assert received_exitcode == [-signal.SIGKILL]


class TestWatchdogIntentionalExit:
"""Verify intentional stop paths (kill/terminate/shutdown) do NOT fire the handler."""

def test_kill_does_not_fire_handler(self, runner):
handler_fired = threading.Event()
runner.set_unexpected_exit_handler(lambda ec: handler_fired.set())
runner.start()
assert runner.wait_ready(timeout_s=5.0)

runner.kill()
runner.join(timeout=2.0)
time.sleep(_WATCHDOG_GRACE_S)

assert not handler_fired.is_set(), "Handler fired despite intentional kill()"

def test_terminate_does_not_fire_handler(self, runner):
handler_fired = threading.Event()
runner.set_unexpected_exit_handler(lambda ec: handler_fired.set())
runner.start()
assert runner.wait_ready(timeout_s=5.0)

runner.terminate()
runner.join(timeout=2.0)
time.sleep(_WATCHDOG_GRACE_S)

assert not handler_fired.is_set(), "Handler fired despite intentional terminate()"

def test_shutdown_does_not_fire_handler(self, runner):
handler_fired = threading.Event()
runner.set_unexpected_exit_handler(lambda ec: handler_fired.set())
runner.start()
assert runner.wait_ready(timeout_s=5.0)

runner.shutdown(timeout_s=2.0)
time.sleep(_WATCHDOG_GRACE_S)

assert not handler_fired.is_set(), "Handler fired despite intentional shutdown()"


class TestWatchdogResilience:
"""Verify the watchdog is robust to handler misbehavior and shutdown ordering."""

def test_handler_exception_does_not_propagate(self, runner):
# The watchdog daemon thread must catch handler exceptions (it logs them).
# If propagation happened, the test process would not reach the post-join asserts.
runner.set_unexpected_exit_handler(lambda ec: (_ for _ in ()).throw(RuntimeError("boom")))
runner.start()
assert runner.wait_ready(timeout_s=5.0)

os.kill(runner.pid, signal.SIGKILL)
runner.join(timeout=5.0)
time.sleep(_WATCHDOG_GRACE_S)

assert not runner.is_alive()

def test_intentional_exit_survives_shutdown_cleanup(self, runner):
"""Regression: shutdown() nulls _shutdown_event during cleanup. The intent flag
must be a separate attribute that survives that nullification, or the watchdog
could read None and misclassify intentional shutdown as unexpected death.
"""
handler_fired = threading.Event()
runner.set_unexpected_exit_handler(lambda ec: handler_fired.set())
runner.start()
assert runner.wait_ready(timeout_s=5.0)

runner.shutdown(timeout_s=2.0)

assert runner._intentional_exit is True
assert runner._shutdown_event is None

time.sleep(_WATCHDOG_GRACE_S)
assert not handler_fired.is_set()


class TestPreWarmedAdoption:
"""Document the load-bearing assumption behind the is_alive() check at adoption."""

def test_is_ready_returns_true_for_dead_subprocess(self, runner):
"""is_ready() reads a multiprocessing.Event the subprocess sets early in run().
After SIGKILL the Event remains set in shared memory, so is_ready() alone cannot
distinguish a live runner from a corpse. is_alive() must also be checked before
adopting a pre-warmed runner.
"""
runner.start()
assert runner.wait_ready(timeout_s=5.0)

os.kill(runner.pid, signal.SIGKILL)
runner.join(timeout=5.0)

assert runner.is_ready() is True, "is_ready() should still report True even after death"
assert runner.is_alive() is False, "is_alive() should report False after death"
Loading