diff --git a/server/cmd/api/api/display.go b/server/cmd/api/api/display.go index 534f2a01..88943284 100644 --- a/server/cmd/api/api/display.go +++ b/server/cmd/api/api/display.go @@ -8,10 +8,12 @@ import ( "os/exec" "strconv" "strings" + "time" nekooapi "github.com/m1k1o/neko/server/lib/oapi" "github.com/onkernel/kernel-images/server/lib/logger" oapi "github.com/onkernel/kernel-images/server/lib/oapi" + "github.com/onkernel/kernel-images/server/lib/recorder" ) // PatchDisplay updates the display configuration. When require_idle @@ -62,18 +64,29 @@ func (s *ApiService) PatchDisplay(ctx context.Context, req oapi.PatchDisplayRequ requireIdle = *req.Body.RequireIdle } - // Check if resize is safe (no active sessions or recordings) + // Check if resize is safe (no active live view sessions) if requireIdle { live := s.getActiveNekoSessions(ctx) - isRecording := s.anyRecordingActive(ctx) - resizableNow := (live == 0) && !isRecording - - log.Info("checking if resize is safe", "live_sessions", live, "is_recording", isRecording, "resizable", resizableNow) - - if !resizableNow { + if live > 0 { + log.Info("resize refused: live view or replay active", "live_sessions", live) return oapi.PatchDisplay409JSONResponse{ ConflictErrorJSONResponse: oapi.ConflictErrorJSONResponse{ - Message: "resize refused: live view or recording/replay active", + Message: "resize refused: live view or replay active", + }, + }, nil + } + + // Gracefully stop active recordings so the resize can proceed. + // They will be restarted (with new segment files) after the resize completes. + stopped, stopErr := s.stopActiveRecordings(ctx) + if len(stopped) > 0 { + defer s.restartRecordings(context.WithoutCancel(ctx), stopped) + } + if stopErr != nil { + log.Error("failed to stop recordings for resize", "error", stopErr) + return oapi.PatchDisplay500JSONResponse{ + InternalErrorJSONResponse: oapi.InternalErrorJSONResponse{ + Message: fmt.Sprintf("failed to stop recordings for resize: %s", stopErr.Error()), }, }, nil } @@ -361,6 +374,107 @@ func (s *ApiService) getCurrentResolution(ctx context.Context) (int, int, int, e return width, height, refreshRate, nil } +// stoppedRecordingInfo holds state captured from a recording that was stopped +// so it can be restarted after a display resize. +type stoppedRecordingInfo struct { + id string + params recorder.FFmpegRecordingParams + outputPath string +} + +// stopActiveRecordings gracefully stops every recording that is currently in +// progress and deregisters them from the manager. It returns info needed to +// restart each recording later. Recordings that were successfully stopped are +// always included in the returned slice, even when a later recording fails to +// stop (so the caller can restart whatever was stopped). +func (s *ApiService) stopActiveRecordings(ctx context.Context) ([]stoppedRecordingInfo, error) { + log := logger.FromContext(ctx) + var stopped []stoppedRecordingInfo + + for _, rec := range s.recordManager.ListActiveRecorders(ctx) { + if !rec.IsRecording(ctx) { + continue + } + + id := rec.ID() + + ffmpegRec, ok := rec.(*recorder.FFmpegRecorder) + if !ok { + log.Warn("cannot capture params from non-FFmpeg recorder, skipping", "id", id) + continue + } + + params := ffmpegRec.Params() + outputPath := ffmpegRec.OutputPath() + + log.Info("stopping recording for resize", "id", id) + if err := rec.Stop(ctx); err != nil { + // Stop() returns finalization errors even when the process was + // successfully terminated. Only treat it as a hard failure if + // the process is still running. + if rec.IsRecording(ctx) { + log.Error("failed to stop recording for resize", "id", id, "error", err) + return stopped, fmt.Errorf("failed to stop recording %s: %w", id, err) + } + log.Warn("recording stopped with finalization warning", "id", id, "error", err) + } + + if err := s.recordManager.DeregisterRecorder(ctx, rec); err != nil { + log.Error("failed to deregister recorder, skipping restart to avoid ID conflict", "id", id, "error", err) + continue + } + + stopped = append(stopped, stoppedRecordingInfo{ + id: id, + params: params, + outputPath: outputPath, + }) + log.Info("recording stopped and deregistered for resize", "id", id) + } + + return stopped, nil +} + +// restartRecordings re-creates and starts recordings that were previously +// stopped for a display resize. The old (finalized) recording file is renamed +// to preserve it before the new recording begins at the same output path. +func (s *ApiService) restartRecordings(ctx context.Context, stopped []stoppedRecordingInfo) { + log := logger.FromContext(ctx) + + for _, info := range stopped { + // Best-effort: preserve the pre-resize segment by renaming the finalized file. + // If this fails the old file may be overwritten, but we still restart recording. + if _, err := os.Stat(info.outputPath); err == nil { + preservedPath := strings.TrimSuffix(info.outputPath, ".mp4") + + fmt.Sprintf("-before-resize-%d.mp4", time.Now().UnixMilli()) + if err := os.Rename(info.outputPath, preservedPath); err != nil { + log.Error("failed to rename pre-resize recording, old file may be overwritten", "id", info.id, "error", err) + } else { + log.Info("preserved pre-resize recording segment", "id", info.id, "path", preservedPath) + } + } + + rec, err := s.factory(info.id, info.params) + if err != nil { + log.Error("failed to create recorder for restart", "id", info.id, "error", err) + continue + } + + if err := s.recordManager.RegisterRecorder(ctx, rec); err != nil { + log.Error("failed to register restarted recorder", "id", info.id, "error", err) + continue + } + + if err := rec.Start(ctx); err != nil { + log.Error("failed to start restarted recording", "id", info.id, "error", err) + _ = s.recordManager.DeregisterRecorder(ctx, rec) + continue + } + + log.Info("recording restarted after resize", "id", info.id) + } +} + // isNekoEnabled checks if Neko service is enabled func (s *ApiService) isNekoEnabled() bool { return os.Getenv("ENABLE_WEBRTC") == "true" diff --git a/server/cmd/api/api/display_test.go b/server/cmd/api/api/display_test.go new file mode 100644 index 00000000..05ce1e2c --- /dev/null +++ b/server/cmd/api/api/display_test.go @@ -0,0 +1,241 @@ +package api + +import ( + "context" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/onkernel/kernel-images/server/lib/recorder" + "github.com/onkernel/kernel-images/server/lib/scaletozero" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var testMockFFmpegBin = filepath.Join("..", "..", "..", "lib", "recorder", "testdata", "mock_ffmpeg.sh") + +func testFFmpegFactory(t *testing.T, tempDir string) recorder.FFmpegRecorderFactory { + t.Helper() + fr := 5 + disp := 0 + size := 1 + config := recorder.FFmpegRecordingParams{ + FrameRate: &fr, + DisplayNum: &disp, + MaxSizeInMB: &size, + OutputDir: &tempDir, + } + return recorder.NewFFmpegRecorderFactory(testMockFFmpegBin, config, scaletozero.NewNoopController()) +} + +func newTestServiceWithFactory(t *testing.T, mgr recorder.RecordManager, factory recorder.FFmpegRecorderFactory) *ApiService { + t.Helper() + svc, err := New(mgr, factory, newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) + require.NoError(t, err) + return svc +} + +func TestStopActiveRecordings(t *testing.T) { + t.Run("stops and deregisters active recording", func(t *testing.T) { + ctx := context.Background() + tempDir := t.TempDir() + factory := testFFmpegFactory(t, tempDir) + mgr := recorder.NewFFmpegManager() + svc := newTestServiceWithFactory(t, mgr, factory) + + rec, err := factory("test-rec", recorder.FFmpegRecordingParams{}) + require.NoError(t, err) + require.NoError(t, mgr.RegisterRecorder(ctx, rec)) + require.NoError(t, rec.Start(ctx)) + time.Sleep(50 * time.Millisecond) + require.True(t, rec.IsRecording(ctx)) + + stopped, err := svc.stopActiveRecordings(ctx) + require.NoError(t, err) + require.Len(t, stopped, 1) + assert.Equal(t, "test-rec", stopped[0].id) + assert.NotNil(t, stopped[0].params.FrameRate) + assert.Equal(t, filepath.Join(tempDir, "test-rec.mp4"), stopped[0].outputPath) + + _, exists := mgr.GetRecorder("test-rec") + assert.False(t, exists, "recorder should be deregistered after stop") + }) + + t.Run("stops multiple active recordings", func(t *testing.T) { + ctx := context.Background() + tempDir := t.TempDir() + factory := testFFmpegFactory(t, tempDir) + mgr := recorder.NewFFmpegManager() + svc := newTestServiceWithFactory(t, mgr, factory) + + ids := []string{"rec-a", "rec-b"} + for _, id := range ids { + rec, err := factory(id, recorder.FFmpegRecordingParams{}) + require.NoError(t, err) + require.NoError(t, mgr.RegisterRecorder(ctx, rec)) + require.NoError(t, rec.Start(ctx)) + } + time.Sleep(50 * time.Millisecond) + + stopped, err := svc.stopActiveRecordings(ctx) + require.NoError(t, err) + assert.Len(t, stopped, 2) + + stoppedIDs := map[string]bool{} + for _, s := range stopped { + stoppedIDs[s.id] = true + } + for _, id := range ids { + assert.True(t, stoppedIDs[id], "recording %s should have been stopped", id) + _, exists := mgr.GetRecorder(id) + assert.False(t, exists, "recorder %s should be deregistered", id) + } + }) + + t.Run("skips non-recording recorders", func(t *testing.T) { + ctx := context.Background() + tempDir := t.TempDir() + factory := testFFmpegFactory(t, tempDir) + mgr := recorder.NewFFmpegManager() + svc := newTestServiceWithFactory(t, mgr, factory) + + mock := &mockRecorder{id: "idle-rec", isRecordingFlag: false} + require.NoError(t, mgr.RegisterRecorder(ctx, mock)) + + stopped, err := svc.stopActiveRecordings(ctx) + require.NoError(t, err) + assert.Empty(t, stopped) + + _, exists := mgr.GetRecorder("idle-rec") + assert.True(t, exists, "non-recording recorder should remain registered") + }) + + t.Run("returns empty when no recorders exist", func(t *testing.T) { + ctx := context.Background() + tempDir := t.TempDir() + factory := testFFmpegFactory(t, tempDir) + mgr := recorder.NewFFmpegManager() + svc := newTestServiceWithFactory(t, mgr, factory) + + stopped, err := svc.stopActiveRecordings(ctx) + require.NoError(t, err) + assert.Empty(t, stopped) + }) +} + +func TestRestartRecordings(t *testing.T) { + t.Run("renames old file and starts new recording", func(t *testing.T) { + ctx := context.Background() + tempDir := t.TempDir() + factory := testFFmpegFactory(t, tempDir) + mgr := recorder.NewFFmpegManager() + svc := newTestServiceWithFactory(t, mgr, factory) + + outputPath := filepath.Join(tempDir, "test-rec.mp4") + require.NoError(t, os.WriteFile(outputPath, []byte("fake video data"), 0644)) + + fr := 5 + disp := 0 + size := 1 + info := stoppedRecordingInfo{ + id: "test-rec", + params: recorder.FFmpegRecordingParams{ + FrameRate: &fr, + DisplayNum: &disp, + MaxSizeInMB: &size, + OutputDir: &tempDir, + }, + outputPath: outputPath, + } + + svc.restartRecordings(ctx, []stoppedRecordingInfo{info}) + + entries, err := os.ReadDir(tempDir) + require.NoError(t, err) + + foundRenamed := false + for _, e := range entries { + if strings.Contains(e.Name(), "before-resize") { + foundRenamed = true + data, readErr := os.ReadFile(filepath.Join(tempDir, e.Name())) + require.NoError(t, readErr) + assert.Equal(t, []byte("fake video data"), data) + } + } + assert.True(t, foundRenamed, "pre-resize recording should be preserved with renamed file") + + rec, exists := mgr.GetRecorder("test-rec") + require.True(t, exists, "restarted recorder should be registered") + assert.True(t, rec.IsRecording(ctx), "restarted recorder should be recording") + + _ = rec.Stop(ctx) + }) + + t.Run("starts recording even when no old file exists", func(t *testing.T) { + ctx := context.Background() + tempDir := t.TempDir() + factory := testFFmpegFactory(t, tempDir) + mgr := recorder.NewFFmpegManager() + svc := newTestServiceWithFactory(t, mgr, factory) + + fr := 5 + disp := 0 + size := 1 + info := stoppedRecordingInfo{ + id: "fresh-rec", + params: recorder.FFmpegRecordingParams{ + FrameRate: &fr, + DisplayNum: &disp, + MaxSizeInMB: &size, + OutputDir: &tempDir, + }, + outputPath: filepath.Join(tempDir, "fresh-rec.mp4"), + } + + svc.restartRecordings(ctx, []stoppedRecordingInfo{info}) + + rec, exists := mgr.GetRecorder("fresh-rec") + require.True(t, exists, "recorder should be registered") + assert.True(t, rec.IsRecording(ctx)) + + _ = rec.Stop(ctx) + }) +} + +func TestStopAndRestartRecordings_RoundTrip(t *testing.T) { + ctx := context.Background() + tempDir := t.TempDir() + factory := testFFmpegFactory(t, tempDir) + mgr := recorder.NewFFmpegManager() + svc := newTestServiceWithFactory(t, mgr, factory) + + // Start a recording + rec, err := factory("round-trip", recorder.FFmpegRecordingParams{}) + require.NoError(t, err) + require.NoError(t, mgr.RegisterRecorder(ctx, rec)) + require.NoError(t, rec.Start(ctx)) + time.Sleep(50 * time.Millisecond) + require.True(t, rec.IsRecording(ctx)) + + // Stop all active recordings + stopped, err := svc.stopActiveRecordings(ctx) + require.NoError(t, err) + require.Len(t, stopped, 1) + assert.Equal(t, "round-trip", stopped[0].id) + + // Verify the recorder was deregistered + _, exists := mgr.GetRecorder("round-trip") + require.False(t, exists) + + // Restart recordings + svc.restartRecordings(ctx, stopped) + + // Verify the recording resumed with the same ID + newRec, exists := mgr.GetRecorder("round-trip") + require.True(t, exists, "recorder should be re-registered after restart") + assert.True(t, newRec.IsRecording(ctx), "recorder should be actively recording") + + _ = newRec.Stop(ctx) +} diff --git a/server/lib/recorder/ffmeg_test.go b/server/lib/recorder/ffmeg_test.go index 22aea379..09c1bbe7 100644 --- a/server/lib/recorder/ffmeg_test.go +++ b/server/lib/recorder/ffmeg_test.go @@ -50,6 +50,38 @@ func TestFFmpegRecorder_StartAndStop(t *testing.T) { require.False(t, rec.IsRecording(t.Context())) } +func TestFFmpegRecorder_Params(t *testing.T) { + tempDir := t.TempDir() + params := defaultParams(tempDir) + rec := &FFmpegRecorder{ + id: "params-test", + binaryPath: mockBin, + params: params, + outputPath: filepath.Join(tempDir, "params-test.mp4"), + stz: scaletozero.NewOncer(scaletozero.NewNoopController()), + } + + got := rec.Params() + assert.Equal(t, *params.FrameRate, *got.FrameRate) + assert.Equal(t, *params.DisplayNum, *got.DisplayNum) + assert.Equal(t, *params.MaxSizeInMB, *got.MaxSizeInMB) + assert.Equal(t, *params.OutputDir, *got.OutputDir) +} + +func TestFFmpegRecorder_OutputPath(t *testing.T) { + tempDir := t.TempDir() + expected := filepath.Join(tempDir, "path-test.mp4") + rec := &FFmpegRecorder{ + id: "path-test", + binaryPath: mockBin, + params: defaultParams(tempDir), + outputPath: expected, + stz: scaletozero.NewOncer(scaletozero.NewNoopController()), + } + + assert.Equal(t, expected, rec.OutputPath()) +} + func TestFFmpegRecorder_ForceStop(t *testing.T) { tempDir := t.TempDir() rec := &FFmpegRecorder{ diff --git a/server/lib/recorder/ffmpeg.go b/server/lib/recorder/ffmpeg.go index 52e6b054..20f19a70 100644 --- a/server/lib/recorder/ffmpeg.go +++ b/server/lib/recorder/ffmpeg.go @@ -141,6 +141,45 @@ func (fr *FFmpegRecorder) ID() string { return fr.id } +// Params returns a deep copy of the merged recording parameters. +func (fr *FFmpegRecorder) Params() FFmpegRecordingParams { + fr.mu.Lock() + defer fr.mu.Unlock() + return fr.params.clone() +} + +func (p FFmpegRecordingParams) clone() FFmpegRecordingParams { + c := p + if p.FrameRate != nil { + v := *p.FrameRate + c.FrameRate = &v + } + if p.DisplayNum != nil { + v := *p.DisplayNum + c.DisplayNum = &v + } + if p.MaxSizeInMB != nil { + v := *p.MaxSizeInMB + c.MaxSizeInMB = &v + } + if p.MaxDurationInSeconds != nil { + v := *p.MaxDurationInSeconds + c.MaxDurationInSeconds = &v + } + if p.OutputDir != nil { + v := *p.OutputDir + c.OutputDir = &v + } + return c +} + +// OutputPath returns the filesystem path where the recording is stored. +func (fr *FFmpegRecorder) OutputPath() string { + fr.mu.Lock() + defer fr.mu.Unlock() + return fr.outputPath +} + // Start begins the recording process by launching ffmpeg with the configured parameters. func (fr *FFmpegRecorder) Start(ctx context.Context) error { log := logger.FromContext(ctx)