Skip to content
51 changes: 47 additions & 4 deletions programs/local/LocalServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <Common/TLDListsHolder.h>
#include <Common/quoteString.h>
#include <Common/randomSeed.h>
#include <Common/UnifiedCache.h>
#include <Loggers/Loggers.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromString.h>
Expand Down Expand Up @@ -559,27 +560,69 @@ void LocalServer::processConfig()
/// There is no need for concurrent queries, override max_concurrent_queries.
global_context->getProcessList().setMaxSize(0);

/// Set up caches.

/// Dummy strategy
// auto block_cache_size = 1000_MiB;
// auto max_size_to_evict_on_purging = 300_MiB;
// std::string rebalance_strategy_name = "dummy";
// RebalanceStrategySettings rebalance_strategy_settings;
// rebalance_strategy_settings.setBlockCacheSetting("max_total_size", "marks", Field(block_cache_size));
// rebalance_strategy_settings.setBlockCacheSetting("max_size_to_evict_on_purging", "marks", Field(max_size_to_evict_on_purging));
// rebalance_strategy_settings.setBlockCacheSetting("max_total_size", "uncompressed", Field(block_cache_size));
// rebalance_strategy_settings.setBlockCacheSetting("max_size_to_evict_on_purging", "uncompressed", Field(max_size_to_evict_on_purging));

/// Buddy static strategy
// auto memory_arena_size = 4_GiB;
// auto max_size_to_evict_on_purging = 300_MiB;
// std::string rebalance_strategy_name = "buddy_static";
// RebalanceStrategySettings rebalance_strategy_settings;
// rebalance_strategy_settings.set("memory_arena_size", Field(memory_arena_size));
// rebalance_strategy_settings.setBlockCacheSetting("max_size_to_evict_on_purging", "marks", Field(max_size_to_evict_on_purging));
// rebalance_strategy_settings.setBlockCacheSetting("max_size_to_evict_on_purging", "uncompressed", Field(max_size_to_evict_on_purging));

/// Buddy dynamic strategy
auto memory_arena_capacity = 4_GiB;
auto memory_arena_initial_size = 128_MiB;
size_t allocated_memory_multiplier = 2;
auto max_size_to_evict_on_purging = 300_MiB;
std::string rebalance_strategy_name = "buddy_dynamic";
RebalanceStrategySettings rebalance_strategy_settings;
rebalance_strategy_settings.set("memory_arena_capacity", Field(memory_arena_capacity));
rebalance_strategy_settings.set("memory_arena_initial_size", Field(memory_arena_initial_size));
rebalance_strategy_settings.set("allocated_memory_multiplier", Field(allocated_memory_multiplier));
rebalance_strategy_settings.setBlockCacheSetting("max_size_to_evict_on_purging", "marks", Field(max_size_to_evict_on_purging));
rebalance_strategy_settings.setBlockCacheSetting("max_size_to_evict_on_purging", "uncompressed", Field(max_size_to_evict_on_purging));

auto & block_cache_manager = DB::BlockCachesManager<UInt128>::instance();
std::vector<std::string> block_cache_names = {"marks", "uncompressed"};
block_cache_manager.initialize(block_cache_names, rebalance_strategy_name, rebalance_strategy_settings);

/// Size of cache for uncompressed blocks. Zero means disabled.
String uncompressed_cache_policy = config().getString("uncompressed_cache_policy", "");
size_t uncompressed_cache_size = config().getUInt64("uncompressed_cache_size", 0);
if (uncompressed_cache_size)
global_context->setUncompressedCache(uncompressed_cache_size, uncompressed_cache_policy);
global_context->setUncompressedCache("global");
// global_context->setUncompressedCache(uncompressed_cache_size, uncompressed_cache_policy);

/// Size of cache for marks (index of MergeTree family of tables).
String mark_cache_policy = config().getString("mark_cache_policy", "");
size_t mark_cache_size = config().getUInt64("mark_cache_size", 5368709120);
if (mark_cache_size)
global_context->setMarkCache(mark_cache_size, mark_cache_policy);
global_context->setMarkCache("global");
// global_context->setMarkCache(mark_cache_size, mark_cache_policy);

/// Size of cache for uncompressed blocks of MergeTree indices. Zero means disabled.
size_t index_uncompressed_cache_size = config().getUInt64("index_uncompressed_cache_size", 0);
if (index_uncompressed_cache_size)
global_context->setIndexUncompressedCache(index_uncompressed_cache_size);
global_context->setIndexUncompressedCache("global");
// global_context->setIndexUncompressedCache(index_uncompressed_cache_size);

