implement extensibility with NetworkBoundaryStrategy #1
| /// Strategy for placing network boundaries in a distributed execution plan. | ||
| /// | ||
| /// When a network boundary is needed (e.g., after hash repartition or before coalesce), | ||
| /// strategies are invoked in order. The first strategy to return annotation with a boundary wins. | ||
| /// | ||
| /// Strategies should return `None` to defer to the next strategy in the chain. | ||
| /// Custom strategies can be registered to override default behavior. | ||
| pub trait NetworkBoundaryStrategy: Debug + Send + Sync { | ||
| /// Annotates a plan node with network boundary metadata. | ||
| /// | ||
| /// Returns `Some(NetworkBoundaryAnnotation)` if this strategy detects a boundary is needed, | ||
| /// or `None` to defer to the next strategy. | ||
| /// | ||
| /// The annotation can optionally include an `output_tasks` hint to override DFD's | ||
| /// default task count calculation. | ||
| fn annotate_network_boundary( | ||
| &self, | ||
| plan: &dyn ExecutionPlan, | ||
| ) -> Option<NetworkBoundaryAnnotation>; | ||
|
|
||
| /// Apply this strategy to place a network boundary. Return `Ok(None)` to defer to next strategy. | ||
| fn apply_boundary( | ||
| &self, | ||
| context: &NetworkBoundaryContext<'_>, | ||
| ) -> Result<Option<Arc<dyn ExecutionPlan>>>; | ||
| } |
Here is the primary extensibility trait we have introduced to control annotation and plan mutation.
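The "first strategy to return an annotation wins" chain described in the trait docs can be sketched roughly as below. This is a minimal illustration only, not the code in this PR: `PlanNode`, the fields of `NetworkBoundaryAnnotation`, the two example strategies, and `first_annotation` are hypothetical stand-ins so the snippet compiles without DataFusion.

```rust
use std::fmt::Debug;

/// Hypothetical stand-in for a plan node; the real trait takes `&dyn ExecutionPlan`.
#[derive(Debug)]
pub struct PlanNode {
    pub name: &'static str,
}

/// Simplified stand-in for the annotation type; both fields are assumptions.
#[derive(Debug, Clone, PartialEq)]
pub struct NetworkBoundaryAnnotation {
    pub label: &'static str,
    pub output_tasks: Option<usize>,
}

/// The trait reduced to its annotation method for this sketch.
pub trait NetworkBoundaryStrategy: Debug + Send + Sync {
    fn annotate_network_boundary(&self, plan: &PlanNode) -> Option<NetworkBoundaryAnnotation>;
}

/// Example strategy that always defers to the next one in the chain.
#[derive(Debug)]
pub struct AlwaysDefer;

impl NetworkBoundaryStrategy for AlwaysDefer {
    fn annotate_network_boundary(&self, _plan: &PlanNode) -> Option<NetworkBoundaryAnnotation> {
        None
    }
}

/// Example strategy that requests a boundary only for a node named "SortExec".
#[derive(Debug)]
pub struct BoundaryAboveSort;

impl NetworkBoundaryStrategy for BoundaryAboveSort {
    fn annotate_network_boundary(&self, plan: &PlanNode) -> Option<NetworkBoundaryAnnotation> {
        (plan.name == "SortExec").then(|| NetworkBoundaryAnnotation {
            label: "custom_sort",
            output_tasks: Some(4), // task-count hint overriding the default
        })
    }
}

/// Invokes strategies in order; the first one returning `Some` wins.
pub fn first_annotation(
    strategies: &[Box<dyn NetworkBoundaryStrategy>],
    plan: &PlanNode,
) -> Option<NetworkBoundaryAnnotation> {
    strategies
        .iter()
        .find_map(|s| s.annotate_network_boundary(plan))
}
```

Returning `None` from a strategy keeps the default behavior intact, so a fork can register a narrow strategy without having to reimplement the built-in boundaries.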
| Self::Shuffle => write!(f, "[NetworkBoundary] Shuffle"), | ||
| Self::Coalesce => write!(f, "[NetworkBoundary] Coalesce"), | ||
| Self::Broadcast => write!(f, "[NetworkBoundary] Broadcast"), | ||
| Self::Extension(name) => write!(f, "[NetworkBoundary] Extension({})", name), // @NetworkBoundaryStrategy: Extension variant |
This adds a PlanOrNetworkBoundary::Extension variant that allows for custom annotations that can be used in the distribute_plan phase.
| } | ||
| } | ||
|
|
||
| // @NetworkBoundaryStrategy: strategies last—overwrite annotation when a strategy matches |
Since this extensibility was designed for a fork, we chose not to provide NetworkBoundaryStrategy implementations for the NetworkShuffleExec, NetworkCoalesceExec, and NetworkBroadcastExec, as it was getting very complicated to do cleanly.
| } | ||
| } | ||
|
|
||
| // @NetworkBoundaryStrategy: strategies last—overwrite annotation when a strategy matches |
This section calls NetworkBoundaryStrategy::annotate_network_boundary and, if a strategy matches, overwrites the mutable annotation. This allows a NetworkBoundaryStrategy to be selected instead of a default strategy.
| max_child_task_count, | ||
| cfg, | ||
| ) | ||
| } |
We match on PlanOrNetworkBoundary::Extension to invoke the NetworkBoundaryStrategy::apply_boundary method.
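As a rough sketch of that dispatch, assuming a heavily simplified model: plans are plain strings here, `apply_boundary` takes a `&str` instead of a `NetworkBoundaryContext`, and `distribute` plus the two example strategies are hypothetical names, not the functions in this PR. Only the enum variants and their `Display` strings are taken from the diff.

```rust
use std::fmt;

/// Boundary kinds shown in the diff; `Extension` carries the custom name.
#[derive(Debug, Clone, PartialEq)]
pub enum PlanOrNetworkBoundary {
    Shuffle,
    Coalesce,
    Broadcast,
    Extension(&'static str),
}

impl fmt::Display for PlanOrNetworkBoundary {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Shuffle => write!(f, "[NetworkBoundary] Shuffle"),
            Self::Coalesce => write!(f, "[NetworkBoundary] Coalesce"),
            Self::Broadcast => write!(f, "[NetworkBoundary] Broadcast"),
            Self::Extension(name) => write!(f, "[NetworkBoundary] Extension({})", name),
        }
    }
}

/// The trait reduced to the plan-mutation method; signature is an assumption.
pub trait NetworkBoundaryStrategy {
    /// Returns `Ok(None)` to defer to the next strategy.
    fn apply_boundary(&self, plan: &str) -> Result<Option<String>, String>;
}

/// Example strategy that always defers.
pub struct Defer;

impl NetworkBoundaryStrategy for Defer {
    fn apply_boundary(&self, _plan: &str) -> Result<Option<String>, String> {
        Ok(None)
    }
}

/// Example strategy that wraps the plan in a made-up exchange node.
pub struct WrapInCustomExchange;

impl NetworkBoundaryStrategy for WrapInCustomExchange {
    fn apply_boundary(&self, plan: &str) -> Result<Option<String>, String> {
        Ok(Some(format!("CustomExchange({})", plan)))
    }
}

/// Extension boundaries go to the registered strategies; built-in boundaries
/// keep their default handling (modelled here as a formatted string).
pub fn distribute(
    boundary: &PlanOrNetworkBoundary,
    plan: &str,
    strategies: &[Box<dyn NetworkBoundaryStrategy>],
) -> Result<String, String> {
    match boundary {
        PlanOrNetworkBoundary::Extension(name) => {
            for strategy in strategies {
                if let Some(new_plan) = strategy.apply_boundary(plan)? {
                    return Ok(new_plan);
                }
            }
            Err(format!("no strategy handled extension boundary {}", name))
        }
        other => Ok(format!("{} -> {}", other, plan)),
    }
}
```

Treating an unhandled `Extension` as an error is a choice made for this sketch; the PR may handle that case differently.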
This adds extensibility to the DistributedPhysicalOptimizerRule via a new trait called NetworkBoundaryStrategy.
The NetworkBoundaryStrategy was developed to provide a minimal set of changes for a fork which plug into the plan annotation and plan distribution phases of DistributedPhysicalOptimizerRule::optimize. While it is likely not the final design to propose upstream for introducing extensibility into the DistributedPhysicalOptimizerRule, it demonstrates key extensibility features that should be considered.
Ultimately, users of the library should have a way to express mutations on the ExecutionPlan tree while plugging into the many niceties of datafusion-distributed. This would allow users with specific use cases, constraints, and data systems to customize the types of distributed physical plans they produce, while relying on the distributed workers, task estimation, network boundaries, metrics, etc. which datafusion-distributed provides.