From 94641ab74fb6c0c6f1e54f2ce4622af4fd5bcf41 Mon Sep 17 00:00:00 2001 From: imran31415 Date: Tue, 16 Jun 2026 22:44:27 +0000 Subject: [PATCH] feat(events): backend SSE /api/events + event broker (part 1 of #93) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the push infrastructure the SPA needs to replace per-route polling: - EventBroker: in-process pub/sub fanning dashboard events to connected SSE clients. Bounded per-subscriber queues; a slow client drops its oldest event rather than blocking the publisher (SSE is lossy-tolerant — the SPA reconciles via a normal fetch on (re)connect). publish() never raises and never blocks the caller (reconcile loop / request thread). - GET /api/events: Server-Sent Events firehose. Subscribes to the broker and forwards each event as a named SSE frame; heartbeat comments keep proxies from closing idle connections; STREAM_MAX_SECONDS caps the lifetime (client reconnects on the `end` event). Framing mirrors the existing handle_claude_stream_output, incl. X-Accel-Buffering: no. - Emit points (fire on real transitions, so any caller — background reconciler or a lazy list/get — triggers them): * task.created — create_task / create_terminal_task * task.status — _reconcile_status completed / waiting-for-input / back-to-running transitions Built on the #96 reconcile loop: the daemon now drives events even when no client is reading a task. Tests: tests/event_broker_test.py (8) — fan-out, unsubscribe, no-sub safety, drop-oldest, and the reconcile→task.status integration. Full suite 466, OK. Part 1 of #93. Part 2 wires the SPA EventSource client + drops the poll loops (also finishing part 2 of #101). Co-Authored-By: Claude Opus 4.8 (1M context) --- charts/workspace/server.py | 138 +++++++++++++++++++ charts/workspace/tests/event_broker_test.py | 143 ++++++++++++++++++++ 2 files changed, 281 insertions(+) create mode 100644 charts/workspace/tests/event_broker_test.py diff --git a/charts/workspace/server.py b/charts/workspace/server.py index 5219936..d6625c6 100644 --- a/charts/workspace/server.py +++ b/charts/workspace/server.py @@ -14,6 +14,7 @@ import secrets import shutil import threading +import queue import uuid import urllib.parse import urllib.request @@ -753,6 +754,13 @@ def send_prompt(): threading.Thread(target=send_prompt, daemon=True).start() + EventBroker.publish('task.created', { + 'task_id': meta.get('task_id'), + 'status': meta.get('status'), + 'name': meta.get('name'), + 'assistant': meta.get('assistant'), + 'parent_task_id': meta.get('parent_task_id'), + }) return meta @staticmethod @@ -812,6 +820,12 @@ def create_terminal_task(workdir=None): capture_output=True, text=True, ) + EventBroker.publish('task.created', { + 'task_id': meta.get('task_id'), + 'status': meta.get('status'), + 'name': meta.get('name'), + 'kind': meta.get('kind'), + }) return meta @staticmethod @@ -1288,6 +1302,11 @@ def mutate(m): meta.pop('last_input_prompt', None) if fire_hook: ClaudeTaskManager._fire_completion_hook(updated) + EventBroker.publish('task.status', { + 'task_id': meta.get('task_id'), + 'status': 'completed', + 'finished_at': meta.get('finished_at'), + }) return # Session is alive — derive waiting-for-input from render *quiescence* @@ -1327,6 +1346,9 @@ def mutate(m): meta['status'] = 'running' meta.pop('waiting_for_input', None) meta.pop('last_input_prompt', None) + EventBroker.publish('task.status', { + 'task_id': meta.get('task_id'), 'status': 'running', + }) return # Screen unchanged since the previous capture. @@ -1342,6 +1364,9 @@ def mutate(m): if updated is not None: meta['status'] = 'waiting-for-input' meta['waiting_for_input'] = True + EventBroker.publish('task.status', { + 'task_id': meta.get('task_id'), 'status': 'waiting-for-input', + }) def _shell_quote(s): @@ -2997,6 +3022,14 @@ def do_GET(self): # --- Claude Task API (GET) --- # Query string is already stripped at the top; handlers re-parse it from self.path when needed. claude_path = normalized_path + + # /api/events — Server-Sent Events firehose of dashboard events + # (task.created / task.status). Lets the SPA replace per-route polling + # with push (issue #93). + if claude_path == '/api/events': + self.handle_events_stream() + return + # /api/mode — public deployment-mode probe used by the SPA at boot to # decide whether to hide mutation UI. Intentionally unauthenticated so # the read-only public demo can fetch it without an auth proxy in @@ -3576,6 +3609,59 @@ def capture(): time.sleep(poll_interval) + def handle_events_stream(self): + """Server-Sent Events firehose of dashboard events (task.created / + task.status). Subscribes to EventBroker and forwards each event as a + named SSE frame so the SPA can replace per-route polling (issue #93). + + Framing mirrors handle_claude_stream_output: heartbeat comments keep + proxies from closing an idle connection, and STREAM_MAX_SECONDS caps + the lifetime so a never-disconnecting client can't pin a handler + thread forever (the SPA reconnects on the `end` event). + """ + if not self.check_claude_auth(): + self.send_json({'error': 'Unauthorized'}, 401) + return + + self.send_response(200) + self.send_header('Content-Type', 'text/event-stream') + self.send_header('Cache-Control', 'no-cache, no-store, must-revalidate') + self.send_header('X-Accel-Buffering', 'no') + self.send_header('Connection', 'keep-alive') + self.end_headers() + + def write_raw(payload_bytes): + try: + self.wfile.write(payload_bytes) + self.wfile.flush() + return True + except (BrokenPipeError, ConnectionResetError, OSError): + return False + + q = EventBroker.subscribe() + started = time.time() + # Greet so the client can flip to "connected" and stop its poll fallback. + if not write_raw(b'event: ready\ndata: {}\n\n'): + EventBroker.unsubscribe(q) + return + try: + while True: + if time.time() - started > STREAM_MAX_SECONDS: + write_raw(b'event: end\ndata: timeout\n\n') + return + try: + event = q.get(timeout=15) + except queue.Empty: + if not write_raw(b': keep-alive\n\n'): + return + continue + payload = json.dumps(event.get('data', {})) + frame = f"event: {event.get('type', 'message')}\ndata: {payload}\n\n" + if not write_raw(frame.encode('utf-8')): + return + finally: + EventBroker.unsubscribe(q) + def handle_claude_create_task(self): if not self.check_claude_auth(): self.send_json({'error': 'Unauthorized'}, 401) @@ -6194,6 +6280,58 @@ def open_localhost(self): except Exception as e: self.send_error_response(f'Error opening localhost in Chrome: {str(e)}') +class EventBroker: + """In-process fan-out of dashboard events to connected /api/events SSE + clients, so the SPA can replace per-route polling with push (issue #93). + + Each subscriber gets a small bounded queue. If a client is too slow we drop + its oldest event rather than block the publisher — SSE is lossy-tolerant + here because the SPA reconciles via a normal fetch on (re)connect, so a + dropped event at worst delays an update until the next poll-fallback tick. + publish() never raises and never blocks the caller (e.g. the reconcile + loop or a request thread). + """ + + QUEUE_MAX = 200 + _subscribers = set() + _lock = threading.Lock() + + @classmethod + def subscribe(cls): + q = queue.Queue(maxsize=cls.QUEUE_MAX) + with cls._lock: + cls._subscribers.add(q) + return q + + @classmethod + def unsubscribe(cls, q): + with cls._lock: + cls._subscribers.discard(q) + + @classmethod + def subscriber_count(cls): + with cls._lock: + return len(cls._subscribers) + + @classmethod + def publish(cls, event_type, data=None): + """Fan an event out to every subscriber. Never raises, never blocks.""" + event = {'type': event_type, 'data': data or {}, 'ts': time.time()} + with cls._lock: + subs = list(cls._subscribers) + for q in subs: + try: + q.put_nowait(event) + except queue.Full: + # Slow consumer — drop its oldest event to make room. + try: + q.get_nowait() + q.put_nowait(event) + except (queue.Empty, queue.Full): + pass + return event + + class TaskReconciler: """Single-process background poller that reconciles non-terminal task status on an interval, so completion hooks fire and finished_at / diff --git a/charts/workspace/tests/event_broker_test.py b/charts/workspace/tests/event_broker_test.py new file mode 100644 index 0000000..eb457d4 --- /dev/null +++ b/charts/workspace/tests/event_broker_test.py @@ -0,0 +1,143 @@ +"""Unit tests for the SSE EventBroker and its task event emit points (#93). + +Covers the in-process pub/sub (subscribe/publish/unsubscribe, fan-out, +no-subscriber safety, slow-consumer drop-oldest) and the integration where +reconciling a finished task publishes a `task.status` event. + +Run with: python3 -m unittest tests.event_broker_test +(from charts/workspace/) +""" + +import json +import os +import queue +import shutil +import sys +import tempfile +import unittest +from unittest import mock + +HERE = os.path.dirname(os.path.abspath(__file__)) +sys.path.insert(0, os.path.dirname(HERE)) +import server # noqa: E402 + +EB = server.EventBroker +CTM = server.ClaudeTaskManager + + +def _tmux_dead(*args, **kwargs): + argv = args[0] if args else kwargs.get('args', []) + if len(argv) >= 2 and argv[0] == 'tmux' and argv[1] == 'has-session': + return mock.Mock(returncode=1, stdout='', stderr='no session') + return mock.Mock(returncode=0, stdout='', stderr='') + + +class BrokerTestCase(unittest.TestCase): + def setUp(self): + # Isolate the class-level subscriber set per test. + self._orig = set(EB._subscribers) + EB._subscribers = set() + self.addCleanup(lambda: setattr(EB, '_subscribers', self._orig)) + + +class BrokerBasicsTests(BrokerTestCase): + def test_subscriber_receives_published_event(self): + q = EB.subscribe() + EB.publish('task.status', {'task_id': 't1', 'status': 'completed'}) + evt = q.get_nowait() + self.assertEqual(evt['type'], 'task.status') + self.assertEqual(evt['data']['task_id'], 't1') + self.assertIn('ts', evt) + + def test_fan_out_to_all_subscribers(self): + q1, q2 = EB.subscribe(), EB.subscribe() + self.assertEqual(EB.subscriber_count(), 2) + EB.publish('task.created', {'task_id': 'x'}) + self.assertEqual(q1.get_nowait()['data']['task_id'], 'x') + self.assertEqual(q2.get_nowait()['data']['task_id'], 'x') + + def test_unsubscribe_stops_delivery(self): + q = EB.subscribe() + EB.unsubscribe(q) + self.assertEqual(EB.subscriber_count(), 0) + EB.publish('task.status', {'task_id': 't'}) + with self.assertRaises(queue.Empty): + q.get_nowait() + + def test_publish_with_no_subscribers_is_safe(self): + # Must not raise. + evt = EB.publish('task.status', {'task_id': 't'}) + self.assertEqual(evt['type'], 'task.status') + + def test_publish_defaults_empty_data(self): + q = EB.subscribe() + EB.publish('ready') + self.assertEqual(q.get_nowait()['data'], {}) + + def test_slow_consumer_drops_oldest(self): + with mock.patch.object(EB, 'QUEUE_MAX', 3): + q = EB.subscribe() + for i in range(5): + EB.publish('task.status', {'n': i}) + drained = [] + try: + while True: + drained.append(q.get_nowait()['data']['n']) + except queue.Empty: + pass + # Bounded to QUEUE_MAX, oldest dropped, newest retained. + self.assertEqual(len(drained), 3) + self.assertEqual(drained[-1], 4) + self.assertNotIn(0, drained) + + +class ReconcileEmitTests(BrokerTestCase): + def setUp(self): + super().setUp() + self.tmp = tempfile.mkdtemp(prefix='kcevt-') + self._orig_dir = CTM.TASKS_DIR + CTM.TASKS_DIR = self.tmp + self.addCleanup(self._restore) + + def _restore(self): + CTM.TASKS_DIR = self._orig_dir + shutil.rmtree(self.tmp, ignore_errors=True) + + def _task(self, tid, **meta): + d = os.path.join(self.tmp, tid) + os.makedirs(d, exist_ok=True) + m = {'task_id': tid} + m.update(meta) + with open(os.path.join(d, 'task.json'), 'w') as f: + json.dump(m, f) + + def _drain(self, q): + out = [] + try: + while True: + out.append(q.get_nowait()) + except queue.Empty: + pass + return out + + def test_completed_transition_publishes_task_status(self): + q = EB.subscribe() + self._task('t1', status='running', tmux_session='kube-coder-t1') + with mock.patch.object(server.subprocess, 'run', side_effect=_tmux_dead): + CTM.reconcile_running() + events = self._drain(q) + status_events = [e for e in events if e['type'] == 'task.status'] + self.assertEqual(len(status_events), 1) + self.assertEqual(status_events[0]['data']['status'], 'completed') + self.assertEqual(status_events[0]['data']['task_id'], 't1') + + def test_no_event_when_nothing_changes(self): + q = EB.subscribe() + self._task('done', status='completed', tmux_session='x') + with mock.patch.object(server.subprocess, 'run', side_effect=_tmux_dead): + CTM.reconcile_running() + self.assertEqual(self._drain(q), []) # terminal task → no transition → no event + + +if __name__ == '__main__': + unittest.main()