diff --git a/src/BuildClientAPI/BuildClientAPI.csproj b/src/BuildClientAPI/BuildClientAPI.csproj index b7fb0412b..3b5c604c0 100644 --- a/src/BuildClientAPI/BuildClientAPI.csproj +++ b/src/BuildClientAPI/BuildClientAPI.csproj @@ -14,6 +14,8 @@ + + diff --git a/src/StreamMaster.API/StreamMaster.API.csproj b/src/StreamMaster.API/StreamMaster.API.csproj index 209344f5b..f434856a6 100644 --- a/src/StreamMaster.API/StreamMaster.API.csproj +++ b/src/StreamMaster.API/StreamMaster.API.csproj @@ -60,6 +60,7 @@ + diff --git a/src/StreamMaster.Application/StreamMaster.Application.csproj b/src/StreamMaster.Application/StreamMaster.Application.csproj index abc8adbe2..212f5bdfe 100644 --- a/src/StreamMaster.Application/StreamMaster.Application.csproj +++ b/src/StreamMaster.Application/StreamMaster.Application.csproj @@ -75,6 +75,7 @@ + diff --git a/src/StreamMaster.Domain/Common/FileUtil.cs b/src/StreamMaster.Domain/Common/FileUtil.cs index 9a51ce5f9..c654a254a 100644 --- a/src/StreamMaster.Domain/Common/FileUtil.cs +++ b/src/StreamMaster.Domain/Common/FileUtil.cs @@ -79,6 +79,7 @@ public static bool IsFilePathValidAndExists(string filePath) { return IsValidFilePath(filePath) && File.Exists(filePath); } + /// /// Searches for the specified executable name in predefined directories. /// @@ -116,6 +117,43 @@ public static bool IsFilePathValidAndExists(string filePath) } } + // If nothing found, attempt to use .NET Process to locate the executable. + try + { + using Process process = new(); + + // Set up process to just get the path without actually running the program + if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + { + process.StartInfo.FileName = "where"; + process.StartInfo.Arguments = executableName; + } + else + { + process.StartInfo.FileName = "which"; + process.StartInfo.Arguments = executableName; + } + + process.StartInfo.RedirectStandardOutput = true; + process.StartInfo.UseShellExecute = false; + process.StartInfo.CreateNoWindow = true; + + if (process.Start()) + { + string? output = process.StandardOutput.ReadLine(); + process.WaitForExit(); + + if (!string.IsNullOrEmpty(output) && File.Exists(output)) + { + return output; + } + } + } + catch + { + // Silently fail if the process approach doesn't work + } + return null; } diff --git a/src/StreamMaster.Infrastructure.EF.PGSQL.Console/StreamMaster.Infrastructure.EF.PGSQL.Console.csproj b/src/StreamMaster.Infrastructure.EF.PGSQL.Console/StreamMaster.Infrastructure.EF.PGSQL.Console.csproj index ea2e9d792..68f4d4d13 100644 --- a/src/StreamMaster.Infrastructure.EF.PGSQL.Console/StreamMaster.Infrastructure.EF.PGSQL.Console.csproj +++ b/src/StreamMaster.Infrastructure.EF.PGSQL.Console/StreamMaster.Infrastructure.EF.PGSQL.Console.csproj @@ -6,6 +6,9 @@ Exe true + + + diff --git a/src/StreamMaster.Infrastructure.EF.PGSQL/StreamMaster.Infrastructure.EF.PGSQL.csproj b/src/StreamMaster.Infrastructure.EF.PGSQL/StreamMaster.Infrastructure.EF.PGSQL.csproj index 2327b83bc..c61c30352 100644 --- a/src/StreamMaster.Infrastructure.EF.PGSQL/StreamMaster.Infrastructure.EF.PGSQL.csproj +++ b/src/StreamMaster.Infrastructure.EF.PGSQL/StreamMaster.Infrastructure.EF.PGSQL.csproj @@ -27,6 +27,7 @@ + diff --git a/src/StreamMaster.Infrastructure.EF/StreamMaster.Infrastructure.EF.csproj b/src/StreamMaster.Infrastructure.EF/StreamMaster.Infrastructure.EF.csproj index e7165b095..ded033e73 100644 --- a/src/StreamMaster.Infrastructure.EF/StreamMaster.Infrastructure.EF.csproj +++ b/src/StreamMaster.Infrastructure.EF/StreamMaster.Infrastructure.EF.csproj @@ -13,6 +13,8 @@ + + diff --git a/src/StreamMaster.Infrastructure/StreamMaster.Infrastructure.csproj b/src/StreamMaster.Infrastructure/StreamMaster.Infrastructure.csproj index 495bc751c..86c307e6a 100644 --- a/src/StreamMaster.Infrastructure/StreamMaster.Infrastructure.csproj +++ b/src/StreamMaster.Infrastructure/StreamMaster.Infrastructure.csproj @@ -31,6 +31,7 @@ + diff --git a/src/StreamMaster.SchedulesDirect/StreamMaster.SchedulesDirect.csproj b/src/StreamMaster.SchedulesDirect/StreamMaster.SchedulesDirect.csproj index ad0982df4..4176b6894 100644 --- a/src/StreamMaster.SchedulesDirect/StreamMaster.SchedulesDirect.csproj +++ b/src/StreamMaster.SchedulesDirect/StreamMaster.SchedulesDirect.csproj @@ -12,7 +12,7 @@ - + diff --git a/src/StreamMaster.Streams/Broadcasters/SourceBroadcaster.cs b/src/StreamMaster.Streams/Broadcasters/SourceBroadcaster.cs index a5686a61c..bd5a9dfcf 100644 --- a/src/StreamMaster.Streams/Broadcasters/SourceBroadcaster.cs +++ b/src/StreamMaster.Streams/Broadcasters/SourceBroadcaster.cs @@ -30,8 +30,10 @@ public class SourceBroadcaster(ILogger logger, StreamConnect /// public StreamHandlerMetrics Metrics => StreamMetricsRecorder.Metrics; + public bool IsMultiView { get; set; } public CancellationToken CancellationToken { get; } = cancellationToken; + /// public ConcurrentDictionary ChannelBroadcasters { get; } = new(); @@ -72,6 +74,7 @@ public bool RemoveChannelBroadcaster(string Id) { return ChannelBroadcasters.TryRemove(Id, out _); } + public async Task SetSourceMultiViewStreamAsync(IChannelBroadcaster channelBroadcaster, CancellationToken cancellationToken) { logger.LogInformation("Setting source stream {Name} to {StreamName}", Name, SMStreamInfo.Name); @@ -106,11 +109,14 @@ public async Task SetSourceStreamAsync(SMStreamInfo SMStreamInfo, Cancella this.SMStreamInfo = SMStreamInfo; + // Start a new streaming task + _cancellationTokenSource = new CancellationTokenSource(); + Stopwatch stopwatch = Stopwatch.StartNew(); try { (Stream? stream, int processId, ProxyStreamError? error) = - await streamFactory.GetStream(SMStreamInfo, cancellationToken).ConfigureAwait(false); + await streamFactory.GetStream(SMStreamInfo, _cancellationTokenSource.Token).ConfigureAwait(false); stopwatch.Stop(); if (stream == null || error != null) { @@ -118,8 +124,6 @@ public async Task SetSourceStreamAsync(SMStreamInfo SMStreamInfo, Cancella return 0; } - // Start a new streaming task - _cancellationTokenSource = new CancellationTokenSource(); _streamingTask = Task.Run(() => RunPipelineAsync(stream, SMStreamInfo.Name, cancellationToken: _cancellationTokenSource.Token), _cancellationTokenSource.Token); return stopwatch.ElapsedMilliseconds; } @@ -193,6 +197,11 @@ private async Task RunPipelineAsync(Stream sourceStream, string name, int buffer timeoutCts?.Dispose(); } + if (cancellationToken.IsCancellationRequested) + { + break; + } + if (bytesRead == 0) { if (!hasReadData) @@ -263,6 +272,7 @@ private async Task RunPipelineAsync(Stream sourceStream, string name, int buffer public async Task StopAsync() { await _stopLock.WaitAsync().ConfigureAwait(false); + Task? taskToAwait = null; try { if (Interlocked.CompareExchange(ref _isStopped, 1, 0) == 0) @@ -271,12 +281,29 @@ public async Task StopAsync() { _cancellationTokenSource?.Cancel(); } + taskToAwait = _streamingTask; } } finally { _stopLock.Release(); } + + if (taskToAwait != null) + { + try + { + await taskToAwait.ConfigureAwait(false); + } + catch (OperationCanceledException) + { + logger.LogDebug("Task was already cancelled"); + } + catch (Exception ex) + { + logger.LogError(ex, "Error during SourceBroadcaster streaming task completion wait."); + } + } } /// diff --git a/src/StreamMaster.Streams/Factories/CommandExecutor.cs b/src/StreamMaster.Streams/Factories/CommandExecutor.cs index 837b65a5f..04af1430e 100644 --- a/src/StreamMaster.Streams/Factories/CommandExecutor.cs +++ b/src/StreamMaster.Streams/Factories/CommandExecutor.cs @@ -1,218 +1,289 @@ using System.Diagnostics; +using System.IO.Pipelines; using System.Text; +using CliWrap; +using CliWrap.Exceptions; -namespace StreamMaster.Streams.Factories; - -/// -/// Executes commands based on the provided profiles and manages process lifecycles. -/// -public class CommandExecutor(ILogger logger) : ICommandExecutor, IDisposable +namespace StreamMaster.Streams.Factories { - private StreamWriter? errorWriter; - private Process? _process; - private bool _disposed; - - /// - public GetStreamResult ExecuteCommand(CommandProfileDto commandProfile, string streamUrl, string clientUserAgent, int? secondsIn, CancellationToken cancellationToken = default) + /// + /// Executes commands based on the provided profiles and manages process lifecycles using CliWrap. + /// Streams stdout and logs stderr. + /// + public class CommandExecutor(ILogger logger) : ICommandExecutor { - Stopwatch stopwatch = Stopwatch.StartNew(); + private readonly ILogger _logger = logger; + private CancellationTokenSource? _processExitCts; + private CommandTask? _commandTask; + private int _processId = -1; + private string? _stderrFilePath; + private bool _disposed; - try + /// + public GetStreamResult ExecuteCommand(CommandProfileDto commandProfile, string streamUrl, string clientUserAgent, int? secondsIn, CancellationToken cancellationToken = default) { - string? exec = FileUtil.GetExec(commandProfile.Command); - if (exec == null) - { - logger.LogCritical("Command \"{command}\" not found", commandProfile.Command); - return new GetStreamResult(null, -1, new ProxyStreamError { ErrorCode = ProxyStreamErrorCode.FileNotFound, Message = $"{commandProfile.Command} not found" }); - } - - string options = BuildCommand(commandProfile.Parameters, clientUserAgent, streamUrl, secondsIn); + Directory.CreateDirectory(BuildInfo.CommandErrorFolder); - _process = new Process(); - ConfigureProcess(_process, exec, options); + Stopwatch stopwatch = Stopwatch.StartNew(); + Pipe pipe = new(); - cancellationToken.ThrowIfCancellationRequested(); + _processExitCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + var internalToken = _processExitCts.Token; - if (!_process.Start()) + try { - ProxyStreamError error = new() { ErrorCode = ProxyStreamErrorCode.ProcessStartFailed, Message = "Failed to start process" }; - logger.LogError("Error: {ErrorMessage}", error.Message); - return new GetStreamResult(null, -1, error); - } + string? execPath = FileUtil.GetExec(commandProfile.Command); + if (string.IsNullOrEmpty(execPath)) + { + _logger.LogCritical("Command executable \"{Command}\" not found in PATH or specified location.", commandProfile.Command); + return new GetStreamResult(null, -1, new ProxyStreamError { ErrorCode = ProxyStreamErrorCode.FileNotFound, Message = $"{commandProfile.Command} not found" }); + } - string stderrFilePath = Path.Combine(BuildInfo.CommandErrorFolder, $"stderr_{_process.Id}.log"); - errorWriter = new StreamWriter(stderrFilePath, append: true, Encoding.UTF8); + string arguments = BuildCommand(commandProfile.Parameters, clientUserAgent, streamUrl, secondsIn); - // Clean up older logs to keep only the latest 10 - CleanupOldLogs(BuildInfo.CommandErrorFolder, 10); + _stderrFilePath = Path.Combine(BuildInfo.CommandErrorFolder, $"stderr_{DateTime.Now:yyyyMMddHHmmss}_{Guid.NewGuid().ToString("N")[..8]}.log"); - _process.ErrorDataReceived += (_, e) => - { - if (!string.IsNullOrWhiteSpace(e.Data)) - { - lock (errorWriter) // Ensure thread-safe writes - { - errorWriter.WriteLine(e.Data); - errorWriter.Flush(); - } - } - }; - _process.BeginErrorReadLine(); - _process.Exited += Process_Exited; + _logger.LogInformation("Starting command: \"{Executable}\" with arguments: \"{Arguments}\". Logging stderr to: \"{StderrLogPath}\"", + execPath, arguments, _stderrFilePath); - stopwatch.Stop(); - logger.LogInformation("Opened command with args \"{options}\" in {ElapsedMilliseconds} ms", commandProfile.Command + ' ' + commandProfile.Parameters, stopwatch.ElapsedMilliseconds); + CleanupOldLogs(BuildInfo.CommandErrorFolder, 10); - return new GetStreamResult(_process.StandardOutput.BaseStream, _process.Id, null); - } - catch (OperationCanceledException ex) - { - ProxyStreamError error = new() { ErrorCode = ProxyStreamErrorCode.OperationCancelled, Message = "Operation was cancelled" }; - logger.LogError(ex, "Error: {ErrorMessage}", error.Message); - return new GetStreamResult(null, -1, error); - } - catch (Exception ex) - { - ProxyStreamError error = new() { ErrorCode = ProxyStreamErrorCode.UnknownError, Message = ex.Message }; - logger.LogError(ex, "Error: {ErrorMessage}", error.Message); - return new GetStreamResult(null, -1, error); - } - finally - { - stopwatch.Stop(); - } - } + Command command = Cli.Wrap(execPath) + .WithArguments(arguments) + .WithStandardOutputPipe(PipeTarget.ToStream(pipe.Writer.AsStream(), autoFlush: true)) + .WithStandardErrorPipe(PipeTarget.ToFile(_stderrFilePath)) + .WithValidation(CommandResultValidation.None); - private void CleanupOldLogs(string directoryPath, int maxLogsToKeep) - { - try - { - if (!Directory.Exists(directoryPath)) - { - return; - } + _commandTask = command.ExecuteAsync(internalToken); - List logFiles = [.. new DirectoryInfo(directoryPath) - .GetFiles("stderr_*.log") - .OrderByDescending(f => f.CreationTime)]; + _processId = _commandTask.ProcessId; + _logger.LogInformation("Command (PID: {ProcessId}) started successfully in {ElapsedMilliseconds} ms. Arguments: {Arguments}", + _processId, stopwatch.ElapsedMilliseconds, arguments); - if (logFiles.Count <= maxLogsToKeep) + _ = HandleCommandCompletionAsync(_commandTask, pipe.Writer, stopwatch, commandProfile, _processId, _stderrFilePath, internalToken); + + return new GetStreamResult(pipe.Reader.AsStream(), _processId, null); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + stopwatch.Stop(); + _logger.LogWarning("Command start explicitly cancelled before process execution."); + pipe.Writer.Complete(new OperationCanceledException("Command start cancelled.")); + return new GetStreamResult(null, -1, new ProxyStreamError { ErrorCode = ProxyStreamErrorCode.OperationCancelled, Message = "Operation was cancelled before start" }); + } + catch (Exception ex) { - return; // Nothing to clean up + stopwatch.Stop(); + _logger.LogError(ex, "Error starting command \"{Command}\": {ErrorMessage}", commandProfile.Command, ex.Message); + pipe.Writer.Complete(ex); + TryDeleteFile(_stderrFilePath); + return new GetStreamResult(null, -1, new ProxyStreamError { ErrorCode = ProxyStreamErrorCode.UnknownError, Message = $"Failed to start command: {ex.Message}" }); } + } - foreach (FileInfo? file in logFiles.Skip(maxLogsToKeep)) + private async Task HandleCommandCompletionAsync( + CommandTask commandTask, + PipeWriter pipeWriter, + Stopwatch stopwatch, + CommandProfileDto commandProfile, + int processId, + string? stderrFilePath, + CancellationToken cancellationToken) + { + ProxyStreamError? error = null; + Exception? completionException = null; + try { - try + CommandResult result = await commandTask; + stopwatch.Stop(); + + if (result.ExitCode == 0) { - file.Delete(); + _logger.LogInformation( + "Command {Command} (PID: {ProcessId}) completed successfully after {Duration}. Exit code: {ExitCode}", + commandProfile.Command, processId, stopwatch.Elapsed, result.ExitCode); } - catch (Exception ex) + else { - logger.LogWarning(ex, "Failed to delete old log file: {FileName}", file.FullName); + _logger.LogWarning( + "Command {Command} (PID: {ProcessId}) exited after {Duration} with non-zero exit code: {ExitCode}. Check stderr log: {StderrLogPath}", + commandProfile.Command, processId, stopwatch.Elapsed, result.ExitCode, stderrFilePath); + error = new ProxyStreamError { ErrorCode = ProxyStreamErrorCode.IoError, Message = $"Process exited with code {result.ExitCode}" }; + completionException = new InvalidOperationException($"Process exited with code {result.ExitCode}. See log '{stderrFilePath}' for details."); } } + catch (OperationCanceledException ex) when (cancellationToken.IsCancellationRequested) + { + stopwatch.Stop(); + _logger.LogInformation( + "Command {Command} (PID: {ProcessId}) was cancelled after {Duration}.", + commandProfile.Command, processId, stopwatch.Elapsed); + error = new ProxyStreamError { ErrorCode = ProxyStreamErrorCode.OperationCancelled, Message = "Command execution was cancelled." }; + completionException = ex; + } + catch (CommandExecutionException ex) + { + stopwatch.Stop(); + _logger.LogError(ex, + "Command {Command} (PID: {ProcessId}) failed execution after {Duration}. Exit code: {ExitCode}. Check stderr log: {StderrLogPath}. Error: {ErrorMessage}", + commandProfile.Command, processId, stopwatch.Elapsed, ex.ExitCode, stderrFilePath, ex.Message); + error = new ProxyStreamError { ErrorCode = ProxyStreamErrorCode.IoError, Message = $"Command execution failed: {ex.Message}" }; + completionException = ex; + } + catch (Exception ex) + { + stopwatch.Stop(); + _logger.LogError(ex, + "Command {Command} (PID: {ProcessId}) encountered an unexpected error after {Duration}. Check stderr log: {StderrLogPath}", + commandProfile.Command, processId, stopwatch.Elapsed, stderrFilePath); + error = new ProxyStreamError { ErrorCode = ProxyStreamErrorCode.UnknownError, Message = $"Unexpected error: {ex.Message}" }; + completionException = ex; + } + finally + { + await pipeWriter.CompleteAsync(completionException); + _logger.LogDebug("PipeWriter completed for PID {ProcessId}.", processId); + } } - catch (Exception ex) - { - logger.LogError(ex, "Error cleaning up old logs in directory: {Directory}", directoryPath); - } - } - private void Process_Exited(object? sender, EventArgs e) - { - if (_process != null) + private void CleanupOldLogs(string directoryPath, int maxLogsToKeep) { try { - _process.WaitForExit(); // Ensure process completes before disposing resources - _process.CancelErrorRead(); + if (!Directory.Exists(directoryPath) || maxLogsToKeep <= 0) + { + return; + } + + var logFiles = new DirectoryInfo(directoryPath) + .GetFiles("stderr_*.log") + .OrderByDescending(f => f.LastWriteTime) + .Skip(maxLogsToKeep) + .ToList(); + + if (!logFiles.Any()) + { + return; + } + + _logger.LogDebug("Cleaning up {Count} old log files from {Directory}...", logFiles.Count, directoryPath); + foreach (FileInfo file in logFiles) + { + TryDeleteFile(file.FullName); + } } catch (Exception ex) { - logger.LogWarning(ex, "Error waiting for process to exit."); + _logger.LogWarning(ex, "Error during old log cleanup in directory: {Directory}", directoryPath); } } - if (errorWriter != null) + private void TryDeleteFile(string? filePath) { + if (string.IsNullOrEmpty(filePath)) return; try { - errorWriter.Dispose(); + if (File.Exists(filePath)) + { + File.Delete(filePath); + _logger.LogTrace("Deleted file: {FilePath}", filePath); + } } catch (Exception ex) { - logger.LogWarning(ex, "Error disposing error writer."); + _logger.LogWarning(ex, "Failed to delete file: {FilePath}", filePath); } } - try + private static string BuildCommand(string command, string clientUserAgent, string streamUrl, int? secondsIn) { - _process?.Dispose(); + string s = secondsIn.HasValue ? $"-ss {secondsIn} " : ""; + command = command.Replace("{clientUserAgent}", '"' + clientUserAgent + '"') + .Replace("{streamUrl}", '"' + streamUrl + '"'); + if (secondsIn.HasValue) + { + int index = command.IndexOf("-i "); + if (index >= 0) + { + command = command.Insert(index, s); + } + } + return command; } - catch (Exception ex) + + /// + /// Disposes the command executor, attempts to cancel the running process, and cleans up resources. + /// + public void Dispose() { - logger.LogWarning(ex, "Error disposing process."); + Dispose(true); + GC.SuppressFinalize(this); } - } - - private static string BuildCommand(string command, string clientUserAgent, string streamUrl, int? secondsIn) - { - string s = secondsIn.HasValue ? $"-ss {secondsIn} " : ""; - - command = command.Replace("{clientUserAgent}", '"' + clientUserAgent + '"') - .Replace("{streamUrl}", '"' + streamUrl + '"'); - if (secondsIn.HasValue) + protected virtual void Dispose(bool disposing) { - int index = command.IndexOf("-i "); - if (index >= 0) + if (_disposed) { - command = command.Insert(index, s); + return; } - } - return command; - } + if (disposing) + { + _logger.LogDebug("Disposing CommandExecutor for PID {ProcessId}.", _processId); - private static void ConfigureProcess(Process process, string commandExec, string formattedArgs) - { - process.StartInfo.FileName = commandExec; - process.StartInfo.Arguments = formattedArgs; - process.StartInfo.CreateNoWindow = true; - process.StartInfo.UseShellExecute = false; - process.StartInfo.RedirectStandardOutput = true; - process.StartInfo.RedirectStandardError = true; - process.StartInfo.StandardOutputEncoding = Encoding.UTF8; - } + if (_processExitCts != null && !_processExitCts.IsCancellationRequested) + { + _logger.LogInformation("Requesting cancellation for command (PID: {ProcessId}) via Dispose.", _processId); + try + { + _processExitCts.Cancel(); + } + catch (ObjectDisposedException) { } + catch (Exception ex) + { + _logger.LogWarning(ex, "Error cancelling CancellationTokenSource during Dispose for PID {ProcessId}.", _processId); + } + } - /// - /// Disposes the process and cleans up resources. - /// - public void Dispose() - { - if (_disposed) - { - return; - } + _processExitCts?.Dispose(); + _processExitCts = null; - if (_process != null) - { - try - { - if (!_process.HasExited) + if (_processId > 0) { - _process.Kill(); + try + { + Process? process = Process.GetProcessById(_processId); + if (!process.HasExited) + { + _logger.LogWarning("Process (PID: {ProcessId}) still running after cancellation signal. Forcing kill.", _processId); + process.Kill(entireProcessTree: true); + _logger.LogInformation("Force killed process (PID: {ProcessId}).", _processId); + } + process.Dispose(); + } + catch (ArgumentException) + { + _logger.LogDebug("Process (PID: {ProcessId}) not found or already exited during Dispose.", _processId); + } + catch (InvalidOperationException) + { + _logger.LogDebug("Process (PID: {ProcessId}) exited before force kill attempt.", _processId); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Error during final process cleanup for PID {ProcessId} in Dispose.", _processId); + } } - _process.Dispose(); - } - catch (Exception ex) - { - logger.LogError(ex, "Error disposing process."); + + _logger.LogDebug("Finished disposing CommandExecutor for PID {ProcessId}.", _processId); } + + _disposed = true; } - _disposed = true; - GC.SuppressFinalize(this); + ~CommandExecutor() + { + Dispose(false); + } } -} +} \ No newline at end of file diff --git a/src/StreamMaster.Streams/Factories/ProcessStreamWrapper.cs b/src/StreamMaster.Streams/Factories/ProcessStreamWrapper.cs new file mode 100644 index 000000000..aa54f38d8 --- /dev/null +++ b/src/StreamMaster.Streams/Factories/ProcessStreamWrapper.cs @@ -0,0 +1,127 @@ +using System.Diagnostics; + +namespace StreamMaster.Streams.Factories; + +public class ProcessStreamWrapper : Stream +{ + private readonly Stream _baseStream; + private readonly Process _process; + private readonly Action _terminateAction; + private readonly ILogger _logger; + private bool _disposed = false; + + public ProcessStreamWrapper(Stream baseStream, Process process, Action terminateAction, ILogger logger) + { + _baseStream = baseStream ?? throw new ArgumentNullException(nameof(baseStream)); + _process = process ?? throw new ArgumentNullException(nameof(process)); + _terminateAction = terminateAction ?? throw new ArgumentNullException(nameof(terminateAction)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public override bool CanRead => _baseStream.CanRead; + public override bool CanSeek => _baseStream.CanSeek; + public override bool CanWrite => _baseStream.CanWrite; + public override long Length => _baseStream.Length; + + public override long Position + { + get => _baseStream.Position; + set => _baseStream.Position = value; + } + + public override void Flush() => _baseStream.Flush(); + + public override int Read(byte[] buffer, int offset, int count) => _baseStream.Read(buffer, offset, count); + + public override long Seek(long offset, SeekOrigin origin) => _baseStream.Seek(offset, origin); + + public override void SetLength(long value) => _baseStream.SetLength(value); + + public override void Write(byte[] buffer, int offset, int count) => _baseStream.Write(buffer, offset, count); + + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => + _baseStream.ReadAsync(buffer, offset, count, cancellationToken); + + public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) => + await _baseStream.ReadAsync(buffer, cancellationToken); + + public override async ValueTask DisposeAsync() + { + if (_disposed) return; + + _logger.LogDebug("DisposeAsync called for ProcessStreamWrapper for process {ProcessId}", _process.Id); + + try + { + if (!_process.HasExited) + { + _logger.LogDebug("Process {ProcessId} has not exited, initiating termination.", _process.Id); + _terminateAction(_process); + await _process.WaitForExitAsync(); + } + else + { + _logger.LogDebug("Process {ProcessId} already exited.", _process.Id); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Error during process termination in ProcessStreamWrapper for process {ProcessId}", _process.Id); + } + + await _baseStream.DisposeAsync(); + + try + { + _process.Dispose(); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Error disposing process object {ProcessId} in wrapper.", _process.Id); + } + + _disposed = true; + GC.SuppressFinalize(this); + + await base.DisposeAsync(); + } + + protected override void Dispose(bool disposing) + { + if (_disposed) return; + + if (disposing) + { + _logger.LogDebug("Dispose(true) called for ProcessStreamWrapper for process {ProcessId}", _process?.Id ?? -1); + try + { + if (_process != null && !_process.HasExited) + { + _logger.LogDebug("Process {ProcessId} has not exited, initiating termination (sync).", _process.Id); + _terminateAction(_process); + } + else if (_process != null) + { + _logger.LogDebug("Process {ProcessId} already exited (sync).", _process.Id); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Error during process termination in ProcessStreamWrapper.Dispose for process {ProcessId}", _process?.Id ?? -1); + } + + _baseStream?.Dispose(); + try + { + _process?.Dispose(); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Error disposing process object {ProcessId} in wrapper (sync).", _process?.Id ?? -1); + } + } + + _disposed = true; + base.Dispose(disposing); + } +} \ No newline at end of file diff --git a/src/StreamMaster.Streams/StreamMaster.Streams.csproj b/src/StreamMaster.Streams/StreamMaster.Streams.csproj index fad049d7f..057d7ea57 100644 --- a/src/StreamMaster.Streams/StreamMaster.Streams.csproj +++ b/src/StreamMaster.Streams/StreamMaster.Streams.csproj @@ -6,6 +6,8 @@ + + @@ -14,7 +16,7 @@ - + diff --git a/src/StreamMaster.WebUI/lib/smAPI/smapiTypes.ts b/src/StreamMaster.WebUI/lib/smAPI/smapiTypes.ts index 0753467bc..4b58f7af7 100644 --- a/src/StreamMaster.WebUI/lib/smAPI/smapiTypes.ts +++ b/src/StreamMaster.WebUI/lib/smAPI/smapiTypes.ts @@ -756,30 +756,6 @@ export interface MoveToNextStreamRequest { SMChannelId: number; } -export interface GetStreamGroupSMChannelsRequest -{ - StreamGroupId: number; -} -export interface AddSMChannelsToStreamGroupByParametersRequest -{ - Parameters: QueryStringParameters; - StreamGroupId: number; -} -export interface AddSMChannelsToStreamGroupRequest -{ - SMChannelIds: number[]; - StreamGroupId: number; -} -export interface AddSMChannelToStreamGroupRequest -{ - SMChannelId: number; - StreamGroupId: number; -} -export interface RemoveSMChannelFromStreamGroupRequest -{ - SMChannelId: number; - StreamGroupId: number; -} export interface SGFS { Name: string; @@ -845,6 +821,30 @@ export interface UpdateStreamGroupRequest NewName?: string; StreamGroupId: number; } +export interface GetStreamGroupSMChannelsRequest +{ + StreamGroupId: number; +} +export interface AddSMChannelsToStreamGroupByParametersRequest +{ + Parameters: QueryStringParameters; + StreamGroupId: number; +} +export interface AddSMChannelsToStreamGroupRequest +{ + SMChannelIds: number[]; + StreamGroupId: number; +} +export interface AddSMChannelToStreamGroupRequest +{ + SMChannelId: number; + StreamGroupId: number; +} +export interface RemoveSMChannelFromStreamGroupRequest +{ + SMChannelId: number; + StreamGroupId: number; +} export interface GetChannelMetricsRequest { } @@ -956,25 +956,6 @@ export interface SendSuccessRequest Detail: string; Summary: string; } -export interface GetSMChannelStreamsRequest -{ - SMChannelId: number; -} -export interface AddSMStreamToSMChannelRequest -{ - Rank?: number; - SMChannelId: number; - SMStreamId: string; -} -export interface RemoveSMStreamFromSMChannelRequest -{ - SMChannelId: number; - SMStreamId: string; -} -export interface SetSMStreamRanksRequest -{ - Requests: SMChannelStreamRankRequest[]; -} export interface GetPagedSMChannelsRequest { Parameters: QueryStringParameters; @@ -1158,6 +1139,25 @@ export interface UpdateSMChannelRequest TimeShift?: number; VideoStreamHandler?: VideoStreamHandlers; } +export interface GetSMChannelStreamsRequest +{ + SMChannelId: number; +} +export interface AddSMStreamToSMChannelRequest +{ + Rank?: number; + SMChannelId: number; + SMStreamId: string; +} +export interface RemoveSMStreamFromSMChannelRequest +{ + SMChannelId: number; + SMStreamId: string; +} +export interface SetSMStreamRanksRequest +{ + Requests: SMChannelStreamRankRequest[]; +} export interface GetSMChannelChannelsRequest { SMChannelId: number; @@ -1455,6 +1455,12 @@ export interface SetTestTaskRequest { DelayInSeconds: number; } +export interface GetEPGColorsRequest +{ +} +export interface EPGSyncRequest +{ +} export interface GetEPGFileNamesRequest { } @@ -1515,12 +1521,6 @@ export interface UpdateEPGFileRequest TimeShift?: number; Url?: string; } -export interface GetEPGColorsRequest -{ -} -export interface EPGSyncRequest -{ -} export interface GetCustomPlayListRequest { SMStreamId?: string; diff --git a/src/tests/StreamMaster.Application.UnitTests/StreamMaster.Application.UnitTests.csproj b/src/tests/StreamMaster.Application.UnitTests/StreamMaster.Application.UnitTests.csproj index abec59742..9399636c4 100644 --- a/src/tests/StreamMaster.Application.UnitTests/StreamMaster.Application.UnitTests.csproj +++ b/src/tests/StreamMaster.Application.UnitTests/StreamMaster.Application.UnitTests.csproj @@ -13,6 +13,7 @@ + diff --git a/src/tests/StreamMaster.Infrastructure.UnitTests/StreamMaster.Infrastructure.UnitTests.csproj b/src/tests/StreamMaster.Infrastructure.UnitTests/StreamMaster.Infrastructure.UnitTests.csproj index 1374b3eb6..e032c6df6 100644 --- a/src/tests/StreamMaster.Infrastructure.UnitTests/StreamMaster.Infrastructure.UnitTests.csproj +++ b/src/tests/StreamMaster.Infrastructure.UnitTests/StreamMaster.Infrastructure.UnitTests.csproj @@ -12,6 +12,7 @@ + diff --git a/src/tests/StreamMaster.Streams.UnitTests/StreamMaster.Streams.UnitTests.csproj b/src/tests/StreamMaster.Streams.UnitTests/StreamMaster.Streams.UnitTests.csproj index f48ba63e0..58c66b924 100644 --- a/src/tests/StreamMaster.Streams.UnitTests/StreamMaster.Streams.UnitTests.csproj +++ b/src/tests/StreamMaster.Streams.UnitTests/StreamMaster.Streams.UnitTests.csproj @@ -12,6 +12,7 @@ +