Distributed Async Evaluation — Support W=512+ Multi-Worker Scaling #407

@justinjoy

Description

Issue #401: Distributed Async Evaluation — Support W=512+ Multi-Worker Scaling

Summary

The wirelog TDD (Timely-Differential Dataflow) multi-worker evaluation pipeline exhibits negative scaling beyond W=4: running with W=8 workers is 8% slower than W=4 (28.3s vs 26.1s), despite doubling the worker count. This violates the expected near-linear scaling behavior and blocks any path to high-parallelism deployment.

The root cause is a combination of barrier-based synchronous coordination, O(W) sequential coordinator exchange, and O(W²) exchange buffer allocation that together create an Amdahl's Law ceiling at ~W=4. The current architecture's serial fraction (estimated 5–15%) makes scaling beyond W=8 physically impossible without fundamental changes.

This umbrella issue tracks the design and implementation of a distributed async evaluation architecture that replaces barrier synchronization with epoch-based convergence, serial exchange with tree-structured parallel merge, and sequential coordination with pipelined compute/exchange overlap. The target is near-linear scaling to W=512 and W=1024.


Root Cause Analysis

Observed Performance Data

| Workers | Time (s) | Speedup vs W=1 | Speedup vs W=4 | Status |
|---------|----------|----------------|----------------|--------|
| W=1     | 752.4    | 1.0×           | —              | Baseline (sequential path) |
| W=4     | 26.1     | 28.8×          | 1.0×           | Working (post-#404 fixes) |
| W=8     | 28.3     | 26.6×          | 0.92×          | REGRESSING |

Note: The W=1 → W=4 gap (28.8×) far exceeds the 4× expected from parallelism alone. This is because the TDD/BDX multi-worker path uses a fundamentally more efficient incremental evaluation strategy (delta-only BDX) than the W=1 sequential path (col_eval_stratum). The W=8 vs W=4 regression is overhead layered on top of this faster algorithm.

Three Bottlenecks

| Bottleneck | Current | At W=512 | Code Location |
|------------|---------|----------|---------------|
| Barrier synchronization | `wl_workqueue_wait_all()` blocks until slowest worker finishes | Straggler latency × 512 iterations | workqueue.c:200-220 |
| O(W²) exchange buffers | `exchange_bufs[W][W]` allocates W² `col_rel_t*` slots | 262,144 slots (512²) | eval.c:2425-2446 |
| O(W) sequential exchange | `tdd_bdx_exchange_deltas` runs single-threaded on coordinator | 512 serial delta unions per sub-pass | eval.c:3581-3767 |

Amdahl's Law Analysis

T(W) = T_seq + T_par/W + T_sync(W) + T_overhead(W)

Current serial fraction (estimated): 5-15%
  - At f=10%: max theoretical speedup = 1/f = 10× (regardless of W)
  - At W=512 with f=10%: T(512) ≈ T_seq (parallel portion negligible)

Target serial fraction: <2%
  - At f=2%: max theoretical speedup = 50×
  - Enables meaningful scaling to W=512+
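The ceiling can be sanity-checked numerically. The helper below is a throwaway sketch (hypothetical, not part of wirelog) evaluating the bound speedup(f, W) = 1 / (f + (1 − f)/W), which approaches 1/f as W grows:

```c
/* Throwaway Amdahl's Law calculator (illustrative only, not wirelog code).
 * With serial fraction f, the best achievable speedup at W workers is
 * 1 / (f + (1 - f) / W); the limit as W grows is 1/f. */
static double amdahl_speedup(double f, double w) {
    return 1.0 / (f + (1.0 - f) / w);
}
```

At f=10% this gives ≈9.8× at W=512 — adding workers past W≈8 buys almost nothing, which matches the observed W=8 regression once per-worker overhead is layered on top.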

Additional Contributing Factors

  • O(N log N) coordinator IDB re-sort per iteration (eval.c:3661-3666, flagged as TODO(#390)): grows unboundedly as VarPointsTo accumulates millions of tuples
  • Workqueue ring capacity: WL_WQ_RING_CAP = 256 (workqueue.c:30) — physically cannot enqueue W=512 work items
  • Single mutex + thundering herd: cond_broadcast wakes ALL W workers simultaneously, serializing on one mutex (workqueue.c:78-108)
  • EDB replication: All W workers read identical large EDB tables (e.g., Var_Type: 947k rows), multiplying memory bandwidth pressure with no parallelism benefit
  • Memory budget over-partitioning: budget = total / (W+1) (session.c:546) — at W=8, each of the 9 parties gets only ~11% of RAM, triggering premature backpressure

Solution Architecture

Current vs Proposed

CURRENT (Barrier-Based Synchronous):

  for each iteration:
    for each sub-pass:
      ┌─────────────────────────────────────────┐
      │ 1. RESET worker contexts     O(W×nrels) │
      │ 2. DISPATCH: submit W tasks  O(W) mutex │
      │ 3. BARRIER: wait_all()       BLOCKING   │
      │ 4. CONVERGENCE CHECK         O(W) scan  │
      │ 5. EXCHANGE: serial merge    O(W×D)     │
      └─────────────────────────────────────────┘

PROPOSED (Distributed Async):

  Epoch-based execution:
    ┌──────────────────────────────────────────────────────┐
    │ Workers compute continuously (no barrier)            │
    │   → push delta to lock-free SPSC queue (per-worker)  │
    │   → atomically increment epoch counter               │
    │                                                      │
    │ Merge tree reduces deltas: O(log₂W) depth            │
    │   → parallel binary merge at each level               │
    │   → zero-copy shared views between levels             │
    │                                                      │
    │ Coordinator distributes merged delta (pipelined)      │
    │   → workers start next epoch while merge proceeds     │
    │                                                      │
    │ Convergence: atomic epoch counter (no global lock)    │
    └──────────────────────────────────────────────────────┘

Key Architectural Changes

| Component | Current | Proposed | Complexity Change |
|-----------|---------|----------|-------------------|
| Synchronization | `pthread_barrier` / `wl_workqueue_wait_all` | Atomic epoch counter | O(W) → O(1) amortized |
| Delta exchange | Sequential coordinator merge | Tree-structured parallel merge | O(W) depth → O(log₂W) depth |
| Worker communication | Single mutex + `cond_broadcast` | Lock-free SPSC ring buffers (per-worker) | ~50-100ns → ~10-20ns per op |
| Exchange buffers | `exchange_bufs[W][W]`, O(W²) | Per-worker delta slots, O(W) | W² → W memory |
| Coordinator IDB sort | O(N log N) re-sort per iteration | O(N+D) sorted merge-append | Eliminates redundant work |
| Compute/exchange | Sequential (compute → wait → exchange) | Pipelined (overlap compute and merge) | Latency hiding |

Lock-Free Protocol (High-Level)

Per-worker SPSC ring buffer (implemented: wirelog/util/lockfree_queue.h):
  - Enqueue: load head(acquire), write slot, store tail(release)
  - Dequeue: load tail(acquire), read slot, store head(release)
  - No CAS, no mutex — 2 atomic ops per operation
  - ABA-free: monotone integer cursors, no pointer comparison
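The monotone-cursor scheme above can be sketched as follows. This is a minimal illustration, not the actual wirelog/util/lockfree_queue.h API — the names, slot type, and fixed capacity are assumptions:

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Minimal SPSC ring with monotone cursors (illustrative, not the
 * lockfree_queue.h layout). Cursors only ever increase, so there is
 * no ABA problem; indices are taken modulo the capacity. */
#define RING_CAP 8 /* power of two, assumed for the sketch */

typedef struct {
    void *slots[RING_CAP];
    _Atomic size_t head; /* consumer cursor, advanced on dequeue */
    _Atomic size_t tail; /* producer cursor, advanced on enqueue */
} spsc_ring;

static bool spsc_enqueue(spsc_ring *q, void *item) {
    size_t tail = atomic_load_explicit(&q->tail, memory_order_relaxed);
    size_t head = atomic_load_explicit(&q->head, memory_order_acquire);
    if (tail - head == RING_CAP)
        return false;                       /* full */
    q->slots[tail % RING_CAP] = item;       /* write slot */
    atomic_store_explicit(&q->tail, tail + 1, memory_order_release);
    return true;
}

static bool spsc_dequeue(spsc_ring *q, void **out) {
    size_t head = atomic_load_explicit(&q->head, memory_order_relaxed);
    size_t tail = atomic_load_explicit(&q->tail, memory_order_acquire);
    if (head == tail)
        return false;                       /* empty */
    *out = q->slots[head % RING_CAP];       /* read slot */
    atomic_store_explicit(&q->head, head + 1, memory_order_release);
    return true;
}
```

Note the two atomics per operation and the acquire/release pairing on the opposite cursor — exactly the "no CAS, no mutex" discipline described above.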

MPMC wrapper: W independent SPSC queues, coordinator polls round-robin.
  - Zero contention between producers (each owns a queue)
  - Coordinator dequeue_all: O(W × N) scan

Epoch convergence:
  - Workers: atomic_fetch_add(&epoch.workers_done, 1)
  - Last worker (prev+1 == W): check any_new_at_epoch flag
  - If no new tuples: set converged = true (no barrier needed)
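The last-worker convergence check can be sketched like this (field and function names are illustrative assumptions, not the wirelog implementation):

```c
#include <stdatomic.h>
#include <stdbool.h>

/* Sketch of epoch-based convergence: no barrier, just one fetch_add
 * per worker per sub-pass. Names are hypothetical. */
typedef struct {
    _Atomic int  workers_done;     /* incremented by each worker */
    _Atomic bool any_new_at_epoch; /* set if any worker derived new tuples */
    _Atomic bool converged;        /* set by the worker that closes the epoch */
} epoch_state;

/* Called by each worker after its sub-pass; returns true for the worker
 * that closed the epoch (prev + 1 == num_workers). */
static bool epoch_worker_done(epoch_state *e, int num_workers, bool produced_new) {
    if (produced_new)
        atomic_store_explicit(&e->any_new_at_epoch, true, memory_order_release);
    int prev = atomic_fetch_add_explicit(&e->workers_done, 1, memory_order_acq_rel);
    if (prev + 1 == num_workers) {
        /* Last worker: if nobody produced new tuples, we have a fixpoint. */
        if (!atomic_load_explicit(&e->any_new_at_epoch, memory_order_acquire))
            atomic_store_explicit(&e->converged, true, memory_order_release);
        return true;
    }
    return false;
}
```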

Implementation Roadmap

Phase 0: Compatibility & Foundation (3 atomic commits)

Goal: Remove scaling walls without changing the synchronization model. All changes backward-compatible with W=1 fast path.

Commit 1: Extend WL_WQ_RING_CAP

  • File: wirelog/workqueue.c:30
  • Change: WL_WQ_RING_CAP from 256 → 1024 (or dynamic allocation)
  • Why: Current cap physically prevents W>256 from enqueuing work items
  • Test: Verify W=512 work items can be submitted without overflow

Commit 2: Sorted merge-insert optimization (TODO #390)

  • File: wirelog/columnar/eval.c:3661-3666
  • Change: Replace O(N log N) tdd_dedup_rel(coord_idb) with O(N+D) merge-append that maintains sorted order incrementally
  • Why: Eliminates the single largest per-iteration coordinator cost
  • Test: DOOP convergence correctness at W=4, verify iteration count unchanged
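The O(N+D) idea behind this commit can be shown on a toy sorted-integer relation. The real code operates on col_rel_t columns; this sketch only demonstrates the merge-and-dedup shape that replaces the full re-sort:

```c
#include <stddef.h>
#include <stdint.h>

/* Illustrative merge-append: given a sorted, deduped IDB of size n and a
 * sorted, deduped delta of size d, produce the merged sorted relation in
 * O(n + d) instead of re-sorting all n + d tuples. Types are stand-ins. */
static size_t merge_append(const uint64_t *idb, size_t n,
                           const uint64_t *delta, size_t d,
                           uint64_t *out) {
    size_t i = 0, j = 0, k = 0;
    while (i < n && j < d) {
        if (idb[i] < delta[j])      out[k++] = idb[i++];
        else if (delta[j] < idb[i]) out[k++] = delta[j++];
        else { out[k++] = idb[i++]; j++; }  /* duplicate: keep one copy */
    }
    while (i < n) out[k++] = idb[i++];
    while (j < d) out[k++] = delta[j++];
    return k;   /* new IDB size */
}
```

Since D (the delta) is typically tiny compared to N (accumulated VarPointsTo), this turns the dominant per-iteration coordinator cost from O(N log N) into nearly O(N) sequential copying.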

Commit 3: Cache-line padding for worker state

  • Files: wirelog/columnar/eval.c (worker context struct), wirelog/workqueue.c (work item struct)
  • Change: Add __attribute__((aligned(64))) or manual padding to prevent false sharing between worker-local state and coordinator state
  • Why: At W=8+, false sharing on adjacent cache lines causes coherency traffic spikes
  • Test: perf stat cache-miss comparison W=4 vs W=8
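A minimal sketch of the padding pattern, assuming a hypothetical per-worker hot struct (the actual eval.c layout differs; this uses C11 `_Alignas` as an equivalent of the `__attribute__((aligned(64)))` mentioned above):

```c
#include <stdint.h>

/* Illustrative false-sharing fix: give each worker's hot fields their own
 * 64-byte cache line so coordinator writes never invalidate worker lines. */
#define CACHE_LINE 64

typedef struct {
    uint64_t delta_count; /* worker-local hot counter (hypothetical) */
    uint64_t epoch;
} worker_hot_t;

typedef struct {
    _Alignas(CACHE_LINE) worker_hot_t hot;
    char pad[CACHE_LINE - (sizeof(worker_hot_t) % CACHE_LINE)]; /* pad to line */
} worker_state_t;
```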

Phase 1: Async Delta Slot + Epoch Convergence

Goal: Remove barrier synchronization. Workers submit deltas asynchronously; coordinator detects convergence via atomic epoch counters.

  • Integrate lock-free MPMC queue (wirelog/util/lockfree_queue.h, already implemented)
  • Replace wl_workqueue_wait_all() barrier with epoch-counter convergence detection
  • Workers push delta to per-worker SPSC queue after each sub-pass
  • Coordinator polls queues and merges deltas (still serial at this phase)
  • Validation: DOOP W=4 correctness (same tuple count, same iteration count)
  • Target: Eliminate barrier idle time; W=8 should match or beat W=4

Phase 2: Tree-Structured Parallel Merge

Goal: Replace O(W) serial delta merge with O(log₂W) parallel tree reduction.

Level 0 (leaves):    W₀  W₁  W₂  W₃  W₄  W₅  W₆  W₇  ... W₅₁₁
                      ╲ ╱    ╲ ╱    ╲ ╱    ╲ ╱              ╲ ╱
Level 1 (W/2):       M₀₁   M₂₃   M₄₅   M₆₇    ...       M₅₁₀₋₅₁₁
                       ╲   ╱       ╲   ╱
Level 2 (W/4):       M₀₋₃      M₄₋₇         ...
                         ...
Level 9 (root):      M₀₋₅₁₁    ← final merged delta
  • Each level's merges are independent → parallel execution using workqueue
  • Dedup at each level prevents exponential intermediate growth
  • W=512: 9 levels, 511 merge operations (vs 512 serial)
  • Validation: W=16 scaling (expect near-linear improvement over W=8)
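A sequential sketch of the reduction shape (in the real design each level's pair-merges are dispatched in parallel on the workqueue; `merge_pair` stands in for a delta union plus dedup):

```c
#include <stddef.h>

typedef int delta_t; /* stand-in for a per-worker delta relation */

/* Stand-in for "union two deltas and dedup"; here just addition so the
 * reduction shape is easy to verify. */
static delta_t merge_pair(delta_t a, delta_t b) { return a + b; }

/* In-place tree reduction: each while-iteration is one level of the
 * merge tree. The inner for-loop's merges are independent, which is
 * what makes each level parallelizable. */
static delta_t tree_reduce(delta_t *deltas, size_t w) {
    while (w > 1) {
        size_t half = (w + 1) / 2;
        for (size_t i = 0; i < w / 2; i++)       /* independent pair-merges */
            deltas[i] = merge_pair(deltas[2 * i], deltas[2 * i + 1]);
        if (w % 2)
            deltas[w / 2] = deltas[w - 1];        /* odd leftover carries up */
        w = half;
    }
    return deltas[0];
}
```

With W=512 this runs 9 levels (log₂512) of independent merges, replacing 512 strictly serial delta unions on the coordinator.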

Phase 3: Pipelined Compute/Exchange Overlap

Goal: Workers start next epoch while coordinator merges current epoch's deltas.

  • Async double-buffer: workers write to epoch e slot while coordinator merges epoch e-1
  • Speculative execution: workers proceed with stale delta (safe under monotonicity — stale delta ⊆ fresh delta)
  • Validation: W=64 scaling, measure compute/exchange overlap ratio
  • Semi-naive correctness invariant: Speculative execution with stale deltas may produce redundant tuples (re-derived from previous iterations), but never incorrect tuples. Dedup at merge eliminates duplicates. Monotonicity of Datalog ensures old_delta ⊆ new_delta, so workers using stale deltas compute a superset that converges to the same fixpoint.
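The double-buffer can be as simple as epoch-parity indexing. The sketch below is a hypothetical selector, not the planned wirelog structure: workers write the slot for epoch e while the coordinator drains the slot for epoch e−1:

```c
#include <stdatomic.h>

/* Hypothetical per-worker double buffer: two delta slots indexed by
 * epoch parity, so compute (epoch e) and merge (epoch e-1) overlap. */
typedef struct {
    void *slot[2];          /* delta buffers for even/odd epochs */
    _Atomic unsigned epoch; /* advanced by the coordinator after each merge */
} worker_slots;

/* Slot the worker writes during the current epoch. */
static void **write_slot(worker_slots *w) {
    return &w->slot[atomic_load_explicit(&w->epoch, memory_order_acquire) & 1u];
}

/* Slot the coordinator merges (previous epoch's parity). */
static void **merge_slot(worker_slots *w) {
    return &w->slot[(atomic_load_explicit(&w->epoch, memory_order_acquire) + 1u) & 1u];
}
```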

Phase 4: Long-term Optimizations

Goal: Production-grade scaling to W=512+.

  • Partitioned IDB (if needed): Hash-partition large IDB tables across workers to reduce memory replication
  • NUMA-aware scheduling: Pin worker threads to NUMA nodes; allocate worker memory from local node
  • EDB partitioning for BDX mode: Hash-partition large EDB tables (e.g., Var_Type 947k rows) by EXCHANGE key instead of full replication
  • Cooperative scheduling: Worker-driven coordination to replace coordinator bottleneck

Deliverables Checklist

  • Phase 0: 3 atomic commits (ring cap, sorted merge, cache-line padding)
  • Phase 0: Code review + architect sign-off
  • Phase 0: Full regression suite passing (112+ tests)
  • Phase 1: Async delta slot + epoch convergence implementation
  • Phase 1: DOOP W=4 correctness validation (same tuple count as baseline)
  • Phase 1: TSan clean at W=8
  • Phase 2: Tree-structured parallel merge implementation
  • Phase 2: W=16 scaling validation (near-linear improvement)
  • Phase 3: Pipelined compute/exchange overlap
  • Phase 3: W=64 scaling validation
  • Phase 4: Production optimization + W=512 target

Performance Targets

| Phase | W | Target Time | Speedup vs W=1 | Speedup vs Current W=4 |
|-------|---|-------------|----------------|------------------------|
| Baseline | 1 | 752s | — | — |
| Baseline | 4 | 26.1s | 28.8× | 1.0× |
| Current | 8 | 28.3s | 26.6× | 0.92× (regression) |
| Phase 1 | 8 | ~26s | 28.9× | 1.0× |
| Phase 2 | 32 | ~7s | 107× | 3.7× |
| Phase 3 | 128 | ~2s | 376× | 13× |
| Phase 4 | 512 | <1s | 752×+ | 26×+ |

Note: W=1→W=4 speedup (28.8×) includes algorithmic improvement from BDX incremental evaluation, not just parallelism. Targets for W>8 assume the parallel merge tree and pipelining unlock true parallelism on top of the BDX algorithmic advantage.


Technical Notes

Semi-Naive Correctness Under Async Execution

Speculative execution with stale deltas preserves semi-naive correctness:

  • Monotonicity: Datalog is monotone — stale_delta ⊆ fresh_delta (adding facts never removes derived facts)
  • Redundant but never incorrect: Workers using stale deltas may re-derive tuples from previous iterations, but dedup at merge eliminates duplicates
  • Convergence: The fixpoint is unique regardless of evaluation order; async execution changes the path but not the destination
  • Formal invariant: At every epoch boundary, the union of all worker-produced tuples is a subset of the minimal model

Thread Safety

  • Lock-free synchronization via atomic epoch counter (TSan validated for SPSC queue)
  • Per-worker SPSC queues eliminate cross-worker contention entirely
  • Coordinator is sole consumer — no multi-consumer race conditions
  • Memory ordering: acquire/release semantics on cursor updates; relaxed for diagnostic counters

Backward Compatibility

  • W=1 fast path fully preserved: All async machinery is bypassed when num_workers == 1
  • Existing API unchanged: wl_session_snapshot() remains the public entry point
  • Graceful degradation: If lock-free queue is unavailable (build config), falls back to current barrier-based path

Dependencies & Blockers

  • Lock-free MPMC queue implementation (wirelog/util/lockfree_queue.h, wirelog/util/lockfree_queue.c)
  • Async exchange architecture design (see protocol specification above)
  • Root cause analysis and W=4/W=8 profiling (see Root Cause Analysis section)
  • Phase 0 code review (in progress on feature/issue-401-distributed-async-exchange)
  • DOOP W>1 EOVERFLOW regression fix (current build fails at W=4 and W=8 — likely an interaction with the #404 commits, "fix: DOOP benchmark fails with multi-worker execution (W=4, W=8)")
  • TDD W=1 sequential path performance validation


Discussion Points

  1. W=512 realism: Is W=512 a realistic deployment target, or should we cap at W=256? The tree merge overhead at W=512 (9 levels) vs W=256 (8 levels) is marginal, but memory bandwidth may become the true bottleneck before W=512.

  2. Partitioned IDB timing: Should hash-partitioned IDB (Phase 4) be pulled into Phase 2 to improve memory locality earlier? Trade-off: more complex merge logic vs better per-worker cache utilization.

  3. Cooperative scheduling: Should Phase 3 explore worker-driven coordination (workers decide when to proceed) vs coordinator-driven (coordinator signals epoch transitions)? Cooperative scheduling eliminates the coordinator as a bottleneck but complicates convergence detection.

  4. EDB partitioning for DOOP: DOOP's large EDB tables (Var_Type 947k rows, Var_DeclaringMethod 947k rows) are fully replicated to all workers. Hash-partitioning EDB by EXCHANGE key would reduce memory bandwidth at W=8+, but requires changes to tdd_init_workers_hybrid (eval.c:3199-3228). Is this worth the complexity?

  5. Speculative execution safety: The monotonicity argument for stale-delta correctness assumes pure Datalog (no negation across strata). Should we add a runtime assertion that speculative execution is only enabled for non-negated strata?
