A .NET pipeline orchestration library with fluent builder, parallel execution, type dispatch, and ETL support.
- Fluent builder API -
StartWith<T>().ContinueWith<T>().ExecuteAsync() - Parallel execution -
ParallelPipeline(parallel actions) andParallelForEach(parallel over elements) - Composable - pipelines are actions, enabling nesting and reuse
- Type dispatch - polymorphic handler selection based on input type
- Dual-scope state - Session (shared) and Scope (copied per step)
- Template formatting - SmartFormat-based
{session.Variable}resolution - ETL base class -
DbToDbBulkCopy<TSource, TDest>for database-to-database transfers - Exception hierarchy -
FlowException,ParallelPipelineException,BulkCopyException - Configurable resilience - per-action
IErrorHandlerwith built-inRetryHandler(Polly-backed) andContinueHandler, plus optional fallback recovery pipelines - Side-channel
OnResultpipelines - attach a pipeline to any action to run telemetry / notifications / audit steps on successful results without mutating the main flow
var builder = new PipelineBuilder(logger);
await builder
.StartWith<SetSessionVariables>(op =>
op.AddStateVariable("OutputDir", @"C:\temp"))
.ContinueWith<GetFiles>(op =>
{
op.DirectoryPath = "{session.OutputDir}";
op.SearchPattern = "*.csv";
})
.ContinueWith<ForEach>(op =>
{
op.Actions = new[] { PipelineBuilder.CreateAction<ProcessFile>() };
})
.ExecuteAsync();| Package | Target | Description |
|---|---|---|
| Flownet | netstandard2.0 / net8.0 | Core pipeline engine, IO actions, base data classes |
| Flownet.Data.SqlServer | netstandard2.0 / net8.0 | SQL Server actions (SqlServerExecute, SqlBulkLoadCsv) |
| Flownet.Data.Postgres | net8.0 | PostgreSQL actions (PostgresQLExecute, SqlServerToPostgresBulkCopy) |
dotnet add package Flownet # Core
dotnet add package Flownet.Data.SqlServer # SQL Server actions
dotnet add package Flownet.Data.Postgres # PostgreSQL actionsAttach an IErrorHandler to any IPipelineAction to configure failure behavior. Default (null) preserves current fail-fast semantics — zero migration cost.
// Continue on error — log and return NullResult
new HttpDownload { Url = "...", ErrorHandler = new ContinueHandler() }
// Retry with exponential backoff, then give up
new HttpDownload
{
Url = "...",
ErrorHandler = new RetryHandler { MaxAttempts = 3 }
}
// Retry, then run a fallback recovery pipeline (e.g., dead-letter)
new ProcessRecord
{
ErrorHandler = new RetryHandler
{
MaxAttempts = 3,
Pipeline = new Pipeline { Actions = new IPipelineAction[]
{
new WriteToDeadLetterQueue(),
new ReturnNull()
}}
}
}OperationCanceledException is non-recoverable — it bypasses all handlers. Implement IErrorHandler directly for custom strategies (circuit breaker, bulkhead, telemetry hooks).
Attach an OnResult: IPipeline to any IPipelineAction to run follow-up steps (telemetry, notifications, audit) AFTER the action succeeds. The action's declared return type is preserved — OnResult pipelines cannot mutate the main flow's data.
// Emit metrics on each successful download
new HttpDownload
{
Url = "...",
OnResult = new Pipeline { Actions = new IPipelineAction[]
{
new LogMetrics { MetricName = "download_size" }
}}
}
// Fan-out side-chain — index each file as a side-effect
new GetFiles
{
DirectoryPath = "...",
OnResult = new Pipeline { Actions = new IPipelineAction[]
{
new ForEach { Actions = new[] { new IndexFile() } }
}}
}
// Still returns FilePathCollection to the main flowSemantics:
- Fires ONLY on genuine action success — not on
ErrorHandler-recovered values (ContinueHandlerfallback,RetryHandlerexhaustion fallback pipelines). - Returns the action's ORIGINAL result. OnResult pipeline output is discarded.
- Exceptions from OnResult are logged as warnings and swallowed. The action still returns its result. Exception:
OperationCanceledExceptionpropagates. - Runs with a shallow-isolated context: Scope and Session dictionaries are cloned (key reassignments don't leak back), but mutable reference values inside them remain shared with the parent. Store immutable values in Scope/Session if full isolation matters.
Features — construction helpers
PipelineBuilder.CreatePipeline— four new static overloads for standalone pipeline construction: no-arg (empty),params IPipelineAction[](sequence),Action<IPipeline>(fluent callback), and genericAction<T>(single-action pipeline). No logger required. Single factory for allPipelineconstruction.ParallelPipelineHelpers— fluent extension methods onParallelPipeline:AddPipeline(Action<IPipeline>)for multi-step branches built via callback, andAddPipeline<T>(Action<T>)for single-action branches configured inline. Both chainable.CreatePipeline(params)is permissive: null or empty arrays produce an empty pipeline rather than throwing. Empty pipelines are valid no-ops.CreatePipeline(Action<IPipeline>)normalizesActionsto a non-null empty array post-callback to preserve theActions != nullinvariant even when the callback reassigns.- All inline
new Pipeline { Actions = ... }sites in the codebase migrated toPipelineBuilder.CreatePipeline. Future construction-time concerns (defaultPayloadProvider, new invariants, validation hooks) now have one place to land.
Refactor — PipelineAction.ExecuteAsync orchestration clarity
- Internal refactor extracting three phase helpers:
ExecuteWorkAsync(entry/exit logging + handler dispatch),ExecutePrimaryAsync(ErrorHandler branching with scoped success tracking),TryRunOnResultAsync(side-channel with OCE rethrow + swallow-and-warn on other exceptions).ExecuteAsyncreduces to four lines of orchestration. - Zero behavior change. All 17 locked invariants from the resilience and OnResult features preserved. No public API changes.
Contract notes
ParallelPipeline.AddPipeline(params IPipelineAction[])throw-on-null/empty contract is UNCHANGED. The factory's permissiveness and the fluent API's intent-signal throw are deliberately different layers.
Features / redesign
ParallelPipelineredesigned. The property is nowIEnumerable<IPipeline> Pipelines(wasIEnumerable<IPipelineAction> Actions). Each branch is explicitly aPipeline. FluentAddPipeline(IPipeline)andAddPipeline(params IPipelineAction[])overloads support both pre-built pipelines and bare-action fan-out. Class no longer implementsIPipeline— it is: PipelineActiononly, which is honest about its semantics (parallel independent pipelines, not sequential actions).ParallelExecution.RunWithConcurrencyCapAsync— new internal helper centralizing semaphore-gated parallelism. Used by bothParallelPipelineandParallelForEach. One place to fix concurrency/cancellation semantics going forward.
Fixes
- True concurrency in
ParallelPipelineandParallelForEach. Previous implementation batched work in groups ofMaxDegreeOfParallelismand waited for each group to fully complete before starting the next — a slow action in one batch would stall all cores instead of yielding to the next batch. Both types now use a semaphore-gated cap so all work starts immediately and the cap gates execution. - Cancellation is honored cleanly in both types —
OperationCanceledExceptionpropagates without being wrapped in the respective aggregate exception types.
Compatibility
- Source-breaking for
ParallelPipelinedirect consumers:Actionsproperty removed in favor ofPipelines;IPipelineinterface no longer implemented. Migration: use the fluentAddPipelineoverloads or assignPipelines = new IPipeline[] { ... }. - Silent correctness fix for
ParallelForEachconsumers: API surface unchanged (Actions,IPipeline,PayloadCollectioninput all preserved). Behavior changes from batched to true concurrency — slow elements no longer block other work.
Fixes
QuandlHelpersJSON deserialization: replaced Newtonsoft[JsonProperty]attributes with[JsonPropertyName]to match Flurl 4'sSystem.Text.Jsondefault serializer. Added explicit property-name mappings on all deserialized types for consistency withQuandlDataDownloader. Previously-ignored attributes now correctly bind snake_case JSON fields.
Features
- Added
OnResult: IPipelineproperty onIPipelineActionfor side-channel follow-up pipelines (telemetry, notifications, audit). Fires only on genuine action success; never onErrorHandler-recovered values. Returns the action's original result;OnResultpipeline output is discarded. OnResultpipelines run with a shallow-isolated context (see README section for details).
Compatibility
- Behaviorally additive for actions deriving from
PipelineAction— null default means no runtime behavior change. - Source-breaking for consumers that implement
IPipelineActiondirectly — the interface gains a new requiredOnResultproperty. In practice all in-repo implementors derive fromPipelineAction; external consumers who wrote customIPipelineActionimplementations must add the property.
Features
- Added per-
IPipelineActionresilience viaIErrorHandler. Built-ins:RetryHandler(Polly v8-backed),ContinueHandler. Default null = current behavior (zero migration). - Added
CancellationTokentoIExecutionContext(non-breaking; defaults toCancellationToken.None). - Added
ErrorPayloadfor fallback recovery pipelines.
Breaking
Features
- Added per-
IPipelineActionresilience viaIErrorHandler. Built-ins:RetryHandler(Polly v8-backed),ContinueHandler. Default null = current behavior (zero migration). - Added
CancellationTokentoIExecutionContext(non-breaking; defaults toCancellationToken.None). - Added
ErrorPayloadfor fallback recovery pipelines.
Breaking
Flownet.Data.SqlServerandFlownet.Data.Postgresnow useMicrosoft.Data.SqlClient(5.2.2) instead ofSystem.Data.SqlClient.Microsoft.Data.SqlClientdefaultsEncrypt=true— connection strings to non-TLS SQL Server instances may needEncrypt=falseorTrustServerCertificate=true.- Removed unused
ErrorResulttype (was dead code — protected constructor, unusable externally).
Dependencies
Flownetnow depends onPolly.Core8.6.6.
MIT