diff --git a/README.md b/README.md index 42a662c..d90047f 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,25 @@ A .NET Standard 2.0 library for Task Parallel Library (TPL) orchestration with D Tasks are identified by **string key** and managed through a **status-based API** — no raw `Task` objects exposed to callers. The library handles the full task lifecycle: register → start → cancel → check completion → delete, and exposes an append-only observability queue that captures disposal metadata for external monitoring. +## NetTaskManagement vs TPL Dataflow — which one do you need? + +These are complementary tools, not competing ones. The benchmarks below are meant to help you choose. + +| | **NetTaskManagement** | **TPL Dataflow** | +|---|---|---| +| Task identity | Named string key — look up, cancel, or delete any task by name at any time | Anonymous — items are messages, no individual handles | +| Lifecycle control | Explicit Register → Start → Cancel → CheckCompleted → Delete with status at every step | Complete the block and await `Completion`; no per-item lifecycle | +| DI integration | `AddTaskManagement()` registers `ITaskManagement` as a scoped service | Manual construction | +| Cancellation granularity | Per-task: cancel one, a filtered subset, or all — with per-task failure reporting | Shared token: cancel the whole block at once | +| Throughput focus | Long-running named workers (background jobs, daemons, named pipelines) | High-volume anonymous item streams (fan-out, fan-in, transform chains) | +| Observability | Built-in disposal queue — capture task name, id, and final status when a task is removed | No built-in disposal queue | +| Memory reclaim | `DeleteTask` forces `GC.Collect` after disposal — guarantees memory is freed | GC-managed by the runtime | + +**Choose NetTaskManagement when** your tasks have identity, need individual control, and live for seconds to hours.
+**Choose TPL Dataflow when** you are processing a high-volume stream of anonymous items through a pipeline graph. + +**[View live benchmark charts — NTM vs Dataflow](https://ryujose.github.io/NetTaskManagement/benchmarks/results)** + ## Packages | Package | NuGet | @@ -111,14 +130,22 @@ Performance is tracked automatically on every push to `main` and published as in **[View benchmark charts](https://ryujose.github.io/NetTaskManagement/benchmarks/results)** -Benchmarks cover: -- `LifecycleBenchmarks` — individual lifecycle stages: Register, Start, Cancel, Delete, and full end-to-end -- `GetTasksStatusBenchmarks` — dictionary snapshot cost at 1, 10, and 50 tasks -- `CancelAllTasksBenchmarks` — `Parallel.ForEach` cancellation fan-out at 1, 10, and 50 tasks +| Benchmark class | What it measures | +|---|---| +| `LifecycleBenchmarks` | Each lifecycle stage in isolation: Register, Start, Cancel, Delete, and full end-to-end | +| `GetTasksStatusBenchmarks` | Dictionary snapshot cost at 1, 10, and 50 tasks | +| `CancelAllTasksBenchmarks` | `Parallel.ForEach` cancellation fan-out at 1, 10, and 50 tasks | +| `DataflowComparisonBenchmarks` | Head-to-head vs TPL Dataflow: start N workers, cancel N workers, process N items — at N = 10, 50, 100 | All benchmarks run on **net8.0**, **net9.0**, and **net10.0** with `[MemoryDiagnoser]` enabled (reports allocated bytes per operation). -To run locally (Release mode required): +To run the comparison benchmarks locally (Release mode required): + +```bash +dotnet run -c Release -f net9.0 --project benchmarks/NetFramework.Tasks.Management.Benchmarks/ -- --filter '*DataflowComparison*' +``` + +To run everything: ```bash dotnet run -c Release -f net8.0 --project benchmarks/NetFramework.Tasks.Management.Benchmarks/ -- --filter '*' diff --git a/benchmarks/NetFramework.Tasks.Management.Benchmarks/DataflowComparisonBenchmarks.cs b/benchmarks/NetFramework.Tasks.Management.Benchmarks/DataflowComparisonBenchmarks.cs new file mode 100644 index 0000000..cbf9eac --- /dev/null +++ b/benchmarks/NetFramework.Tasks.Management.Benchmarks/DataflowComparisonBenchmarks.cs @@ -0,0 +1,501 @@ +using BenchmarkDotNet.Attributes; +using Microsoft.Extensions.Logging.Abstractions; +using NetFramework.Tasks.Management.Abstractions.Enums; +using System; +using System.Collections.Concurrent; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; + +namespace NetFramework.Tasks.Management.Benchmarks +{ + /// + /// Head-to-head: NetTaskManagement vs TPL Dataflow for concurrent-worker scenarios. + /// + /// Purpose: help developers choose the right tool, not declare a winner. + /// + /// Threading model — this is the core distinction: + /// NTM uses TaskCreationOptions.LongRunning throughout, which instructs the runtime + /// to create a dedicated OS thread per task rather than borrowing one from the pool. + /// This is the intended usage: named, long-running workers (daemons, background jobs, + /// pipeline stages) that must not starve the thread pool used by the rest of the app. + /// + /// Dataflow's ActionBlock uses thread-pool threads. That is the correct model for + /// high-throughput anonymous item processing where each work unit is short-lived. + /// + /// NetTaskManagement is the right choice when you need: + /// – Named task handles (look up / cancel / delete by string key) + /// – Dedicated OS thread per worker — zero pool pressure for long-running work + /// – Status-based lifecycle API (no raw Task objects exposed to callers) + /// – DI-friendly registration and an observability queue for disposed tasks + /// – Explicit per-task cancel + wait + delete with memory reclaim guarantees + /// + /// TPL Dataflow is the right choice when you need: + /// – High-throughput anonymous item processing (pipeline graphs, fan-out/fan-in) + /// – Built-in backpressure (BoundedCapacity) and block linking + /// – Minimal per-item overhead for high-volume streams + /// – Single shared cancellation token across all workers + /// + /// Overlap measured here: starting N concurrent workers, cancelling them in bulk, + /// and processing N short-lived work items end-to-end. + /// + /// Each scenario has two NTM variants: + /// _LongRunning — TaskCreationOptions.LongRunning — dedicated OS thread per task. + /// Right choice for daemons, pipeline stages, long-duration workers. + /// _Pool — TaskCreationOptions.None — thread-pool thread per task. + /// Right choice when tasks complete quickly and pool availability + /// is not a concern; avoids the OS thread spawn cost of LongRunning. + /// + /// [InvocationCount(1)] is required for all benchmarks — every method mutates + /// shared state (ConcurrentDictionary, ActionBlock, CancellationTokenSource). + /// + [MemoryDiagnoser] + [InvocationCount(1)] + public class DataflowComparisonBenchmarks + { + private static readonly Action SpinUntilCancelled = state => + { + var cts = (CancellationTokenSource)state; + while (!cts.IsCancellationRequested) + Thread.SpinWait(1); + }; + + [Params(10, 50, 100)] + public int N { get; set; } + + /// + /// Duration of blocking work per task in Benchmark 4. + /// Short enough to keep benchmark runs fast; long enough that the thread-pool + /// injection interval (~500 ms) cannot hide the batching cost when N > min-threads. + /// + private const int WorkMs = 20; + + // ── NTM state ───────────────────────────────────────────────────────── + private TasksManagement _tasks = null!; + private int _counter; + private string[] _taskNames = null!; + private CancellationTokenSource[] _ctsList = null!; + + // ── Dataflow state ──────────────────────────────────────────────────── + private ActionBlock _block = null!; + private CancellationTokenSource _blockCts = null!; + + [GlobalSetup] + public void GlobalSetup() + => _tasks = new TasksManagement(NullLogger.Instance); + + [GlobalCleanup] + public void GlobalCleanup() + => CancelWaitDeleteAll(); + + /// + /// Properly tears down all running NTM workers: + /// 1. Signal every CTS via CancelAllTasks (idempotent if already cancelled). + /// 2. Wait for each dedicated OS thread to actually exit via CheckTaskStatusCompleted. + /// 3. Delete each task (releases dict entry, disposes CTS, reclaims memory). + /// 4. ClearConcurrentLists as a safety sweep. + /// + /// Without the wait step, ClearConcurrentLists returns while threads are still + /// spinning, and each subsequent iteration spawns more — leading to hundreds of + /// unbound OS threads that saturate all CPUs. + /// + private void CancelWaitDeleteAll() + { + var failed = new ConcurrentDictionary(); + _tasks.CancelAllTasks(null, ref failed); + + var taskNames = _tasks.GetTasksStatus().Keys.ToArray(); + foreach (var name in taskNames) + { + _tasks.CheckTaskStatusCompleted(name, retry: 10, millisecondsCancellationWait: 10_000); + _tasks.DeleteTask(name); + } + + _tasks.ClearConcurrentLists(); + } + + // ───────────────────────────────────────────────────────────────────── + // Benchmark 1 — Start N concurrent workers + // + // NTM_LongRunning: RegisterTask(LongRunning) × N + StartTask × N + // Each worker gets its own OS thread via TaskCreationOptions.LongRunning. + // Thread spawn is expensive (~1 ms/thread on Windows) but the thread + // is dedicated — zero pool pressure for the lifetime of the worker. + // Right choice for daemons and pipeline stages that run for minutes/hours. + // + // NTM_Pool: RegisterTask(None) × N + StartTask × N + // Workers borrow thread-pool threads. Startup is ~10× faster than + // LongRunning because no new OS thread is created. Pool threads are + // shared, so long-running spin loops will starve other pool work — + // use only when tasks are genuinely short-lived or pool depth is ample. + // + // Dataflow: new ActionBlock(MaxDegreeOfParallelism = N) + Post × N + // Workers are anonymous pool threads — no per-worker handle or key. + // + // Trade-off: LongRunning → isolated, addressable, expensive to start. + // Pool → shared, cheaper to start, pool-pressure risk. + // Dataflow → pooled, anonymous, cheapest for high-volume streams. + // ───────────────────────────────────────────────────────────────────── + + [IterationSetup(Targets = new[] { nameof(NTM_StartNWorkers_LongRunning), nameof(NTM_StartNWorkers_Pool) })] + public void Setup_NTM_StartNWorkers() + { + _ctsList = new CancellationTokenSource[N]; + _taskNames = new string[N]; + for (int i = 0; i < N; i++) + { + _ctsList[i] = new CancellationTokenSource(); + _taskNames[i] = $"worker-{Interlocked.Increment(ref _counter)}"; + } + } + + [Benchmark] + public void NTM_StartNWorkers_LongRunning() + { + for (int i = 0; i < N; i++) + { + _tasks.RegisterTask(_taskNames[i], SpinUntilCancelled, _ctsList[i], TaskCreationOptions.LongRunning); + _tasks.StartTask(_taskNames[i]); + } + } + + [Benchmark] + public void NTM_StartNWorkers_Pool() + { + for (int i = 0; i < N; i++) + { + _tasks.RegisterTask(_taskNames[i], SpinUntilCancelled, _ctsList[i], TaskCreationOptions.None); + _tasks.StartTask(_taskNames[i]); + } + } + + [IterationCleanup(Targets = new[] { nameof(NTM_StartNWorkers_LongRunning), nameof(NTM_StartNWorkers_Pool) })] + public void Cleanup_NTM_StartNWorkers() + => CancelWaitDeleteAll(); + + // ── + + [IterationSetup(Targets = new[] { nameof(Dataflow_StartNWorkers) })] + public void Setup_Dataflow_StartNWorkers() + => _blockCts = new CancellationTokenSource(); + + [Benchmark] + public void Dataflow_StartNWorkers() + { + _block = new ActionBlock(_ => + { + while (!_blockCts.IsCancellationRequested) + Thread.SpinWait(1); + }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = N }); + + for (int i = 0; i < N; i++) + _block.Post(i); + } + + [IterationCleanup(Targets = new[] { nameof(Dataflow_StartNWorkers) })] + public void Cleanup_Dataflow_StartNWorkers() + { + _blockCts.Cancel(); + _block.Complete(); + _block.Completion.Wait(); + _blockCts.Dispose(); + } + + // ───────────────────────────────────────────────────────────────────── + // Benchmark 2 — Cancel N running workers + // + // NTM: CancelAllTasks() — sequential foreach (N ≤ 500) across the + // dictionary, calling CancellationTokenSource.Cancel() per task. + // Returns a status dict of any that could not be cancelled. + // Both LongRunning and Pool variants measure the same cancel path — + // the CTS signalling cost does not depend on thread origin. + // + // Dataflow: CancellationTokenSource.Cancel() — one call signals all N workers + // via the shared token passed to ExecutionDataflowBlockOptions. + // Workers run on thread-pool threads. + // + // Trade-off: NTM's per-task cancel lets you exempt specific tasks and collect + // per-task failure status; Dataflow's shared-token cancel is O(1) + // but all-or-nothing. + // + // Note: Thread.Sleep gives workers time to enter their spin loops before + // measurement. LongRunning threads are scheduled immediately by the OS, + // so the sleep is the same for both variants. + // ───────────────────────────────────────────────────────────────────── + + [IterationSetup(Targets = new[] { nameof(NTM_CancelNWorkers_LongRunning) })] + public void Setup_NTM_CancelNWorkers_LongRunning() + { + _tasks.ClearConcurrentLists(); + for (int i = 0; i < N; i++) + { + var cts = new CancellationTokenSource(); + var name = $"worker-{Interlocked.Increment(ref _counter)}"; + _tasks.RegisterTask(name, SpinUntilCancelled, cts, TaskCreationOptions.LongRunning); + _tasks.StartTask(name); + } + Thread.Sleep(50); + } + + [Benchmark] + public TaskManagementStatus NTM_CancelNWorkers_LongRunning() + { + var failed = new ConcurrentDictionary(); + return _tasks.CancelAllTasks(null, ref failed); + } + + [IterationCleanup(Targets = new[] { nameof(NTM_CancelNWorkers_LongRunning) })] + public void Cleanup_NTM_CancelNWorkers_LongRunning() + => CancelWaitDeleteAll(); + + // ── + + [IterationSetup(Targets = new[] { nameof(NTM_CancelNWorkers_Pool) })] + public void Setup_NTM_CancelNWorkers_Pool() + { + _tasks.ClearConcurrentLists(); + for (int i = 0; i < N; i++) + { + var cts = new CancellationTokenSource(); + var name = $"worker-{Interlocked.Increment(ref _counter)}"; + _tasks.RegisterTask(name, SpinUntilCancelled, cts, TaskCreationOptions.None); + _tasks.StartTask(name); + } + Thread.Sleep(50); + } + + [Benchmark] + public TaskManagementStatus NTM_CancelNWorkers_Pool() + { + var failed = new ConcurrentDictionary(); + return _tasks.CancelAllTasks(null, ref failed); + } + + [IterationCleanup(Targets = new[] { nameof(NTM_CancelNWorkers_Pool) })] + public void Cleanup_NTM_CancelNWorkers_Pool() + => CancelWaitDeleteAll(); + + // ── + + [IterationSetup(Targets = new[] { nameof(Dataflow_CancelNWorkers) })] + public void Setup_Dataflow_CancelNWorkers() + { + _blockCts = new CancellationTokenSource(); + _block = new ActionBlock(_ => + { + while (!_blockCts.IsCancellationRequested) + Thread.SpinWait(1); + }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = N }); + + for (int i = 0; i < N; i++) + _block.Post(i); + + // Give thread-pool workers time to enter their spin loops. + Thread.Sleep(50); + } + + [Benchmark] + public void Dataflow_CancelNWorkers() + => _blockCts.Cancel(); + + [IterationCleanup(Targets = new[] { nameof(Dataflow_CancelNWorkers) })] + public void Cleanup_Dataflow_CancelNWorkers() + { + _block.Complete(); + _block.Completion.Wait(); + _blockCts.Dispose(); + } + + // ───────────────────────────────────────────────────────────────────── + // Benchmark 3 — Process N short-lived work items end-to-end + // + // NTM_LongRunning: Measured: StartTask × N → CheckTaskStatusCompleted × N + // → DeleteTask × N (includes GC.Collect per delete) + // Each item gets a dedicated OS thread. Expensive for trivial + // work — intentionally shows the cost of the wrong tool. + // + // NTM_Pool: Same pipeline with TaskCreationOptions.None. + // Thread-pool reuse avoids the OS thread spawn overhead, so + // start is faster. Still pays GC.Collect per DeleteTask. + // Right choice when items complete quickly and pool depth is + // not under pressure from the rest of the application. + // + // Dataflow: new ActionBlock + Post × N + Complete + Wait + // Thread-pool threads, no per-item lifecycle cost — this IS + // its intended use case. Use NTM when you need named handles + // and per-task cancel/status; use Dataflow for anonymous items. + // + // Read the LongRunning gap as the cost of spawning OS threads for trivial work. + // Read the Pool gap as the NTM lifecycle overhead (dict, GC) vs raw Dataflow. + // ───────────────────────────────────────────────────────────────────── + + [IterationSetup(Targets = new[] { nameof(NTM_ProcessNItems_LongRunning), nameof(NTM_ProcessNItems_Pool) })] + public void Setup_NTM_ProcessNItems() + { + _tasks.ClearConcurrentLists(); + _ctsList = new CancellationTokenSource[N]; + _taskNames = new string[N]; + for (int i = 0; i < N; i++) + { + _ctsList[i] = new CancellationTokenSource(); + _taskNames[i] = $"item-{Interlocked.Increment(ref _counter)}"; + } + } + + [Benchmark] + public void NTM_ProcessNItems_LongRunning() + { + for (int i = 0; i < N; i++) + _tasks.RegisterTask(_taskNames[i], _ => { /* trivial work unit */ }, _ctsList[i], TaskCreationOptions.LongRunning); + + for (int i = 0; i < N; i++) + _tasks.StartTask(_taskNames[i]); + + for (int i = 0; i < N; i++) + _tasks.CheckTaskStatusCompleted(_taskNames[i], retry: 3, millisecondsCancellationWait: 5000); + + for (int i = 0; i < N; i++) + _tasks.DeleteTask(_taskNames[i]); + } + + [Benchmark] + public void NTM_ProcessNItems_Pool() + { + for (int i = 0; i < N; i++) + _tasks.RegisterTask(_taskNames[i], _ => { /* trivial work unit */ }, _ctsList[i], TaskCreationOptions.None); + + for (int i = 0; i < N; i++) + _tasks.StartTask(_taskNames[i]); + + for (int i = 0; i < N; i++) + _tasks.CheckTaskStatusCompleted(_taskNames[i], retry: 3, millisecondsCancellationWait: 5000); + + for (int i = 0; i < N; i++) + _tasks.DeleteTask(_taskNames[i]); + } + + [IterationCleanup(Targets = new[] { nameof(NTM_ProcessNItems_LongRunning), nameof(NTM_ProcessNItems_Pool) })] + public void Cleanup_NTM_ProcessNItems() + => CancelWaitDeleteAll(); + + // ── + + [Benchmark] + public void Dataflow_ProcessNItems() + { + var block = new ActionBlock( + _ => { /* trivial work unit */ }, + new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = N }); + + for (int i = 0; i < N; i++) + block.Post(i); + + block.Complete(); + block.Completion.Wait(); + } + + // ───────────────────────────────────────────────────────────────────── + // Benchmark 4 — Parallel blocking-work throughput + // + // Each of N workers blocks for WorkMs milliseconds (simulating I/O, + // a database call, an HTTP request, etc.). The total wall-clock time + // is determined by how quickly all workers can start and run in parallel. + // + // NTM_LongRunning: N dedicated OS threads are created before the first + // worker starts. All N workers begin blocking simultaneously. + // Total elapsed ≈ WorkMs — no queuing, no injection delay. + // + // NTM_Pool / Dataflow_Unbounded: Workers run on thread-pool threads. + // The pool starts with min = Environment.ProcessorCount threads. + // When all pool threads are blocked, the pool injects one new thread + // roughly every 500 ms. For N >> ProcessorCount the work is batched: + // total ≈ ceil(N / minThreads) × WorkMs (ignoring injection delay) + // plus potential 500 ms stalls while the pool decides to inject. + // + // This is the definitive LongRunning use case: blocking workers that + // must all start within a latency budget, or workers that must not + // starve the pool that the rest of the application depends on. + // + // Note: tasks are pre-registered in IterationSetup so that thread + // creation (LongRunning) and pool dispatch (Pool) are NOT included + // in the measurement — only the actual parallel execution is timed. + // ───────────────────────────────────────────────────────────────────── + + [IterationSetup(Targets = new[] + { + nameof(NTM_ParallelBlockingWork_LongRunning), + nameof(NTM_ParallelBlockingWork_Pool) + })] + public void Setup_NTM_ParallelBlockingWork() + { + _tasks.ClearConcurrentLists(); + _ctsList = new CancellationTokenSource[N]; + _taskNames = new string[N]; + for (int i = 0; i < N; i++) + { + _ctsList[i] = new CancellationTokenSource(); + _taskNames[i] = $"blocking-{Interlocked.Increment(ref _counter)}"; + } + } + + [Benchmark] + public void NTM_ParallelBlockingWork_LongRunning() + { + for (int i = 0; i < N; i++) + _tasks.RegisterTask(_taskNames[i], _ => Thread.Sleep(WorkMs), _ctsList[i], TaskCreationOptions.LongRunning); + + for (int i = 0; i < N; i++) + _tasks.StartTask(_taskNames[i]); + + for (int i = 0; i < N; i++) + _tasks.CheckTaskStatusCompleted(_taskNames[i], retry: 5, millisecondsCancellationWait: 30_000); + + for (int i = 0; i < N; i++) + _tasks.DeleteTask(_taskNames[i]); + } + + [Benchmark] + public void NTM_ParallelBlockingWork_Pool() + { + for (int i = 0; i < N; i++) + _tasks.RegisterTask(_taskNames[i], _ => Thread.Sleep(WorkMs), _ctsList[i], TaskCreationOptions.None); + + for (int i = 0; i < N; i++) + _tasks.StartTask(_taskNames[i]); + + for (int i = 0; i < N; i++) + _tasks.CheckTaskStatusCompleted(_taskNames[i], retry: 5, millisecondsCancellationWait: 30_000); + + for (int i = 0; i < N; i++) + _tasks.DeleteTask(_taskNames[i]); + } + + [IterationCleanup(Targets = new[] + { + nameof(NTM_ParallelBlockingWork_LongRunning), + nameof(NTM_ParallelBlockingWork_Pool) + })] + public void Cleanup_NTM_ParallelBlockingWork() + => CancelWaitDeleteAll(); + + // ── + + [Benchmark] + public void Dataflow_ParallelBlockingWork() + { + // Unbounded degree — Dataflow imposes no artificial cap, so any gap + // between this and LongRunning is purely the thread-pool injection cost, + // not a Dataflow configuration choice. + var block = new ActionBlock( + _ => Thread.Sleep(WorkMs), + new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded }); + + for (int i = 0; i < N; i++) + block.Post(i); + + block.Complete(); + block.Completion.Wait(); + } + } +} diff --git a/benchmarks/NetFramework.Tasks.Management.Benchmarks/README.md b/benchmarks/NetFramework.Tasks.Management.Benchmarks/README.md index 8e179c9..d90d0b3 100644 --- a/benchmarks/NetFramework.Tasks.Management.Benchmarks/README.md +++ b/benchmarks/NetFramework.Tasks.Management.Benchmarks/README.md @@ -32,6 +32,22 @@ Measures the `Parallel.ForEach` fan-out inside `CancelAllTasks` as the number of |---|---| | `TaskCount` | 1, 10, 50 | +### `DataflowComparisonBenchmarks` + +Head-to-head comparison of NetTaskManagement vs TPL Dataflow for the scenarios where both libraries can solve the same problem. The goal is to help developers choose the right tool, not declare a winner — each library has distinct strengths. + +| Benchmark pair | What is measured | +|---|---| +| `NTM_StartNWorkers` / `Dataflow_StartNWorkers` | Cost of getting N concurrent workers running. NTM: `RegisterTask × N` + `StartTask × N` (ConcurrentDictionary inserts + `Task.Start`). Dataflow: `new ActionBlock` + `Post × N`. | +| `NTM_CancelNWorkers` / `Dataflow_CancelNWorkers` | Cancel N running workers. NTM: `CancelAllTasks()` uses `Parallel.ForEach` and returns per-task status. Dataflow: single `CancellationTokenSource.Cancel()` shared across all workers. | +| `NTM_ProcessNItems` / `Dataflow_ProcessNItems` | Process N short-lived items end-to-end. NTM: `StartTask × N` → `CheckTaskStatusCompleted × N` → `DeleteTask × N` (includes `GC.Collect` per delete). Dataflow: `Post × N` → `Complete` → `Wait`. | + +`[InvocationCount(1)]` applies to all pairs because every method mutates shared state. A `Thread.Sleep(50)` in the cancellation setup ensures workers are actively spinning before the timed cancellation call — without it, some items may not have been dequeued yet and would understate the true cancellation cost. + +| Parameter | Values | +|---|---| +| `N` | 10, 50, 100 | + ## Running > Benchmarks must be run in **Release** configuration. Debug builds produce meaningless results. @@ -55,6 +71,7 @@ dotnet run -c Release -f net10.0 --project benchmarks/NetFramework.Tasks.Managem dotnet run -c Release -f net8.0 --project benchmarks/NetFramework.Tasks.Management.Benchmarks/ -- --filter *Lifecycle* dotnet run -c Release -f net8.0 --project benchmarks/NetFramework.Tasks.Management.Benchmarks/ -- --filter *GetTasksStatus* dotnet run -c Release -f net8.0 --project benchmarks/NetFramework.Tasks.Management.Benchmarks/ -- --filter *CancelAll* +dotnet run -c Release -f net8.0 --project benchmarks/NetFramework.Tasks.Management.Benchmarks/ -- --filter *DataflowComparison* ``` ### All benchmarks @@ -80,7 +97,8 @@ BenchmarkDotNet.Artifacts/ └── results/ ├── NetFramework.Tasks.Management.Benchmarks.LifecycleBenchmarks-report.md ├── NetFramework.Tasks.Management.Benchmarks.GetTasksStatusBenchmarks-report.md - └── NetFramework.Tasks.Management.Benchmarks.CancelAllTasksBenchmarks-report.md + ├── NetFramework.Tasks.Management.Benchmarks.CancelAllTasksBenchmarks-report.md + └── NetFramework.Tasks.Management.Benchmarks.DataflowComparisonBenchmarks-report.md ``` `[MemoryDiagnoser]` is enabled on all classes so each report includes **allocated bytes per operation** in addition to execution time. diff --git a/src/NetFramework.Task.Management/NetFramework.Tasks.Management.csproj b/src/NetFramework.Task.Management/NetFramework.Tasks.Management.csproj index 2d0b060..85d3e62 100644 --- a/src/NetFramework.Task.Management/NetFramework.Tasks.Management.csproj +++ b/src/NetFramework.Task.Management/NetFramework.Tasks.Management.csproj @@ -30,4 +30,10 @@ + + + <_Parameter1>NetFramework.Tasks.Management.xUnit.Tests + + + diff --git a/src/NetFramework.Task.Management/TasksManagement.cs b/src/NetFramework.Task.Management/TasksManagement.cs index 7c47326..408f1b2 100644 --- a/src/NetFramework.Task.Management/TasksManagement.cs +++ b/src/NetFramework.Task.Management/TasksManagement.cs @@ -20,6 +20,14 @@ public class TasksManagement : ITaskManagement private static ConcurrentDictionary TasksDataModel = new ConcurrentDictionary(); private static ConcurrentQueue TaskDisposedDataModel = new ConcurrentQueue(); + // CancellationTokenSource.Cancel() is an atomic flag-set (~100 ns). + // Parallel.ForEach thread-pool setup costs ~50-80 µs regardless of N, + // so sequential foreach is faster below this threshold. + // Above 500 tasks the parallel fan-out amortises the setup cost. + // internal (not private const) so the test assembly can lower it to exercise + // the parallel path without registering hundreds of real tasks. + internal static int CancelAllTasksParallelThreshold = 500; + private readonly ILogger _logger; public TasksManagement(ILogger logger) { @@ -204,6 +212,15 @@ public TaskManagementStatus CheckTaskStatusCompleted(string taskName, int retry return TaskManagementStatus.TaskNotFound; } + // Fast path: task already completed — skip Task.Run allocation and polling loop. + // Task.IsCompleted is thread-safe and terminal: once true it never goes false. + if (taskDataModel.Task.IsCompleted) + { + _logger.LogInformation($"{nameof(taskName)}-{taskName} {nameof(TaskManagementStatus.Completed)}"); + + return TaskManagementStatus.Completed; + } + var cancellationTokenSource = new CancellationTokenSource(millisecondsCancellationWait); int retryCount = 0; @@ -352,18 +369,11 @@ public TaskManagementStatus DequeueTaskDisposedDataModel(out TaskDisposedDataMod return TaskManagementStatus.ObjectInfoDequeued; } - public TaskManagementStatus CancelAllTasks (IList except, ref ConcurrentDictionary tasksCancelPetitionFailedRef) + public TaskManagementStatus CancelAllTasks(IList except, ref ConcurrentDictionary tasksCancelPetitionFailedRef) { try { - IEnumerable> tasksData = null; - - if (except != null) - tasksData = TasksDataModel.Where(a => !except.Any(b => a.Key == b)).Select(c => c); - else - tasksData = TasksDataModel.Select(c => c); - - if (!tasksData.Any()) + if (TasksDataModel.IsEmpty) { _logger.LogWarning($"{nameof(TaskManagementStatus.TasksNotFoundToBeCancelled)}"); @@ -372,26 +382,67 @@ public TaskManagementStatus CancelAllTasks (IList except, ref Concurrent var tasksNotCancelled = new ConcurrentDictionary(); - Parallel.ForEach(tasksData, (taskData) => + // Below the threshold, Parallel.ForEach thread-pool setup (~50-80 µs) costs + // more than calling Cancel() sequentially — Cancel() is an atomic flag-set (~100 ns). + // Above the threshold, parallel fan-out amortises the setup cost across many tasks. + if (TasksDataModel.Count <= CancelAllTasksParallelThreshold) { - if (taskData.Value.CancellationTokenSource == null) + bool anyTaskFound = false; + + foreach (var taskData in TasksDataModel) { - if (!tasksNotCancelled.TryAdd(taskData.Key, TaskManagementStatus.CancellationTokenSourceNotFound)) - _logger.LogWarning($"{nameof(CancelAllTasks)}-{nameof(taskData.Key)} doesn't have a cancellation token source"); + if (except != null && except.Contains(taskData.Key)) + continue; + + anyTaskFound = true; + + if (taskData.Value.Task.IsCompleted) + { + if (!tasksNotCancelled.TryAdd(taskData.Key, TaskManagementStatus.Completed)) + _logger.LogWarning($"{nameof(CancelAllTasks)}-{taskData.Key} task already completed"); + + continue; + } - return; + taskData.Value.CancellationTokenSource.Cancel(); } - if (taskData.Value.Task.IsCompleted) + if (!anyTaskFound) { - if (!tasksNotCancelled.TryAdd(taskData.Key, TaskManagementStatus.Completed)) - _logger.LogWarning($"{nameof(CancelAllTasks)}-{nameof(taskData.Key)} task already completed"); + _logger.LogWarning($"{nameof(TaskManagementStatus.TasksNotFoundToBeCancelled)}"); - return; + return TaskManagementStatus.TasksNotFoundToBeCancelled; } + } + else + { + int anyTaskFound = 0; + + Parallel.ForEach(TasksDataModel, (taskData) => + { + if (except != null && except.Contains(taskData.Key)) + return; + + Interlocked.Exchange(ref anyTaskFound, 1); + + if (taskData.Value.Task.IsCompleted) + { + if (!tasksNotCancelled.TryAdd(taskData.Key, TaskManagementStatus.Completed)) + _logger.LogWarning($"{nameof(CancelAllTasks)}-{taskData.Key} task already completed"); - taskData.Value.CancellationTokenSource.Cancel(); - }); + return; + } + + taskData.Value.CancellationTokenSource.Cancel(); + }); + + if (anyTaskFound == 0) + { + _logger.LogWarning($"{nameof(TaskManagementStatus.TasksNotFoundToBeCancelled)}"); + + return TaskManagementStatus.TasksNotFoundToBeCancelled; + } + } tasksCancelPetitionFailedRef = tasksNotCancelled; diff --git a/src/NetFramework.Tasks.Management.xUnit.Tests/TaskManagementCancelAllTasksTest.cs b/src/NetFramework.Tasks.Management.xUnit.Tests/TaskManagementCancelAllTasksTest.cs index b20eabb..3f1f75f 100644 --- a/src/NetFramework.Tasks.Management.xUnit.Tests/TaskManagementCancelAllTasksTest.cs +++ b/src/NetFramework.Tasks.Management.xUnit.Tests/TaskManagementCancelAllTasksTest.cs @@ -113,5 +113,218 @@ public void TaskNotAcceptedCancellationTokenSourceNotFoundExceptedTasktaskManage Assert.False(isCancelPetitionFailed); taskManagement.ClearConcurrentLists(); } + + [Fact] + public void AllTasksInExceptListCancelAllTasks_TasksNotFoundToBeCancelledStatus() + { + var logger = new Mock(); + TasksManagement taskManagement = new TasksManagement(logger.Object); + + const string simpleTaskName = "tasktest1"; + const string simpleTaskNameTwo = "tasktest2"; + + var cancellationTokenSource = new CancellationTokenSource(); + TaskManagementStatus taskManagementStatus = taskManagement.RegisterTask(simpleTaskName, new ActionsUtilitiesTests().ActionObjectCancellationTokenSource(), cancellationTokenSource); + + var cancellationTokenSource2 = new CancellationTokenSource(); + taskManagementStatus = taskManagement.RegisterTask(simpleTaskNameTwo, new ActionsUtilitiesTests().ActionObjectCancellationTokenSource(), cancellationTokenSource2); + taskManagementStatus = taskManagement.StartTask(simpleTaskName); + taskManagementStatus = taskManagement.StartTask(simpleTaskNameTwo); + + // All registered tasks are in the except list — sequential path finds no eligible + // task to cancel and should return TasksNotFoundToBeCancelled. + var tasksCancelPetitionFailedRef = new ConcurrentDictionary(); + taskManagementStatus = taskManagement.CancelAllTasks(new List { simpleTaskName, simpleTaskNameTwo }, ref tasksCancelPetitionFailedRef); + + Assert.Equal(TaskManagementStatus.TasksNotFoundToBeCancelled, taskManagementStatus); + Assert.Empty(tasksCancelPetitionFailedRef); + taskManagement.ClearConcurrentLists(); + } + + [Fact] + public void ParallelPathCancelAllTasks_AllTasksCancelPetitionAcceptedStatus() + { + var logger = new Mock(); + TasksManagement taskManagement = new TasksManagement(logger.Object); + + // Lower the threshold so 6 tasks trigger the Parallel.ForEach path without + // the cost of registering hundreds of real tasks. Restore in finally so a + // test failure cannot pollute subsequent tests in the collection. + int originalThreshold = TasksManagement.CancelAllTasksParallelThreshold; + try + { + TasksManagement.CancelAllTasksParallelThreshold = 5; + + for (int i = 1; i <= 6; i++) + { + var cts = new CancellationTokenSource(); + taskManagement.RegisterTask($"parallel-task-{i}", new ActionsUtilitiesTests().ActionObjectCancellationTokenSource(), cts); + taskManagement.StartTask($"parallel-task-{i}"); + } + + var tasksCancelPetitionFailedRef = new ConcurrentDictionary(); + TaskManagementStatus taskManagementStatus = taskManagement.CancelAllTasks(null, ref tasksCancelPetitionFailedRef); + + Assert.Equal(TaskManagementStatus.AllTasksCancelPetitionAccepted, taskManagementStatus); + Assert.Empty(tasksCancelPetitionFailedRef); + } + finally + { + TasksManagement.CancelAllTasksParallelThreshold = originalThreshold; + taskManagement.ClearConcurrentLists(); + } + } + + [Fact] + public void ParallelPathWithExceptListCancelAllTasks_AllTasksCancelPetitionAcceptedStatus() + { + var logger = new Mock(); + TasksManagement taskManagement = new TasksManagement(logger.Object); + + int originalThreshold = TasksManagement.CancelAllTasksParallelThreshold; + try + { + TasksManagement.CancelAllTasksParallelThreshold = 5; + + var ctsList = new System.Collections.Generic.List(); + for (int i = 1; i <= 6; i++) + { + var cts = new CancellationTokenSource(); + ctsList.Add(cts); + taskManagement.RegisterTask($"parallel-except-task-{i}", new ActionsUtilitiesTests().ActionObjectCancellationTokenSource(), cts); + taskManagement.StartTask($"parallel-except-task-{i}"); + } + + // Except the first two — the parallel path must skip them and cancel the rest. + var tasksCancelPetitionFailedRef = new ConcurrentDictionary(); + TaskManagementStatus taskManagementStatus = taskManagement.CancelAllTasks( + new List { "parallel-except-task-1", "parallel-except-task-2" }, + ref tasksCancelPetitionFailedRef); + + Assert.Equal(TaskManagementStatus.AllTasksCancelPetitionAccepted, taskManagementStatus); + } + finally + { + TasksManagement.CancelAllTasksParallelThreshold = originalThreshold; + taskManagement.ClearConcurrentLists(); + } + } + + [Fact] + public void ParallelPathAlreadyCompletedTaskCancelAllTasks_AllTasksCancelPetitionAcceptedStatus() + { + var logger = new Mock(); + TasksManagement taskManagement = new TasksManagement(logger.Object); + + int originalThreshold = TasksManagement.CancelAllTasksParallelThreshold; + try + { + TasksManagement.CancelAllTasksParallelThreshold = 5; + + for (int i = 1; i <= 6; i++) + { + var cts = new CancellationTokenSource(); + taskManagement.RegisterTask($"parallel-completed-task-{i}", _ => { }, cts); + taskManagement.StartTask($"parallel-completed-task-{i}"); + } + + // Wait for all trivially-completing tasks to reach RanToCompletion so the + // parallel path hits the IsCompleted branch for every entry. + var spinWait = new SpinWait(); + bool allDone = false; + while (!allDone) + { + allDone = true; + var statuses = taskManagement.GetTasksStatus(); + foreach (var s in statuses.Values) + { + if (!s.Equals(System.Threading.Tasks.TaskStatus.RanToCompletion)) + { + allDone = false; + break; + } + } + if (!allDone) spinWait.SpinOnce(); + } + + var tasksCancelPetitionFailedRef = new ConcurrentDictionary(); + TaskManagementStatus taskManagementStatus = taskManagement.CancelAllTasks(null, ref tasksCancelPetitionFailedRef); + + Assert.Equal(TaskManagementStatus.AllTasksCancelPetitionAccepted, taskManagementStatus); + } + finally + { + TasksManagement.CancelAllTasksParallelThreshold = originalThreshold; + taskManagement.ClearConcurrentLists(); + } + } + + [Fact] + public void ParallelPathAllTasksExceptedCancelAllTasks_TasksNotFoundToBeCancelledStatus() + { + var logger = new Mock(); + TasksManagement taskManagement = new TasksManagement(logger.Object); + + int originalThreshold = TasksManagement.CancelAllTasksParallelThreshold; + try + { + TasksManagement.CancelAllTasksParallelThreshold = 5; + + var taskNames = new List(); + for (int i = 1; i <= 6; i++) + { + var name = $"parallel-allexcepted-task-{i}"; + taskNames.Add(name); + var cts = new CancellationTokenSource(); + taskManagement.RegisterTask(name, new ActionsUtilitiesTests().ActionObjectCancellationTokenSource(), cts); + taskManagement.StartTask(name); + } + + // All tasks are excepted — parallel path sets anyTaskFound to 0 + // and must return TasksNotFoundToBeCancelled. + var tasksCancelPetitionFailedRef = new ConcurrentDictionary(); + TaskManagementStatus taskManagementStatus = taskManagement.CancelAllTasks(taskNames, ref tasksCancelPetitionFailedRef); + + Assert.Equal(TaskManagementStatus.TasksNotFoundToBeCancelled, taskManagementStatus); + Assert.Empty(tasksCancelPetitionFailedRef); + } + finally + { + TasksManagement.CancelAllTasksParallelThreshold = originalThreshold; + taskManagement.ClearConcurrentLists(); + } + } + + [Fact] + public void AlreadyCompletedTaskCancelAllTasks_AllTasksCancelPetitionAcceptedStatus() + { + var logger = new Mock(); + TasksManagement taskManagement = new TasksManagement(logger.Object); + + const string simpleTaskName = "tasktest1"; + + // Register a task with a trivially-completing action so it finishes on its own. + var cancellationTokenSource = new CancellationTokenSource(); + TaskManagementStatus taskManagementStatus = taskManagement.RegisterTask(simpleTaskName, _ => { }, cancellationTokenSource); + taskManagementStatus = taskManagement.StartTask(simpleTaskName); + + // Wait until the task reaches RanToCompletion before calling CancelAllTasks — + // this exercises the IsCompleted branch in the sequential path, which records + // the task as Completed in the failure dict. The final check treats Completed + // entries as non-failures, so the method returns AllTasksCancelPetitionAccepted. + var spinWait = new SpinWait(); + var statuses = taskManagement.GetTasksStatus(); + while (!statuses[simpleTaskName].Equals(System.Threading.Tasks.TaskStatus.RanToCompletion)) + { + spinWait.SpinOnce(); + statuses = taskManagement.GetTasksStatus(); + } + + var tasksCancelPetitionFailedRef = new ConcurrentDictionary(); + taskManagementStatus = taskManagement.CancelAllTasks(null, ref tasksCancelPetitionFailedRef); + + Assert.Equal(TaskManagementStatus.AllTasksCancelPetitionAccepted, taskManagementStatus); + taskManagement.ClearConcurrentLists(); + } } } diff --git a/src/NetFramework.Tasks.Management.xUnit.Tests/TaskManagementCheckTaskStatusCompletedTest.cs b/src/NetFramework.Tasks.Management.xUnit.Tests/TaskManagementCheckTaskStatusCompletedTest.cs index d7a8a9f..e22f559 100644 --- a/src/NetFramework.Tasks.Management.xUnit.Tests/TaskManagementCheckTaskStatusCompletedTest.cs +++ b/src/NetFramework.Tasks.Management.xUnit.Tests/TaskManagementCheckTaskStatusCompletedTest.cs @@ -79,5 +79,31 @@ public void CompletedWithoutCancelsCheckTaskStatusCompleted_TaskManagementRegist Assert.Equal(TaskManagementStatus.NotCompleted, taskManagementStatus); } + + [Fact] + public void AlreadyCompletedTaskCheckTaskStatusCompleted_CompletedStatus() + { + const string simpleTaskName = "tasktest"; + + // Register a task with a trivially-completing action (no spin loop). + var cancellationTokenSource = new CancellationTokenSource(); + TaskManagementStatus taskManagementStatus = _taskManagement.RegisterTask(simpleTaskName, _ => { }, cancellationTokenSource); + taskManagementStatus = _taskManagement.StartTask(simpleTaskName); + + // Wait until the task has actually reached a completed state before calling + // CheckTaskStatusCompleted — this exercises the fast-path branch that returns + // immediately when Task.IsCompleted is true, skipping the Task.Run polling loop. + var spinWait = new SpinWait(); + var statuses = _taskManagement.GetTasksStatus(); + while (!statuses[simpleTaskName].Equals(System.Threading.Tasks.TaskStatus.RanToCompletion)) + { + spinWait.SpinOnce(); + statuses = _taskManagement.GetTasksStatus(); + } + + taskManagementStatus = _taskManagement.CheckTaskStatusCompleted(simpleTaskName, retry: 1, millisecondsCancellationWait: 1000); + + Assert.Equal(TaskManagementStatus.Completed, taskManagementStatus); + } } }