/// Size of cache for index marks (index of MergeTree skip indices).
size_t index_mark_cache_size = config().getUInt64("index_mark_cache_size", 0);
if (index_mark_cache_size)
global_context->setIndexMarkCache(index_mark_cache_size);
global_context->setIndexMarkCache("global");
// global_context->setIndexMarkCache(index_mark_cache_size);

/// A cache for mmapped files.
size_t mmap_cache_size = config().getUInt64("mmap_cache_size", 1000); /// The choice of default is arbitrary.
Expand Down
47 changes: 43 additions & 4 deletions programs/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <Poco/Net/NetException.h>
#include <Poco/Util/HelpFormatter.h>
#include <Poco/Environment.h>
#include <Common/UnifiedCache.h>
#include <Common/scope_guard_safe.h>
#include <Common/logger_useful.h>
#include <base/phdr_cache.h>
Expand Down Expand Up @@ -1145,6 +1146,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
}

total_memory_tracker.setHardLimit(max_server_memory_usage);
/// TODO: Add setting
// total_memory_tracker.setLimitToPurgeCache(1500_MiB);
total_memory_tracker.setDescription("(total)");
total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking);

Expand Down Expand Up @@ -1393,6 +1396,42 @@ int Server::main(const std::vector<std::string> & /*args*/)

/// Set up caches.

/// Dummy strategy
// auto block_cache_size = 1000_MiB;
// auto max_size_to_evict_on_purging = 300_MiB;
// std::string rebalance_strategy_name = "dummy";
// RebalanceStrategySettings rebalance_strategy_settings;
// rebalance_strategy_settings.setBlockCacheSetting("max_total_size", "marks", Field(block_cache_size));
// rebalance_strategy_settings.setBlockCacheSetting("max_size_to_evict_on_purging", "marks", Field(max_size_to_evict_on_purging));
// rebalance_strategy_settings.setBlockCacheSetting("max_total_size", "uncompressed", Field(block_cache_size));
// rebalance_strategy_settings.setBlockCacheSetting("max_size_to_evict_on_purging", "uncompressed", Field(max_size_to_evict_on_purging));

/// Buddy static strategy
// auto memory_arena_size = 4_GiB;
// auto max_size_to_evict_on_purging = 300_MiB;
// std::string rebalance_strategy_name = "buddy_static";
// RebalanceStrategySettings rebalance_strategy_settings;
// rebalance_strategy_settings.set("memory_arena_size", Field(memory_arena_size));
// rebalance_strategy_settings.setBlockCacheSetting("max_size_to_evict_on_purging", "marks", Field(max_size_to_evict_on_purging));
// rebalance_strategy_settings.setBlockCacheSetting("max_size_to_evict_on_purging", "uncompressed", Field(max_size_to_evict_on_purging));

/// Buddy dynamic strategy
auto memory_arena_capacity = 4_GiB;
UInt64 memory_arena_initial_size = 0;
size_t allocated_memory_multiplier = 1;
auto max_size_to_evict_on_purging = 300_MiB;
std::string rebalance_strategy_name = "buddy_dynamic";
RebalanceStrategySettings rebalance_strategy_settings;
rebalance_strategy_settings.set("memory_arena_capacity", Field(memory_arena_capacity));
rebalance_strategy_settings.set("memory_arena_initial_size", Field(memory_arena_initial_size));
rebalance_strategy_settings.set("allocated_memory_multiplier", Field(allocated_memory_multiplier));
rebalance_strategy_settings.setBlockCacheSetting("max_size_to_evict_on_purging", "marks", Field(max_size_to_evict_on_purging));
rebalance_strategy_settings.setBlockCacheSetting("max_size_to_evict_on_purging", "uncompressed", Field(max_size_to_evict_on_purging));

auto & block_cache_manager = DB::BlockCachesManager<UInt128>::instance();
std::vector<std::string> block_cache_names = {"marks", "uncompressed"};
block_cache_manager.initialize(block_cache_names, rebalance_strategy_name, rebalance_strategy_settings);

