
Assign specific worker URLs to specific tasks #378

@gabotechs


Today, worker URL assignment is random, and users have no say over which worker URL is assigned to each task.

How it works today

The process for going from a valid single-node plan to a distributed plan with worker URLs assigned is as follows:

  1. Users tell the distributed planner how many tasks are appropriate for executing each node (typically only done for leaf nodes):
          ┌──────────────────┐          
          │CoalescePartitions│          
          └──────────────────┘          
          ┌──────────────────┐          
          │ Aggregate(final) │          
          └──────────────────┘          
          ┌──────────────────┐          
          │   Repartition    │          
          └──────────────────┘          
          ┌──────────────────┐          
          │Aggregate(partial)│          
          └──────────────────┘          
          ┌──────────────────┐          
          │      Filter      │          
          └──────────────────┘          
          ┌──────────────────┐          
          │      Union       │          
          └──────────────────┘          
┌──────────────────┐┌──────────────────┐
│   DataSource1    ││   DataSource2    │
└──────────────────┘└──────────────────┘
    TaskEstimator:       TaskEstimator: 
    I want 2 tasks       I want 3 tasks 
  2. The distributed planner reconciles task counts within each stage, since multiple nodes may contribute an estimated task count to the same stage:
                                            
                   ...                      
┌ ─ ─ ─ ─ ─                     ─Same stage 
           ┌──────────────────┐            │
│          │   Repartition    │ 3 tasks     
           └──────────────────┘            │
│          ┌──────────────────┐             
           │Aggregate(partial)│ 3 tasks    │
│          └──────────────────┘             
           ┌──────────────────┐            │
│          │      Filter      │ 3 tasks     
           └──────────────────┘            │
│          ┌──────────────────┐             
           │      Union       │ 3 tasks    │
│          └──────────────────┘             
 ┌──────────────────┐┌──────────────────┐  │
││   DataSource1    ││   DataSource2    │   
 └──────────────────┘└──────────────────┘  │
│       3 tasks            3 tasks          
 ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
  3. Network boundaries with specific input task counts are injected, but with empty slots for worker URLs:
                               ┏━━━━━━━━━━━━━━━━━━┓                                   
                               ┃ DistributedExec  ┃                                   
                               ┗━━━━━━━━━━━━━━━━━━┛                                   
                               ┌──────────────────┐                                   
                               │CoalescePartitions│                                   
                               └──────────────────┘                                   
                               ┏━━━━━━━━━━━━━━━━━━┓                                   
                               ┃ NetworkCoalesce  ┃                                   
                               ┗━━━━━━━━━━━━━━━━━━┛                                   
                                                                                      
                   ┌ ─ ─ ─ ─ ─ ─URL: None  ┌ ─ ─ ─ ─ ─ URL: None                      
                    ┌──────────────────┐│   ┌──────────────────┐│                     
                   ││ Aggregate(final) │   ││ Aggregate(final) │                      
                    └──────────────────┘│   └──────────────────┘│                     
                   │┏━━━━━━━━━━━━━━━━━━┓   │┏━━━━━━━━━━━━━━━━━━┓                      
                    ┃  NetworkShuffle  ┃│   ┃  NetworkShuffle  ┃│                     
                   │┗━━━━━━━━━━━━━━━━━━┛   │┗━━━━━━━━━━━━━━━━━━┛                      
                    ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘                     
                                                                                      
┌ ─ ─ ─ ─ ─ ─ ─ ─ URL: None  ┌ ─ ─ ─ ─ ─ ─ ─ ─ URL: None  ┌ ─ ─ ─ ─ ─ ─ ─ ─ URL: None 
    ┌──────────────────┐   │     ┌──────────────────┐   │     ┌──────────────────┐   │
│   │   Repartition    │     │   │   Repartition    │     │   │   Repartition    │    
    └──────────────────┘   │     └──────────────────┘   │     └──────────────────┘   │
│   ┌──────────────────┐     │   ┌──────────────────┐     │   ┌──────────────────┐    
    │Aggregate(partial)│   │     │Aggregate(partial)│   │     │Aggregate(partial)│   │
│   └──────────────────┘     │   └──────────────────┘     │   └──────────────────┘    
    ┌──────────────────┐   │     ┌──────────────────┐   │     ┌──────────────────┐   │
│   │      Filter      │     │   │      Filter      │     │   │      Filter      │    
    └──────────────────┘   │     └──────────────────┘   │     └──────────────────┘   │
│   ┌──────────────────┐     │   ┌──────────────────┐     │   ┌──────────────────┐    
    │      Union       │   │     │      Union       │   │     │      Union       │   │
│   └──────────────────┘     │   └──────────────────┘     │   └──────────────────┘    
 ┌───────────┐┌───────────┐│  ┌───────────┐┌───────────┐│  ┌───────────┐┌───────────┐│
││ModDataSrc1││ModDataSrc2│  ││ModDataSrc1││ModDataSrc2│  ││ModDataSrc1││ModDataSrc2│ 
 └───────────┘└───────────┘│  └───────────┘└───────────┘│  └───────────┘└───────────┘│
└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ 
  4. The DistributedExec head node lazily modifies the nodes below, assigning worker URLs to the different tasks right before execution:
                               ┏━━━━━━━━━━━━━━━━━━┓                                   
                               ┃ DistributedExec  ┃                                   
                               ┗━━━━━━━━━━━━━━━━━━┛                                   
                               ┌──────────────────┐                                   
                               │CoalescePartitions│                                   
                               └──────────────────┘                                   
                               ┏━━━━━━━━━━━━━━━━━━┓                                   
                               ┃ NetworkCoalesce  ┃                                   
                               ┗━━━━━━━━━━━━━━━━━━┛                                   
                                                                                      
                   ┌ ─ ─ ─ URL: http://4   ┌ ─ ─ ─ URL: http://5                      
                    ┌──────────────────┐│   ┌──────────────────┐│                     
                   ││ Aggregate(final) │   ││ Aggregate(final) │                      
                    └──────────────────┘│   └──────────────────┘│                     
                   │┏━━━━━━━━━━━━━━━━━━┓   │┏━━━━━━━━━━━━━━━━━━┓                      
                    ┃  NetworkShuffle  ┃│   ┃  NetworkShuffle  ┃│                     
                   │┗━━━━━━━━━━━━━━━━━━┛   │┗━━━━━━━━━━━━━━━━━━┛                      
                    ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘                     
                                                                                      
┌ ─ ─ ─ ─ ─ ─ URL: http://1  ┌ ─ ─ ─ ─ ─ ─ URL: http://2  ┌ ─ ─ ─ ─ ─ ─ URL: http://3 
    ┌──────────────────┐   │     ┌──────────────────┐   │     ┌──────────────────┐   │
│   │   Repartition    │     │   │   Repartition    │     │   │   Repartition    │    
    └──────────────────┘   │     └──────────────────┘   │     └──────────────────┘   │
│   ┌──────────────────┐     │   ┌──────────────────┐     │   ┌──────────────────┐    
    │Aggregate(partial)│   │     │Aggregate(partial)│   │     │Aggregate(partial)│   │
│   └──────────────────┘     │   └──────────────────┘     │   └──────────────────┘    
    ┌──────────────────┐   │     ┌──────────────────┐   │     ┌──────────────────┐   │
│   │      Filter      │     │   │      Filter      │     │   │      Filter      │    
    └──────────────────┘   │     └──────────────────┘   │     └──────────────────┘   │
│   ┌──────────────────┐     │   ┌──────────────────┐     │   ┌──────────────────┐    
    │      Union       │   │     │      Union       │   │     │      Union       │   │
│   └──────────────────┘     │   └──────────────────┘     │   └──────────────────┘    
 ┌───────────┐┌───────────┐│  ┌───────────┐┌───────────┐│  ┌───────────┐┌───────────┐│
││ModDataSrc1││ModDataSrc2│  ││ModDataSrc1││ModDataSrc2│  ││ModDataSrc1││ModDataSrc2│ 
 └───────────┘└───────────┘│  └───────────┘└───────────┘│  └───────────┘└───────────┘│
└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ 
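
As a rough illustration of steps 2 to 4 above, here is a minimal std-only sketch. Every name in it (`Stage`, `reconcile_task_count`, `plan_stage`, `assign_urls`) is hypothetical and not part of the project's API. Two assumptions are worth calling out: reconciliation is modeled as taking the maximum estimate, which matches the diagram (2 and 3 become 3) but may not be the actual rule, and the random assignment is swapped for a round-robin so the sketch stays deterministic.

```rust
/// Hypothetical stand-in for one stage of the distributed plan.
#[derive(Debug)]
struct Stage {
    /// One slot per task; `None` means no worker URL has been assigned yet.
    worker_urls: Vec<Option<String>>,
}

/// Step 2: reconcile the per-node estimates of one stage into one task count.
/// ASSUMPTION: takes the maximum, which matches the diagram (2 and 3 -> 3).
/// Falls back to a single task when no node provided an estimate.
fn reconcile_task_count(estimates: &[usize]) -> usize {
    estimates.iter().copied().max().unwrap_or(1)
}

/// Step 3: build the stage with one empty URL slot per task.
fn plan_stage(estimates: &[usize]) -> Stage {
    Stage {
        worker_urls: vec![None; reconcile_task_count(estimates)],
    }
}

/// Step 4: fill the empty slots right before execution.
/// ASSUMPTION: round-robin starting at `offset` replaces the random choice
/// described in the issue, purely to keep this sketch deterministic.
fn assign_urls(stage: &mut Stage, available: &[String], offset: usize) {
    if available.is_empty() {
        return; // nothing to assign; slots stay empty
    }
    for (i, slot) in stage.worker_urls.iter_mut().enumerate() {
        if slot.is_none() {
            *slot = Some(available[(offset + i) % available.len()].clone());
        }
    }
}
```

Calling `plan_stage(&[2, 3])` yields a stage with three empty slots, and `assign_urls` then fills them from whatever worker list is available at execution time.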

How it could work in the future

That last step is random today, but at the moment it happens there is plenty of information available for users to decide which worker URL is appropriate for executing each task:

  • Task index and task count (what the project calls DistributedTaskContext)
  • The subplan that is assigned to that task
  • The overall query plan

This should be enough information for users to assign a specific worker URL to each task themselves.
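
One possible shape for such a user hook is sketched below. It is only an illustration: `WorkerResolver`, `TaskContext`, and `PinByMarker` are hypothetical names, `TaskContext` only mirrors the "task index and task count" mentioned above (the real DistributedTaskContext may differ), and plans are passed as plain strings to keep the example self-contained.

```rust
/// Hypothetical mirror of the task index and task count described above.
#[derive(Debug, Clone, Copy)]
struct TaskContext {
    task_index: usize,
    task_count: usize,
}

/// Hypothetical user hook: given what is known at assignment time,
/// return the worker URL for this task, or `None` to express no preference.
trait WorkerResolver {
    fn resolve(
        &self,
        ctx: TaskContext,
        subplan: &str,
        full_plan: &str,
        available: &[String],
    ) -> Option<String>;
}

/// Example policy: tasks whose subplan mentions `marker` are pinned to one
/// worker; every other task is spread across workers by task index.
struct PinByMarker {
    marker: String,
    pinned_url: String,
}

impl WorkerResolver for PinByMarker {
    fn resolve(
        &self,
        ctx: TaskContext,
        subplan: &str,
        _full_plan: &str,
        available: &[String],
    ) -> Option<String> {
        debug_assert!(ctx.task_index < ctx.task_count);
        if subplan.contains(&self.marker) {
            return Some(self.pinned_url.clone());
        }
        if available.is_empty() {
            return None; // no workers to choose from
        }
        Some(available[ctx.task_index % available.len()].clone())
    }
}
```

Returning `Option<String>` is a design choice of this sketch: `None` would let the planner fall back to its current behavior for tasks the user has no opinion about.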

Note that #374 also proposes extending the API so that users can provide more than just a number (the task count estimate), so any future evolution of this API will need to take both efforts into account.
