diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index ffec435239ea..f2eb13511691 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -559,27 +560,69 @@ void LocalServer::processConfig() /// There is no need for concurrent queries, override max_concurrent_queries. global_context->getProcessList().setMaxSize(0); + /// Set up caches. + + /// Dummy strategy + // auto block_cache_size = 1000_MiB; + // auto max_size_to_evict_on_purging = 300_MiB; + // std::string rebalance_strategy_name = "dummy"; + // RebalanceStrategySettings rebalance_strategy_settings; + // rebalance_strategy_settings.setBlockCacheSetting("max_total_size", "marks", Field(block_cache_size)); + // rebalance_strategy_settings.setBlockCacheSetting("max_size_to_evict_on_purging", "marks", Field(max_size_to_evict_on_purging)); + // rebalance_strategy_settings.setBlockCacheSetting("max_total_size", "uncompressed", Field(block_cache_size)); + // rebalance_strategy_settings.setBlockCacheSetting("max_size_to_evict_on_purging", "uncompressed", Field(max_size_to_evict_on_purging)); + + /// Buddy static strategy + // auto memory_arena_size = 4_GiB; + // auto max_size_to_evict_on_purging = 300_MiB; + // std::string rebalance_strategy_name = "buddy_static"; + // RebalanceStrategySettings rebalance_strategy_settings; + // rebalance_strategy_settings.set("memory_arena_size", Field(memory_arena_size)); + // rebalance_strategy_settings.setBlockCacheSetting("max_size_to_evict_on_purging", "marks", Field(max_size_to_evict_on_purging)); + // rebalance_strategy_settings.setBlockCacheSetting("max_size_to_evict_on_purging", "uncompressed", Field(max_size_to_evict_on_purging)); + + /// Buddy dynamic strategy + auto memory_arena_capacity = 4_GiB; + auto memory_arena_initial_size = 128_MiB; + size_t allocated_memory_multiplier = 2; + auto max_size_to_evict_on_purging = 300_MiB; + 
std::string rebalance_strategy_name = "buddy_dynamic"; + RebalanceStrategySettings rebalance_strategy_settings; + rebalance_strategy_settings.set("memory_arena_capacity", Field(memory_arena_capacity)); + rebalance_strategy_settings.set("memory_arena_initial_size", Field(memory_arena_initial_size)); + rebalance_strategy_settings.set("allocated_memory_multiplier", Field(allocated_memory_multiplier)); + rebalance_strategy_settings.setBlockCacheSetting("max_size_to_evict_on_purging", "marks", Field(max_size_to_evict_on_purging)); + rebalance_strategy_settings.setBlockCacheSetting("max_size_to_evict_on_purging", "uncompressed", Field(max_size_to_evict_on_purging)); + + auto & block_cache_manager = DB::BlockCachesManager::instance(); + std::vector block_cache_names = {"marks", "uncompressed"}; + block_cache_manager.initialize(block_cache_names, rebalance_strategy_name, rebalance_strategy_settings); + /// Size of cache for uncompressed blocks. Zero means disabled. String uncompressed_cache_policy = config().getString("uncompressed_cache_policy", ""); size_t uncompressed_cache_size = config().getUInt64("uncompressed_cache_size", 0); if (uncompressed_cache_size) - global_context->setUncompressedCache(uncompressed_cache_size, uncompressed_cache_policy); + global_context->setUncompressedCache("global"); + // global_context->setUncompressedCache(uncompressed_cache_size, uncompressed_cache_policy); /// Size of cache for marks (index of MergeTree family of tables). String mark_cache_policy = config().getString("mark_cache_policy", ""); size_t mark_cache_size = config().getUInt64("mark_cache_size", 5368709120); if (mark_cache_size) - global_context->setMarkCache(mark_cache_size, mark_cache_policy); + global_context->setMarkCache("global"); + // global_context->setMarkCache(mark_cache_size, mark_cache_policy); /// Size of cache for uncompressed blocks of MergeTree indices. Zero means disabled. 
size_t index_uncompressed_cache_size = config().getUInt64("index_uncompressed_cache_size", 0); if (index_uncompressed_cache_size) - global_context->setIndexUncompressedCache(index_uncompressed_cache_size); + global_context->setIndexUncompressedCache("global"); + // global_context->setIndexUncompressedCache(index_uncompressed_cache_size); /// Size of cache for index marks (index of MergeTree skip indices). size_t index_mark_cache_size = config().getUInt64("index_mark_cache_size", 0); if (index_mark_cache_size) - global_context->setIndexMarkCache(index_mark_cache_size); + global_context->setIndexMarkCache("global"); + // global_context->setIndexMarkCache(index_mark_cache_size); /// A cache for mmapped files. size_t mmap_cache_size = config().getUInt64("mmap_cache_size", 1000); /// The choice of default is arbitrary. diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index d1f1df24398f..b7d267d830c6 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -1145,6 +1146,8 @@ int Server::main(const std::vector & /*args*/) } total_memory_tracker.setHardLimit(max_server_memory_usage); + /// TODO: Add setting + // total_memory_tracker.setLimitToPurgeCache(1500_MiB); total_memory_tracker.setDescription("(total)"); total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking); @@ -1393,6 +1396,42 @@ int Server::main(const std::vector & /*args*/) /// Set up caches. 
+ /// Dummy strategy + // auto block_cache_size = 1000_MiB; + // auto max_size_to_evict_on_purging = 300_MiB; + // std::string rebalance_strategy_name = "dummy"; + // RebalanceStrategySettings rebalance_strategy_settings; + // rebalance_strategy_settings.setBlockCacheSetting("max_total_size", "marks", Field(block_cache_size)); + // rebalance_strategy_settings.setBlockCacheSetting("max_size_to_evict_on_purging", "marks", Field(max_size_to_evict_on_purging)); + // rebalance_strategy_settings.setBlockCacheSetting("max_total_size", "uncompressed", Field(block_cache_size)); + // rebalance_strategy_settings.setBlockCacheSetting("max_size_to_evict_on_purging", "uncompressed", Field(max_size_to_evict_on_purging)); + + /// Buddy static strategy + // auto memory_arena_size = 4_GiB; + // auto max_size_to_evict_on_purging = 300_MiB; + // std::string rebalance_strategy_name = "buddy_static"; + // RebalanceStrategySettings rebalance_strategy_settings; + // rebalance_strategy_settings.set("memory_arena_size", Field(memory_arena_size)); + // rebalance_strategy_settings.setBlockCacheSetting("max_size_to_evict_on_purging", "marks", Field(max_size_to_evict_on_purging)); + // rebalance_strategy_settings.setBlockCacheSetting("max_size_to_evict_on_purging", "uncompressed", Field(max_size_to_evict_on_purging)); + + /// Buddy dynamic strategy + auto memory_arena_capacity = 4_GiB; + UInt64 memory_arena_initial_size = 0; + size_t allocated_memory_multiplier = 1; + auto max_size_to_evict_on_purging = 300_MiB; + std::string rebalance_strategy_name = "buddy_dynamic"; + RebalanceStrategySettings rebalance_strategy_settings; + rebalance_strategy_settings.set("memory_arena_capacity", Field(memory_arena_capacity)); + rebalance_strategy_settings.set("memory_arena_initial_size", Field(memory_arena_initial_size)); + rebalance_strategy_settings.set("allocated_memory_multiplier", Field(allocated_memory_multiplier)); + rebalance_strategy_settings.setBlockCacheSetting("max_size_to_evict_on_purging", 
"marks", Field(max_size_to_evict_on_purging)); + rebalance_strategy_settings.setBlockCacheSetting("max_size_to_evict_on_purging", "uncompressed", Field(max_size_to_evict_on_purging)); + + auto & block_cache_manager = DB::BlockCachesManager::instance(); + std::vector block_cache_names = {"marks", "uncompressed"}; + block_cache_manager.initialize(block_cache_names, rebalance_strategy_name, rebalance_strategy_settings); + /// Lower cache size on low-memory systems. double cache_size_to_ram_max_ratio = config().getDouble("cache_size_to_ram_max_ratio", 0.5); size_t max_cache_size = static_cast(memory_amount * cache_size_to_ram_max_ratio); @@ -1407,7 +1446,7 @@ int Server::main(const std::vector & /*args*/) LOG_INFO(log, "Uncompressed cache size was lowered to {} because the system has low amount of memory", formatReadableSizeWithBinarySuffix(uncompressed_cache_size)); } - global_context->setUncompressedCache(uncompressed_cache_size, uncompressed_cache_policy); + global_context->setUncompressedCache("uncompressed"); /// Load global settings from default_profile and system_profile. global_context->setDefaultProfiles(config()); @@ -1434,17 +1473,17 @@ int Server::main(const std::vector & /*args*/) LOG_INFO(log, "Mark cache size was lowered to {} because the system has low amount of memory", formatReadableSizeWithBinarySuffix(mark_cache_size)); } - global_context->setMarkCache(mark_cache_size, mark_cache_policy); + global_context->setMarkCache("marks"); /// Size of cache for uncompressed blocks of MergeTree indices. Zero means disabled. size_t index_uncompressed_cache_size = config().getUInt64("index_uncompressed_cache_size", 0); if (index_uncompressed_cache_size) - global_context->setIndexUncompressedCache(index_uncompressed_cache_size); + global_context->setIndexUncompressedCache("uncompressed"); /// Size of cache for index marks (index of MergeTree skip indices). 
size_t index_mark_cache_size = config().getUInt64("index_mark_cache_size", 0); if (index_mark_cache_size) - global_context->setIndexMarkCache(index_mark_cache_size); + global_context->setIndexMarkCache("marks"); /// A cache for mmapped files. size_t mmap_cache_size = config().getUInt64("mmap_cache_size", 1000); /// The choice of default is arbitrary. diff --git a/src/Common/MemoryTracker.cpp b/src/Common/MemoryTracker.cpp index 8bd316817067..c5a72af63fd3 100644 --- a/src/Common/MemoryTracker.cpp +++ b/src/Common/MemoryTracker.cpp @@ -1,6 +1,7 @@ #include "MemoryTracker.h" #include +#include #include #include #include @@ -168,6 +169,7 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryT Int64 current_hard_limit = hard_limit.load(std::memory_order_relaxed); Int64 current_profiler_limit = profiler_limit.load(std::memory_order_relaxed); + Int64 current_limit_to_purge_cache = limit_to_purge_cache.load(std::memory_order_relaxed); bool memory_limit_exceeded_ignored = false; @@ -237,6 +239,14 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryT } #endif + /// TODO: Rewrite with a rebalance strategy + if (unlikely(current_limit_to_purge_cache && will_be > current_limit_to_purge_cache)) + { + auto & cache_instance = DB::BlockCachesManager::instance(); + /// TODO: Make a better shrinkage with a relative ratio to the full amount of memory + cache_instance.purge(); + } + if (unlikely(current_hard_limit && will_be > limit_to_check)) { if (memoryTrackerCanThrow(level, false) && throw_if_memory_exceeded) @@ -456,6 +466,10 @@ void MemoryTracker::setHardLimit(Int64 value) hard_limit.store(value, std::memory_order_relaxed); } +void MemoryTracker::setLimitToPurgeCache(Int64 value) +{ + limit_to_purge_cache.store(value, std::memory_order_relaxed); +} void MemoryTracker::setOrRaiseHardLimit(Int64 value) { diff --git a/src/Common/MemoryTracker.h b/src/Common/MemoryTracker.h index 2d898935dcf5..ab176b5af01e 100644 --- 
a/src/Common/MemoryTracker.h +++ b/src/Common/MemoryTracker.h @@ -55,6 +55,7 @@ class MemoryTracker std::atomic soft_limit {0}; std::atomic hard_limit {0}; std::atomic profiler_limit {0}; + std::atomic limit_to_purge_cache {0}; static std::atomic free_memory_in_allocator_arenas; @@ -116,6 +117,7 @@ class MemoryTracker void setSoftLimit(Int64 value); void setHardLimit(Int64 value); + void setLimitToPurgeCache(Int64 value); Int64 getHardLimit() const { @@ -125,6 +127,10 @@ class MemoryTracker { return soft_limit.load(std::memory_order_relaxed); } + Int64 getLimitToPurgeCache() const + { + return limit_to_purge_cache.load(std::memory_order_relaxed); + } /** Set limit if it was not set. * Otherwise, set limit to new value, if new value is greater than previous limit. diff --git a/src/Common/UnifiedCache.cpp b/src/Common/UnifiedCache.cpp new file mode 100644 index 000000000000..9390fcd5daa8 --- /dev/null +++ b/src/Common/UnifiedCache.cpp @@ -0,0 +1,1523 @@ +#include +#include "Core/Block.h" +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +BuddyArena::~BuddyArena() noexcept +{ + if (isValid()) + // Do not throw exceptions from invalid munmap + munmap(arena_buffer, total_size_bytes); +} + +bool BuddyArena::isValid() const +{ + return number_of_levels != 0; +} + +bool BuddyArena::isAllocated(const void *ptr) const +{ + const auto * arena_end = reinterpret_cast(arena_buffer) + total_size_bytes; + return ptr >= arena_buffer && ptr < reinterpret_cast(arena_end); +} + +void BuddyArena::initialize(size_t minimal_allocation_size, size_t size, size_t capacity) +{ + /// We will divide the whole size bytes on blocks with size == minimal_allocation_size + assert(size % minimal_allocation_size == 0); + assert(capacity % minimal_allocation_size == 0); + assert(capacity >= size); + + total_size_bytes = capacity; + number_of_levels = 1; + minimal_allocation_size_bytes = minimal_allocation_size; + + /// Calculate number of levels + size_t 
number_of_blocks_on_level = capacity / minimal_allocation_size; + + /// Number of blocks on the level could not be the power of 2 + /// In this case we intentionally add one level to the binary tree so we'll have enough + /// leaves to store all minimal blocks + if (number_of_blocks_on_level > 1 && number_of_blocks_on_level % 2 != 0) + ++number_of_levels; + + while (number_of_blocks_on_level > 1) + { + number_of_blocks_on_level >>= 1; + ++number_of_levels; + } + + /// Calculate number of minimal blocks in the memory arena + min_blocks_num = (1ull << (number_of_levels - 1)); + // free_min_blocks = min_blocks_num; + + arena_buffer = allocateArena(total_size_bytes); + const auto * arena_buffer_char_ptr = reinterpret_cast(arena_buffer); + + auto * current_storage_ptr = initializeMetaStorage(); + + // TODO: Optimize + /// Initialize free_lists + + /// Calculate number of blocks for the meta storage + size_t meta_storage_size_minimal_blocks = (current_storage_ptr - arena_buffer_char_ptr) / minimal_allocation_size_bytes; + + /// Calculate the nearest power of 2 that is greater than meta storage size + size_t meta_storage_size_round_up_to_power_of_2 = 1; + while (meta_storage_size_round_up_to_power_of_2 < meta_storage_size_minimal_blocks) + { + meta_storage_size_round_up_to_power_of_2 *= 2; + } + + /// Deallocate (add to the free_lists or shadow_lists) minimal blocks to fill the space between the meta storage and the size that is + /// the nearest power of 2, so after it we can deallocate blocks with an exponential increasing sizes + /// Arena buffer: + /// [*******-------------|------------------------------------] + /// | | | + /// ^ meta storage | ^ meta_storage_size_round_up_to_power_of_2 + /// ^ deallocate these blocks on this step + const auto * minimal_blocks_area_end = arena_buffer_char_ptr + meta_storage_size_round_up_to_power_of_2 * minimal_allocation_size_bytes; + while (current_storage_ptr != minimal_blocks_area_end) + { + auto * memory_block = 
reinterpret_cast(current_storage_ptr); + + if (static_cast(current_storage_ptr - arena_buffer_char_ptr) < size) + { + /// Do not fill the size bytes, add the block to the free_lists and reclaim it from the OS + returnBlockToLists(free_lists, memory_block, number_of_levels - 1); + reclaimBlock(memory_block, number_of_levels - 1); + } + else + { + /// Already filled the size bytes, do not add the block to the free_lists + /// Instead add it to the shadow list, so we can track it + returnBlockToLists(shadow_lists, memory_block, number_of_levels - 1); + } + + current_storage_ptr += minimal_allocation_size_bytes; + } + + /// Deallocate (add to the free_lists or shadow_lists) all next blocks after meta_storage_size_round_up_to_power_of_2 with increasing sizes + size_t current_block_size_bytes = meta_storage_size_round_up_to_power_of_2 * minimal_allocation_size; + size_t current_block_level = calculateLevel(current_block_size_bytes); + while (current_block_level > 0) + { + auto * memory_block = reinterpret_cast(current_storage_ptr); + + /// TODO: Add a separate method: code duplication + if (static_cast(current_storage_ptr - arena_buffer_char_ptr) < size) + { + /// Do not fill the size bytes, add the block to the free_lists and reclaim it from the OS + returnBlockToLists(free_lists, memory_block, current_block_level); + reclaimBlock(memory_block, current_block_level); + } + else + { + /// Already filled the size bytes, do not add the block to the free_lists + /// Instead add it to the shadow list, so we can track it + returnBlockToLists(shadow_lists, memory_block, current_block_level); + } + + current_storage_ptr += current_block_size_bytes; + current_block_size_bytes *= 2; + --current_block_level; + } +} + +void * BuddyArena::malloc(size_t size, size_t align) +{ + /// TODO: Add assertion + /// assert that align is a power of 2 and a multiple of sizeof(void*) + /// https://en.cppreference.com/w/cpp/memory/c/aligned_alloc + + auto level = calculateLevel(size, align); + + 
auto * block = allocateBlock(level); + if (block == nullptr) + block = acquireBlock(level); + if (block == nullptr) + return nullptr; + + setPointerLevel(block, level); + + /// TODO: Remove temp local dummy memory tracker + const size_t allocated_memory_blocks = 1ull << ((number_of_levels - level) - 1); + free_min_blocks.fetch_sub(allocated_memory_blocks); + // if (free_min_blocks % 10000 == 0) { + // printMemoryUsageDummy(); + // } + + return block->data; +} + +void BuddyArena::free(void * buf) noexcept +{ + auto * block = reinterpret_cast(buf); + auto level = getPointerLevel(block); + + deallocateBlock(block, level); + + /// TODO: Remove temp local dummy memory tracker + const size_t freeing_memory_blocks = 1ull << ((number_of_levels - level) - 1); + free_min_blocks.fetch_add(freeing_memory_blocks); + // if (free_min_blocks % 10000 == 0) { + // printMemoryUsageDummy(); + // } +} + +void BuddyArena::purge() +{ + for (size_t current_level = 0; current_level < number_of_levels; ++current_level) + { + while (auto * block = takeBlockFromLists(free_lists, current_level)) + tryReleaseBlock(block, current_level); + } +} + +double BuddyArena::getFreeSpaceRatio() const +{ + /// TODO: Change seq_cst memory_order on the free_min_blocks atomic + auto occupied_blocks = min_blocks_num - free_min_blocks.load(); + return static_cast(min_blocks_num - occupied_blocks) / min_blocks_num; +} + +size_t BuddyArena::getTotalSizeBytes() const +{ + return total_size_bytes; +} + +void * BuddyArena::allocateArena(size_t size) +{ + void * buffer = mmap(nullptr, size, PROT_READ | PROT_WRITE, + MAP_PRIVATE | MAP_ANONYMOUS, /*fd=*/ -1, /*offset=*/ 0); + if (MAP_FAILED == buffer) + DB::throwFromErrno(fmt::format("BuddyArena: Cannot mmap {}.", ReadableSize(size)), DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY); + + return buffer; +} + +void BuddyArena::deallocateArena(void * buffer, size_t size) +{ + // TODO: Not checking the result value + munmap(buffer, size); +} + +size_t 
BuddyArena::calculateLevel(size_t size) const +{ + // TODO: optimize + size_t current_level = number_of_levels - 1; + size_t current_block_size = calculateBlockSizeOnLevel(current_level); + + while (size > current_block_size) + { + // Low in memory + if (current_level == 0) + DB::throwFromErrno(fmt::format("BuddyArena: Cannot find level for size {}.", ReadableSize(size)), DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY); + + --current_level; + current_block_size *= 2; + } + + return current_level; +} + +size_t BuddyArena::calculateLevel(size_t size, size_t align) const +{ + if (align < size) + /// Will be automatically aligned if align is valid (as it will be power of 2 that less than size) + return calculateLevel(size); + else + return calculateLevel(align); +} + +size_t BuddyArena::calculateBlockSizeOnLevel(size_t level) const +{ + return minimal_allocation_size_bytes * (1ull << ((number_of_levels - level) - 1)); +} + +size_t BuddyArena::calculateIndexInLevel(const MemoryBlock * block, size_t level) const +{ + const auto * block_data = reinterpret_cast(block); + const auto * arena_data = static_cast(arena_buffer); + return (block_data - arena_data) / calculateBlockSizeOnLevel(level); +} + +size_t BuddyArena::calculateIndex(const MemoryBlock * block, size_t level) const +{ + return (1ull << level) + calculateIndexInLevel(block, level) - 1ull; +} + +size_t BuddyArena::calculatePointerIndex(const MemoryBlock * block) const +{ + const auto * block_data = reinterpret_cast(block); + const auto * arena_data = static_cast(arena_buffer); + + return (block_data - arena_data) / calculateBlockSizeOnLevel(number_of_levels - 1); +} + +size_t BuddyArena::calculateMinBlocksNumber(size_t size_bytes) const +{ + return size_bytes / minimal_allocation_size_bytes + (size_bytes % minimal_allocation_size_bytes == 0 ? 
0 : 1); +} + +BuddyArena::MemoryBlock * BuddyArena::blockFromIndexInLevel(size_t index_in_level, size_t level) const +{ + auto block_size = calculateBlockSizeOnLevel(level); + return reinterpret_cast(static_cast(arena_buffer) + block_size * index_in_level); +} + +size_t BuddyArena::getPointerLevel(const MemoryBlock * block) const +{ + const auto pointer_index = calculatePointerIndex(block); + return pointers_levels[pointer_index]; +} + +void BuddyArena::setPointerLevel(const MemoryBlock * block, size_t level) +{ + const auto pointer_index = calculatePointerIndex(block); + pointers_levels[pointer_index] = level; +} + +BuddyArena::MemoryBlock * BuddyArena::takeBlockFromLists(GuardedLists & guarded_lists, size_t level) +{ + MemoryBlock * current_block = nullptr; + size_t current_level = level; + do + { + std::lock_guard lock(guarded_lists.mutexes[current_level]); + + /// Found a block on this level + if (!isListEmpty(guarded_lists.lists, current_level)) + { + current_block = guarded_lists.lists[current_level]; + removeFromList(guarded_lists.lists, current_block, current_level); + setBlockStatus(current_block, current_level, BlockStatus::NotInLists); + break; + } + + /// There are no blocks on this level, go to the top + if (current_level == 0) + break; + --current_level; + } while (current_level != 0); + + /// Already on the top without free blocks + if (current_block == nullptr) + return nullptr; + + /// Iterate from current_level to the level and split current_block + while (current_level != level) + { + auto [block, buddy_block] = divideBlock(current_block, current_level); + { + std::lock_guard lock(guarded_lists.mutexes[current_level + 1]); + addToList(guarded_lists.lists, buddy_block, current_level + 1); + setBlockStatus(buddy_block, current_level + 1, guarded_lists.lists_block_status); + } + current_block = block; + ++current_level; + } + + return current_block; +} + +void BuddyArena::returnBlockToLists(GuardedLists & guarded_lists, MemoryBlock * block, size_t 
level) +{ + assert(block); + + size_t current_level = level; + MemoryBlock * current_block = block; + + do + { + std::lock_guard lock(guarded_lists.mutexes[current_level]); + + auto * buddy = getBuddy(current_block, current_level); + if (buddy && (getBlockStatus(buddy, current_level) == guarded_lists.lists_block_status)) + { + // Merge with buddy + auto * merged_block = mergeWithBuddy(current_block, buddy); + removeFromList(guarded_lists.lists, buddy, current_level); + setBlockStatus(buddy, current_level, BlockStatus::NotInLists); + current_block = merged_block; + } else + { + addToList(guarded_lists.lists, current_block, current_level); + setBlockStatus(current_block, current_level, guarded_lists.lists_block_status); + break; + } + + if (current_level == 0) + break; + --current_level; + } while (current_level != 0); +} + +BuddyArena::MemoryBlock * BuddyArena::allocateBlock(size_t level) +{ + return takeBlockFromLists(free_lists, level); +} + +void BuddyArena::deallocateBlock(MemoryBlock * block, size_t level) +{ + returnBlockToLists(free_lists, block, level); +} + +bool BuddyArena::adviseBlock(MemoryBlock * block, size_t level) const +{ + const auto block_size = calculateBlockSizeOnLevel(level); + + void * block_ptr = reinterpret_cast(block->data); + auto ret = madvise(block_ptr, block_size, MADV_DONTNEED); + return ret == 0; +} + +void BuddyArena::reclaimBlock(MemoryBlock * block, size_t level) const +{ + const auto block_size = calculateBlockSizeOnLevel(level); + const auto page_num = block_size / kPageSize; + + auto * block_ptr = block->data; + + mlock(reinterpret_cast(block_ptr), block_size); + + /// Trigger page faults, use volatile to fool the compiler + for (size_t page_offset = 0; page_offset < page_num; ++page_offset) { + volatile const auto val = *(block_ptr + page_offset * kPageSize); + (void)val; + } +} + +BuddyArena::MemoryBlock * BuddyArena::acquireBlock(size_t level) +{ + auto * block = takeBlockFromLists(shadow_lists, level); + if (block != 
nullptr) + reclaimBlock(block, level); + return block; +} + +void BuddyArena::tryReleaseBlock(MemoryBlock * block, size_t level) +{ + if (adviseBlock(block, level)) + returnBlockToLists(shadow_lists, block, level); +} + +char * BuddyArena::initializeMetaStorage() +{ + /// TODO: Note that if there is a big min_allocation_size, we can potentially waste memory on the unnecessary alignment + /// Calculate sizes + size_t block_status_size = (1ull << number_of_levels) - 1ull; + size_t block_status_size_bytes = sizeof(bool) * block_status_size; + size_t block_status_size_minimal_blocks = calculateMinBlocksNumber(block_status_size_bytes); + + size_t free_lists_size_bytes = sizeof(MemoryBlock *) * number_of_levels; // NOLINT + size_t free_lists_size_minimal_blocks = calculateMinBlocksNumber(free_lists_size_bytes); + + size_t shadow_lists_size_bytes = sizeof(MemoryBlock *) * number_of_levels; // NOLINT + size_t shadow_lists_size_minimal_blocks = calculateMinBlocksNumber(shadow_lists_size_bytes); + + size_t pointers_levels_size = (1ull << (number_of_levels - 1)); + size_t pointers_levels_size_bytes = sizeof(uint8_t) * pointers_levels_size; + size_t pointers_levels_size_minmal_blocks = calculateMinBlocksNumber(pointers_levels_size_bytes); + + size_t mutexes_size_bytes = sizeof(std::mutex) * number_of_levels; + size_t mutexes_size_minimal_blocks = calculateMinBlocksNumber(mutexes_size_bytes); + + /// Populate pointers + auto * current_storage_ptr = reinterpret_cast(arena_buffer); + + block_status = reinterpret_cast(current_storage_ptr); + current_storage_ptr += block_status_size_minimal_blocks * minimal_allocation_size_bytes; + + /// Place free_lists.lists and free_lists.mutexes for the better cache locality + free_lists.lists = reinterpret_cast(current_storage_ptr); + current_storage_ptr += free_lists_size_minimal_blocks * minimal_allocation_size_bytes; + + free_lists.mutexes = reinterpret_cast(current_storage_ptr); + current_storage_ptr += mutexes_size_minimal_blocks * 
minimal_allocation_size_bytes; + + pointers_levels = reinterpret_cast(current_storage_ptr); + current_storage_ptr += pointers_levels_size_minmal_blocks * minimal_allocation_size_bytes; + + /// Place shadow_lists.lists and shadow_lists.mutexes for the better cache locality + shadow_lists.lists = reinterpret_cast(current_storage_ptr); + current_storage_ptr += shadow_lists_size_minimal_blocks * minimal_allocation_size_bytes; + + shadow_lists.mutexes = reinterpret_cast(current_storage_ptr); + current_storage_ptr += mutexes_size_minimal_blocks * minimal_allocation_size_bytes; + + return current_storage_ptr; +} + +std::pair BuddyArena::divideBlock( + MemoryBlock * block, size_t level) +{ + auto block_size = calculateBlockSizeOnLevel(level); + + // TODO: Can be optimized + auto * block_data = reinterpret_cast(block); + auto * buddy_block_data = block_data + block_size / 2; + + auto * memory_block = reinterpret_cast(block_data); + auto * buddy_memory_block = reinterpret_cast(buddy_block_data); + + return {memory_block, buddy_memory_block}; +} + +BuddyArena::MemoryBlock * BuddyArena::mergeWithBuddy(MemoryBlock * block, MemoryBlock * buddy) +{ + assert(block); + assert(buddy); + + auto * block_data = reinterpret_cast(block); + auto * buddy_block_data = reinterpret_cast(buddy); + + if (block_data < buddy_block_data) + return reinterpret_cast(block_data); + else + return reinterpret_cast(buddy_block_data); +} + +BuddyArena::MemoryBlock * BuddyArena::getBuddy(MemoryBlock * block, size_t level) const +{ + auto index_in_level = calculateIndexInLevel(block, level); + size_t buddy_index_in_level = index_in_level % 2 == 0 ? 
index_in_level + 1 : index_in_level - 1; + return blockFromIndexInLevel(buddy_index_in_level, level); +} + +BuddyArena::BlockStatus BuddyArena::getBlockStatus(const MemoryBlock * block, size_t level) const +{ + auto index_of_block = calculateIndex(block, level); + return block_status[index_of_block]; +} + +void BuddyArena::setBlockStatus(const MemoryBlock * block, size_t level, BlockStatus status) +{ + auto index_of_block = calculateIndex(block, level); + block_status[index_of_block] = status; +} + +void BuddyArena::printMemoryUsageDummy() const +{ + auto occupied_blocks = min_blocks_num - free_min_blocks; + double usage = static_cast(occupied_blocks) / min_blocks_num * 100.0; + std::cout << "***Memory usage: " << occupied_blocks << " / " << min_blocks_num << " (" << std::fixed << std::setprecision(1) << usage << " %)***" << std::endl; +} + +void BuddyArena::addToList(MemoryBlock ** lists, MemoryBlock * block, size_t level) +{ + auto & list = lists[level]; + + if (list) + { + block->pointers.next = list; + block->pointers.previous = nullptr; + list->pointers.previous = block; + } else + { + block->pointers.next = nullptr; + block->pointers.previous = nullptr; + } + + list = block; +} + +bool BuddyArena::isListEmpty(MemoryBlock ** lists, size_t level) +{ + return lists[level] == nullptr; +} + +void BuddyArena::removeFromList(MemoryBlock ** lists, MemoryBlock * block, size_t level) +{ + assert(!isListEmpty(lists, level)); + + if (block->pointers.next) + block->pointers.next->pointers.previous = block->pointers.previous; + + if (block->pointers.previous) + block->pointers.previous->pointers.next = block->pointers.next; + + /// Move the end of the list if we delete it + if (block == lists[level]) + lists[level] = block->pointers.next; + + block->pointers.next = nullptr; + block->pointers.previous = nullptr; +} + + +MemoryBlock::MemoryBlock(size_t size_) : size(size_), is_owner(true) +{ + ptr = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 
0); + if (MAP_FAILED == ptr) + DB::throwFromErrno(fmt::format("BlockCache: Cannot mmap {}.", ReadableSize(size)), DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY); +} + +MemoryBlock::MemoryBlock(void * ptr_, size_t size_) : ptr(ptr_), size(size_), is_owner(false) +{} + +MemoryBlock::~MemoryBlock() +{ + if (is_owner && ptr && 0 != munmap(ptr, size)) + DB::throwFromErrno(fmt::format("BlockCache: Cannot munmap {}.", ReadableSize(size)), DB::ErrorCodes::CANNOT_MUNMAP); +} + + +template +BlockCache::Holder::Holder(BlockCache & cache_, BlockCache::RegionMetadata & region_, std::lock_guard&) + : cache(cache_) + , region(region_) + , region_ptr(&region_) +{ + /// Remove this region from the lru_list, so we can't evict it + if (++region.refcount == 1 && region.LRUListHook::is_linked()) + cache.lru_list.erase(cache.lru_list.iterator_to(region)); +} + +template +BlockCache::Holder::~Holder() +{ + /// Add this region to the lru_list if there are no references + std::lock_guard cache_lock(cache.mutex); + if (--region.refcount == 0) + cache.lru_list.push_back(region); +} + + +template +BlockCache::HolderCreater::HolderCreater(BlockCache & cache_, BlockCache::RegionMetadata & region_, std::lock_guard & cache_lock) + : initial_holder(cache_, region_, cache_lock) +{} + +template +typename BlockCache::Holder* BlockCache::HolderCreater::acquireHolder(std::lock_guard & cache_lock) +{ + auto * holder = new Holder(initial_holder.cache, initial_holder.region, cache_lock); + return holder; +} + + +template +typename BlockCache::RegionMetadata * BlockCache::addNewRegion( + MemoryBlock & memory_block, + std::lock_guard & /*cache_lock*/) +{ + auto * free_region = RegionMetadata::create(); + free_region->ptr = memory_block.ptr; + assert(free_region->ptr != nullptr); + free_region->memory_block = &memory_block; + free_region->size = memory_block.size; + + adjacency_list.push_back(*free_region); + size_multimap.insert(*free_region); + + return free_region; +} + +template +typename 
BlockCache::RegionMetadata * BlockCache::allocateFromFreeRegion( + typename BlockCache::RegionMetadata * free_region, + size_t size, + std::lock_guard & /*cache_lock*/) +{ + if (!free_region) + return nullptr; + + if (free_region->size == size) + { + // Move memory blocks from free list to the acquired list if the region was the only one in the block + if (free_region->isBlockOwner()) + { + free_memory_blocks.erase(free_memory_blocks.iterator_to(*free_region->memory_block)); + acquired_memory_blocks.push_back(*free_region->memory_block); + } + size_multimap.erase(size_multimap.iterator_to(*free_region)); + return free_region; + } + + auto * allocated_region = RegionMetadata::create(); + allocated_region->ptr = free_region->ptr; + assert(allocated_region->ptr != nullptr); + allocated_region->memory_block = free_region->memory_block; + allocated_region->size = size; + + // Move block from free list to the acquired list if the region was the only one in the block + if (free_region->isBlockOwner()) + { + free_memory_blocks.erase(free_memory_blocks.iterator_to(*free_region->memory_block)); + acquired_memory_blocks.push_back(*free_region->memory_block); + } + size_multimap.erase(size_multimap.iterator_to(*free_region)); + free_region->size -= size; + free_region->char_ptr += size; + // Do not add corresponding block in the acquired_blocks as we've alredy did it for the free_region + size_multimap.insert(*free_region); + + adjacency_list.insert(adjacency_list.iterator_to(*free_region), *allocated_region); + return allocated_region; +} + +static size_t roundUp(size_t x, size_t rounding) +{ + return (x + (rounding - 1)) / rounding * rounding; +} + +template +typename BlockCache::RegionMetadata * BlockCache::allocate(size_t size) +{ + size = roundUp(size, alignment); + + /// Fast path. Look up to size multimap to find free region of specified size. 
+ { + std::lock_guard cache_lock(mutex); + auto it = size_multimap.lower_bound(size, RegionCompareBySize()); + if (size_multimap.end() != it) + return allocateFromFreeRegion(&*it, size, cache_lock); + } + + /// Slow path. If nothing was found, ask for rebalancing and check again for the space + if (rebalance_strategy) + rebalance_strategy->shouldRebalance(name, size); + + std::lock_guard cache_lock(mutex); + + if (rebalance_strategy) + { + /// Check again for free blocks + auto rebalanced_it = size_multimap.lower_bound(size, RegionCompareBySize()); + if (size_multimap.end() != rebalanced_it) + return allocateFromFreeRegion(&*rebalanced_it, size, cache_lock); + } + + /// Nothing works. Start evictions + return evict(size, cache_lock); +} + +template +void BlockCache::freeRegion(typename BlockCache::RegionMetadata * region, std::lock_guard & /*cache_lock*/) noexcept +{ + if (!region) + return; + + auto adjacency_list_it = adjacency_list.iterator_to(*region); + + auto left_it = adjacency_list_it; + if (left_it != adjacency_list.begin()) + { + --left_it; + + if (left_it->memory_block == region->memory_block && left_it->isFree()) + { + region->size += left_it->size; + region->char_ptr -= left_it->size; + size_multimap.erase(size_multimap.iterator_to(*left_it)); + adjacency_list.erase_and_dispose(left_it, [](RegionMetadata * elem) { elem->destroy(); }); + } + } + + auto right_it = adjacency_list_it; + ++right_it; + if (right_it != adjacency_list.end()) + { + if (right_it->memory_block == region->memory_block && right_it->isFree()) + { + region->size += right_it->size; + size_multimap.erase(size_multimap.iterator_to(*right_it)); + adjacency_list.erase_and_dispose(right_it, [](RegionMetadata * elem) { elem->destroy(); }); + } + } + + size_multimap.insert(*region); + if (region->isBlockOwner()) { + acquired_memory_blocks.erase(acquired_memory_blocks.iterator_to(*region->memory_block)); + acquired_memory_blocks.push_back(*region->memory_block); + } +} + +template +void 
BlockCache::evictRegion(RegionMetadata * evicted_region, std::lock_guard & cache_lock) noexcept
+{
+    /// Unlinks the region from the LRU list and (if present) the key map, then frees it.
+    /// The region must currently be linked in lru_list.
+    lru_list.erase(lru_list.iterator_to(*evicted_region));
+
+    if (evicted_region->KeyMapHook::is_linked())
+        key_map.erase(key_map.iterator_to(*evicted_region));
+
+    freeRegion(evicted_region, cache_lock);
+}
+
+/// Evicts the least recently used region and then keeps evicting its right-hand neighbours
+/// (same memory block, not in use) until the coalesced free region reaches `requested_size`
+/// or no further neighbour can be evicted.
+/// Returns the resulting free region, which may be smaller than requested, or nullptr when the LRU list is empty.
+template
+typename BlockCache::RegionMetadata * BlockCache::evictSome(size_t requested_size, std::lock_guard & cache_lock) noexcept
+{
+    if (lru_list.empty())
+        return nullptr;
+
+    auto it = adjacency_list.iterator_to(lru_list.front());
+
+    while (true)
+    {
+        RegionMetadata & evicted_region = *it;
+
+        /// Statistics
+        /// Note: change total_useful_size before eviction of the region as we could coalesce with neighbors regions (which can be initially free)
+        total_useful_cache_size.fetch_sub(evicted_region.size, std::memory_order_relaxed);
+        total_useful_cache_count.fetch_sub(1, std::memory_order_relaxed);
+
+        /// freeRegion() destroys the absorbed neighbours but keeps `evicted_region` itself alive,
+        /// so both the reference and `it` stay valid after this call.
+        evictRegion(&evicted_region, cache_lock);
+
+        if (evicted_region.size >= requested_size)
+            return &evicted_region;
+
+        /// Stop at the end of the list, at a different memory block, or at a region that is in use
+        ++it;
+        if (it == adjacency_list.end() ||
+            it->memory_block != evicted_region.memory_block ||
+            !it->LRUListHook::is_linked())
+            return &evicted_region;
+    }
+}
+
+/// Evicts regions until a free region of at least `requested_size` exists and allocates from it.
+/// Returns nullptr when nothing is left to evict.
+/// NOTE(review): declared noexcept but allocateFromFreeRegion may allocate metadata with `new` - confirm termination on bad_alloc is acceptable.
+template
+typename BlockCache::RegionMetadata * BlockCache::evict(size_t requested_size, std::lock_guard & cache_lock) noexcept
+{
+    while (true)
+    {
+        RegionMetadata * res = evictSome(requested_size, cache_lock);
+
+        /// Nothing to evict. All cache is full and in use - cannot allocate memory.
+        if (!res)
+            return nullptr;
+
+        /// Not enough. Evict more.
+        if (res->size < requested_size)
+            continue;
+
+        return allocateFromFreeRegion(res, requested_size, cache_lock);
+    }
+}
+
+/// Creates a cache named `name_` and seeds it with the blocks returned by the strategy's initialize().
+/// NOTE(review): rebalance_strategy is dereferenced unconditionally here and in the destructor,
+/// while allocate() treats it as optional - confirm it can never be null.
+template
+BlockCache::BlockCache(const std::string & name_, std::shared_ptr rebalance_strategy_)
+    : name(name_), rebalance_strategy(std::move(rebalance_strategy_))
+{
+    auto initial_memory_blocks = rebalance_strategy->initialize(name);
+    addNewMemoryBlocks(initial_memory_blocks);
+}
+
+/// Destroys all region metadata and hands every memory block back to the strategy via finalize().
+template
+BlockCache::~BlockCache()
+{
+    /// Unlink the regions from the secondary indexes first; adjacency_list owns the metadata and disposes it
+    key_map.clear();
+    size_multimap.clear();
+    lru_list.clear();
+    adjacency_list.clear_and_dispose([](RegionMetadata * elem) { elem->destroy(); });
+
+    auto remaining_blocks = std::move(acquired_memory_blocks);
+    remaining_blocks.splice(remaining_blocks.end(), free_memory_blocks);
+
+    rebalance_strategy->finalize(name, std::move(remaining_blocks));
+
+    acquired_memory_blocks.clear();
+    free_memory_blocks.clear();
+}
+
+/// Looks up `key` and returns a new Holder pinning the region, or nullptr when the key is absent.
+template
+typename BlockCache::HolderPtr BlockCache::get(const Key & key)
+{
+    std::lock_guard cache_lock(mutex);
+
+    auto it = key_map.find(key, RegionCompareByKey());
+    if (key_map.end() != it)
+    {
+        return new Holder(*this, *it, cache_lock);
+    }
+    return nullptr;
+}
+
+/// Total size in bytes of the cached (allocated) regions.
+template
+size_t BlockCache::getCacheWeight() const
+{
+    return total_useful_cache_size.load(std::memory_order_relaxed);
+}
+
+/// Number of cached (allocated) regions.
+template
+size_t BlockCache::getCacheCount() const
+{
+    return total_useful_cache_count.load(std::memory_order_relaxed);
+}
+
+/// Evicts up to roughly `max_size_to_evict` bytes of unused entries and then removes every
+/// completely free memory block from the cache, returning those blocks to the caller.
+/// The metadata of the free regions covering the returned blocks is destroyed as well.
+template
+MemoryBlockList BlockCache::takeMemoryBlocks(size_t max_size_to_evict, std::lock_guard & cache_lock)
+{
+    /// Firstly, try to evict entries
+    /// If max_size_to_evict == 0, take only the blocks that are free at the moment
+    size_t evicted_size = 0;
+    while (evicted_size < max_size_to_evict)
+    {
+        const auto * evicted_region = evictSome(max_size_to_evict - evicted_size, cache_lock);
+        if (evicted_region == nullptr)
+            break;
+        evicted_size += evicted_region->size;
+    }
+
+    /// Secondly, take all free blocks
+    MemoryBlockList list;
+    list.swap(free_memory_blocks);
+
+    /// Thirdly, drop all free regions which cover a whole block, as the blocks under them are being taken.
+    /// The scan starts from the smallest region: block sizes are chosen by the rebalance strategy and can be
+    /// smaller than min_block_size, so a lower bound on the size would leave regions over taken memory allocatable.
+    /// The region is also removed from adjacency_list and destroyed, otherwise its metadata would keep
+    /// pointing at memory the cache no longer owns.
+    /// TODO: Optimize full scan of sizemap here, we can do linear scan before eviction (as normally we wouldn't have a lot of free regions)
+    /// and then add some flag to the evictSome function says that we don't need to add to the sizemap free regions which cover whole block
+    auto it = size_multimap.begin();
+    while (it != size_multimap.end())
+    {
+        if (it->isBlockOwner())
+        {
+            auto previous_it = it;
+            ++it;
+            RegionMetadata & taken_region = *previous_it;
+            size_multimap.erase(previous_it);
+            adjacency_list.erase_and_dispose(adjacency_list.iterator_to(taken_region), [](RegionMetadata * elem) { elem->destroy(); });
+        } else
+        {
+            ++it;
+        }
+    }
+
+    /// TODO: Remove it after moving to the rebalance strategy
+    for (const auto & block : list) {
+        total_memory_blocks_size -= block.size;
+    }
+
+    return list;
+}
+
+/// Locking overload of takeMemoryBlocks.
+template
+MemoryBlockList BlockCache::takeMemoryBlocks(size_t max_size_to_evict)
+{
+    std::lock_guard lock(mutex);
+
+    return takeMemoryBlocks(max_size_to_evict, lock);
+}
+
+/// Adds `memory_blocks` to the cache as free blocks; the list is emptied (spliced into free_memory_blocks).
+template
+void BlockCache::addNewMemoryBlocks(MemoryBlockList & memory_blocks, std::lock_guard & cache_lock)
+{
+    /// Populate regions and update the total size
+    for (auto & block : memory_blocks)
+    {
+        addNewRegion(block, cache_lock);
+        total_memory_blocks_size += block.size;
+    }
+
+    /// Add memory_blocks to the free_memory_blocks list
+    free_memory_blocks.splice(free_memory_blocks.end(), memory_blocks);
+}
+
+/// Locking overload of addNewMemoryBlocks.
+template
+void BlockCache::addNewMemoryBlocks(MemoryBlockList & memory_blocks)
+{
+    std::lock_guard lock(mutex);
+
+    addNewMemoryBlocks(memory_blocks, lock);
+}
+
+
+/// Sets a strategy-wide setting; a later call with the same key replaces the earlier value.
+void RebalanceStrategySettings::set(const std::string & key, const Field & field)
+{
+    rebalance_strategy_settings.insert_or_assign(key, field);
+}
+
+/// Sets a setting for one block cache; a later call with the same key replaces the earlier value.
+void RebalanceStrategySettings::setBlockCacheSetting(
+    const std::string & key,
+    const std::string & block_cache_name,
+    const Field & field)
+{
+    block_caches_settings[block_cache_name].insert_or_assign(key, field);
+}
+
+bool
RebalanceStrategySettings::tryGet(const std::string & key, Field & field) const
+{
+    /// Copies the strategy-wide setting `key` into `field`; returns false (leaving `field` untouched) when it is not set.
+    auto it = rebalance_strategy_settings.find(key);
+    if (it == rebalance_strategy_settings.end())
+        return false;
+
+    field = (*it).second;
+    return true;
+}
+
+/// Copies the setting `key` of cache `block_cache_name` into `field`;
+/// returns false (leaving `field` untouched) when either the cache or the key is unknown.
+bool RebalanceStrategySettings::tryGetBlockCacheSetting(
+    const std::string & key,
+    const std::string & block_cache_name,
+    Field & field) const
+{
+    auto block_cache_it = block_caches_settings.find(block_cache_name);
+    if (block_cache_it == block_caches_settings.end())
+        return false;
+
+    const auto & block_cache_settings = block_cache_it->second;
+    auto it = block_cache_settings.find(key);
+    if (it == block_cache_settings.end())
+        return false;
+
+    field = (*it).second;
+    return true;
+}
+
+
+/// Creates one statistics entry per cache name.
+template
+DummyRebalanceStrategy::DummyRebalanceStrategy(
+    BlockCacheMapping & caches_, const std::vector & block_cache_names_)
+    : Base(caches_)
+    , block_cache_names(block_cache_names_)
+{
+    for (const auto & cache_name : block_cache_names)
+        block_cache_stats.try_emplace(cache_name);
+}
+
+/// The dummy strategy gives a cache no memory up front.
+template
+MemoryBlockList DummyRebalanceStrategy::initialize(const std::string & /*caller_name*/)
+{
+    // nop
+    return {};
+}
+
+/// Gives the calling cache one newly mapped block large enough for `new_item_size`,
+/// unless that would exceed the cache's max_total_size.
+/// NOTE(review): the limit check and the counter update are separate steps, so concurrent callers can overshoot the limit.
+template
+void DummyRebalanceStrategy::shouldRebalance(const std::string &caller_name, size_t new_item_size)
+{
+    assert(block_cache_stats.count(caller_name) != 0);
+    size_t page_size = static_cast(::getPageSize());
+    size_t required_block_size = std::max(Settings::min_memory_block_size, roundUp(new_item_size, page_size));
+
+    const auto & settings_entry = settings.block_cache_settings.at(caller_name);
+    auto & stats_entry = block_cache_stats.at(caller_name);
+    if (stats_entry.total_memory_blocks_size + required_block_size <= settings_entry.max_total_size)
+    {
+        auto * block = MemoryBlock::create(required_block_size);
+        MemoryBlockList memory_blocks;
+        memory_blocks.push_back(*block);
+        Base::caches.at(caller_name).addNewMemoryBlocks(memory_blocks);
+        stats_entry.total_memory_blocks_size.fetch_add(required_block_size, std::memory_order_relaxed);
+    }
+}
+
+/// Destroys the blocks returned by a cache that is being destructed and resets its counter.
+template
+void DummyRebalanceStrategy::finalize(const std::string &caller_name, MemoryBlockList memory_blocks)
+{
+    assert(block_cache_stats.count(caller_name) != 0);
+    memory_blocks.clear_and_dispose([](MemoryBlock * block) { block->destroy(); });
+    block_cache_stats[caller_name].total_memory_blocks_size.store(0, std::memory_order_relaxed);
+}
+
+/// Reads "max_total_size" and "max_size_to_evict_on_purging" for every cache; a missing setting defaults to 0.
+template
+void DummyRebalanceStrategy::initializeWithSettings(const RebalanceStrategySettings & settings_)
+{
+    Field field;
+    for (const auto & cache_name : block_cache_names) {
+        size_t current_max_total_size = 0;
+        size_t current_max_size_to_evict_on_purging = 0;
+
+        if (settings_.tryGetBlockCacheSetting("max_total_size", cache_name, field))
+            current_max_total_size = field.safeGet();
+
+        if (settings_.tryGetBlockCacheSetting("max_size_to_evict_on_purging", cache_name, field))
+            current_max_size_to_evict_on_purging = field.safeGet();
+
+        settings.block_cache_settings.try_emplace(cache_name, current_max_total_size, current_max_size_to_evict_on_purging);
+    }
+}
+
+/// Sum of getCacheWeight() over all caches.
+template
+size_t DummyRebalanceStrategy::getWeight() const
+{
+    size_t total_weight = 0;
+    for (const auto & item : Base::caches)
+    {
+        const auto & block_cache = item.second;
+        total_weight += block_cache.getCacheWeight();
+    }
+    return total_weight;
+}
+
+/// Sum of getCacheCount() over all caches.
+template
+size_t DummyRebalanceStrategy::getCount() const
+{
+    size_t total_count = 0;
+    for (const auto & item : Base::caches)
+    {
+        const auto & block_cache = item.second;
+        total_count += block_cache.getCacheCount();
+    }
+    return total_count;
+}
+
+/// Shrinks every cache by at most its max_size_to_evict_on_purging.
+template
+void DummyRebalanceStrategy::purge()
+{
+    shrinkBlockCaches(/*use_limit = */true);
+}
+
+/// Shrinks every cache using its max_total_size as the eviction limit.
+template
+void DummyRebalanceStrategy::reset()
+{
+    shrinkBlockCaches(/*use_limit = */false);
+}
+
+/// Answers each request with the named statistic of the named cache, in request order.
+/// Throws BAD_ARGUMENTS for an unknown cache name or statistic name.
+template
+typename DummyRebalanceStrategy::StatResponses DummyRebalanceStrategy::getStats(const StatRequests & requests)
+{
+    StatResponses responses;
+    for (const auto & request : requests)
+    {
+        /// find() rather than operator[]: a lookup must not insert an entry for an unknown cache,
+        /// because shrinkBlockCaches() expects every stats entry to have matching settings.
+        const auto stats_it = block_cache_stats.find(request.block_cache_name);
+        if (stats_it == block_cache_stats.end())
+            throw Exception("Invalid block cache name in getStats method", ErrorCodes::BAD_ARGUMENTS);
+        auto & stats_entry = stats_it->second;
+        const auto & stat_name = request.stat_name;
+        if (stat_name == "total_memory_blocks_size")
+            responses.push_back(Field(stats_entry.total_memory_blocks_size.load(std::memory_order_relaxed)));
+        else
+            throw Exception("Invalid stat name in getStats method", ErrorCodes::BAD_ARGUMENTS);
+    }
+    return responses;
+}
+
+/// Takes memory blocks away from every cache (evicting up to the chosen limit), destroys them
+/// and subtracts their size from the per-cache counter.
+template
+void DummyRebalanceStrategy::shrinkBlockCaches(bool use_limit)
+{
+    for (auto & item : block_cache_stats)
+    {
+        const auto & name = item.first;
+        auto & cache_stats = item.second;
+        const auto & cache_settings = settings.block_cache_settings.at(name);
+        const auto eviction_limit = use_limit? cache_settings.max_size_to_evict_on_purging : cache_settings.max_total_size;
+
+        auto blocks = Base::caches.at(name).takeMemoryBlocks(eviction_limit);
+        size_t evicted_size = 0;
+        blocks.clear_and_dispose([&evicted_size](MemoryBlock * block)
+        {
+            evicted_size += block->size;
+            block->destroy();
+        });
+        cache_stats.total_memory_blocks_size.fetch_sub(evicted_size, std::memory_order_relaxed);
+    }
+}
+
+
+/// Creates one statistics entry per cache name.
+template
+BuddyStaticRebalanceStrategy::BuddyStaticRebalanceStrategy(
+    BlockCacheMapping & caches_,
+    const std::vector & block_cache_names_)
+    : Base(caches_)
+    , block_cache_names(block_cache_names_)
+{
+    for (const auto & cache_name : block_cache_names)
+        block_cache_stats.try_emplace(cache_name);
+}
+
+/// The static buddy strategy gives a cache no memory up front.
+template
+MemoryBlockList BuddyStaticRebalanceStrategy::initialize(const std::string & /*caller_name*/)
+{
+    // nop
+    return {};
+}
+
+/// Gives the calling cache one block from the buddy arena; does nothing when the arena cannot satisfy the request.
+template
+void BuddyStaticRebalanceStrategy::shouldRebalance(const std::string & caller_name, size_t new_item_size)
+{
+    assert(block_cache_stats.count(caller_name) != 0);
+
+    size_t required_block_size = roundUpToPowerOf2(new_item_size, Settings::minimal_allocation_size);
+    auto & stats_entry = block_cache_stats[caller_name];
+
+    auto * block_ptr = memory_arena.malloc(required_block_size);
+    if (block_ptr == nullptr)
+        return;
+
+    auto * block = MemoryBlock::create(block_ptr,
required_block_size); + + MemoryBlockList memory_blocks; + memory_blocks.push_back(*block); + Base::caches.at(caller_name).addNewMemoryBlocks(memory_blocks); + + stats_entry.total_memory_blocks_size.fetch_add(required_block_size, std::memory_order_relaxed); +} + +template +void BuddyStaticRebalanceStrategy::finalize(const std::string & caller_name, MemoryBlockList memory_blocks) +{ + assert(block_cache_stats.count(caller_name) != 0); + memory_blocks.clear_and_dispose([this](MemoryBlock * block) + { + memory_arena.free(block->ptr); + block->destroy(); + }); + block_cache_stats[caller_name].total_memory_blocks_size.store(0, std::memory_order_relaxed); +} + +template +void BuddyStaticRebalanceStrategy::initializeWithSettings(const RebalanceStrategySettings & settings_) +{ + Field field; + /// Initialize a rebalance strategy setting + if (settings_.tryGet("memory_arena_size", field)) + settings.memory_arena_size = field.safeGet(); + + /// Initialize block cache settings + for (const auto & cache_name : block_cache_names) { + size_t current_max_size_to_evict_on_purging = 0; + + if (settings_.tryGetBlockCacheSetting("max_size_to_evict_on_purging", cache_name, field)) + current_max_size_to_evict_on_purging = field.safeGet(); + + settings.block_cache_settings.try_emplace(cache_name, current_max_size_to_evict_on_purging); + } + + /// Initialize the memory arena + memory_arena.initialize(Settings::minimal_allocation_size, settings.memory_arena_size, settings.memory_arena_size); +} + +template +size_t BuddyStaticRebalanceStrategy::getWeight() const +{ + size_t total_weight = 0; + for (const auto & item : Base::caches) + { + const auto & block_cache = item.second; + total_weight += block_cache.getCacheWeight(); + } + return total_weight; +} + +template +size_t BuddyStaticRebalanceStrategy::getCount() const +{ + size_t total_count = 0; + for (const auto & item : Base::caches) + { + const auto & block_cache = item.second; + total_count += block_cache.getCacheCount(); + } + 
return total_count; +} + +template +void BuddyStaticRebalanceStrategy::purge() +{ + shrinkBlockCaches(/*use_limit = */true); +} + +template +void BuddyStaticRebalanceStrategy::reset() +{ + shrinkBlockCaches(/*use_limit = */false); +} + +template +typename BuddyStaticRebalanceStrategy::StatResponses BuddyStaticRebalanceStrategy::getStats(const StatRequests & requests) +{ + StatResponses responses; + for (const auto & request : requests) + { + auto & stats_entry = block_cache_stats[request.block_cache_name]; + const auto & stat_name = request.stat_name; + if (stat_name == "total_memory_blocks_size") + responses.push_back(Field(stats_entry.total_memory_blocks_size.load(std::memory_order_relaxed))); + else + throw Exception("Invalid stat name in getStats method", ErrorCodes::BAD_ARGUMENTS); + } + return responses; +} + +template +void BuddyStaticRebalanceStrategy::shrinkBlockCaches(bool use_limit) +{ + for (auto & item : block_cache_stats) + { + const auto & name = item.first; + auto & cache_stats = item.second; + const auto & cache_settings = settings.block_cache_settings.at(name); + const auto eviction_limit = use_limit? 
cache_settings.max_size_to_evict_on_purging : settings.memory_arena_size; + + auto blocks = Base::caches.at(name).takeMemoryBlocks(eviction_limit); + size_t evicted_size = 0; + blocks.clear_and_dispose([this, &evicted_size](MemoryBlock * block) + { + evicted_size += block->size; + memory_arena.free(block->ptr); + block->destroy(); + }); + cache_stats.total_memory_blocks_size.fetch_sub(evicted_size, std::memory_order_relaxed); + } +} + + +template +BuddyDynamicRebalanceStrategy::BuddyDynamicRebalanceStrategy( + BlockCacheMapping & caches_, + const std::vector & block_cache_names_) + : Base(caches_) + , block_cache_names(block_cache_names_) +{ + for (const auto & cache_name : block_cache_names) + block_cache_stats.try_emplace(cache_name); +} + +template +MemoryBlockList BuddyDynamicRebalanceStrategy::initialize(const std::string & /*caller_name*/) +{ + // nop + return {}; +} + +template +void BuddyDynamicRebalanceStrategy::shouldRebalance(const std::string & caller_name, size_t new_item_size) +{ + auto & caller_stats_entry = block_cache_stats[caller_name]; + + caller_stats_entry.should_rebalance_to_stealing_attempts_diff.fetch_add(1, std::memory_order_relaxed); + + assert(block_cache_stats.count(caller_name) != 0); + size_t required_block_size = roundUpToPowerOf2(new_item_size, Settings::minimal_allocation_size); + + /// Try to steal free memory blocks from other cache pools + MemoryBlockList stolen_memory_blocks; + { + std::lock_guard lock(stealing_mutex); + for (auto & item : Base::caches) + { + const auto & name = item.first; + /// Do not steal from themselves + if (name == caller_name) + continue; + auto & block_cache = item.second; + auto & current_stats_entry = block_cache_stats[name]; + + /// Do not steal from the cache pool with a positive difference between shouldRebalance calls and attempts to steal from it + if (current_stats_entry.should_rebalance_to_stealing_attempts_diff.fetch_sub(1, std::memory_order_relaxed) >= 0) + continue; + + size_t initial_size = 
block_cache.getCacheWeight(); + auto blocks = block_cache.takeMemoryBlocks(0); + stolen_memory_blocks.splice(stolen_memory_blocks.end(), blocks); + size_t after_steal_size = block_cache.getCacheWeight(); + assert(initial_size >= after_steal_size); + + current_stats_entry.total_memory_blocks_size.fetch_sub(initial_size - after_steal_size, std::memory_order_relaxed); + } + } + + /// Return all stealt blocks to the BuddyArena + stolen_memory_blocks.clear_and_dispose([this](MemoryBlock * block) + { + memory_arena.free(block->ptr); + block->destroy(); + }); + + /// Take memory from the BuddyArena + auto * block_ptr = memory_arena.malloc(settings.allocated_memory_multiplier * required_block_size); + if (block_ptr == nullptr) + { + caller_stats_entry.memory_allocation_failures.fetch_add(1, std::memory_order_relaxed); + return; + } + + auto * block = MemoryBlock::create(block_ptr, required_block_size); + + /// Add it to the cache pool + MemoryBlockList memory_blocks; + memory_blocks.push_back(*block); + Base::caches.at(caller_name).addNewMemoryBlocks(memory_blocks); + + caller_stats_entry.total_memory_blocks_size.fetch_add(required_block_size, std::memory_order_relaxed); +} + +template +void BuddyDynamicRebalanceStrategy::finalize(const std::string & caller_name, MemoryBlockList memory_blocks) +{ + assert(block_cache_stats.count(caller_name) != 0); + memory_blocks.clear_and_dispose([this](MemoryBlock * block) + { + memory_arena.free(block->ptr); + block->destroy(); + }); + block_cache_stats[caller_name].total_memory_blocks_size.store(0, std::memory_order_relaxed); +} + +template +void BuddyDynamicRebalanceStrategy::initializeWithSettings(const RebalanceStrategySettings & settings_) +{ + Field field; + /// Initialize a rebalance strategy setting + if (settings_.tryGet("memory_arena_capacity", field)) + settings.memory_arena_capacity = field.safeGet(); + + if (settings_.tryGet("memory_arena_initial_size", field)) + settings.memory_arena_initial_size = field.safeGet(); + + if 
(settings_.tryGet("allocated_memory_multiplier", field)) + settings.allocated_memory_multiplier = field.safeGet(); + + /// Initialize block cache settings + for (const auto & cache_name : block_cache_names) { + size_t current_max_size_to_evict_on_purging = 0; + + if (settings_.tryGetBlockCacheSetting("max_size_to_evict_on_purging", cache_name, field)) + current_max_size_to_evict_on_purging = field.safeGet(); + + settings.block_cache_settings.try_emplace(cache_name, current_max_size_to_evict_on_purging); + } + + /// Initialize the memory arena + memory_arena.initialize(Settings::minimal_allocation_size, settings.memory_arena_initial_size, settings.memory_arena_capacity); +} + +template +size_t BuddyDynamicRebalanceStrategy::getWeight() const +{ + size_t total_weight = 0; + for (const auto & item : Base::caches) + { + const auto & block_cache = item.second; + total_weight += block_cache.getCacheWeight(); + } + return total_weight; +} + +template +size_t BuddyDynamicRebalanceStrategy::getCount() const +{ + size_t total_count = 0; + for (const auto & item : Base::caches) + { + const auto & block_cache = item.second; + total_count += block_cache.getCacheCount(); + } + return total_count; +} + +template +void BuddyDynamicRebalanceStrategy::purge() +{ + shrinkBlockCaches(/*use_limit = */true); +} + +template +void BuddyDynamicRebalanceStrategy::reset() +{ + shrinkBlockCaches(/*use_limit = */false); +} + +template +typename BuddyDynamicRebalanceStrategy::StatResponses BuddyDynamicRebalanceStrategy::getStats(const StatRequests & requests) +{ + StatResponses responses; + for (const auto & request : requests) + { + auto & stats_entry = block_cache_stats[request.block_cache_name]; + const auto & stat_name = request.stat_name; + if (stat_name == "total_memory_blocks_size") + responses.push_back(Field(stats_entry.total_memory_blocks_size.load(std::memory_order_relaxed))); + else if (stat_name == "should_rebalance_to_stealing_attempts_diff") + 
responses.push_back(Field(stats_entry.should_rebalance_to_stealing_attempts_diff.load(std::memory_order_relaxed))); + else if (stat_name == "memory_allocation_failures") + responses.push_back(Field(stats_entry.memory_allocation_failures.load(std::memory_order_relaxed))); + else + throw Exception("Invalid stat name in getStats method", ErrorCodes::BAD_ARGUMENTS); + } + return responses; +} + +template +void BuddyDynamicRebalanceStrategy::shrinkBlockCaches(bool use_limit) +{ + for (auto & item : block_cache_stats) + { + const auto & name = item.first; + auto & cache_stats = item.second; + const auto & cache_settings = settings.block_cache_settings.at(name); + const auto eviction_limit = use_limit? cache_settings.max_size_to_evict_on_purging : settings.memory_arena_capacity; + + auto blocks = Base::caches.at(name).takeMemoryBlocks(eviction_limit); + size_t evicted_size = 0; + blocks.clear_and_dispose([this, &evicted_size](MemoryBlock * block) + { + evicted_size += block->size; + memory_arena.free(block->ptr); + block->destroy(); + }); + cache_stats.total_memory_blocks_size.fetch_sub(evicted_size, std::memory_order_relaxed); + } +} + + +template +void BlockCachesManager::initialize( + const std::vector & block_cache_names, + std::string_view rebalance_strategy_name, + const RebalanceStrategySettings & rebalance_strategy_settings) +{ + if (rebalance_strategy_name.empty()) + rebalance_strategy_name = default_rebalance_strategy; + + if (rebalance_strategy_name == "dummy") + rebalance_strategy = std::make_shared>(block_caches, block_cache_names); + else if (rebalance_strategy_name == "buddy_static") + rebalance_strategy = std::make_shared>(block_caches, block_cache_names); + else if (rebalance_strategy_name == "buddy_dynamic") + rebalance_strategy = std::make_shared>(block_caches, block_cache_names); + else + throw Exception("Invalid rebalance strategy name", ErrorCodes::BAD_ARGUMENTS); + + rebalance_strategy->initializeWithSettings(rebalance_strategy_settings); + + /// 
Populate block_caches mapping, the key is block_cache_name and block_cache_name with rebalance_strategy are provided in the constructor + for (const auto & block_cache_name : block_cache_names) + block_caches.try_emplace(block_cache_name, block_cache_name, rebalance_strategy); +} + +template +BlockCache & BlockCachesManager::getBlockCacheInstance(const std::string & name) +{ + return block_caches.at(name); +} + +template +size_t BlockCachesManager::getCacheWeight() const +{ + assert(rebalance_strategy != nullptr); + return rebalance_strategy->getWeight(); +} + +template +size_t BlockCachesManager::getCacheCount() const +{ + assert(rebalance_strategy != nullptr); + return rebalance_strategy->getCount(); +} + +template +void BlockCachesManager::purge() +{ + rebalance_strategy->purge(); +} + +template +void BlockCachesManager::reset() +{ + rebalance_strategy->reset(); +} + +template +typename BlockCachesManager::StatResponses BlockCachesManager::getStats(const StatRequests & requests) +{ + return rebalance_strategy->getStats(requests); +} + +template class BlockCache; +template class DummyRebalanceStrategy; +template class BlockCachesManager; + +} diff --git a/src/Common/UnifiedCache.h b/src/Common/UnifiedCache.h new file mode 100644 index 000000000000..801a164eed03 --- /dev/null +++ b/src/Common/UnifiedCache.h @@ -0,0 +1,1058 @@ +#pragma once + +#include +#include "Core/Field.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; + extern const int CANNOT_ALLOCATE_MEMORY; + extern const int CANNOT_MUNMAP; +} + + +/// Global memory arena under buddy allocator schema +class BuddyArena +{ +public: + union MemoryBlock; + + /// Node of double-linked list + struct FreeMemoryBlock + { + MemoryBlock * previous; + MemoryBlock * next; + }; + + union MemoryBlock + { + FreeMemoryBlock pointers; + char data[0]; + }; + + static BuddyArena & 
instance() + { + static BuddyArena arena; + return arena; + } + + BuddyArena() = default; + + ~BuddyArena() noexcept; + + BuddyArena(const BuddyArena&) = delete; + BuddyArena(BuddyArena&&) = delete; + BuddyArena& operator=(const BuddyArena&) = delete; + BuddyArena& operator=(BuddyArena&&) = delete; + + bool isValid() const; + + /// Check that the ptr is contained in the allocated memory arena + bool isAllocated(const void * ptr) const; + + void initialize(size_t minimal_allocation_size, size_t size, size_t capacity); + + void * malloc(size_t size, size_t align = 0); + void free(void * buf) noexcept; + + void purge(); + + [[nodiscard]] double getFreeSpaceRatio() const; + [[nodiscard]] size_t getTotalSizeBytes() const; + +private: + static constexpr size_t kPageSize = 4096; + + void * arena_buffer; + + size_t number_of_levels = 0; + size_t total_size_bytes = 0; + + size_t minimal_allocation_size_bytes = 0; + + std::atomic free_min_blocks = 0; + size_t min_blocks_num = 0; + + /// Meta storage + + /// TODO: Compact block_status and pointers_levels + enum class BlockStatus : uint8_t { + NotInLists = 0, + InFreeLists = 1, + InShadowLists = 2, + }; + BlockStatus * block_status; + + uint8_t * pointers_levels; + struct GuardedLists { + MemoryBlock ** lists; + std::mutex * mutexes; + BlockStatus lists_block_status; + }; + GuardedLists free_lists{.lists_block_status = BlockStatus::InFreeLists}; + GuardedLists shadow_lists{.lists_block_status = BlockStatus::InShadowLists}; + + [[nodiscard]] static void * allocateArena(size_t size); + static void deallocateArena(void * buffer, size_t size); + + size_t calculateLevel(size_t size) const; + size_t calculateLevel(size_t size, size_t align) const; + size_t calculateBlockSizeOnLevel(size_t level) const; + size_t calculateIndexInLevel(const MemoryBlock * block, size_t level) const; + size_t calculateIndex(const MemoryBlock * block, size_t level) const; + size_t calculatePointerIndex(const MemoryBlock * block) const; + + /// Calculate 
number of minimal blocks for the allocation on the lower level + /// Used to calculate offset for the meta storage beforehand + size_t calculateMinBlocksNumber(size_t size_bytes) const; + + MemoryBlock * blockFromIndexInLevel(size_t index_in_level, size_t level) const; + + size_t getPointerLevel(const MemoryBlock * block) const; + void setPointerLevel(const MemoryBlock * block, size_t level); + + MemoryBlock * takeBlockFromLists(GuardedLists & guarded_lists, size_t level); + void returnBlockToLists(GuardedLists & guarded_lists, MemoryBlock * block, size_t level); + + /// Take the block from free_lists + MemoryBlock * allocateBlock(size_t level); + /// Return the block to the free_lists + void deallocateBlock(MemoryBlock * block, size_t level); + + bool adviseBlock(MemoryBlock * block, size_t level) const; + void reclaimBlock(MemoryBlock * block, size_t level) const; + + /// Take the block from the shadow_lists, trigger page mapping on the block + MemoryBlock * acquireBlock(size_t level); + /// Try to return the block to the shadow_lists and advise the memory block. 
Will fail, if madvice fails + void tryReleaseBlock(MemoryBlock * block, size_t level); + + char * initializeMetaStorage(); + + std::pair divideBlock(MemoryBlock * block, size_t level); + static MemoryBlock * mergeWithBuddy(MemoryBlock * block, MemoryBlock * buddy); + MemoryBlock * getBuddy(MemoryBlock * block, size_t level) const; + + BlockStatus getBlockStatus(const MemoryBlock * block, size_t level) const; + void setBlockStatus(const MemoryBlock * block, size_t level, BlockStatus status); + + /// TODO: Remove this method + /// Temporarly use only + void printMemoryUsageDummy() const; + + static void addToList(MemoryBlock ** lists, MemoryBlock * block, size_t level); + static bool isListEmpty(MemoryBlock ** lists, size_t level); + static void removeFromList(MemoryBlock ** lists, MemoryBlock * block, size_t level); + + void addToFreeList(MemoryBlock * block, size_t level); + bool isFreeListEmpty(size_t level) const; + void removeFromFreeList(MemoryBlock * block, size_t level); +}; + + +template +class IRebalanceStrategy; + + +struct MemoryBlockListTag; +using MemoryBlockListHook = boost::intrusive::list_base_hook>; +struct MemoryBlock : public MemoryBlockListHook, private boost::noncopyable +{ + void * ptr; + size_t size; + bool is_owner; + + MemoryBlock(MemoryBlock && other) noexcept : ptr(other.ptr), size(other.size) + { + other.ptr = nullptr; + } + + static MemoryBlock * create(size_t size_) + { + return new MemoryBlock(size_); + } + + static MemoryBlock * create(void * ptr_, size_t size_) + { + return new MemoryBlock(ptr_, size_); + } + + void destroy() + { + delete this; + } +private: + explicit MemoryBlock(size_t size_); + MemoryBlock(void * ptr_, size_t size_); + + ~MemoryBlock(); +}; +using MemoryBlockList = boost::intrusive::list, boost::intrusive::constant_time_size>; + + +template +class BlockCache : private boost::noncopyable +{ +private: + /** Invariants: + * acquired_memory_blocks contains all blocks which contain at least one non-free region + * 
free_memory_blocks contains all blocks which contain exactly once free region + */ + MemoryBlockList acquired_memory_blocks; + MemoryBlockList free_memory_blocks; + + // using Chunks = std::list; + // Chunks chunks; + + struct LRUListTag; + struct AdjacencyListTag; + struct SizeMultimapTag; + struct KeyMapTag; + + using LRUListHook = boost::intrusive::list_base_hook>; + using AdjacencyListHook = boost::intrusive::list_base_hook>; + using SizeMultimapHook = boost::intrusive::set_base_hook>; + using KeyMapHook = boost::intrusive::set_base_hook>; + + struct RegionMetadata : public LRUListHook, AdjacencyListHook, SizeMultimapHook, KeyMapHook + { + Key key; + + union + { + void * ptr; + char * char_ptr; + }; + size_t size; + size_t refcount = 0; + MemoryBlock * memory_block; + + bool operator< (const RegionMetadata & other) const { return size < other.size; } + + bool isFree() const { return SizeMultimapHook::is_linked(); } + + static RegionMetadata * create() + { + return new RegionMetadata; + } + + void destroy() + { + delete this; + } + + [[nodiscard]] bool isBlockOwner() const + { + return memory_block != nullptr && ptr == memory_block->ptr && memory_block->size == size; + } + + private: + RegionMetadata() = default; + ~RegionMetadata() = default; + }; + + struct RegionCompareBySize + { + bool operator() (const RegionMetadata & a, const RegionMetadata & b) const { return a.size < b.size; } + bool operator() (const RegionMetadata & a, size_t size) const { return a.size < size; } + bool operator() (size_t size, const RegionMetadata & b) const { return size < b.size; } + }; + + struct RegionCompareByKey + { + bool operator() (const RegionMetadata & a, const RegionMetadata & b) const { return a.key < b.key; } + bool operator() (const RegionMetadata & a, Key key) const { return a.key < key; } + bool operator() (Key key, const RegionMetadata & b) const { return key < b.key; } + }; + + // TODO: boost::intrusize::unordered_set? 
+ using LRUList = boost::intrusive::list, boost::intrusive::constant_time_size>; + using AdjacencyList = boost::intrusive::list, boost::intrusive::constant_time_size>; + using SizeMultimap = boost::intrusive::multiset, boost::intrusive::base_hook, boost::intrusive::constant_time_size>; + using KeyMap = boost::intrusive::set, boost::intrusive::base_hook, boost::intrusive::constant_time_size>; + + /** Each region could be: + * - free: not holding any data; + * - allocated: having data, addressed by key; + * -- allocated, in use: holded externally, could not be evicted; + * -- allocated, not in use: not holded, could be evicted. + */ + + /** Invariants: + * adjacency_list contains all regions + * size_multimap contains free regions + * key_map contains allocated regions + * lru_list contains allocated regions, that are not in use + */ + + LRUList lru_list; + AdjacencyList adjacency_list; + SizeMultimap size_multimap; + KeyMap key_map; + + mutable std::mutex mutex; + + /// TODO: Remove temp workaround, add single flight system + mutable std::mutex get_or_set_mutex; + + // size_t max_total_size = 0; + // size_t max_size_to_evict_on_purging = 0; + + /// We will allocate memory in blocks of at least that size. + /// 64 MB makes mmap overhead comparable to memory throughput. + static constexpr size_t min_block_size = 64 * 1024 * 1024; + + static constexpr size_t alignment = 16; + + size_t total_memory_blocks_size = 0; + std::atomic total_useful_cache_size = 0; + std::atomic total_useful_cache_count = 0; + + /// Block cache identifier used in the rebalance_strategy + const std::string name; + + using RebalanceStrategy = IRebalanceStrategy; + std::shared_ptr rebalance_strategy; + + RegionMetadata * addNewRegion(MemoryBlock & memory_block, std::lock_guard & cache_lock); + + /// Precondition: free_region.size >= size. 
+ RegionMetadata * allocateFromFreeRegion(RegionMetadata * free_region, size_t size, std::lock_guard & cache_lock); + + RegionMetadata * allocate(size_t size); + + /// Precondition: region is not in lru_list, not in key_map, not in size_multimap. + /// Postcondition: region is not in lru_list, not in key_map, + /// inserted into size_multimap, possibly coalesced with adjacent free regions. + void freeRegion(RegionMetadata * region, std::lock_guard & cache_lock) noexcept; + + void evictRegion(RegionMetadata * evicted_region, std::lock_guard & cache_lock) noexcept; + + /// Evict region from cache and return it, coalesced with nearby free regions. + /// While size is not enough, evict adjacent regions at right, if any. + /// If nothing to evict, returns nullptr. + /// Region is removed from lru_list and key_map and inserted into size_multimap. + RegionMetadata * evictSome(size_t requested_size, std::lock_guard & cache_lock) noexcept; + + /// Evicting regions from cache until requested_size is evicted or cache is empty + RegionMetadata * evict(size_t requested_size, std::lock_guard & cache_lock) noexcept; + + struct HolderCreater; + +public: + + struct Holder : private boost::noncopyable + { + Holder(BlockCache & cache_, RegionMetadata & region_, std::lock_guard&); + ~Holder(); + + void * ptr() + { + assert(region.ptr != nullptr); + return region.ptr; + } + const void * ptr() const + { + assert(region.ptr != nullptr); + return region.ptr; + } + size_t size() const { return region.size; } + Key key() const { return region.key; } + + private: + friend struct HolderCreater; + + BlockCache & cache; + RegionMetadata & region; + void * region_ptr; + }; + + using HolderPtr = Holder *; + +private: + struct HolderCreater : private boost::noncopyable + { + HolderCreater(BlockCache & cache_, RegionMetadata & region_, std::lock_guard & cache_lock); + ~HolderCreater() = default; + + HolderPtr acquireHolder(std::lock_guard & cache_lock); + + private: + Holder initial_holder; + }; + 
+ using HolderCreaterPtr = HolderCreater *; + + /// Represents pending insertion attempt. + struct InsertToken + { + explicit InsertToken(BlockCache & cache_) : cache(cache_) {} + ~InsertToken() { delete value; } + + std::mutex mutex; + bool cleaned_up = false; /// Protected by the token mutex + HolderCreaterPtr value = nullptr; /// Protected by the token mutex + + BlockCache & cache; + size_t refcount = 0; /// Protected by the cache mutex + }; + + using InsertTokens = std::unordered_map>; + InsertTokens insert_tokens; + + /// This class is responsible for removing used insert tokens from the insert_tokens map. + /// Among several concurrent threads the first successful one is responsible for removal. But if they all + /// fail, then the last one is responsible. + struct InsertTokenHolder + { + const Key * key = nullptr; + std::shared_ptr token; + bool cleaned_up = false; + + InsertTokenHolder() = default; + + void acquire(const Key * key_, const std::shared_ptr & token_, [[maybe_unused]] std::lock_guard & cache_lock) + { + key = key_; + token = token_; + ++token->refcount; + } + + void cleanup([[maybe_unused]] std::lock_guard & token_lock, [[maybe_unused]] std::lock_guard & cache_lock) + { + token->cache.insert_tokens.erase(*key); + token->cleaned_up = true; + cleaned_up = true; + } + + ~InsertTokenHolder() + { + if (!token) + return; + + if (cleaned_up) + return; + + std::lock_guard token_lock(token->mutex); + + if (token->cleaned_up) + return; + + std::lock_guard cache_lock(token->cache.mutex); + + --token->refcount; + if (token->refcount == 0) + cleanup(token_lock, cache_lock); + } + }; + + friend struct InsertTokenHolder; + +public: + + BlockCache(const std::string & name_, std::shared_ptr rebalance_strategy_); + + ~BlockCache(); + + /// Applications isn't supposed to call these methods directly without + /// proxy classes to hold raw pointers + template + HolderPtr getOrSet(const Key & key, + GetSizeFunc && get_size, + InitializeFunc && initialize, + bool * 
was_calculated) + { + /// TODO: Remove temp hack, add single-flight system + InsertTokenHolder token_holder; + + // std::lock_guard get_or_set_lock(get_or_set_mutex); + + { + std::lock_guard cache_lock(mutex); + + auto it = key_map.find(key, RegionCompareByKey()); + if (key_map.end() != it) + { + if (was_calculated) + *was_calculated = false; + return new Holder(*this, *it, cache_lock); + } + + auto & token = insert_tokens[key]; + if (!token) + token = std::make_shared(*this); + + token_holder.acquire(&key, token, cache_lock); + } + + InsertToken * token = token_holder.token.get(); + + std::lock_guard token_lock(token->mutex); + token_holder.cleaned_up = token->cleaned_up; + if (token->value) + { + if (was_calculated) + *was_calculated = false; + + std::lock_guard cache_lock(mutex); + return token->value->acquireHolder(cache_lock); + } + + RegionMetadata * region; + size_t size = get_size(); + region = allocate(size); + + if (!region) + return {}; + + region->key = key; + + try + { + initialize(region->ptr); + } + catch (...) + { + std::lock_guard cache_lock(mutex); + freeRegion(region, cache_lock); + throw; + } + + std::lock_guard cache_lock(mutex); + // key_map.insert(*region); + token->value = new HolderCreater(*this, *region, cache_lock); + auto * holder = token->value->acquireHolder(cache_lock); + + /// Insert the new value only if the token is still in present in insert_tokens. + /// (The token may be absent because of a concurrent reset() call). 
+ auto token_it = insert_tokens.find(key); + if (token_it != insert_tokens.end() && token_it->second.get() == token) + { + key_map.insert(*region); + } + if (!token->cleaned_up) + token_holder.cleanup(token_lock, cache_lock); + + if (was_calculated) + *was_calculated = true; + + /// Statistics + if (region) { + total_useful_cache_size.fetch_add(region->size, std::memory_order_relaxed); + total_useful_cache_count.fetch_add(1, std::memory_order_relaxed); + } + + // return new Holder(*this, *region, cache_lock); + return holder; + } + + HolderPtr get(const Key & key); + + size_t getCacheWeight() const; + + size_t getCacheCount() const; + + MemoryBlockList takeMemoryBlocks(size_t max_size_to_evict, std::lock_guard & cache_lock); + MemoryBlockList takeMemoryBlocks(size_t max_size_to_evict); + + void addNewMemoryBlocks(MemoryBlockList & memory_blocks, std::lock_guard & cache_lock); + void addNewMemoryBlocks(MemoryBlockList & memory_blocks); +}; + + +class RebalanceStrategySettings +{ +public: + void set(const std::string & key, const Field & field); + void setBlockCacheSetting(const std::string & key, const std::string & block_cache_name, const Field & field); + + bool tryGet(const std::string & key, Field & field) const; + bool tryGetBlockCacheSetting(const std::string & key, const std::string & block_cache_name, Field & field) const; + +private: + using SettingsMapping = std::unordered_map; + using BlockCacheSettingsMapping = std::unordered_map; + + SettingsMapping rebalance_strategy_settings; + BlockCacheSettingsMapping block_caches_settings; +}; + + +/// Lifetime of the IRebalanceStrategy is the same as the lifetime of the BlockCachesManager +/// TODO: Change ownership of the IRebalanceStrategy in the BlockCache from the shared_ptr to the weak_ptr +template +class IRebalanceStrategy +{ +public: + using BlockCacheMapping = std::unordered_map>; + explicit IRebalanceStrategy(BlockCacheMapping & caches_) : caches(caches_) + {} + + virtual ~IRebalanceStrategy() = default; 
+ + /// Non-copyable + IRebalanceStrategy(const IRebalanceStrategy &) = delete; + IRebalanceStrategy & operator=(const IRebalanceStrategy &) = delete; + + /// Movable + IRebalanceStrategy(IRebalanceStrategy &&) noexcept = default; + IRebalanceStrategy & operator=(IRebalanceStrategy &&) noexcept = default; + + /// Interfaces for the BlockCache + virtual MemoryBlockList initialize(const std::string & caller_name) = 0; + virtual void shouldRebalance(const std::string & caller_name, size_t new_item_size) = 0; + virtual void finalize(const std::string & caller_name, MemoryBlockList memory_blocks) = 0; + + /// Interfaces for the BlockCacheManager + virtual void initializeWithSettings(const RebalanceStrategySettings & settings_) = 0; + virtual size_t getWeight() const = 0; + virtual size_t getCount() const = 0; + virtual void purge() = 0; + virtual void reset() = 0; + + struct StatRequest { + std::string block_cache_name; + std::string stat_name; + }; + using StatRequests = std::vector; + using StatResponses = std::vector; + virtual StatResponses getStats(const StatRequests & requests) = 0; + +protected: + BlockCacheMapping & caches; +}; + + +template +class DummyRebalanceStrategy : public IRebalanceStrategy +{ + using Base = IRebalanceStrategy; + using BlockCacheMapping = typename Base::BlockCacheMapping; + using BlockCache = BlockCache; + using StatRequests = typename Base::StatRequests; + using StatResponses = typename Base::StatResponses; + +public: + struct BlockCacheStats + { + std::atomic total_memory_blocks_size = 0; + + BlockCacheStats() = default; + }; + using BlockCacheStatsMapping = std::unordered_map; + + struct BlockCacheSettings + { + const size_t max_total_size = 0; + const size_t max_size_to_evict_on_purging = 0; + + BlockCacheSettings(size_t max_total_size_, size_t max_size_to_evict_on_purging_) + : max_total_size(max_total_size_) + , max_size_to_evict_on_purging(max_size_to_evict_on_purging_) + {} + }; + using BlockCacheSettingsMapping = 
std::unordered_map; + + struct Settings + { + BlockCacheSettingsMapping block_cache_settings; + + /// We will allocate memory in blocks of at least that size. + /// 64 MB makes mmap overhead comparable to memory throughput. + static constexpr size_t min_memory_block_size = 64 * 1024 * 1024; + }; + + DummyRebalanceStrategy(BlockCacheMapping & caches_, const std::vector & block_cache_names); + + MemoryBlockList initialize(const std::string & caller_name) final; + + void shouldRebalance(const std::string & caller_name, size_t new_item_size) final; + + void finalize(const std::string & caller_name, MemoryBlockList memory_blocks) final; + + void initializeWithSettings(const RebalanceStrategySettings & settings_) final; + + /// Get a size of the useful data in the cache + /// Note: can return out-of-thin-air values for the sum of cache weights + /// Use only for the statistics purpose + size_t getWeight() const final; + + /// Get a number of useful items in the cache + /// Note: can return out-of-thin-air values for the sum of cache weights + /// Use only for the statistics purpose + size_t getCount() const final; + + void purge() final; + + void reset() final; + + StatResponses getStats(const StatRequests & requests) final; + +private: + std::vector block_cache_names; + BlockCacheStatsMapping block_cache_stats; + Settings settings; + + void shrinkBlockCaches(bool use_limit); + + static size_t roundUp(size_t x, size_t rounding) + { + return (x + (rounding - 1)) / rounding * rounding; + } +}; + + +template +class BuddyStaticRebalanceStrategy : public IRebalanceStrategy +{ + using Base = IRebalanceStrategy; + using BlockCacheMapping = typename Base::BlockCacheMapping; + using BlockCache = BlockCache; + using StatRequests = typename Base::StatRequests; + using StatResponses = typename Base::StatResponses; + +public: + struct BlockCacheStats + { + std::atomic total_memory_blocks_size = 0; + + BlockCacheStats() = default; + }; + using BlockCacheStatsMapping = 
std::unordered_map; + + struct BlockCacheSettings + { + const size_t max_size_to_evict_on_purging = 0; + + explicit BlockCacheSettings(size_t max_size_to_evict_on_purging_) + : max_size_to_evict_on_purging(max_size_to_evict_on_purging_) + {} + }; + using BlockCacheSettingsMapping = std::unordered_map; + + struct Settings + { + BlockCacheSettingsMapping block_cache_settings; + size_t memory_arena_size = 0; + + static constexpr size_t minimal_allocation_size = 4096; + }; + + BuddyStaticRebalanceStrategy(BlockCacheMapping & caches_, const std::vector & block_cache_names_); + + MemoryBlockList initialize(const std::string & caller_name) final; + + void shouldRebalance(const std::string & caller_name, size_t new_item_size) final; + + void finalize(const std::string & caller_name, MemoryBlockList memory_blocks) final; + + void initializeWithSettings(const RebalanceStrategySettings & settings_) final; + + /// Get a size of the useful data in the cache + /// Note: can return out-of-thin-air values for the sum of cache weights + /// Use only for the statistics purpose + size_t getWeight() const final; + + /// Get a number of useful items in the cache + /// Note: can return out-of-thin-air values for the sum of cache weights + /// Use only for the statistics purpose + size_t getCount() const final; + + void purge() final; + + void reset() final; + + StatResponses getStats(const StatRequests & requests) final; + +private: + std::vector block_cache_names; + BlockCacheStatsMapping block_cache_stats; + BuddyArena memory_arena; + Settings settings; + + void shrinkBlockCaches(bool use_limit); + + size_t roundUpToPowerOf2(size_t x, size_t min_result) + { + size_t power = min_result; + while (power < x) + power *= 2; + return power; + } +}; + + +template +class BuddyDynamicRebalanceStrategy : public IRebalanceStrategy +{ + using Base = IRebalanceStrategy; + using BlockCacheMapping = typename Base::BlockCacheMapping; + using BlockCache = BlockCache; + using StatRequests = typename 
Base::StatRequests; + using StatResponses = typename Base::StatResponses; + +public: + struct BlockCacheStats + { + std::atomic total_memory_blocks_size = 0; + + /// Scores that equals to shouldRebalance calls minus attempts to steal from this pool + std::atomic should_rebalance_to_stealing_attempts_diff = 0; + + std::atomic memory_allocation_failures = 0; + + BlockCacheStats() = default; + }; + using BlockCacheStatsMapping = std::unordered_map; + + struct BlockCacheSettings + { + const size_t max_size_to_evict_on_purging = 0; + + explicit BlockCacheSettings(size_t max_size_to_evict_on_purging_) + : max_size_to_evict_on_purging(max_size_to_evict_on_purging_) + {} + }; + using BlockCacheSettingsMapping = std::unordered_map; + + struct Settings + { + BlockCacheSettingsMapping block_cache_settings; + size_t memory_arena_capacity = 0; + size_t memory_arena_initial_size = 0; + size_t allocated_memory_multiplier = 1; + + static constexpr size_t minimal_allocation_size = 64 * 1024 * 1024; + }; + + BuddyDynamicRebalanceStrategy(BlockCacheMapping & caches_, const std::vector & block_cache_names_); + + MemoryBlockList initialize(const std::string & caller_name) final; + + void shouldRebalance(const std::string & caller_name, size_t new_item_size) final; + + void finalize(const std::string & caller_name, MemoryBlockList memory_blocks) final; + + void initializeWithSettings(const RebalanceStrategySettings & settings_) final; + + /// Get a size of the useful data in the cache + /// Note: can return out-of-thin-air values for the sum of cache weights + /// Use only for the statistics purpose + size_t getWeight() const final; + + /// Get a number of useful items in the cache + /// Note: can return out-of-thin-air values for the sum of cache weights + /// Use only for the statistics purpose + size_t getCount() const final; + + void purge() final; + + void reset() final; + + StatResponses getStats(const StatRequests & requests) final; + +private: + std::vector block_cache_names; + 
BlockCacheStatsMapping block_cache_stats; + BuddyArena memory_arena; + Settings settings; + + std::mutex stealing_mutex; + + void shrinkBlockCaches(bool use_limit); + + size_t roundUpToPowerOf2(size_t x, size_t min_result) + { + size_t power = min_result; + while (power < x) + power *= 2; + return power; + } +}; + + +template +class BlockCachesManager +{ + using RebalanceStrategy = IRebalanceStrategy; + using BlockCacheMapping = typename RebalanceStrategy::BlockCacheMapping; + using StatRequests = typename RebalanceStrategy::StatRequests; + using StatResponses = typename RebalanceStrategy::StatResponses; + +public: + static BlockCachesManager & instance() + { + static BlockCachesManager manager; + return manager; + } + + void initialize( + const std::vector & block_cache_names, + std::string_view rebalance_strategy_name = "", + const RebalanceStrategySettings & rebalance_strategy_settings = {}); + + /// block_caches hashmap doesn't change since initialize function so we can access it wihtout locking + BlockCache & getBlockCacheInstance(const std::string & name); + + size_t getCacheWeight() const; + + size_t getCacheCount() const; + + void purge(); + + void reset(); + + StatResponses getStats(const StatRequests & requests); + +private: + BlockCacheMapping block_caches; + /// TODO: Change shared_ptr to unique_ptr + std::shared_ptr rebalance_strategy; + + static constexpr std::string_view default_rebalance_strategy = "dummy"; + + BlockCachesManager() = default; +}; + + +// Unified interface for the access to items from the cache and items not from the cache +template +struct PayloadHolder +{ + PayloadHolder() = default; + virtual ~PayloadHolder() = default; + + virtual Payload & payload() = 0; + virtual const Payload & payload() const = 0; + Payload & operator*() { return payload(); } + const Payload & operator*() const { return payload(); } + + /// PayloadHolder should be light-weight and copy- and move-constuctible + PayloadHolder(const PayloadHolder&) = default; + 
PayloadHolder& operator=(const PayloadHolder&) = default; + PayloadHolder(PayloadHolder&&) noexcept = default; + PayloadHolder& operator=(PayloadHolder&&) noexcept = default; +}; + + +// Simple holder for the pointer with an unified interface with CachePayloadHolder +// Used to unify access to the cache items and non-cache items +template +struct SharedPayloadHolder : public PayloadHolder +{ + /// Adopt payload_ptr_ and own a memory for it + explicit SharedPayloadHolder(Payload * payload_ptr_) + : payload_ptr(payload_ptr_) + {} + + ~SharedPayloadHolder() override { delete payload_ptr; } + + Payload & payload() override { return *payload_ptr; } + const Payload & payload() const override { return *payload_ptr; } +private: + Payload * payload_ptr; +}; + + +template +class UnifiedCacheAdapter +{ +public: + using Key = TKey; + using HolderPtr = typename BlockCache::HolderPtr; + + struct CachePayloadHolder : public PayloadHolder { + /// Adopt holder_ptr_ and own a reference to the memory for it + explicit CachePayloadHolder(HolderPtr holder_ptr_) + : payload_ptr(reinterpret_cast(holder_ptr_->ptr())) + , holder_ptr(std::move(holder_ptr_)) + {} + + ~CachePayloadHolder() override { delete holder_ptr; } + + Payload & payload() override { return *payload_ptr; } + const Payload & payload() const override { return *payload_ptr; } + private: + Payload * payload_ptr; + HolderPtr holder_ptr; + }; + + using CachePayloadHolderPtr = std::shared_ptr; + + explicit UnifiedCacheAdapter(const std::string & block_cache_name) + : instance(BlockCachesManager::instance().getBlockCacheInstance(block_cache_name)) + { + } + + /// initialize must construct Payload object in the start of the memory block + /// get_size must return number of additional bytes that is needed fro the object (except for the object initialization) + template + CachePayloadHolderPtr getOrSet(const Key & key, + GetSizeFunc && get_size, + InitializeFunc && initialize, + bool * was_calculated) + { + auto get_full_size = 
[&get_size]() + { + return sizeof(Payload) + get_size(); + }; + auto * holder_ptr = instance.getOrSet(key, get_full_size, initialize, was_calculated); + if (holder_ptr == nullptr) + return nullptr; + else + return std::make_shared(holder_ptr); + } + + CachePayloadHolderPtr get(const Key & key) + { + auto * holder_ptr = instance.get(key); + return std::make_shared(holder_ptr); + } + + void reset() + { + /// TODO: implement support for the resetting some specific type of cache + BlockCachesManager::instance().reset(); + } + + size_t weight() const + { + /// TODO: implement + return 0; + } + + size_t count() const + { + /// TODO: implement + return 0; + } + +private: + BlockCache & instance; +}; + + +extern template class BlockCache; +extern template class DummyRebalanceStrategy; +extern template class BlockCachesManager; + +} diff --git a/src/Compression/CachedCompressedReadBuffer.cpp b/src/Compression/CachedCompressedReadBuffer.cpp index 8abc16ebb2a9..59eb17c16cdb 100644 --- a/src/Compression/CachedCompressedReadBuffer.cpp +++ b/src/Compression/CachedCompressedReadBuffer.cpp @@ -2,6 +2,7 @@ #include #include +#include "IO/UncompressedCache.h" #include @@ -40,31 +41,48 @@ bool CachedCompressedReadBuffer::nextImpl() /// Let's check for the presence of a decompressed block in the cache, grab the ownership of this block, if it exists. 
UInt128 key = cache->hash(path, file_pos); - owned_cell = cache->getOrSet(key, [&]() + size_t size_compressed = 0; + size_t size_decompressed = 0; + size_t size_compressed_without_checksum = 0; + size_t additional_bytes = 0; + + auto get_size = [&]() -> size_t { initInput(); file_in->seek(file_pos, SEEK_SET); - auto cell = std::make_shared(); - - size_t size_decompressed; - size_t size_compressed_without_checksum; - cell->compressed_size = readCompressedData(size_decompressed, size_compressed_without_checksum, false); - - if (cell->compressed_size) + size_compressed = readCompressedData(size_decompressed, size_compressed_without_checksum, false); + if (size_compressed > 0) + { + additional_bytes = codec->getAdditionalSizeAtTheEndOfBuffer(); + return size_decompressed + additional_bytes; + } + else { - cell->additional_bytes = codec->getAdditionalSizeAtTheEndOfBuffer(); - cell->data.resize(size_decompressed + cell->additional_bytes); - decompressTo(cell->data.data(), size_decompressed, size_compressed_without_checksum); + return 0; } + }; + + auto initialize = [&](void * ptr) + { + auto * data_ptr = reinterpret_cast(ptr) + sizeof(UncompressedCacheCell); + auto * uncompressed_payload = new (ptr) UncompressedCacheCell(); + uncompressed_payload->data = data_ptr; + uncompressed_payload->data_size = size_decompressed + additional_bytes; + uncompressed_payload->compressed_size = size_compressed; + uncompressed_payload->additional_bytes = additional_bytes; + + decompressTo(data_ptr, size_decompressed, size_compressed_without_checksum); + }; + + owned_region = cache->getOrSet(key, get_size, initialize); - return cell; - }); + auto & owned_cell = owned_region->payload(); - if (owned_cell->data.size() == 0) + if (owned_cell.data_size == 0) return false; - working_buffer = Buffer(owned_cell->data.data(), owned_cell->data.data() + owned_cell->data.size() - owned_cell->additional_bytes); + working_buffer = Buffer(owned_cell.data, owned_cell.data + owned_cell.data_size - 
owned_cell.additional_bytes); /// nextimpl_working_buffer_offset is set in the seek function (lazy seek). So we have to /// check that we are not seeking beyond working buffer. @@ -73,7 +91,7 @@ bool CachedCompressedReadBuffer::nextImpl() " (pos: " + toString(nextimpl_working_buffer_offset) + ", block size: " + toString(working_buffer.size()) + ")", ErrorCodes::SEEK_POSITION_OUT_OF_BOUND); - file_pos += owned_cell->compressed_size; + file_pos += owned_cell.compressed_size; return true; } @@ -88,13 +106,13 @@ CachedCompressedReadBuffer::CachedCompressedReadBuffer( void CachedCompressedReadBuffer::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block) { /// Nothing to do if we already at required position - if (!owned_cell && file_pos == offset_in_compressed_file + if (!owned_region && file_pos == offset_in_compressed_file && ((!buffer().empty() && offset() == offset_in_decompressed_block) || nextimpl_working_buffer_offset == offset_in_decompressed_block)) return; - if (owned_cell && - offset_in_compressed_file == file_pos - owned_cell->compressed_size && + if (owned_region && + offset_in_compressed_file == file_pos - owned_region->payload().compressed_size && offset_in_decompressed_block <= working_buffer.size()) { pos = working_buffer.begin() + offset_in_decompressed_block; @@ -107,7 +125,7 @@ void CachedCompressedReadBuffer::seek(size_t offset_in_compressed_file, size_t o bytes += offset(); /// No data, everything discarded resetWorkingBuffer(); - owned_cell.reset(); + owned_region.reset(); /// Remember required offset in decompressed block which will be set in /// the next ReadBuffer::next() call diff --git a/src/Compression/CachedCompressedReadBuffer.h b/src/Compression/CachedCompressedReadBuffer.h index 4493a40156c8..2dad806d32d8 100644 --- a/src/Compression/CachedCompressedReadBuffer.h +++ b/src/Compression/CachedCompressedReadBuffer.h @@ -19,6 +19,9 @@ namespace DB */ class CachedCompressedReadBuffer : public CompressedReadBufferBase, 
public ReadBuffer { + using UncompressedCellHolder = PayloadHolder; + using UncompressedCellHolderPtr = std::shared_ptr; + private: std::function()> file_in_creator; UncompressedCache * cache; @@ -30,7 +33,7 @@ class CachedCompressedReadBuffer : public CompressedReadBufferBase, public ReadB size_t file_pos; /// A piece of data from the cache, or a piece of read data that we put into the cache. - UncompressedCache::MappedPtr owned_cell; + UncompressedCellHolderPtr owned_region; void initInput(); diff --git a/src/Formats/MarkInCompressedFile.h b/src/Formats/MarkInCompressedFile.h index 1cd545e1a034..6907a6f41228 100644 --- a/src/Formats/MarkInCompressedFile.h +++ b/src/Formats/MarkInCompressedFile.h @@ -40,15 +40,69 @@ struct MarkInCompressedFile }; -class MarksInCompressedFile : public PODArray +class MarksInCompressedFile final: boost::noncopyable, Allocator { public: - explicit MarksInCompressedFile(size_t n) : PODArray(n) {} + MarksInCompressedFile() = default; - void read(ReadBuffer & buffer, size_t from, size_t count) + explicit MarksInCompressedFile(size_t marks_size_) + : marks_size(marks_size_) + , marks_data(Allocator::alloc(marks_size * sizeof(MarkInCompressedFile))) + , owns_marks_data(true) + {} + + MarksInCompressedFile(size_t marks_size_, void * marks_data_) + : marks_size(marks_size_) + , marks_data(marks_data_) + , owns_marks_data(false) + {} + + MarksInCompressedFile(MarksInCompressedFile&& rhs) noexcept + { + *this = std::move(rhs); + } + + MarksInCompressedFile& operator=(MarksInCompressedFile&& rhs) noexcept + { + std::swap(marks_size, rhs.marks_size); + std::swap(owns_marks_data, rhs.owns_marks_data); + std::swap(marks_data, rhs.marks_data); + + return *this; + } + + inline void read(ReadBuffer & buffer, size_t from, size_t count) { buffer.readStrict(reinterpret_cast(data() + from), count * sizeof(MarkInCompressedFile)); } + + inline const MarkInCompressedFile * data() const { return static_cast(marks_data); } + + inline MarkInCompressedFile * 
data() { return static_cast(marks_data); } + + inline const MarkInCompressedFile & operator[](size_t i) const { return data()[i]; } + + inline MarkInCompressedFile & operator[](size_t i) { return data()[i]; } + + inline size_t size() const { return marks_size; } + + ~MarksInCompressedFile() + { + dealloc(); + } +private: + inline void dealloc() + { + if (owns_marks_data && marks_data) + { + Allocator::free(marks_data, marks_size * sizeof(MarkInCompressedFile)); + marks_data = nullptr; /// To avoid double free if next alloc will throw an exception + } + } + + size_t marks_size = 0; + void * marks_data = nullptr; + bool owns_marks_data = false; }; } diff --git a/src/IO/UncompressedCache.h b/src/IO/UncompressedCache.h index 3d1c907d364c..ed96d9ea2628 100644 --- a/src/IO/UncompressedCache.h +++ b/src/IO/UncompressedCache.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -20,30 +21,22 @@ namespace DB struct UncompressedCacheCell { - Memory<> data; + char * data; + size_t data_size; size_t compressed_size; UInt32 additional_bytes; }; -struct UncompressedSizeWeightFunction -{ - size_t operator()(const UncompressedCacheCell & x) const - { - return x.data.size(); - } -}; - -/** Cache of decompressed blocks for implementation of CachedCompressedReadBuffer. thread-safe. - */ -class UncompressedCache : public CacheBase +class UncompressedCache : public UnifiedCacheAdapter { private: - using Base = CacheBase; + using Base = UnifiedCacheAdapter; + using HolderPtr = Base::CachePayloadHolderPtr; public: - explicit UncompressedCache(size_t max_size_in_bytes, const String & uncompressed_cache_policy = "") - : Base(max_size_in_bytes, 0, uncompressed_cache_policy) {} + explicit UncompressedCache(const String & block_cache_name) : Base(block_cache_name) + {} /// Calculate key from path to file and offset. 
static UInt128 hash(const String & path_to_file, size_t offset) @@ -58,26 +51,30 @@ class UncompressedCache : public CacheBase - MappedPtr getOrSet(const Key & key, LoadFunc && load) + template + HolderPtr getOrSet(const Key & key, GetSizeFunc && get_size, InitializeFunc && initialize) /// Looks the key up through Base::getOrSet and counts the outcome as a hit or a miss. { - auto result = Base::getOrSet(key, std::forward(load)); + bool was_calculated = false; /// Filled in by Base::getOrSet; true is counted as a miss below. + auto result = Base::getOrSet(key, std::forward(get_size), std::forward(initialize), &was_calculated); - if (result.second) + if (was_calculated) ProfileEvents::increment(ProfileEvents::UncompressedCacheMisses); else ProfileEvents::increment(ProfileEvents::UncompressedCacheHits); - return result.first; + return result; } -private: - void onRemoveOverflowWeightLoss(size_t weight_loss) override - { - ProfileEvents::increment(ProfileEvents::UncompressedCacheWeightLost, weight_loss); - } +// TODO: Add support for the ProfileEvents::UncompressedCacheWeightLost +// private: +// void onRemoveOverflowWeightLoss(size_t weight_loss) override +// { +// ProfileEvents::increment(ProfileEvents::UncompressedCacheWeightLost, weight_loss); +// } }; + + using UncompressedCachePtr = std::shared_ptr; } diff --git a/src/Interpreters/AsynchronousMetrics.cpp b/src/Interpreters/AsynchronousMetrics.cpp index 338ae1bbbfd9..24dbc6aee0ea 100644 --- a/src/Interpreters/AsynchronousMetrics.cpp +++ b/src/Interpreters/AsynchronousMetrics.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -620,6 +621,49 @@ void AsynchronousMetrics::update(TimePoint update_time) } } + // Temporarily added cache tracking: publishes the unified cache weight and count, then traces per-pool stats. + // TODO: Remove + { + auto & unified_cache = BlockCachesManager::instance(); + + /// Basic tracking + const auto unified_cache_weight = unified_cache.getCacheWeight(); + const auto unified_cache_count = unified_cache.getCacheCount(); + new_values["UnifiedCacheBytes"] = unified_cache_weight; + new_values["UnifiedCacheCount"] = unified_cache_count; + LOG_TRACE(log, + "UnifiedCacheTracking: weight 
is {}, count is {}", + ReadableSize(unified_cache_weight), + unified_cache_count); + + /// Metrics + using RebalanceStrategy = IRebalanceStrategy; + using StatRequest = RebalanceStrategy::StatRequest; + using StatRequests = RebalanceStrategy::StatRequests; + + StatRequests requests + { + StatRequest{.block_cache_name = "marks", .stat_name = "should_rebalance_to_stealing_attempts_diff"}, + StatRequest{.block_cache_name = "uncompressed", .stat_name = "should_rebalance_to_stealing_attempts_diff"}, + StatRequest{.block_cache_name = "marks", .stat_name = "memory_allocation_failures"}, + StatRequest{.block_cache_name = "uncompressed", .stat_name = "memory_allocation_failures"}, + StatRequest{.block_cache_name = "marks", .stat_name = "total_memory_blocks_size"}, + StatRequest{.block_cache_name = "uncompressed", .stat_name = "total_memory_blocks_size"}, + }; + auto responses = unified_cache.getStats(requests); + + /// One trace line per request, in request order, so the list above is the only place to edit. + /// NOTE(review): responses is indexed by request position, as the unrolled version did; confirm getStats() returns one entry per request. + for (size_t i = 0; i < requests.size(); ++i) + { + LOG_TRACE(log, + "UnifiedCacheStats: in {} pool {} is {}", + requests[i].block_cache_name, + requests[i].stat_name, + responses[i].safeGet()); + } + } + #if USE_ROCKSDB { if (auto metadata_cache = getContext()->tryGetMergeTreeMetadataCache()) diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 3c406058cb50..570af621d4d9 100644 
--- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -1701,14 +1701,14 @@ ProcessList::Element * Context::getProcessListElement() const } -void Context::setUncompressedCache(size_t max_size_in_bytes, const String & uncompressed_cache_policy) +void Context::setUncompressedCache(const String & block_cache_name) /// Throws LOGICAL_ERROR if the cache was already created. { auto lock = getLock(); if (shared->uncompressed_cache) throw Exception("Uncompressed cache has been already created.", ErrorCodes::LOGICAL_ERROR); - shared->uncompressed_cache = std::make_shared(max_size_in_bytes, uncompressed_cache_policy); + shared->uncompressed_cache = std::make_shared(block_cache_name); /// The name is the only constructor argument now. } @@ -1727,14 +1727,14 @@ void Context::dropUncompressedCache() const } -void Context::setMarkCache(size_t cache_size_in_bytes, const String & mark_cache_policy) +void Context::setMarkCache(const String & block_cache_name) /// Throws LOGICAL_ERROR if the cache was already created. { auto lock = getLock(); if (shared->mark_cache) throw Exception("Mark cache has been already created.", ErrorCodes::LOGICAL_ERROR); - shared->mark_cache = std::make_shared(cache_size_in_bytes, mark_cache_policy); + shared->mark_cache = std::make_shared(block_cache_name); /// The name is the only constructor argument now. } MarkCachePtr Context::getMarkCache() const @@ -1762,14 +1762,14 @@ ThreadPool & Context::getLoadMarksThreadpool() const return *shared->load_marks_threadpool; } -void Context::setIndexUncompressedCache(size_t max_size_in_bytes) +void Context::setIndexUncompressedCache(const String & block_cache_name) /// Throws LOGICAL_ERROR if the cache was already created. { auto lock = getLock(); if (shared->index_uncompressed_cache) throw Exception("Index uncompressed cache has been already created.", ErrorCodes::LOGICAL_ERROR); - shared->index_uncompressed_cache = std::make_shared(max_size_in_bytes); + shared->index_uncompressed_cache = std::make_shared(block_cache_name); /// The name is the only constructor argument now. } @@ -1788,14 +1788,14 @@ void Context::dropIndexUncompressedCache() const } -void Context::setIndexMarkCache(size_t cache_size_in_bytes) +void Context::setIndexMarkCache(const String & block_cache_name) /// Throws LOGICAL_ERROR if the cache was already created. { auto lock = getLock(); if 
(shared->index_mark_cache) throw Exception("Index mark cache has been already created.", ErrorCodes::LOGICAL_ERROR); - shared->index_mark_cache = std::make_shared(cache_size_in_bytes); + shared->index_mark_cache = std::make_shared(block_cache_name); /// The name is the only constructor argument now. } MarkCachePtr Context::getIndexMarkCache() const diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 6a417cbd3de9..84a5526e5f2f 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -802,23 +802,23 @@ class Context: public std::enable_shared_from_this void setSystemZooKeeperLogAfterInitializationIfNeeded(); - /// Create a cache of uncompressed blocks of specified size. This can be done only once. + /// Create the cache of uncompressed blocks, constructed from the given block cache name. This can be done only once. - void setUncompressedCache(size_t max_size_in_bytes, const String & uncompressed_cache_policy); + void setUncompressedCache(const String & block_cache_name); std::shared_ptr getUncompressedCache() const; void dropUncompressedCache() const; - /// Create a cache of marks of specified size. This can be done only once. + /// Create the cache of marks, constructed from the given block cache name. This can be done only once. - void setMarkCache(size_t cache_size_in_bytes, const String & mark_cache_policy); + void setMarkCache(const String & block_cache_name); std::shared_ptr getMarkCache() const; void dropMarkCache() const; ThreadPool & getLoadMarksThreadpool() const; - /// Create a cache of index uncompressed blocks of specified size. This can be done only once. + /// Create the cache of index uncompressed blocks, constructed from the given block cache name. This can be done only once. - void setIndexUncompressedCache(size_t max_size_in_bytes); + void setIndexUncompressedCache(const String & block_cache_name); std::shared_ptr getIndexUncompressedCache() const; void dropIndexUncompressedCache() const; - /// Create a cache of index marks of specified size. This can be done only once. + /// Create the cache of index marks, constructed from the given block cache name. This can be done only once. 
- void setIndexMarkCache(size_t cache_size_in_bytes); + void setIndexMarkCache(const String & block_cache_name); std::shared_ptr getIndexMarkCache() const; void dropIndexMarkCache() const; diff --git a/src/Storages/MarkCache.h b/src/Storages/MarkCache.h index 9095bf6bb356..2bfe949facff 100644 --- a/src/Storages/MarkCache.h +++ b/src/Storages/MarkCache.h @@ -2,6 +2,7 @@ #include +#include #include #include #include @@ -18,30 +19,15 @@ namespace ProfileEvents namespace DB { -/// Estimate of number of bytes in cache for marks. -struct MarksWeightFunction -{ - /// We spent additional bytes on key in hashmap, linked lists, shared pointers, etc ... - static constexpr size_t MARK_CACHE_OVERHEAD = 128; - - size_t operator()(const MarksInCompressedFile & marks) const - { - return marks.size() * sizeof(MarkInCompressedFile) + MARK_CACHE_OVERHEAD; - } -}; - - -/** Cache of 'marks' for StorageMergeTree. - * Marks is an index structure that addresses ranges in column file, corresponding to ranges of primary key. - */ -class MarkCache : public CacheBase +class MarkCache : public UnifiedCacheAdapter /// Cache of 'marks' for StorageMergeTree: an index structure addressing ranges in a column file that correspond to ranges of the primary key. { private: - using Base = CacheBase; + using Base = UnifiedCacheAdapter; + using HolderPtr = Base::CachePayloadHolderPtr; public: - explicit MarkCache(size_t max_size_in_bytes, const String & mark_cache_policy = "") - : Base(max_size_in_bytes, 0, mark_cache_policy) {} + explicit MarkCache(const String & block_cache_name) : Base(block_cache_name) /// The name is passed straight to the adapter base. + {} /// Calculate key from path to file and offset. 
static UInt128 hash(const String & path_to_file) @@ -55,16 +41,17 @@ class MarkCache : public CacheBase - MappedPtr getOrSet(const Key & key, LoadFunc && load) + template + HolderPtr getOrSet(const Key & key, GetSizeFunc && get_size, InitializeFunc && initialize) /// Looks the key up through Base::getOrSet and counts the outcome as a hit or a miss. { - auto result = Base::getOrSet(key, load); - if (result.second) + bool was_calculated = false; /// Filled in by Base::getOrSet; true is counted as a miss below. + auto result = Base::getOrSet(key, std::forward(get_size), std::forward(initialize), &was_calculated); + if (was_calculated) ProfileEvents::increment(ProfileEvents::MarkCacheMisses); else ProfileEvents::increment(ProfileEvents::MarkCacheHits); - return result.first; + return result; } }; diff --git a/src/Storages/MergeTree/MergeTreeMarksLoader.cpp b/src/Storages/MergeTree/MergeTreeMarksLoader.cpp index 397a9d826556..78e399fea8c0 100644 --- a/src/Storages/MergeTree/MergeTreeMarksLoader.cpp +++ b/src/Storages/MergeTree/MergeTreeMarksLoader.cpp @@ -86,11 +86,10 @@ const MarkInCompressedFile & MergeTreeMarksLoader::getMark(size_t row_index, siz throw Exception(ErrorCodes::LOGICAL_ERROR, "Column index: {} is out of range [0, {})", column_index, columns_in_mark); #endif - return (*marks)[row_index * columns_in_mark + column_index]; + return (*marks).payload()[row_index * columns_in_mark + column_index]; /// marks is a holder now: index through payload() rather than the holder itself. } - -MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl() +void MergeTreeMarksLoader::readIntoMarks(MarksInCompressedFile & marks_to_load) /// Fills caller-provided storage instead of allocating the result itself. { /// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache. 
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker; @@ -99,8 +98,6 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl() size_t mark_size = index_granularity_info.getMarkSizeInBytes(columns_in_mark); size_t expected_uncompressed_size = mark_size * marks_count; - auto res = std::make_shared(marks_count * columns_in_mark); - if (!index_granularity_info.mark_type.compressed && expected_uncompressed_size != file_size) throw Exception( ErrorCodes::CORRUPTED_DATA, @@ -118,7 +115,7 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl() if (!index_granularity_info.mark_type.adaptive) { /// Read directly to marks. - reader->readStrict(reinterpret_cast(res->data()), expected_uncompressed_size); + reader->readStrict(reinterpret_cast(marks_to_load.data()), expected_uncompressed_size); if (!reader->eof()) throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, @@ -131,7 +128,7 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl() size_t granularity; while (!reader->eof()) { - res->read(*reader, i * columns_in_mark, columns_in_mark); + marks_to_load.read(*reader, i * columns_in_mark, columns_in_mark); readIntBinary(granularity, *reader); ++i; } @@ -140,28 +137,44 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl() throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read all marks from file {}", mrk_path); } - res->protect(); - return res; + /// TODO: Implement similar functionalities + // res->protect(); +} + +MergeTreeMarksLoader::MarksHolderPtr MergeTreeMarksLoader::loadMarksImpl() /// Loads marks into freshly allocated storage owned by the returned payload, without going through the mark cache. +{ + const size_t marks_size = marks_count * columns_in_mark; + + auto marks_payload = std::make_unique<MarksInCompressedFile>(marks_size); /// Owned here so that a throw from readIntoMarks() (corrupted or short file) cannot leak it. + readIntoMarks(*marks_payload); + return std::make_shared(marks_payload.release()); /// Hands the raw pointer to the holder exactly as the previous code did. } -MarkCache::MappedPtr MergeTreeMarksLoader::loadMarks() +MergeTreeMarksLoader::MarksHolderPtr MergeTreeMarksLoader::loadMarks() { - MarkCache::MappedPtr loaded_marks; + MergeTreeMarksLoader::MarksHolderPtr loaded_marks; + + const size_t marks_size = 
marks_count * columns_in_mark; if (mark_cache) { auto key = mark_cache->hash(fs::path(data_part_storage->getFullPath()) / mrk_path); if (save_marks_in_cache) { - auto callback = [this]{ return loadMarksImpl(); }; - loaded_marks = mark_cache->getOrSet(key, callback); + auto get_size = [marks_size](){ return marks_size * sizeof(MarkInCompressedFile); }; /// NOTE(review): covers only the mark array, while initialize below also constructs a MarksInCompressedFile header at ptr and places the data sizeof(MarksInCompressedFile) bytes further; confirm the cache reserves room for that header. + auto initialize = [this, marks_size](void * ptr) + { + auto * data_ptr = reinterpret_cast(reinterpret_cast(ptr) + sizeof(MarksInCompressedFile)); /// Mark array starts right after the in-place header object. + auto * marks_payload = new (ptr) MarksInCompressedFile(marks_size, data_ptr); /// Built in place at ptr; non-owning, so its destructor will not free data_ptr. + readIntoMarks(*marks_payload); + }; + loaded_marks = mark_cache->getOrSet(key, get_size, initialize); } else - { loaded_marks = mark_cache->get(key); - if (!loaded_marks) - loaded_marks = loadMarksImpl(); - } + + if (!loaded_marks) /// Checked after both branches now, so an empty result from getOrSet also falls back to a privately loaded copy. + loaded_marks = loadMarksImpl(); } else loaded_marks = loadMarksImpl(); @@ -176,13 +189,13 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarks() return loaded_marks; } -std::future MergeTreeMarksLoader::loadMarksAsync() +std::future MergeTreeMarksLoader::loadMarksAsync() { ThreadGroupStatusPtr thread_group; if (CurrentThread::isInitialized() && CurrentThread::get().getThreadGroup()) thread_group = CurrentThread::get().getThreadGroup(); - auto task = std::make_shared>([thread_group, this] + auto task = std::make_shared>([thread_group, this] { setThreadName("loadMarksThread"); diff --git a/src/Storages/MergeTree/MergeTreeMarksLoader.h b/src/Storages/MergeTree/MergeTreeMarksLoader.h index 4497339d7673..b2921a8660d1 100644 --- a/src/Storages/MergeTree/MergeTreeMarksLoader.h +++ b/src/Storages/MergeTree/MergeTreeMarksLoader.h @@ -3,7 +3,9 @@ #include #include #include +#include #include +#include namespace DB @@ -15,7 +17,9 @@ class Threadpool; class MergeTreeMarksLoader { public: - using MarksPtr = MarkCache::MappedPtr; + using SharedMarksHolder = SharedPayloadHolder; + using MarksHolder = PayloadHolder; + using MarksHolderPtr = std::shared_ptr; /// Handle type returned by loadMarks() and stored in the marks member. 
MergeTreeMarksLoader( DataPartStoragePtr data_part_storage_, @@ -40,14 +44,15 @@ class MergeTreeMarksLoader const MergeTreeIndexGranularityInfo & index_granularity_info; bool save_marks_in_cache = false; size_t columns_in_mark; - MarkCache::MappedPtr marks; + MarksHolderPtr marks; ReadSettings read_settings; - MarkCache::MappedPtr loadMarks(); - std::future loadMarksAsync(); - MarkCache::MappedPtr loadMarksImpl(); + void readIntoMarks(MarksInCompressedFile & marks_to_load); /// Reads the marks file into caller-provided storage. + MarksHolderPtr loadMarksImpl(); /// Loads marks into self-owned storage, outside the mark cache. + MarksHolderPtr loadMarks(); /// Goes through mark_cache when one is set and falls back to loadMarksImpl(). + std::future loadMarksAsync(); - std::future future; + std::future future; ThreadPool * load_marks_threadpool; };