
FauxSpark

A discrete event simulation of Apache Spark, built with SimPy.

The implementation reflects my understanding of Apache Spark internals. Contributions, feedback, and discussions are welcome!

Purpose

If you're running Apache Spark at large scale, experimentation can be costly and sometimes impractical. While data analysis can offer insights, I found simulations more approachable for understanding system behavior. Surprisingly, they work just fine!

This simulator intends to fill that gap by allowing users to experiment and observe Apache Spark's runtime characteristics such as performance and reliability for different job schedules, cluster configurations, and failure modes.

Like any simulator, the numbers produced here are approximate, may differ from real-world behavior, and are only as accurate as the model. The plan, of course, is to keep improving the model πŸ˜€

A walkthrough demonstrating how to use this tool is provided below.

Getting Started

git clone https://github.com/fhalde/fauxspark
cd fauxspark
uv sync
uv run sim -f examples/simple/dag.json

uv run sim -f examples/shuffle/dag.json -a true -d 2 --sf 0,7 1,13 -e 2 -c 2
# 1. auto-replace (-a) executor on failure with a delay (-d) of 2 seconds
# 2. simulate a failure (--sf) for executor 0 at t=7 and executor 1 at t=13
# 3. bootstrap the cluster with 2 executors (-e) and 2 cores (-c) each

Help

options:
  -h, --help            show this help message and exit
  -e EXECUTORS, --executors EXECUTORS
                        Set the number of executors to use (default: 1).
  -c CORES, --cores CORES
                        Specify how many cores each executor will have (default: 1).
  -f FILE, --file FILE  Path to DAG JSON file
  --sf SF [SF ...]      Specify list of failure events as pairs of (executor_id,time) to simulate executor failures.
  --sff SFF [SFF ...]   Inject shuffle fetch failures as dep,map,reduce,time[,count].
  --sa SA [SA ...]      Specify times (t) at which autoscaling (adding a new executor) should take place.
  -a AUTO_REPLACE, --auto-replace AUTO_REPLACE
                        Turn on/off auto-replacement of executors on failure.
  -d AUTO_REPLACE_DELAY, --auto-replace-delay AUTO_REPLACE_DELAY
                        Set the delay (in seconds) it takes to replace an executor on failure.
  --quiet               Suppress simulation logs and final report output.

βœ… Current Features

FauxSpark currently implements a simplified model of Apache Spark, which includes:

  • DAG scheduling with stages, tasks, and dependencies
  • Automatic retries on executor or shuffle-fetch failures
  • Targeted shuffle fetch-failure injection (--sff)
  • Driver-side shuffle metadata tracking (map output locations)
  • AZ-aware shuffle network simulation (local / intra-AZ / inter-AZ)
  • Single-job execution with configurable cluster parameters
  • Simple CLI to tweak cluster size, simulate failures, and scale executors up

Network/AZ knobs

You can model cross-AZ cost/latency during shuffle fetches:

uv run sim -f examples/shuffle/dag.json \
  -e 4 -c 2 \
  --az-count 2 \
  --network-bandwidth-mb-s 48 \
  --intra-az-latency-ms 1 \
  --inter-az-latency-ms 12

The report now includes:

  • shuffle_local_mb
  • shuffle_intra_az_mb
  • shuffle_inter_az_mb
  • shuffle_intra_az_time_s
  • shuffle_inter_az_time_s
  • fetch_failures_total
  • fetch_failures_injected
  • stage_metrics (attempts / retries / fetch failures per stage)
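FauxSpark's internal cost model isn't spelled out here, but a plausible per-fetch cost is transfer time plus a flat latency. A minimal sketch, assuming exactly that (the function name and formula are illustrative, not FauxSpark's API):

```python
def fetch_time_s(mb, bandwidth_mb_s=48, latency_ms=12):
    """Assumed cost model: serialization time over the link plus a flat per-fetch latency."""
    return mb / bandwidth_mb_s + latency_ms / 1000.0

# A 96 MB inter-AZ fetch at 48 MB/s with 12 ms latency:
print(round(fetch_time_s(96, 48, 12), 3))  # 2.012
```

Under this model, inter-AZ fetches pay the higher latency on every fetch, so many small fetches are penalized more than a few large ones.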

Injecting shuffle fetch failures

Inject one or more targeted fetch failures for a specific shuffle block:

uv run sim -f examples/shuffle/dag.json -e 2 -c 2 \
  --sff 0,3,4,0.05 0,3,4,0.10,2

Format:

  • dep,map,reduce,time: inject once
  • dep,map,reduce,time,count: inject count times
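A spec like this is straightforward to parse. A sketch of one way to do it (the function name is illustrative; this is not FauxSpark's parser):

```python
def parse_sff(spec):
    """Parse 'dep,map,reduce,time[,count]' into a tuple; count defaults to 1."""
    parts = spec.split(",")
    dep, map_id, reduce_id = (int(p) for p in parts[:3])
    time = float(parts[3])
    count = int(parts[4]) if len(parts) > 4 else 1
    return dep, map_id, reduce_id, time, count

print(parse_sff("0,3,4,0.05"))    # (0, 3, 4, 0.05, 1)
print(parse_sff("0,3,4,0.10,2"))  # (0, 3, 4, 0.1, 2)
```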

πŸš€ Future Ideas

Planned enhancements:

  • Speculative Task Execution
  • Caching in Spark
  • Support for multiple concurrent jobs & fair resource sharing
  • Modeling different cluster topologies (e.g., for inter-AZ traffic and cost)
  • Enhanced reporting
  • Accepting RDD graphs / SparkPlans as input

Some stretch goals:

  • Modeling Network & Disk IO (e.g., network bandwidth to observe changes in shuffle performance, spills)
  • Adaptive Query Execution (AQE) behavior

Walkthrough

Consider a straightforward SQL query.

SELECT * FROM foo;

which could be represented using this DAG.

[
  {
    "id": 0,
    "deps": [],
    "status": "pending",
    "input": {
      "size": "1024 MB",
      "partitions": 10,
      "distribution": {
        "kind": "uniform"
      }
    },
    "throughput": "102.4 MB",
    "tasks": []
  }
]

This is a single stage query (no shuffle) reading an input of 1024 MB uniformly distributed across 10 partitions. Let's assume for now a single core can process at a rate of 102.4 MB/s.
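Under these assumptions the expected runtime is easy to compute by hand. A quick back-of-the-envelope check (plain arithmetic, not simulator code):

```python
size_mb = 1024.0
partitions = 10
throughput_mb_s = 102.4  # per-core processing rate

per_task_s = (size_mb / partitions) / throughput_mb_s  # 102.4 MB / 102.4 MB/s = 1.0 s
runtime_s = partitions * per_task_s                    # one core runs the 10 tasks serially
print(runtime_s)  # 10.0
```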

Let's run the simulation:

(fauxspark) ➜  fauxspark git:(main) uv run sim -f examples/simple/dag.json # 1 executor, 1 core (default parameters)
00:00:10: [main        ] job completed successfully
00:00:10: [report      ] {"utilization": 1.0, "runtime": 10.0}

Since the simulator is currently idealized (no network or scheduling delays, etc.), the utilization is 1.0 (100%).

A few more runs:

# double the cores
(fauxspark) ➜  fauxspark git:(main) uv run sim -f examples/simple/dag.json -c 2 # 1 executor, 2 core
00:00:05: [main        ] job completed successfully
00:00:05: [report      ] {"utilization": 1.0, "runtime": 5.0}
# and again
(fauxspark) ➜  fauxspark git:(main) uv run sim -f examples/simple/dag.json -c 4 # 1 executor, 4 core
00:00:03: [main        ] job completed successfully
00:00:03: [report      ] {"utilization": 0.8333333333333334, "runtime": 3.0}

Two observations:

  1. The execution time didn't halve this time, unlike before (5.0 ➜ 3.0)
  2. Utilization dropped by ~17%

Observation #1 is not surprising. The total number of tasks wasn't divisible by the number of cores. Looking at the schedule step by step:

  • First batch: 4 tasks run in 1s
  • Second batch: 4 tasks run in 1s
  • Final batch: 2 tasks run in 1s

To understand utilization, we first need to define it. In the simulator, utilization is defined as:

Ξ£ task.runtime / (Ξ£ executor.uptime Γ— executor.cores)

In our example, the final batch kept 2 of the 4 cores idle, hence the drop in utilization.
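Both observations can be reproduced with a few lines of arithmetic (a sketch of the reasoning above, not simulator code):

```python
import math

tasks, task_s, cores = 10, 1.0, 4

waves = math.ceil(tasks / cores)                      # 3 waves: 4 + 4 + 2 tasks
runtime_s = waves * task_s                            # 3.0 s, not the "ideal" 2.5 s
utilization = (tasks * task_s) / (runtime_s * cores)  # 10 s of work over 12 core-seconds
print(runtime_s, round(utilization, 4))  # 3.0 0.8333
```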


The example above didn't really call for a simulator, but then it wasn't a realistic example either!

In practice, jobs:

  • process non-uniformly distributed data – often skewed,
  • are often quite complex,
  • may encounter failures during execution.

Analyzing such situations and planning for them can quickly become challenging. For example, let's consider skew.

"input": {
  "size": "1024 MB",
  "partitions": 10,
  "distribution": {
    "kind": "pareto",
    "alpha": 1.5
  }
}

This will split our 1024 MB input into 10 randomly sized partitions that follow a Pareto distribution, resulting in a heavily skewed dataset. Far more realistic!
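FauxSpark's exact sampling may differ, but one way to split an input into Pareto-distributed partitions is to draw one Pareto weight per partition and normalize. A sketch (function name and seeding are illustrative):

```python
import random

def pareto_partitions(total_mb, n, alpha, seed=None):
    """Split total_mb into n partitions whose relative sizes follow a Pareto(alpha) shape."""
    rng = random.Random(seed)
    weights = [rng.paretovariate(alpha) for _ in range(n)]
    scale = total_mb / sum(weights)      # normalize so partitions sum to total_mb
    return [w * scale for w in weights]

sizes = pareto_partitions(1024, 10, alpha=1.5, seed=42)
print([round(s, 2) for s in sizes])  # sums to 1024 MB; typically heavily skewed
```

Lower alpha values produce heavier tails, i.e. more extreme skew.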


Running the sim:

(fauxspark) ➜  fauxspark git:(main) βœ— uv run sim -f examples/simple/dag.json -c 5 # 1 executor, 5 core
00:00:00: [executor-0  ] [0-0] input bytes=2.63 MB
00:00:00: [executor-0  ] [0-1] input bytes=18.5 MB
00:00:00: [executor-0  ] [0-2] input bytes=17.51 MB
00:00:00: [executor-0  ] [0-3] input bytes=81.51 MB
00:00:00: [executor-0  ] [0-4] input bytes=504.2 MB
00:00:00: [executor-0  ] [0-5] input bytes=11.3 MB
00:00:00: [executor-0  ] [0-6] input bytes=3.91 MB
00:00:00: [executor-0  ] [0-7] input bytes=310.48 MB
00:00:00: [executor-0  ] [0-8] input bytes=45.93 MB
00:00:00: [executor-0  ] [0-9] input bytes=28.02 MB

It's clear that some tasks are handling far more data than others. And after running the sim several times, it's not obvious how to interpret the numbers.

(fauxspark) ➜  fauxspark git:(main) βœ— uv run sim -f examples/simple/dag.json -c 5 # 1 executor, 5 core
00:00:03: [main        ] job completed successfully
00:00:03: [report      ] {"utilization": 0.631879572897418, "runtime": 3.165160080787559}

(fauxspark) ➜  fauxspark git:(main) βœ— uv run sim -f examples/simple/dag.json -c 5 # 1 executor, 5 core
00:00:04: [main        ] job completed successfully
00:00:04: [report      ] {"utilization": 0.42678031467385535, "runtime": 4.686251758187104}

(fauxspark) ➜  fauxspark git:(main) βœ— uv run sim -f examples/simple/dag.json -c 5 # 1 executor, 5 core
00:00:02: [main        ] job completed successfully
00:00:02: [report      ] {"utilization": 0.7940439222567488, "runtime": 2.5187523560608693}

Since our simulation is now stochastic (it uses random inputs), the output varies from run to run. You can't rely on any single run to draw conclusions. In simulation studies, it's common to run thousands of trials and aggregate them into a statistically significant result.
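The idea can be sketched in a few lines: run many stochastic trials and look at a percentile rather than a single run. The toy scheduler below (Pareto-sized tasks assigned to the earliest-free core) is a stand-in for FauxSpark, not its actual implementation:

```python
import random
import statistics

def simulate_runtime(rng, cores=5, tasks=10, throughput_mb_s=102.4, total_mb=1024.0):
    """One stochastic run: Pareto-sized tasks assigned to the earliest-free core."""
    weights = [rng.paretovariate(1.5) for _ in range(tasks)]
    total_w = sum(weights)
    finish = [0.0] * cores
    for w in weights:
        mb = w * total_mb / total_w
        i = finish.index(min(finish))   # earliest-free core takes the next task
        finish[i] += mb / throughput_mb_s
    return max(finish)                  # makespan of this run

rng = random.Random(0)
runs = [simulate_runtime(rng) for _ in range(10_000)]
p90 = statistics.quantiles(runs, n=10)[-1]  # 90th percentile of 10k runtimes
print(round(p90, 2))
```

Re-running with different seeds, the p90 barely moves even though individual runs vary wildly; that stability is what makes percentile-based planning workable.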

Suppose we anticipate that our dataset will skew over the coming months, and the team wants to plan capacity to minimize wasted cost (1 βˆ’ utilization) while maintaining a target SLA. A simple way to approach this is an optimizer function such as this.

This function takes two targets:

  • waste (1 βˆ’ utilization)
  • runtime

It runs 10k simulations for each cluster configuration and flags the configurations whose p90 runtime and p90 waste fall below the target SLA and waste budget. I chose p90 arbitrarily for this example.
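The optimizer itself isn't reproduced in this README; a minimal sketch of its filtering step, assuming the p90 waste and runtime have already been collected per core count (the numbers below are taken from the tables that follow):

```python
def flag_configs(results, waste, runtime):
    """results: {cores: (p90_waste, p90_runtime)}; True where both targets are met."""
    return {c: (w <= waste and r <= runtime) for c, (w, r) in sorted(results.items())}

results = {2: (0.3456, 7.64), 3: (0.5377, 7.21), 4: (0.6457, 7.06)}
print(flag_configs(results, waste=0.6, runtime=8))
# {2: True, 3: True, 4: False}
```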

Let's be ambitious

>>> m.optimizer(waste=0, runtime=1)
| Status | Cores | Waste (p90) | Runtime (p90) |
|--------|-------|-------------|---------------|
| πŸ‘Ž     | 1     | 0.0000      | 10.0000       |
| πŸ‘Ž     | 2     | 0.3537      | 7.7369        |
| πŸ‘Ž     | 3     | 0.5431      | 7.2947        |
| πŸ‘Ž     | 4     | 0.6494      | 7.1297        |
| πŸ‘Ž     | 5     | 0.7160      | 7.0411        |
| πŸ‘Ž     | 6     | 0.7624      | 7.0133        |
| πŸ‘Ž     | 7     | 0.7958      | 6.9945        |
| πŸ‘Ž     | 8     | 0.8210      | 6.9833        |
| πŸ‘Ž     | 9     | 0.8408      | 6.9802        |
| πŸ‘Ž     | 10    | 0.8567      | 6.9763        |

pretty printed markdown table from console logs

It's apparent that under skewed conditions, waste increases quickly. We might have to sacrifice some $$ for the projected skew or simply mitigate skew altogether.

>>> m.optimizer(waste=0.3, runtime=8)
| Status | Cores | Waste (p90) | Runtime (p90) |
|--------|-------|-------------|---------------|
| πŸ‘Ž     | 1     | 0.0000      | 10.0000       |
| πŸ‘Ž     | 2     | 0.3512      | 7.7062        |
| πŸ‘Ž     | 3     | 0.5432      | 7.2976        |
| πŸ‘Ž     | 4     | 0.6503      | 7.1495        |
| πŸ‘Ž     | 5     | 0.7178      | 7.0861        |
| πŸ‘Ž     | 6     | 0.7634      | 7.0439        |
| πŸ‘Ž     | 7     | 0.7964      | 7.0179        |
| πŸ‘Ž     | 8     | 0.8215      | 7.0039        |
| πŸ‘Ž     | 9     | 0.8411      | 6.9929        |
| πŸ‘Ž     | 10    | 0.8569      | 6.9882        |
>>> m.optimizer(waste=0.6, runtime=8)
| Status | Cores | Waste (p90) | Runtime (p90) |
|--------|-------|-------------|---------------|
| πŸ‘Ž     | 1     | 0.0000      | 10.0000       |
| βœ…     | 2     | 0.3456      | 7.6400        |
| βœ…     | 3     | 0.5377      | 7.2109        |
| πŸ‘Ž     | 4     | 0.6457      | 7.0567        |
| πŸ‘Ž     | 5     | 0.7131      | 6.9703        |
| πŸ‘Ž     | 6     | 0.7597      | 6.9355        |
| πŸ‘Ž     | 7     | 0.7933      | 6.9127        |
| πŸ‘Ž     | 8     | 0.8189      | 6.9015        |
| πŸ‘Ž     | 9     | 0.8387      | 6.8890        |
| πŸ‘Ž     | 10    | 0.8548      | 6.8890        |

Finally! According to the simulation results, both the 2 core and 3 core configurations meet the targets, with the 2 core setup being optimal – achieving 35% waste and a runtime of 7.7 seconds.

By the way, did you notice that even with all the randomness in our simulations, the percentiles still converged?

Randomness is seemingly chaotic, yet inherently consistent
