diff --git a/src/BLite.Core/BLiteEngine.cs b/src/BLite.Core/BLiteEngine.cs index a83ef00..04c49a8 100644 --- a/src/BLite.Core/BLiteEngine.cs +++ b/src/BLite.Core/BLiteEngine.cs @@ -2,12 +2,13 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using BLite.Bson; -using BLite.Core.KeyValue; -using BLite.Core.Storage; -using BLite.Core.Transactions; +using System.Threading; +using System.Threading.Tasks; +using BLite.Bson; +using BLite.Core.Collections; +using BLite.Core.KeyValue; +using BLite.Core.Storage; +using BLite.Core.Transactions; namespace BLite.Core; @@ -22,9 +23,10 @@ namespace BLite.Core; /// public sealed class BLiteEngine : IDisposable, ITransactionHolder { - private readonly StorageEngine _storage; - private readonly ConcurrentDictionary _collections = new(StringComparer.OrdinalIgnoreCase); - private readonly BLiteKvStore _kvStore; + private readonly StorageEngine _storage; + private readonly ConcurrentDictionary _collections = new(StringComparer.OrdinalIgnoreCase); + private readonly FreeSpaceIndexProvider _freeSpaceIndexes; + private readonly BLiteKvStore _kvStore; private bool _disposed; /// @@ -68,8 +70,9 @@ public BLiteEngine(string databasePath, PageFileConfig config, BLiteKvOptions? k if (string.IsNullOrWhiteSpace(databasePath)) throw new ArgumentNullException(nameof(databasePath)); - _storage = new StorageEngine(databasePath, config); - _kvStore = new BLiteKvStore(_storage, kvOptions); + _storage = new StorageEngine(databasePath, config); + _freeSpaceIndexes = new FreeSpaceIndexProvider(_storage); + _kvStore = new BLiteKvStore(_storage, kvOptions); } /// @@ -78,8 +81,9 @@ public BLiteEngine(string databasePath, PageFileConfig config, BLiteKvOptions? k /// internal BLiteEngine(StorageEngine storage, BLiteKvOptions? kvOptions = null) { - _storage = storage ?? throw new ArgumentNullException(nameof(storage)); - _kvStore = new BLiteKvStore(_storage, kvOptions); + _storage = storage ?? throw new ArgumentNullException(nameof(storage)); + _freeSpaceIndexes = new FreeSpaceIndexProvider(_storage); + _kvStore = new BLiteKvStore(_storage, kvOptions); } /// @@ -150,8 +154,8 @@ public static BLiteEngine CreateFromStorage(StorageEngine storage, BLiteKvOption /// A new backed by this engine's storage. public BLiteSession OpenSession() { - ThrowIfDisposed(); - return new BLiteSession(_storage); + ThrowIfDisposed(); + return new BLiteSession(_storage, _freeSpaceIndexes); } #endregion @@ -182,7 +186,7 @@ public DynamicCollection GetOrCreateCollection(string name, BsonIdType idType = if (string.IsNullOrWhiteSpace(name)) throw new ArgumentNullException(nameof(name)); - return _collections.GetOrAdd(name, n => new DynamicCollection(_storage, this, n, idType)); + return _collections.GetOrAdd(name, n => new DynamicCollection(_storage, this, n, idType, _freeSpaceIndexes.GetIndex())); } /// @@ -224,7 +228,7 @@ public IReadOnlyList ListCollections() // own metadata from storage, so the idType argument is irrelevant for // existing collections (only matters when creating brand-new ones). foreach (var meta in _storage.GetAllCollectionsMetadata()) - _collections.GetOrAdd(meta.Name, n => new DynamicCollection(_storage, this, n)); + _collections.GetOrAdd(meta.Name, n => new DynamicCollection(_storage, this, n, BsonIdType.ObjectId, _freeSpaceIndexes.GetIndex())); return _collections.Keys.ToList(); } diff --git a/src/BLite.Core/BLiteSession.cs b/src/BLite.Core/BLiteSession.cs index 8297e95..68e7ef2 100644 --- a/src/BLite.Core/BLiteSession.cs +++ b/src/BLite.Core/BLiteSession.cs @@ -4,6 +4,7 @@ using System.Threading; using System.Threading.Tasks; using BLite.Bson; +using BLite.Core.Collections; using BLite.Core.Storage; using BLite.Core.Transactions; @@ -26,14 +27,16 @@ namespace BLite.Core; public sealed class BLiteSession : ITransactionHolder, IDisposable { private readonly StorageEngine _storage; + private readonly FreeSpaceIndexProvider _freeSpaceIndexes; private readonly ConcurrentDictionary> _collections = new(StringComparer.OrdinalIgnoreCase); private ITransaction? _currentTransaction; private bool _disposed; - internal BLiteSession(StorageEngine storage) + internal BLiteSession(StorageEngine storage, FreeSpaceIndexProvider freeSpaceIndexes) { _storage = storage ?? throw new ArgumentNullException(nameof(storage)); + _freeSpaceIndexes = freeSpaceIndexes ?? throw new ArgumentNullException(nameof(freeSpaceIndexes)); } // ───────────────────────────────────────────────────────────────────────── @@ -142,7 +145,7 @@ public DynamicCollection GetOrCreateCollection(string name, BsonIdType idType = return _collections.GetOrAdd(name, n => new Lazy( - () => new DynamicCollection(_storage, this, n, idType), + () => new DynamicCollection(_storage, this, n, idType, _freeSpaceIndexes.GetIndex()), System.Threading.LazyThreadSafetyMode.ExecutionAndPublication)).Value; } diff --git a/src/BLite.Core/Collections/DocumentCollection.cs b/src/BLite.Core/Collections/DocumentCollection.cs index de515fc..d31b5ec 100644 --- a/src/BLite.Core/Collections/DocumentCollection.cs +++ b/src/BLite.Core/Collections/DocumentCollection.cs @@ -120,19 +120,26 @@ public async Task ForcePruneAsync() await transaction.CommitAsync(); } - [RequiresDynamicCode("DocumentCollection uses CollectionIndexManager which compiles index key selectors via Expression.Compile().")] - [RequiresUnreferencedCode("Index creation uses reflection (Expression.PropertyOrField) to access type members. Ensure all entity types and their members are preserved.")] - public DocumentCollection(StorageEngine storage, ITransactionHolder transactionHolder, IDocumentMapper mapper, string? collectionName = null) - { - _storage = storage ?? throw new ArgumentNullException(nameof(storage)); - _transactionHolder = transactionHolder ?? throw new ArgumentNullException(nameof(transactionHolder)); - _mapper = mapper ?? throw new ArgumentNullException(nameof(mapper)); - _collectionName = collectionName ?? _mapper.CollectionName; + [RequiresDynamicCode("DocumentCollection uses CollectionIndexManager which compiles index key selectors via Expression.Compile().")] + [RequiresUnreferencedCode("Index creation uses reflection (Expression.PropertyOrField) to access type members. Ensure all entity types and their members are preserved.")] + public DocumentCollection(StorageEngine storage, ITransactionHolder transactionHolder, IDocumentMapper mapper, string? collectionName = null) + : this(storage, transactionHolder, mapper, collectionName, null) + { + } + + [RequiresDynamicCode("DocumentCollection uses CollectionIndexManager which compiles index key selectors via Expression.Compile().")] + [RequiresUnreferencedCode("Index creation uses reflection (Expression.PropertyOrField) to access type members. Ensure all entity types and their members are preserved.")] + internal DocumentCollection(StorageEngine storage, ITransactionHolder transactionHolder, IDocumentMapper mapper, string? collectionName, FreeSpaceIndex? freeSpaceIndex) + { + _storage = storage ?? throw new ArgumentNullException(nameof(storage)); + _transactionHolder = transactionHolder ?? throw new ArgumentNullException(nameof(transactionHolder)); + _mapper = mapper ?? throw new ArgumentNullException(nameof(mapper)); + _collectionName = collectionName ?? _mapper.CollectionName; - // Initialize secondary index manager first (loads metadata including Primary Root Page ID) - _indexManager = new CollectionIndexManager(_storage, _mapper, _collectionName); - _fsi = new FreeSpaceIndex(_storage.PageSize); - _isPageLocked = _storage.IsPageLocked; + // Initialize secondary index manager first (loads metadata including Primary Root Page ID) + _indexManager = new CollectionIndexManager(_storage, _mapper, _collectionName); + _fsi = freeSpaceIndex ?? new FreeSpaceIndex(_storage.PageSize); + _isPageLocked = _storage.IsPageLocked; // Calculate max document size dynamically based on page size // Reserve space for PageHeader (24) and some safety margin @@ -165,9 +172,10 @@ public DocumentCollection(StorageEngine storage, ITransactionHolder transactionH _storage.RegisterKeys(_mapper.UsedKeys); // Rebuild the free-space index from existing page headers so that a cold-start - // DocumentCollection can reuse partially-filled pages instead of always allocating new ones. - RebuildFreeSpaceIndex(); - } + // DocumentCollection can reuse partially-filled pages instead of always allocating new ones. + RebuildFreeSpaceIndex(); + } + /// /// Scans the header of every data page belonging to this collection and @@ -1005,12 +1013,12 @@ private Task FindPageWithSpace(int requiredBytes, ITransaction transaction if (freeBytes >= requiredBytes && !_storage.IsPageLocked(_currentDataPage, txnId)) return Task.FromResult(_currentDataPage); } - else - { - // Page not yet in FSI (e.g., after a rollback recovery): read header from disk. - Span page = stackalloc byte[SlottedPageHeader.Size]; - _storage.ReadPage(_currentDataPage, null, page); - var header = SlottedPageHeader.ReadFrom(page); + else + { + // Page not yet in FSI (e.g., after a rollback recovery): read header from disk. + Span page = stackalloc byte[SlottedPageHeader.Size]; + _storage.ReadPageHeader(_currentDataPage, null, page); + var header = SlottedPageHeader.ReadFrom(page); if (header.AvailableFreeSpace >= requiredBytes) { @@ -1061,9 +1069,9 @@ private Task AllocateNewDataPage(ITransaction transaction) _storage.WritePage(pageId, transaction.TransactionId, buffer); } - // Track free space - _fsi.Update(pageId, header.AvailableFreeSpace); - _currentDataPage = pageId; + SnapshotFsiForTransaction(transaction, pageId); + _fsi.Update(pageId, header.AvailableFreeSpace); + _currentDataPage = pageId; } finally { @@ -1158,8 +1166,8 @@ private async Task InsertIntoPage(uint pageId, byte[] data, ITransaction _storage.WritePage(pageId, transaction.TransactionId, buffer); } - // UpdateAsync free space index - _fsi.Update(pageId, header.AvailableFreeSpace); + SnapshotFsiForTransaction(transaction, pageId); + _fsi.Update(pageId, header.AvailableFreeSpace); return slotIndex; } @@ -1341,8 +1349,8 @@ private uint AllocateOverflowPage(ReadOnlySpan data, uint nextOverflowPage ); ((Transaction)transaction).AddWrite(writeOp); - // UpdateAsync free space index - _fsi.Update(primaryPageId, header.AvailableFreeSpace); + SnapshotFsiForTransaction(transaction, primaryPageId); + _fsi.Update(primaryPageId, header.AvailableFreeSpace); return (primaryPageId, slotIndex); } @@ -2265,17 +2273,8 @@ private async Task DeleteCore(TId id, ITransaction transaction, bool notif // Update the free space index with post-compaction free bytes. // Snapshot the pre-modification value first so that a rollback can restore it. var compactedHeader = SlottedPageHeader.ReadFrom(buffer.AsSpan(0, SlottedPageHeader.Size)); - if (_fsi.SnapshotForTransaction(transaction.TransactionId, location.PageId)) - { - // First page tracked for this transaction — subscribe once to restore/cleanup. - var txnId = transaction.TransactionId; - transaction.OnRollback += () => _fsi.RollbackTransaction(txnId); - // OnCommit is only on the concrete Transaction type (not on ITransaction) - // to avoid a breaking change to the public interface. - if (transaction is Transactions.Transaction concreteTxn) - concreteTxn.OnCommit += () => _fsi.CommitTransaction(txnId); - } - _fsi.Update(location.PageId, compactedHeader.AvailableFreeSpace); + SnapshotFsiForTransaction(transaction, location.PageId); + _fsi.Update(location.PageId, compactedHeader.AvailableFreeSpace); // Remove from primary index _primaryIndex.Delete(key, location, transaction.TransactionId); @@ -2717,6 +2716,18 @@ Func GetCompiled() => } } + private void SnapshotFsiForTransaction(ITransaction transaction, uint pageId) + { + if (_fsi.SnapshotForTransaction(transaction.TransactionId, pageId)) + { + var txnId = transaction.TransactionId; + transaction.OnRollback += () => _fsi.RollbackTransaction(txnId); + + if (transaction is Transaction concreteTxn) + concreteTxn.OnCommit += () => _fsi.CommitTransaction(txnId); + } + } + private static readonly IEqualityComparer s_inProbeKeyComparer = new InProbeKeyComparer(); private sealed class InProbeKeyComparer : IEqualityComparer diff --git a/src/BLite.Core/Collections/FreeSpaceIndex.cs b/src/BLite.Core/Collections/FreeSpaceIndex.cs index f050495..8641379 100644 --- a/src/BLite.Core/Collections/FreeSpaceIndex.cs +++ b/src/BLite.Core/Collections/FreeSpaceIndex.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Runtime.CompilerServices; +using System.Threading; namespace BLite.Core.Collections; @@ -15,6 +16,18 @@ internal sealed class FreeSpaceIndex { private const int BucketCount = 16; + private readonly struct PageSnapshot + { + public PageSnapshot(bool exists, ushort freeBytes) + { + Exists = exists; + FreeBytes = freeBytes; + } + + public bool Exists { get; } + public ushort FreeBytes { get; } + } + // One growable uint[] per bucket; each element is a page ID. private readonly uint[][] _buckets; @@ -29,13 +42,14 @@ internal sealed class FreeSpaceIndex // Per-transaction snapshots of pre-modification FSI values. // Keyed by transaction ID; inner dictionary maps page ID → free bytes before first modification. // Used to proactively restore the FSI when a transaction is rolled back. - private readonly Dictionary> _txnSnapshots = new(); + private readonly Dictionary> _txnSnapshots = new(); // Width of each bucket in bytes, computed from the *usable* page space so that // bucket boundaries reflect real available room (not header overhead). private readonly int _bucketWidth; + private readonly SemaphoreSlim? _gate; - public FreeSpaceIndex(int pageSize) + public FreeSpaceIndex(int pageSize, bool serializeAccess = false) { // Subtract the fixed page-header size so that bucket boundaries are aligned to // actual free bytes a caller would observe in SlottedPageHeader.AvailableFreeSpace. @@ -46,10 +60,20 @@ public FreeSpaceIndex(int pageSize) _buckets = new uint[BucketCount][]; _counts = new int[BucketCount]; _freeMap = new Dictionary(); + _gate = serializeAccess ? new SemaphoreSlim(1, 1) : null; for (int i = 0; i < BucketCount; i++) _buckets[i] = new uint[4]; // initial capacity per bucket } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void EnterGate() => _gate?.Wait(); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void ExitGate() + { + if (_gate != null) _gate.Release(); + } + /// Returns the bucket index for a given free-byte count. [MethodImpl(MethodImplOptions.AggressiveInlining)] private int GetBucket(int freeBytes) => @@ -63,26 +87,15 @@ private int GetBucket(int freeBytes) => /// public void Update(uint pageId, int freeBytes) { - int newBucket = GetBucket(freeBytes); - - if (_freeMap.TryGetValue(pageId, out var oldFreeBytes)) + EnterGate(); + try { - int oldBucket = GetBucket(oldFreeBytes); - - if (oldBucket == newBucket) - { - // Same bucket: just refresh the stored free bytes, no structural change. - _freeMap[pageId] = (ushort)freeBytes; - return; - } - - // Different bucket: remove the page from the old bucket (swap-with-last). - RemoveFromBucket(pageId, oldBucket); + UpdateCore(pageId, freeBytes); + } + finally + { + ExitGate(); } - - // Add the page to the new bucket. - AddToBucket(pageId, newBucket); - _freeMap[pageId] = (ushort)freeBytes; } /// @@ -90,8 +103,18 @@ public void Update(uint pageId, int freeBytes) /// Returns false if the page is not tracked in this index. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public bool TryGetFreeBytes(uint pageId, out ushort freeBytes) => - _freeMap.TryGetValue(pageId, out freeBytes); + public bool TryGetFreeBytes(uint pageId, out ushort freeBytes) + { + EnterGate(); + try + { + return _freeMap.TryGetValue(pageId, out freeBytes); + } + finally + { + ExitGate(); + } + } /// /// Finds a page that has at least of free space @@ -108,41 +131,15 @@ public bool TryGetFreeBytes(uint pageId, out ushort freeBytes) => /// public uint FindPage(int requiredBytes, Func? isPageLocked = null) { - int minBucket = GetBucket(requiredBytes); - - // Buckets above minBucket are guaranteed to hold enough free space. - // Bucket b is zero-based and holds pages whose free bytes fall in the range - // [b * _bucketWidth, (b+1) * _bucketWidth). For any b > minBucket every - // page in that bucket already has at least (minBucket + 1) * _bucketWidth - // free bytes, which is > requiredBytes by construction, so no additional - // _freeMap lookup is needed. - for (int b = BucketCount - 1; b > minBucket; b--) + EnterGate(); + try { - var arr = _buckets[b]; - var count = _counts[b]; - for (int i = count - 1; i >= 0; i--) - { - if (isPageLocked == null || !isPageLocked(arr[i])) - return arr[i]; - } + return FindPageCore(requiredBytes, isPageLocked); } - - // Boundary bucket: entries may have less free space than required (floor division), - // so we verify actual stored free bytes before returning a candidate. + finally { - var arr = _buckets[minBucket]; - var count = _counts[minBucket]; - for (int i = count - 1; i >= 0; i--) - { - uint pid = arr[i]; - if (_freeMap.TryGetValue(pid, out var fb) && - fb >= requiredBytes && - (isPageLocked == null || !isPageLocked(pid))) - return pid; - } + ExitGate(); } - - return 0; } /// @@ -162,33 +159,15 @@ public uint FindPage(int requiredBytes, Func? isPageLocked = null) /// public uint FindPage(int requiredBytes, ulong txnId, Func? isPageLocked) { - int minBucket = GetBucket(requiredBytes); - - for (int b = BucketCount - 1; b > minBucket; b--) + EnterGate(); + try { - var arr = _buckets[b]; - var count = _counts[b]; - for (int i = count - 1; i >= 0; i--) - { - if (isPageLocked == null || !isPageLocked(arr[i], txnId)) - return arr[i]; - } + return FindPageCore(requiredBytes, txnId, isPageLocked); } - + finally { - var arr = _buckets[minBucket]; - var count = _counts[minBucket]; - for (int i = count - 1; i >= 0; i--) - { - uint pid = arr[i]; - if (_freeMap.TryGetValue(pid, out var fb) && - fb >= requiredBytes && - (isPageLocked == null || !isPageLocked(pid, txnId))) - return pid; - } + ExitGate(); } - - return 0; } // ----------------------------------------------------------------------- @@ -203,23 +182,35 @@ public uint FindPage(int requiredBytes, ulong txnId, Func? is /// public bool SnapshotForTransaction(ulong txnId, uint pageId) { - if (!_txnSnapshots.TryGetValue(txnId, out var snap)) + EnterGate(); + try { - snap = new Dictionary(); - _txnSnapshots[txnId] = snap; + if (!_txnSnapshots.TryGetValue(txnId, out var snap)) + { + snap = new Dictionary(); + _txnSnapshots[txnId] = snap; + + // Only keep the value that existed *before* the first modification in this transaction. + snap[pageId] = _freeMap.TryGetValue(pageId, out var fb) + ? new PageSnapshot(true, fb) + : default; + return true; // first page for this transaction + } // Only keep the value that existed *before* the first modification in this transaction. - snap[pageId] = _freeMap.TryGetValue(pageId, out var fb) ? fb : (ushort)0; - return true; // first page for this transaction - } + if (!snap.ContainsKey(pageId)) + { + snap[pageId] = _freeMap.TryGetValue(pageId, out var fb) + ? new PageSnapshot(true, fb) + : default; + } - // Only keep the value that existed *before* the first modification in this transaction. - if (!snap.ContainsKey(pageId)) + return false; + } + finally { - snap[pageId] = _freeMap.TryGetValue(pageId, out var fb) ? fb : (ushort)0; + ExitGate(); } - - return false; } /// @@ -230,12 +221,25 @@ public bool SnapshotForTransaction(ulong txnId, uint pageId) /// public void RollbackTransaction(ulong txnId) { - if (_txnSnapshots.TryGetValue(txnId, out var snap)) + EnterGate(); + try { - foreach (var (pageId, oldFree) in snap) - Update(pageId, oldFree); - - _txnSnapshots.Remove(txnId); + if (_txnSnapshots.TryGetValue(txnId, out var snap)) + { + foreach (var (pageId, snapshot) in snap) + { + if (snapshot.Exists) + UpdateCore(pageId, snapshot.FreeBytes); + else + RemoveCore(pageId); + } + + _txnSnapshots.Remove(txnId); + } + } + finally + { + ExitGate(); } } @@ -243,12 +247,126 @@ public void RollbackTransaction(ulong txnId) /// Discards the snapshot recorded for after a successful /// commit (the current FSI values are already consistent with the committed state). /// - public void CommitTransaction(ulong txnId) => _txnSnapshots.Remove(txnId); + public void CommitTransaction(ulong txnId) + { + EnterGate(); + try + { + _txnSnapshots.Remove(txnId); + } + finally + { + ExitGate(); + } + } // ----------------------------------------------------------------------- // Private helpers // ----------------------------------------------------------------------- + private void UpdateCore(uint pageId, int freeBytes) + { + int newBucket = GetBucket(freeBytes); + + if (_freeMap.TryGetValue(pageId, out var oldFreeBytes)) + { + int oldBucket = GetBucket(oldFreeBytes); + + if (oldBucket == newBucket) + { + // Same bucket: just refresh the stored free bytes, no structural change. + _freeMap[pageId] = (ushort)freeBytes; + return; + } + + // Different bucket: remove the page from the old bucket (swap-with-last). + RemoveFromBucket(pageId, oldBucket); + } + + // Add the page to the new bucket. + AddToBucket(pageId, newBucket); + _freeMap[pageId] = (ushort)freeBytes; + } + + private void RemoveCore(uint pageId) + { + if (!_freeMap.TryGetValue(pageId, out var freeBytes)) + return; + + RemoveFromBucket(pageId, GetBucket(freeBytes)); + _freeMap.Remove(pageId); + } + + private uint FindPageCore(int requiredBytes, Func? isPageLocked = null) + { + int minBucket = GetBucket(requiredBytes); + + // Buckets above minBucket are guaranteed to hold enough free space. + // Bucket b is zero-based and holds pages whose free bytes fall in the range + // [b * _bucketWidth, (b+1) * _bucketWidth). For any b > minBucket every + // page in that bucket already has at least (minBucket + 1) * _bucketWidth + // free bytes, which is > requiredBytes by construction, so no additional + // _freeMap lookup is needed. + for (int b = BucketCount - 1; b > minBucket; b--) + { + var arr = _buckets[b]; + var count = _counts[b]; + for (int i = count - 1; i >= 0; i--) + { + if (isPageLocked == null || !isPageLocked(arr[i])) + return arr[i]; + } + } + + // Boundary bucket: entries may have less free space than required (floor division), + // so we verify actual stored free bytes before returning a candidate. + { + var arr = _buckets[minBucket]; + var count = _counts[minBucket]; + for (int i = count - 1; i >= 0; i--) + { + uint pid = arr[i]; + if (_freeMap.TryGetValue(pid, out var fb) && + fb >= requiredBytes && + (isPageLocked == null || !isPageLocked(pid))) + return pid; + } + } + + return 0; + } + + private uint FindPageCore(int requiredBytes, ulong txnId, Func? isPageLocked) + { + int minBucket = GetBucket(requiredBytes); + + for (int b = BucketCount - 1; b > minBucket; b--) + { + var arr = _buckets[b]; + var count = _counts[b]; + for (int i = count - 1; i >= 0; i--) + { + if (isPageLocked == null || !isPageLocked(arr[i], txnId)) + return arr[i]; + } + } + + { + var arr = _buckets[minBucket]; + var count = _counts[minBucket]; + for (int i = count - 1; i >= 0; i--) + { + uint pid = arr[i]; + if (_freeMap.TryGetValue(pid, out var fb) && + fb >= requiredBytes && + (isPageLocked == null || !isPageLocked(pid, txnId))) + return pid; + } + } + + return 0; + } + private void RemoveFromBucket(uint pageId, int bucket) { var arr = _buckets[bucket]; @@ -275,4 +393,3 @@ private void AddToBucket(uint pageId, int bucket) _counts[bucket]++; } } - diff --git a/src/BLite.Core/Collections/FreeSpaceIndexProvider.cs b/src/BLite.Core/Collections/FreeSpaceIndexProvider.cs new file mode 100644 index 0000000..648e5a6 --- /dev/null +++ b/src/BLite.Core/Collections/FreeSpaceIndexProvider.cs @@ -0,0 +1,30 @@ +using System; +using System.Threading; +using BLite.Core.Storage; + +namespace BLite.Core.Collections; + +internal sealed class FreeSpaceIndexProvider +{ + private readonly int _pageSize; + private readonly bool _shareAcrossCollections; + private readonly Lazy? _sharedIndex; + + public FreeSpaceIndexProvider(StorageEngine storage) + { + if (storage == null) throw new ArgumentNullException(nameof(storage)); + + _pageSize = storage.PageSize; + _shareAcrossCollections = !storage.UsesSeparateCollectionFiles; + if (_shareAcrossCollections) + { + _sharedIndex = new Lazy( + () => new FreeSpaceIndex(_pageSize, serializeAccess: true), + LazyThreadSafetyMode.ExecutionAndPublication); + } + } + + public FreeSpaceIndex GetIndex() => _shareAcrossCollections + ? _sharedIndex!.Value + : new FreeSpaceIndex(_pageSize); +} diff --git a/src/BLite.Core/DocumentDbContext.cs b/src/BLite.Core/DocumentDbContext.cs index d90b27b..28eaf35 100644 --- a/src/BLite.Core/DocumentDbContext.cs +++ b/src/BLite.Core/DocumentDbContext.cs @@ -19,9 +19,10 @@ namespace BLite.Core; /// public abstract partial class DocumentDbContext : IDocumentDbContext { - protected readonly StorageEngine _storage; - internal readonly CDC.ChangeStreamDispatcher _cdc; - private readonly BLiteKvStore _kvStore; + protected readonly StorageEngine _storage; + internal readonly CDC.ChangeStreamDispatcher _cdc; + private readonly FreeSpaceIndexProvider _freeSpaceIndexes; + private readonly BLiteKvStore _kvStore; protected bool _disposed; /// @@ -31,9 +32,10 @@ public abstract partial class DocumentDbContext : IDocumentDbContext /// protected DocumentDbContext() { - _storage = null!; - _cdc = null!; - _kvStore = null!; + _storage = null!; + _cdc = null!; + _freeSpaceIndexes = null!; + _kvStore = null!; _model = new System.Collections.Generic.Dictionary(); InitializeCollections(); } @@ -71,10 +73,11 @@ protected DocumentDbContext(string databasePath, PageFileConfig config, BLiteKvO if (string.IsNullOrWhiteSpace(databasePath)) throw new ArgumentNullException(nameof(databasePath)); - _storage = new StorageEngine(databasePath, config); - _cdc = new CDC.ChangeStreamDispatcher(); - _storage.RegisterCdc(_cdc); - _kvStore = new BLiteKvStore(_storage, kvOptions); + _storage = new StorageEngine(databasePath, config); + _cdc = new CDC.ChangeStreamDispatcher(); + _storage.RegisterCdc(_cdc); + _freeSpaceIndexes = new FreeSpaceIndexProvider(_storage); + _kvStore = new BLiteKvStore(_storage, kvOptions); // Initialize model before collections var modelBuilder = new ModelBuilder(); @@ -100,10 +103,11 @@ protected DocumentDbContext(string databasePath, PageFileConfig config, BLiteKvO /// protected DocumentDbContext(StorageEngine storage, BLiteKvOptions? kvOptions = null) { - _storage = storage ?? throw new ArgumentNullException(nameof(storage)); - _cdc = new CDC.ChangeStreamDispatcher(); - _storage.RegisterCdc(_cdc); - _kvStore = new BLiteKvStore(_storage, kvOptions); + _storage = storage ?? throw new ArgumentNullException(nameof(storage)); + _cdc = new CDC.ChangeStreamDispatcher(); + _storage.RegisterCdc(_cdc); + _freeSpaceIndexes = new FreeSpaceIndexProvider(_storage); + _kvStore = new BLiteKvStore(_storage, kvOptions); var modelBuilder = new ModelBuilder(); OnModelCreating(modelBuilder); @@ -131,7 +135,7 @@ public IBLiteKvStore KvStore public BLiteSession OpenSession() { if (_disposed) throw new ObjectDisposedException(GetType().Name); - return new BLiteSession(_storage); + return new BLiteSession(_storage, _freeSpaceIndexes); } /// @@ -156,7 +160,7 @@ public IDocumentCollection CreateSessionCollection(IDocumentMapp customName = builder?.CollectionName; } - var collection = new DocumentCollection(_storage, holder, mapper, customName); + var collection = new DocumentCollection(_storage, holder, mapper, customName, _freeSpaceIndexes.GetIndex()); if (builder != null) { @@ -226,7 +230,7 @@ protected virtual IDocumentCollection CreateCollection(IDocument } _registeredMappers.Add(mapper); - var collection = new DocumentCollection(_storage, this, mapper, customName); + var collection = new DocumentCollection(_storage, this, mapper, customName, _freeSpaceIndexes.GetIndex()); // Apply configurations from ModelBuilder if (builder != null) diff --git a/src/BLite.Core/DynamicCollection.cs b/src/BLite.Core/DynamicCollection.cs index c6285be..ad0801f 100644 --- a/src/BLite.Core/DynamicCollection.cs +++ b/src/BLite.Core/DynamicCollection.cs @@ -84,14 +84,19 @@ public DynamicSecondaryIndex(RTreeIndex spatial, string fieldPath, IndexOptions /// Transaction holder for ACID operations /// Name of the collection /// The BSON type used for the _id field (default: ObjectId) - public DynamicCollection(StorageEngine storage, ITransactionHolder transactionHolder, string collectionName, BsonIdType idType = BsonIdType.ObjectId) - { + public DynamicCollection(StorageEngine storage, ITransactionHolder transactionHolder, string collectionName, BsonIdType idType = BsonIdType.ObjectId) + : this(storage, transactionHolder, collectionName, idType, null) + { + } + + internal DynamicCollection(StorageEngine storage, ITransactionHolder transactionHolder, string collectionName, BsonIdType idType, FreeSpaceIndex? freeSpaceIndex) + { _storage = storage ?? throw new ArgumentNullException(nameof(storage)); _transactionHolder = transactionHolder ?? throw new ArgumentNullException(nameof(transactionHolder)); _collectionName = collectionName ?? throw new ArgumentNullException(nameof(collectionName)); _idType = idType; _maxDocumentSizeForSinglePage = _storage.PageSize - 128; - _fsi = new FreeSpaceIndex(_storage.PageSize); + _fsi = freeSpaceIndex ?? new FreeSpaceIndex(_storage.PageSize); _isPageLocked = _storage.IsPageLocked; // Load or create collection metadata @@ -1286,8 +1291,9 @@ private uint AllocateNewDataPage(ITransaction transaction) }; header.WriteTo(buffer); _storage.WritePage(pageId, transaction.TransactionId, buffer.AsSpan(0, _storage.PageSize)); - _fsi.Update(pageId, header.AvailableFreeSpace); - _currentDataPage = pageId; + SnapshotFsiForTransaction(transaction, pageId); + _fsi.Update(pageId, header.AvailableFreeSpace); + _currentDataPage = pageId; } finally { @@ -1363,9 +1369,10 @@ private ushort InsertIntoPage(uint pageId, ReadOnlyMemory data, ITransacti header.WriteTo(buffer); _storage.WritePage(pageId, transaction.TransactionId, buffer.AsSpan(0, _storage.PageSize)); - _fsi.Update(pageId, header.AvailableFreeSpace); - - return slotIndex; + SnapshotFsiForTransaction(transaction, pageId); + _fsi.Update(pageId, header.AvailableFreeSpace); + + return slotIndex; } finally { @@ -1373,8 +1380,8 @@ private ushort InsertIntoPage(uint pageId, ReadOnlyMemory data, ITransacti } } - private void DeleteSlot(DocumentLocation location, ITransaction transaction) - { + private void DeleteSlot(DocumentLocation location, ITransaction transaction) + { var buffer = ArrayPool.Shared.Rent(_storage.PageSize); try { @@ -1394,10 +1401,22 @@ private void DeleteSlot(DocumentLocation location, ITransaction transaction) finally { ArrayPool.Shared.Return(buffer); - } - } - - private static IndexKey? BsonValueToIndexKey(BsonValue value) + } + } + + private void SnapshotFsiForTransaction(ITransaction transaction, uint pageId) + { + if (_fsi.SnapshotForTransaction(transaction.TransactionId, pageId)) + { + var txnId = transaction.TransactionId; + transaction.OnRollback += () => _fsi.RollbackTransaction(txnId); + + if (transaction is Transaction concreteTxn) + concreteTxn.OnCommit += () => _fsi.CommitTransaction(txnId); + } + } + + private static IndexKey? BsonValueToIndexKey(BsonValue value) { return value.Type switch { diff --git a/src/BLite.Core/Storage/StorageEngine.cs b/src/BLite.Core/Storage/StorageEngine.cs index 63cb8a9..a052319 100644 --- a/src/BLite.Core/Storage/StorageEngine.cs +++ b/src/BLite.Core/Storage/StorageEngine.cs @@ -84,6 +84,7 @@ public sealed partial class StorageEngine : IDisposable /// Exposed so that higher-level components (collections, engine, sessions) can read the same settings. /// internal LockTimeout LockTimeout => _config.LockTimeout; + internal bool UsesSeparateCollectionFiles => _collectionFiles != null; public StorageEngine(string databasePath, PageFileConfig config) { diff --git a/tests/BLite.Tests/DocumentCollectionTests.cs b/tests/BLite.Tests/DocumentCollectionTests.cs index 4df8b77..1bacc58 100644 --- a/tests/BLite.Tests/DocumentCollectionTests.cs +++ b/tests/BLite.Tests/DocumentCollectionTests.cs @@ -228,10 +228,10 @@ public async Task Insert_With_SpecifiedId_RetainsId() /// CURRENT : the second InsertAsync throws InvalidOperationException("Not enough space …"). /// [Fact] - public async Task TwoCollectionInstances_StaleFSI_SecondInsertShouldSucceed() - { - const int preFillNameLen = 10000; - const int insertNameLen = 400; + public async Task TwoCollectionInstances_StaleFSI_SecondInsertShouldSucceed() + { + const int preFillNameLen = 10000; + const int insertNameLen = 400; var dbPath = Path.Combine(Path.GetTempPath(), $"test_fsi_stale_{Guid.NewGuid():N}.db"); try @@ -285,15 +285,92 @@ await Task.WhenAll(tasks).ContinueWith(t => } finally - { - if (File.Exists(dbPath)) File.Delete(dbPath); - var wal = Path.ChangeExtension(dbPath, ".wal"); - if (File.Exists(wal)) File.Delete(wal); - } - } - - public void Dispose() - { - _db?.Dispose(); + { + if (File.Exists(dbPath)) File.Delete(dbPath); + var wal = Path.ChangeExtension(dbPath, ".wal"); + if (File.Exists(wal)) File.Delete(wal); + } + } + + [Fact] + public async Task SingleFile_CrossCollectionFsi_IsSharedAcrossCollections() + { + const int preFillNameLen = 10000; + const int insertNameLen = 400; + + var dbPath = Path.Combine(Path.GetTempPath(), $"test_fsi_shared_{Guid.NewGuid():N}.db"); + try + { + using (var seedDb = new TestDbContext(dbPath)) + { + await seedDb.Users.InsertAsync(new User { Name = new string('X', preFillNameLen), Age = 0 }); + await seedDb.SaveChangesAsync(); + } + + using (var db = new TestDbContext(dbPath)) + { + await db.Users.InsertAsync(new User { Name = new string('A', insertNameLen), Age = 1 }); + await db.SaveChangesAsync(); + + await db.ComplexUsers.InsertAsync(new ComplexUser { Name = new string('B', insertNameLen) }); + await db.SaveChangesAsync(); + + Assert.Equal(2, await db.Users.CountAsync()); + Assert.Equal(1, await db.ComplexUsers.CountAsync()); + } + } + finally + { + if (File.Exists(dbPath)) File.Delete(dbPath); + var wal = Path.ChangeExtension(dbPath, ".wal"); + if (File.Exists(wal)) File.Delete(wal); + } + } + + [Fact] + public async Task RolledBackInsert_RestoresFreeSpaceIndexForNextInsert() + { + const int preFillNameLen = 10000; + const int insertNameLen = 2500; + + var dbPath = Path.Combine(Path.GetTempPath(), $"test_fsi_insert_rollback_{Guid.NewGuid():N}.db"); + try + { + using (var db = new TestDbContext(dbPath)) + { + await db.Users.InsertAsync(new User { Name = new string('X', preFillNameLen), Age = 0 }); + await db.SaveChangesAsync(); + } + + long fileSizeBeforeCommittedInsert = new FileInfo(dbPath).Length; + + using (var db = new TestDbContext(dbPath)) + { + using (var txn = db.BeginTransaction()) + { + await db.Users.InsertAsync(new User { Name = new string('R', insertNameLen), Age = 1 }, txn); + await txn.RollbackAsync(); + } + + await db.Users.InsertAsync(new User { Name = new string('C', insertNameLen), Age = 2 }); + await db.SaveChangesAsync(); + + Assert.Equal(2, await db.Users.CountAsync()); + } + + long fileSizeAfterCommittedInsert = new FileInfo(dbPath).Length; + Assert.Equal(fileSizeBeforeCommittedInsert, fileSizeAfterCommittedInsert); + } + finally + { + if (File.Exists(dbPath)) File.Delete(dbPath); + var wal = Path.ChangeExtension(dbPath, ".wal"); + if (File.Exists(wal)) File.Delete(wal); + } + } + + public void Dispose() + { + _db?.Dispose(); } } diff --git a/tests/BLite.Tests/FreeSpaceIndexTests.cs b/tests/BLite.Tests/FreeSpaceIndexTests.cs index 12d36f6..6a07f52 100644 --- a/tests/BLite.Tests/FreeSpaceIndexTests.cs +++ b/tests/BLite.Tests/FreeSpaceIndexTests.cs @@ -250,4 +250,34 @@ public void Update_Remove_And_Re_Add_Works() var found = fsi.FindPage(8000, null); Assert.Equal(7u, found); } + + [Fact] + public void RollbackTransaction_Removes_NewlyTrackedPage() + { + const ulong txnId = 7; + var fsi = new FreeSpaceIndex(PageSize); + + Assert.True(fsi.SnapshotForTransaction(txnId, 123)); + fsi.Update(123, 4096); + + fsi.RollbackTransaction(txnId); + + Assert.False(fsi.TryGetFreeBytes(123, out _)); + } + + [Fact] + public void RollbackTransaction_Restores_PreviousFreeBytes() + { + const ulong txnId = 11; + var fsi = new FreeSpaceIndex(PageSize); + fsi.Update(42, 6000); + + Assert.True(fsi.SnapshotForTransaction(txnId, 42)); + fsi.Update(42, 1200); + + fsi.RollbackTransaction(txnId); + + Assert.True(fsi.TryGetFreeBytes(42, out var freeBytes)); + Assert.Equal(6000, freeBytes); + } }