diff --git a/charts/workspace/server.py b/charts/workspace/server.py index 7d31ecb..a096c62 100644 --- a/charts/workspace/server.py +++ b/charts/workspace/server.py @@ -861,6 +861,46 @@ def list_tasks(parent=None): continue return tasks + @staticmethod + def reconcile_running(max_tasks=1000): + """Reconcile every non-terminal task once; return the count touched. + + This is what the background TaskReconciler calls so a finished task's + completion hook fires (and finished_at / waiting-for-input update) even + when no client is reading it. Without it, _reconcile_status only runs + lazily on list/get/stream, so a headless webhook/cron callback can be + arbitrarily delayed — or never fire if nothing polls (issue #96). + + Best-effort: a bad task dir is skipped, never raised. Terminal tasks + are skipped cheaply (no tmux subprocess). + """ + ClaudeTaskManager.ensure_tasks_dir() + try: + entries = sorted(os.listdir(ClaudeTaskManager.TASKS_DIR), reverse=True) + except OSError: + return 0 + reconciled = 0 + for entry in entries[:max_tasks]: + task_dir = os.path.join(ClaudeTaskManager.TASKS_DIR, entry) + meta_path = os.path.join(task_dir, 'task.json') + if not os.path.isfile(meta_path): + continue + try: + with open(meta_path, 'r') as f: + meta = json.load(f) + except (json.JSONDecodeError, OSError): + continue + # Cheap skip for terminal tasks — avoids the tmux has-session call. + if meta.get('status') not in ('running', 'waiting-for-input'): + continue + try: + ClaudeTaskManager._reconcile_status(meta, task_dir) + reconciled += 1 + except Exception as e: + print(f'[task-reconciler] reconcile {entry} failed: {e}', + file=sys.stderr) + return reconciled + @staticmethod def get_task(task_id): task_dir = os.path.join(ClaudeTaskManager.TASKS_DIR, task_id) @@ -6141,6 +6181,51 @@ def open_localhost(self): except Exception as e: self.send_error_response(f'Error opening localhost in Chrome: {str(e)}') +class TaskReconciler: + """Single-process background poller that reconciles non-terminal task + status on an interval, so completion hooks fire and finished_at / + waiting-for-input update even when no client is reading the task. + + Idempotent; safe to start once. Modeled on memory.sync.ClaudeMemorySyncer. + See issue #96. + """ + + _started = False + _thread = None + _stop_event = threading.Event() + _last_run_at = 0.0 + _last_reconciled = 0 + _start_lock = threading.Lock() + + @classmethod + def start(cls, *, interval_seconds=10): + with cls._start_lock: + if cls._started: + return + cls._started = True + + def _loop(): + while not cls._stop_event.is_set(): + try: + cls._last_reconciled = ClaudeTaskManager.reconcile_running() + cls._last_run_at = time.time() + except Exception as e: + print(f'[task-reconciler] pass failed: {e}', file=sys.stderr) + cls._stop_event.wait(interval_seconds) + + t = threading.Thread(target=_loop, name='task-reconciler', daemon=True) + cls._thread = t + t.start() + + @classmethod + def status(cls): + return { + 'running': cls._started and (cls._thread is not None and cls._thread.is_alive()), + 'last_run_at': cls._last_run_at or None, + 'last_reconciled': cls._last_reconciled, + } + + if __name__ == "__main__": # Change to the directory containing our files os.chdir('/tmp/browser') @@ -6164,6 +6249,19 @@ def open_localhost(self): except Exception as e: print(f'[memory] syncer start failed: {e}', file=sys.stderr) + # Background task reconciler: flips finished tasks running -> completed and + # fires their completion hooks even when nothing is reading them, so headless + # webhook/cron callbacks are timely (issue #96). + try: + _reconcile_interval = int(os.environ.get('KC_RECONCILE_INTERVAL', '10')) + except (TypeError, ValueError): + _reconcile_interval = 10 + try: + TaskReconciler.start(interval_seconds=_reconcile_interval) + print(f'[tasks] background reconciler started ({_reconcile_interval}s)') + except Exception as e: + print(f'[tasks] reconciler start failed: {e}', file=sys.stderr) + print("Starting Browser API Server on port 6080...") print("Available endpoints:") print(" GET / - Browser interface") diff --git a/charts/workspace/tests/task_reconciler_test.py b/charts/workspace/tests/task_reconciler_test.py new file mode 100644 index 0000000..e7c121f --- /dev/null +++ b/charts/workspace/tests/task_reconciler_test.py @@ -0,0 +1,152 @@ +"""Unit tests for the background task reconciler (issue #96). + +Covers ClaudeTaskManager.reconcile_running — the method the TaskReconciler +daemon calls so a finished task's status flips and its completion hook fires +even when no client is reading the task — plus the TaskReconciler lifecycle. + +tmux is simulated via subprocess.run stubs; TASKS_DIR is a tempdir; +_fire_completion_hook is mocked so no network I/O happens. + +Run with: python3 -m unittest tests.task_reconciler_test +(from charts/workspace/) +""" + +import json +import os +import shutil +import sys +import tempfile +import unittest +from unittest import mock + +HERE = os.path.dirname(os.path.abspath(__file__)) +sys.path.insert(0, os.path.dirname(HERE)) +import server # noqa: E402 + +CTM = server.ClaudeTaskManager + + +def _tmux_dead(*args, **kwargs): + """subprocess.run stub: tmux has-session reports the session is gone + (returncode 1); every other tmux op succeeds.""" + argv = args[0] if args else kwargs.get('args', []) + if len(argv) >= 2 and argv[0] == 'tmux' and argv[1] == 'has-session': + return mock.Mock(returncode=1, stdout='', stderr='no session') + return mock.Mock(returncode=0, stdout='', stderr='') + + +class ReconcileRunningTests(unittest.TestCase): + def setUp(self): + self.tmp = tempfile.mkdtemp(prefix='kcrecon-') + self._orig = CTM.TASKS_DIR + CTM.TASKS_DIR = self.tmp + self.addCleanup(self._restore) + + def _restore(self): + CTM.TASKS_DIR = self._orig + shutil.rmtree(self.tmp, ignore_errors=True) + + def _task(self, tid, **meta): + d = os.path.join(self.tmp, tid) + os.makedirs(d, exist_ok=True) + m = {'task_id': tid} + m.update(meta) + with open(os.path.join(d, 'task.json'), 'w') as f: + json.dump(m, f) + return d + + def _read(self, tid): + with open(os.path.join(self.tmp, tid, 'task.json')) as f: + return json.load(f) + + def test_dead_running_task_completes_and_fires_hook(self): + self._task('t1', status='running', tmux_session='kube-coder-t1', + response_url='http://hook.example/cb') + with mock.patch.object(server.subprocess, 'run', side_effect=_tmux_dead), \ + mock.patch.object(CTM, '_fire_completion_hook') as fire: + n = CTM.reconcile_running() + self.assertEqual(n, 1) + fire.assert_called_once() + meta = self._read('t1') + self.assertEqual(meta['status'], 'completed') + self.assertIn('finished_at', meta) + self.assertIn('hook_fired_at', meta) + + def test_completes_without_hook_when_no_response_url(self): + self._task('t1', status='running', tmux_session='s') + with mock.patch.object(server.subprocess, 'run', side_effect=_tmux_dead), \ + mock.patch.object(CTM, '_fire_completion_hook') as fire: + CTM.reconcile_running() + fire.assert_not_called() + self.assertEqual(self._read('t1')['status'], 'completed') + + def test_terminal_tasks_skipped_without_tmux_call(self): + self._task('done', status='completed', tmux_session='x') + self._task('killed', status='killed', tmux_session='y') + with mock.patch.object(server.subprocess, 'run') as run: + n = CTM.reconcile_running() + self.assertEqual(n, 0) + run.assert_not_called() # terminal → cheap skip, no subprocess + + def test_live_session_stays_running(self): + self._task('t1', status='running', tmux_session='kube-coder-t1') + # has-session returns 0 (alive); capture-pane returns stable output. + with mock.patch.object(server.subprocess, 'run', + return_value=mock.Mock(returncode=0, stdout='x', stderr='')), \ + mock.patch.object(CTM, '_fire_completion_hook') as fire: + n = CTM.reconcile_running() + self.assertEqual(n, 1) + fire.assert_not_called() + self.assertEqual(self._read('t1')['status'], 'running') + + def test_corrupt_and_missing_meta_skipped(self): + # A corrupt task dir + a dir with no task.json must not break the pass. + bad = os.path.join(self.tmp, 'bad'); os.makedirs(bad) + with open(os.path.join(bad, 'task.json'), 'w') as f: + f.write('{not json') + os.makedirs(os.path.join(self.tmp, 'nometa')) + self._task('good', status='running', tmux_session='s') + with mock.patch.object(server.subprocess, 'run', side_effect=_tmux_dead), \ + mock.patch.object(CTM, '_fire_completion_hook'): + n = CTM.reconcile_running() + self.assertEqual(n, 1) # only the good running task + + def test_missing_tasks_dir_returns_zero(self): + CTM.TASKS_DIR = os.path.join(self.tmp, 'gone') + shutil.rmtree(self.tmp, ignore_errors=True) + # ensure_tasks_dir recreates it empty → zero tasks, no raise. + self.assertEqual(CTM.reconcile_running(), 0) + + def test_max_tasks_bound(self): + for i in range(5): + self._task(f't{i}', status='running', tmux_session='s') + with mock.patch.object(server.subprocess, 'run', side_effect=_tmux_dead), \ + mock.patch.object(CTM, '_fire_completion_hook'): + n = CTM.reconcile_running(max_tasks=2) + self.assertEqual(n, 2) + + def test_reconcile_failure_does_not_raise(self): + self._task('t1', status='running', tmux_session='s') + with mock.patch.object(CTM, '_reconcile_status', side_effect=RuntimeError('boom')): + n = CTM.reconcile_running() # must swallow + continue + self.assertEqual(n, 0) + + +class TaskReconcilerLifecycleTests(unittest.TestCase): + def test_start_is_idempotent(self): + orig = server.TaskReconciler._started + server.TaskReconciler._started = True + try: + # Second start() must be a no-op and must not raise / spawn a thread. + server.TaskReconciler.start(interval_seconds=10) + finally: + server.TaskReconciler._started = orig + + def test_status_shape(self): + s = server.TaskReconciler.status() + for k in ('running', 'last_run_at', 'last_reconciled'): + self.assertIn(k, s) + + +if __name__ == '__main__': + unittest.main()