Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions charts/workspace/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import secrets
import shutil
import threading
import queue
import uuid
import urllib.parse
import urllib.request
Expand Down Expand Up @@ -753,6 +754,13 @@ def send_prompt():

threading.Thread(target=send_prompt, daemon=True).start()

EventBroker.publish('task.created', {
'task_id': meta.get('task_id'),
'status': meta.get('status'),
'name': meta.get('name'),
'assistant': meta.get('assistant'),
'parent_task_id': meta.get('parent_task_id'),
})
return meta

@staticmethod
Expand Down Expand Up @@ -812,6 +820,12 @@ def create_terminal_task(workdir=None):
capture_output=True, text=True,
)

EventBroker.publish('task.created', {
'task_id': meta.get('task_id'),
'status': meta.get('status'),
'name': meta.get('name'),
'kind': meta.get('kind'),
})
return meta

@staticmethod
Expand Down Expand Up @@ -1288,6 +1302,11 @@ def mutate(m):
meta.pop('last_input_prompt', None)
if fire_hook:
ClaudeTaskManager._fire_completion_hook(updated)
EventBroker.publish('task.status', {
'task_id': meta.get('task_id'),
'status': 'completed',
'finished_at': meta.get('finished_at'),
})
return

# Session is alive — derive waiting-for-input from render *quiescence*
Expand Down Expand Up @@ -1327,6 +1346,9 @@ def mutate(m):
meta['status'] = 'running'
meta.pop('waiting_for_input', None)
meta.pop('last_input_prompt', None)
EventBroker.publish('task.status', {
'task_id': meta.get('task_id'), 'status': 'running',
})
return

# Screen unchanged since the previous capture.
Expand All @@ -1342,6 +1364,9 @@ def mutate(m):
if updated is not None:
meta['status'] = 'waiting-for-input'
meta['waiting_for_input'] = True
EventBroker.publish('task.status', {
'task_id': meta.get('task_id'), 'status': 'waiting-for-input',
})


def _shell_quote(s):
Expand Down Expand Up @@ -2997,6 +3022,14 @@ def do_GET(self):
# --- Claude Task API (GET) ---
# Query string is already stripped at the top; handlers re-parse it from self.path when needed.
claude_path = normalized_path

# /api/events — Server-Sent Events firehose of dashboard events
# (task.created / task.status). Lets the SPA replace per-route polling
# with push (issue #93).
if claude_path == '/api/events':
self.handle_events_stream()
return

# /api/mode — public deployment-mode probe used by the SPA at boot to
# decide whether to hide mutation UI. Intentionally unauthenticated so
# the read-only public demo can fetch it without an auth proxy in
Expand Down Expand Up @@ -3576,6 +3609,59 @@ def capture():

time.sleep(poll_interval)

def handle_events_stream(self):
"""Server-Sent Events firehose of dashboard events (task.created /
task.status). Subscribes to EventBroker and forwards each event as a
named SSE frame so the SPA can replace per-route polling (issue #93).

Framing mirrors handle_claude_stream_output: heartbeat comments keep
proxies from closing an idle connection, and STREAM_MAX_SECONDS caps
the lifetime so a never-disconnecting client can't pin a handler
thread forever (the SPA reconnects on the `end` event).
"""
if not self.check_claude_auth():
self.send_json({'error': 'Unauthorized'}, 401)
return

self.send_response(200)
self.send_header('Content-Type', 'text/event-stream')
self.send_header('Cache-Control', 'no-cache, no-store, must-revalidate')
self.send_header('X-Accel-Buffering', 'no')
self.send_header('Connection', 'keep-alive')
self.end_headers()

def write_raw(payload_bytes):
try:
self.wfile.write(payload_bytes)
self.wfile.flush()
return True
except (BrokenPipeError, ConnectionResetError, OSError):
return False

q = EventBroker.subscribe()
started = time.time()
# Greet so the client can flip to "connected" and stop its poll fallback.
if not write_raw(b'event: ready\ndata: {}\n\n'):
EventBroker.unsubscribe(q)
return
try:
while True:
if time.time() - started > STREAM_MAX_SECONDS:
write_raw(b'event: end\ndata: timeout\n\n')
return
try:
event = q.get(timeout=15)
except queue.Empty:
if not write_raw(b': keep-alive\n\n'):
return
continue
payload = json.dumps(event.get('data', {}))
frame = f"event: {event.get('type', 'message')}\ndata: {payload}\n\n"
if not write_raw(frame.encode('utf-8')):
return
finally:
EventBroker.unsubscribe(q)

def handle_claude_create_task(self):
if not self.check_claude_auth():
self.send_json({'error': 'Unauthorized'}, 401)
Expand Down Expand Up @@ -6194,6 +6280,58 @@ def open_localhost(self):
except Exception as e:
self.send_error_response(f'Error opening localhost in Chrome: {str(e)}')

class EventBroker:
"""In-process fan-out of dashboard events to connected /api/events SSE
clients, so the SPA can replace per-route polling with push (issue #93).

Each subscriber gets a small bounded queue. If a client is too slow we drop
its oldest event rather than block the publisher — SSE is lossy-tolerant
here because the SPA reconciles via a normal fetch on (re)connect, so a
dropped event at worst delays an update until the next poll-fallback tick.
publish() never raises and never blocks the caller (e.g. the reconcile
loop or a request thread).
"""

QUEUE_MAX = 200
_subscribers = set()
_lock = threading.Lock()

@classmethod
def subscribe(cls):
q = queue.Queue(maxsize=cls.QUEUE_MAX)
with cls._lock:
cls._subscribers.add(q)
return q

@classmethod
def unsubscribe(cls, q):
with cls._lock:
cls._subscribers.discard(q)

