DD was dropped
Delta Query Architecture Design
Status: Proposal (Consensus from Architect + Analyst + Planner + Critic review)
Date: 2026-03-03
1. Executive Summary
wirelog currently uses a one-shot batch execution model: each `wl_dd_execute_cb()` call constructs a new timely dataflow, loads all EDB data via `new_collection_from()`, runs to completion, and tears down. No state survives between executions.
This document proposes adding delta (incremental) query support.
The design targets both the current DD backend and a future nanoarrow (C11-native) backend, maximizing code reuse at the C API layer while keeping backend-specific implementations separate.
2. Current Architecture (Evidence-Based)
2.1 One-Shot Execution Lifecycle
From `wirelog/cli/driver.c:132-219`: every call to `wl_run_pipeline()` creates a fresh worker, loads all facts, executes, and destroys everything.
2.2 Worker is a Data Bag, Not a Compute Session
From `rust/wirelog-dd/src/ffi.rs`: `WlDdWorker` stores EDB data as owned vectors and passes them to `timely::execute()`. It holds no timely worker handle, no input sessions, and no probe.
2.3 Static Collections (No Update Mechanism)
From `rust/wirelog-dd/src/dataflow.rs:136`: the `_` binding discards the `InputSession` handle. `new_collection_from()` creates an immutable snapshot. There is no mechanism to inject updates after initial loading.
2.4 Sequential Stratum Execution
From `rust/wirelog-dd/src/dataflow.rs:72-92`: strata are executed sequentially, each in its own `worker.dataflow()` scope, with results collected in a `HashMap<String, Vec<Row>>` between them. This architecture prevents cross-stratum change propagation in a persistent session.
3. Design Decisions (Consensus)
The following decisions were reached through architect/analyst/planner proposals reviewed by a critic agent.
3.1 Plan Naming: Keep `wl_dd_*`
Decision: Do NOT rename `wl_dd_plan_t` to `wl_exec_plan_t`.
Rationale: The plan structure encodes DD-specific operators (VARIABLE, MAP, FILTER, JOIN, ANTIJOIN, REDUCE, CONCAT, CONSOLIDATE, SEMIJOIN). A future nanoarrow backend would use different operators (sort-merge join, hash join). The plan is NOT backend-agnostic; the backend abstraction should operate at the session/execution level, not the plan level. This avoids touching 8+ files for zero functional gain.
3.2 Batch Mode: Keep Separate
Decision: The existing batch API (`wl_dd_execute_cb`) stays as-is. Delta is an addition, not a replacement. Batch is NOT reimplemented on top of sessions.
Rationale: `new_collection_from()` is simpler and faster than `InputSession` for all-at-once execution.
3.3 Set Semantics for Delta Output
Decision: Delta callback `diff` values are +1 or -1 only (never +2). The session internally uses `distinct()`/`consolidate()` to enforce set semantics.
Rationale: wirelog is a Datalog engine; Datalog has set semantics. Bag semantics would be a language-level change.
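As a minimal sketch of the rule in 3.3 (not the DD implementation, which would rely on `distinct()`/`consolidate()`), the set-semantics diff of a tuple can be derived from its derivation count before and after a step. `set_diff` and its parameters are hypothetical names used only for illustration.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical helper, not part of wirelog: map a change in a tuple's
 * derivation count (its multiplicity under bag semantics) to the diff a
 * set-semantics delta callback would report. */
int set_diff(int64_t old_count, int64_t new_count)
{
    assert(old_count >= 0 && new_count >= 0);
    int was_present = old_count > 0;
    int is_present = new_count > 0;
    /* +1: tuple appears, -1: tuple disappears, 0: no visible change */
    return is_present - was_present;
}
```

A tuple that gains a second derivation (count 1 -> 2) yields 0 and so produces no callback, which is why a diff of +2 never reaches the caller.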
3.4 Session Thread Safety
Decision: Sessions are NOT thread-safe. All calls to a given session must come from the same thread.
Rationale: Matches C convention (no hidden locking), keeps implementation simple. If concurrent access is needed later, the caller wraps in a mutex.
3.5 String Interning
Decision: C side pre-interns. `session_insert` accepts `int64_t*` only. The session does not hold a reference to `wl_intern_t`.
Rationale: Keeps the FFI boundary clean (no C-to-Rust-to-C callbacks), matches the existing `wl_dd_load_edb` contract.
3.6 Multi-Worker Sessions
Decision: MVP is single-worker only. `num_workers > 1` returns an error in Phase 1.
Rationale: Timely's multi-worker mode introduces inter-thread communication complexity orthogonal to core delta logic. Single-worker first, optimize later.
3.7 Recursive Aggregation Scope
Decision: Defer to Phase 1D design spike. Min/max are monotone and can be moved inside `iterate()`. Count/sum in recursive contexts may be explicitly unsupported in delta mode.
Rationale: Recursive aggregation is only well-defined for monotone aggregates over lattices. This is a known Datalog limitation.
4. C API Design
4.1 Backend vtable (`wirelog/backend.h`)
4.2 Session Convenience Wrappers (`wirelog/session.h`)
4.3 Internal Session Structure
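A minimal sketch of how a session could pair a backend vtable with opaque backend state. The type names `wl_compute_backend_t` and `wl_session_t` and the `session_*` operation names come from this document; every signature, the `wl_session_create`/`wl_session_insert`/`wl_session_step` wrappers, and the in-memory `wl_stub_backend` are assumptions made for illustration, not the proposed headers.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

/* Assumed vtable layout: one function pointer per session operation. */
typedef struct wl_compute_backend {
    const char *name;
    void *(*session_create)(const void *plan, uint32_t num_workers);
    int (*session_insert)(void *state, const char *relation,
                          const int64_t *row, size_t ncols);
    int (*session_remove)(void *state, const char *relation,
                          const int64_t *row, size_t ncols);
    int (*session_step)(void *state);
    void (*session_destroy)(void *state);
} wl_compute_backend_t;

/* Assumed session layout: the vtable plus backend-owned opaque state. */
typedef struct wl_session {
    const wl_compute_backend_t *backend;
    void *state;
} wl_session_t;

/* Stub backend (hypothetical): only counts buffered and applied updates. */
typedef struct { size_t pending, applied; } stub_state_t;

static void *stub_create(const void *plan, uint32_t num_workers)
{
    (void)plan; (void)num_workers;
    return calloc(1, sizeof(stub_state_t));
}
static int stub_update(void *state, const char *relation,
                       const int64_t *row, size_t ncols)
{
    assert(state != NULL);
    (void)relation;
    if (!row || ncols == 0) return -1;
    ((stub_state_t *)state)->pending++;
    return 0;
}
static int stub_step(void *state)
{
    stub_state_t *st = state;
    st->applied += st->pending; /* updates become visible at the step */
    st->pending = 0;
    return 0;
}
static void stub_destroy(void *state) { free(state); }

const wl_compute_backend_t wl_stub_backend = {
    .name = "stub",
    .session_create = stub_create,
    .session_insert = stub_update,
    .session_remove = stub_update,
    .session_step = stub_step,
    .session_destroy = stub_destroy,
};

int wl_session_create(wl_session_t *s, const wl_compute_backend_t *backend,
                      const void *plan, uint32_t num_workers)
{
    if (!s || !backend || !backend->session_create) return -1;
    if (num_workers != 1) return -1; /* section 3.6: single-worker MVP */
    s->backend = backend;
    s->state = backend->session_create(plan, num_workers);
    return s->state ? 0 : -1;
}
int wl_session_insert(wl_session_t *s, const char *relation,
                      const int64_t *row, size_t ncols)
{
    if (!s || !s->state || !s->backend->session_insert) return -1;
    return s->backend->session_insert(s->state, relation, row, ncols);
}
int wl_session_step(wl_session_t *s)
{
    if (!s || !s->state || !s->backend->session_step) return -1;
    return s->backend->session_step(s->state);
}
void wl_session_destroy(wl_session_t *s)
{
    if (s && s->state) {
        s->backend->session_destroy(s->state);
        s->state = NULL;
    }
}
```

Because the wrappers only touch the vtable and the opaque `state` pointer, the same C-level code could sit in front of the DD backend or a later nanoarrow backend. The NULL check on each function pointer is one possible answer to the vtable extensibility question in section 11, shown here as an option rather than a decision.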
4.4 Usage Example
5. Rust-Side Design (DD Backend)
5.1 Architecture Change
5.2 Key Data Structures
5.3 Critical Changes
- `worker.dataflow::<(), _, _>()` -> `worker.dataflow::<u64, _, _>()` (`dataflow.rs:130`)
- `new_collection_from(rows)` -> `scope.new_input::<Vec<i64>>()` (`dataflow.rs:136`)
- `_` (InputSession) -> `HashMap<String, InputSession>` (`dataflow.rs:136`)
- `worker.step_while(|| !probe.done())` -> `worker.step_while(|| probe.less_than(&epoch))` (`dataflow.rs:165`)
- `timely::execute()` returns immediately (`dataflow.rs:69`)
- `inspect` fires only for `diff > 0` -> `diff > 0` and `diff < 0` (`dataflow.rs:149-158`)
- `worker.dataflow()` scopes (`dataflow.rs:74-92`)
5.4 New FFI Entry Points
6. DD-to-nanoarrow Mapping
- `session_create`: `timely::execute()` on background thread, build dataflow with `InputSession` handles
- `session_insert`: `input_session.insert(row)`
- `session_remove`: `input_session.remove(row)`
- `session_step`: `advance_to(epoch+1)`, `flush()`, `step_while(probe.less_than)`
- `session_snapshot`
- `session_destroy`
6.1 Code Reuse Summary
- `session.h`
- `backend.h`
- `dd_plan.h`, `dd_ffi.h`
- `dataflow.rs`
- `driver.c`: `wl_dd_load_edb` is DD-specific; `session_insert` is backend-agnostic
7. Implementation Phases
Phase 0: Backend Abstraction (C-only, ~2 days)
Goal: Introduce vtable without changing behavior.
- `wl_compute_backend_t` vtable (`wirelog/backend.h`)
- `wl_session_t` convenience wrappers (`wirelog/session.h`, `wirelog/session.c`)
- `wirelog/backend_dd.c`
- `wirelog/meson.build`
Validation: All 410 existing tests pass. Zero behavior change.
Phase 1A: Insert-Only Non-Recursive Delta (~4 days)
Goal: Minimum viable delta. Single non-recursive stratum, insert-only.
- `DdSession` with background thread + mpsc (`rust/wirelog-dd/src/session.rs`)
- `InputSession` (non-recursive only) (`rust/wirelog-dd/src/dataflow.rs`; new function, not modifying existing)
- `()` -> `u64` for session path (`session.rs`)
- `rust/wirelog-dd/src/ffi.rs`
- `wirelog/backend_dd.c`
- `tests/test_session.c`
Validation: Insert facts, step, observe outputs. Insert more, step, observe deltas only.
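To make "observe deltas only" concrete, here is a minimal sketch of an insert-only delta for a join of `a(X,Y)` and `b(Y,Z)` that keeps all three variables in the output, using the facts from section 10.2. It is plain C over small arrays, not differential dataflow; `pair_t`, `triple_t`, `join_into`, and `join_delta` are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef struct { int64_t x, y; } pair_t;      /* a(X,Y) or b(Y,Z) */
typedef struct { int64_t x, y, z; } triple_t; /* r(X,Y,Z) */

/* Append the join of a and b (a.y == b.x) to out, starting at index n.
 * Returns the new number of tuples in out. */
size_t join_into(const pair_t *a, size_t na, const pair_t *b, size_t nb,
                 triple_t *out, size_t n, size_t cap)
{
    for (size_t i = 0; i < na; i++) {
        for (size_t j = 0; j < nb; j++) {
            if (a[i].y == b[j].x) {
                assert(n < cap);
                out[n++] = (triple_t){a[i].x, a[i].y, b[j].y};
            }
        }
    }
    return n;
}

/* Insert-only delta of the join:
 *   delta_r = (da join b_old) + (a_old join db) + (da join db)
 * Assumes da and db contain only facts not already in a_old and b_old.
 * Every output tuple then has exactly one derivation, so no dedup is needed. */
size_t join_delta(const pair_t *a_old, size_t na, const pair_t *b_old, size_t nb,
                  const pair_t *da, size_t nda, const pair_t *db, size_t ndb,
                  triple_t *out, size_t cap)
{
    size_t n = 0;
    n = join_into(da, nda, b_old, nb, out, n, cap);
    n = join_into(a_old, na, db, ndb, out, n, cap);
    n = join_into(da, nda, db, ndb, out, n, cap);
    return n;
}
```

The old result `(1,2,3)` is never recomputed: only the three terms involving new facts are evaluated, which is the behavior the Phase 1A validation step is meant to observe.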
Phase 1B: Multi-Stratum + Deletions (~3 days)
Goal: Full non-recursive delta support.
- `rust/wirelog-dd/src/session.rs`
- `session_remove` with negative diffs (`rust/wirelog-dd/src/session.rs`, `ffi.rs`)
- `session_snapshot` (`rust/wirelog-dd/src/session.rs`, `ffi.rs`)
- `tests/test_session.c`
Validation: Multi-stratum programs, insert/delete/step sequences, snapshot consistency.
Phase 1C: Recursive Strata (No Aggregation) (~4 days)
Goal: Incremental transitive closure, reachability queries.
- `iterate()` scope with `Variable` collection (`rust/wirelog-dd/src/session.rs`)
- `tests/test_session.c`
Validation: TC on chain graph, add edge -> 3 new tuples. Delete edge -> 4 retractions.
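The expected counts can be checked against a batch reference. The sketch below recomputes the transitive closure from scratch with Warshall's algorithm and diffs two snapshots; it is an oracle a test could compare session output against, not an incremental evaluator. It uses the edge sets from sections 10.4 and 10.5, and `rel_t`, `transitive_closure`, and `count_delta` are hypothetical names.

```c
#include <assert.h>

#define TC_NODES 5 /* node ids 0..4; the example uses 1..4 */

typedef struct { unsigned char m[TC_NODES][TC_NODES]; } rel_t;

/* Batch transitive closure (Warshall): tc(i,j) holds if j is reachable
 * from i through one or more edges. */
void transitive_closure(const rel_t *edge, rel_t *tc)
{
    assert(edge && tc);
    *tc = *edge;
    for (int k = 0; k < TC_NODES; k++)
        for (int i = 0; i < TC_NODES; i++)
            for (int j = 0; j < TC_NODES; j++)
                if (tc->m[i][k] && tc->m[k][j])
                    tc->m[i][j] = 1;
}

/* Count tuples that appear (diff=+1) and disappear (diff=-1) between
 * two snapshots of the same relation. */
void count_delta(const rel_t *before, const rel_t *after,
                 int *inserted, int *retracted)
{
    *inserted = 0;
    *retracted = 0;
    for (int i = 0; i < TC_NODES; i++) {
        for (int j = 0; j < TC_NODES; j++) {
            if (!before->m[i][j] && after->m[i][j]) (*inserted)++;
            if (before->m[i][j] && !after->m[i][j]) (*retracted)++;
        }
    }
}
```

Starting from `edge={(1,2),(2,3)}`, adding `(3,4)` gives 3 insertions; removing `(2,3)` from the resulting chain gives 4 retractions, matching the validation line above.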
Phase 1D: Recursive Aggregation (Design Spike, ~5 days)
Goal: Incremental connected components (min-label), SSSP.
- `iterate()`
- `apply_post_reduce` for min/max in session path (`rust/wirelog-dd/src/session.rs`)
- `tests/test_session.c`
Validation: CC with min-label propagation works incrementally. SSSP distance updates on edge changes.
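A minimal sketch of why min-label connected components suits incremental evaluation for insertions: labels only ever decrease, so after adding an edge the fixpoint can resume from the previous labels instead of restarting. This is a plain C illustration of the monotonicity argument in section 3.7, not the DD `iterate()` implementation; `edge_t` and `propagate_min_labels` are hypothetical names. Edge deletions are not covered, since they can force labels to increase.

```c
#include <assert.h>
#include <stddef.h>

typedef struct { size_t u, v; } edge_t; /* undirected edge */

/* Propagate the minimum label across every edge until nothing changes.
 * label[] is read and updated in place, so a caller can pass the labels
 * from an earlier fixpoint together with an extended edge list.
 * Returns the number of label updates performed. */
size_t propagate_min_labels(const edge_t *edges, size_t nedges,
                            int *label, size_t nnodes)
{
    size_t updates = 0;
    int changed;
    do {
        changed = 0;
        for (size_t e = 0; e < nedges; e++) {
            size_t u = edges[e].u, v = edges[e].v;
            assert(u < nnodes && v < nnodes);
            if (label[u] < label[v]) {
                label[v] = label[u];
                changed = 1;
                updates++;
            } else if (label[v] < label[u]) {
                label[u] = label[v];
                changed = 1;
                updates++;
            }
        }
    } while (changed);
    return updates;
}
```

With nodes 0..3 and edges `(0,1)`, `(2,3)`, the first run leaves two components labelled 0 and 2. Adding `(1,2)` and re-running from those labels only rewrites the labels of nodes 2 and 3.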
Phase 2: nanoarrow Backend (Future, Separate Project)
Goal: C11-native semi-naive delta evaluator behind same vtable.
Out of scope for this document.
8. Dependency Graph
Parallelizable:
9. Risk Assessment
- `timely::execute()` takes ownership and runs to completion. Keeping it alive requires careful shutdown coordination.
- `worker.dataflow()`: persistent sessions need all strata in one scope.
- `iterate()` with changing input requires `Variable` collections that re-trigger fixpoint.
- `execute_plan()` stays completely untouched. Session gets its own code path.
- `apply_post_reduce` is batch-only. The current recursive aggregation workaround cannot work incrementally.
10. Acceptance Criteria
10.1 Batch Regression (All Phases)
After delta support is added, all 410 existing tests (325 C + 85 Rust) must pass with zero changes:
- `meson test -C build` reports 0 failures
- `cargo test` reports 0 failures
10.2 Delta Insert (Phase 1A)
Given `r(X,Y,Z) :- a(X,Y), b(Y,Z)` with initial `a={(1,2)}`, `b={(2,3)}`:
- Initial result: `r={(1,2,3)}`
- Insert `a=(1,4)`, `b=(4,5)`, step
- Delta output: `r(1,4,5)` with `diff=+1`
10.3 Delta Delete (Phase 1B)
Same setup, delete `a=(1,2)`, step:
- `r(1,2,3)` with `diff=-1`
10.4 Recursive Insert (Phase 1C)
Given `edge={(1,2),(2,3)}` producing `tc={(1,2),(2,3),(1,3)}`:
- Insert `edge=(3,4)`, step
- `tc += {(3,4),(2,4),(1,4)}` (exactly 3 insertions)
10.5 Recursive Delete (Phase 1C)
Given `edge={(1,2),(2,3),(3,4)}`, delete `edge=(2,3)`, step:
- `tc -= {(2,3),(1,3),(2,4),(1,4)}` (exactly 4 retractions)
10.6 Session Cleanup
After `wl_session_destroy()`, all Rust-side memory is freed. No leaks detected by ASan.
11. Open Questions
These must be resolved before Phase 1A implementation begins:
- Initial execution output: Should the first `session_step()` produce all results as `diff=+1` deltas, or should there be a separate "initial load" mode?
- Worker-session relationship: Can one `wl_dd_worker_t` host multiple sessions? MVP says no (1:1), but the long-term model needs clarification.
- Rule changes: Can the execution plan change between delta updates? Current proposal says no (session is bound to a plan at creation). Rule changes require session recreation.
- Memory budget: What is the maximum acceptable memory overhead for a persistent delta session vs. one-shot batch? Critical for embedded targets (<256MB per ARCHITECTURE.md).
- Epoch exposure: Should the C API expose the logical timestamp / epoch counter, or keep it internal?
- Async stepping: Should `session_step()` be synchronous (block until propagation completes) or support an async model?
- vtable extensibility: How to add new operations to `wl_compute_backend_t` without breaking ABI? Version field? Sentinel NULL check?
12. Edge Cases
- `wl_intern_put()` before passing as i64
13. Conflict Resolution Log
- `wl_exec_plan_t` vs. `wl_dd_*`: `wl_dd_*` -- plan encodes DD-specific operators
- `apply_post_reduce`
14. References
Internal Code References
- `wirelog/ffi/dd_ffi.h` -- Current FFI boundary
- `wirelog/ffi/dd_plan.h` -- Plan structure (DD-specific operators)
- `rust/wirelog-dd/src/dataflow.rs` -- Rust DD execution (one-shot model)
- `rust/wirelog-dd/src/ffi.rs` -- Rust FFI entry points
- `wirelog/cli/driver.c` -- Pipeline driver
- `wirelog/intern.h` -- Symbol interning
Architecture Documents
- `ARCHITECTURE.md` -- ComputeBackend abstraction (section 2.3), nanoarrow plan (Phase 3)
External References
- `differential_dataflow::input::InputSession`