Issue #401: Distributed Async Evaluation — Support W=512+ Multi-Worker Scaling
Summary
The wirelog TDD (Timely-Differential Dataflow) multi-worker evaluation pipeline regresses beyond W=4: running with W=8 workers is ~8% slower than W=4 (28.3s vs 26.1s), despite doubling the worker count. This violates the expected near-linear scaling behavior and blocks any path to high-parallelism deployment.
The root cause is a combination of barrier-based synchronous coordination, O(W) sequential coordinator exchange, and O(W²) exchange buffer allocation that together create an Amdahl's Law ceiling at ~W=4. The current architecture's serial fraction (estimated 5–15%) makes scaling beyond W=8 physically impossible without fundamental changes.
This umbrella issue tracks the design and implementation of a distributed async evaluation architecture that replaces barrier synchronization with epoch-based convergence, serial exchange with tree-structured parallel merge, and sequential coordination with pipelined compute/exchange overlap. The target is near-linear scaling to W=512 and W=1024.
Root Cause Analysis
Observed Performance Data
| Workers | Time (s) | Speedup vs W=1 | Speedup vs W=4 | Status |
|---------|----------|----------------|----------------|--------|
| W=1 | 752.4 | 1.0× | — | Baseline (sequential path) |
| W=4 | 26.1 | 28.8× | 1.0× | Working (post-#404 fixes) |
| W=8 | 28.3 | 26.6× | 0.92× | **REGRESSING** |
Note: The W=1 → W=4 gap (28.8×) far exceeds the 4× expected from parallelism alone. This is because the TDD/BDX multi-worker path uses a fundamentally more efficient incremental evaluation strategy (delta-only BDX) than the W=1 sequential path (col_eval_stratum). The W=8 vs W=4 regression is overhead layered on top of this faster algorithm.
Three Bottlenecks
| Bottleneck | Current | At W=512 | Code Location |
|------------|---------|----------|---------------|
| Barrier synchronization | `wl_workqueue_wait_all()` blocks until slowest worker finishes | Straggler latency × 512 iterations | `workqueue.c:200-220` |
| O(W²) exchange buffers | `exchange_bufs[W][W]` allocates W² `col_rel_t*` slots | 262,144 slots (512²) | `eval.c:2425-2446` |
| O(W) sequential exchange | `tdd_bdx_exchange_deltas` runs single-threaded on coordinator | 512 serial delta unions per sub-pass | `eval.c:3581-3767` |
Amdahl's Law Analysis
T(W) = T_seq + T_par/W + T_sync(W) + T_overhead(W)
Current serial fraction (estimated): 5-15%
- At f=10%: max theoretical speedup = 1/f = 10× (regardless of W)
- At W=512 with f=10%: T(512) ≈ T_seq (parallel portion negligible)
Target serial fraction: <2%
- At f=2%: max theoretical speedup = 50×
- Enables meaningful scaling to W=512+
Additional Contributing Factors
- O(N log N) coordinator IDB re-sort per iteration (eval.c:3661-3666, flagged as TODO(#390)): grows unboundedly as VarPointsTo accumulates millions of tuples
- Workqueue ring capacity: WL_WQ_RING_CAP = 256 (workqueue.c:30) — physically cannot enqueue W=512 work items
- Single mutex + thundering herd: cond_broadcast wakes ALL W workers simultaneously, serializing on one mutex (workqueue.c:78-108)
- EDB replication: All W workers read identical large EDB tables (e.g., Var_Type: 947k rows), multiplying memory bandwidth pressure with no parallelism benefit
- Memory budget over-partitioning: budget = total / (W+1) (session.c:546) — at W=8, each of the 9 parties gets only ~11% of RAM, triggering premature backpressure
Solution Architecture
Current vs Proposed
CURRENT (Barrier-Based Synchronous):
for each iteration:
for each sub-pass:
┌─────────────────────────────────────────┐
│ 1. RESET worker contexts O(W×nrels) │
│ 2. DISPATCH: submit W tasks O(W) mutex │
│ 3. BARRIER: wait_all() BLOCKING │
│ 4. CONVERGENCE CHECK O(W) scan │
│ 5. EXCHANGE: serial merge O(W×D) │
└─────────────────────────────────────────┘
PROPOSED (Distributed Async):
Epoch-based execution:
┌──────────────────────────────────────────────────────┐
│ Workers compute continuously (no barrier) │
│ → push delta to lock-free SPSC queue (per-worker) │
│ → atomically increment epoch counter │
│ │
│ Merge tree reduces deltas: O(log₂W) depth │
│ → parallel binary merge at each level │
│ → zero-copy shared views between levels │
│ │
│ Coordinator distributes merged delta (pipelined) │
│ → workers start next epoch while merge proceeds │
│ │
│ Convergence: atomic epoch counter (no global lock) │
└──────────────────────────────────────────────────────┘
Key Architectural Changes
| Component | Current | Proposed | Complexity Change |
|-----------|---------|----------|-------------------|
| Synchronization | `pthread_barrier` / `wl_workqueue_wait_all` | Atomic epoch counter | O(W) → O(1) amortized |
| Delta exchange | Sequential coordinator merge | Tree-structured parallel merge | O(W) depth → O(log₂W) depth |
| Worker communication | Single mutex + `cond_broadcast` | Lock-free SPSC ring buffers (per-worker) | ~50-100ns → ~10-20ns per op |
| Exchange buffers | `exchange_bufs[W][W]` O(W²) | Per-worker delta slots O(W) | W² → W memory |
| Coordinator IDB sort | O(N log N) re-sort per iteration | O(N+D) sorted merge-append | Eliminates redundant work |
| Compute/exchange | Sequential (compute → wait → exchange) | Pipelined (overlap compute and merge) | Latency hiding |
Lock-Free Protocol (High-Level)
Per-worker SPSC ring buffer (implemented: wirelog/util/lockfree_queue.h):
- Enqueue: load head(acquire), write slot, store tail(release)
- Dequeue: load tail(acquire), read slot, store head(release)
- No CAS, no mutex — 2 atomic ops per operation
- ABA-free: monotone integer cursors, no pointer comparison
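A minimal sketch of the protocol described above. This is illustrative only — the real queue lives in wirelog/util/lockfree_queue.h, and the names and capacity here are assumptions:

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define SPSC_CAP 1024  /* must be a power of two */

typedef struct {
    _Atomic size_t head;   /* consumer cursor, monotonically increasing */
    _Atomic size_t tail;   /* producer cursor, monotonically increasing */
    void *slots[SPSC_CAP];
} spsc_ring;

/* Producer: acquire-load the consumer cursor (full check), write the slot,
 * release-store the producer cursor. No CAS, no mutex. */
static bool spsc_enqueue(spsc_ring *q, void *item) {
    size_t t = atomic_load_explicit(&q->tail, memory_order_relaxed);
    size_t h = atomic_load_explicit(&q->head, memory_order_acquire);
    if (t - h == SPSC_CAP)
        return false;                        /* full */
    q->slots[t & (SPSC_CAP - 1)] = item;
    atomic_store_explicit(&q->tail, t + 1, memory_order_release);
    return true;
}

/* Consumer: mirror image. Monotone integer cursors make this ABA-free. */
static bool spsc_dequeue(spsc_ring *q, void **out) {
    size_t h = atomic_load_explicit(&q->head, memory_order_relaxed);
    size_t t = atomic_load_explicit(&q->tail, memory_order_acquire);
    if (h == t)
        return false;                        /* empty */
    *out = q->slots[h & (SPSC_CAP - 1)];
    atomic_store_explicit(&q->head, h + 1, memory_order_release);
    return true;
}
```

Because cursors only ever increase and each side owns exactly one cursor, no compare-and-swap loop is needed.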
MPMC wrapper: W independent SPSC queues, coordinator polls round-robin.
- Zero contention between producers (each owns a queue)
- Coordinator dequeue_all: O(W × N) scan
Epoch convergence:
- Workers: atomic_fetch_add(&epoch.workers_done, 1)
- Last worker (prev+1 == W): check any_new_at_epoch flag
- If no new tuples: set converged = true (no barrier needed)
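The convergence check above can be sketched in C11 atomics. Field names mirror the bullets (workers_done, any_new_at_epoch) but are otherwise assumptions, not the actual wirelog layout:

```c
#include <stdatomic.h>
#include <stdbool.h>

typedef struct {
    _Atomic int workers_done;
    _Atomic bool any_new_at_epoch;
    _Atomic bool converged;
} epoch_state;

/* Each worker calls this once per epoch after finishing its sub-pass. */
static void worker_finish_epoch(epoch_state *e, int W, bool produced_new) {
    if (produced_new)
        atomic_store_explicit(&e->any_new_at_epoch, true, memory_order_release);

    int prev = atomic_fetch_add_explicit(&e->workers_done, 1,
                                         memory_order_acq_rel);
    if (prev + 1 == W) {
        /* Last worker of the epoch: if nobody produced a new tuple,
         * the fixpoint is reached — no barrier required. */
        if (!atomic_load_explicit(&e->any_new_at_epoch, memory_order_acquire))
            atomic_store_explicit(&e->converged, true, memory_order_release);
        /* Reset counters for the next epoch. */
        atomic_store_explicit(&e->any_new_at_epoch, false, memory_order_relaxed);
        atomic_store_explicit(&e->workers_done, 0, memory_order_relaxed);
    }
}
```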
Implementation Roadmap
Phase 0: Compatibility & Foundation (3 atomic commits)
Goal: Remove scaling walls without changing the synchronization model. All changes backward-compatible with W=1 fast path.
Commit 1: Extend WL_WQ_RING_CAP
- File: wirelog/workqueue.c:30
- Change: WL_WQ_RING_CAP from 256 → 1024 (or dynamic allocation)
- Why: Current cap physically prevents W>256 from enqueuing work items
- Test: Verify W=512 work items can be submitted without overflow
Commit 2: Sorted merge-insert optimization (TODO #390)
- File: wirelog/columnar/eval.c:3661-3666
- Change: Replace O(N log N) tdd_dedup_rel(coord_idb) with O(N+D) merge-append that maintains sorted order incrementally
- Why: Eliminates the single largest per-iteration coordinator cost
- Test: DOOP convergence correctness at W=4, verify iteration count unchanged
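The intended shape of the merge-append, sketched on integer keys. The real change operates on sorted columnar relations in eval.c; this standalone sketch is not wirelog code:

```c
#include <stddef.h>

/* O(N+D) merge of a sorted, deduped base (coord) with a sorted, deduped
 * delta, producing a sorted, deduped result. Replaces a full O((N+D) log(N+D))
 * re-sort per iteration. */
static size_t merge_append(const int *coord, size_t n,
                           const int *delta, size_t d, int *out) {
    size_t i = 0, j = 0, k = 0;
    while (i < n && j < d) {
        if (coord[i] < delta[j])
            out[k++] = coord[i++];
        else if (delta[j] < coord[i])
            out[k++] = delta[j++];
        else {                       /* duplicate key: keep one copy */
            out[k++] = coord[i++];
            j++;
        }
    }
    while (i < n) out[k++] = coord[i++];
    while (j < d) out[k++] = delta[j++];
    return k;                        /* merged length, sorted and deduped */
}
```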
Commit 3: Cache-line padding for worker state
- Files: wirelog/columnar/eval.c (worker context struct), wirelog/workqueue.c (work item struct)
- Change: Add __attribute__((aligned(64))) or manual padding to prevent false sharing between worker-local state and coordinator state
- Why: At W=8+, false sharing on adjacent cache lines causes coherency traffic spikes
- Test: perf stat cache-miss comparison W=4 vs W=8
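A sketch of the padding pattern. The struct layout and field names here are assumptions — the real worker context lives in eval.c:

```c
#include <stdalign.h>
#include <stdatomic.h>

/* Each worker's hot state occupies its own 64-byte cache line, so stores
 * by worker i never invalidate the line holding worker i+1's state. */
typedef struct {
    alignas(64) _Atomic unsigned long tuples_emitted;
    _Atomic int done;
    /* 64-byte alignment on the first member forces sizeof to round up
     * to a whole number of cache lines. */
} worker_slot;

_Static_assert(sizeof(worker_slot) % 64 == 0,
               "worker_slot must occupy whole cache lines");
```

An array of these slots (one per worker) then has a 64-byte stride, which is what eliminates false sharing on typical x86/ARM cache-line sizes.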
Phase 1: Async Delta Slot + Epoch Convergence
Goal: Remove barrier synchronization. Workers submit deltas asynchronously; coordinator detects convergence via atomic epoch counters.
- Integrate lock-free MPMC queue (wirelog/util/lockfree_queue.h, already implemented)
- Replace wl_workqueue_wait_all() barrier with epoch-counter convergence detection
- Workers push delta to per-worker SPSC queue after each sub-pass
- Coordinator polls queues and merges deltas (still serial at this phase)
- Validation: DOOP W=4 correctness (same tuple count, same iteration count)
- Target: Eliminate barrier idle time; W=8 should match or beat W=4
Phase 2: Tree-Structured Parallel Merge
Goal: Replace O(W) serial delta merge with O(log₂W) parallel tree reduction.
Level 0 (leaves): W₀ W₁ W₂ W₃ W₄ W₅ W₆ W₇ ... W₅₁₁
╲ ╱ ╲ ╱ ╲ ╱ ╲ ╱ ╲ ╱
Level 1 (W/2): M₀₁ M₂₃ M₄₅ M₆₇ ... M₅₁₀₋₅₁₁
╲ ╱ ╲ ╱
Level 2 (W/4): M₀₋₃ M₄₋₇ ...
...
Level 9 (root): M₀₋₅₁₁ ← final merged delta
- Each level's merges are independent → parallel execution using workqueue
- Dedup at each level prevents exponential intermediate growth
- W=512: 9 levels, 511 merge operations (vs 512 serial)
- Validation: W=16 scaling (expect near-linear improvement over W=8)
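The reduction schedule can be sketched with integer stand-ins for deltas. Here merge_dedup just sums; in the real system it would union two sorted relations and dedup:

```c
#include <stddef.h>

/* Stand-in merge: a "delta" here is just an int and merging sums them. */
static int merge_dedup(int a, int b) { return a + b; }

/* Binary tree reduction over W per-worker deltas: ceil(log2 W) levels and
 * exactly W-1 merges. All merges within one level touch disjoint pairs,
 * so each level can be dispatched to the workqueue in parallel. */
static int tree_merge(int *delta, int W) {
    for (int span = 1; span < W; span *= 2)
        for (int i = 0; i + span < W; i += 2 * span)
            delta[i] = merge_dedup(delta[i], delta[i + span]);
    return delta[0];   /* root of the tree: fully merged delta */
}
```

The span-doubling loop also handles non-power-of-two W correctly (odd leaves are simply carried up a level unmerged).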
Phase 3: Pipelined Compute/Exchange Overlap
Goal: Workers start next epoch while coordinator merges current epoch's deltas.
- Async double-buffer: workers write to epoch e slot while coordinator merges epoch e-1
- Speculative execution: workers proceed with stale delta (safe under monotonicity — stale delta ⊆ fresh delta)
- Validation: W=64 scaling, measure compute/exchange overlap ratio
- Semi-naive correctness invariant: Speculative execution with stale deltas may produce redundant tuples (re-derived from previous iterations), but never incorrect tuples. Dedup at merge eliminates duplicates. Monotonicity of Datalog ensures old_delta ⊆ new_delta, so workers using stale deltas compute a superset that converges to the same fixpoint.
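The double-buffer slot arithmetic can be sketched as follows. Names and the int stand-in for a delta buffer are assumptions:

```c
#include <stddef.h>

#define NSLOTS 2   /* double buffer: one slot per in-flight epoch */

typedef struct {
    int delta[NSLOTS];   /* stand-in for two per-epoch delta buffers */
} worker_pipe;

/* Worker side: fill the slot for the current epoch. */
static int *producer_slot(worker_pipe *p, unsigned long epoch) {
    return &p->delta[epoch & (NSLOTS - 1)];
}

/* Coordinator side: drain the slot for the previous epoch, so merge of
 * epoch e-1 overlaps with compute of epoch e. */
static int *consumer_slot(worker_pipe *p, unsigned long epoch) {
    return &p->delta[(epoch - 1) & (NSLOTS - 1)];
}
```

Since the two sides are always exactly one epoch apart, they never address the same slot within an epoch, which is what makes the overlap race-free.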
Phase 4: Long-term Optimizations
Goal: Production-grade scaling to W=512+.
- Partitioned IDB (if needed): Hash-partition large IDB tables across workers to reduce memory replication
- NUMA-aware scheduling: Pin worker threads to NUMA nodes; allocate worker memory from local node
- EDB partitioning for BDX mode: Hash-partition large EDB tables (e.g., Var_Type 947k rows) by EXCHANGE key instead of full replication
- Cooperative scheduling: Worker-driven coordination to replace coordinator bottleneck
Deliverables Checklist
Performance Targets
| Phase | W | Target Time | Speedup vs W=1 | Speedup vs Current W=4 |
|-------|---|-------------|----------------|------------------------|
| Baseline | 1 | 752s | 1× | — |
| Baseline | 4 | 26.1s | 28.8× | 1.0× |
| Current | 8 | 28.3s | 26.6× | 0.92× (regression) |
| Phase 1 | 8 | ~26s | 28.9× | 1.0× |
| Phase 2 | 32 | ~7s | 107× | 3.7× |
| Phase 3 | 128 | ~2s | 376× | 13× |
| Phase 4 | 512 | <1s | 752×+ | 26×+ |
Note: W=1→W=4 speedup (28.8×) includes algorithmic improvement from BDX incremental evaluation, not just parallelism. Targets for W>8 assume the parallel merge tree and pipelining unlock true parallelism on top of the BDX algorithmic advantage.
Technical Notes
Semi-Naive Correctness Under Async Execution
Speculative execution with stale deltas preserves semi-naive correctness:
- Monotonicity: Datalog is monotone — stale_delta ⊆ fresh_delta (adding facts never removes derived facts)
- Redundant but never incorrect: Workers using stale deltas may re-derive tuples from previous iterations, but dedup at merge eliminates duplicates
- Convergence: The fixpoint is unique regardless of evaluation order; async execution changes the path but not the destination
- Formal invariant: At every epoch boundary, the union of all worker-produced tuples is a subset of the minimal model
Thread Safety
- Lock-free synchronization via atomic epoch counter (TSan validated for SPSC queue)
- Per-worker SPSC queues eliminate cross-worker contention entirely
- Coordinator is sole consumer — no multi-consumer race conditions
- Memory ordering: acquire/release semantics on cursor updates; relaxed for diagnostic counters
Backward Compatibility
- W=1 fast path fully preserved: All async machinery is bypassed when num_workers == 1
- Existing API unchanged: wl_session_snapshot() remains the public entry point
- Graceful degradation: If lock-free queue is unavailable (build config), falls back to current barrier-based path
Dependencies & Blockers
- Lock-free SPSC/MPMC queue implemented (wirelog/util/lockfree_queue.h, wirelog/util/lockfree_queue.c)
- Development branch: feature/issue-401-distributed-async-exchange
Related Issues
- num_workers cap — ring capacity limitation (addressed in Phase 0, Commit 1)
- join_output_limit regression — TDD worker join truncation fixes (prerequisite: must be stable before Phase 1)
Discussion Points
- W=512 realism: Is W=512 a realistic deployment target, or should we cap at W=256? The tree merge overhead at W=512 (9 levels) vs W=256 (8 levels) is marginal, but memory bandwidth may become the true bottleneck before W=512.
- Partitioned IDB timing: Should hash-partitioned IDB (Phase 4) be pulled into Phase 2 to improve memory locality earlier? Trade-off: more complex merge logic vs better per-worker cache utilization.
- Cooperative scheduling: Should Phase 3 explore worker-driven coordination (workers decide when to proceed) vs coordinator-driven (coordinator signals epoch transitions)? Cooperative scheduling eliminates the coordinator as a bottleneck but complicates convergence detection.
- EDB partitioning for DOOP: DOOP's large EDB tables (Var_Type 947k rows, Var_DeclaringMethod 947k rows) are fully replicated to all workers. Hash-partitioning EDB by EXCHANGE key would reduce memory bandwidth at W=8+, but requires changes to tdd_init_workers_hybrid (eval.c:3199-3228). Is this worth the complexity?
- Speculative execution safety: The monotonicity argument for stale-delta correctness assumes pure Datalog (no negation across strata). Should we add a runtime assertion that speculative execution is only enabled for non-negated strata?