From a47b42ca425b60d86705c42edabebcab1aa7eb2f Mon Sep 17 00:00:00 2001 From: Jose Luis Guerra Infante Date: Sat, 4 Apr 2026 19:33:43 +0200 Subject: [PATCH 1/7] feat: add Dataflow vs NetTaskManagement benchmarks and optimize task cancellation logic - Introduced `DataflowComparisonBenchmarks`: - Compares NetTaskManagement and TPL Dataflow for starting, canceling, and processing N workers/items. - Helps developers choose between tools based on specific use cases. - Added details and execution instructions to `README.md`. - Enhanced `CancelAllTasks()`: - Introduced `CancelAllTasksParallelThreshold` for selective parallelization based on task count. - Optimized logic for sequential vs parallel task cancellation to minimize overhead. - Improved handling for completed tasks and cancellation token sources. - Updated CI benchmark workflow to include the new comparison benchmarks. - Extended documentation with detailed benchmark descriptions and updated example output. Signed-off-by: Jose Luis Guerra Infante --- README.md | 37 ++- .../DataflowComparisonBenchmarks.cs | 275 ++++++++++++++++++ .../README.md | 20 +- .../TasksManagement.cs | 105 +++++-- 4 files changed, 411 insertions(+), 26 deletions(-) create mode 100644 benchmarks/NetFramework.Tasks.Management.Benchmarks/DataflowComparisonBenchmarks.cs diff --git a/README.md b/README.md index 42a662c..d90047f 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,25 @@ A .NET Standard 2.0 library for Task Parallel Library (TPL) orchestration with D Tasks are identified by **string key** and managed through a **status-based API** — no raw `Task` objects exposed to callers. The library handles the full task lifecycle: register → start → cancel → check completion → delete, and exposes an append-only observability queue that captures disposal metadata for external monitoring. +## NetTaskManagement vs TPL Dataflow — which one do you need? + +These are complementary tools, not competing ones. The benchmarks below are meant to help you choose. + +| | **NetTaskManagement** | **TPL Dataflow** | +|---|---|---| +| Task identity | Named string key — look up, cancel, or delete any task by name at any time | Anonymous — items are messages, no individual handles | +| Lifecycle control | Explicit Register → Start → Cancel → CheckCompleted → Delete with status at every step | Complete the block and await `Completion`; no per-item lifecycle | +| DI integration | `AddTaskManagement()` registers `ITaskManagement` as a scoped service | Manual construction | +| Cancellation granularity | Per-task: cancel one, a filtered subset, or all — with per-task failure reporting | Shared token: cancel the whole block at once | +| Throughput focus | Long-running named workers (background jobs, daemons, named pipelines) | High-volume anonymous item streams (fan-out, fan-in, transform chains) | +| Observability | Built-in disposal queue — capture task name, id, and final status when a task is removed | No built-in disposal queue | +| Memory reclaim | `DeleteTask` forces `GC.Collect` after disposal — guarantees memory is freed | GC-managed by the runtime | + +**Choose NetTaskManagement when** your tasks have identity, need individual control, and live for seconds to hours.
+**Choose TPL Dataflow when** you are processing a high-volume stream of anonymous items through a pipeline graph. + +**[View live benchmark charts — NTM vs Dataflow](https://ryujose.github.io/NetTaskManagement/benchmarks/results)** + ## Packages | Package | NuGet | @@ -111,14 +130,22 @@ Performance is tracked automatically on every push to `main` and published as in **[View benchmark charts](https://ryujose.github.io/NetTaskManagement/benchmarks/results)** -Benchmarks cover: -- `LifecycleBenchmarks` — individual lifecycle stages: Register, Start, Cancel, Delete, and full end-to-end -- `GetTasksStatusBenchmarks` — dictionary snapshot cost at 1, 10, and 50 tasks -- `CancelAllTasksBenchmarks` — `Parallel.ForEach` cancellation fan-out at 1, 10, and 50 tasks +| Benchmark class | What it measures | +|---|---| +| `LifecycleBenchmarks` | Each lifecycle stage in isolation: Register, Start, Cancel, Delete, and full end-to-end | +| `GetTasksStatusBenchmarks` | Dictionary snapshot cost at 1, 10, and 50 tasks | +| `CancelAllTasksBenchmarks` | `Parallel.ForEach` cancellation fan-out at 1, 10, and 50 tasks | +| `DataflowComparisonBenchmarks` | Head-to-head vs TPL Dataflow: start N workers, cancel N workers, process N items — at N = 10, 50, 100 | All benchmarks run on **net8.0**, **net9.0**, and **net10.0** with `[MemoryDiagnoser]` enabled (reports allocated bytes per operation). -To run locally (Release mode required): +To run the comparison benchmarks locally (Release mode required): + +```bash +dotnet run -c Release -f net9.0 --project benchmarks/NetFramework.Tasks.Management.Benchmarks/ -- --filter '*DataflowComparison*' +``` + +To run everything: ```bash dotnet run -c Release -f net8.0 --project benchmarks/NetFramework.Tasks.Management.Benchmarks/ -- --filter '*' diff --git a/benchmarks/NetFramework.Tasks.Management.Benchmarks/DataflowComparisonBenchmarks.cs b/benchmarks/NetFramework.Tasks.Management.Benchmarks/DataflowComparisonBenchmarks.cs new file mode 100644 index 0000000..6d0f953 --- /dev/null +++ b/benchmarks/NetFramework.Tasks.Management.Benchmarks/DataflowComparisonBenchmarks.cs @@ -0,0 +1,275 @@ +using BenchmarkDotNet.Attributes; +using Microsoft.Extensions.Logging.Abstractions; +using NetFramework.Tasks.Management.Abstractions.Enums; +using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; + +namespace NetFramework.Tasks.Management.Benchmarks +{ + /// + /// Head-to-head: NetTaskManagement vs TPL Dataflow for concurrent-worker scenarios. + /// + /// Purpose: help developers choose the right tool, not declare a winner. + /// + /// NetTaskManagement is the right choice when you need: + /// – Named task handles (look up / cancel / delete by string key) + /// – Status-based lifecycle API (no raw Task objects exposed to callers) + /// – DI-friendly registration and an observability queue for disposed tasks + /// – Explicit per-task cancel + wait + delete with memory reclaim guarantees + /// + /// TPL Dataflow is the right choice when you need: + /// – High-throughput anonymous item processing (pipeline graphs, fan-out/fan-in) + /// – Built-in backpressure (BoundedCapacity) and block linking + /// – Minimal per-item overhead for high-volume streams + /// – Single shared cancellation token across all workers + /// + /// Overlap measured here: starting N concurrent workers, cancelling them in bulk, + /// and processing N short-lived work items end-to-end. + /// + /// [InvocationCount(1)] is required for all benchmarks — every method mutates + /// shared state (ConcurrentDictionary, ActionBlock, CancellationTokenSource). + /// + [MemoryDiagnoser] + [InvocationCount(1)] + public class DataflowComparisonBenchmarks + { + private static readonly Action SpinUntilCancelled = state => + { + var cts = (CancellationTokenSource)state; + while (!cts.IsCancellationRequested) + Thread.SpinWait(1); + }; + + [Params(10, 50, 100)] + public int N { get; set; } + + // ── NTM state ───────────────────────────────────────────────────────── + private TasksManagement _tasks = null!; + private int _counter; + private string[] _taskNames = null!; + private CancellationTokenSource[] _ctsList = null!; + + // ── Dataflow state ──────────────────────────────────────────────────── + private ActionBlock _block = null!; + private CancellationTokenSource _blockCts = null!; + + [GlobalSetup] + public void GlobalSetup() + => _tasks = new TasksManagement(NullLogger.Instance); + + [GlobalCleanup] + public void GlobalCleanup() + => _tasks.ClearConcurrentLists(); + + // ───────────────────────────────────────────────────────────────────── + // Benchmark 1 — Start N concurrent workers + // + // NTM: RegisterTask × N + StartTask × N + // Each worker gets a unique string key, a CancellationTokenSource, + // and a slot in the ConcurrentDictionary. + // + // Dataflow: new ActionBlock(MaxDegreeOfParallelism = N) + Post × N + // Workers are anonymous — no per-worker handle or key. + // + // Trade-off: NTM gives you addressable handles at the cost of per-task + // dictionary overhead; Dataflow is leaner but you lose individual control. + // ───────────────────────────────────────────────────────────────────── + + [IterationSetup(Targets = new[] { nameof(NTM_StartNWorkers) })] + public void Setup_NTM_StartNWorkers() + { + _ctsList = new CancellationTokenSource[N]; + _taskNames = new string[N]; + for (int i = 0; i < N; i++) + { + _ctsList[i] = new CancellationTokenSource(); + _taskNames[i] = $"worker-{Interlocked.Increment(ref _counter)}"; + } + } + + [Benchmark] + public void NTM_StartNWorkers() + { + for (int i = 0; i < N; i++) + { + _tasks.RegisterTask(_taskNames[i], SpinUntilCancelled, _ctsList[i]); + _tasks.StartTask(_taskNames[i]); + } + } + + [IterationCleanup(Targets = new[] { nameof(NTM_StartNWorkers) })] + public void Cleanup_NTM_StartNWorkers() + => _tasks.ClearConcurrentLists(); + + // ── + + [IterationSetup(Targets = new[] { nameof(Dataflow_StartNWorkers) })] + public void Setup_Dataflow_StartNWorkers() + => _blockCts = new CancellationTokenSource(); + + [Benchmark] + public void Dataflow_StartNWorkers() + { + _block = new ActionBlock(_ => + { + while (!_blockCts.IsCancellationRequested) + Thread.SpinWait(1); + }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = N }); + + for (int i = 0; i < N; i++) + _block.Post(i); + } + + [IterationCleanup(Targets = new[] { nameof(Dataflow_StartNWorkers) })] + public void Cleanup_Dataflow_StartNWorkers() + { + _blockCts.Cancel(); + _block.Complete(); + _block.Completion.Wait(); + _blockCts.Dispose(); + } + + // ───────────────────────────────────────────────────────────────────── + // Benchmark 2 — Cancel N running workers + // + // NTM: CancelAllTasks() — Parallel.ForEach across the dictionary, + // calling CancellationTokenSource.Cancel() on each task individually. + // Returns a status dict of any that could not be cancelled. + // + // Dataflow: CancellationTokenSource.Cancel() — one call signals all N workers + // via the shared token passed to ExecutionDataflowBlockOptions. + // + // Trade-off: NTM's per-task cancel lets you exempt specific tasks and collect + // per-task failure status; Dataflow's shared-token cancel is O(1) + // but all-or-nothing. + // + // Note: Thread.Sleep in Dataflow setup gives workers time to enter their + // spin loops before measurement — without it some items may not have + // been dequeued yet, understating the cancellation cost. + // ───────────────────────────────────────────────────────────────────── + + [IterationSetup(Targets = new[] { nameof(NTM_CancelNWorkers) })] + public void Setup_NTM_CancelNWorkers() + { + _tasks.ClearConcurrentLists(); + for (int i = 0; i < N; i++) + { + var cts = new CancellationTokenSource(); + var name = $"worker-{Interlocked.Increment(ref _counter)}"; + _tasks.RegisterTask(name, SpinUntilCancelled, cts); + _tasks.StartTask(name); + } + // Give thread-pool workers time to enter their spin loops. + Thread.Sleep(50); + } + + [Benchmark] + public TaskManagementStatus NTM_CancelNWorkers() + { + var failed = new ConcurrentDictionary(); + return _tasks.CancelAllTasks(null, ref failed); + } + + [IterationCleanup(Targets = new[] { nameof(NTM_CancelNWorkers) })] + public void Cleanup_NTM_CancelNWorkers() + => _tasks.ClearConcurrentLists(); + + // ── + + [IterationSetup(Targets = new[] { nameof(Dataflow_CancelNWorkers) })] + public void Setup_Dataflow_CancelNWorkers() + { + _blockCts = new CancellationTokenSource(); + _block = new ActionBlock(_ => + { + while (!_blockCts.IsCancellationRequested) + Thread.SpinWait(1); + }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = N }); + + for (int i = 0; i < N; i++) + _block.Post(i); + + // Give thread-pool workers time to enter their spin loops. + Thread.Sleep(50); + } + + [Benchmark] + public void Dataflow_CancelNWorkers() + => _blockCts.Cancel(); + + [IterationCleanup(Targets = new[] { nameof(Dataflow_CancelNWorkers) })] + public void Cleanup_Dataflow_CancelNWorkers() + { + _block.Complete(); + _block.Completion.Wait(); + _blockCts.Dispose(); + } + + // ───────────────────────────────────────────────────────────────────── + // Benchmark 3 — Process N short-lived work items end-to-end + // + // NTM: Tasks are pre-registered in IterationSetup. + // Measured: StartTask × N → CheckTaskStatusCompleted × N + // → DeleteTask × N (includes GC.Collect per delete) + // This is the correct NTM idiom for item-scoped work; it exposes + // the per-task lifecycle cost (dictionary TryRemove + GC). + // + // Dataflow: Measured: new ActionBlock + Post × N + Complete + Wait + // No per-item lifecycle cost — items are dispatched and forgotten. + // + // Trade-off: NTM carries O(N) lifecycle overhead (GC.Collect × N) in exchange + // for named handles and status tracking per item. Dataflow has + // near-zero overhead per item but you get no individual item status. + // ───────────────────────────────────────────────────────────────────── + + [IterationSetup(Targets = new[] { nameof(NTM_ProcessNItems) })] + public void Setup_NTM_ProcessNItems() + { + _tasks.ClearConcurrentLists(); + _ctsList = new CancellationTokenSource[N]; + _taskNames = new string[N]; + for (int i = 0; i < N; i++) + { + _ctsList[i] = new CancellationTokenSource(); + _taskNames[i] = $"item-{Interlocked.Increment(ref _counter)}"; + _tasks.RegisterTask(_taskNames[i], _ => { /* trivial work unit */ }, _ctsList[i]); + } + } + + [Benchmark] + public void NTM_ProcessNItems() + { + for (int i = 0; i < N; i++) + _tasks.StartTask(_taskNames[i]); + + for (int i = 0; i < N; i++) + _tasks.CheckTaskStatusCompleted(_taskNames[i], retry: 3, millisecondsCancellationWait: 5000); + + for (int i = 0; i < N; i++) + _tasks.DeleteTask(_taskNames[i]); + } + + [IterationCleanup(Targets = new[] { nameof(NTM_ProcessNItems) })] + public void Cleanup_NTM_ProcessNItems() + => _tasks.ClearConcurrentLists(); + + // ── + + [Benchmark] + public void Dataflow_ProcessNItems() + { + var block = new ActionBlock( + _ => { /* trivial work unit */ }, + new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = N }); + + for (int i = 0; i < N; i++) + block.Post(i); + + block.Complete(); + block.Completion.Wait(); + } + } +} diff --git a/benchmarks/NetFramework.Tasks.Management.Benchmarks/README.md b/benchmarks/NetFramework.Tasks.Management.Benchmarks/README.md index 8e179c9..d90d0b3 100644 --- a/benchmarks/NetFramework.Tasks.Management.Benchmarks/README.md +++ b/benchmarks/NetFramework.Tasks.Management.Benchmarks/README.md @@ -32,6 +32,22 @@ Measures the `Parallel.ForEach` fan-out inside `CancelAllTasks` as the number of |---|---| | `TaskCount` | 1, 10, 50 | +### `DataflowComparisonBenchmarks` + +Head-to-head comparison of NetTaskManagement vs TPL Dataflow for the scenarios where both libraries can solve the same problem. The goal is to help developers choose the right tool, not declare a winner — each library has distinct strengths. + +| Benchmark pair | What is measured | +|---|---| +| `NTM_StartNWorkers` / `Dataflow_StartNWorkers` | Cost of getting N concurrent workers running. NTM: `RegisterTask × N` + `StartTask × N` (ConcurrentDictionary inserts + `Task.Start`). Dataflow: `new ActionBlock` + `Post × N`. | +| `NTM_CancelNWorkers` / `Dataflow_CancelNWorkers` | Cancel N running workers. NTM: `CancelAllTasks()` uses `Parallel.ForEach` and returns per-task status. Dataflow: single `CancellationTokenSource.Cancel()` shared across all workers. | +| `NTM_ProcessNItems` / `Dataflow_ProcessNItems` | Process N short-lived items end-to-end. NTM: `StartTask × N` → `CheckTaskStatusCompleted × N` → `DeleteTask × N` (includes `GC.Collect` per delete). Dataflow: `Post × N` → `Complete` → `Wait`. | + +`[InvocationCount(1)]` applies to all pairs because every method mutates shared state. A `Thread.Sleep(50)` in the cancellation setup ensures workers are actively spinning before the timed cancellation call — without it, some items may not have been dequeued yet and would understate the true cancellation cost. + +| Parameter | Values | +|---|---| +| `N` | 10, 50, 100 | + ## Running > Benchmarks must be run in **Release** configuration. Debug builds produce meaningless results. @@ -55,6 +71,7 @@ dotnet run -c Release -f net10.0 --project benchmarks/NetFramework.Tasks.Managem dotnet run -c Release -f net8.0 --project benchmarks/NetFramework.Tasks.Management.Benchmarks/ -- --filter *Lifecycle* dotnet run -c Release -f net8.0 --project benchmarks/NetFramework.Tasks.Management.Benchmarks/ -- --filter *GetTasksStatus* dotnet run -c Release -f net8.0 --project benchmarks/NetFramework.Tasks.Management.Benchmarks/ -- --filter *CancelAll* +dotnet run -c Release -f net8.0 --project benchmarks/NetFramework.Tasks.Management.Benchmarks/ -- --filter *DataflowComparison* ``` ### All benchmarks @@ -80,7 +97,8 @@ BenchmarkDotNet.Artifacts/ └── results/ ├── NetFramework.Tasks.Management.Benchmarks.LifecycleBenchmarks-report.md ├── NetFramework.Tasks.Management.Benchmarks.GetTasksStatusBenchmarks-report.md - └── NetFramework.Tasks.Management.Benchmarks.CancelAllTasksBenchmarks-report.md + ├── NetFramework.Tasks.Management.Benchmarks.CancelAllTasksBenchmarks-report.md + └── NetFramework.Tasks.Management.Benchmarks.DataflowComparisonBenchmarks-report.md ``` `[MemoryDiagnoser]` is enabled on all classes so each report includes **allocated bytes per operation** in addition to execution time. diff --git a/src/NetFramework.Task.Management/TasksManagement.cs b/src/NetFramework.Task.Management/TasksManagement.cs index 7c47326..5f4f9da 100644 --- a/src/NetFramework.Task.Management/TasksManagement.cs +++ b/src/NetFramework.Task.Management/TasksManagement.cs @@ -20,6 +20,12 @@ public class TasksManagement : ITaskManagement private static ConcurrentDictionary TasksDataModel = new ConcurrentDictionary(); private static ConcurrentQueue TaskDisposedDataModel = new ConcurrentQueue(); + // CancellationTokenSource.Cancel() is an atomic flag-set (~100 ns). + // Parallel.ForEach thread-pool setup costs ~50-80 µs regardless of N, + // so sequential foreach is faster below this threshold. + // Above 500 tasks the parallel fan-out amortises the setup cost. + private const int CancelAllTasksParallelThreshold = 500; + private readonly ILogger _logger; public TasksManagement(ILogger logger) { @@ -204,6 +210,15 @@ public TaskManagementStatus CheckTaskStatusCompleted(string taskName, int retry return TaskManagementStatus.TaskNotFound; } + // Fast path: task already completed — skip Task.Run allocation and polling loop. + // Task.IsCompleted is thread-safe and terminal: once true it never goes false. + if (taskDataModel.Task.IsCompleted) + { + _logger.LogInformation($"{nameof(taskName)}-{taskName} {nameof(TaskManagementStatus.Completed)}"); + + return TaskManagementStatus.Completed; + } + var cancellationTokenSource = new CancellationTokenSource(millisecondsCancellationWait); int retryCount = 0; @@ -352,18 +367,11 @@ public TaskManagementStatus DequeueTaskDisposedDataModel(out TaskDisposedDataMod return TaskManagementStatus.ObjectInfoDequeued; } - public TaskManagementStatus CancelAllTasks (IList except, ref ConcurrentDictionary tasksCancelPetitionFailedRef) + public TaskManagementStatus CancelAllTasks(IList except, ref ConcurrentDictionary tasksCancelPetitionFailedRef) { try { - IEnumerable> tasksData = null; - - if (except != null) - tasksData = TasksDataModel.Where(a => !except.Any(b => a.Key == b)).Select(c => c); - else - tasksData = TasksDataModel.Select(c => c); - - if (!tasksData.Any()) + if (TasksDataModel.IsEmpty) { _logger.LogWarning($"{nameof(TaskManagementStatus.TasksNotFoundToBeCancelled)}"); @@ -372,26 +380,83 @@ public TaskManagementStatus CancelAllTasks (IList except, ref Concurrent var tasksNotCancelled = new ConcurrentDictionary(); - Parallel.ForEach(tasksData, (taskData) => + // Below the threshold, Parallel.ForEach thread-pool setup (~50-80 µs) costs + // more than calling Cancel() sequentially — Cancel() is an atomic flag-set (~100 ns). + // Above the threshold, parallel fan-out amortises the setup cost across many tasks. + if (TasksDataModel.Count <= CancelAllTasksParallelThreshold) { - if (taskData.Value.CancellationTokenSource == null) + bool anyTaskFound = false; + + foreach (var taskData in TasksDataModel) { - if (!tasksNotCancelled.TryAdd(taskData.Key, TaskManagementStatus.CancellationTokenSourceNotFound)) - _logger.LogWarning($"{nameof(CancelAllTasks)}-{nameof(taskData.Key)} doesn't have a cancellation token source"); + if (except != null && except.Contains(taskData.Key)) + continue; + + anyTaskFound = true; + + if (taskData.Value.CancellationTokenSource == null) + { + if (!tasksNotCancelled.TryAdd(taskData.Key, TaskManagementStatus.CancellationTokenSourceNotFound)) + _logger.LogWarning($"{nameof(CancelAllTasks)}-{taskData.Key} doesn't have a cancellation token source"); + + continue; + } + + if (taskData.Value.Task.IsCompleted) + { + if (!tasksNotCancelled.TryAdd(taskData.Key, TaskManagementStatus.Completed)) + _logger.LogWarning($"{nameof(CancelAllTasks)}-{taskData.Key} task already completed"); + + continue; + } - return; + taskData.Value.CancellationTokenSource.Cancel(); } - if (taskData.Value.Task.IsCompleted) + if (!anyTaskFound) { - if (!tasksNotCancelled.TryAdd(taskData.Key, TaskManagementStatus.Completed)) - _logger.LogWarning($"{nameof(CancelAllTasks)}-{nameof(taskData.Key)} task already completed"); + _logger.LogWarning($"{nameof(TaskManagementStatus.TasksNotFoundToBeCancelled)}"); - return; + return TaskManagementStatus.TasksNotFoundToBeCancelled; } + } + else + { + int anyTaskFound = 0; + + Parallel.ForEach(TasksDataModel, (taskData) => + { + if (except != null && except.Contains(taskData.Key)) + return; + + Interlocked.Exchange(ref anyTaskFound, 1); + + if (taskData.Value.CancellationTokenSource == null) + { + if (!tasksNotCancelled.TryAdd(taskData.Key, TaskManagementStatus.CancellationTokenSourceNotFound)) + _logger.LogWarning($"{nameof(CancelAllTasks)}-{taskData.Key} doesn't have a cancellation token source"); + + return; + } + + if (taskData.Value.Task.IsCompleted) + { + if (!tasksNotCancelled.TryAdd(taskData.Key, TaskManagementStatus.Completed)) + _logger.LogWarning($"{nameof(CancelAllTasks)}-{taskData.Key} task already completed"); + + return; + } - taskData.Value.CancellationTokenSource.Cancel(); - }); + taskData.Value.CancellationTokenSource.Cancel(); + }); + + if (anyTaskFound == 0) + { + _logger.LogWarning($"{nameof(TaskManagementStatus.TasksNotFoundToBeCancelled)}"); + + return TaskManagementStatus.TasksNotFoundToBeCancelled; + } + } tasksCancelPetitionFailedRef = tasksNotCancelled; From 3b17d23522a5ea24c6e0135496c7d411018f4918 Mon Sep 17 00:00:00 2001 From: Jose Luis Guerra Infante Date: Sat, 4 Apr 2026 19:46:20 +0200 Subject: [PATCH 2/7] feat: add unit tests for completed tasks and task cancellation edge cases - Introduced `AlreadyCompletedTaskCheckTaskStatusCompleted_CompletedStatus` to validate handling of completed tasks in `CheckTaskStatusCompleted()`. - Added `AllTasksInExceptListCancelAllTasks_TasksNotFoundToBeCancelledStatus` test to confirm behavior when all tasks are excluded from cancellation. - Implemented `AlreadyCompletedTaskCancelAllTasks_AllTasksCancelPetitionAcceptedStatus` to verify cancellation behavior for already completed tasks. Signed-off-by: Jose Luis Guerra Infante --- .../TaskManagementCancelAllTasksTest.cs | 59 +++++++++++++++++++ ...kManagementCheckTaskStatusCompletedTest.cs | 26 ++++++++ 2 files changed, 85 insertions(+) diff --git a/src/NetFramework.Tasks.Management.xUnit.Tests/TaskManagementCancelAllTasksTest.cs b/src/NetFramework.Tasks.Management.xUnit.Tests/TaskManagementCancelAllTasksTest.cs index b20eabb..f578485 100644 --- a/src/NetFramework.Tasks.Management.xUnit.Tests/TaskManagementCancelAllTasksTest.cs +++ b/src/NetFramework.Tasks.Management.xUnit.Tests/TaskManagementCancelAllTasksTest.cs @@ -113,5 +113,64 @@ public void TaskNotAcceptedCancellationTokenSourceNotFoundExceptedTasktaskManage Assert.False(isCancelPetitionFailed); taskManagement.ClearConcurrentLists(); } + + [Fact] + public void AllTasksInExceptListCancelAllTasks_TasksNotFoundToBeCancelledStatus() + { + var logger = new Mock(); + TasksManagement taskManagement = new TasksManagement(logger.Object); + + const string simpleTaskName = "tasktest1"; + const string simpleTaskNameTwo = "tasktest2"; + + var cancellationTokenSource = new CancellationTokenSource(); + TaskManagementStatus taskManagementStatus = taskManagement.RegisterTask(simpleTaskName, new ActionsUtilitiesTests().ActionObjectCancellationTokenSource(), cancellationTokenSource); + + var cancellationTokenSource2 = new CancellationTokenSource(); + taskManagementStatus = taskManagement.RegisterTask(simpleTaskNameTwo, new ActionsUtilitiesTests().ActionObjectCancellationTokenSource(), cancellationTokenSource2); + taskManagementStatus = taskManagement.StartTask(simpleTaskName); + taskManagementStatus = taskManagement.StartTask(simpleTaskNameTwo); + + // All registered tasks are in the except list — sequential path finds no eligible + // task to cancel and should return TasksNotFoundToBeCancelled. + var tasksCancelPetitionFailedRef = new ConcurrentDictionary(); + taskManagementStatus = taskManagement.CancelAllTasks(new List { simpleTaskName, simpleTaskNameTwo }, ref tasksCancelPetitionFailedRef); + + Assert.Equal(TaskManagementStatus.TasksNotFoundToBeCancelled, taskManagementStatus); + Assert.Empty(tasksCancelPetitionFailedRef); + taskManagement.ClearConcurrentLists(); + } + + [Fact] + public void AlreadyCompletedTaskCancelAllTasks_AllTasksCancelPetitionAcceptedStatus() + { + var logger = new Mock(); + TasksManagement taskManagement = new TasksManagement(logger.Object); + + const string simpleTaskName = "tasktest1"; + + // Register a task with a trivially-completing action so it finishes on its own. + var cancellationTokenSource = new CancellationTokenSource(); + TaskManagementStatus taskManagementStatus = taskManagement.RegisterTask(simpleTaskName, _ => { }, cancellationTokenSource); + taskManagementStatus = taskManagement.StartTask(simpleTaskName); + + // Wait until the task reaches RanToCompletion before calling CancelAllTasks — + // this exercises the IsCompleted branch in the sequential path, which records + // the task as Completed in the failure dict. The final check treats Completed + // entries as non-failures, so the method returns AllTasksCancelPetitionAccepted. + var spinWait = new SpinWait(); + var statuses = taskManagement.GetTasksStatus(); + while (!statuses[simpleTaskName].Equals(System.Threading.Tasks.TaskStatus.RanToCompletion)) + { + spinWait.SpinOnce(); + statuses = taskManagement.GetTasksStatus(); + } + + var tasksCancelPetitionFailedRef = new ConcurrentDictionary(); + taskManagementStatus = taskManagement.CancelAllTasks(null, ref tasksCancelPetitionFailedRef); + + Assert.Equal(TaskManagementStatus.AllTasksCancelPetitionAccepted, taskManagementStatus); + taskManagement.ClearConcurrentLists(); + } } } diff --git a/src/NetFramework.Tasks.Management.xUnit.Tests/TaskManagementCheckTaskStatusCompletedTest.cs b/src/NetFramework.Tasks.Management.xUnit.Tests/TaskManagementCheckTaskStatusCompletedTest.cs index d7a8a9f..e22f559 100644 --- a/src/NetFramework.Tasks.Management.xUnit.Tests/TaskManagementCheckTaskStatusCompletedTest.cs +++ b/src/NetFramework.Tasks.Management.xUnit.Tests/TaskManagementCheckTaskStatusCompletedTest.cs @@ -79,5 +79,31 @@ public void CompletedWithoutCancelsCheckTaskStatusCompleted_TaskManagementRegist Assert.Equal(TaskManagementStatus.NotCompleted, taskManagementStatus); } + + [Fact] + public void AlreadyCompletedTaskCheckTaskStatusCompleted_CompletedStatus() + { + const string simpleTaskName = "tasktest"; + + // Register a task with a trivially-completing action (no spin loop). + var cancellationTokenSource = new CancellationTokenSource(); + TaskManagementStatus taskManagementStatus = _taskManagement.RegisterTask(simpleTaskName, _ => { }, cancellationTokenSource); + taskManagementStatus = _taskManagement.StartTask(simpleTaskName); + + // Wait until the task has actually reached a completed state before calling + // CheckTaskStatusCompleted — this exercises the fast-path branch that returns + // immediately when Task.IsCompleted is true, skipping the Task.Run polling loop. + var spinWait = new SpinWait(); + var statuses = _taskManagement.GetTasksStatus(); + while (!statuses[simpleTaskName].Equals(System.Threading.Tasks.TaskStatus.RanToCompletion)) + { + spinWait.SpinOnce(); + statuses = _taskManagement.GetTasksStatus(); + } + + taskManagementStatus = _taskManagement.CheckTaskStatusCompleted(simpleTaskName, retry: 1, millisecondsCancellationWait: 1000); + + Assert.Equal(TaskManagementStatus.Completed, taskManagementStatus); + } } } From bc9a6704ff35acbc463693482cf485ce3feadddf Mon Sep 17 00:00:00 2001 From: Jose Luis Guerra Infante Date: Sat, 4 Apr 2026 19:56:15 +0200 Subject: [PATCH 3/7] feat: add internal visibility for parallel cancellation testing and new unit test - Exposed `CancelAllTasksParallelThreshold` as `internal` to facilitate testing of parallel cancellation logic. - Added `ParallelPathCancelAllTasks_AllTasksCancelPetitionAcceptedStatus` unit test to validate parallel task cancellation behavior with a reduced threshold. - Modified project file to include `InternalsVisibleTo` for test assembly access. Signed-off-by: Jose Luis Guerra Infante --- .../NetFramework.Tasks.Management.csproj | 6 ++++ .../TasksManagement.cs | 4 ++- .../TaskManagementCancelAllTasksTest.cs | 34 +++++++++++++++++++ 3 files changed, 43 insertions(+), 1 deletion(-) diff --git a/src/NetFramework.Task.Management/NetFramework.Tasks.Management.csproj b/src/NetFramework.Task.Management/NetFramework.Tasks.Management.csproj index 2d0b060..85d3e62 100644 --- a/src/NetFramework.Task.Management/NetFramework.Tasks.Management.csproj +++ b/src/NetFramework.Task.Management/NetFramework.Tasks.Management.csproj @@ -30,4 +30,10 @@ + + + <_Parameter1>NetFramework.Tasks.Management.xUnit.Tests + + + diff --git a/src/NetFramework.Task.Management/TasksManagement.cs b/src/NetFramework.Task.Management/TasksManagement.cs index 5f4f9da..84b66cc 100644 --- a/src/NetFramework.Task.Management/TasksManagement.cs +++ b/src/NetFramework.Task.Management/TasksManagement.cs @@ -24,7 +24,9 @@ public class TasksManagement : ITaskManagement // Parallel.ForEach thread-pool setup costs ~50-80 µs regardless of N, // so sequential foreach is faster below this threshold. // Above 500 tasks the parallel fan-out amortises the setup cost. - private const int CancelAllTasksParallelThreshold = 500; + // internal (not private const) so the test assembly can lower it to exercise + // the parallel path without registering hundreds of real tasks. + internal static int CancelAllTasksParallelThreshold = 500; private readonly ILogger _logger; public TasksManagement(ILogger logger) diff --git a/src/NetFramework.Tasks.Management.xUnit.Tests/TaskManagementCancelAllTasksTest.cs b/src/NetFramework.Tasks.Management.xUnit.Tests/TaskManagementCancelAllTasksTest.cs index f578485..8f4352a 100644 --- a/src/NetFramework.Tasks.Management.xUnit.Tests/TaskManagementCancelAllTasksTest.cs +++ b/src/NetFramework.Tasks.Management.xUnit.Tests/TaskManagementCancelAllTasksTest.cs @@ -141,6 +141,40 @@ public void AllTasksInExceptListCancelAllTasks_TasksNotFoundToBeCancelledStatus( taskManagement.ClearConcurrentLists(); } + [Fact] + public void ParallelPathCancelAllTasks_AllTasksCancelPetitionAcceptedStatus() + { + var logger = new Mock(); + TasksManagement taskManagement = new TasksManagement(logger.Object); + + // Lower the threshold so 6 tasks trigger the Parallel.ForEach path without + // the cost of registering hundreds of real tasks. Restore in finally so a + // test failure cannot pollute subsequent tests in the collection. + int originalThreshold = TasksManagement.CancelAllTasksParallelThreshold; + try + { + TasksManagement.CancelAllTasksParallelThreshold = 5; + + for (int i = 1; i <= 6; i++) + { + var cts = new CancellationTokenSource(); + taskManagement.RegisterTask($"parallel-task-{i}", new ActionsUtilitiesTests().ActionObjectCancellationTokenSource(), cts); + taskManagement.StartTask($"parallel-task-{i}"); + } + + var tasksCancelPetitionFailedRef = new ConcurrentDictionary(); + TaskManagementStatus taskManagementStatus = taskManagement.CancelAllTasks(null, ref tasksCancelPetitionFailedRef); + + Assert.Equal(TaskManagementStatus.AllTasksCancelPetitionAccepted, taskManagementStatus); + Assert.Empty(tasksCancelPetitionFailedRef); + } + finally + { + TasksManagement.CancelAllTasksParallelThreshold = originalThreshold; + taskManagement.ClearConcurrentLists(); + } + } + [Fact] public void AlreadyCompletedTaskCancelAllTasks_AllTasksCancelPetitionAcceptedStatus() { From 95207edffde8d7982b2fbb6d0d063ba41b5023cc Mon Sep 17 00:00:00 2001 From: Jose Luis Guerra Infante Date: Sat, 4 Apr 2026 20:06:17 +0200 Subject: [PATCH 4/7] feat: add unit tests for additional edge cases in `CancelAllTasks()` - Added three new unit tests to validate behavior under diverse scenarios: - `ParallelPathWithExceptListCancelAllTasks_AllTasksCancelPetitionAcceptedStatus`: Verifies cancellation of tasks excluding specific exceptions. - `ParallelPathAlreadyCompletedTaskCancelAllTasks_AllTasksCancelPetitionAcceptedStatus`: Confirms behavior when all tasks are already completed. - `ParallelPathAllTasksExceptedCancelAllTasks_TasksNotFoundToBeCancelledStatus`: Ensures correct status when all tasks are excluded from cancellation. - Refactored `CancelAllTasks()` to remove redundant checks for `CancellationTokenSource`, improving code readability and maintainability. Signed-off-by: Jose Luis Guerra Infante --- .../TasksManagement.cs | 16 --- .../TaskManagementCancelAllTasksTest.cs | 120 ++++++++++++++++++ 2 files changed, 120 insertions(+), 16 deletions(-) diff --git a/src/NetFramework.Task.Management/TasksManagement.cs b/src/NetFramework.Task.Management/TasksManagement.cs index 84b66cc..408f1b2 100644 --- a/src/NetFramework.Task.Management/TasksManagement.cs +++ b/src/NetFramework.Task.Management/TasksManagement.cs @@ -396,14 +396,6 @@ public TaskManagementStatus CancelAllTasks(IList except, ref ConcurrentD anyTaskFound = true; - if (taskData.Value.CancellationTokenSource == null) - { - if (!tasksNotCancelled.TryAdd(taskData.Key, TaskManagementStatus.CancellationTokenSourceNotFound)) - _logger.LogWarning($"{nameof(CancelAllTasks)}-{taskData.Key} doesn't have a cancellation token source"); - - continue; - } - if (taskData.Value.Task.IsCompleted) { if (!tasksNotCancelled.TryAdd(taskData.Key, TaskManagementStatus.Completed)) @@ -433,14 +425,6 @@ public TaskManagementStatus CancelAllTasks(IList except, ref ConcurrentD Interlocked.Exchange(ref anyTaskFound, 1); - if (taskData.Value.CancellationTokenSource == null) - { - if (!tasksNotCancelled.TryAdd(taskData.Key, TaskManagementStatus.CancellationTokenSourceNotFound)) - _logger.LogWarning($"{nameof(CancelAllTasks)}-{taskData.Key} doesn't have a cancellation token source"); - - return; - } - if (taskData.Value.Task.IsCompleted) { if (!tasksNotCancelled.TryAdd(taskData.Key, TaskManagementStatus.Completed)) diff --git a/src/NetFramework.Tasks.Management.xUnit.Tests/TaskManagementCancelAllTasksTest.cs b/src/NetFramework.Tasks.Management.xUnit.Tests/TaskManagementCancelAllTasksTest.cs index 8f4352a..3f1f75f 100644 --- a/src/NetFramework.Tasks.Management.xUnit.Tests/TaskManagementCancelAllTasksTest.cs +++ b/src/NetFramework.Tasks.Management.xUnit.Tests/TaskManagementCancelAllTasksTest.cs @@ -175,6 +175,126 @@ public void ParallelPathCancelAllTasks_AllTasksCancelPetitionAcceptedStatus() } } + [Fact] + public void ParallelPathWithExceptListCancelAllTasks_AllTasksCancelPetitionAcceptedStatus() + { + var logger = new Mock(); + TasksManagement taskManagement = new TasksManagement(logger.Object); + + int originalThreshold = TasksManagement.CancelAllTasksParallelThreshold; + try + { + TasksManagement.CancelAllTasksParallelThreshold = 5; + + var ctsList = new System.Collections.Generic.List(); + for (int i = 1; i <= 6; i++) + { + var cts = new CancellationTokenSource(); + ctsList.Add(cts); + taskManagement.RegisterTask($"parallel-except-task-{i}", new ActionsUtilitiesTests().ActionObjectCancellationTokenSource(), cts); + taskManagement.StartTask($"parallel-except-task-{i}"); + } + + // Except the first two — the parallel path must skip them and cancel the rest. + var tasksCancelPetitionFailedRef = new ConcurrentDictionary(); + TaskManagementStatus taskManagementStatus = taskManagement.CancelAllTasks( + new List { "parallel-except-task-1", "parallel-except-task-2" }, + ref tasksCancelPetitionFailedRef); + + Assert.Equal(TaskManagementStatus.AllTasksCancelPetitionAccepted, taskManagementStatus); + } + finally + { + TasksManagement.CancelAllTasksParallelThreshold = originalThreshold; + taskManagement.ClearConcurrentLists(); + } + } + + [Fact] + public void ParallelPathAlreadyCompletedTaskCancelAllTasks_AllTasksCancelPetitionAcceptedStatus() + { + var logger = new Mock(); + TasksManagement taskManagement = new TasksManagement(logger.Object); + + int originalThreshold = TasksManagement.CancelAllTasksParallelThreshold; + try + { + TasksManagement.CancelAllTasksParallelThreshold = 5; + + for (int i = 1; i <= 6; i++) + { + var cts = new CancellationTokenSource(); + taskManagement.RegisterTask($"parallel-completed-task-{i}", _ => { }, cts); + taskManagement.StartTask($"parallel-completed-task-{i}"); + } + + // Wait for all trivially-completing tasks to reach RanToCompletion so the + // parallel path hits the IsCompleted branch for every entry. + var spinWait = new SpinWait(); + bool allDone = false; + while (!allDone) + { + allDone = true; + var statuses = taskManagement.GetTasksStatus(); + foreach (var s in statuses.Values) + { + if (!s.Equals(System.Threading.Tasks.TaskStatus.RanToCompletion)) + { + allDone = false; + break; + } + } + if (!allDone) spinWait.SpinOnce(); + } + + var tasksCancelPetitionFailedRef = new ConcurrentDictionary(); + TaskManagementStatus taskManagementStatus = taskManagement.CancelAllTasks(null, ref tasksCancelPetitionFailedRef); + + Assert.Equal(TaskManagementStatus.AllTasksCancelPetitionAccepted, taskManagementStatus); + } + finally + { + TasksManagement.CancelAllTasksParallelThreshold = originalThreshold; + taskManagement.ClearConcurrentLists(); + } + } + + [Fact] + public void ParallelPathAllTasksExceptedCancelAllTasks_TasksNotFoundToBeCancelledStatus() + { + var logger = new Mock(); + TasksManagement taskManagement = new TasksManagement(logger.Object); + + int originalThreshold = TasksManagement.CancelAllTasksParallelThreshold; + try + { + TasksManagement.CancelAllTasksParallelThreshold = 5; + + var taskNames = new List(); + for (int i = 1; i <= 6; i++) + { + var name = $"parallel-allexcepted-task-{i}"; + taskNames.Add(name); + var cts = new CancellationTokenSource(); + taskManagement.RegisterTask(name, new ActionsUtilitiesTests().ActionObjectCancellationTokenSource(), cts); + taskManagement.StartTask(name); + } + + // All tasks are excepted — parallel path sets anyTaskFound to 0 + // and must return TasksNotFoundToBeCancelled. + var tasksCancelPetitionFailedRef = new ConcurrentDictionary(); + TaskManagementStatus taskManagementStatus = taskManagement.CancelAllTasks(taskNames, ref tasksCancelPetitionFailedRef); + + Assert.Equal(TaskManagementStatus.TasksNotFoundToBeCancelled, taskManagementStatus); + Assert.Empty(tasksCancelPetitionFailedRef); + } + finally + { + TasksManagement.CancelAllTasksParallelThreshold = originalThreshold; + taskManagement.ClearConcurrentLists(); + } + } + [Fact] public void AlreadyCompletedTaskCancelAllTasks_AllTasksCancelPetitionAcceptedStatus() { From 51dc4c1826e92d63c63b9540fdff44411ac5092e Mon Sep 17 00:00:00 2001 From: Jose Luis Guerra Infante Date: Sat, 4 Apr 2026 23:44:08 +0200 Subject: [PATCH 5/7] feat: expand Dataflow vs NetTaskManagement benchmarks with detailed scenarios and threading model distinctions - Enhanced `DataflowComparisonBenchmarks` with new benchmarking scenarios: - Added separate benchmarks for `LongRunning` and `Pool` task creation options. - Introduced `ParallelBlockingWork` benchmark to measure throughput for blocking workloads. - Implemented `CancelWaitDeleteAll()` to ensure proper teardown of tasks in benchmarks. - Detailed threading model trade-offs for different task behaviors (e.g., high-throughput anonymous items vs long-running dedicated workers). - Improved clarity with updated benchmark naming and comments: - Split benchmarks to explicitly track `LongRunning` vs `Pool` configurations. - Expanded documentation of benchmarks' purpose, implementation, and tradeoffs. - Updated iteration setup and cleanup logic to support task type distinctions. - Ensured accurate benchmarking by accounting for thread-pool injection delays in `ParallelBlockingWork`. Signed-off-by: Jose Luis Guerra Infante --- .../DataflowComparisonBenchmarks.cs | 306 +++++++++++++++--- 1 file changed, 266 insertions(+), 40 deletions(-) diff --git a/benchmarks/NetFramework.Tasks.Management.Benchmarks/DataflowComparisonBenchmarks.cs b/benchmarks/NetFramework.Tasks.Management.Benchmarks/DataflowComparisonBenchmarks.cs index 6d0f953..cbf9eac 100644 --- a/benchmarks/NetFramework.Tasks.Management.Benchmarks/DataflowComparisonBenchmarks.cs +++ b/benchmarks/NetFramework.Tasks.Management.Benchmarks/DataflowComparisonBenchmarks.cs @@ -3,6 +3,7 @@ using NetFramework.Tasks.Management.Abstractions.Enums; using System; using System.Collections.Concurrent; +using System.Linq; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; @@ -14,8 +15,18 @@ namespace NetFramework.Tasks.Management.Benchmarks /// /// Purpose: help developers choose the right tool, not declare a winner. /// + /// Threading model — this is the core distinction: + /// NTM uses TaskCreationOptions.LongRunning throughout, which instructs the runtime + /// to create a dedicated OS thread per task rather than borrowing one from the pool. + /// This is the intended usage: named, long-running workers (daemons, background jobs, + /// pipeline stages) that must not starve the thread pool used by the rest of the app. + /// + /// Dataflow's ActionBlock uses thread-pool threads. That is the correct model for + /// high-throughput anonymous item processing where each work unit is short-lived. + /// /// NetTaskManagement is the right choice when you need: /// – Named task handles (look up / cancel / delete by string key) + /// – Dedicated OS thread per worker — zero pool pressure for long-running work /// – Status-based lifecycle API (no raw Task objects exposed to callers) /// – DI-friendly registration and an observability queue for disposed tasks /// – Explicit per-task cancel + wait + delete with memory reclaim guarantees @@ -29,6 +40,13 @@ namespace NetFramework.Tasks.Management.Benchmarks /// Overlap measured here: starting N concurrent workers, cancelling them in bulk, /// and processing N short-lived work items end-to-end. /// + /// Each scenario has two NTM variants: + /// _LongRunning — TaskCreationOptions.LongRunning — dedicated OS thread per task. + /// Right choice for daemons, pipeline stages, long-duration workers. + /// _Pool — TaskCreationOptions.None — thread-pool thread per task. + /// Right choice when tasks complete quickly and pool availability + /// is not a concern; avoids the OS thread spawn cost of LongRunning. + /// /// [InvocationCount(1)] is required for all benchmarks — every method mutates /// shared state (ConcurrentDictionary, ActionBlock, CancellationTokenSource). /// @@ -46,6 +64,13 @@ public class DataflowComparisonBenchmarks [Params(10, 50, 100)] public int N { get; set; } + /// + /// Duration of blocking work per task in Benchmark 4. + /// Short enough to keep benchmark runs fast; long enough that the thread-pool + /// injection interval (~500 ms) cannot hide the batching cost when N > min-threads. + /// + private const int WorkMs = 20; + // ── NTM state ───────────────────────────────────────────────────────── private TasksManagement _tasks = null!; private int _counter; @@ -62,23 +87,58 @@ public void GlobalSetup() [GlobalCleanup] public void GlobalCleanup() - => _tasks.ClearConcurrentLists(); + => CancelWaitDeleteAll(); + + /// + /// Properly tears down all running NTM workers: + /// 1. Signal every CTS via CancelAllTasks (idempotent if already cancelled). + /// 2. Wait for each dedicated OS thread to actually exit via CheckTaskStatusCompleted. + /// 3. Delete each task (releases dict entry, disposes CTS, reclaims memory). + /// 4. ClearConcurrentLists as a safety sweep. + /// + /// Without the wait step, ClearConcurrentLists returns while threads are still + /// spinning, and each subsequent iteration spawns more — leading to hundreds of + /// unbound OS threads that saturate all CPUs. + /// + private void CancelWaitDeleteAll() + { + var failed = new ConcurrentDictionary(); + _tasks.CancelAllTasks(null, ref failed); + + var taskNames = _tasks.GetTasksStatus().Keys.ToArray(); + foreach (var name in taskNames) + { + _tasks.CheckTaskStatusCompleted(name, retry: 10, millisecondsCancellationWait: 10_000); + _tasks.DeleteTask(name); + } + + _tasks.ClearConcurrentLists(); + } // ───────────────────────────────────────────────────────────────────── // Benchmark 1 — Start N concurrent workers // - // NTM: RegisterTask × N + StartTask × N - // Each worker gets a unique string key, a CancellationTokenSource, - // and a slot in the ConcurrentDictionary. + // NTM_LongRunning: RegisterTask(LongRunning) × N + StartTask × N + // Each worker gets its own OS thread via TaskCreationOptions.LongRunning. + // Thread spawn is expensive (~1 ms/thread on Windows) but the thread + // is dedicated — zero pool pressure for the lifetime of the worker. + // Right choice for daemons and pipeline stages that run for minutes/hours. + // + // NTM_Pool: RegisterTask(None) × N + StartTask × N + // Workers borrow thread-pool threads. Startup is ~10× faster than + // LongRunning because no new OS thread is created. Pool threads are + // shared, so long-running spin loops will starve other pool work — + // use only when tasks are genuinely short-lived or pool depth is ample. // // Dataflow: new ActionBlock(MaxDegreeOfParallelism = N) + Post × N - // Workers are anonymous — no per-worker handle or key. + // Workers are anonymous pool threads — no per-worker handle or key. // - // Trade-off: NTM gives you addressable handles at the cost of per-task - // dictionary overhead; Dataflow is leaner but you lose individual control. + // Trade-off: LongRunning → isolated, addressable, expensive to start. + // Pool → shared, cheaper to start, pool-pressure risk. + // Dataflow → pooled, anonymous, cheapest for high-volume streams. // ───────────────────────────────────────────────────────────────────── - [IterationSetup(Targets = new[] { nameof(NTM_StartNWorkers) })] + [IterationSetup(Targets = new[] { nameof(NTM_StartNWorkers_LongRunning), nameof(NTM_StartNWorkers_Pool) })] public void Setup_NTM_StartNWorkers() { _ctsList = new CancellationTokenSource[N]; @@ -91,18 +151,28 @@ public void Setup_NTM_StartNWorkers() } [Benchmark] - public void NTM_StartNWorkers() + public void NTM_StartNWorkers_LongRunning() + { + for (int i = 0; i < N; i++) + { + _tasks.RegisterTask(_taskNames[i], SpinUntilCancelled, _ctsList[i], TaskCreationOptions.LongRunning); + _tasks.StartTask(_taskNames[i]); + } + } + + [Benchmark] + public void NTM_StartNWorkers_Pool() { for (int i = 0; i < N; i++) { - _tasks.RegisterTask(_taskNames[i], SpinUntilCancelled, _ctsList[i]); + _tasks.RegisterTask(_taskNames[i], SpinUntilCancelled, _ctsList[i], TaskCreationOptions.None); _tasks.StartTask(_taskNames[i]); } } - [IterationCleanup(Targets = new[] { nameof(NTM_StartNWorkers) })] + [IterationCleanup(Targets = new[] { nameof(NTM_StartNWorkers_LongRunning), nameof(NTM_StartNWorkers_Pool) })] public void Cleanup_NTM_StartNWorkers() - => _tasks.ClearConcurrentLists(); + => CancelWaitDeleteAll(); // ── @@ -135,47 +205,76 @@ public void Cleanup_Dataflow_StartNWorkers() // ───────────────────────────────────────────────────────────────────── // Benchmark 2 — Cancel N running workers // - // NTM: CancelAllTasks() — Parallel.ForEach across the dictionary, - // calling CancellationTokenSource.Cancel() on each task individually. + // NTM: CancelAllTasks() — sequential foreach (N ≤ 500) across the + // dictionary, calling CancellationTokenSource.Cancel() per task. // Returns a status dict of any that could not be cancelled. + // Both LongRunning and Pool variants measure the same cancel path — + // the CTS signalling cost does not depend on thread origin. // // Dataflow: CancellationTokenSource.Cancel() — one call signals all N workers // via the shared token passed to ExecutionDataflowBlockOptions. + // Workers run on thread-pool threads. // // Trade-off: NTM's per-task cancel lets you exempt specific tasks and collect // per-task failure status; Dataflow's shared-token cancel is O(1) // but all-or-nothing. // - // Note: Thread.Sleep in Dataflow setup gives workers time to enter their - // spin loops before measurement — without it some items may not have - // been dequeued yet, understating the cancellation cost. + // Note: Thread.Sleep gives workers time to enter their spin loops before + // measurement. LongRunning threads are scheduled immediately by the OS, + // so the sleep is the same for both variants. // ───────────────────────────────────────────────────────────────────── - [IterationSetup(Targets = new[] { nameof(NTM_CancelNWorkers) })] - public void Setup_NTM_CancelNWorkers() + [IterationSetup(Targets = new[] { nameof(NTM_CancelNWorkers_LongRunning) })] + public void Setup_NTM_CancelNWorkers_LongRunning() { _tasks.ClearConcurrentLists(); for (int i = 0; i < N; i++) { var cts = new CancellationTokenSource(); var name = $"worker-{Interlocked.Increment(ref _counter)}"; - _tasks.RegisterTask(name, SpinUntilCancelled, cts); + _tasks.RegisterTask(name, SpinUntilCancelled, cts, TaskCreationOptions.LongRunning); + _tasks.StartTask(name); + } + Thread.Sleep(50); + } + + [Benchmark] + public TaskManagementStatus NTM_CancelNWorkers_LongRunning() + { + var failed = new ConcurrentDictionary(); + return _tasks.CancelAllTasks(null, ref failed); + } + + [IterationCleanup(Targets = new[] { nameof(NTM_CancelNWorkers_LongRunning) })] + public void Cleanup_NTM_CancelNWorkers_LongRunning() + => CancelWaitDeleteAll(); + + // ── + + [IterationSetup(Targets = new[] { nameof(NTM_CancelNWorkers_Pool) })] + public void Setup_NTM_CancelNWorkers_Pool() + { + _tasks.ClearConcurrentLists(); + for (int i = 0; i < N; i++) + { + var cts = new CancellationTokenSource(); + var name = $"worker-{Interlocked.Increment(ref _counter)}"; + _tasks.RegisterTask(name, SpinUntilCancelled, cts, TaskCreationOptions.None); _tasks.StartTask(name); } - // Give thread-pool workers time to enter their spin loops. Thread.Sleep(50); } [Benchmark] - public TaskManagementStatus NTM_CancelNWorkers() + public TaskManagementStatus NTM_CancelNWorkers_Pool() { var failed = new ConcurrentDictionary(); return _tasks.CancelAllTasks(null, ref failed); } - [IterationCleanup(Targets = new[] { nameof(NTM_CancelNWorkers) })] - public void Cleanup_NTM_CancelNWorkers() - => _tasks.ClearConcurrentLists(); + [IterationCleanup(Targets = new[] { nameof(NTM_CancelNWorkers_Pool) })] + public void Cleanup_NTM_CancelNWorkers_Pool() + => CancelWaitDeleteAll(); // ── @@ -211,21 +310,27 @@ public void Cleanup_Dataflow_CancelNWorkers() // ───────────────────────────────────────────────────────────────────── // Benchmark 3 — Process N short-lived work items end-to-end // - // NTM: Tasks are pre-registered in IterationSetup. - // Measured: StartTask × N → CheckTaskStatusCompleted × N - // → DeleteTask × N (includes GC.Collect per delete) - // This is the correct NTM idiom for item-scoped work; it exposes - // the per-task lifecycle cost (dictionary TryRemove + GC). + // NTM_LongRunning: Measured: StartTask × N → CheckTaskStatusCompleted × N + // → DeleteTask × N (includes GC.Collect per delete) + // Each item gets a dedicated OS thread. Expensive for trivial + // work — intentionally shows the cost of the wrong tool. // - // Dataflow: Measured: new ActionBlock + Post × N + Complete + Wait - // No per-item lifecycle cost — items are dispatched and forgotten. + // NTM_Pool: Same pipeline with TaskCreationOptions.None. + // Thread-pool reuse avoids the OS thread spawn overhead, so + // start is faster. Still pays GC.Collect per DeleteTask. + // Right choice when items complete quickly and pool depth is + // not under pressure from the rest of the application. // - // Trade-off: NTM carries O(N) lifecycle overhead (GC.Collect × N) in exchange - // for named handles and status tracking per item. Dataflow has - // near-zero overhead per item but you get no individual item status. + // Dataflow: new ActionBlock + Post × N + Complete + Wait + // Thread-pool threads, no per-item lifecycle cost — this IS + // its intended use case. Use NTM when you need named handles + // and per-task cancel/status; use Dataflow for anonymous items. + // + // Read the LongRunning gap as the cost of spawning OS threads for trivial work. + // Read the Pool gap as the NTM lifecycle overhead (dict, GC) vs raw Dataflow. // ───────────────────────────────────────────────────────────────────── - [IterationSetup(Targets = new[] { nameof(NTM_ProcessNItems) })] + [IterationSetup(Targets = new[] { nameof(NTM_ProcessNItems_LongRunning), nameof(NTM_ProcessNItems_Pool) })] public void Setup_NTM_ProcessNItems() { _tasks.ClearConcurrentLists(); @@ -235,13 +340,31 @@ public void Setup_NTM_ProcessNItems() { _ctsList[i] = new CancellationTokenSource(); _taskNames[i] = $"item-{Interlocked.Increment(ref _counter)}"; - _tasks.RegisterTask(_taskNames[i], _ => { /* trivial work unit */ }, _ctsList[i]); } } [Benchmark] - public void NTM_ProcessNItems() + public void NTM_ProcessNItems_LongRunning() + { + for (int i = 0; i < N; i++) + _tasks.RegisterTask(_taskNames[i], _ => { /* trivial work unit */ }, _ctsList[i], TaskCreationOptions.LongRunning); + + for (int i = 0; i < N; i++) + _tasks.StartTask(_taskNames[i]); + + for (int i = 0; i < N; i++) + _tasks.CheckTaskStatusCompleted(_taskNames[i], retry: 3, millisecondsCancellationWait: 5000); + + for (int i = 0; i < N; i++) + _tasks.DeleteTask(_taskNames[i]); + } + + [Benchmark] + public void NTM_ProcessNItems_Pool() { + for (int i = 0; i < N; i++) + _tasks.RegisterTask(_taskNames[i], _ => { /* trivial work unit */ }, _ctsList[i], TaskCreationOptions.None); + for (int i = 0; i < N; i++) _tasks.StartTask(_taskNames[i]); @@ -252,9 +375,9 @@ public void NTM_ProcessNItems() _tasks.DeleteTask(_taskNames[i]); } - [IterationCleanup(Targets = new[] { nameof(NTM_ProcessNItems) })] + [IterationCleanup(Targets = new[] { nameof(NTM_ProcessNItems_LongRunning), nameof(NTM_ProcessNItems_Pool) })] public void Cleanup_NTM_ProcessNItems() - => _tasks.ClearConcurrentLists(); + => CancelWaitDeleteAll(); // ── @@ -271,5 +394,108 @@ public void Dataflow_ProcessNItems() block.Complete(); block.Completion.Wait(); } + + // ───────────────────────────────────────────────────────────────────── + // Benchmark 4 — Parallel blocking-work throughput + // + // Each of N workers blocks for WorkMs milliseconds (simulating I/O, + // a database call, an HTTP request, etc.). The total wall-clock time + // is determined by how quickly all workers can start and run in parallel. + // + // NTM_LongRunning: N dedicated OS threads are created before the first + // worker starts. All N workers begin blocking simultaneously. + // Total elapsed ≈ WorkMs — no queuing, no injection delay. + // + // NTM_Pool / Dataflow_Unbounded: Workers run on thread-pool threads. + // The pool starts with min = Environment.ProcessorCount threads. + // When all pool threads are blocked, the pool injects one new thread + // roughly every 500 ms. For N >> ProcessorCount the work is batched: + // total ≈ ceil(N / minThreads) × WorkMs (ignoring injection delay) + // plus potential 500 ms stalls while the pool decides to inject. + // + // This is the definitive LongRunning use case: blocking workers that + // must all start within a latency budget, or workers that must not + // starve the pool that the rest of the application depends on. + // + // Note: tasks are pre-registered in IterationSetup so that thread + // creation (LongRunning) and pool dispatch (Pool) are NOT included + // in the measurement — only the actual parallel execution is timed. + // ───────────────────────────────────────────────────────────────────── + + [IterationSetup(Targets = new[] + { + nameof(NTM_ParallelBlockingWork_LongRunning), + nameof(NTM_ParallelBlockingWork_Pool) + })] + public void Setup_NTM_ParallelBlockingWork() + { + _tasks.ClearConcurrentLists(); + _ctsList = new CancellationTokenSource[N]; + _taskNames = new string[N]; + for (int i = 0; i < N; i++) + { + _ctsList[i] = new CancellationTokenSource(); + _taskNames[i] = $"blocking-{Interlocked.Increment(ref _counter)}"; + } + } + + [Benchmark] + public void NTM_ParallelBlockingWork_LongRunning() + { + for (int i = 0; i < N; i++) + _tasks.RegisterTask(_taskNames[i], _ => Thread.Sleep(WorkMs), _ctsList[i], TaskCreationOptions.LongRunning); + + for (int i = 0; i < N; i++) + _tasks.StartTask(_taskNames[i]); + + for (int i = 0; i < N; i++) + _tasks.CheckTaskStatusCompleted(_taskNames[i], retry: 5, millisecondsCancellationWait: 30_000); + + for (int i = 0; i < N; i++) + _tasks.DeleteTask(_taskNames[i]); + } + + [Benchmark] + public void NTM_ParallelBlockingWork_Pool() + { + for (int i = 0; i < N; i++) + _tasks.RegisterTask(_taskNames[i], _ => Thread.Sleep(WorkMs), _ctsList[i], TaskCreationOptions.None); + + for (int i = 0; i < N; i++) + _tasks.StartTask(_taskNames[i]); + + for (int i = 0; i < N; i++) + _tasks.CheckTaskStatusCompleted(_taskNames[i], retry: 5, millisecondsCancellationWait: 30_000); + + for (int i = 0; i < N; i++) + _tasks.DeleteTask(_taskNames[i]); + } + + [IterationCleanup(Targets = new[] + { + nameof(NTM_ParallelBlockingWork_LongRunning), + nameof(NTM_ParallelBlockingWork_Pool) + })] + public void Cleanup_NTM_ParallelBlockingWork() + => CancelWaitDeleteAll(); + + // ── + + [Benchmark] + public void Dataflow_ParallelBlockingWork() + { + // Unbounded degree — Dataflow imposes no artificial cap, so any gap + // between this and LongRunning is purely the thread-pool injection cost, + // not a Dataflow configuration choice. + var block = new ActionBlock( + _ => Thread.Sleep(WorkMs), + new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded }); + + for (int i = 0; i < N; i++) + block.Post(i); + + block.Complete(); + block.Completion.Wait(); + } } } From d148592b7ee06096b0ffae2429d780492c270763 Mon Sep 17 00:00:00 2001 From: Jose Luis Guerra Infante Date: Sat, 4 Apr 2026 23:49:12 +0200 Subject: [PATCH 6/7] feat: improve logging for task completion checks in `TasksManagement` - Updated `_logger.LogDebug` to use structured logging with placeholders, improving log clarity and parameterization. - Replaced interpolated string with structured log format for better log analysis and performance. Signed-off-by: Jose Luis Guerra Infante --- src/NetFramework.Task.Management/TasksManagement.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/NetFramework.Task.Management/TasksManagement.cs b/src/NetFramework.Task.Management/TasksManagement.cs index 408f1b2..5272f08 100644 --- a/src/NetFramework.Task.Management/TasksManagement.cs +++ b/src/NetFramework.Task.Management/TasksManagement.cs @@ -240,7 +240,7 @@ public TaskManagementStatus CheckTaskStatusCompleted(string taskName, int retry break; } - _logger.LogDebug($"{nameof(taskName)}-{taskName} while {nameof(cancellationTokenSource.IsCancellationRequested)} checking if it´s already completed"); + _logger.LogDebug("{TaskName} while {Property} checking if it´s already completed", taskName, nameof(cancellationTokenSource.IsCancellationRequested)); } }, cancellationTokenSource.Token); From 5530afc88e042cde5694a1a8b968e989718b6c2e Mon Sep 17 00:00:00 2001 From: Jose Luis Guerra Infante Date: Sun, 5 Apr 2026 00:04:50 +0200 Subject: [PATCH 7/7] feat: update `README.md` with threading model benchmarks and usage guidance - Added detailed descriptions of `LongRunning` vs `None` threading models, including performance trade-offs. - Enhanced benchmark comparison for blocking workloads and thread-pool behavior. - Updated `DataflowComparisonBenchmarks` with expanded scenarios and threading distinctions. Signed-off-by: Jose Luis Guerra Infante --- README.md | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index d90047f..a23d06b 100644 --- a/README.md +++ b/README.md @@ -17,11 +17,14 @@ These are complementary tools, not competing ones. The benchmarks below are mean | Lifecycle control | Explicit Register → Start → Cancel → CheckCompleted → Delete with status at every step | Complete the block and await `Completion`; no per-item lifecycle | | DI integration | `AddTaskManagement()` registers `ITaskManagement` as a scoped service | Manual construction | | Cancellation granularity | Per-task: cancel one, a filtered subset, or all — with per-task failure reporting | Shared token: cancel the whole block at once | +| Threading model | `LongRunning`: dedicated OS thread per task — all N workers start simultaneously, no pool pressure. `None` (default): thread-pool thread, lower startup cost | Thread-pool threads — pool injection heuristic limits how many workers can start at once | +| Blocking-work throughput | With `LongRunning`: **~3× faster than Dataflow at N=100** for blocking tasks — all threads start and block in parallel, no injection delay | Pool injection batches workers when N > min-threads; total time scales with N / pool-size | | Throughput focus | Long-running named workers (background jobs, daemons, named pipelines) | High-volume anonymous item streams (fan-out, fan-in, transform chains) | | Observability | Built-in disposal queue — capture task name, id, and final status when a task is removed | No built-in disposal queue | | Memory reclaim | `DeleteTask` forces `GC.Collect` after disposal — guarantees memory is freed | GC-managed by the runtime | -**Choose NetTaskManagement when** your tasks have identity, need individual control, and live for seconds to hours.
+**Choose NetTaskManagement (`LongRunning`) when** your workers block (I/O, sleep, external calls) and N exceeds the thread-pool minimum — all workers start simultaneously with no injection delay.
+**Choose NetTaskManagement (`None`) when** tasks are short-lived and pool availability is not a concern — lower startup cost with named handles and per-task lifecycle control.
**Choose TPL Dataflow when** you are processing a high-volume stream of anonymous items through a pipeline graph. **[View live benchmark charts — NTM vs Dataflow](https://ryujose.github.io/NetTaskManagement/benchmarks/results)** @@ -135,7 +138,7 @@ Performance is tracked automatically on every push to `main` and published as in | `LifecycleBenchmarks` | Each lifecycle stage in isolation: Register, Start, Cancel, Delete, and full end-to-end | | `GetTasksStatusBenchmarks` | Dictionary snapshot cost at 1, 10, and 50 tasks | | `CancelAllTasksBenchmarks` | `Parallel.ForEach` cancellation fan-out at 1, 10, and 50 tasks | -| `DataflowComparisonBenchmarks` | Head-to-head vs TPL Dataflow: start N workers, cancel N workers, process N items — at N = 10, 50, 100 | +| `DataflowComparisonBenchmarks` | Head-to-head vs TPL Dataflow across four scenarios at N = 10, 50, 100: start N workers, cancel N workers, process N short-lived items, and parallel blocking-work throughput. Each NTM scenario has two variants — `_LongRunning` (dedicated OS thread) and `_Pool` (thread-pool thread) — so you can pick the right threading model for your use case | All benchmarks run on **net8.0**, **net9.0**, and **net10.0** with `[MemoryDiagnoser]` enabled (reports allocated bytes per operation).