Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions AElf.All.sln
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AElf.Kernel.FeatureDisable.
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AElf.Kernel.FeatureDisable.Core", "src\AElf.Kernel.FeatureDisable.Core\AElf.Kernel.FeatureDisable.Core.csproj", "{659A7C7A-44C9-424E-B4F6-D1D3656F7AD4}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AElf.Kernel.BlockPruning", "src\AElf.Kernel.BlockPruning\AElf.Kernel.BlockPruning.csproj", "{651281CD-F268-49FC-9305-B19EE65552F7}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AElf.Kernel.BlockPruning.Tests", "test\AElf.Kernel.BlockPruning.Tests\AElf.Kernel.BlockPruning.Tests.csproj", "{147343C3-BFDF-4C20-A990-975BB82CB73B}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -1107,6 +1111,14 @@ Global
{659A7C7A-44C9-424E-B4F6-D1D3656F7AD4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{659A7C7A-44C9-424E-B4F6-D1D3656F7AD4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{659A7C7A-44C9-424E-B4F6-D1D3656F7AD4}.Release|Any CPU.Build.0 = Release|Any CPU
{651281CD-F268-49FC-9305-B19EE65552F7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{651281CD-F268-49FC-9305-B19EE65552F7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{651281CD-F268-49FC-9305-B19EE65552F7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{651281CD-F268-49FC-9305-B19EE65552F7}.Release|Any CPU.Build.0 = Release|Any CPU
{147343C3-BFDF-4C20-A990-975BB82CB73B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{147343C3-BFDF-4C20-A990-975BB82CB73B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{147343C3-BFDF-4C20-A990-975BB82CB73B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{147343C3-BFDF-4C20-A990-975BB82CB73B}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -1301,5 +1313,7 @@ Global
{A4ACE6D2-4CF8-4B52-93C9-BB8BEC0C098E} = {90B310B4-C2DB-419E-B5EE-97FA096B62CC}
{8C0D86A4-D1A7-4B61-AC44-755F5AC75D67} = {4E54480A-D155-43ED-9736-1A5BE7957211}
{659A7C7A-44C9-424E-B4F6-D1D3656F7AD4} = {90B310B4-C2DB-419E-B5EE-97FA096B62CC}
{651281CD-F268-49FC-9305-B19EE65552F7} = {90B310B4-C2DB-419E-B5EE-97FA096B62CC}
{147343C3-BFDF-4C20-A990-975BB82CB73B} = {4E54480A-D155-43ED-9736-1A5BE7957211}
EndGlobalSection
EndGlobal
9 changes: 9 additions & 0 deletions protobuf/block_pruning.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
syntax = "proto3";

package aelf;

option csharp_namespace = "AElf.Kernel.BlockPruning";

message BlockPruningInfo {
int64 last_pruned_block_height = 1;
}
20 changes: 20 additions & 0 deletions src/AElf.Kernel.BlockPruning/AElf.Kernel.BlockPruning.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<LangVersion>latest</LangVersion>
<PackageId>AElf.Kernel.BlockPruning</PackageId>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<Description>Blockchain historical data pruning module.</Description>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\AElf.Kernel.Core\AElf.Kernel.Core.csproj" />
</ItemGroup>

<ItemGroup>
<CommonMessage Include="..\..\protobuf\block_pruning.proto">
<Link>Protobuf\Proto\block_pruning.proto</Link>
</CommonMessage>
</ItemGroup>
</Project>
148 changes: 148 additions & 0 deletions src/AElf.Kernel.BlockPruning/Application/BlockPruningService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using AElf.Kernel.BlockPruning.Domain;
using AElf.Kernel.Blockchain.Application;
using AElf.Kernel.Blockchain.Domain;
using AElf.Types;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;

namespace AElf.Kernel.BlockPruning.Application;

public class BlockPruningService : IBlockPruningService, ITransientDependency
{
private readonly IBlockManager _blockManager;
private readonly IBlockchainService _blockchainService;
private readonly IChainManager _chainManager;
private readonly IBlockPruningInfoManager _blockPruningInfoManager;
private readonly BlockPruningOptions _options;
private readonly ITransactionBlockIndexManager _transactionBlockIndexManager;
private readonly ITransactionManager _transactionManager;
private readonly ITransactionResultManager _transactionResultManager;

public ILogger<BlockPruningService> Logger { get; set; }

public BlockPruningService(IBlockchainService blockchainService,
IChainManager chainManager,
IBlockManager blockManager,
ITransactionManager transactionManager,
ITransactionResultManager transactionResultManager,
ITransactionBlockIndexManager transactionBlockIndexManager,
IBlockPruningInfoManager blockPruningInfoManager,
IOptionsSnapshot<BlockPruningOptions> options)
{
_blockchainService = blockchainService;
_chainManager = chainManager;
_blockManager = blockManager;
_transactionManager = transactionManager;
_transactionResultManager = transactionResultManager;
_transactionBlockIndexManager = transactionBlockIndexManager;
_blockPruningInfoManager = blockPruningInfoManager;
_options = options.Value;

Logger = NullLogger<BlockPruningService>.Instance;
}

public async Task PruneBlockchainDataAsync()
{
if (!_options.Enabled)
return;

var chain = await _blockchainService.GetChainAsync();
var pruneTargetHeight = chain.LastIrreversibleBlockHeight - _options.RetainDistance;
var lastPrunedHeight = await _blockPruningInfoManager.GetLastPrunedHeightAsync();

if (pruneTargetHeight <= lastPrunedHeight)
return;

var gap = pruneTargetHeight - lastPrunedHeight;
if (gap < _options.PruneThreshold)
{
Logger.LogDebug(
"Pruning skipped: gap {Gap} below threshold {Threshold} (target={Target}, lastPruned={LastPruned})",
gap, _options.PruneThreshold, pruneTargetHeight, lastPrunedHeight);
return;
}

var startHeight = Math.Max(2, lastPrunedHeight + 1);
if (startHeight > pruneTargetHeight)
return;

Logger.LogInformation(
"Block pruning started: from height {StartHeight} to {TargetHeight} (LIB={LIBHeight}, retain={RetainDistance})",
startHeight, pruneTargetHeight, chain.LastIrreversibleBlockHeight, _options.RetainDistance);

var totalPruned = 0L;

for (var batchStart = startHeight; batchStart <= pruneTargetHeight; batchStart += _options.BatchSize)
{
var batchEnd = Math.Min(batchStart + _options.BatchSize - 1, pruneTargetHeight);

var allTxIds = new List<Hash>();
var allTxResultBlockHashes = new List<Hash>();
var allBlockHashes = new List<Hash>();
var heights = new List<long>();
var foundBlockBodyCount = 0;

for (var height = batchStart; height <= batchEnd; height++)
{
heights.Add(height);
}

var chainBlockIndices = await _chainManager.GetChainBlockIndicesAsync(heights);

foreach (var chainBlockIndex in chainBlockIndices)
{
if (chainBlockIndex == null)
continue;

allBlockHashes.Add(chainBlockIndex.BlockHash);
}

var blockBodies = await _blockManager.GetBlockBodiesAsync(allBlockHashes);

for (var i = 0; i < allBlockHashes.Count; i++)
{
var blockBody = blockBodies[i];
if (blockBody == null)
continue;

foundBlockBodyCount++;
var blockHash = allBlockHashes[i];
foreach (var txId in blockBody.TransactionIds)
{
allTxIds.Add(txId);
allTxResultBlockHashes.Add(blockHash);
}
}

Logger.LogDebug(
"Pruning batch [{BatchStart}..{BatchEnd}]: found {ChainBlockIndexCount} chain block indices, loaded {BlockBodyCount} block bodies",
batchStart, batchEnd, allBlockHashes.Count, foundBlockBodyCount);

await _transactionResultManager.RemoveTransactionResultsAsync(allTxIds, allTxResultBlockHashes);
await _transactionBlockIndexManager.RemoveTransactionIndicesAsync(allTxIds);
await _transactionManager.RemoveTransactionsAsync(allTxIds);
await _blockManager.RemoveBlocksAsync(allBlockHashes);
await _chainManager.RemoveChainBlockLinksAsync(allBlockHashes);

await _blockPruningInfoManager.SetLastPrunedHeightAsync(batchEnd);

totalPruned += batchEnd - batchStart + 1;

Logger.LogDebug(
"Pruned batch [{BatchStart}..{BatchEnd}]: {BlockCount} blocks, {TxCount} transactions",
batchStart, batchEnd, allBlockHashes.Count, allTxIds.Count);

if (_options.BatchDelayMilliseconds > 0)
await Task.Delay(_options.BatchDelayMilliseconds);
}

Logger.LogInformation(
"Block pruning completed: pruned {TotalPruned} heights, new last pruned height = {PrunedHeight}",
totalPruned, pruneTargetHeight);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
using System.Threading.Tasks;

namespace AElf.Kernel.BlockPruning.Application;

public interface IBlockPruningService
{
Task PruneBlockchainDataAsync();
}
35 changes: 35 additions & 0 deletions src/AElf.Kernel.BlockPruning/BlockPruningAElfModule.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using System;
using AElf.Modularity;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp;
using Volo.Abp.Modularity;

namespace AElf.Kernel.BlockPruning;

[DependsOn(
typeof(CoreKernelAElfModule)
)]
public class BlockPruningAElfModule : AElfModule
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
var configuration = context.Services.GetConfiguration();

Configure<BlockPruningOptions>(configuration.GetSection("BlockPruning"));
context.Services.PostConfigure<BlockPruningOptions>(options =>
{
options.RetainDistance = Math.Max(options.RetainDistance, BlockPruningConstants.MinRetainDistance);
options.BatchSize = Math.Clamp(options.BatchSize, 1, BlockPruningConstants.MaxBatchSize);
options.PruneThreshold = Math.Max(options.PruneThreshold, 0);
options.BatchDelayMilliseconds = Math.Max(options.BatchDelayMilliseconds, 0);
});

context.Services.AddStoreKeyPrefixProvide<BlockPruningInfo>("bp");
}

public override void OnPreApplicationInitialization(ApplicationInitializationContext context)
{
var taskQueueManager = context.ServiceProvider.GetRequiredService<ITaskQueueManager>();
taskQueueManager.CreateQueue(BlockPruningConstants.BlockPruningQueueName);
}
}
8 changes: 8 additions & 0 deletions src/AElf.Kernel.BlockPruning/BlockPruningConstants.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace AElf.Kernel.BlockPruning;

public static class BlockPruningConstants
{
public const string BlockPruningQueueName = "BlockPruningQueue";
public const long MinRetainDistance = 2419200;
public const int MaxBatchSize = 10000;
}
10 changes: 10 additions & 0 deletions src/AElf.Kernel.BlockPruning/BlockPruningOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
namespace AElf.Kernel.BlockPruning;

public class BlockPruningOptions
{
public bool Enabled { get; set; }
public long RetainDistance { get; set; } = 5184000;
public int BatchSize { get; set; } = 100;
public int PruneThreshold { get; set; } = 256;
public int BatchDelayMilliseconds { get; set; } = 50;
}
33 changes: 33 additions & 0 deletions src/AElf.Kernel.BlockPruning/Domain/BlockPruningInfoManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using System.Threading.Tasks;
using AElf.Kernel.Blockchain.Infrastructure;
using AElf.Kernel.Infrastructure;
using Volo.Abp.DependencyInjection;

namespace AElf.Kernel.BlockPruning.Domain;

public class BlockPruningInfoManager : IBlockPruningInfoManager, ISingletonDependency
{
private readonly string _key;
private readonly IBlockchainStore<BlockPruningInfo> _store;

public BlockPruningInfoManager(IBlockchainStore<BlockPruningInfo> store,
IStaticChainInformationProvider chainInformationProvider)
{
_store = store;
_key = chainInformationProvider.ChainId.ToStorageKey();
}

public async Task<long> GetLastPrunedHeightAsync()
{
var value = await _store.GetAsync(_key);
return value?.LastPrunedBlockHeight ?? 0;
}

public async Task SetLastPrunedHeightAsync(long height)
{
await _store.SetAsync(_key, new BlockPruningInfo
{
LastPrunedBlockHeight = height
});
}
}
11 changes: 11 additions & 0 deletions src/AElf.Kernel.BlockPruning/Domain/IBlockPruningInfoManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System.Threading.Tasks;

namespace AElf.Kernel.BlockPruning.Domain;

public interface IBlockPruningInfoManager
{
Task<long> GetLastPrunedHeightAsync();
Task SetLastPrunedHeightAsync(long height);
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using System.Threading.Tasks;
using AElf.Kernel.BlockPruning.Application;
using AElf.Kernel.Blockchain.Events;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus;

namespace AElf.Kernel.BlockPruning;

public class NewIrreversibleBlockFoundEventHandler : ILocalEventHandler<NewIrreversibleBlockFoundEvent>,
ITransientDependency
{
private readonly IBlockPruningService _blockPruningService;
private readonly BlockPruningOptions _options;
private readonly ITaskQueueManager _taskQueueManager;

public ILogger<NewIrreversibleBlockFoundEventHandler> Logger { get; set; }

public NewIrreversibleBlockFoundEventHandler(ITaskQueueManager taskQueueManager,
IBlockPruningService blockPruningService,
IOptionsSnapshot<BlockPruningOptions> options)
{
_taskQueueManager = taskQueueManager;
_blockPruningService = blockPruningService;
_options = options.Value;

Logger = NullLogger<NewIrreversibleBlockFoundEventHandler>.Instance;
}

public Task HandleEventAsync(NewIrreversibleBlockFoundEvent eventData)
{
if (!_options.Enabled)
return Task.CompletedTask;

var queue = _taskQueueManager.GetQueue(BlockPruningConstants.BlockPruningQueueName);
if (queue == null || queue.Size > 0)
{
Logger.LogDebug("Block pruning skipped: queue is busy (size={QueueSize})",
queue?.Size ?? -1);
return Task.CompletedTask;
}

Logger.LogDebug(
"Enqueueing block pruning task (LIB height={LIBHeight})",
eventData.BlockHeight);

queue.Enqueue(async () => { await _blockPruningService.PruneBlockchainDataAsync(); });
return Task.CompletedTask;
}
}
Loading
Loading