@classmethod
def subscriber_count(cls):
with cls._lock:
return len(cls._subscribers)

@classmethod
def publish(cls, event_type, data=None):
"""Fan an event out to every subscriber. Never raises, never blocks."""
event = {'type': event_type, 'data': data or {}, 'ts': time.time()}
with cls._lock:
subs = list(cls._subscribers)
for q in subs:
try:
q.put_nowait(event)
except queue.Full:
# Slow consumer — drop its oldest event to make room.
try:
q.get_nowait()
q.put_nowait(event)
except (queue.Empty, queue.Full):
pass
return event


class TaskReconciler:
"""Single-process background poller that reconciles non-terminal task
status on an interval, so completion hooks fire and finished_at /
Expand Down
143 changes: 143 additions & 0 deletions charts/workspace/tests/event_broker_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
"""Unit tests for the SSE EventBroker and its task event emit points (#93).

Covers the in-process pub/sub (subscribe/publish/unsubscribe, fan-out,
no-subscriber safety, slow-consumer drop-oldest) and the integration where
reconciling a finished task publishes a `task.status` event.

Run with: python3 -m unittest tests.event_broker_test
(from charts/workspace/)
"""

import json
import os
import queue
import shutil
import sys
import tempfile
import unittest
from unittest import mock

HERE = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.dirname(HERE))
import server # noqa: E402

EB = server.EventBroker
CTM = server.ClaudeTaskManager


def _tmux_dead(*args, **kwargs):
argv = args[0] if args else kwargs.get('args', [])
if len(argv) >= 2 and argv[0] == 'tmux' and argv[1] == 'has-session':
return mock.Mock(returncode=1, stdout='', stderr='no session')
return mock.Mock(returncode=0, stdout='', stderr='')


class BrokerTestCase(unittest.TestCase):
def setUp(self):
# Isolate the class-level subscriber set per test.
self._orig = set(EB._subscribers)
EB._subscribers = set()
self.addCleanup(lambda: setattr(EB, '_subscribers', self._orig))


class BrokerBasicsTests(BrokerTestCase):
def test_subscriber_receives_published_event(self):
q = EB.subscribe()
EB.publish('task.status', {'task_id': 't1', 'status': 'completed'})
evt = q.get_nowait()
self.assertEqual(evt['type'], 'task.status')
self.assertEqual(evt['data']['task_id'], 't1')
self.assertIn('ts', evt)

def test_fan_out_to_all_subscribers(self):
q1, q2 = EB.subscribe(), EB.subscribe()
self.assertEqual(EB.subscriber_count(), 2)
EB.publish('task.created', {'task_id': 'x'})
self.assertEqual(q1.get_nowait()['data']['task_id'], 'x')
self.assertEqual(q2.get_nowait()['data']['task_id'], 'x')

def test_unsubscribe_stops_delivery(self):
q = EB.subscribe()
EB.unsubscribe(q)
self.assertEqual(EB.subscriber_count(), 0)
EB.publish('task.status', {'task_id': 't'})
with self.assertRaises(queue.Empty):
q.get_nowait()

def test_publish_with_no_subscribers_is_safe(self):
# Must not raise.
evt = EB.publish('task.status', {'task_id': 't'})
self.assertEqual(evt['type'], 'task.status')

def test_publish_defaults_empty_data(self):
q = EB.subscribe()
EB.publish('ready')
self.assertEqual(q.get_nowait()['data'], {})

def test_slow_consumer_drops_oldest(self):
with mock.patch.object(EB, 'QUEUE_MAX', 3):
q = EB.subscribe()
for i in range(5):
EB.publish('task.status', {'n': i})
drained = []
try:
while True:
drained.append(q.get_nowait()['data']['n'])
except queue.Empty:
pass
# Bounded to QUEUE_MAX, oldest dropped, newest retained.
self.assertEqual(len(drained), 3)
self.assertEqual(drained[-1], 4)
self.assertNotIn(0, drained)


class ReconcileEmitTests(BrokerTestCase):
def setUp(self):
super().setUp()
self.tmp = tempfile.mkdtemp(prefix='kcevt-')
self._orig_dir = CTM.TASKS_DIR
CTM.TASKS_DIR = self.tmp
self.addCleanup(self._restore)

def _restore(self):
CTM.TASKS_DIR = self._orig_dir
shutil.rmtree(self.tmp, ignore_errors=True)

def _task(self, tid, **meta):
d = os.path.join(self.tmp, tid)
os.makedirs(d, exist_ok=True)
m = {'task_id': tid}
m.update(meta)
with open(os.path.join(d, 'task.json'), 'w') as f:
json.dump(m, f)

def _drain(self, q):
out = []
try:
while True:
out.append(q.get_nowait())
except queue.Empty:
pass
return out

def test_completed_transition_publishes_task_status(self):
q = EB.subscribe()
self._task('t1', status='running', tmux_session='kube-coder-t1')
with mock.patch.object(server.subprocess, 'run', side_effect=_tmux_dead):
CTM.reconcile_running()
events = self._drain(q)
status_events = [e for e in events if e['type'] == 'task.status']
self.assertEqual(len(status_events), 1)
self.assertEqual(status_events[0]['data']['status'], 'completed')
self.assertEqual(status_events[0]['data']['task_id'], 't1')

def test_no_event_when_nothing_changes(self):
q = EB.subscribe()
self._task('done', status='completed', tmux_session='x')
with mock.patch.object(server.subprocess, 'run', side_effect=_tmux_dead):
CTM.reconcile_running()
self.assertEqual(self._drain(q), []) # terminal task → no transition → no event


if __name__ == '__main__':
unittest.main()