diff --git a/ext/CUDAExt.jl b/ext/CUDAExt.jl index ea77889ff..2f4bff0e5 100644 --- a/ext/CUDAExt.jl +++ b/ext/CUDAExt.jl @@ -370,11 +370,13 @@ CuArray(H::Dagger.HaloArray) = convert(CuArray, H) Base.convert(::Type{C}, H::Dagger.HaloArray) where {C<:CuArray} = Dagger.HaloArray(C(H.center), C.(H.halos), - H.halo_width) + H.halo_width; + own_center=H.own_center) Adapt.adapt_structure(to::CUDA.KernelAdaptor, H::Dagger.HaloArray) = Dagger.HaloArray(adapt(to, H.center), adapt.(Ref(to), H.halos), - H.halo_width) + H.halo_width; + own_center=H.own_center) function Dagger.inner_stencil_proc!(::CuArrayDeviceProc, f, output, read_vars) Dagger.Kernel(_inner_stencil!)(f, output, read_vars; ndrange=size(output)) return diff --git a/ext/IntelExt.jl b/ext/IntelExt.jl index 9bc3c1537..e5af5e64d 100644 --- a/ext/IntelExt.jl +++ b/ext/IntelExt.jl @@ -322,11 +322,13 @@ oneArray(H::Dagger.HaloArray) = convert(oneArray, H) Base.convert(::Type{C}, H::Dagger.HaloArray) where {C<:oneArray} = Dagger.HaloArray(C(H.center), C.(H.halos), - H.halo_width) + H.halo_width; + own_center=H.own_center) Adapt.adapt_structure(to::oneAPI.KernelAdaptor, H::Dagger.HaloArray) = Dagger.HaloArray(adapt(to, H.center), adapt.(Ref(to), H.halos), - H.halo_width) + H.halo_width; + own_center=H.own_center) function Dagger.inner_stencil_proc!(::oneArrayDeviceProc, f, output, read_vars) Dagger.Kernel(_inner_stencil!)(f, output, read_vars; ndrange=size(output)) return diff --git a/ext/MetalExt.jl b/ext/MetalExt.jl index eb1215030..2b72b8523 100644 --- a/ext/MetalExt.jl +++ b/ext/MetalExt.jl @@ -346,11 +346,13 @@ MtlArray(H::Dagger.HaloArray) = convert(MtlArray, H) Base.convert(::Type{C}, H::Dagger.HaloArray) where {C<:MtlArray} = Dagger.HaloArray(C(H.center), C.(H.halos), - H.halo_width) + H.halo_width; + own_center=H.own_center) Adapt.adapt_structure(to::Metal.Adaptor, H::Dagger.HaloArray) = Dagger.HaloArray(adapt(to, H.center), adapt.(Ref(to), H.halos), - H.halo_width) + H.halo_width; + own_center=H.own_center) function Dagger.inner_stencil_proc!(::MtlArrayDeviceProc, f, output, read_vars) Dagger.Kernel(_inner_stencil!)(f, output, read_vars; ndrange=size(output)) return diff --git a/ext/OpenCLExt.jl b/ext/OpenCLExt.jl index 356d25ed0..49deefd4d 100644 --- a/ext/OpenCLExt.jl +++ b/ext/OpenCLExt.jl @@ -320,11 +320,13 @@ CLArray(H::Dagger.HaloArray) = convert(CLArray, H) Base.convert(::Type{C}, H::Dagger.HaloArray) where {C<:CLArray} = Dagger.HaloArray(C(H.center), C.(H.halos), - H.halo_width) + H.halo_width; + own_center=H.own_center) Adapt.adapt_structure(to::OpenCL.KernelAdaptor, H::Dagger.HaloArray) = Dagger.HaloArray(adapt(to, H.center), adapt.(Ref(to), H.halos), - H.halo_width) + H.halo_width; + own_center=H.own_center) function Dagger.inner_stencil_proc!(::CLArrayDeviceProc, f, output, read_vars) Dagger.Kernel(_inner_stencil!)(f, output, read_vars; ndrange=size(output)) return diff --git a/ext/ROCExt.jl b/ext/ROCExt.jl index 3ab6d0731..1f65c743c 100644 --- a/ext/ROCExt.jl +++ b/ext/ROCExt.jl @@ -343,11 +343,13 @@ ROCArray(H::Dagger.HaloArray) = convert(ROCArray, H) Base.convert(::Type{C}, H::Dagger.HaloArray) where {C<:ROCArray} = Dagger.HaloArray(C(H.center), C.(H.halos), - H.halo_width) + H.halo_width; + own_center=H.own_center) Adapt.adapt_structure(to::AMDGPU.Runtime.Adaptor, H::Dagger.HaloArray) = Dagger.HaloArray(adapt(to, H.center), adapt.(Ref(to), H.halos), - H.halo_width) + H.halo_width; + own_center=H.own_center) function Dagger.inner_stencil_proc!(::ROCArrayDeviceProc, f, output, read_vars) Dagger.Kernel(_inner_stencil!)(f, output, read_vars; ndrange=size(output)) return diff --git a/src/Dagger.jl b/src/Dagger.jl index 2e757ebc5..13469536e 100644 --- a/src/Dagger.jl +++ b/src/Dagger.jl @@ -162,7 +162,12 @@ import PrecompileTools: @compile_workload include("precompile.jl") function __init__() - # Initialize system UUID + # Clear any precompile-cached UUID for this process: the precompile workload + # runs system_uuid() and the resulting SYSTEM_UUIDS entry gets baked into + # the compiled image. Without clearing it here, get!() would return that + # stale build-time UUID instead of reading the actual runtime UUID file, + # causing mismatches between process 1 and workers. + delete!(SYSTEM_UUIDS, myid()) system_uuid() @static if !isdefined(Base, :get_extension) diff --git a/src/array/stencil.jl b/src/array/stencil.jl index b42b1b734..858445d77 100644 --- a/src/array/stencil.jl +++ b/src/array/stencil.jl @@ -17,6 +17,14 @@ function validate_neigh_dist(neigh_dist, size) throw(ArgumentError("Neighborhood distance ($neigh_dist) must not be larger than the chunk size ($size)")) end end +# Overload for checking only tuple-length compatibility, called outside @spawn where only +# ndims is known (not actual chunk sizes). The full size check still runs inside @spawn. +function validate_neigh_dist(neigh_dist, N::Int) + validate_neigh_dist(neigh_dist) + if neigh_dist isa Tuple && length(neigh_dist) != N + throw(ArgumentError("Neighborhood distance tuple length ($(length(neigh_dist))) must match array ndims ($N)")) + end +end get_neigh_dist(neigh_dist::Integer, i::Int) = neigh_dist get_neigh_dist(neigh_dist::Tuple, i::Int) = neigh_dist[i] @@ -46,8 +54,19 @@ function load_neighbor_region(arr, region_code::NTuple{N,Int}, neigh_dist) where lastindex(arr, i) end end) - # FIXME: Don't collect - return move(task_processor(), collect(@view arr[start_idx:stop_idx])) + return move(task_processor(), copy(@view arr[start_idx:stop_idx])) +end + +# In-place variant: load region directly into a pre-allocated destination buffer. +function load_neighbor_region_into!(dest, arr, region_code::NTuple{N,Int}, neigh_dist) where N + validate_neigh_dist(neigh_dist, size(arr)) + start_idx = CartesianIndex(ntuple(N) do i + region_code[i] == -1 ? lastindex(arr, i) - get_neigh_dist(neigh_dist, i) + 1 : firstindex(arr, i) + end) + stop_idx = CartesianIndex(ntuple(N) do i + region_code[i] == +1 ? firstindex(arr, i) + get_neigh_dist(neigh_dist, i) - 1 : lastindex(arr, i) + end) + copyto!(dest, @view arr[start_idx:stop_idx]) end is_past_boundary(size, idx) = any(ntuple(i -> idx[i] < 1 || idx[i] > size[i], length(size))) @@ -123,6 +142,9 @@ boundary_transition(::Wrap, idx, size) = load_boundary_region(::Wrap, arr, region_code, neigh_dist, boundary_dims) = load_neighbor_region(arr, region_code, neigh_dist) +load_boundary_region_into!(dest, ::Wrap, arr, region_code, neigh_dist, boundary_dims) = + load_neighbor_region_into!(dest, arr, region_code, neigh_dist) + function boundary_source_index(::Wrap, arr, rc, nd, idx_d, d) if rc == -1 return lastindex(arr, d) - nd + idx_d @@ -153,10 +175,14 @@ function load_boundary_region(pad::Pad, arr, region_code::NTuple{N,Int}, neigh_d region_size = ntuple(N) do i region_code[i] == 0 ? size(arr, i) : get_neigh_dist(neigh_dist, i) end - # FIXME: return Fill(pad.padval, region_size) - return move(task_processor(), fill(pad.padval, region_size)) + result = similar(arr, region_size...) + fill!(result, pad.padval) + return move(task_processor(), result) end +load_boundary_region_into!(dest, pad::Pad, arr, region_code, neigh_dist, boundary_dims) = + fill!(dest, pad.padval) + # Use edge as source index (value will be overridden by apply_boundary_value) boundary_source_index(::Pad, arr, rc, nd, idx_d, d) = rc == -1 ? firstindex(arr, d) : (rc == +1 ? lastindex(arr, d) : idx_d) @@ -221,6 +247,10 @@ function load_boundary_region(::Clamp, arr, region_code::NTuple{N,Int}, neigh_di return move(task_processor(), result) end +function load_boundary_region_into!(dest, ::Clamp, arr, region_code::NTuple{N,Int}, neigh_dist, boundary_dims::NTuple{N,Bool}) where N + Kernel(load_boundary_region_kernel)(Clamp(), dest, arr, region_code, neigh_dist, boundary_dims; ndrange=length(dest)) +end + function boundary_source_index(::Clamp, arr, rc, nd, idx_d, d) if rc == -1 return firstindex(arr, d) @@ -332,6 +362,18 @@ function load_boundary_region(::LinearExtrapolate, arr::AbstractArray{T}, region return move(task_processor(), result) end +function load_boundary_region_into!(dest, ::LinearExtrapolate, arr::AbstractArray{T}, region_code::NTuple{N,Int}, neigh_dist, boundary_dims::NTuple{N,Bool}) where {T<:Real,N} + extrap_dim = 0 + for d in 1:N + if boundary_dims[d] && region_code[d] != 0 + extrap_dim = d + break + end + end + nd = get_neigh_dist(neigh_dist, extrap_dim) + Kernel(load_boundary_region_kernel)(LinearExtrapolate(), dest, arr, region_code, neigh_dist, boundary_dims, Val(extrap_dim), Val(nd); ndrange=length(dest)) +end + # Use edge as source index (value will be computed by apply_boundary_value) boundary_source_index(::LinearExtrapolate, arr, rc, nd, idx_d, d) = rc == -1 ? firstindex(arr, d) : (rc == +1 ? lastindex(arr, d) : idx_d) @@ -420,7 +462,7 @@ function load_boundary_region(::Reflect{Symm}, arr, region_code::NTuple{N,Int}, end end) - region = move(task_processor(), collect(@view arr[start_idx:stop_idx])) + region = move(task_processor(), copy(@view arr[start_idx:stop_idx])) # Reverse only along dimensions that are actually being reflected # (both non-zero in region_code AND past boundary) @@ -434,6 +476,41 @@ function load_boundary_region(::Reflect{Symm}, arr, region_code::NTuple{N,Int}, return region end +function load_boundary_region_into!(dest, ::Reflect{Symm}, arr, region_code::NTuple{N,Int}, neigh_dist, boundary_dims::NTuple{N,Bool}) where {N, Symm} + flipped_code = ntuple(N) do i + (region_code[i] != 0 && boundary_dims[i]) ? -region_code[i] : region_code[i] + end + skip = Symm ? 0 : 1 + start_idx = CartesianIndex(ntuple(N) do i + needs_skip = boundary_dims[i] && region_code[i] != 0 + actual_skip = needs_skip ? skip : 0 + if flipped_code[i] == -1 + lastindex(arr, i) - get_neigh_dist(neigh_dist, i) + 1 - actual_skip + elseif flipped_code[i] == +1 + firstindex(arr, i) + actual_skip + else + firstindex(arr, i) + end + end) + stop_idx = CartesianIndex(ntuple(N) do i + needs_skip = boundary_dims[i] && region_code[i] != 0 + actual_skip = needs_skip ? skip : 0 + if flipped_code[i] == +1 + firstindex(arr, i) + get_neigh_dist(neigh_dist, i) - 1 + actual_skip + elseif flipped_code[i] == -1 + lastindex(arr, i) - actual_skip + else + lastindex(arr, i) + end + end) + copyto!(dest, @view arr[start_idx:stop_idx]) + for i in 1:N + GPUArraysCore.@allowscalar if region_code[i] != 0 && boundary_dims[i] + reverse!(dest, dims=i) + end + end +end + function boundary_source_index(::Reflect{Symm}, arr, rc, nd, idx_d, d) where Symm skip = Symm ? 0 : 1 if rc == -1 @@ -564,10 +641,131 @@ function load_boundary_region(boundary::Tuple, arr, region_code::NTuple{N,Int}, return move(task_processor(), result) end +function load_boundary_region_into!(dest, boundary::Tuple, arr, region_code::NTuple{N,Int}, neigh_dist, boundary_dims::NTuple{N,Bool}) where N + Kernel(load_boundary_region_kernel)(boundary, dest, arr, region_code, neigh_dist, boundary_dims; ndrange=length(dest)) +end + ############################################################################# # Chunk Selection and Halo Building ############################################################################# +function load_neighborhood_halos(chunks, idx, neigh_dist, boundary) + validate_neigh_dist(neigh_dist) + + N = ndims(chunks) + chunk_dist = 1 + nhalos = 3^N - 1 + halos = Vector{Any}(undef, nhalos) + h = 0 + + for i in 0:(3^N - 1) + region_code = ntuple(N) do d + ((i ÷ 3^(d-1)) % 3) - 1 + end + all(==(0), region_code) && continue + h += 1 + + chunk_offset = CartesianIndex(ntuple(N) do d + region_code[d] * chunk_dist + end) + new_idx = idx + chunk_offset + + if is_past_boundary(size(chunks), new_idx) + boundary_dims = ntuple(N) do d + new_idx[d] < 1 || new_idx[d] > size(chunks)[d] + end + if boundary_has_transition(boundary) + new_idx = boundary_transition(boundary, new_idx, size(chunks)) + else + new_idx = idx + end + chunk = chunks[new_idx] + halos[h] = load_boundary_region(boundary, chunk, region_code, neigh_dist, boundary_dims) + else + chunk = chunks[new_idx] + halos[h] = load_neighbor_region(chunk, region_code, neigh_dist) + end + end + + @assert h == nhalos + return Tuple(halos) +end + +function load_neighborhood_halos_from_deps(deps, idx, chunk_size, neigh_dist, boundary) + validate_neigh_dist(neigh_dist) + + N = length(chunk_size) + chunk_dist = 1 + nhalos = 3^N - 1 + halos = Vector{Any}(undef, nhalos) + h = 0 + + for i in 0:(3^N - 1) + region_code = ntuple(N) do d + ((i ÷ 3^(d-1)) % 3) - 1 + end + all(==(0), region_code) && continue + h += 1 + + chunk_offset = CartesianIndex(ntuple(N) do d + region_code[d] * chunk_dist + end) + new_idx = idx + chunk_offset + + chunk = deps[h+1] + if is_past_boundary(chunk_size, new_idx) + boundary_dims = ntuple(N) do d + new_idx[d] < 1 || new_idx[d] > chunk_size[d] + end + halos[h] = load_boundary_region(boundary, chunk, region_code, neigh_dist, boundary_dims) + else + halos[h] = load_neighbor_region(chunk, region_code, neigh_dist) + end + end + + @assert h == nhalos + return Tuple(halos) +end + +function select_neighborhood_chunk_deps(chunks, idx, neigh_dist, boundary) + validate_neigh_dist(neigh_dist) + + N = ndims(chunks) + chunk_dist = 1 + + accesses = Any[chunks[idx]] + + for i in 0:(3^N - 1) + region_code = ntuple(N) do d + ((i ÷ 3^(d-1)) % 3) - 1 + end + all(==(0), region_code) && continue + + chunk_offset = CartesianIndex(ntuple(N) do d + region_code[d] * chunk_dist + end) + new_idx = idx + chunk_offset + + if is_past_boundary(size(chunks), new_idx) + if boundary_has_transition(boundary) + new_idx = boundary_transition(boundary, new_idx, size(chunks)) + else + new_idx = idx + end + end + push!(accesses, chunks[new_idx]) + end + + @assert length(accesses) == 3^N + return accesses +end + +function build_chunk_halo(neigh_dist, boundary, idx, chunk_size, own_center::Bool, read_deps...) + center = read_deps[1] + halos = load_neighborhood_halos_from_deps(read_deps, idx, chunk_size, neigh_dist, boundary) + return build_halo(neigh_dist, boundary, center, halos...; own_center=own_center) +end + function select_neighborhood_chunks(chunks, idx, neigh_dist, boundary) validate_neigh_dist(neigh_dist) @@ -615,11 +813,130 @@ function select_neighborhood_chunks(chunks, idx, neigh_dist, boundary) return accesses end -function build_halo(neigh_dist, boundary, center, all_halos...) +# Returns (region_metadata, neighbor_chunk_dtasks) without spawning intermediate load tasks. +# region_metadata: Vector of (region_code, is_boundary, boundary_dims). +# neighbor_chunk_dtasks: Vector of raw chunk DTasks (resolved to arrays when build_halo_new runs). +function select_neighborhood_info(chunks, idx, neigh_dist, boundary) + validate_neigh_dist(neigh_dist) + N = ndims(chunks) + chunk_dist = 1 + region_metadata = Tuple[] + neighbor_chunks = Any[] + + for i in 0:(3^N - 1) + region_code = ntuple(N) do d + ((i ÷ 3^(d-1)) % 3) - 1 + end + all(==(0), region_code) && continue + + chunk_offset = CartesianIndex(ntuple(N) do d + region_code[d] * chunk_dist + end) + new_idx = idx + chunk_offset + + if is_past_boundary(size(chunks), new_idx) + boundary_dims = ntuple(N) do d + new_idx[d] < 1 || new_idx[d] > size(chunks)[d] + end + if boundary_has_transition(boundary) + new_idx = boundary_transition(boundary, new_idx, size(chunks)) + else + new_idx = idx + end + push!(region_metadata, (region_code, true, boundary_dims)) + else + push!(region_metadata, (region_code, false, ntuple(_ -> false, N))) + end + push!(neighbor_chunks, chunks[new_idx]) + end + + @assert length(region_metadata) == 3^N - 1 + return region_metadata, neighbor_chunks +end + +# Per-thread cache: WeakKeyDict{DArray, Dict{(chunk_idx, halo_width), HaloArray}}. +# WeakKeyDict is used for the outer level so that the cache does not hold a strong reference +# to the source DArray — allowing its GC finalizer to fire when user code drops its last +# reference (see below). Using chunk_idx as part of the inner key ensures that within one +# DArray, every chunk has its own dedicated buffer — so if a single worker thread processes +# multiple same-shaped chunks in the same iteration sequentially, each gets a distinct +# HaloArray and there is no aliasing with a concurrently running inner-stencil task. +# Filling a cached buffer in-place is safe because spawn_datadeps blocks until all inner +# tasks complete before the next iteration's build_halo_consolidated calls run. +const HALO_ARRAY_CACHE = TaskLocalValue{WeakKeyDict{Any,Dict{Any,Any}}}(()->WeakKeyDict{Any,Dict{Any,Any}}()) + +# Called on the main task (outside any Dagger.@spawn) to get or create the per-DArray inner +# cache dict. Keeping all WeakKeyDict operations here — rather than inside spawned tasks — +# avoids every pitfall of passing a DArray to @spawn (Dagger resolving it to a ROCArray on +# GPU, serialization producing a copy with a different objectid under Distributed, etc.). +# A finalizer registered on first encounter frees all cached HaloArrays when the DArray is +# collected; the WeakKeyDict ensures the cache itself does not prevent that collection. +function get_halo_inner_cache(read_darray) + outer_cache = HALO_ARRAY_CACHE[] + if !haskey(outer_cache, read_darray) + inner_cache = Dict{Any,Any}() + outer_cache[read_darray] = inner_cache + finalizer(read_darray) do _ + # GC finalizers cannot yield (no task switches allowed), but unsafe_free! on + # GPU/distributed Chunks calls remotecall_fetch which acquires locks and yields. + # Defer cleanup to a scheduled task so it runs outside the finalizer context. + errormonitor(Threads.@spawn begin + for halo in values(inner_cache) + try + unsafe_free!(halo) + catch e + # fill_halo_inplace! runs inside spawn_datadeps and may be placed + # on a Distributed worker; if that worker has already exited by + # cleanup time, skip the free. Use nameof to work with both + # Distributed and DistributedNext backends. + string(nameof(typeof(e))) == "ProcessExitedException" || rethrow(e) + end + end + end) + end + end + return outer_cache[read_darray] +end + +# Cache-miss path: allocate an empty HaloArray using similar() so the buffers are created +# on the same device as the center chunk (GPU or CPU). No filling — that happens inside +# spawn_datadeps via fill_halo_inplace!. +function alloc_halo(neigh_dist, center) + N = ndims(center) + center_size = size(center) + halo_width = ntuple(i -> get_neigh_dist(neigh_dist, i), N) + codes = all_region_codes(Val(N)) + halos = ntuple(length(codes)) do i + region_size = halo_region_size(center_size, halo_width, codes[i]) + similar(center, region_size...) + end + return HaloArray(similar(center), halos, halo_width; own_center=true) +end + +# Cache-hit path: fill an existing HaloArray in-place and return it. No cache operations — +# the HaloArray was looked up and passed by the main task before spawning. +function fill_halo_inplace!(halo::HaloArray, neigh_dist, boundary, center, region_metadata, neighbor_chunks...) + expected_halos = length(region_metadata) + @assert length(neighbor_chunks) == expected_halos + copyto!(halo.center, center) + for i in 1:expected_halos + region_code, is_boundary, boundary_dims = region_metadata[i] + chunk = neighbor_chunks[i] + if is_boundary + load_boundary_region_into!(halo.halos[i], boundary, chunk, region_code, neigh_dist, boundary_dims) + else + load_neighbor_region_into!(halo.halos[i], chunk, region_code, neigh_dist) + end + end + return halo +end + +function build_halo(neigh_dist, boundary, center, all_halos...; own_center::Bool=false) N = ndims(center) expected_halos = 3^N - 1 @assert length(all_halos) == expected_halos "Halo mismatch: N=$N expected $expected_halos halos, got $(length(all_halos))" - return HaloArray(copy(center), (all_halos...,), ntuple(i->get_neigh_dist(neigh_dist, i), N)) + center_data = own_center ? copy(center) : center + return HaloArray(center_data, (all_halos...,), ntuple(i->get_neigh_dist(neigh_dist, i), N); own_center) end function load_neighborhood(arr::HaloArray{T,N}, idx) where {T,N} @@ -631,7 +948,8 @@ end function inner_stencil!(f, output, read_vars) processor = task_processor() inner_stencil_proc!(processor, f, output, read_vars) - foreach(v -> v isa HaloArray && unsafe_free!(v), values(read_vars)) + # HaloArray lifetime is now managed by the DArray finalizer registered in + # get_halo_inner_cache; do not unsafe_free! here to avoid use-after-free on cache hits. end # Non-KA (for CPUs) @@ -791,8 +1109,11 @@ macro stencil(orig_ex) end end - # 2. Stencil operations (inside spawn_datadeps) - datadeps_body = Expr(:block) + # 2. Stencil operations: one spawn_datadeps region per expression. + # Because spawn_datadeps blocks until all its tasks complete, each expression's + # region fully finishes before the next expression's halo tasks are spawned. + # This means HaloArray allocations can always live outside spawn_datadeps, + # avoiding Datadeps aliasing issues unconditionally. for (;inner_ex, accessed_vars, write_var, write_idx, read_ex, read_vars, neighborhoods, is_allocation, source_var) in inners # Generate a variable for chunk access @gensym chunk_idx @@ -821,25 +1142,45 @@ macro stencil(orig_ex) end inner_fn = Expr(:->, Expr(:tuple, Expr(:parameters, inner_write_var, actual_read_vars...)), new_inner_ex) - # 2a. Pre-spawn all halos for this expression - # This ensures all readers capture the "old" state before any writers start. - @gensym halo_tasks_map - push!(datadeps_body.args, :($halo_tasks_map = Dict{Symbol, Any}())) + # 2a. For each neighborhood read_var: pre-compute region metadata on the main task + # (to avoid passing DArrays into @spawn), and for cache misses spawn alloc_halo + # outside spawn_datadeps so that HaloArray buffers are allocated on the correct + # device (GPU or CPU) before filling. + # + # fill_halo_inplace! is spawned *inside* spawn_datadeps (step 2c) with explicit + # Read deps on the DArray chunks, so Datadeps automatically enforces the ordering + # between fill tasks and inner stencil tasks — including the cross-chunk case where + # fill_task[C] reads chunk[N] as a neighbor while inner_stencil[N] writes chunk[N]. + # This replaces the explicit wait-barrier that was needed when fills ran outside. + cache_sym_map = Dict{Symbol, NamedTuple}() for read_var in read_vars if read_var in keys(neighborhoods) neigh_dist, boundary = neighborhoods[read_var] - @gensym halo_tasks - push!(datadeps_body.args, :($halo_tasks = Array{$DTask}(undef, size($chunks($read_var))))) - push!(datadeps_body.args, quote + @gensym region_info_table fill_tasks region_meta neighbor_cks halo_width_var inner_cache_var cache_key_var + cache_sym_map[read_var] = (; fill_tasks, halo_width_var, inner_cache_var, cache_key_var, region_info_table) + # Validate and compute halo_width on the main task so errors are immediate. + push!(final_ex.args, :($validate_neigh_dist($neigh_dist, ndims($read_var)))) + push!(final_ex.args, :($halo_width_var = ntuple(i -> $get_neigh_dist($neigh_dist, i), ndims($read_var)))) + push!(final_ex.args, :($inner_cache_var = $get_halo_inner_cache($read_var))) + # Pre-compute region metadata for every chunk on the main task. + push!(final_ex.args, :($region_info_table = Array{Any}(undef, size($chunks($read_var))))) + push!(final_ex.args, :($fill_tasks = Array{$DTask}(undef, size($chunks($read_var))))) + push!(final_ex.args, quote for $chunk_idx in $CartesianIndices($chunks($read_var)) - $halo_tasks[$chunk_idx] = Dagger.@spawn name="stencil_build_halo" $build_halo($neigh_dist, $boundary, map($Read, $select_neighborhood_chunks($chunks($read_var), $chunk_idx, $neigh_dist, $boundary))...) + ($region_meta, $neighbor_cks) = $select_neighborhood_info($chunks($read_var), $chunk_idx, $neigh_dist, $boundary) + $region_info_table[$chunk_idx] = (tuple($region_meta...), $neighbor_cks) + $cache_key_var = ($chunk_idx, $halo_width_var) + if !haskey($inner_cache_var, $cache_key_var) + # Cache miss: allocate empty buffers on the correct device. + $inner_cache_var[$cache_key_var] = Dagger.@spawn name="stencil_alloc_halo" $alloc_halo($neigh_dist, $chunks($read_var)[$chunk_idx]) + end end end) - push!(datadeps_body.args, :($halo_tasks_map[$(QuoteNode(read_var))] = $halo_tasks)) end end - # 2b. Generate @spawn call with appropriate vars and deps + # 2b. Build the inner-stencil @spawn expression. Neighborhood deps reference + # fill_tasks[chunk_idx] populated by the fill loop (see 2c below). deps_ex = Any[] if write_var in read_vars push!(deps_ex, Expr(:kw, inner_write_var, :($ReadWrite($chunks($write_var)[$chunk_idx])))) @@ -848,26 +1189,70 @@ macro stencil(orig_ex) end for read_var in actual_read_vars if read_var in keys(neighborhoods) - push!(deps_ex, Expr(:kw, read_var, :($Read($halo_tasks_map[$(QuoteNode(read_var))][$chunk_idx])))) + syms = cache_sym_map[read_var] + # Reference fill_tasks[chunk_idx] — populated by the fill loop before the + # stencil loop runs, so it is always assigned when this expression executes. + push!(deps_ex, Expr(:kw, read_var, :($Read($(syms.fill_tasks)[$chunk_idx])))) else if read_var != write_var push!(deps_ex, Expr(:kw, read_var, :($Read($chunks($read_var)[$chunk_idx])))) end end end - spawn_ex = :(Dagger.@spawn name="stencil_inner_fn" $inner_fn(;$(deps_ex...))) + inner_spawn_ex = :(Dagger.@spawn name="stencil_inner_fn" $inner_fn(;$(deps_ex...))) + + # Build the fill @spawn expression — one per neighborhood read_var. + fill_spawn_exs = Expr[] + for read_var in read_vars + if read_var in keys(neighborhoods) + neigh_dist, boundary = neighborhoods[read_var] + syms = cache_sym_map[read_var] + @gensym _rmt _nck _rn + push!(fill_spawn_exs, quote + ($_rmt, $_nck) = $(syms.region_info_table)[$chunk_idx] + $_rn = map($Read, $_nck) + $(syms.fill_tasks)[$chunk_idx] = Dagger.@spawn name="stencil_fill_halo" $fill_halo_inplace!( + $ReadWrite($(syms.inner_cache_var)[($chunk_idx, $(syms.halo_width_var))]), + $neigh_dist, $boundary, + $Read($chunks($read_var)[$chunk_idx]), + $_rmt, + $_rn... + ) + end) + end + end - # 2c. Generate loop to spawn stencil tasks - push!(datadeps_body.args, quote + # 2c. Each expression gets its own spawn_datadeps region with TWO separate loops: + # first all fills, then all stencils. This ordering is critical when write_var is + # also a neighborhood read_var: submitting all fill tasks before any stencil tasks + # ensures Datadeps sees fill[C]'s Read(chunk[N]) BEFORE stencil[N]'s + # ReadWrite(chunk[N]), so stencil[N] is correctly ordered after fill[C]. + # (An interleaved single loop would register stencil[C]'s write to chunk[C] before + # fill[C+1] reads chunk[C], inverting the dependency.) + fill_loop_body = Expr(:block, fill_spawn_exs...) + push!(final_ex.args, :(Dagger.spawn_datadeps() do for $chunk_idx in $CartesianIndices($chunks($write_var)) - $spawn_ex + $fill_loop_body end - end) - end + for $chunk_idx in $CartesianIndices($chunks($write_var)) + $inner_spawn_ex + end + end)) - push!(final_ex.args, :(Dagger.spawn_datadeps() do - $datadeps_body - end)) + # 2d. After spawn_datadeps completes all fill and stencil tasks, fetch the filled + # HaloArray Chunks and store them in the cache (replacing any alloc DTasks from 2a). + # spawn_datadeps already waited for everything, so fetch is instant. + for read_var in read_vars + if read_var in keys(neighborhoods) + syms = cache_sym_map[read_var] + push!(final_ex.args, quote + for $chunk_idx in $CartesianIndices($chunks($read_var)) + $(syms.inner_cache_var)[($chunk_idx, $(syms.halo_width_var))] = fetch($(syms.fill_tasks)[$chunk_idx]; raw=true) + end + end) + end + end + end # 3. Return last allocated var if applicable if !isempty(inners) && inners[end].is_allocation diff --git a/src/precompile.jl b/src/precompile.jl index 874e70de5..ef34da610 100644 --- a/src/precompile.jl +++ b/src/precompile.jl @@ -52,4 +52,7 @@ @assert isempty(Sch.WORKER_MONITOR_CHANS) @assert isempty(Sch.WORKER_MONITOR_TASKS) ID_COUNTER[] = 1 + # Clear the precompile-time UUID cache so it is not baked into the compiled + # image; __init__ re-populates it from the shared UUID file at load time. + delete!(SYSTEM_UUIDS, myid()) end diff --git a/src/utils/haloarray.jl b/src/utils/haloarray.jl index 0067bf801..32867e973 100644 --- a/src/utils/haloarray.jl +++ b/src/utils/haloarray.jl @@ -9,6 +9,12 @@ struct HaloArray{T,N,A<:AbstractArray{T,N},H<:Tuple} <: AbstractArray{T,N} center::A halos::H # Tuple of 3^N - 1 arrays in canonical order halo_width::NTuple{N,Int} + own_center::Bool +end + +function HaloArray(center, halos::Tuple, halo_width::NTuple{N,Int}; own_center::Bool=false) where N + T = eltype(center) + return HaloArray{T,N,typeof(center),typeof(halos)}(center, halos, halo_width, own_center) end # Number of halo regions for N dimensions @@ -63,7 +69,7 @@ function HaloArray{T,N}(center_size::NTuple{N,Int}, halo_width::NTuple{N,Int}) w Array{T,N}(undef, region_size...) end - return HaloArray{T,N,typeof(center),typeof(halos)}(center, halos, halo_width) + return HaloArray(center, halos, halo_width; own_center=true) end Base.size(tile::HaloArray) = size(tile.center) .+ 2 .* tile.halo_width @@ -83,7 +89,7 @@ function Base.copy(tile::HaloArray{T,N}) where {T,N} center = copy(tile.center) halos = ntuple(i -> copy(tile.halos[i]), length(tile.halos)) halo_width = tile.halo_width - return HaloArray(center, halos, halo_width) + return HaloArray(center, halos, halo_width; own_center=true) end # Compute the region code for a given index @@ -182,7 +188,8 @@ end Adapt.adapt_structure(to, H::Dagger.HaloArray) = HaloArray(Adapt.adapt(to, H.center), Adapt.adapt.(Ref(to), H.halos), - H.halo_width) + H.halo_width; + own_center=H.own_center) function aliasing(A::HaloArray) return CombinedAliasing([aliasing(A.center), map(aliasing, A.halos)...]) @@ -193,16 +200,17 @@ function move_rewrap(cache::AliasedObjectCache, from_proc::Processor, to_proc::P center_chunk = move_rewrap(cache, from_proc, to_proc, from_space, to_space, A.center) halo_chunks = ntuple(i -> move_rewrap(cache, from_proc, to_proc, from_space, to_space, A.halos[i]), length(A.halos)) halo_width = A.halo_width + own_center = A.own_center to_w = root_worker_id(to_proc) - return remotecall_fetch(to_w, from_proc, to_proc, from_space, to_space, center_chunk, halo_chunks, halo_width) do from_proc, to_proc, from_space, to_space, center_chunk, halo_chunks, halo_width + return remotecall_fetch(to_w, from_proc, to_proc, from_space, to_space, center_chunk, halo_chunks, halo_width, own_center) do from_proc, to_proc, from_space, to_space, center_chunk, halo_chunks, halo_width, own_center center_new = unwrap(center_chunk) halos_new = ntuple(i -> unwrap(halo_chunks[i]), length(halo_chunks)) - return tochunk(HaloArray(center_new, halos_new, halo_width), to_proc) + return tochunk(HaloArray(center_new, halos_new, halo_width; own_center=own_center), to_proc) end end function Dagger.unsafe_free!(A::HaloArray) - unsafe_free!(A.center) + A.own_center && unsafe_free!(A.center) foreach(unsafe_free!, A.halos) end diff --git a/src/utils/system_uuid.jl b/src/utils/system_uuid.jl index 89925ef0f..62951dadc 100644 --- a/src/utils/system_uuid.jl +++ b/src/utils/system_uuid.jl @@ -11,13 +11,18 @@ function system_uuid_fallback() flush(temp_io) close(temp_io) try - # Try to make this the UUID file - mv(temp_uuid_file, uuid_file; force=false) + # Use hardlink (link(2)) instead of rename(2): link is atomic and + # fails with EEXIST when the destination already exists, whereas + # rename(2) silently overwrites, creating a TOCTOU race where two + # concurrent callers both pass the ispath check and both return + # different UUIDs. Only one hardlink call can succeed; the rest + # fall through to read the winner's UUID from the file. + Base.Filesystem.hardlink(temp_uuid_file, uuid_file) + rm(temp_uuid_file; force=true) return uuid - catch err - err isa ArgumentError || rethrow(err) - # Failed, clean up temp file, and read existing UUID file - rm(temp_uuid_file) + catch + # Failed (file already exists or other error); clean up and read + rm(temp_uuid_file; force=true) end end return parse(UUID, read(uuid_file, String)) diff --git a/test/array/stencil.jl b/test/array/stencil.jl index 7dbf741ee..4db1984d0 100644 --- a/test/array/stencil.jl +++ b/test/array/stencil.jl @@ -1,6 +1,6 @@ import Dagger: @stencil, Wrap, Pad, Reflect, Clamp, LinearExtrapolate -function test_stencil() +function test_stencil(; gpu::Bool=false) @testset "Simple assignment" begin A = zeros(Blocks(2, 2), Int, 4, 4) @stencil A[idx] = 1 @@ -387,6 +387,8 @@ function test_stencil() # From issue #669 for N in 3:4 + # Fine-grained 4D GPU stencils require too many halo copies for typical GPU memory + gpu && N == 4 && continue @testset "$(N)D array" begin A = ones(Blocks(ntuple(_->1, N)...), Int, ntuple(_->3, N)...) Dagger.allowscalar() do @@ -394,7 +396,7 @@ function test_stencil() end B = zeros(Blocks(ntuple(_->1, N)...), Float32, ntuple(_->3, N)...) - @stencil B[idx] = sum(@neighbors(A[idx], 1, Wrap())) / length(A) + @stencil B[idx] = sum(@neighbors(A[idx], 1, Wrap())) ÷ length(A) @test all(==(Float64(sum(1:length(A)) / length(A))), collect(B)) end end @@ -497,7 +499,7 @@ end kind == :oneAPI && continue @testset "$kind" begin Dagger.with_options(;scope) do - test_stencil() + test_stencil(; gpu=true) end end end