diff --git a/src/BuildClientAPI/BuildClientAPI.csproj b/src/BuildClientAPI/BuildClientAPI.csproj
index b7fb0412b..3b5c604c0 100644
--- a/src/BuildClientAPI/BuildClientAPI.csproj
+++ b/src/BuildClientAPI/BuildClientAPI.csproj
@@ -14,6 +14,8 @@
+
+
diff --git a/src/StreamMaster.API/StreamMaster.API.csproj b/src/StreamMaster.API/StreamMaster.API.csproj
index 209344f5b..f434856a6 100644
--- a/src/StreamMaster.API/StreamMaster.API.csproj
+++ b/src/StreamMaster.API/StreamMaster.API.csproj
@@ -60,6 +60,7 @@
+
diff --git a/src/StreamMaster.Application/StreamMaster.Application.csproj b/src/StreamMaster.Application/StreamMaster.Application.csproj
index abc8adbe2..212f5bdfe 100644
--- a/src/StreamMaster.Application/StreamMaster.Application.csproj
+++ b/src/StreamMaster.Application/StreamMaster.Application.csproj
@@ -75,6 +75,7 @@
+
diff --git a/src/StreamMaster.Domain/Common/FileUtil.cs b/src/StreamMaster.Domain/Common/FileUtil.cs
index 9a51ce5f9..c654a254a 100644
--- a/src/StreamMaster.Domain/Common/FileUtil.cs
+++ b/src/StreamMaster.Domain/Common/FileUtil.cs
@@ -79,6 +79,7 @@ public static bool IsFilePathValidAndExists(string filePath)
{
return IsValidFilePath(filePath) && File.Exists(filePath);
}
+
///
/// Searches for the specified executable name in predefined directories.
///
@@ -116,6 +117,43 @@ public static bool IsFilePathValidAndExists(string filePath)
}
}
+ // If nothing found, attempt to use .NET Process to locate the executable.
+ try
+ {
+ using Process process = new();
+
+ // Set up process to just get the path without actually running the program
+ if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
+ {
+ process.StartInfo.FileName = "where";
+ process.StartInfo.Arguments = executableName;
+ }
+ else
+ {
+ process.StartInfo.FileName = "which";
+ process.StartInfo.Arguments = executableName;
+ }
+
+ process.StartInfo.RedirectStandardOutput = true;
+ process.StartInfo.UseShellExecute = false;
+ process.StartInfo.CreateNoWindow = true;
+
+ if (process.Start())
+ {
+ string? output = process.StandardOutput.ReadLine();
+ process.WaitForExit();
+
+ if (!string.IsNullOrEmpty(output) && File.Exists(output))
+ {
+ return output;
+ }
+ }
+ }
+ catch
+ {
+ // Silently fail if the process approach doesn't work
+ }
+
return null;
}
diff --git a/src/StreamMaster.Infrastructure.EF.PGSQL.Console/StreamMaster.Infrastructure.EF.PGSQL.Console.csproj b/src/StreamMaster.Infrastructure.EF.PGSQL.Console/StreamMaster.Infrastructure.EF.PGSQL.Console.csproj
index ea2e9d792..68f4d4d13 100644
--- a/src/StreamMaster.Infrastructure.EF.PGSQL.Console/StreamMaster.Infrastructure.EF.PGSQL.Console.csproj
+++ b/src/StreamMaster.Infrastructure.EF.PGSQL.Console/StreamMaster.Infrastructure.EF.PGSQL.Console.csproj
@@ -6,6 +6,9 @@
Exe
true
+
+
+
diff --git a/src/StreamMaster.Infrastructure.EF.PGSQL/StreamMaster.Infrastructure.EF.PGSQL.csproj b/src/StreamMaster.Infrastructure.EF.PGSQL/StreamMaster.Infrastructure.EF.PGSQL.csproj
index 2327b83bc..c61c30352 100644
--- a/src/StreamMaster.Infrastructure.EF.PGSQL/StreamMaster.Infrastructure.EF.PGSQL.csproj
+++ b/src/StreamMaster.Infrastructure.EF.PGSQL/StreamMaster.Infrastructure.EF.PGSQL.csproj
@@ -27,6 +27,7 @@
+
diff --git a/src/StreamMaster.Infrastructure.EF/StreamMaster.Infrastructure.EF.csproj b/src/StreamMaster.Infrastructure.EF/StreamMaster.Infrastructure.EF.csproj
index e7165b095..ded033e73 100644
--- a/src/StreamMaster.Infrastructure.EF/StreamMaster.Infrastructure.EF.csproj
+++ b/src/StreamMaster.Infrastructure.EF/StreamMaster.Infrastructure.EF.csproj
@@ -13,6 +13,8 @@
+
+
diff --git a/src/StreamMaster.Infrastructure/StreamMaster.Infrastructure.csproj b/src/StreamMaster.Infrastructure/StreamMaster.Infrastructure.csproj
index 495bc751c..86c307e6a 100644
--- a/src/StreamMaster.Infrastructure/StreamMaster.Infrastructure.csproj
+++ b/src/StreamMaster.Infrastructure/StreamMaster.Infrastructure.csproj
@@ -31,6 +31,7 @@
+
diff --git a/src/StreamMaster.SchedulesDirect/StreamMaster.SchedulesDirect.csproj b/src/StreamMaster.SchedulesDirect/StreamMaster.SchedulesDirect.csproj
index ad0982df4..4176b6894 100644
--- a/src/StreamMaster.SchedulesDirect/StreamMaster.SchedulesDirect.csproj
+++ b/src/StreamMaster.SchedulesDirect/StreamMaster.SchedulesDirect.csproj
@@ -12,7 +12,7 @@
-
+
diff --git a/src/StreamMaster.Streams/Broadcasters/SourceBroadcaster.cs b/src/StreamMaster.Streams/Broadcasters/SourceBroadcaster.cs
index a5686a61c..bd5a9dfcf 100644
--- a/src/StreamMaster.Streams/Broadcasters/SourceBroadcaster.cs
+++ b/src/StreamMaster.Streams/Broadcasters/SourceBroadcaster.cs
@@ -30,8 +30,10 @@ public class SourceBroadcaster(ILogger logger, StreamConnect
///
public StreamHandlerMetrics Metrics => StreamMetricsRecorder.Metrics;
+
public bool IsMultiView { get; set; }
public CancellationToken CancellationToken { get; } = cancellationToken;
+
///
public ConcurrentDictionary ChannelBroadcasters { get; } = new();
@@ -72,6 +74,7 @@ public bool RemoveChannelBroadcaster(string Id)
{
return ChannelBroadcasters.TryRemove(Id, out _);
}
+
public async Task SetSourceMultiViewStreamAsync(IChannelBroadcaster channelBroadcaster, CancellationToken cancellationToken)
{
logger.LogInformation("Setting source stream {Name} to {StreamName}", Name, SMStreamInfo.Name);
@@ -106,11 +109,14 @@ public async Task SetSourceStreamAsync(SMStreamInfo SMStreamInfo, Cancella
this.SMStreamInfo = SMStreamInfo;
+ // Start a new streaming task
+ _cancellationTokenSource = new CancellationTokenSource();
+
Stopwatch stopwatch = Stopwatch.StartNew();
try
{
(Stream? stream, int processId, ProxyStreamError? error) =
- await streamFactory.GetStream(SMStreamInfo, cancellationToken).ConfigureAwait(false);
+ await streamFactory.GetStream(SMStreamInfo, _cancellationTokenSource.Token).ConfigureAwait(false);
stopwatch.Stop();
if (stream == null || error != null)
{
@@ -118,8 +124,6 @@ public async Task SetSourceStreamAsync(SMStreamInfo SMStreamInfo, Cancella
return 0;
}
- // Start a new streaming task
- _cancellationTokenSource = new CancellationTokenSource();
_streamingTask = Task.Run(() => RunPipelineAsync(stream, SMStreamInfo.Name, cancellationToken: _cancellationTokenSource.Token), _cancellationTokenSource.Token);
return stopwatch.ElapsedMilliseconds;
}
@@ -193,6 +197,11 @@ private async Task RunPipelineAsync(Stream sourceStream, string name, int buffer
timeoutCts?.Dispose();
}
+ if (cancellationToken.IsCancellationRequested)
+ {
+ break;
+ }
+
if (bytesRead == 0)
{
if (!hasReadData)
@@ -263,6 +272,7 @@ private async Task RunPipelineAsync(Stream sourceStream, string name, int buffer
public async Task StopAsync()
{
await _stopLock.WaitAsync().ConfigureAwait(false);
+ Task? taskToAwait = null;
try
{
if (Interlocked.CompareExchange(ref _isStopped, 1, 0) == 0)
@@ -271,12 +281,29 @@ public async Task StopAsync()
{
_cancellationTokenSource?.Cancel();
}
+ taskToAwait = _streamingTask;
}
}
finally
{
_stopLock.Release();
}
+
+ if (taskToAwait != null)
+ {
+ try
+ {
+ await taskToAwait.ConfigureAwait(false);
+ }
+ catch (OperationCanceledException)
+ {
+ logger.LogDebug("Task was already cancelled");
+ }
+ catch (Exception ex)
+ {
+ logger.LogError(ex, "Error during SourceBroadcaster streaming task completion wait.");
+ }
+ }
}
///
diff --git a/src/StreamMaster.Streams/Factories/CommandExecutor.cs b/src/StreamMaster.Streams/Factories/CommandExecutor.cs
index 837b65a5f..04af1430e 100644
--- a/src/StreamMaster.Streams/Factories/CommandExecutor.cs
+++ b/src/StreamMaster.Streams/Factories/CommandExecutor.cs
@@ -1,218 +1,289 @@
using System.Diagnostics;
+using System.IO.Pipelines;
using System.Text;
+using CliWrap;
+using CliWrap.Exceptions;
-namespace StreamMaster.Streams.Factories;
-
-///
-/// Executes commands based on the provided profiles and manages process lifecycles.
-///
-public class CommandExecutor(ILogger logger) : ICommandExecutor, IDisposable
+namespace StreamMaster.Streams.Factories
{
- private StreamWriter? errorWriter;
- private Process? _process;
- private bool _disposed;
-
- ///
- public GetStreamResult ExecuteCommand(CommandProfileDto commandProfile, string streamUrl, string clientUserAgent, int? secondsIn, CancellationToken cancellationToken = default)
+ ///
+ /// Executes commands based on the provided profiles and manages process lifecycles using CliWrap.
+ /// Streams stdout and logs stderr.
+ ///
+ public class CommandExecutor(ILogger logger) : ICommandExecutor
{
- Stopwatch stopwatch = Stopwatch.StartNew();
+ private readonly ILogger _logger = logger;
+ private CancellationTokenSource? _processExitCts;
+ private CommandTask? _commandTask;
+ private int _processId = -1;
+ private string? _stderrFilePath;
+ private bool _disposed;
- try
+ ///
+ public GetStreamResult ExecuteCommand(CommandProfileDto commandProfile, string streamUrl, string clientUserAgent, int? secondsIn, CancellationToken cancellationToken = default)
{
- string? exec = FileUtil.GetExec(commandProfile.Command);
- if (exec == null)
- {
- logger.LogCritical("Command \"{command}\" not found", commandProfile.Command);
- return new GetStreamResult(null, -1, new ProxyStreamError { ErrorCode = ProxyStreamErrorCode.FileNotFound, Message = $"{commandProfile.Command} not found" });
- }
-
- string options = BuildCommand(commandProfile.Parameters, clientUserAgent, streamUrl, secondsIn);
+ Directory.CreateDirectory(BuildInfo.CommandErrorFolder);
- _process = new Process();
- ConfigureProcess(_process, exec, options);
+ Stopwatch stopwatch = Stopwatch.StartNew();
+ Pipe pipe = new();
- cancellationToken.ThrowIfCancellationRequested();
+ _processExitCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+ var internalToken = _processExitCts.Token;
- if (!_process.Start())
+ try
{
- ProxyStreamError error = new() { ErrorCode = ProxyStreamErrorCode.ProcessStartFailed, Message = "Failed to start process" };
- logger.LogError("Error: {ErrorMessage}", error.Message);
- return new GetStreamResult(null, -1, error);
- }
+ string? execPath = FileUtil.GetExec(commandProfile.Command);
+ if (string.IsNullOrEmpty(execPath))
+ {
+ _logger.LogCritical("Command executable \"{Command}\" not found in PATH or specified location.", commandProfile.Command);
+ return new GetStreamResult(null, -1, new ProxyStreamError { ErrorCode = ProxyStreamErrorCode.FileNotFound, Message = $"{commandProfile.Command} not found" });
+ }
- string stderrFilePath = Path.Combine(BuildInfo.CommandErrorFolder, $"stderr_{_process.Id}.log");
- errorWriter = new StreamWriter(stderrFilePath, append: true, Encoding.UTF8);
+ string arguments = BuildCommand(commandProfile.Parameters, clientUserAgent, streamUrl, secondsIn);
- // Clean up older logs to keep only the latest 10
- CleanupOldLogs(BuildInfo.CommandErrorFolder, 10);
+ _stderrFilePath = Path.Combine(BuildInfo.CommandErrorFolder, $"stderr_{DateTime.Now:yyyyMMddHHmmss}_{Guid.NewGuid().ToString("N")[..8]}.log");
- _process.ErrorDataReceived += (_, e) =>
- {
- if (!string.IsNullOrWhiteSpace(e.Data))
- {
- lock (errorWriter) // Ensure thread-safe writes
- {
- errorWriter.WriteLine(e.Data);
- errorWriter.Flush();
- }
- }
- };
- _process.BeginErrorReadLine();
- _process.Exited += Process_Exited;
+ _logger.LogInformation("Starting command: \"{Executable}\" with arguments: \"{Arguments}\". Logging stderr to: \"{StderrLogPath}\"",
+ execPath, arguments, _stderrFilePath);
- stopwatch.Stop();
- logger.LogInformation("Opened command with args \"{options}\" in {ElapsedMilliseconds} ms", commandProfile.Command + ' ' + commandProfile.Parameters, stopwatch.ElapsedMilliseconds);
+ CleanupOldLogs(BuildInfo.CommandErrorFolder, 10);
- return new GetStreamResult(_process.StandardOutput.BaseStream, _process.Id, null);
- }
- catch (OperationCanceledException ex)
- {
- ProxyStreamError error = new() { ErrorCode = ProxyStreamErrorCode.OperationCancelled, Message = "Operation was cancelled" };
- logger.LogError(ex, "Error: {ErrorMessage}", error.Message);
- return new GetStreamResult(null, -1, error);
- }
- catch (Exception ex)
- {
- ProxyStreamError error = new() { ErrorCode = ProxyStreamErrorCode.UnknownError, Message = ex.Message };
- logger.LogError(ex, "Error: {ErrorMessage}", error.Message);
- return new GetStreamResult(null, -1, error);
- }
- finally
- {
- stopwatch.Stop();
- }
- }
+ Command command = Cli.Wrap(execPath)
+ .WithArguments(arguments)
+ .WithStandardOutputPipe(PipeTarget.ToStream(pipe.Writer.AsStream(), autoFlush: true))
+ .WithStandardErrorPipe(PipeTarget.ToFile(_stderrFilePath))
+ .WithValidation(CommandResultValidation.None);
- private void CleanupOldLogs(string directoryPath, int maxLogsToKeep)
- {
- try
- {
- if (!Directory.Exists(directoryPath))
- {
- return;
- }
+ _commandTask = command.ExecuteAsync(internalToken);
- List logFiles = [.. new DirectoryInfo(directoryPath)
- .GetFiles("stderr_*.log")
- .OrderByDescending(f => f.CreationTime)];
+ _processId = _commandTask.ProcessId;
+ _logger.LogInformation("Command (PID: {ProcessId}) started successfully in {ElapsedMilliseconds} ms. Arguments: {Arguments}",
+ _processId, stopwatch.ElapsedMilliseconds, arguments);
- if (logFiles.Count <= maxLogsToKeep)
+ _ = HandleCommandCompletionAsync(_commandTask, pipe.Writer, stopwatch, commandProfile, _processId, _stderrFilePath, internalToken);
+
+ return new GetStreamResult(pipe.Reader.AsStream(), _processId, null);
+ }
+ catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
+ {
+ stopwatch.Stop();
+ _logger.LogWarning("Command start explicitly cancelled before process execution.");
+ pipe.Writer.Complete(new OperationCanceledException("Command start cancelled."));
+ return new GetStreamResult(null, -1, new ProxyStreamError { ErrorCode = ProxyStreamErrorCode.OperationCancelled, Message = "Operation was cancelled before start" });
+ }
+ catch (Exception ex)
{
- return; // Nothing to clean up
+ stopwatch.Stop();
+ _logger.LogError(ex, "Error starting command \"{Command}\": {ErrorMessage}", commandProfile.Command, ex.Message);
+ pipe.Writer.Complete(ex);
+ TryDeleteFile(_stderrFilePath);
+ return new GetStreamResult(null, -1, new ProxyStreamError { ErrorCode = ProxyStreamErrorCode.UnknownError, Message = $"Failed to start command: {ex.Message}" });
}
+ }
- foreach (FileInfo? file in logFiles.Skip(maxLogsToKeep))
+ private async Task HandleCommandCompletionAsync(
+ CommandTask commandTask,
+ PipeWriter pipeWriter,
+ Stopwatch stopwatch,
+ CommandProfileDto commandProfile,
+ int processId,
+ string? stderrFilePath,
+ CancellationToken cancellationToken)
+ {
+ ProxyStreamError? error = null;
+ Exception? completionException = null;
+ try
{
- try
+ CommandResult result = await commandTask;
+ stopwatch.Stop();
+
+ if (result.ExitCode == 0)
{
- file.Delete();
+ _logger.LogInformation(
+ "Command {Command} (PID: {ProcessId}) completed successfully after {Duration}. Exit code: {ExitCode}",
+ commandProfile.Command, processId, stopwatch.Elapsed, result.ExitCode);
}
- catch (Exception ex)
+ else
{
- logger.LogWarning(ex, "Failed to delete old log file: {FileName}", file.FullName);
+ _logger.LogWarning(
+ "Command {Command} (PID: {ProcessId}) exited after {Duration} with non-zero exit code: {ExitCode}. Check stderr log: {StderrLogPath}",
+ commandProfile.Command, processId, stopwatch.Elapsed, result.ExitCode, stderrFilePath);
+ error = new ProxyStreamError { ErrorCode = ProxyStreamErrorCode.IoError, Message = $"Process exited with code {result.ExitCode}" };
+ completionException = new InvalidOperationException($"Process exited with code {result.ExitCode}. See log '{stderrFilePath}' for details.");
}
}
+ catch (OperationCanceledException ex) when (cancellationToken.IsCancellationRequested)
+ {
+ stopwatch.Stop();
+ _logger.LogInformation(
+ "Command {Command} (PID: {ProcessId}) was cancelled after {Duration}.",
+ commandProfile.Command, processId, stopwatch.Elapsed);
+ error = new ProxyStreamError { ErrorCode = ProxyStreamErrorCode.OperationCancelled, Message = "Command execution was cancelled." };
+ completionException = ex;
+ }
+ catch (CommandExecutionException ex)
+ {
+ stopwatch.Stop();
+ _logger.LogError(ex,
+ "Command {Command} (PID: {ProcessId}) failed execution after {Duration}. Exit code: {ExitCode}. Check stderr log: {StderrLogPath}. Error: {ErrorMessage}",
+ commandProfile.Command, processId, stopwatch.Elapsed, ex.ExitCode, stderrFilePath, ex.Message);
+ error = new ProxyStreamError { ErrorCode = ProxyStreamErrorCode.IoError, Message = $"Command execution failed: {ex.Message}" };
+ completionException = ex;
+ }
+ catch (Exception ex)
+ {
+ stopwatch.Stop();
+ _logger.LogError(ex,
+ "Command {Command} (PID: {ProcessId}) encountered an unexpected error after {Duration}. Check stderr log: {StderrLogPath}",
+ commandProfile.Command, processId, stopwatch.Elapsed, stderrFilePath);
+ error = new ProxyStreamError { ErrorCode = ProxyStreamErrorCode.UnknownError, Message = $"Unexpected error: {ex.Message}" };
+ completionException = ex;
+ }
+ finally
+ {
+ await pipeWriter.CompleteAsync(completionException);
+ _logger.LogDebug("PipeWriter completed for PID {ProcessId}.", processId);
+ }
}
- catch (Exception ex)
- {
- logger.LogError(ex, "Error cleaning up old logs in directory: {Directory}", directoryPath);
- }
- }
- private void Process_Exited(object? sender, EventArgs e)
- {
- if (_process != null)
+ private void CleanupOldLogs(string directoryPath, int maxLogsToKeep)
{
try
{
- _process.WaitForExit(); // Ensure process completes before disposing resources
- _process.CancelErrorRead();
+ if (!Directory.Exists(directoryPath) || maxLogsToKeep <= 0)
+ {
+ return;
+ }
+
+ var logFiles = new DirectoryInfo(directoryPath)
+ .GetFiles("stderr_*.log")
+ .OrderByDescending(f => f.LastWriteTime)
+ .Skip(maxLogsToKeep)
+ .ToList();
+
+ if (!logFiles.Any())
+ {
+ return;
+ }
+
+ _logger.LogDebug("Cleaning up {Count} old log files from {Directory}...", logFiles.Count, directoryPath);
+ foreach (FileInfo file in logFiles)
+ {
+ TryDeleteFile(file.FullName);
+ }
}
catch (Exception ex)
{
- logger.LogWarning(ex, "Error waiting for process to exit.");
+ _logger.LogWarning(ex, "Error during old log cleanup in directory: {Directory}", directoryPath);
}
}
- if (errorWriter != null)
+ private void TryDeleteFile(string? filePath)
{
+ if (string.IsNullOrEmpty(filePath)) return;
try
{
- errorWriter.Dispose();
+ if (File.Exists(filePath))
+ {
+ File.Delete(filePath);
+ _logger.LogTrace("Deleted file: {FilePath}", filePath);
+ }
}
catch (Exception ex)
{
- logger.LogWarning(ex, "Error disposing error writer.");
+ _logger.LogWarning(ex, "Failed to delete file: {FilePath}", filePath);
}
}
- try
+ private static string BuildCommand(string command, string clientUserAgent, string streamUrl, int? secondsIn)
{
- _process?.Dispose();
+ string s = secondsIn.HasValue ? $"-ss {secondsIn} " : "";
+ command = command.Replace("{clientUserAgent}", '"' + clientUserAgent + '"')
+ .Replace("{streamUrl}", '"' + streamUrl + '"');
+ if (secondsIn.HasValue)
+ {
+ int index = command.IndexOf("-i ");
+ if (index >= 0)
+ {
+ command = command.Insert(index, s);
+ }
+ }
+ return command;
}
- catch (Exception ex)
+
+ ///
+ /// Disposes the command executor, attempts to cancel the running process, and cleans up resources.
+ ///
+ public void Dispose()
{
- logger.LogWarning(ex, "Error disposing process.");
+ Dispose(true);
+ GC.SuppressFinalize(this);
}
- }
-
- private static string BuildCommand(string command, string clientUserAgent, string streamUrl, int? secondsIn)
- {
- string s = secondsIn.HasValue ? $"-ss {secondsIn} " : "";
-
- command = command.Replace("{clientUserAgent}", '"' + clientUserAgent + '"')
- .Replace("{streamUrl}", '"' + streamUrl + '"');
- if (secondsIn.HasValue)
+ protected virtual void Dispose(bool disposing)
{
- int index = command.IndexOf("-i ");
- if (index >= 0)
+ if (_disposed)
{
- command = command.Insert(index, s);
+ return;
}
- }
- return command;
- }
+ if (disposing)
+ {
+ _logger.LogDebug("Disposing CommandExecutor for PID {ProcessId}.", _processId);
- private static void ConfigureProcess(Process process, string commandExec, string formattedArgs)
- {
- process.StartInfo.FileName = commandExec;
- process.StartInfo.Arguments = formattedArgs;
- process.StartInfo.CreateNoWindow = true;
- process.StartInfo.UseShellExecute = false;
- process.StartInfo.RedirectStandardOutput = true;
- process.StartInfo.RedirectStandardError = true;
- process.StartInfo.StandardOutputEncoding = Encoding.UTF8;
- }
+ if (_processExitCts != null && !_processExitCts.IsCancellationRequested)
+ {
+ _logger.LogInformation("Requesting cancellation for command (PID: {ProcessId}) via Dispose.", _processId);
+ try
+ {
+ _processExitCts.Cancel();
+ }
+ catch (ObjectDisposedException) { }
+ catch (Exception ex)
+ {
+ _logger.LogWarning(ex, "Error cancelling CancellationTokenSource during Dispose for PID {ProcessId}.", _processId);
+ }
+ }
- ///
- /// Disposes the process and cleans up resources.
- ///
- public void Dispose()
- {
- if (_disposed)
- {
- return;
- }
+ _processExitCts?.Dispose();
+ _processExitCts = null;
- if (_process != null)
- {
- try
- {
- if (!_process.HasExited)
+ if (_processId > 0)
{
- _process.Kill();
+ try
+ {
+ Process? process = Process.GetProcessById(_processId);
+ if (!process.HasExited)
+ {
+ _logger.LogWarning("Process (PID: {ProcessId}) still running after cancellation signal. Forcing kill.", _processId);
+ process.Kill(entireProcessTree: true);
+ _logger.LogInformation("Force killed process (PID: {ProcessId}).", _processId);
+ }
+ process.Dispose();
+ }
+ catch (ArgumentException)
+ {
+ _logger.LogDebug("Process (PID: {ProcessId}) not found or already exited during Dispose.", _processId);
+ }
+ catch (InvalidOperationException)
+ {
+ _logger.LogDebug("Process (PID: {ProcessId}) exited before force kill attempt.", _processId);
+ }
+ catch (Exception ex)
+ {
+ _logger.LogWarning(ex, "Error during final process cleanup for PID {ProcessId} in Dispose.", _processId);
+ }
}
- _process.Dispose();
- }
- catch (Exception ex)
- {
- logger.LogError(ex, "Error disposing process.");
+
+ _logger.LogDebug("Finished disposing CommandExecutor for PID {ProcessId}.", _processId);
}
+
+ _disposed = true;
}
- _disposed = true;
- GC.SuppressFinalize(this);
+ ~CommandExecutor()
+ {
+ Dispose(false);
+ }
}
-}
+}
\ No newline at end of file
diff --git a/src/StreamMaster.Streams/Factories/ProcessStreamWrapper.cs b/src/StreamMaster.Streams/Factories/ProcessStreamWrapper.cs
new file mode 100644
index 000000000..aa54f38d8
--- /dev/null
+++ b/src/StreamMaster.Streams/Factories/ProcessStreamWrapper.cs
@@ -0,0 +1,127 @@
+using System.Diagnostics;
+
+namespace StreamMaster.Streams.Factories;
+
+public class ProcessStreamWrapper : Stream
+{
+ private readonly Stream _baseStream;
+ private readonly Process _process;
+ private readonly Action _terminateAction;
+ private readonly ILogger _logger;
+ private bool _disposed = false;
+
+ public ProcessStreamWrapper(Stream baseStream, Process process, Action terminateAction, ILogger logger)
+ {
+ _baseStream = baseStream ?? throw new ArgumentNullException(nameof(baseStream));
+ _process = process ?? throw new ArgumentNullException(nameof(process));
+ _terminateAction = terminateAction ?? throw new ArgumentNullException(nameof(terminateAction));
+ _logger = logger ?? throw new ArgumentNullException(nameof(logger));
+ }
+
+ public override bool CanRead => _baseStream.CanRead;
+ public override bool CanSeek => _baseStream.CanSeek;
+ public override bool CanWrite => _baseStream.CanWrite;
+ public override long Length => _baseStream.Length;
+
+ public override long Position
+ {
+ get => _baseStream.Position;
+ set => _baseStream.Position = value;
+ }
+
+ public override void Flush() => _baseStream.Flush();
+
+ public override int Read(byte[] buffer, int offset, int count) => _baseStream.Read(buffer, offset, count);
+
+ public override long Seek(long offset, SeekOrigin origin) => _baseStream.Seek(offset, origin);
+
+ public override void SetLength(long value) => _baseStream.SetLength(value);
+
+ public override void Write(byte[] buffer, int offset, int count) => _baseStream.Write(buffer, offset, count);
+
+ public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
+ _baseStream.ReadAsync(buffer, offset, count, cancellationToken);
+
+ public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) =>
+ await _baseStream.ReadAsync(buffer, cancellationToken);
+
+ public override async ValueTask DisposeAsync()
+ {
+ if (_disposed) return;
+
+ _logger.LogDebug("DisposeAsync called for ProcessStreamWrapper for process {ProcessId}", _process.Id);
+
+ try
+ {
+ if (!_process.HasExited)
+ {
+ _logger.LogDebug("Process {ProcessId} has not exited, initiating termination.", _process.Id);
+ _terminateAction(_process);
+ await _process.WaitForExitAsync();
+ }
+ else
+ {
+ _logger.LogDebug("Process {ProcessId} already exited.", _process.Id);
+ }
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Error during process termination in ProcessStreamWrapper for process {ProcessId}", _process.Id);
+ }
+
+ await _baseStream.DisposeAsync();
+
+ try
+ {
+ _process.Dispose();
+ }
+ catch (Exception ex)
+ {
+ _logger.LogWarning(ex, "Error disposing process object {ProcessId} in wrapper.", _process.Id);
+ }
+
+ _disposed = true;
+ GC.SuppressFinalize(this);
+
+ await base.DisposeAsync();
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ if (_disposed) return;
+
+ if (disposing)
+ {
+ _logger.LogDebug("Dispose(true) called for ProcessStreamWrapper for process {ProcessId}", _process?.Id ?? -1);
+ try
+ {
+ if (_process != null && !_process.HasExited)
+ {
+ _logger.LogDebug("Process {ProcessId} has not exited, initiating termination (sync).", _process.Id);
+ _terminateAction(_process);
+ }
+ else if (_process != null)
+ {
+ _logger.LogDebug("Process {ProcessId} already exited (sync).", _process.Id);
+ }
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Error during process termination in ProcessStreamWrapper.Dispose for process {ProcessId}", _process?.Id ?? -1);
+ }
+
+ _baseStream?.Dispose();
+ try
+ {
+ _process?.Dispose();
+ }
+ catch (Exception ex)
+ {
+ _logger.LogWarning(ex, "Error disposing process object {ProcessId} in wrapper (sync).", _process?.Id ?? -1);
+ }
+ }
+
+ _disposed = true;
+ base.Dispose(disposing);
+ }
+}
\ No newline at end of file
diff --git a/src/StreamMaster.Streams/StreamMaster.Streams.csproj b/src/StreamMaster.Streams/StreamMaster.Streams.csproj
index fad049d7f..057d7ea57 100644
--- a/src/StreamMaster.Streams/StreamMaster.Streams.csproj
+++ b/src/StreamMaster.Streams/StreamMaster.Streams.csproj
@@ -6,6 +6,8 @@
+
+
@@ -14,7 +16,7 @@
-
+
diff --git a/src/StreamMaster.WebUI/lib/smAPI/smapiTypes.ts b/src/StreamMaster.WebUI/lib/smAPI/smapiTypes.ts
index 0753467bc..4b58f7af7 100644
--- a/src/StreamMaster.WebUI/lib/smAPI/smapiTypes.ts
+++ b/src/StreamMaster.WebUI/lib/smAPI/smapiTypes.ts
@@ -756,30 +756,6 @@ export interface MoveToNextStreamRequest
{
SMChannelId: number;
}
-export interface GetStreamGroupSMChannelsRequest
-{
- StreamGroupId: number;
-}
-export interface AddSMChannelsToStreamGroupByParametersRequest
-{
- Parameters: QueryStringParameters;
- StreamGroupId: number;
-}
-export interface AddSMChannelsToStreamGroupRequest
-{
- SMChannelIds: number[];
- StreamGroupId: number;
-}
-export interface AddSMChannelToStreamGroupRequest
-{
- SMChannelId: number;
- StreamGroupId: number;
-}
-export interface RemoveSMChannelFromStreamGroupRequest
-{
- SMChannelId: number;
- StreamGroupId: number;
-}
export interface SGFS
{
Name: string;
@@ -845,6 +821,30 @@ export interface UpdateStreamGroupRequest
NewName?: string;
StreamGroupId: number;
}
+export interface GetStreamGroupSMChannelsRequest
+{
+ StreamGroupId: number;
+}
+export interface AddSMChannelsToStreamGroupByParametersRequest
+{
+ Parameters: QueryStringParameters;
+ StreamGroupId: number;
+}
+export interface AddSMChannelsToStreamGroupRequest
+{
+ SMChannelIds: number[];
+ StreamGroupId: number;
+}
+export interface AddSMChannelToStreamGroupRequest
+{
+ SMChannelId: number;
+ StreamGroupId: number;
+}
+export interface RemoveSMChannelFromStreamGroupRequest
+{
+ SMChannelId: number;
+ StreamGroupId: number;
+}
export interface GetChannelMetricsRequest
{
}
@@ -956,25 +956,6 @@ export interface SendSuccessRequest
Detail: string;
Summary: string;
}
-export interface GetSMChannelStreamsRequest
-{
- SMChannelId: number;
-}
-export interface AddSMStreamToSMChannelRequest
-{
- Rank?: number;
- SMChannelId: number;
- SMStreamId: string;
-}
-export interface RemoveSMStreamFromSMChannelRequest
-{
- SMChannelId: number;
- SMStreamId: string;
-}
-export interface SetSMStreamRanksRequest
-{
- Requests: SMChannelStreamRankRequest[];
-}
export interface GetPagedSMChannelsRequest
{
Parameters: QueryStringParameters;
@@ -1158,6 +1139,25 @@ export interface UpdateSMChannelRequest
TimeShift?: number;
VideoStreamHandler?: VideoStreamHandlers;
}
+export interface GetSMChannelStreamsRequest
+{
+ SMChannelId: number;
+}
+export interface AddSMStreamToSMChannelRequest
+{
+ Rank?: number;
+ SMChannelId: number;
+ SMStreamId: string;
+}
+export interface RemoveSMStreamFromSMChannelRequest
+{
+ SMChannelId: number;
+ SMStreamId: string;
+}
+export interface SetSMStreamRanksRequest
+{
+ Requests: SMChannelStreamRankRequest[];
+}
export interface GetSMChannelChannelsRequest
{
SMChannelId: number;
@@ -1455,6 +1455,12 @@ export interface SetTestTaskRequest
{
DelayInSeconds: number;
}
+export interface GetEPGColorsRequest
+{
+}
+export interface EPGSyncRequest
+{
+}
export interface GetEPGFileNamesRequest
{
}
@@ -1515,12 +1521,6 @@ export interface UpdateEPGFileRequest
TimeShift?: number;
Url?: string;
}
-export interface GetEPGColorsRequest
-{
-}
-export interface EPGSyncRequest
-{
-}
export interface GetCustomPlayListRequest
{
SMStreamId?: string;
diff --git a/src/tests/StreamMaster.Application.UnitTests/StreamMaster.Application.UnitTests.csproj b/src/tests/StreamMaster.Application.UnitTests/StreamMaster.Application.UnitTests.csproj
index abec59742..9399636c4 100644
--- a/src/tests/StreamMaster.Application.UnitTests/StreamMaster.Application.UnitTests.csproj
+++ b/src/tests/StreamMaster.Application.UnitTests/StreamMaster.Application.UnitTests.csproj
@@ -13,6 +13,7 @@
+
diff --git a/src/tests/StreamMaster.Infrastructure.UnitTests/StreamMaster.Infrastructure.UnitTests.csproj b/src/tests/StreamMaster.Infrastructure.UnitTests/StreamMaster.Infrastructure.UnitTests.csproj
index 1374b3eb6..e032c6df6 100644
--- a/src/tests/StreamMaster.Infrastructure.UnitTests/StreamMaster.Infrastructure.UnitTests.csproj
+++ b/src/tests/StreamMaster.Infrastructure.UnitTests/StreamMaster.Infrastructure.UnitTests.csproj
@@ -12,6 +12,7 @@
+
diff --git a/src/tests/StreamMaster.Streams.UnitTests/StreamMaster.Streams.UnitTests.csproj b/src/tests/StreamMaster.Streams.UnitTests/StreamMaster.Streams.UnitTests.csproj
index f48ba63e0..58c66b924 100644
--- a/src/tests/StreamMaster.Streams.UnitTests/StreamMaster.Streams.UnitTests.csproj
+++ b/src/tests/StreamMaster.Streams.UnitTests/StreamMaster.Streams.UnitTests.csproj
@@ -12,6 +12,7 @@
+