/// Lower cache size on low-memory systems.
double cache_size_to_ram_max_ratio = config().getDouble("cache_size_to_ram_max_ratio", 0.5);
size_t max_cache_size = static_cast<size_t>(memory_amount * cache_size_to_ram_max_ratio);
Expand All @@ -1407,7 +1446,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
LOG_INFO(log, "Uncompressed cache size was lowered to {} because the system has low amount of memory",
formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
global_context->setUncompressedCache(uncompressed_cache_size, uncompressed_cache_policy);
global_context->setUncompressedCache("uncompressed");

/// Load global settings from default_profile and system_profile.
global_context->setDefaultProfiles(config());
Expand All @@ -1434,17 +1473,17 @@ int Server::main(const std::vector<std::string> & /*args*/)
LOG_INFO(log, "Mark cache size was lowered to {} because the system has low amount of memory",
formatReadableSizeWithBinarySuffix(mark_cache_size));
}
global_context->setMarkCache(mark_cache_size, mark_cache_policy);
global_context->setMarkCache("marks");

/// Size of cache for uncompressed blocks of MergeTree indices. Zero means disabled.
size_t index_uncompressed_cache_size = config().getUInt64("index_uncompressed_cache_size", 0);
if (index_uncompressed_cache_size)
global_context->setIndexUncompressedCache(index_uncompressed_cache_size);
global_context->setIndexUncompressedCache("uncompressed");

/// Size of cache for index marks (index of MergeTree skip indices).
size_t index_mark_cache_size = config().getUInt64("index_mark_cache_size", 0);
if (index_mark_cache_size)
global_context->setIndexMarkCache(index_mark_cache_size);
global_context->setIndexMarkCache("marks");

/// A cache for mmapped files.
size_t mmap_cache_size = config().getUInt64("mmap_cache_size", 1000); /// The choice of default is arbitrary.
Expand Down
14 changes: 14 additions & 0 deletions src/Common/MemoryTracker.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "MemoryTracker.h"

#include <IO/WriteHelpers.h>
#include <Common/UnifiedCache.h>
#include <Common/VariableContext.h>
#include <Interpreters/TraceCollector.h>
#include <Common/Exception.h>
Expand Down Expand Up @@ -168,6 +169,7 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryT

Int64 current_hard_limit = hard_limit.load(std::memory_order_relaxed);
Int64 current_profiler_limit = profiler_limit.load(std::memory_order_relaxed);
Int64 current_limit_to_purge_cache = limit_to_purge_cache.load(std::memory_order_relaxed);

bool memory_limit_exceeded_ignored = false;

Expand Down Expand Up @@ -237,6 +239,14 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryT
}
#endif

/// TODO: Rewrite with a rebalance strategy
if (unlikely(current_limit_to_purge_cache && will_be > current_limit_to_purge_cache))
{
auto & cache_instance = DB::BlockCachesManager<UInt128>::instance();
/// TODO: Make a better shrinkage with a relative ration to the full amount of memory
cache_instance.purge();
}

if (unlikely(current_hard_limit && will_be > limit_to_check))
{
if (memoryTrackerCanThrow(level, false) && throw_if_memory_exceeded)
Expand Down Expand Up @@ -456,6 +466,10 @@ void MemoryTracker::setHardLimit(Int64 value)
hard_limit.store(value, std::memory_order_relaxed);
}

void MemoryTracker::setLimitToPurgeCache(Int64 value)
{
limit_to_purge_cache.store(value, std::memory_order_relaxed);
}

void MemoryTracker::setOrRaiseHardLimit(Int64 value)
{
Expand Down
6 changes: 6 additions & 0 deletions src/Common/MemoryTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class MemoryTracker
std::atomic<Int64> soft_limit {0};
std::atomic<Int64> hard_limit {0};
std::atomic<Int64> profiler_limit {0};
std::atomic<Int64> limit_to_purge_cache {0};

static std::atomic<Int64> free_memory_in_allocator_arenas;

Expand Down Expand Up @@ -116,6 +117,7 @@ class MemoryTracker

void setSoftLimit(Int64 value);
void setHardLimit(Int64 value);
void setLimitToPurgeCache(Int64 value);

Int64 getHardLimit() const
{
Expand All @@ -125,6 +127,10 @@ class MemoryTracker
{
return soft_limit.load(std::memory_order_relaxed);
}
Int64 getLimitToPurgeCache() const
{
return limit_to_purge_cache.load(std::memory_order_relaxed);
}

/** Set limit if it was not set.
* Otherwise, set limit to new value, if new value is greater than previous limit.
Expand Down
Loading