From 1a8e693b3f76b00f7d0e713711b118f6fb451f38 Mon Sep 17 00:00:00 2001 From: Hiro Tamada Date: Mon, 23 Feb 2026 15:27:53 -0500 Subject: [PATCH 1/3] feat: gracefully stop and restart recordings during display resize Instead of refusing display resize requests (409) when recordings are active, PatchDisplay now stops active recordings, performs the resize, then restarts them with the same ID and params. The pre-resize segment is preserved by renaming it (e.g. `-before-resize-.mp4`). Live view sessions still block the resize as before. Co-authored-by: Cursor --- server/cmd/api/api/display.go | 128 ++++++++++++++- server/cmd/api/api/display_test.go | 241 +++++++++++++++++++++++++++++ server/lib/recorder/ffmeg_test.go | 32 ++++ server/lib/recorder/ffmpeg.go | 14 ++ 4 files changed, 407 insertions(+), 8 deletions(-) create mode 100644 server/cmd/api/api/display_test.go diff --git a/server/cmd/api/api/display.go b/server/cmd/api/api/display.go index 534f2a01..63b91573 100644 --- a/server/cmd/api/api/display.go +++ b/server/cmd/api/api/display.go @@ -8,10 +8,12 @@ import ( "os/exec" "strconv" "strings" + "time" nekooapi "github.com/m1k1o/neko/server/lib/oapi" "github.com/onkernel/kernel-images/server/lib/logger" oapi "github.com/onkernel/kernel-images/server/lib/oapi" + "github.com/onkernel/kernel-images/server/lib/recorder" ) // PatchDisplay updates the display configuration. When require_idle @@ -62,18 +64,29 @@ func (s *ApiService) PatchDisplay(ctx context.Context, req oapi.PatchDisplayRequ requireIdle = *req.Body.RequireIdle } - // Check if resize is safe (no active sessions or recordings) + // Check if resize is safe (no active live view sessions) if requireIdle { live := s.getActiveNekoSessions(ctx) - isRecording := s.anyRecordingActive(ctx) - resizableNow := (live == 0) && !isRecording - - log.Info("checking if resize is safe", "live_sessions", live, "is_recording", isRecording, "resizable", resizableNow) - - if !resizableNow { + if live > 0 { + log.Info("resize refused: live view or replay active", "live_sessions", live) return oapi.PatchDisplay409JSONResponse{ ConflictErrorJSONResponse: oapi.ConflictErrorJSONResponse{ - Message: "resize refused: live view or recording/replay active", + Message: "resize refused: live view or replay active", + }, + }, nil + } + + // Gracefully stop active recordings so the resize can proceed. + // They will be restarted (with new segment files) after the resize completes. + stopped, stopErr := s.stopActiveRecordings(ctx) + if len(stopped) > 0 { + defer s.restartRecordings(context.WithoutCancel(ctx), stopped) + } + if stopErr != nil { + log.Error("failed to stop recordings for resize", "error", stopErr) + return oapi.PatchDisplay500JSONResponse{ + InternalErrorJSONResponse: oapi.InternalErrorJSONResponse{ + Message: fmt.Sprintf("failed to stop recordings for resize: %s", stopErr.Error()), }, }, nil } @@ -361,6 +374,105 @@ func (s *ApiService) getCurrentResolution(ctx context.Context) (int, int, int, e return width, height, refreshRate, nil } +// stoppedRecordingInfo holds state captured from a recording that was stopped +// so it can be restarted after a display resize. +type stoppedRecordingInfo struct { + id string + params recorder.FFmpegRecordingParams + outputPath string +} + +// stopActiveRecordings gracefully stops every recording that is currently in +// progress and deregisters them from the manager. It returns info needed to +// restart each recording later. Recordings that were successfully stopped are +// always included in the returned slice, even when a later recording fails to +// stop (so the caller can restart whatever was stopped). +func (s *ApiService) stopActiveRecordings(ctx context.Context) ([]stoppedRecordingInfo, error) { + log := logger.FromContext(ctx) + var stopped []stoppedRecordingInfo + + for _, rec := range s.recordManager.ListActiveRecorders(ctx) { + if !rec.IsRecording(ctx) { + continue + } + + id := rec.ID() + + ffmpegRec, ok := rec.(*recorder.FFmpegRecorder) + if !ok { + log.Warn("cannot capture params from non-FFmpeg recorder, skipping", "id", id) + continue + } + + params := ffmpegRec.Params() + outputPath := ffmpegRec.OutputPath() + + log.Info("stopping recording for resize", "id", id) + if err := rec.Stop(ctx); err != nil { + // Stop() returns finalization errors even when the process was + // successfully terminated. Only treat it as a hard failure if + // the process is still running. + if rec.IsRecording(ctx) { + log.Error("failed to stop recording for resize", "id", id, "error", err) + return stopped, fmt.Errorf("failed to stop recording %s: %w", id, err) + } + log.Warn("recording stopped with finalization warning", "id", id, "error", err) + } + + if err := s.recordManager.DeregisterRecorder(ctx, rec); err != nil { + log.Error("failed to deregister recorder", "id", id, "error", err) + } + + stopped = append(stopped, stoppedRecordingInfo{ + id: id, + params: params, + outputPath: outputPath, + }) + log.Info("recording stopped and deregistered for resize", "id", id) + } + + return stopped, nil +} + +// restartRecordings re-creates and starts recordings that were previously +// stopped for a display resize. The old (finalized) recording file is renamed +// to preserve it before the new recording begins at the same output path. +func (s *ApiService) restartRecordings(ctx context.Context, stopped []stoppedRecordingInfo) { + log := logger.FromContext(ctx) + + for _, info := range stopped { + // Preserve the pre-resize segment by renaming the finalized file. + if _, err := os.Stat(info.outputPath); err == nil { + preservedPath := strings.TrimSuffix(info.outputPath, ".mp4") + + fmt.Sprintf("-before-resize-%d.mp4", time.Now().UnixMilli()) + if err := os.Rename(info.outputPath, preservedPath); err != nil { + log.Error("failed to rename pre-resize recording", "id", info.id, "error", err) + continue + } + log.Info("preserved pre-resize recording segment", "id", info.id, "path", preservedPath) + } + + rec, err := s.factory(info.id, info.params) + if err != nil { + log.Error("failed to create recorder for restart", "id", info.id, "error", err) + continue + } + + if err := s.recordManager.RegisterRecorder(ctx, rec); err != nil { + log.Error("failed to register restarted recorder", "id", info.id, "error", err) + continue + } + + if err := rec.Start(ctx); err != nil { + log.Error("failed to start restarted recording", "id", info.id, "error", err) + _ = s.recordManager.DeregisterRecorder(ctx, rec) + continue + } + + log.Info("recording restarted after resize", "id", info.id) + } +} + // isNekoEnabled checks if Neko service is enabled func (s *ApiService) isNekoEnabled() bool { return os.Getenv("ENABLE_WEBRTC") == "true" diff --git a/server/cmd/api/api/display_test.go b/server/cmd/api/api/display_test.go new file mode 100644 index 00000000..05ce1e2c --- /dev/null +++ b/server/cmd/api/api/display_test.go @@ -0,0 +1,241 @@ +package api + +import ( + "context" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/onkernel/kernel-images/server/lib/recorder" + "github.com/onkernel/kernel-images/server/lib/scaletozero" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var testMockFFmpegBin = filepath.Join("..", "..", "..", "lib", "recorder", "testdata", "mock_ffmpeg.sh") + +func testFFmpegFactory(t *testing.T, tempDir string) recorder.FFmpegRecorderFactory { + t.Helper() + fr := 5 + disp := 0 + size := 1 + config := recorder.FFmpegRecordingParams{ + FrameRate: &fr, + DisplayNum: &disp, + MaxSizeInMB: &size, + OutputDir: &tempDir, + } + return recorder.NewFFmpegRecorderFactory(testMockFFmpegBin, config, scaletozero.NewNoopController()) +} + +func newTestServiceWithFactory(t *testing.T, mgr recorder.RecordManager, factory recorder.FFmpegRecorderFactory) *ApiService { + t.Helper() + svc, err := New(mgr, factory, newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) + require.NoError(t, err) + return svc +} + +func TestStopActiveRecordings(t *testing.T) { + t.Run("stops and deregisters active recording", func(t *testing.T) { + ctx := context.Background() + tempDir := t.TempDir() + factory := testFFmpegFactory(t, tempDir) + mgr := recorder.NewFFmpegManager() + svc := newTestServiceWithFactory(t, mgr, factory) + + rec, err := factory("test-rec", recorder.FFmpegRecordingParams{}) + require.NoError(t, err) + require.NoError(t, mgr.RegisterRecorder(ctx, rec)) + require.NoError(t, rec.Start(ctx)) + time.Sleep(50 * time.Millisecond) + require.True(t, rec.IsRecording(ctx)) + + stopped, err := svc.stopActiveRecordings(ctx) + require.NoError(t, err) + require.Len(t, stopped, 1) + assert.Equal(t, "test-rec", stopped[0].id) + assert.NotNil(t, stopped[0].params.FrameRate) + assert.Equal(t, filepath.Join(tempDir, "test-rec.mp4"), stopped[0].outputPath) + + _, exists := mgr.GetRecorder("test-rec") + assert.False(t, exists, "recorder should be deregistered after stop") + }) + + t.Run("stops multiple active recordings", func(t *testing.T) { + ctx := context.Background() + tempDir := t.TempDir() + factory := testFFmpegFactory(t, tempDir) + mgr := recorder.NewFFmpegManager() + svc := newTestServiceWithFactory(t, mgr, factory) + + ids := []string{"rec-a", "rec-b"} + for _, id := range ids { + rec, err := factory(id, recorder.FFmpegRecordingParams{}) + require.NoError(t, err) + require.NoError(t, mgr.RegisterRecorder(ctx, rec)) + require.NoError(t, rec.Start(ctx)) + } + time.Sleep(50 * time.Millisecond) + + stopped, err := svc.stopActiveRecordings(ctx) + require.NoError(t, err) + assert.Len(t, stopped, 2) + + stoppedIDs := map[string]bool{} + for _, s := range stopped { + stoppedIDs[s.id] = true + } + for _, id := range ids { + assert.True(t, stoppedIDs[id], "recording %s should have been stopped", id) + _, exists := mgr.GetRecorder(id) + assert.False(t, exists, "recorder %s should be deregistered", id) + } + }) + + t.Run("skips non-recording recorders", func(t *testing.T) { + ctx := context.Background() + tempDir := t.TempDir() + factory := testFFmpegFactory(t, tempDir) + mgr := recorder.NewFFmpegManager() + svc := newTestServiceWithFactory(t, mgr, factory) + + mock := &mockRecorder{id: "idle-rec", isRecordingFlag: false} + require.NoError(t, mgr.RegisterRecorder(ctx, mock)) + + stopped, err := svc.stopActiveRecordings(ctx) + require.NoError(t, err) + assert.Empty(t, stopped) + + _, exists := mgr.GetRecorder("idle-rec") + assert.True(t, exists, "non-recording recorder should remain registered") + }) + + t.Run("returns empty when no recorders exist", func(t *testing.T) { + ctx := context.Background() + tempDir := t.TempDir() + factory := testFFmpegFactory(t, tempDir) + mgr := recorder.NewFFmpegManager() + svc := newTestServiceWithFactory(t, mgr, factory) + + stopped, err := svc.stopActiveRecordings(ctx) + require.NoError(t, err) + assert.Empty(t, stopped) + }) +} + +func TestRestartRecordings(t *testing.T) { + t.Run("renames old file and starts new recording", func(t *testing.T) { + ctx := context.Background() + tempDir := t.TempDir() + factory := testFFmpegFactory(t, tempDir) + mgr := recorder.NewFFmpegManager() + svc := newTestServiceWithFactory(t, mgr, factory) + + outputPath := filepath.Join(tempDir, "test-rec.mp4") + require.NoError(t, os.WriteFile(outputPath, []byte("fake video data"), 0644)) + + fr := 5 + disp := 0 + size := 1 + info := stoppedRecordingInfo{ + id: "test-rec", + params: recorder.FFmpegRecordingParams{ + FrameRate: &fr, + DisplayNum: &disp, + MaxSizeInMB: &size, + OutputDir: &tempDir, + }, + outputPath: outputPath, + } + + svc.restartRecordings(ctx, []stoppedRecordingInfo{info}) + + entries, err := os.ReadDir(tempDir) + require.NoError(t, err) + + foundRenamed := false + for _, e := range entries { + if strings.Contains(e.Name(), "before-resize") { + foundRenamed = true + data, readErr := os.ReadFile(filepath.Join(tempDir, e.Name())) + require.NoError(t, readErr) + assert.Equal(t, []byte("fake video data"), data) + } + } + assert.True(t, foundRenamed, "pre-resize recording should be preserved with renamed file") + + rec, exists := mgr.GetRecorder("test-rec") + require.True(t, exists, "restarted recorder should be registered") + assert.True(t, rec.IsRecording(ctx), "restarted recorder should be recording") + + _ = rec.Stop(ctx) + }) + + t.Run("starts recording even when no old file exists", func(t *testing.T) { + ctx := context.Background() + tempDir := t.TempDir() + factory := testFFmpegFactory(t, tempDir) + mgr := recorder.NewFFmpegManager() + svc := newTestServiceWithFactory(t, mgr, factory) + + fr := 5 + disp := 0 + size := 1 + info := stoppedRecordingInfo{ + id: "fresh-rec", + params: recorder.FFmpegRecordingParams{ + FrameRate: &fr, + DisplayNum: &disp, + MaxSizeInMB: &size, + OutputDir: &tempDir, + }, + outputPath: filepath.Join(tempDir, "fresh-rec.mp4"), + } + + svc.restartRecordings(ctx, []stoppedRecordingInfo{info}) + + rec, exists := mgr.GetRecorder("fresh-rec") + require.True(t, exists, "recorder should be registered") + assert.True(t, rec.IsRecording(ctx)) + + _ = rec.Stop(ctx) + }) +} + +func TestStopAndRestartRecordings_RoundTrip(t *testing.T) { + ctx := context.Background() + tempDir := t.TempDir() + factory := testFFmpegFactory(t, tempDir) + mgr := recorder.NewFFmpegManager() + svc := newTestServiceWithFactory(t, mgr, factory) + + // Start a recording + rec, err := factory("round-trip", recorder.FFmpegRecordingParams{}) + require.NoError(t, err) + require.NoError(t, mgr.RegisterRecorder(ctx, rec)) + require.NoError(t, rec.Start(ctx)) + time.Sleep(50 * time.Millisecond) + require.True(t, rec.IsRecording(ctx)) + + // Stop all active recordings + stopped, err := svc.stopActiveRecordings(ctx) + require.NoError(t, err) + require.Len(t, stopped, 1) + assert.Equal(t, "round-trip", stopped[0].id) + + // Verify the recorder was deregistered + _, exists := mgr.GetRecorder("round-trip") + require.False(t, exists) + + // Restart recordings + svc.restartRecordings(ctx, stopped) + + // Verify the recording resumed with the same ID + newRec, exists := mgr.GetRecorder("round-trip") + require.True(t, exists, "recorder should be re-registered after restart") + assert.True(t, newRec.IsRecording(ctx), "recorder should be actively recording") + + _ = newRec.Stop(ctx) +} diff --git a/server/lib/recorder/ffmeg_test.go b/server/lib/recorder/ffmeg_test.go index 22aea379..09c1bbe7 100644 --- a/server/lib/recorder/ffmeg_test.go +++ b/server/lib/recorder/ffmeg_test.go @@ -50,6 +50,38 @@ func TestFFmpegRecorder_StartAndStop(t *testing.T) { require.False(t, rec.IsRecording(t.Context())) } +func TestFFmpegRecorder_Params(t *testing.T) { + tempDir := t.TempDir() + params := defaultParams(tempDir) + rec := &FFmpegRecorder{ + id: "params-test", + binaryPath: mockBin, + params: params, + outputPath: filepath.Join(tempDir, "params-test.mp4"), + stz: scaletozero.NewOncer(scaletozero.NewNoopController()), + } + + got := rec.Params() + assert.Equal(t, *params.FrameRate, *got.FrameRate) + assert.Equal(t, *params.DisplayNum, *got.DisplayNum) + assert.Equal(t, *params.MaxSizeInMB, *got.MaxSizeInMB) + assert.Equal(t, *params.OutputDir, *got.OutputDir) +} + +func TestFFmpegRecorder_OutputPath(t *testing.T) { + tempDir := t.TempDir() + expected := filepath.Join(tempDir, "path-test.mp4") + rec := &FFmpegRecorder{ + id: "path-test", + binaryPath: mockBin, + params: defaultParams(tempDir), + outputPath: expected, + stz: scaletozero.NewOncer(scaletozero.NewNoopController()), + } + + assert.Equal(t, expected, rec.OutputPath()) +} + func TestFFmpegRecorder_ForceStop(t *testing.T) { tempDir := t.TempDir() rec := &FFmpegRecorder{ diff --git a/server/lib/recorder/ffmpeg.go b/server/lib/recorder/ffmpeg.go index 52e6b054..0c6e9953 100644 --- a/server/lib/recorder/ffmpeg.go +++ b/server/lib/recorder/ffmpeg.go @@ -141,6 +141,20 @@ func (fr *FFmpegRecorder) ID() string { return fr.id } +// Params returns a copy of the merged recording parameters. +func (fr *FFmpegRecorder) Params() FFmpegRecordingParams { + fr.mu.Lock() + defer fr.mu.Unlock() + return fr.params +} + +// OutputPath returns the filesystem path where the recording is stored. +func (fr *FFmpegRecorder) OutputPath() string { + fr.mu.Lock() + defer fr.mu.Unlock() + return fr.outputPath +} + // Start begins the recording process by launching ffmpeg with the configured parameters. func (fr *FFmpegRecorder) Start(ctx context.Context) error { log := logger.FromContext(ctx) From 3d62c2abb9ea7589af80914c8c1fe05c49b5c214 Mon Sep 17 00:00:00 2001 From: Hiro Tamada Date: Mon, 23 Feb 2026 15:41:08 -0500 Subject: [PATCH 2/3] fix: address PR review feedback for recording resize - Don't block recording restart when rename of old file fails (rename is best-effort to preserve the pre-resize segment) - Deep copy pointer fields in FFmpegRecordingParams.clone() so Params() callers cannot mutate recorder internals Co-authored-by: Cursor --- server/cmd/api/api/display.go | 9 +++++---- server/lib/recorder/ffmpeg.go | 29 +++++++++++++++++++++++++++-- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/server/cmd/api/api/display.go b/server/cmd/api/api/display.go index 63b91573..a31e6206 100644 --- a/server/cmd/api/api/display.go +++ b/server/cmd/api/api/display.go @@ -441,15 +441,16 @@ func (s *ApiService) restartRecordings(ctx context.Context, stopped []stoppedRec log := logger.FromContext(ctx) for _, info := range stopped { - // Preserve the pre-resize segment by renaming the finalized file. + // Best-effort: preserve the pre-resize segment by renaming the finalized file. + // If this fails the old file may be overwritten, but we still restart recording. if _, err := os.Stat(info.outputPath); err == nil { preservedPath := strings.TrimSuffix(info.outputPath, ".mp4") + fmt.Sprintf("-before-resize-%d.mp4", time.Now().UnixMilli()) if err := os.Rename(info.outputPath, preservedPath); err != nil { - log.Error("failed to rename pre-resize recording", "id", info.id, "error", err) - continue + log.Error("failed to rename pre-resize recording, old file may be overwritten", "id", info.id, "error", err) + } else { + log.Info("preserved pre-resize recording segment", "id", info.id, "path", preservedPath) } - log.Info("preserved pre-resize recording segment", "id", info.id, "path", preservedPath) } rec, err := s.factory(info.id, info.params) diff --git a/server/lib/recorder/ffmpeg.go b/server/lib/recorder/ffmpeg.go index 0c6e9953..20f19a70 100644 --- a/server/lib/recorder/ffmpeg.go +++ b/server/lib/recorder/ffmpeg.go @@ -141,11 +141,36 @@ func (fr *FFmpegRecorder) ID() string { return fr.id } -// Params returns a copy of the merged recording parameters. +// Params returns a deep copy of the merged recording parameters. func (fr *FFmpegRecorder) Params() FFmpegRecordingParams { fr.mu.Lock() defer fr.mu.Unlock() - return fr.params + return fr.params.clone() +} + +func (p FFmpegRecordingParams) clone() FFmpegRecordingParams { + c := p + if p.FrameRate != nil { + v := *p.FrameRate + c.FrameRate = &v + } + if p.DisplayNum != nil { + v := *p.DisplayNum + c.DisplayNum = &v + } + if p.MaxSizeInMB != nil { + v := *p.MaxSizeInMB + c.MaxSizeInMB = &v + } + if p.MaxDurationInSeconds != nil { + v := *p.MaxDurationInSeconds + c.MaxDurationInSeconds = &v + } + if p.OutputDir != nil { + v := *p.OutputDir + c.OutputDir = &v + } + return c } // OutputPath returns the filesystem path where the recording is stored. From 0820c4ae1f20479db53821bcf7860190f95d1347 Mon Sep 17 00:00:00 2001 From: Hiro Tamada Date: Mon, 23 Feb 2026 16:13:00 -0500 Subject: [PATCH 3/3] fix: skip restart when deregister fails to avoid ID conflict If DeregisterRecorder fails, the old recorder remains registered. Appending to the stopped list would cause RegisterRecorder to fail with a duplicate ID during restart. Co-authored-by: Cursor --- server/cmd/api/api/display.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/cmd/api/api/display.go b/server/cmd/api/api/display.go index a31e6206..88943284 100644 --- a/server/cmd/api/api/display.go +++ b/server/cmd/api/api/display.go @@ -420,7 +420,8 @@ func (s *ApiService) stopActiveRecordings(ctx context.Context) ([]stoppedRecordi } if err := s.recordManager.DeregisterRecorder(ctx, rec); err != nil { - log.Error("failed to deregister recorder", "id", id, "error", err) + log.Error("failed to deregister recorder, skipping restart to avoid ID conflict", "id", id, "error", err) + continue } stopped = append(stopped, stoppedRecordingInfo{