diff --git a/Project.toml b/Project.toml index 9864c453..cddd1654 100644 --- a/Project.toml +++ b/Project.toml @@ -16,7 +16,7 @@ CompilerCaching = "9db33cc3-5358-4881-8759-fa4194144afd" EnumX = "4e289a0a-7415-4d19-859d-a7e5c4648b56" GPUArrays = "0c68f7d7-f131-5f86-a1c3-88cf8149b2d7" IRStructurizer = "93e32bba-5bb8-402b-805d-ffb066edee93" -LMDB_jll = "6206cf0b-f360-5984-af49-5437264c140e" +LMDB = "11f193de-5e89-5f17-923a-7d207d56daf9" PrecompileTools = "aea7be01-6a6a-4083-8856-8a6e6704d82a" REPL = "3fa0cd96-eef1-5676-8a61-b3b8758bbffb" Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" @@ -38,8 +38,11 @@ CompilerCaching = "0.2.4" EnumX = "1.0" GPUArrays = "11" IRStructurizer = "0.6" -LMDB_jll = "0.9" +LMDB = "2" PrecompileTools = "1" REPL = "1" Scratch = "1.2" julia = "1.11" + +[sources] +LMDB = { url="https://github.com/maleadt/LMDB.jl", rev="master" } diff --git a/src/cache.jl b/src/cache.jl index e2c226b0..8b18f836 100644 --- a/src/cache.jl +++ b/src/cache.jl @@ -9,10 +9,12 @@ persists across sessions, so the second run of a kernel skips `tileiras` entirely. Modeled on JuliaLang/julia#61527 (LLVM `objcache`) and cuTile Python's SQLite cache (`cuda.tile._cache`). -The implementation talks to LMDB directly via `LMDB_jll`. The public -surface is intentionally narrow (`open`, `close`, `get`, `put!`, -`compute_key`, `evict_lru!`, plus the lazy `global_cache` accessor) so we -can swap the backend to `LMDB.jl` later without touching call sites. +The implementation talks to LMDB via the `LMDB.jl` tier-2 API +(`Environment` / `start` / `tryget` / `put!` / `walk` / `delete!`). All +the value framing and prefix-skipping happens via custom +`Base.read(io::IO, ::Type{AtimedBlob})` / `::Type{AtimeMeta}` methods — +LMDB.jl exposes each value as an `MDBValueIO <: IO` view, so the typed +`tryget` / `walk` calls dispatch through standard Julia IO reads. # Layout - A single LMDB env at `\$(scratchspace)/disk_cache/`. The directory @@ -27,12 +29,12 @@ can swap the backend to `LMDB.jl` later without touching call sites. The atime drives the LRU eviction policy. # Eviction -LMDB provides no built-in cache replacement; `mdb_put` returns +LMDB provides no built-in cache replacement; `put!` returns `MDB_MAP_FULL` when the map is exhausted. We prune *before* hitting that point (deletes are also copy-on-write in LMDB and need free pages, so draining a fully-saturated map is unreliable). On every `put!`: -1. Compute env utilization via `mdb_env_info` + the cached page size. +1. Compute env utilization via `LMDB.info` + the cached page size. 2. If above [`HIGH_WATER`](@ref) (90%), call [`evict_lru!`](@ref) targeting [`LOW_WATER`](@ref) (75%). 3. Then write the new entry. @@ -48,56 +50,35 @@ cause on hot kernels. """ module DiskCache -using LMDB_jll: liblmdb +import LMDB using Scratch: @get_scratch! # =========================================================================== -# Minimal LMDB binding +# Value framing — `atime_ns ‖ payload` # =========================================================================== -const MDB_RDONLY = Cuint(0x00020000) -const MDB_NOTLS = Cuint(0x00200000) -const MDB_NORDAHEAD = Cuint(0x00800000) -const MDB_NOOVERWRITE = Cuint(0x00000010) - -const MDB_SUCCESS = Cint(0) -const MDB_KEYEXIST = Cint(-30799) -const MDB_NOTFOUND = Cint(-30798) -const MDB_MAP_FULL = Cint(-30792) - -# MDB_cursor_op values (from lmdb.h) -const MDB_FIRST = Cuint(0) -const MDB_NEXT = Cuint(8) - -struct MDB_val - mv_size::Csize_t - mv_data::Ptr{Cvoid} -end +const _ATIME_PREFIX = 8 # bytes for a UInt64 little-endian timestamp -struct MDB_envinfo - me_mapaddr::Ptr{Cvoid} - me_mapsize::Csize_t - me_last_pgno::Csize_t - me_last_txnid::Csize_t - me_maxreaders::Cuint - me_numreaders::Cuint -end +# `tryget(txn, dbi, key, AtimedBlob)` returns the payload (atime stripped), +# or `nothing` for both NOTFOUND and malformed entries — caller treats both +# as cache miss. Single copy out of the mmap via `read(io, Vector{UInt8})`. +struct AtimedBlob end -struct MDB_stat - ms_psize::Cuint - ms_depth::Cuint - ms_branch_pages::Csize_t - ms_leaf_pages::Csize_t - ms_overflow_pages::Csize_t - ms_entries::Csize_t +function Base.read(io::IO, ::Type{AtimedBlob}) + bytesavailable(io) < _ATIME_PREFIX && return nothing + skip(io, _ATIME_PREFIX) + return read(io, Vector{UInt8}) end -errstr(ret::Cint) = - unsafe_string(ccall((:mdb_strerror, liblmdb), Cstring, (Cint,), ret)) +# `walk(cur, Vector{UInt8}, AtimeMeta)` yields per-entry `(atime, full_size)` +# pairs without copying out the payload — used by eviction to sort by atime +# and approximate freed-byte counts. +struct AtimeMeta end -@inline function check(ret::Cint, what) - iszero(ret) && return nothing - error("LMDB $what failed: $(errstr(ret))") +function Base.read(io::IO, ::Type{AtimeMeta}) + sz = bytesavailable(io) + atime = sz >= _ATIME_PREFIX ? ltoh(read(io, UInt64)) : UInt64(0) + return (atime = atime, size = sz) end # =========================================================================== @@ -113,22 +94,20 @@ writers internally; readers use `MDB_NOTLS` so they're decoupled from the OS thread). """ mutable struct Cache - env::Ptr{Cvoid} # MDB_env* - dbi::Cuint # main DB handle + env::LMDB.Environment + dbi::LMDB.DBI psize::Int # LMDB page size in bytes (cached at open) path::String refreshed::Set{Vector{UInt8}} # keys whose atime we already bumped this session state_lock::ReentrantLock # guards `refreshed` + serializes evictions - function Cache(env::Ptr{Cvoid}, dbi::Cuint, psize::Integer, path::AbstractString) - c = new(env, dbi, Int(psize), String(path), - Set{Vector{UInt8}}(), ReentrantLock()) - finalizer(close, c) - return c + function Cache(env, dbi, psize::Integer, path::AbstractString) + new(env, dbi, Int(psize), String(path), + Set{Vector{UInt8}}(), ReentrantLock()) end end -isopen(cache::Cache) = cache.env != C_NULL +isopen(cache::Cache) = LMDB.isopen(cache.env) """ close(cache::Cache) @@ -136,9 +115,9 @@ isopen(cache::Cache) = cache.env != C_NULL Release the underlying LMDB environment. Idempotent. """ function close(cache::Cache) - cache.env == C_NULL && return - ccall((:mdb_env_close, liblmdb), Cvoid, (Ptr{Cvoid},), cache.env) - cache.env = C_NULL + LMDB.isopen(cache.env) || return + LMDB.close(cache.env, cache.dbi) + LMDB.close(cache.env) return end @@ -154,71 +133,19 @@ function open(path::AbstractString; mapsize::Integer = (Csize_t(1) << 30), maxreaders::Integer = 510) mkpath(path) - env_ref = Ref{Ptr{Cvoid}}(C_NULL) - check(ccall((:mdb_env_create, liblmdb), Cint, (Ref{Ptr{Cvoid}},), env_ref), - "mdb_env_create") - env = env_ref[] - - try - check(ccall((:mdb_env_set_maxreaders, liblmdb), Cint, - (Ptr{Cvoid}, Cuint), env, Cuint(maxreaders)), - "mdb_env_set_maxreaders") - check(ccall((:mdb_env_set_mapsize, liblmdb), Cint, - (Ptr{Cvoid}, Csize_t), env, Csize_t(mapsize)), - "mdb_env_set_mapsize") - - # MDB_NOTLS: read txns aren't tied to the OS thread that opened them - # (Julia tasks may migrate). MDB_NORDAHEAD: lookups are random-access, - # so OS read-ahead is wasted I/O. - flags = MDB_NOTLS | MDB_NORDAHEAD - check(ccall((:mdb_env_open, liblmdb), Cint, - (Ptr{Cvoid}, Cstring, Cuint, Cushort), - env, path, flags, Cushort(0o644)), - "mdb_env_open($(repr(path)))") - - dbi, psize = open_main_db_and_stat!(env) - return Cache(env, dbi, psize, path) - catch - ccall((:mdb_env_close, liblmdb), Cvoid, (Ptr{Cvoid},), env) - rethrow() + # MDB_NOTLS: read txns aren't tied to the OS thread that opened them + # (Julia tasks may migrate). MDB_NORDAHEAD: lookups are random-access, + # so OS read-ahead is wasted I/O. + env = LMDB.Environment(path; + mapsize = mapsize, + maxreaders = maxreaders, + flags = LMDB.MDB_NOTLS | LMDB.MDB_NORDAHEAD, + mode = 0o644) + dbi, psize = LMDB.start(env) do txn + d = LMDB.open(txn) + (d, LMDB.stat(txn, d).psize) end -end - -# Get a reusable handle to the env's main (unnamed) DB and read out the page -# size in the same dummy write txn. The dbi handle is only valid in -# subsequent transactions after the opening txn commits, so we always go -# through this dance. -function open_main_db_and_stat!(env::Ptr{Cvoid}) - txn_ref = Ref{Ptr{Cvoid}}(C_NULL) - check(ccall((:mdb_txn_begin, liblmdb), Cint, - (Ptr{Cvoid}, Ptr{Cvoid}, Cuint, Ref{Ptr{Cvoid}}), - env, C_NULL, Cuint(0), txn_ref), - "mdb_txn_begin (init)") - txn = txn_ref[] - - dbi_ref = Ref{Cuint}(0) - ret = ccall((:mdb_dbi_open, liblmdb), Cint, - (Ptr{Cvoid}, Ptr{Cchar}, Cuint, Ref{Cuint}), - txn, Ptr{Cchar}(C_NULL), Cuint(0), dbi_ref) - if !iszero(ret) - ccall((:mdb_txn_abort, liblmdb), Cvoid, (Ptr{Cvoid},), txn) - check(ret, "mdb_dbi_open (main)") - end - dbi = dbi_ref[] - - stat_ref = Ref{MDB_stat}() - ret = ccall((:mdb_stat, liblmdb), Cint, - (Ptr{Cvoid}, Cuint, Ref{MDB_stat}), txn, dbi, stat_ref) - if !iszero(ret) - ccall((:mdb_txn_abort, liblmdb), Cvoid, (Ptr{Cvoid},), txn) - check(ret, "mdb_stat") - end - psize = Int(stat_ref[].ms_psize) - - check(ccall((:mdb_txn_commit, liblmdb), Cint, (Ptr{Cvoid},), txn), - "mdb_txn_commit (init)") - - return dbi, psize + return Cache(env, dbi, psize, path) end """ @@ -227,61 +154,24 @@ end Snapshot of the env's *live* page count and configured map size. Used by the eviction policy to decide whether to prune. -Note that `mdb_env_info`'s `me_last_pgno` is a monotonic high-water +Note that `MDB_envinfo`'s `me_last_pgno` is a monotonic high-water mark — it doesn't drop when entries get deleted. Eviction relies on -`mdb_stat`'s live-page accounting (branch + leaf + overflow), which +`MDB_stat`'s live-page accounting (branch + leaf + overflow), which *does* shrink after a delete commits. Cost is one short read txn — ~10 µs. """ function env_info(cache::Cache) - info_ref = Ref{MDB_envinfo}() - check(ccall((:mdb_env_info, liblmdb), Cint, - (Ptr{Cvoid}, Ref{MDB_envinfo}), - cache.env, info_ref), - "mdb_env_info") - mapsize = Int(info_ref[].me_mapsize) - - txn_ref = Ref{Ptr{Cvoid}}(C_NULL) - check(ccall((:mdb_txn_begin, liblmdb), Cint, - (Ptr{Cvoid}, Ptr{Cvoid}, Cuint, Ref{Ptr{Cvoid}}), - cache.env, C_NULL, MDB_RDONLY, txn_ref), - "mdb_txn_begin (stat)") - txn = txn_ref[] - stat_ref = Ref{MDB_stat}() - try - check(ccall((:mdb_stat, liblmdb), Cint, - (Ptr{Cvoid}, Cuint, Ref{MDB_stat}), - txn, cache.dbi, stat_ref), - "mdb_stat") - finally - ccall((:mdb_txn_abort, liblmdb), Cvoid, (Ptr{Cvoid},), txn) + mapsize = LMDB.info(cache.env).mapsize + s = LMDB.start(cache.env; flags = LMDB.MDB_RDONLY) do txn + LMDB.stat(txn, cache.dbi) end - s = stat_ref[] - live_pages = Int(s.ms_branch_pages) + Int(s.ms_leaf_pages) + Int(s.ms_overflow_pages) + live_pages = s.branch_pages + s.leaf_pages + s.overflow_pages used_bytes = live_pages * cache.psize - return (; mapsize, used_bytes, entries = Int(s.ms_entries)) -end - -@inline utilization(cache::Cache) = let i = env_info(cache); i.used_bytes / i.mapsize end - -# =========================================================================== -# Value framing (atime prefix) -# =========================================================================== - -const _ATIME_PREFIX = 8 # bytes for a UInt64 little-endian timestamp - -@inline function pack_value(atime::UInt64, value::Vector{UInt8}) - out = Vector{UInt8}(undef, _ATIME_PREFIX + length(value)) - GC.@preserve out begin - unsafe_store!(Ptr{UInt64}(pointer(out)), htol(atime)) - end - @inbounds copyto!(out, _ATIME_PREFIX + 1, value, 1, length(value)) - return out + return (; mapsize, used_bytes, entries = s.entries) end -@inline function read_atime(p::Ptr{UInt8}) - return ltoh(unsafe_load(Ptr{UInt64}(p))) -end +@inline utilization(cache::Cache) = + let i = env_info(cache); i.used_bytes / i.mapsize end # =========================================================================== # get / put! @@ -308,42 +198,13 @@ writer is allowed to reuse. On hit, the entry's atime is bumped (at most once per session per key, to avoid write-amplification). This drives the LRU eviction order. """ -function get(cache::Cache, key::Vector{UInt8}) - cache.env != C_NULL || error("DiskCache.get on closed cache") - - txn_ref = Ref{Ptr{Cvoid}}(C_NULL) - check(ccall((:mdb_txn_begin, liblmdb), Cint, - (Ptr{Cvoid}, Ptr{Cvoid}, Cuint, Ref{Ptr{Cvoid}}), - cache.env, C_NULL, MDB_RDONLY, txn_ref), - "mdb_txn_begin (read)") - txn = txn_ref[] - - blob = try - GC.@preserve key begin - key_val = Ref(MDB_val(Csize_t(length(key)), pointer(key))) - data_val = Ref(MDB_val(Csize_t(0), C_NULL)) - - ret = ccall( - (:mdb_get, liblmdb), Cint, - (Ptr{Cvoid}, Cuint, Ref{MDB_val}, Ref{MDB_val}), - txn, cache.dbi, key_val, data_val) - - ret == MDB_NOTFOUND && return nothing - check(ret, "mdb_get") - end +function get(cache::Cache, key::AbstractVector{UInt8}) + LMDB.isopen(cache.env) || error("DiskCache.get on closed cache") - sz = Int(data_val[].mv_size) - # Defensively reject malformed entries (shorter than the atime prefix). - # We never write those, but a corrupted env shouldn't crash the caller. - sz < _ATIME_PREFIX && return nothing - out = Vector{UInt8}(undef, sz - _ATIME_PREFIX) - unsafe_copyto!(pointer(out), - Ptr{UInt8}(data_val[].mv_data) + _ATIME_PREFIX, - sz - _ATIME_PREFIX) - out - finally - ccall((:mdb_txn_abort, liblmdb), Cvoid, (Ptr{Cvoid},), txn) + blob = LMDB.start(cache.env; flags = LMDB.MDB_RDONLY) do txn + LMDB.tryget(txn, cache.dbi, key, AtimedBlob) end + blob === nothing && return nothing # Throttled atime refresh. Done outside the read txn (LMDB doesn't # allow nesting). Errors here are non-fatal — we'd rather return the @@ -352,7 +213,7 @@ function get(cache::Cache, key::Vector{UInt8}) if !(key in cache.refreshed) push!(cache.refreshed, copy(key)) try - put_raw!(cache, key, pack_value(time_ns(), blob)) + put_framed!(cache, key, time_ns(), blob) catch err @debug "atime refresh failed" exception=(err, catch_backtrace()) end @@ -373,8 +234,9 @@ first writer wins. If the env is above [`HIGH_WATER`](@ref), [`evict_lru!`](@ref) is run first to drop down to [`LOW_WATER`](@ref). """ -function put!(cache::Cache, key::Vector{UInt8}, value::Vector{UInt8}) - cache.env != C_NULL || error("DiskCache.put! on closed cache") +function put!(cache::Cache, key::AbstractVector{UInt8}, + value::AbstractVector{UInt8}) + LMDB.isopen(cache.env) || error("DiskCache.put! on closed cache") # Double-checked: the cheap utilization probe gates the lock acquisition # in the common case (well below high water, no contention). @@ -386,8 +248,7 @@ function put!(cache::Cache, key::Vector{UInt8}, value::Vector{UInt8}) end end - framed = pack_value(time_ns(), value) - put_raw!(cache, key, framed) + put_framed!(cache, key, time_ns(), value) # Mark this key as "atime is fresh" so a subsequent get in the same # session doesn't redundantly bump it. @@ -395,31 +256,17 @@ function put!(cache::Cache, key::Vector{UInt8}, value::Vector{UInt8}) return end -# Single mdb_put with already-framed (atime-prefixed) value bytes. -function put_raw!(cache::Cache, key::Vector{UInt8}, framed::Vector{UInt8}) - txn_ref = Ref{Ptr{Cvoid}}(C_NULL) - check(ccall((:mdb_txn_begin, liblmdb), Cint, - (Ptr{Cvoid}, Ptr{Cvoid}, Cuint, Ref{Ptr{Cvoid}}), - cache.env, C_NULL, Cuint(0), txn_ref), - "mdb_txn_begin (write)") - txn = txn_ref[] - - committed = false - try - key_val = Ref(MDB_val(Csize_t(length(key)), pointer(key))) - val_val = Ref(MDB_val(Csize_t(length(framed)), pointer(framed))) - - ret = GC.@preserve key framed ccall( - (:mdb_put, liblmdb), Cint, - (Ptr{Cvoid}, Cuint, Ref{MDB_val}, Ref{MDB_val}, Cuint), - txn, cache.dbi, key_val, val_val, Cuint(0)) # plain overwrite - check(ret, "mdb_put") - - check(ccall((:mdb_txn_commit, liblmdb), Cint, (Ptr{Cvoid},), txn), - "mdb_txn_commit (write)") - committed = true - finally - committed || ccall((:mdb_txn_abort, liblmdb), Cvoid, (Ptr{Cvoid},), txn) +# Write `atime ‖ payload` directly into LMDB-allocated mmap pages, +# avoiding the intermediate `Vector{UInt8}` we used to build before +# `MDB_RESERVE` was reachable from tier-2. +function put_framed!(cache::Cache, key::AbstractVector{UInt8}, atime::UInt64, + payload::AbstractVector{UInt8}) + sz = _ATIME_PREFIX + length(payload) + LMDB.start(cache.env) do txn + LMDB.put_reserved!(txn, cache.dbi, key, sz) do buf + unsafe_store!(Ptr{UInt64}(pointer(buf)), htol(atime)) + copyto!(buf, _ATIME_PREFIX + 1, payload, 1, length(payload)) + end end return end @@ -479,97 +326,31 @@ function evict_lru!(cache::Cache, target_ratio::Real = LOW_WATER) end # Collect (key_copy, atime, raw_value_size) for every entry in the cache. +# Typed walk: the key gets unpacked as `Vector{UInt8}` (default Julia-owned +# copy), the value as the AtimeMeta NamedTuple (atime + size, no payload +# copy). Pre-eviction-format / malformed entries surface with `atime = 0` +# so they evict first. function collect_entries(cache::Cache) entries = Tuple{Vector{UInt8}, UInt64, Int}[] - - txn_ref = Ref{Ptr{Cvoid}}(C_NULL) - check(ccall((:mdb_txn_begin, liblmdb), Cint, - (Ptr{Cvoid}, Ptr{Cvoid}, Cuint, Ref{Ptr{Cvoid}}), - cache.env, C_NULL, MDB_RDONLY, txn_ref), - "mdb_txn_begin (evict-scan)") - txn = txn_ref[] - - cursor_ref = Ref{Ptr{Cvoid}}(C_NULL) - ret = ccall((:mdb_cursor_open, liblmdb), Cint, - (Ptr{Cvoid}, Cuint, Ref{Ptr{Cvoid}}), - txn, cache.dbi, cursor_ref) - if !iszero(ret) - ccall((:mdb_txn_abort, liblmdb), Cvoid, (Ptr{Cvoid},), txn) - check(ret, "mdb_cursor_open") - end - cursor = cursor_ref[] - - try - op = MDB_FIRST - key_val = Ref(MDB_val(Csize_t(0), C_NULL)) - data_val = Ref(MDB_val(Csize_t(0), C_NULL)) - while true - ret = ccall((:mdb_cursor_get, liblmdb), Cint, - (Ptr{Cvoid}, Ref{MDB_val}, Ref{MDB_val}, Cuint), - cursor, key_val, data_val, op) - ret == MDB_NOTFOUND && break - check(ret, "mdb_cursor_get") - - kv, dv = key_val[], data_val[] - - keysz = Int(kv.mv_size) - datasz = Int(dv.mv_size) - - key_copy = Vector{UInt8}(undef, keysz) - unsafe_copyto!(pointer(key_copy), Ptr{UInt8}(kv.mv_data), keysz) - - atime = if datasz >= _ATIME_PREFIX - read_atime(Ptr{UInt8}(dv.mv_data)) - else - # Pre-eviction-format entries (or anything malformed) get - # priority eviction by virtue of atime = 0. - UInt64(0) + LMDB.start(cache.env; flags = LMDB.MDB_RDONLY) do txn + LMDB.open(txn, cache.dbi) do cur + LMDB.walk(cur, Vector{UInt8}, AtimeMeta) do key, meta + push!(entries, (key, meta.atime, meta.size)) end - - push!(entries, (key_copy, atime, datasz)) - op = MDB_NEXT end - finally - ccall((:mdb_cursor_close, liblmdb), Cvoid, (Ptr{Cvoid},), cursor) - ccall((:mdb_txn_abort, liblmdb), Cvoid, (Ptr{Cvoid},), txn) end - return entries end # Delete a batch of keys in a single write txn. function delete_batch!(cache::Cache, keys::Vector{Vector{UInt8}}) - txn_ref = Ref{Ptr{Cvoid}}(C_NULL) - check(ccall((:mdb_txn_begin, liblmdb), Cint, - (Ptr{Cvoid}, Ptr{Cvoid}, Cuint, Ref{Ptr{Cvoid}}), - cache.env, C_NULL, Cuint(0), txn_ref), - "mdb_txn_begin (evict-delete)") - txn = txn_ref[] - - deleted = 0 - committed = false - try + LMDB.start(cache.env) do txn + deleted = 0 for key in keys - key_val = Ref(MDB_val(Csize_t(length(key)), pointer(key))) - ret = GC.@preserve key ccall( - (:mdb_del, liblmdb), Cint, - (Ptr{Cvoid}, Cuint, Ref{MDB_val}, Ptr{MDB_val}), - txn, cache.dbi, key_val, C_NULL) - if ret == MDB_NOTFOUND - # Already gone (race or repeat); skip. - else - check(ret, "mdb_del") - deleted += 1 - end + LMDB.delete!(txn, cache.dbi, key) && (deleted += 1) end - - check(ccall((:mdb_txn_commit, liblmdb), Cint, (Ptr{Cvoid},), txn), - "mdb_txn_commit (evict)") - committed = true - finally - committed || ccall((:mdb_txn_abort, liblmdb), Cvoid, (Ptr{Cvoid},), txn) + deleted end - return deleted end # ===========================================================================