From 0e0af23d4e928d8cd53ee559a0b099d78bc397fc Mon Sep 17 00:00:00 2001 From: Carl Reid Date: Thu, 10 Apr 2025 17:41:46 +0200 Subject: [PATCH 1/6] chore: Update SixLabors.ImageSharp (patched vuln) --- src/BuildClientAPI/BuildClientAPI.csproj | 2 ++ src/StreamMaster.API/StreamMaster.API.csproj | 1 + src/StreamMaster.Application/StreamMaster.Application.csproj | 1 + .../StreamMaster.Infrastructure.EF.PGSQL.Console.csproj | 3 +++ .../StreamMaster.Infrastructure.EF.PGSQL.csproj | 1 + .../StreamMaster.Infrastructure.EF.csproj | 2 ++ .../StreamMaster.Infrastructure.csproj | 1 + .../StreamMaster.SchedulesDirect.csproj | 2 +- src/StreamMaster.Streams/StreamMaster.Streams.csproj | 2 +- .../StreamMaster.Application.UnitTests.csproj | 1 + .../StreamMaster.Infrastructure.UnitTests.csproj | 1 + .../StreamMaster.Streams.UnitTests.csproj | 1 + 12 files changed, 16 insertions(+), 2 deletions(-) diff --git a/src/BuildClientAPI/BuildClientAPI.csproj b/src/BuildClientAPI/BuildClientAPI.csproj index b7fb0412b..3b5c604c0 100644 --- a/src/BuildClientAPI/BuildClientAPI.csproj +++ b/src/BuildClientAPI/BuildClientAPI.csproj @@ -14,6 +14,8 @@ + + diff --git a/src/StreamMaster.API/StreamMaster.API.csproj b/src/StreamMaster.API/StreamMaster.API.csproj index 209344f5b..f434856a6 100644 --- a/src/StreamMaster.API/StreamMaster.API.csproj +++ b/src/StreamMaster.API/StreamMaster.API.csproj @@ -60,6 +60,7 @@ + diff --git a/src/StreamMaster.Application/StreamMaster.Application.csproj b/src/StreamMaster.Application/StreamMaster.Application.csproj index abc8adbe2..212f5bdfe 100644 --- a/src/StreamMaster.Application/StreamMaster.Application.csproj +++ b/src/StreamMaster.Application/StreamMaster.Application.csproj @@ -75,6 +75,7 @@ + diff --git a/src/StreamMaster.Infrastructure.EF.PGSQL.Console/StreamMaster.Infrastructure.EF.PGSQL.Console.csproj b/src/StreamMaster.Infrastructure.EF.PGSQL.Console/StreamMaster.Infrastructure.EF.PGSQL.Console.csproj index ea2e9d792..68f4d4d13 100644 --- a/src/StreamMaster.Infrastructure.EF.PGSQL.Console/StreamMaster.Infrastructure.EF.PGSQL.Console.csproj +++ b/src/StreamMaster.Infrastructure.EF.PGSQL.Console/StreamMaster.Infrastructure.EF.PGSQL.Console.csproj @@ -6,6 +6,9 @@ Exe true + + + diff --git a/src/StreamMaster.Infrastructure.EF.PGSQL/StreamMaster.Infrastructure.EF.PGSQL.csproj b/src/StreamMaster.Infrastructure.EF.PGSQL/StreamMaster.Infrastructure.EF.PGSQL.csproj index 2327b83bc..c61c30352 100644 --- a/src/StreamMaster.Infrastructure.EF.PGSQL/StreamMaster.Infrastructure.EF.PGSQL.csproj +++ b/src/StreamMaster.Infrastructure.EF.PGSQL/StreamMaster.Infrastructure.EF.PGSQL.csproj @@ -27,6 +27,7 @@ + diff --git a/src/StreamMaster.Infrastructure.EF/StreamMaster.Infrastructure.EF.csproj b/src/StreamMaster.Infrastructure.EF/StreamMaster.Infrastructure.EF.csproj index e7165b095..ded033e73 100644 --- a/src/StreamMaster.Infrastructure.EF/StreamMaster.Infrastructure.EF.csproj +++ b/src/StreamMaster.Infrastructure.EF/StreamMaster.Infrastructure.EF.csproj @@ -13,6 +13,8 @@ + + diff --git a/src/StreamMaster.Infrastructure/StreamMaster.Infrastructure.csproj b/src/StreamMaster.Infrastructure/StreamMaster.Infrastructure.csproj index 495bc751c..86c307e6a 100644 --- a/src/StreamMaster.Infrastructure/StreamMaster.Infrastructure.csproj +++ b/src/StreamMaster.Infrastructure/StreamMaster.Infrastructure.csproj @@ -31,6 +31,7 @@ + diff --git a/src/StreamMaster.SchedulesDirect/StreamMaster.SchedulesDirect.csproj b/src/StreamMaster.SchedulesDirect/StreamMaster.SchedulesDirect.csproj index ad0982df4..4176b6894 100644 --- a/src/StreamMaster.SchedulesDirect/StreamMaster.SchedulesDirect.csproj +++ b/src/StreamMaster.SchedulesDirect/StreamMaster.SchedulesDirect.csproj @@ -12,7 +12,7 @@ - + diff --git a/src/StreamMaster.Streams/StreamMaster.Streams.csproj b/src/StreamMaster.Streams/StreamMaster.Streams.csproj index fad049d7f..c264d46e1 100644 --- a/src/StreamMaster.Streams/StreamMaster.Streams.csproj +++ b/src/StreamMaster.Streams/StreamMaster.Streams.csproj @@ -14,7 +14,7 @@ - + diff --git a/src/tests/StreamMaster.Application.UnitTests/StreamMaster.Application.UnitTests.csproj b/src/tests/StreamMaster.Application.UnitTests/StreamMaster.Application.UnitTests.csproj index abec59742..9399636c4 100644 --- a/src/tests/StreamMaster.Application.UnitTests/StreamMaster.Application.UnitTests.csproj +++ b/src/tests/StreamMaster.Application.UnitTests/StreamMaster.Application.UnitTests.csproj @@ -13,6 +13,7 @@ + diff --git a/src/tests/StreamMaster.Infrastructure.UnitTests/StreamMaster.Infrastructure.UnitTests.csproj b/src/tests/StreamMaster.Infrastructure.UnitTests/StreamMaster.Infrastructure.UnitTests.csproj index 1374b3eb6..e032c6df6 100644 --- a/src/tests/StreamMaster.Infrastructure.UnitTests/StreamMaster.Infrastructure.UnitTests.csproj +++ b/src/tests/StreamMaster.Infrastructure.UnitTests/StreamMaster.Infrastructure.UnitTests.csproj @@ -12,6 +12,7 @@ + diff --git a/src/tests/StreamMaster.Streams.UnitTests/StreamMaster.Streams.UnitTests.csproj b/src/tests/StreamMaster.Streams.UnitTests/StreamMaster.Streams.UnitTests.csproj index f48ba63e0..58c66b924 100644 --- a/src/tests/StreamMaster.Streams.UnitTests/StreamMaster.Streams.UnitTests.csproj +++ b/src/tests/StreamMaster.Streams.UnitTests/StreamMaster.Streams.UnitTests.csproj @@ -12,6 +12,7 @@ + From 6e82c489fff906bd72ae18821f91e110e1ea7dcb Mon Sep 17 00:00:00 2001 From: Carl Reid Date: Thu, 10 Apr 2025 19:16:27 +0200 Subject: [PATCH 2/6] feat: Use `where`/`which` to locate executables --- src/StreamMaster.Domain/Common/FileUtil.cs | 38 ++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/src/StreamMaster.Domain/Common/FileUtil.cs b/src/StreamMaster.Domain/Common/FileUtil.cs index 9a51ce5f9..c654a254a 100644 --- a/src/StreamMaster.Domain/Common/FileUtil.cs +++ b/src/StreamMaster.Domain/Common/FileUtil.cs @@ -79,6 +79,7 @@ public static bool IsFilePathValidAndExists(string filePath) { return IsValidFilePath(filePath) && File.Exists(filePath); } + /// /// Searches for the specified executable name in predefined directories. /// @@ -116,6 +117,43 @@ public static bool IsFilePathValidAndExists(string filePath) } } + // If nothing found, attempt to use .NET Process to locate the executable. + try + { + using Process process = new(); + + // Set up process to just get the path without actually running the program + if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + { + process.StartInfo.FileName = "where"; + process.StartInfo.Arguments = executableName; + } + else + { + process.StartInfo.FileName = "which"; + process.StartInfo.Arguments = executableName; + } + + process.StartInfo.RedirectStandardOutput = true; + process.StartInfo.UseShellExecute = false; + process.StartInfo.CreateNoWindow = true; + + if (process.Start()) + { + string? output = process.StandardOutput.ReadLine(); + process.WaitForExit(); + + if (!string.IsNullOrEmpty(output) && File.Exists(output)) + { + return output; + } + } + } + catch + { + // Silently fail if the process approach doesn't work + } + return null; } From ce2e2868d2a0f13af68d976c9252cd0090cba8ca Mon Sep 17 00:00:00 2001 From: Carl Reid Date: Thu, 10 Apr 2025 19:42:26 +0200 Subject: [PATCH 3/6] fix: Attempt to gracefully stop process --- .../Factories/CommandExecutor.cs | 60 ++++++++++++++++++- 1 file changed, 59 insertions(+), 1 deletion(-) diff --git a/src/StreamMaster.Streams/Factories/CommandExecutor.cs b/src/StreamMaster.Streams/Factories/CommandExecutor.cs index 837b65a5f..476edb43e 100644 --- a/src/StreamMaster.Streams/Factories/CommandExecutor.cs +++ b/src/StreamMaster.Streams/Factories/CommandExecutor.cs @@ -1,4 +1,5 @@ using System.Diagnostics; +using System.Runtime.InteropServices; using System.Text; namespace StreamMaster.Streams.Factories; @@ -31,6 +32,11 @@ public GetStreamResult ExecuteCommand(CommandProfileDto commandProfile, string s _process = new Process(); ConfigureProcess(_process, exec, options); + using var registration = cancellationToken.Register(() => + { + GracefullyTerminateProcess(); + }); + cancellationToken.ThrowIfCancellationRequested(); if (!_process.Start()) @@ -83,6 +89,58 @@ public GetStreamResult ExecuteCommand(CommandProfileDto commandProfile, string s } } + /// + /// Gracefully terminates the process using appropriate signals + /// + private void GracefullyTerminateProcess() + { + if (_process == null || _process.HasExited) + return; + + try + { + logger.LogDebug("Attempting to gracefully terminate process {ProcessId}", _process.Id); + + if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + { + if (!_process.WaitForExit(3000)) + { + logger.LogWarning("Process {ProcessId} did not terminate gracefully, forcing kill", _process.Id); + _process.Kill(true); + } + } + else + { + if (!_process.HasExited) + { + Process.Start("kill", $"-TERM {_process.Id}"); + + if (!_process.WaitForExit(3000)) + { + logger.LogWarning("Process {ProcessId} did not terminate after SIGTERM, sending SIGKILL", _process.Id); + Process.Start("kill", $"-KILL {_process.Id}"); + } + } + } + } + catch (Exception ex) + { + logger.LogError(ex, "Error gracefully terminating process {ProcessId}", _process.Id); + + try + { + if (!_process.HasExited) + { + _process.Kill(true); + } + } + catch (Exception killEx) + { + logger.LogError(killEx, "Failed to force kill process {ProcessId}", _process.Id); + } + } + } + private void CleanupOldLogs(string directoryPath, int maxLogsToKeep) { try @@ -202,7 +260,7 @@ public void Dispose() { if (!_process.HasExited) { - _process.Kill(); + GracefullyTerminateProcess(); } _process.Dispose(); } From 176e6dddc05cd0a14429067d2747010071bc0593 Mon Sep 17 00:00:00 2001 From: Carl Reid Date: Thu, 10 Apr 2025 19:43:24 +0200 Subject: [PATCH 4/6] fix: Use shared token on underlying stream process This way, when the stop command is called, the underlying stream process should be signaled to cancel right away too. --- .../Broadcasters/SourceBroadcaster.cs | 12 ++++++--- .../Factories/CommandExecutor.cs | 27 ++++++++++++++++++- 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/src/StreamMaster.Streams/Broadcasters/SourceBroadcaster.cs b/src/StreamMaster.Streams/Broadcasters/SourceBroadcaster.cs index a5686a61c..8a0f727b8 100644 --- a/src/StreamMaster.Streams/Broadcasters/SourceBroadcaster.cs +++ b/src/StreamMaster.Streams/Broadcasters/SourceBroadcaster.cs @@ -106,11 +106,14 @@ public async Task SetSourceStreamAsync(SMStreamInfo SMStreamInfo, Cancella this.SMStreamInfo = SMStreamInfo; + // Start a new streaming task + _cancellationTokenSource = new CancellationTokenSource(); + Stopwatch stopwatch = Stopwatch.StartNew(); try { (Stream? stream, int processId, ProxyStreamError? error) = - await streamFactory.GetStream(SMStreamInfo, cancellationToken).ConfigureAwait(false); + await streamFactory.GetStream(SMStreamInfo, _cancellationTokenSource.Token).ConfigureAwait(false); stopwatch.Stop(); if (stream == null || error != null) { @@ -118,8 +121,6 @@ public async Task SetSourceStreamAsync(SMStreamInfo SMStreamInfo, Cancella return 0; } - // Start a new streaming task - _cancellationTokenSource = new CancellationTokenSource(); _streamingTask = Task.Run(() => RunPipelineAsync(stream, SMStreamInfo.Name, cancellationToken: _cancellationTokenSource.Token), _cancellationTokenSource.Token); return stopwatch.ElapsedMilliseconds; } @@ -193,6 +194,11 @@ private async Task RunPipelineAsync(Stream sourceStream, string name, int buffer timeoutCts?.Dispose(); } + if (cancellationToken.IsCancellationRequested) + { + break; + } + if (bytesRead == 0) { if (!hasReadData) diff --git a/src/StreamMaster.Streams/Factories/CommandExecutor.cs b/src/StreamMaster.Streams/Factories/CommandExecutor.cs index 476edb43e..14d0fe33c 100644 --- a/src/StreamMaster.Streams/Factories/CommandExecutor.cs +++ b/src/StreamMaster.Streams/Factories/CommandExecutor.cs @@ -34,6 +34,7 @@ public GetStreamResult ExecuteCommand(CommandProfileDto commandProfile, string s using var registration = cancellationToken.Register(() => { + logger.LogDebug("Cancellation requested for Stream process"); GracefullyTerminateProcess(); }); @@ -46,7 +47,20 @@ public GetStreamResult ExecuteCommand(CommandProfileDto commandProfile, string s return new GetStreamResult(null, -1, error); } + if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + { + try + { + Process.Start("setpgrp", $"{_process.Id}"); + } + catch (Exception ex) + { + logger.LogWarning(ex, "Failed to set process group for {ProcessId}", _process.Id); + } + } + string stderrFilePath = Path.Combine(BuildInfo.CommandErrorFolder, $"stderr_{_process.Id}.log"); + Directory.CreateDirectory(Path.GetDirectoryName(stderrFilePath)!); errorWriter = new StreamWriter(stderrFilePath, append: true, Encoding.UTF8); // Clean up older logs to keep only the latest 10 @@ -64,6 +78,7 @@ public GetStreamResult ExecuteCommand(CommandProfileDto commandProfile, string s } }; _process.BeginErrorReadLine(); + _process.EnableRaisingEvents = true; // Ensure Exited event is raised _process.Exited += Process_Exited; stopwatch.Stop(); @@ -242,6 +257,16 @@ private static void ConfigureProcess(Process process, string commandExec, string process.StartInfo.RedirectStandardOutput = true; process.StartInfo.RedirectStandardError = true; process.StartInfo.StandardOutputEncoding = Encoding.UTF8; + process.StartInfo.StandardErrorEncoding = Encoding.UTF8; + process.StartInfo.WindowStyle = ProcessWindowStyle.Hidden; + + if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + { + process.StartInfo.Environment["SM_PROCESS_ID"] = process.Id.ToString(); + process.StartInfo.Environment["SM_PROCESS_TYPE"] = "STREAM"; + } + + process.EnableRaisingEvents = true; } /// @@ -273,4 +298,4 @@ public void Dispose() _disposed = true; GC.SuppressFinalize(this); } -} +} \ No newline at end of file From 25f525663d193fde4c330dc658f2b54a7cc57a34 Mon Sep 17 00:00:00 2001 From: Carl Reid Date: Sat, 12 Apr 2025 09:48:19 +0200 Subject: [PATCH 5/6] feat: Wrap `Stream` for termination handling --- .../Broadcasters/SourceBroadcaster.cs | 21 ++ .../Factories/CommandExecutor.cs | 341 ++++++++++++------ .../Factories/ProcessStreamWrapper.cs | 127 +++++++ 3 files changed, 374 insertions(+), 115 deletions(-) create mode 100644 src/StreamMaster.Streams/Factories/ProcessStreamWrapper.cs diff --git a/src/StreamMaster.Streams/Broadcasters/SourceBroadcaster.cs b/src/StreamMaster.Streams/Broadcasters/SourceBroadcaster.cs index 8a0f727b8..bd5a9dfcf 100644 --- a/src/StreamMaster.Streams/Broadcasters/SourceBroadcaster.cs +++ b/src/StreamMaster.Streams/Broadcasters/SourceBroadcaster.cs @@ -30,8 +30,10 @@ public class SourceBroadcaster(ILogger logger, StreamConnect /// public StreamHandlerMetrics Metrics => StreamMetricsRecorder.Metrics; + public bool IsMultiView { get; set; } public CancellationToken CancellationToken { get; } = cancellationToken; + /// public ConcurrentDictionary ChannelBroadcasters { get; } = new(); @@ -72,6 +74,7 @@ public bool RemoveChannelBroadcaster(string Id) { return ChannelBroadcasters.TryRemove(Id, out _); } + public async Task SetSourceMultiViewStreamAsync(IChannelBroadcaster channelBroadcaster, CancellationToken cancellationToken) { logger.LogInformation("Setting source stream {Name} to {StreamName}", Name, SMStreamInfo.Name); @@ -269,6 +272,7 @@ private async Task RunPipelineAsync(Stream sourceStream, string name, int buffer public async Task StopAsync() { await _stopLock.WaitAsync().ConfigureAwait(false); + Task? taskToAwait = null; try { if (Interlocked.CompareExchange(ref _isStopped, 1, 0) == 0) @@ -277,12 +281,29 @@ public async Task StopAsync() { _cancellationTokenSource?.Cancel(); } + taskToAwait = _streamingTask; } } finally { _stopLock.Release(); } + + if (taskToAwait != null) + { + try + { + await taskToAwait.ConfigureAwait(false); + } + catch (OperationCanceledException) + { + logger.LogDebug("Task was already cancelled"); + } + catch (Exception ex) + { + logger.LogError(ex, "Error during SourceBroadcaster streaming task completion wait."); + } + } } /// diff --git a/src/StreamMaster.Streams/Factories/CommandExecutor.cs b/src/StreamMaster.Streams/Factories/CommandExecutor.cs index 14d0fe33c..2d978005f 100644 --- a/src/StreamMaster.Streams/Factories/CommandExecutor.cs +++ b/src/StreamMaster.Streams/Factories/CommandExecutor.cs @@ -9,8 +9,6 @@ namespace StreamMaster.Streams.Factories; /// public class CommandExecutor(ILogger logger) : ICommandExecutor, IDisposable { - private StreamWriter? errorWriter; - private Process? _process; private bool _disposed; /// @@ -18,6 +16,10 @@ public GetStreamResult ExecuteCommand(CommandProfileDto commandProfile, string s { Stopwatch stopwatch = Stopwatch.StartNew(); + Process? currentProcess = null; + StreamWriter? currentErrorWriter = null; + Stream? wrappedStream = null; + try { string? exec = FileUtil.GetExec(commandProfile.Command); @@ -26,216 +28,302 @@ public GetStreamResult ExecuteCommand(CommandProfileDto commandProfile, string s logger.LogCritical("Command \"{command}\" not found", commandProfile.Command); return new GetStreamResult(null, -1, new ProxyStreamError { ErrorCode = ProxyStreamErrorCode.FileNotFound, Message = $"{commandProfile.Command} not found" }); } - string options = BuildCommand(commandProfile.Parameters, clientUserAgent, streamUrl, secondsIn); - _process = new Process(); - ConfigureProcess(_process, exec, options); + currentProcess = new Process(); + ConfigureProcess(currentProcess, exec, options); using var registration = cancellationToken.Register(() => { - logger.LogDebug("Cancellation requested for Stream process"); - GracefullyTerminateProcess(); + Process? processToCancel = currentProcess; + logger.LogDebug("Cancellation requested for Stream process {ProcessId}", processToCancel?.Id ?? -1); + if (processToCancel != null) + { + GracefullyTerminateProcessInternal(processToCancel, logger); + } }); - cancellationToken.ThrowIfCancellationRequested(); + cancellationToken.ThrowIfCancellationRequested(); // Check cancellation after registration - if (!_process.Start()) + if (!currentProcess.Start()) { + currentProcess.Dispose(); + currentProcess = null; ProxyStreamError error = new() { ErrorCode = ProxyStreamErrorCode.ProcessStartFailed, Message = "Failed to start process" }; logger.LogError("Error: {ErrorMessage}", error.Message); return new GetStreamResult(null, -1, error); } + logger.LogInformation("Process {ProcessId} started successfully.", currentProcess.Id); + if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) { try { - Process.Start("setpgrp", $"{_process.Id}"); + Process.Start("setpgrp", $"{currentProcess.Id}"); } catch (Exception ex) { - logger.LogWarning(ex, "Failed to set process group for {ProcessId}", _process.Id); + logger.LogWarning(ex, "Failed to set process group for {ProcessId}", currentProcess.Id); } } - string stderrFilePath = Path.Combine(BuildInfo.CommandErrorFolder, $"stderr_{_process.Id}.log"); + string stderrFilePath = Path.Combine(BuildInfo.CommandErrorFolder, $"stderr_{currentProcess.Id}.log"); Directory.CreateDirectory(Path.GetDirectoryName(stderrFilePath)!); - errorWriter = new StreamWriter(stderrFilePath, append: true, Encoding.UTF8); - - // Clean up older logs to keep only the latest 10 + currentErrorWriter = new StreamWriter(stderrFilePath, append: true, Encoding.UTF8); CleanupOldLogs(BuildInfo.CommandErrorFolder, 10); - _process.ErrorDataReceived += (_, e) => + currentProcess.ErrorDataReceived += (sender, e) => { - if (!string.IsNullOrWhiteSpace(e.Data)) + StreamWriter? writer = currentErrorWriter; + if (writer != null && !string.IsNullOrWhiteSpace(e.Data)) { - lock (errorWriter) // Ensure thread-safe writes + lock (writer) { - errorWriter.WriteLine(e.Data); - errorWriter.Flush(); + try + { + writer.WriteLine(e.Data); + writer.Flush(); + } + catch (ObjectDisposedException) { /* Ignore if writer was disposed concurrently */ } + catch (Exception ex) { logger.LogError(ex, "Error writing stderr for Process {ProcessId}", (sender as Process)?.Id ?? -1); } } } }; - _process.BeginErrorReadLine(); - _process.EnableRaisingEvents = true; // Ensure Exited event is raised - _process.Exited += Process_Exited; + currentProcess.BeginErrorReadLine(); + currentProcess.EnableRaisingEvents = true; + + currentProcess.Exited += (sender, e) => Process_Exited(sender as Process, currentErrorWriter); stopwatch.Stop(); - logger.LogInformation("Opened command with args \"{options}\" in {ElapsedMilliseconds} ms", commandProfile.Command + ' ' + commandProfile.Parameters, stopwatch.ElapsedMilliseconds); + logger.LogInformation("Opened command with args \"{options}\" for ProcessId {ProcessId} in {ElapsedMilliseconds} ms", commandProfile.Command + ' ' + commandProfile.Parameters, currentProcess.Id, stopwatch.ElapsedMilliseconds); + + Action terminateDelegate = (processToTerminate) => + { + GracefullyTerminateProcessInternal(processToTerminate, logger); + }; + + wrappedStream = new ProcessStreamWrapper(currentProcess.StandardOutput.BaseStream, currentProcess, terminateDelegate, logger); + + var processId = currentProcess.Id; + var streamToReturn = wrappedStream; + currentProcess = null; // Ownership transferred to wrapper + currentErrorWriter = null; // Ownership (disposal) transferred to Exited event/wrapper - return new GetStreamResult(_process.StandardOutput.BaseStream, _process.Id, null); + return new GetStreamResult(streamToReturn, processId, null); } - catch (OperationCanceledException ex) + catch (OperationCanceledException) { - ProxyStreamError error = new() { ErrorCode = ProxyStreamErrorCode.OperationCancelled, Message = "Operation was cancelled" }; - logger.LogError(ex, "Error: {ErrorMessage}", error.Message); - return new GetStreamResult(null, -1, error); + logger.LogInformation("ExecuteCommand cancelled for streamUrl: {StreamUrl}", streamUrl); + CleanupFailedExecution(currentProcess, currentErrorWriter); + return new GetStreamResult(null, -1, new ProxyStreamError { ErrorCode = ProxyStreamErrorCode.OperationCancelled, Message = "Operation was cancelled" }); } catch (Exception ex) { - ProxyStreamError error = new() { ErrorCode = ProxyStreamErrorCode.UnknownError, Message = ex.Message }; - logger.LogError(ex, "Error: {ErrorMessage}", error.Message); - return new GetStreamResult(null, -1, error); + logger.LogError(ex, "Error executing command for streamUrl {StreamUrl}: {ErrorMessage}", streamUrl, ex.Message); + CleanupFailedExecution(currentProcess, currentErrorWriter); + return new GetStreamResult(null, -1, new ProxyStreamError { ErrorCode = ProxyStreamErrorCode.UnknownError, Message = ex.Message }); } - finally + } + + private void CleanupFailedExecution(Process? process, StreamWriter? writer) + { + if (process != null) { - stopwatch.Stop(); + logger.LogWarning("Cleaning up process {ProcessId} due to execution failure.", process.Id); + // Attempt termination before disposing + GracefullyTerminateProcessInternal(process, logger); + try { process.Dispose(); } + catch (Exception ex) { logger.LogError(ex, "Error disposing failed process {ProcessId}.", process.Id); } + } + if (writer != null) + { + try { writer.Dispose(); } + catch (Exception ex) { logger.LogError(ex, "Error disposing failed error writer."); } } } - /// - /// Gracefully terminates the process using appropriate signals - /// - private void GracefullyTerminateProcess() + private static void GracefullyTerminateProcessInternal(Process? processToTerminate, ILogger log) { - if (_process == null || _process.HasExited) + if (processToTerminate == null) + { + log.LogDebug("GracefullyTerminateProcessInternal called with null process."); return; + } try { - logger.LogDebug("Attempting to gracefully terminate process {ProcessId}", _process.Id); + bool alreadyExited = false; + try + { + alreadyExited = processToTerminate.HasExited; + } + catch (InvalidOperationException) + { + log.LogWarning("Error checking HasExited for process {ProcessId} (may already be disposed or inaccessible). Assuming exited.", processToTerminate.Id); + alreadyExited = true; + } + catch (System.ComponentModel.Win32Exception ex) + { + log.LogWarning(ex, "Error checking HasExited for process {ProcessId}. Assuming exited.", processToTerminate.Id); + alreadyExited = true; + } + + if (alreadyExited) + { + log.LogDebug("GracefullyTerminateProcessInternal: Process {ProcessId} already exited.", processToTerminate.Id); + return; + } + + log.LogDebug("Attempting to gracefully terminate process {ProcessId}", processToTerminate.Id); if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) { - if (!_process.WaitForExit(3000)) + log.LogDebug("Waiting for process {ProcessId} to exit (Windows)...", processToTerminate.Id); + if (!processToTerminate.WaitForExit(3000)) { - logger.LogWarning("Process {ProcessId} did not terminate gracefully, forcing kill", _process.Id); - _process.Kill(true); + log.LogWarning("Process {ProcessId} did not terminate gracefully after wait, forcing kill", processToTerminate.Id); + bool exitedBeforeKill = false; + try { exitedBeforeKill = processToTerminate.HasExited; } catch { } + if (!exitedBeforeKill) + { + processToTerminate.Kill(true); + } + } + else + { + log.LogDebug("Process {ProcessId} exited gracefully after wait (Windows).", processToTerminate.Id); } } else { - if (!_process.HasExited) + log.LogDebug("Sending SIGTERM to process {ProcessId}...", processToTerminate.Id); + Process.Start("kill", $"-TERM {processToTerminate.Id}"); + if (!processToTerminate.WaitForExit(3000)) { - Process.Start("kill", $"-TERM {_process.Id}"); - - if (!_process.WaitForExit(3000)) + log.LogWarning("Process {ProcessId} did not terminate after SIGTERM, sending SIGKILL", processToTerminate.Id); + bool exitedBeforeKill = false; + try { exitedBeforeKill = processToTerminate.HasExited; } catch { } + if (!exitedBeforeKill) { - logger.LogWarning("Process {ProcessId} did not terminate after SIGTERM, sending SIGKILL", _process.Id); - Process.Start("kill", $"-KILL {_process.Id}"); + log.LogDebug("Sending SIGKILL to process {ProcessId}...", processToTerminate.Id); + Process.Start("kill", $"-KILL {processToTerminate.Id}"); + processToTerminate.WaitForExit(500); } } + else + { + log.LogDebug("Process {ProcessId} terminated after SIGTERM.", processToTerminate.Id); + } } } + catch (InvalidOperationException ex) + { + log.LogWarning(ex, "Error terminating process {ProcessId}. It might have already exited.", processToTerminate.Id); + } + catch (System.ComponentModel.Win32Exception ex) + { + log.LogError(ex, "Win32Error during termination of process {ProcessId}", processToTerminate.Id); + } catch (Exception ex) { - logger.LogError(ex, "Error gracefully terminating process {ProcessId}", _process.Id); - + log.LogError(ex, "Error gracefully terminating process {ProcessId}", processToTerminate.Id); try { - if (!_process.HasExited) + bool exitedBeforeKill = false; + try { exitedBeforeKill = processToTerminate.HasExited; } catch { } + if (!exitedBeforeKill) { - _process.Kill(true); + log.LogWarning("Forcing kill on process {ProcessId} due to prior termination errors.", processToTerminate.Id); + processToTerminate.Kill(true); } } catch (Exception killEx) { - logger.LogError(killEx, "Failed to force kill process {ProcessId}", _process.Id); + log.LogError(killEx, "Failed to force kill process {ProcessId}", processToTerminate.Id); } } - } - - private void CleanupOldLogs(string directoryPath, int maxLogsToKeep) - { - try + finally { - if (!Directory.Exists(directoryPath)) - { - return; - } - - List logFiles = [.. new DirectoryInfo(directoryPath) - .GetFiles("stderr_*.log") - .OrderByDescending(f => f.CreationTime)]; - - if (logFiles.Count <= maxLogsToKeep) - { - return; // Nothing to clean up - } - - foreach (FileInfo? file in logFiles.Skip(maxLogsToKeep)) + try { - try + bool finalExitCheck = false; + try { finalExitCheck = processToTerminate.HasExited; } catch { } + if (!finalExitCheck) { - file.Delete(); + log.LogWarning("Process {ProcessId} termination logic completed, but HasExited is still false.", processToTerminate.Id); } - catch (Exception ex) + else { - logger.LogWarning(ex, "Failed to delete old log file: {FileName}", file.FullName); + log.LogDebug("Process {ProcessId} confirmed exited after termination logic.", processToTerminate.Id); } } - } - catch (Exception ex) - { - logger.LogError(ex, "Error cleaning up old logs in directory: {Directory}", directoryPath); + catch { /* Ignore final state check errors */ } } } - private void Process_Exited(object? sender, EventArgs e) + private void Process_Exited(Process? process, StreamWriter? writer) { - if (_process != null) + if (process == null) { - try + logger.LogWarning("Process_Exited called with null process."); + return; + } + + logger.LogDebug("Process {ProcessId} Exited event received.", process.Id); + try + { + if (!process.WaitForExit(1000)) { - _process.WaitForExit(); // Ensure process completes before disposing resources - _process.CancelErrorRead(); + logger.LogWarning("Process {ProcessId} Exited event received, but WaitForExit(1000) timed out.", process.Id); } - catch (Exception ex) + + try { - logger.LogWarning(ex, "Error waiting for process to exit."); + if (process.StartInfo.RedirectStandardError) + { + process.CancelErrorRead(); + } } + catch (InvalidOperationException) { /* Ignore if already detached */ } + } + catch (Exception ex) + { + logger.LogWarning(ex, "Error waiting for process {ProcessId} to fully exit in Exited event.", process.Id); } - if (errorWriter != null) + // Dispose the specific writer associated with this process exit + if (writer != null) { try { - errorWriter.Dispose(); + logger.LogDebug("Disposing error writer for Process {ProcessId}.", process.Id); + writer.Dispose(); } catch (Exception ex) { - logger.LogWarning(ex, "Error disposing error writer."); + logger.LogWarning(ex, "Error disposing error writer for Process {ProcessId}.", process.Id); } } + // Dispose the specific process handle that exited try { - _process?.Dispose(); + logger.LogDebug("Disposing process object {ProcessId}.", process.Id); + process.Dispose(); } catch (Exception ex) { - logger.LogWarning(ex, "Error disposing process."); + logger.LogWarning(ex, "Error disposing process object {ProcessId} in Exited event.", process.Id); } } private static string BuildCommand(string command, string clientUserAgent, string streamUrl, int? secondsIn) { + // (Implementation as before) string s = secondsIn.HasValue ? $"-ss {secondsIn} " : ""; - command = command.Replace("{clientUserAgent}", '"' + clientUserAgent + '"') .Replace("{streamUrl}", '"' + streamUrl + '"'); - if (secondsIn.HasValue) { int index = command.IndexOf("-i "); @@ -244,7 +332,6 @@ private static string BuildCommand(string command, string clientUserAgent, strin command = command.Insert(index, s); } } - return command; } @@ -259,43 +346,67 @@ private static void ConfigureProcess(Process process, string commandExec, string process.StartInfo.StandardOutputEncoding = Encoding.UTF8; process.StartInfo.StandardErrorEncoding = Encoding.UTF8; process.StartInfo.WindowStyle = ProcessWindowStyle.Hidden; - if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) { - process.StartInfo.Environment["SM_PROCESS_ID"] = process.Id.ToString(); process.StartInfo.Environment["SM_PROCESS_TYPE"] = "STREAM"; } - process.EnableRaisingEvents = true; } + private void CleanupOldLogs(string directoryPath, int maxLogsToKeep) + { + try + { + if (!Directory.Exists(directoryPath)) + { + return; + } + List logFiles = [.. new DirectoryInfo(directoryPath) + .GetFiles("stderr_*.log") + .OrderByDescending(f => f.CreationTime)]; + if (logFiles.Count <= maxLogsToKeep) + { + return; + } + foreach (FileInfo? file in logFiles.Skip(maxLogsToKeep)) + { + try + { + file.Delete(); + } + catch (Exception ex) + { + logger.LogWarning(ex, "Failed to delete old log file: {FileName}", file.FullName); + } + } + } + catch (Exception ex) + { + logger.LogError(ex, "Error cleaning up old logs in directory: {Directory}", directoryPath); + } + } + /// - /// Disposes the process and cleans up resources. + /// Disposes managed resources. Primarily for fallback; lifetime should be tied to the wrapped stream. /// public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + protected virtual void Dispose(bool disposing) { if (_disposed) { return; } - if (_process != null) + if (disposing) { - try - { - if (!_process.HasExited) - { - GracefullyTerminateProcess(); - } - _process.Dispose(); - } - catch (Exception ex) - { - logger.LogError(ex, "Error disposing process."); - } + logger.LogDebug("CommandExecutor Dispose({Disposing}) called. This is a fallback.", disposing); } _disposed = true; - GC.SuppressFinalize(this); } } \ No newline at end of file diff --git a/src/StreamMaster.Streams/Factories/ProcessStreamWrapper.cs b/src/StreamMaster.Streams/Factories/ProcessStreamWrapper.cs new file mode 100644 index 000000000..aa54f38d8 --- /dev/null +++ b/src/StreamMaster.Streams/Factories/ProcessStreamWrapper.cs @@ -0,0 +1,127 @@ +using System.Diagnostics; + +namespace StreamMaster.Streams.Factories; + +public class ProcessStreamWrapper : Stream +{ + private readonly Stream _baseStream; + private readonly Process _process; + private readonly Action _terminateAction; + private readonly ILogger _logger; + private bool _disposed = false; + + public ProcessStreamWrapper(Stream baseStream, Process process, Action terminateAction, ILogger logger) + { + _baseStream = baseStream ?? throw new ArgumentNullException(nameof(baseStream)); + _process = process ?? throw new ArgumentNullException(nameof(process)); + _terminateAction = terminateAction ?? throw new ArgumentNullException(nameof(terminateAction)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public override bool CanRead => _baseStream.CanRead; + public override bool CanSeek => _baseStream.CanSeek; + public override bool CanWrite => _baseStream.CanWrite; + public override long Length => _baseStream.Length; + + public override long Position + { + get => _baseStream.Position; + set => _baseStream.Position = value; + } + + public override void Flush() => _baseStream.Flush(); + + public override int Read(byte[] buffer, int offset, int count) => _baseStream.Read(buffer, offset, count); + + public override long Seek(long offset, SeekOrigin origin) => _baseStream.Seek(offset, origin); + + public override void SetLength(long value) => _baseStream.SetLength(value); + + public override void Write(byte[] buffer, int offset, int count) => _baseStream.Write(buffer, offset, count); + + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => + _baseStream.ReadAsync(buffer, offset, count, cancellationToken); + + public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) => + await _baseStream.ReadAsync(buffer, cancellationToken); + + public override async ValueTask DisposeAsync() + { + if (_disposed) return; + + _logger.LogDebug("DisposeAsync called for ProcessStreamWrapper for process {ProcessId}", _process.Id); + + try + { + if (!_process.HasExited) + { + _logger.LogDebug("Process {ProcessId} has not exited, initiating termination.", _process.Id); + _terminateAction(_process); + await _process.WaitForExitAsync(); + } + else + { + _logger.LogDebug("Process {ProcessId} already exited.", _process.Id); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Error during process termination in ProcessStreamWrapper for process {ProcessId}", _process.Id); + } + + await _baseStream.DisposeAsync(); + + try + { + _process.Dispose(); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Error disposing process object {ProcessId} in wrapper.", _process.Id); + } + + _disposed = true; + GC.SuppressFinalize(this); + + await base.DisposeAsync(); + } + + protected override void Dispose(bool disposing) + { + if (_disposed) return; + + if (disposing) + { + _logger.LogDebug("Dispose(true) called for ProcessStreamWrapper for process {ProcessId}", _process?.Id ?? -1); + try + { + if (_process != null && !_process.HasExited) + { + _logger.LogDebug("Process {ProcessId} has not exited, initiating termination (sync).", _process.Id); + _terminateAction(_process); + } + else if (_process != null) + { + _logger.LogDebug("Process {ProcessId} already exited (sync).", _process.Id); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Error during process termination in ProcessStreamWrapper.Dispose for process {ProcessId}", _process?.Id ?? -1); + } + + _baseStream?.Dispose(); + try + { + _process?.Dispose(); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Error disposing process object {ProcessId} in wrapper (sync).", _process?.Id ?? -1); + } + } + + _disposed = true; + base.Dispose(disposing); + } +} \ No newline at end of file From d07edd8c767eaa5a33f2e0a2efc1c04625fddef1 Mon Sep 17 00:00:00 2001 From: Carl Reid Date: Mon, 5 May 2025 19:28:07 +0200 Subject: [PATCH 6/6] fix: Attempt to use `CliWrap` and `Pipelines` Hope this handled terminating underlying process better --- .../Factories/CommandExecutor.cs | 523 +++++++----------- .../StreamMaster.Streams.csproj | 2 + .../lib/smAPI/smapiTypes.ts | 98 ++-- 3 files changed, 251 insertions(+), 372 deletions(-) diff --git a/src/StreamMaster.Streams/Factories/CommandExecutor.cs b/src/StreamMaster.Streams/Factories/CommandExecutor.cs index 2d978005f..04af1430e 100644 --- a/src/StreamMaster.Streams/Factories/CommandExecutor.cs +++ b/src/StreamMaster.Streams/Factories/CommandExecutor.cs @@ -1,412 +1,289 @@ using System.Diagnostics; -using System.Runtime.InteropServices; +using System.IO.Pipelines; using System.Text; +using CliWrap; +using CliWrap.Exceptions; -namespace StreamMaster.Streams.Factories; - -/// -/// Executes commands based on the provided profiles and manages process lifecycles. -/// -public class CommandExecutor(ILogger logger) : ICommandExecutor, IDisposable +namespace StreamMaster.Streams.Factories { - private bool _disposed; - - /// - public GetStreamResult ExecuteCommand(CommandProfileDto commandProfile, string streamUrl, string clientUserAgent, int? secondsIn, CancellationToken cancellationToken = default) + /// + /// Executes commands based on the provided profiles and manages process lifecycles using CliWrap. + /// Streams stdout and logs stderr. + /// + public class CommandExecutor(ILogger logger) : ICommandExecutor { - Stopwatch stopwatch = Stopwatch.StartNew(); - - Process? currentProcess = null; - StreamWriter? currentErrorWriter = null; - Stream? wrappedStream = null; - - try + private readonly ILogger _logger = logger; + private CancellationTokenSource? _processExitCts; + private CommandTask? _commandTask; + private int _processId = -1; + private string? _stderrFilePath; + private bool _disposed; + + /// + public GetStreamResult ExecuteCommand(CommandProfileDto commandProfile, string streamUrl, string clientUserAgent, int? secondsIn, CancellationToken cancellationToken = default) { - string? exec = FileUtil.GetExec(commandProfile.Command); - if (exec == null) - { - logger.LogCritical("Command \"{command}\" not found", commandProfile.Command); - return new GetStreamResult(null, -1, new ProxyStreamError { ErrorCode = ProxyStreamErrorCode.FileNotFound, Message = $"{commandProfile.Command} not found" }); - } - string options = BuildCommand(commandProfile.Parameters, clientUserAgent, streamUrl, secondsIn); + Directory.CreateDirectory(BuildInfo.CommandErrorFolder); - currentProcess = new Process(); - ConfigureProcess(currentProcess, exec, options); + Stopwatch stopwatch = Stopwatch.StartNew(); + Pipe pipe = new(); - using var registration = cancellationToken.Register(() => - { - Process? processToCancel = currentProcess; - logger.LogDebug("Cancellation requested for Stream process {ProcessId}", processToCancel?.Id ?? -1); - if (processToCancel != null) - { - GracefullyTerminateProcessInternal(processToCancel, logger); - } - }); - - cancellationToken.ThrowIfCancellationRequested(); // Check cancellation after registration - - if (!currentProcess.Start()) - { - currentProcess.Dispose(); - currentProcess = null; - ProxyStreamError error = new() { ErrorCode = ProxyStreamErrorCode.ProcessStartFailed, Message = "Failed to start process" }; - logger.LogError("Error: {ErrorMessage}", error.Message); - return new GetStreamResult(null, -1, error); - } + _processExitCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + var internalToken = _processExitCts.Token; - logger.LogInformation("Process {ProcessId} started successfully.", currentProcess.Id); - - if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + try { - try - { - Process.Start("setpgrp", $"{currentProcess.Id}"); - } - catch (Exception ex) + string? execPath = FileUtil.GetExec(commandProfile.Command); + if (string.IsNullOrEmpty(execPath)) { - logger.LogWarning(ex, "Failed to set process group for {ProcessId}", currentProcess.Id); + _logger.LogCritical("Command executable \"{Command}\" not found in PATH or specified location.", commandProfile.Command); + return new GetStreamResult(null, -1, new ProxyStreamError { ErrorCode = ProxyStreamErrorCode.FileNotFound, Message = $"{commandProfile.Command} not found" }); } - } - string stderrFilePath = Path.Combine(BuildInfo.CommandErrorFolder, $"stderr_{currentProcess.Id}.log"); - Directory.CreateDirectory(Path.GetDirectoryName(stderrFilePath)!); - currentErrorWriter = new StreamWriter(stderrFilePath, append: true, Encoding.UTF8); - CleanupOldLogs(BuildInfo.CommandErrorFolder, 10); + string arguments = BuildCommand(commandProfile.Parameters, clientUserAgent, streamUrl, secondsIn); - currentProcess.ErrorDataReceived += (sender, e) => - { - StreamWriter? writer = currentErrorWriter; - if (writer != null && !string.IsNullOrWhiteSpace(e.Data)) - { - lock (writer) - { - try - { - writer.WriteLine(e.Data); - writer.Flush(); - } - catch (ObjectDisposedException) { /* Ignore if writer was disposed concurrently */ } - catch (Exception ex) { logger.LogError(ex, "Error writing stderr for Process {ProcessId}", (sender as Process)?.Id ?? -1); } - } - } - }; - currentProcess.BeginErrorReadLine(); - currentProcess.EnableRaisingEvents = true; + _stderrFilePath = Path.Combine(BuildInfo.CommandErrorFolder, $"stderr_{DateTime.Now:yyyyMMddHHmmss}_{Guid.NewGuid().ToString("N")[..8]}.log"); - currentProcess.Exited += (sender, e) => Process_Exited(sender as Process, currentErrorWriter); + _logger.LogInformation("Starting command: \"{Executable}\" with arguments: \"{Arguments}\". Logging stderr to: \"{StderrLogPath}\"", + execPath, arguments, _stderrFilePath); - stopwatch.Stop(); - logger.LogInformation("Opened command with args \"{options}\" for ProcessId {ProcessId} in {ElapsedMilliseconds} ms", commandProfile.Command + ' ' + commandProfile.Parameters, currentProcess.Id, stopwatch.ElapsedMilliseconds); + CleanupOldLogs(BuildInfo.CommandErrorFolder, 10); - Action terminateDelegate = (processToTerminate) => - { - GracefullyTerminateProcessInternal(processToTerminate, logger); - }; + Command command = Cli.Wrap(execPath) + .WithArguments(arguments) + .WithStandardOutputPipe(PipeTarget.ToStream(pipe.Writer.AsStream(), autoFlush: true)) + .WithStandardErrorPipe(PipeTarget.ToFile(_stderrFilePath)) + .WithValidation(CommandResultValidation.None); - wrappedStream = new ProcessStreamWrapper(currentProcess.StandardOutput.BaseStream, currentProcess, terminateDelegate, logger); + _commandTask = command.ExecuteAsync(internalToken); - var processId = currentProcess.Id; - var streamToReturn = wrappedStream; - currentProcess = null; // Ownership transferred to wrapper - currentErrorWriter = null; // Ownership (disposal) transferred to Exited event/wrapper + _processId = _commandTask.ProcessId; + _logger.LogInformation("Command (PID: {ProcessId}) started successfully in {ElapsedMilliseconds} ms. Arguments: {Arguments}", + _processId, stopwatch.ElapsedMilliseconds, arguments); - return new GetStreamResult(streamToReturn, processId, null); - } - catch (OperationCanceledException) - { - logger.LogInformation("ExecuteCommand cancelled for streamUrl: {StreamUrl}", streamUrl); - CleanupFailedExecution(currentProcess, currentErrorWriter); - return new GetStreamResult(null, -1, new ProxyStreamError { ErrorCode = ProxyStreamErrorCode.OperationCancelled, Message = "Operation was cancelled" }); - } - catch (Exception ex) - { - logger.LogError(ex, "Error executing command for streamUrl {StreamUrl}: {ErrorMessage}", streamUrl, ex.Message); - CleanupFailedExecution(currentProcess, currentErrorWriter); - return new GetStreamResult(null, -1, new ProxyStreamError { ErrorCode = ProxyStreamErrorCode.UnknownError, Message = ex.Message }); - } - } + _ = HandleCommandCompletionAsync(_commandTask, pipe.Writer, stopwatch, commandProfile, _processId, _stderrFilePath, internalToken); - private void CleanupFailedExecution(Process? process, StreamWriter? writer) - { - if (process != null) - { - logger.LogWarning("Cleaning up process {ProcessId} due to execution failure.", process.Id); - // Attempt termination before disposing - GracefullyTerminateProcessInternal(process, logger); - try { process.Dispose(); } - catch (Exception ex) { logger.LogError(ex, "Error disposing failed process {ProcessId}.", process.Id); } - } - if (writer != null) - { - try { writer.Dispose(); } - catch (Exception ex) { logger.LogError(ex, "Error disposing failed error writer."); } - } - } - - private static void GracefullyTerminateProcessInternal(Process? processToTerminate, ILogger log) - { - if (processToTerminate == null) - { - log.LogDebug("GracefullyTerminateProcessInternal called with null process."); - return; - } - - try - { - bool alreadyExited = false; - try - { - alreadyExited = processToTerminate.HasExited; + return new GetStreamResult(pipe.Reader.AsStream(), _processId, null); } - catch (InvalidOperationException) + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { - log.LogWarning("Error checking HasExited for process {ProcessId} (may already be disposed or inaccessible). Assuming exited.", processToTerminate.Id); - alreadyExited = true; + stopwatch.Stop(); + _logger.LogWarning("Command start explicitly cancelled before process execution."); + pipe.Writer.Complete(new OperationCanceledException("Command start cancelled.")); + return new GetStreamResult(null, -1, new ProxyStreamError { ErrorCode = ProxyStreamErrorCode.OperationCancelled, Message = "Operation was cancelled before start" }); } - catch (System.ComponentModel.Win32Exception ex) + catch (Exception ex) { - log.LogWarning(ex, "Error checking HasExited for process {ProcessId}. Assuming exited.", processToTerminate.Id); - alreadyExited = true; + stopwatch.Stop(); + _logger.LogError(ex, "Error starting command \"{Command}\": {ErrorMessage}", commandProfile.Command, ex.Message); + pipe.Writer.Complete(ex); + TryDeleteFile(_stderrFilePath); + return new GetStreamResult(null, -1, new ProxyStreamError { ErrorCode = ProxyStreamErrorCode.UnknownError, Message = $"Failed to start command: {ex.Message}" }); } + } - if (alreadyExited) + private async Task HandleCommandCompletionAsync( + CommandTask commandTask, + PipeWriter pipeWriter, + Stopwatch stopwatch, + CommandProfileDto commandProfile, + int processId, + string? stderrFilePath, + CancellationToken cancellationToken) + { + ProxyStreamError? error = null; + Exception? completionException = null; + try { - log.LogDebug("GracefullyTerminateProcessInternal: Process {ProcessId} already exited.", processToTerminate.Id); - return; - } - - log.LogDebug("Attempting to gracefully terminate process {ProcessId}", processToTerminate.Id); + CommandResult result = await commandTask; + stopwatch.Stop(); - if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) - { - log.LogDebug("Waiting for process {ProcessId} to exit (Windows)...", processToTerminate.Id); - if (!processToTerminate.WaitForExit(3000)) + if (result.ExitCode == 0) { - log.LogWarning("Process {ProcessId} did not terminate gracefully after wait, forcing kill", processToTerminate.Id); - bool exitedBeforeKill = false; - try { exitedBeforeKill = processToTerminate.HasExited; } catch { } - if (!exitedBeforeKill) - { - processToTerminate.Kill(true); - } + _logger.LogInformation( + "Command {Command} (PID: {ProcessId}) completed successfully after {Duration}. Exit code: {ExitCode}", + commandProfile.Command, processId, stopwatch.Elapsed, result.ExitCode); } else { - log.LogDebug("Process {ProcessId} exited gracefully after wait (Windows).", processToTerminate.Id); + _logger.LogWarning( + "Command {Command} (PID: {ProcessId}) exited after {Duration} with non-zero exit code: {ExitCode}. Check stderr log: {StderrLogPath}", + commandProfile.Command, processId, stopwatch.Elapsed, result.ExitCode, stderrFilePath); + error = new ProxyStreamError { ErrorCode = ProxyStreamErrorCode.IoError, Message = $"Process exited with code {result.ExitCode}" }; + completionException = new InvalidOperationException($"Process exited with code {result.ExitCode}. See log '{stderrFilePath}' for details."); } } - else + catch (OperationCanceledException ex) when (cancellationToken.IsCancellationRequested) { - log.LogDebug("Sending SIGTERM to process {ProcessId}...", processToTerminate.Id); - Process.Start("kill", $"-TERM {processToTerminate.Id}"); - if (!processToTerminate.WaitForExit(3000)) - { - log.LogWarning("Process {ProcessId} did not terminate after SIGTERM, sending SIGKILL", processToTerminate.Id); - bool exitedBeforeKill = false; - try { exitedBeforeKill = processToTerminate.HasExited; } catch { } - if (!exitedBeforeKill) - { - log.LogDebug("Sending SIGKILL to process {ProcessId}...", processToTerminate.Id); - Process.Start("kill", $"-KILL {processToTerminate.Id}"); - processToTerminate.WaitForExit(500); - } - } - else - { - log.LogDebug("Process {ProcessId} terminated after SIGTERM.", processToTerminate.Id); - } + stopwatch.Stop(); + _logger.LogInformation( + "Command {Command} (PID: {ProcessId}) was cancelled after {Duration}.", + commandProfile.Command, processId, stopwatch.Elapsed); + error = new ProxyStreamError { ErrorCode = ProxyStreamErrorCode.OperationCancelled, Message = "Command execution was cancelled." }; + completionException = ex; } - } - catch (InvalidOperationException ex) - { - log.LogWarning(ex, "Error terminating process {ProcessId}. It might have already exited.", processToTerminate.Id); - } - catch (System.ComponentModel.Win32Exception ex) - { - log.LogError(ex, "Win32Error during termination of process {ProcessId}", processToTerminate.Id); - } - catch (Exception ex) - { - log.LogError(ex, "Error gracefully terminating process {ProcessId}", processToTerminate.Id); - try + catch (CommandExecutionException ex) { - bool exitedBeforeKill = false; - try { exitedBeforeKill = processToTerminate.HasExited; } catch { } - if (!exitedBeforeKill) - { - log.LogWarning("Forcing kill on process {ProcessId} due to prior termination errors.", processToTerminate.Id); - processToTerminate.Kill(true); - } + stopwatch.Stop(); + _logger.LogError(ex, + "Command {Command} (PID: {ProcessId}) failed execution after {Duration}. Exit code: {ExitCode}. Check stderr log: {StderrLogPath}. Error: {ErrorMessage}", + commandProfile.Command, processId, stopwatch.Elapsed, ex.ExitCode, stderrFilePath, ex.Message); + error = new ProxyStreamError { ErrorCode = ProxyStreamErrorCode.IoError, Message = $"Command execution failed: {ex.Message}" }; + completionException = ex; + } + catch (Exception ex) + { + stopwatch.Stop(); + _logger.LogError(ex, + "Command {Command} (PID: {ProcessId}) encountered an unexpected error after {Duration}. Check stderr log: {StderrLogPath}", + commandProfile.Command, processId, stopwatch.Elapsed, stderrFilePath); + error = new ProxyStreamError { ErrorCode = ProxyStreamErrorCode.UnknownError, Message = $"Unexpected error: {ex.Message}" }; + completionException = ex; } - catch (Exception killEx) + finally { - log.LogError(killEx, "Failed to force kill process {ProcessId}", processToTerminate.Id); + await pipeWriter.CompleteAsync(completionException); + _logger.LogDebug("PipeWriter completed for PID {ProcessId}.", processId); } } - finally + + private void CleanupOldLogs(string directoryPath, int maxLogsToKeep) { try { - bool finalExitCheck = false; - try { finalExitCheck = processToTerminate.HasExited; } catch { } - if (!finalExitCheck) - { - log.LogWarning("Process {ProcessId} termination logic completed, but HasExited is still false.", processToTerminate.Id); - } - else + if (!Directory.Exists(directoryPath) || maxLogsToKeep <= 0) { - log.LogDebug("Process {ProcessId} confirmed exited after termination logic.", processToTerminate.Id); + return; } - } - catch { /* Ignore final state check errors */ } - } - } - private void Process_Exited(Process? process, StreamWriter? writer) - { - if (process == null) - { - logger.LogWarning("Process_Exited called with null process."); - return; - } + var logFiles = new DirectoryInfo(directoryPath) + .GetFiles("stderr_*.log") + .OrderByDescending(f => f.LastWriteTime) + .Skip(maxLogsToKeep) + .ToList(); - logger.LogDebug("Process {ProcessId} Exited event received.", process.Id); - try - { - if (!process.WaitForExit(1000)) - { - logger.LogWarning("Process {ProcessId} Exited event received, but WaitForExit(1000) timed out.", process.Id); - } + if (!logFiles.Any()) + { + return; + } - try - { - if (process.StartInfo.RedirectStandardError) + _logger.LogDebug("Cleaning up {Count} old log files from {Directory}...", logFiles.Count, directoryPath); + foreach (FileInfo file in logFiles) { - process.CancelErrorRead(); + TryDeleteFile(file.FullName); } } - catch (InvalidOperationException) { /* Ignore if already detached */ } - } - catch (Exception ex) - { - logger.LogWarning(ex, "Error waiting for process {ProcessId} to fully exit in Exited event.", process.Id); + catch (Exception ex) + { + _logger.LogWarning(ex, "Error during old log cleanup in directory: {Directory}", directoryPath); + } } - // Dispose the specific writer associated with this process exit - if (writer != null) + private void TryDeleteFile(string? filePath) { + if (string.IsNullOrEmpty(filePath)) return; try { - logger.LogDebug("Disposing error writer for Process {ProcessId}.", process.Id); - writer.Dispose(); + if (File.Exists(filePath)) + { + File.Delete(filePath); + _logger.LogTrace("Deleted file: {FilePath}", filePath); + } } catch (Exception ex) { - logger.LogWarning(ex, "Error disposing error writer for Process {ProcessId}.", process.Id); + _logger.LogWarning(ex, "Failed to delete file: {FilePath}", filePath); } } - // Dispose the specific process handle that exited - try - { - logger.LogDebug("Disposing process object {ProcessId}.", process.Id); - process.Dispose(); - } - catch (Exception ex) + private static string BuildCommand(string command, string clientUserAgent, string streamUrl, int? secondsIn) { - logger.LogWarning(ex, "Error disposing process object {ProcessId} in Exited event.", process.Id); - } - } - - private static string BuildCommand(string command, string clientUserAgent, string streamUrl, int? secondsIn) - { - // (Implementation as before) - string s = secondsIn.HasValue ? $"-ss {secondsIn} " : ""; - command = command.Replace("{clientUserAgent}", '"' + clientUserAgent + '"') - .Replace("{streamUrl}", '"' + streamUrl + '"'); - if (secondsIn.HasValue) - { - int index = command.IndexOf("-i "); - if (index >= 0) + string s = secondsIn.HasValue ? $"-ss {secondsIn} " : ""; + command = command.Replace("{clientUserAgent}", '"' + clientUserAgent + '"') + .Replace("{streamUrl}", '"' + streamUrl + '"'); + if (secondsIn.HasValue) { - command = command.Insert(index, s); + int index = command.IndexOf("-i "); + if (index >= 0) + { + command = command.Insert(index, s); + } } + return command; } - return command; - } - private static void ConfigureProcess(Process process, string commandExec, string formattedArgs) - { - process.StartInfo.FileName = commandExec; - process.StartInfo.Arguments = formattedArgs; - process.StartInfo.CreateNoWindow = true; - process.StartInfo.UseShellExecute = false; - process.StartInfo.RedirectStandardOutput = true; - process.StartInfo.RedirectStandardError = true; - process.StartInfo.StandardOutputEncoding = Encoding.UTF8; - process.StartInfo.StandardErrorEncoding = Encoding.UTF8; - process.StartInfo.WindowStyle = ProcessWindowStyle.Hidden; - if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + /// + /// Disposes the command executor, attempts to cancel the running process, and cleans up resources. + /// + public void Dispose() { - process.StartInfo.Environment["SM_PROCESS_TYPE"] = "STREAM"; + Dispose(true); + GC.SuppressFinalize(this); } - process.EnableRaisingEvents = true; - } - private void CleanupOldLogs(string directoryPath, int maxLogsToKeep) - { - try + protected virtual void Dispose(bool disposing) { - if (!Directory.Exists(directoryPath)) - { - return; - } - List logFiles = [.. new DirectoryInfo(directoryPath) - .GetFiles("stderr_*.log") - .OrderByDescending(f => f.CreationTime)]; - if (logFiles.Count <= maxLogsToKeep) + if (_disposed) { return; } - foreach (FileInfo? file in logFiles.Skip(maxLogsToKeep)) + + if (disposing) { - try + _logger.LogDebug("Disposing CommandExecutor for PID {ProcessId}.", _processId); + + if (_processExitCts != null && !_processExitCts.IsCancellationRequested) { - file.Delete(); + _logger.LogInformation("Requesting cancellation for command (PID: {ProcessId}) via Dispose.", _processId); + try + { + _processExitCts.Cancel(); + } + catch (ObjectDisposedException) { } + catch (Exception ex) + { + _logger.LogWarning(ex, "Error cancelling CancellationTokenSource during Dispose for PID {ProcessId}.", _processId); + } } - catch (Exception ex) + + _processExitCts?.Dispose(); + _processExitCts = null; + + if (_processId > 0) { - logger.LogWarning(ex, "Failed to delete old log file: {FileName}", file.FullName); + try + { + Process? process = Process.GetProcessById(_processId); + if (!process.HasExited) + { + _logger.LogWarning("Process (PID: {ProcessId}) still running after cancellation signal. Forcing kill.", _processId); + process.Kill(entireProcessTree: true); + _logger.LogInformation("Force killed process (PID: {ProcessId}).", _processId); + } + process.Dispose(); + } + catch (ArgumentException) + { + _logger.LogDebug("Process (PID: {ProcessId}) not found or already exited during Dispose.", _processId); + } + catch (InvalidOperationException) + { + _logger.LogDebug("Process (PID: {ProcessId}) exited before force kill attempt.", _processId); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Error during final process cleanup for PID {ProcessId} in Dispose.", _processId); + } } - } - } - catch (Exception ex) - { - logger.LogError(ex, "Error cleaning up old logs in directory: {Directory}", directoryPath); - } - } - /// - /// Disposes managed resources. Primarily for fallback; lifetime should be tied to the wrapped stream. - /// - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } + _logger.LogDebug("Finished disposing CommandExecutor for PID {ProcessId}.", _processId); + } - protected virtual void Dispose(bool disposing) - { - if (_disposed) - { - return; + _disposed = true; } - if (disposing) + ~CommandExecutor() { - logger.LogDebug("CommandExecutor Dispose({Disposing}) called. This is a fallback.", disposing); + Dispose(false); } - - _disposed = true; } } \ No newline at end of file diff --git a/src/StreamMaster.Streams/StreamMaster.Streams.csproj b/src/StreamMaster.Streams/StreamMaster.Streams.csproj index c264d46e1..057d7ea57 100644 --- a/src/StreamMaster.Streams/StreamMaster.Streams.csproj +++ b/src/StreamMaster.Streams/StreamMaster.Streams.csproj @@ -6,6 +6,8 @@ + + diff --git a/src/StreamMaster.WebUI/lib/smAPI/smapiTypes.ts b/src/StreamMaster.WebUI/lib/smAPI/smapiTypes.ts index 0753467bc..4b58f7af7 100644 --- a/src/StreamMaster.WebUI/lib/smAPI/smapiTypes.ts +++ b/src/StreamMaster.WebUI/lib/smAPI/smapiTypes.ts @@ -756,30 +756,6 @@ export interface MoveToNextStreamRequest { SMChannelId: number; } -export interface GetStreamGroupSMChannelsRequest -{ - StreamGroupId: number; -} -export interface AddSMChannelsToStreamGroupByParametersRequest -{ - Parameters: QueryStringParameters; - StreamGroupId: number; -} -export interface AddSMChannelsToStreamGroupRequest -{ - SMChannelIds: number[]; - StreamGroupId: number; -} -export interface AddSMChannelToStreamGroupRequest -{ - SMChannelId: number; - StreamGroupId: number; -} -export interface RemoveSMChannelFromStreamGroupRequest -{ - SMChannelId: number; - StreamGroupId: number; -} export interface SGFS { Name: string; @@ -845,6 +821,30 @@ export interface UpdateStreamGroupRequest NewName?: string; StreamGroupId: number; } +export interface GetStreamGroupSMChannelsRequest +{ + StreamGroupId: number; +} +export interface AddSMChannelsToStreamGroupByParametersRequest +{ + Parameters: QueryStringParameters; + StreamGroupId: number; +} +export interface AddSMChannelsToStreamGroupRequest +{ + SMChannelIds: number[]; + StreamGroupId: number; +} +export interface AddSMChannelToStreamGroupRequest +{ + SMChannelId: number; + StreamGroupId: number; +} +export interface RemoveSMChannelFromStreamGroupRequest +{ + SMChannelId: number; + StreamGroupId: number; +} export interface GetChannelMetricsRequest { } @@ -956,25 +956,6 @@ export interface SendSuccessRequest Detail: string; Summary: string; } -export interface GetSMChannelStreamsRequest -{ - SMChannelId: number; -} -export interface AddSMStreamToSMChannelRequest -{ - Rank?: number; - SMChannelId: number; - SMStreamId: string; -} -export interface RemoveSMStreamFromSMChannelRequest -{ - SMChannelId: number; - SMStreamId: string; -} -export interface SetSMStreamRanksRequest -{ - Requests: SMChannelStreamRankRequest[]; -} export interface GetPagedSMChannelsRequest { Parameters: QueryStringParameters; @@ -1158,6 +1139,25 @@ export interface UpdateSMChannelRequest TimeShift?: number; VideoStreamHandler?: VideoStreamHandlers; } +export interface GetSMChannelStreamsRequest +{ + SMChannelId: number; +} +export interface AddSMStreamToSMChannelRequest +{ + Rank?: number; + SMChannelId: number; + SMStreamId: string; +} +export interface RemoveSMStreamFromSMChannelRequest +{ + SMChannelId: number; + SMStreamId: string; +} +export interface SetSMStreamRanksRequest +{ + Requests: SMChannelStreamRankRequest[]; +} export interface GetSMChannelChannelsRequest { SMChannelId: number; @@ -1455,6 +1455,12 @@ export interface SetTestTaskRequest { DelayInSeconds: number; } +export interface GetEPGColorsRequest +{ +} +export interface EPGSyncRequest +{ +} export interface GetEPGFileNamesRequest { } @@ -1515,12 +1521,6 @@ export interface UpdateEPGFileRequest TimeShift?: number; Url?: string; } -export interface GetEPGColorsRequest -{ -} -export interface EPGSyncRequest -{ -} export interface GetCustomPlayListRequest { SMStreamId?: string;