Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 21 additions & 17 deletions src/BLite.Core/BLiteEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BLite.Bson;
using BLite.Core.KeyValue;
using BLite.Core.Storage;
using BLite.Core.Transactions;
using System.Threading;
using System.Threading.Tasks;
using BLite.Bson;
using BLite.Core.Collections;
using BLite.Core.KeyValue;
using BLite.Core.Storage;
using BLite.Core.Transactions;

namespace BLite.Core;

Expand All @@ -22,9 +23,10 @@ namespace BLite.Core;
/// </summary>
public sealed class BLiteEngine : IDisposable, ITransactionHolder
{
private readonly StorageEngine _storage;
private readonly ConcurrentDictionary<string, DynamicCollection> _collections = new(StringComparer.OrdinalIgnoreCase);
private readonly BLiteKvStore _kvStore;
private readonly StorageEngine _storage;
private readonly ConcurrentDictionary<string, DynamicCollection> _collections = new(StringComparer.OrdinalIgnoreCase);
private readonly FreeSpaceIndexProvider _freeSpaceIndexes;
private readonly BLiteKvStore _kvStore;
private bool _disposed;

/// <summary>
Expand Down Expand Up @@ -68,8 +70,9 @@ public BLiteEngine(string databasePath, PageFileConfig config, BLiteKvOptions? k
if (string.IsNullOrWhiteSpace(databasePath))
throw new ArgumentNullException(nameof(databasePath));

_storage = new StorageEngine(databasePath, config);
_kvStore = new BLiteKvStore(_storage, kvOptions);
_storage = new StorageEngine(databasePath, config);
_freeSpaceIndexes = new FreeSpaceIndexProvider(_storage);
_kvStore = new BLiteKvStore(_storage, kvOptions);
}

/// <summary>
Expand All @@ -78,8 +81,9 @@ public BLiteEngine(string databasePath, PageFileConfig config, BLiteKvOptions? k
/// </summary>
internal BLiteEngine(StorageEngine storage, BLiteKvOptions? kvOptions = null)
{
_storage = storage ?? throw new ArgumentNullException(nameof(storage));
_kvStore = new BLiteKvStore(_storage, kvOptions);
_storage = storage ?? throw new ArgumentNullException(nameof(storage));
_freeSpaceIndexes = new FreeSpaceIndexProvider(_storage);
_kvStore = new BLiteKvStore(_storage, kvOptions);
}

/// <summary>
Expand Down Expand Up @@ -150,8 +154,8 @@ public static BLiteEngine CreateFromStorage(StorageEngine storage, BLiteKvOption
/// <returns>A new <see cref="BLiteSession"/> backed by this engine's storage.</returns>
public BLiteSession OpenSession()
{
ThrowIfDisposed();
return new BLiteSession(_storage);
ThrowIfDisposed();
return new BLiteSession(_storage, _freeSpaceIndexes);
}

#endregion
Expand Down Expand Up @@ -182,7 +186,7 @@ public DynamicCollection GetOrCreateCollection(string name, BsonIdType idType =
if (string.IsNullOrWhiteSpace(name))
throw new ArgumentNullException(nameof(name));

return _collections.GetOrAdd(name, n => new DynamicCollection(_storage, this, n, idType));
return _collections.GetOrAdd(name, n => new DynamicCollection(_storage, this, n, idType, _freeSpaceIndexes.GetIndex()));
}

/// <summary>
Expand Down Expand Up @@ -224,7 +228,7 @@ public IReadOnlyList<string> ListCollections()
// own metadata from storage, so the idType argument is irrelevant for
// existing collections (only matters when creating brand-new ones).
foreach (var meta in _storage.GetAllCollectionsMetadata())
_collections.GetOrAdd(meta.Name, n => new DynamicCollection(_storage, this, n));
_collections.GetOrAdd(meta.Name, n => new DynamicCollection(_storage, this, n, BsonIdType.ObjectId, _freeSpaceIndexes.GetIndex()));

return _collections.Keys.ToList();
}
Expand Down
7 changes: 5 additions & 2 deletions src/BLite.Core/BLiteSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Threading;
using System.Threading.Tasks;
using BLite.Bson;
using BLite.Core.Collections;
using BLite.Core.Storage;
using BLite.Core.Transactions;

Expand All @@ -26,14 +27,16 @@ namespace BLite.Core;
public sealed class BLiteSession : ITransactionHolder, IDisposable
{
private readonly StorageEngine _storage;
private readonly FreeSpaceIndexProvider _freeSpaceIndexes;
private readonly ConcurrentDictionary<string, Lazy<DynamicCollection>> _collections =
new(StringComparer.OrdinalIgnoreCase);
private ITransaction? _currentTransaction;
private bool _disposed;

internal BLiteSession(StorageEngine storage)
internal BLiteSession(StorageEngine storage, FreeSpaceIndexProvider freeSpaceIndexes)
{
_storage = storage ?? throw new ArgumentNullException(nameof(storage));
_freeSpaceIndexes = freeSpaceIndexes ?? throw new ArgumentNullException(nameof(freeSpaceIndexes));
}

// ─────────────────────────────────────────────────────────────────────────
Expand Down Expand Up @@ -142,7 +145,7 @@ public DynamicCollection GetOrCreateCollection(string name, BsonIdType idType =

return _collections.GetOrAdd(name,
n => new Lazy<DynamicCollection>(
() => new DynamicCollection(_storage, this, n, idType),
() => new DynamicCollection(_storage, this, n, idType, _freeSpaceIndexes.GetIndex()),
System.Threading.LazyThreadSafetyMode.ExecutionAndPublication)).Value;
}

Expand Down
89 changes: 50 additions & 39 deletions src/BLite.Core/Collections/DocumentCollection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,19 +120,26 @@ public async Task ForcePruneAsync()
await transaction.CommitAsync();
}

[RequiresDynamicCode("DocumentCollection uses CollectionIndexManager which compiles index key selectors via Expression.Compile().")]
[RequiresUnreferencedCode("Index creation uses reflection (Expression.PropertyOrField) to access type members. Ensure all entity types and their members are preserved.")]
public DocumentCollection(StorageEngine storage, ITransactionHolder transactionHolder, IDocumentMapper<TId, T> mapper, string? collectionName = null)
{
_storage = storage ?? throw new ArgumentNullException(nameof(storage));
_transactionHolder = transactionHolder ?? throw new ArgumentNullException(nameof(transactionHolder));
_mapper = mapper ?? throw new ArgumentNullException(nameof(mapper));
_collectionName = collectionName ?? _mapper.CollectionName;
[RequiresDynamicCode("DocumentCollection uses CollectionIndexManager which compiles index key selectors via Expression.Compile().")]
[RequiresUnreferencedCode("Index creation uses reflection (Expression.PropertyOrField) to access type members. Ensure all entity types and their members are preserved.")]
public DocumentCollection(StorageEngine storage, ITransactionHolder transactionHolder, IDocumentMapper<TId, T> mapper, string? collectionName = null)
: this(storage, transactionHolder, mapper, collectionName, null)
{
}

[RequiresDynamicCode("DocumentCollection uses CollectionIndexManager which compiles index key selectors via Expression.Compile().")]
[RequiresUnreferencedCode("Index creation uses reflection (Expression.PropertyOrField) to access type members. Ensure all entity types and their members are preserved.")]
internal DocumentCollection(StorageEngine storage, ITransactionHolder transactionHolder, IDocumentMapper<TId, T> mapper, string? collectionName, FreeSpaceIndex? freeSpaceIndex)
{
_storage = storage ?? throw new ArgumentNullException(nameof(storage));
_transactionHolder = transactionHolder ?? throw new ArgumentNullException(nameof(transactionHolder));
_mapper = mapper ?? throw new ArgumentNullException(nameof(mapper));
_collectionName = collectionName ?? _mapper.CollectionName;

// Initialize secondary index manager first (loads metadata including Primary Root Page ID)
_indexManager = new CollectionIndexManager<TId, T>(_storage, _mapper, _collectionName);
_fsi = new FreeSpaceIndex(_storage.PageSize);
_isPageLocked = _storage.IsPageLocked;
// Initialize secondary index manager first (loads metadata including Primary Root Page ID)
_indexManager = new CollectionIndexManager<TId, T>(_storage, _mapper, _collectionName);
_fsi = freeSpaceIndex ?? new FreeSpaceIndex(_storage.PageSize);
_isPageLocked = _storage.IsPageLocked;

// Calculate max document size dynamically based on page size
// Reserve space for PageHeader (24) and some safety margin
Expand Down Expand Up @@ -165,9 +172,10 @@ public DocumentCollection(StorageEngine storage, ITransactionHolder transactionH
_storage.RegisterKeys(_mapper.UsedKeys);

// Rebuild the free-space index from existing page headers so that a cold-start
// DocumentCollection can reuse partially-filled pages instead of always allocating new ones.
RebuildFreeSpaceIndex();
}
// DocumentCollection can reuse partially-filled pages instead of always allocating new ones.
RebuildFreeSpaceIndex();
}


/// <summary>
/// Scans the header of every data page belonging to this collection and
Expand Down Expand Up @@ -1005,12 +1013,12 @@ private Task<uint> FindPageWithSpace(int requiredBytes, ITransaction transaction
if (freeBytes >= requiredBytes && !_storage.IsPageLocked(_currentDataPage, txnId))
return Task.FromResult(_currentDataPage);
}
else
{
// Page not yet in FSI (e.g., after a rollback recovery): read header from disk.
Span<byte> page = stackalloc byte[SlottedPageHeader.Size];
_storage.ReadPage(_currentDataPage, null, page);
var header = SlottedPageHeader.ReadFrom(page);
else
{
// Page not yet in FSI (e.g., after a rollback recovery): read header from disk.
Span<byte> page = stackalloc byte[SlottedPageHeader.Size];
_storage.ReadPageHeader(_currentDataPage, null, page);
var header = SlottedPageHeader.ReadFrom(page);

if (header.AvailableFreeSpace >= requiredBytes)
{
Expand Down Expand Up @@ -1061,9 +1069,9 @@ private Task<uint> AllocateNewDataPage(ITransaction transaction)
_storage.WritePage(pageId, transaction.TransactionId, buffer);
}

// Track free space
_fsi.Update(pageId, header.AvailableFreeSpace);
_currentDataPage = pageId;
SnapshotFsiForTransaction(transaction, pageId);
_fsi.Update(pageId, header.AvailableFreeSpace);
_currentDataPage = pageId;
}
finally
{
Expand Down Expand Up @@ -1158,8 +1166,8 @@ private async Task<ushort> InsertIntoPage(uint pageId, byte[] data, ITransaction
_storage.WritePage(pageId, transaction.TransactionId, buffer);
}

// UpdateAsync free space index
_fsi.Update(pageId, header.AvailableFreeSpace);
SnapshotFsiForTransaction(transaction, pageId);
_fsi.Update(pageId, header.AvailableFreeSpace);

return slotIndex;
}
Expand Down Expand Up @@ -1341,8 +1349,8 @@ private uint AllocateOverflowPage(ReadOnlySpan<byte> data, uint nextOverflowPage
);
((Transaction)transaction).AddWrite(writeOp);

// UpdateAsync free space index
_fsi.Update(primaryPageId, header.AvailableFreeSpace);
SnapshotFsiForTransaction(transaction, primaryPageId);
_fsi.Update(primaryPageId, header.AvailableFreeSpace);

return (primaryPageId, slotIndex);
}
Expand Down Expand Up @@ -2265,17 +2273,8 @@ private async Task<bool> DeleteCore(TId id, ITransaction transaction, bool notif
// Update the free space index with post-compaction free bytes.
// Snapshot the pre-modification value first so that a rollback can restore it.
var compactedHeader = SlottedPageHeader.ReadFrom(buffer.AsSpan(0, SlottedPageHeader.Size));
if (_fsi.SnapshotForTransaction(transaction.TransactionId, location.PageId))
{
// First page tracked for this transaction — subscribe once to restore/cleanup.
var txnId = transaction.TransactionId;
transaction.OnRollback += () => _fsi.RollbackTransaction(txnId);
// OnCommit is only on the concrete Transaction type (not on ITransaction)
// to avoid a breaking change to the public interface.
if (transaction is Transactions.Transaction concreteTxn)
concreteTxn.OnCommit += () => _fsi.CommitTransaction(txnId);
}
_fsi.Update(location.PageId, compactedHeader.AvailableFreeSpace);
SnapshotFsiForTransaction(transaction, location.PageId);
_fsi.Update(location.PageId, compactedHeader.AvailableFreeSpace);

// Remove from primary index
_primaryIndex.Delete(key, location, transaction.TransactionId);
Expand Down Expand Up @@ -2717,6 +2716,18 @@ Func<T, bool> GetCompiled() =>
}
}

private void SnapshotFsiForTransaction(ITransaction transaction, uint pageId)
{
if (_fsi.SnapshotForTransaction(transaction.TransactionId, pageId))
{
var txnId = transaction.TransactionId;
transaction.OnRollback += () => _fsi.RollbackTransaction(txnId);

if (transaction is Transaction concreteTxn)
concreteTxn.OnCommit += () => _fsi.CommitTransaction(txnId);
}
}

private static readonly IEqualityComparer<object?> s_inProbeKeyComparer = new InProbeKeyComparer();

private sealed class InProbeKeyComparer : IEqualityComparer<object?>
Expand Down
Loading
Loading