From b5d74783415a6a45a37c3a4ed98b50a311a6774c Mon Sep 17 00:00:00 2001 From: ColinLee Date: Thu, 19 Mar 2026 09:32:54 +0800 Subject: [PATCH 1/5] enable memory detect. --- cpp/CMakeLists.txt | 5 ++ cpp/src/common/allocator/alloc_base.h | 86 +++++++++---------- cpp/src/common/allocator/byte_stream.h | 31 ++++--- cpp/src/common/allocator/mem_alloc.cc | 65 ++++++++------ cpp/src/common/tablet.cc | 84 ++++++++++-------- cpp/src/compress/gzip_compressor.h | 4 +- cpp/src/encoding/int32_rle_encoder.h | 2 +- cpp/src/encoding/int64_rle_decoder.h | 12 +-- cpp/src/encoding/int64_rle_encoder.h | 2 +- cpp/src/reader/tsfile_series_scan_iterator.cc | 4 +- .../table_view/tsfile_writer_table_test.cc | 54 +++++++++++- 11 files changed, 216 insertions(+), 133 deletions(-) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index c85150d8f..f89675a7d 100755 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -58,6 +58,11 @@ if (${COV_ENABLED}) message("add_definitions -DCOV_ENABLED=1") endif () +if (ENABLE_MEM_STAT) + add_definitions(-DENABLE_MEM_STAT) + message("add_definitions -DENABLE_MEM_STAT") +endif () + if (NOT CMAKE_BUILD_TYPE) set(CMAKE_BUILD_TYPE "Release" CACHE STRING "Choose the type of build." FORCE) diff --git a/cpp/src/common/allocator/alloc_base.h b/cpp/src/common/allocator/alloc_base.h index effe267d0..d34c9cff7 100644 --- a/cpp/src/common/allocator/alloc_base.h +++ b/cpp/src/common/allocator/alloc_base.h @@ -30,46 +30,33 @@ namespace common { enum AllocModID { __FIRST_MOD_ID = 0, - // if you are sure you will not consume too much memory, you can use - // MOD_DEFAULT. MOD_DEFAULT = 0, - MOD_MEMTABLE = 1, - MOD_SCHEMA = 2, - MOD_SQL = 3, - MOD_NET = 4, - MOD_NET_DATA = 5, - MOD_TVLIST_DATA = 6, - MOD_TVLIST_OBJ = 7, - MOD_TSBLOCK = 8, - MOD_CONTAINER = 9, - MOD_TSSTORE_OBJ = 10, - MOD_FLUSH_TASK_OBJ = 11, - MOD_PAGE_WRITER_OUTPUT_STREAM = 12, - MOD_CW_PAGES_DATA = 13, - MOD_CHUNK_WRITER_OBJ = 14, - MOD_STATISTIC_OBJ = 15, - MOD_ENCODER_OBJ = 16, - MOD_DECODER_OBJ = 17, - MOD_TSFILE_WRITER_META = 18, - MOD_TSFILE_WRITE_STREAM = 19, - MOD_TIMESERIES_INDEX_OBJ = 20, - MOD_BLOOM_FILTER = 21, - MOD_OPEN_FILE_OBJ = 22, - MOD_TSFILE_READER = 23, - MOD_CHUNK_READER = 24, - MOD_COMPRESSOR_OBJ = 25, - MOD_ARRAY = 26, - MOD_HASH_TABLE = 27, - MOD_WRITER_INDEX_NODE = 28, - MOD_TS2DIFF_OBJ = 29, - MOD_BITENCODE_OBJ = 30, - MOD_DICENCODE_OBJ = 31, - MOD_ZIGZAG_OBJ = 32, - MOD_DEVICE_META_ITER = 33, - MOD_DEVICE_TASK_ITER = 34, - MOD_DEVICE_ORDER_TSBLOCK_READER = 35, - __LAST_MOD_ID = 36, // prev + 1, - __MAX_MOD_ID = 127, // leave 1 bit to detect header size + MOD_TVLIST_DATA = 1, + MOD_TSBLOCK = 2, + MOD_PAGE_WRITER_OUTPUT_STREAM = 3, + MOD_CW_PAGES_DATA = 4, + MOD_STATISTIC_OBJ = 5, + MOD_ENCODER_OBJ = 6, + MOD_DECODER_OBJ = 7, + MOD_TSFILE_WRITER_META = 8, + MOD_TSFILE_WRITE_STREAM = 9, + MOD_TIMESERIES_INDEX_OBJ = 10, + MOD_BLOOM_FILTER = 11, + MOD_TSFILE_READER = 12, + MOD_CHUNK_READER = 13, + MOD_COMPRESSOR_OBJ = 14, + MOD_ARRAY = 15, + MOD_HASH_TABLE = 16, + MOD_WRITER_INDEX_NODE = 17, + MOD_TS2DIFF_OBJ = 18, + MOD_BITENCODE_OBJ = 19, + MOD_DICENCODE_OBJ = 20, + MOD_ZIGZAG_OBJ = 21, + MOD_DEVICE_META_ITER = 22, + MOD_DEVICE_TASK_ITER = 23, + MOD_TABLET = 24, + __LAST_MOD_ID = 25, + __MAX_MOD_ID = 127, }; extern const char* g_mod_names[__LAST_MOD_ID]; @@ -84,22 +71,27 @@ class ModStat { ModStat() : stat_arr_(NULL) {} static ModStat& get_instance() { - /* - * This is the singleton of Mod Memory Statistic. - * gms is short for Global Mod Statistic - */ static ModStat gms; +#ifdef ENABLE_MEM_STAT + if (UNLIKELY(gms.stat_arr_ == NULL)) { + gms.init(); + } +#endif return gms; } void init(); void destroy(); INLINE void update_alloc(AllocModID mid, int32_t size) { - // ASSERT(mid < __LAST_MOD_ID); - // ATOMIC_FAA(get_item(mid), size); +#ifdef ENABLE_MEM_STAT + ASSERT(mid < __LAST_MOD_ID); + ATOMIC_FAA(get_item(mid), size); +#endif } void update_free(AllocModID mid, uint32_t size) { - // ASSERT(mid < __LAST_MOD_ID); - // ATOMIC_FAA(get_item(mid), 0 - size); +#ifdef ENABLE_MEM_STAT + ASSERT(mid < __LAST_MOD_ID); + ATOMIC_FAA(get_item(mid), 0 - size); +#endif } void print_stat(); diff --git a/cpp/src/common/allocator/byte_stream.h b/cpp/src/common/allocator/byte_stream.h index 4e1029ea4..b9c8ca1a0 100644 --- a/cpp/src/common/allocator/byte_stream.h +++ b/cpp/src/common/allocator/byte_stream.h @@ -896,10 +896,11 @@ class SerializationUtil { FORCE_INLINE static int chunk_read_all_data(ByteStream& in, ByteStream& out, size_t chunk_size = 4096) { - char* buffer = new char[chunk_size]; + char* buffer = static_cast( + mem_alloc(chunk_size, MOD_DEFAULT)); + if (buffer == nullptr) return E_OOM; int ret = common::E_OK; while (in.remaining_size() > 0) { - // Adjust read size based on remaining input size uint32_t bytes_to_read = static_cast( std::min(chunk_size, static_cast(in.remaining_size()))); @@ -913,7 +914,7 @@ class SerializationUtil { break; } } - delete[] buffer; + mem_free(buffer); return ret; } @@ -1172,16 +1173,18 @@ class SerializationUtil { str = nullptr; return ret; } else { - char* tmp_buf = static_cast(malloc(len)); + char* tmp_buf = static_cast( + mem_alloc(len, MOD_DEFAULT)); + if (tmp_buf == nullptr) return E_OOM; if (RET_FAIL(in.read_buf(tmp_buf, len, read_len))) { - free(tmp_buf); + mem_free(tmp_buf); return ret; } else if (len != read_len) { - free(tmp_buf); + mem_free(tmp_buf); ret = E_BUF_NOT_ENOUGH; } else { str = new std::string(tmp_buf, len); - free(tmp_buf); + mem_free(tmp_buf); } } } @@ -1194,7 +1197,9 @@ class SerializationUtil { int32_t read_len = 0; if (RET_FAIL(read_var_int(len, in))) { } else { - char* tmp_buf = (char*)malloc(len + 1); + char* tmp_buf = static_cast( + mem_alloc(len + 1, MOD_DEFAULT)); + if (tmp_buf == nullptr) return E_OOM; tmp_buf[len] = '\0'; if (RET_FAIL(in.read_buf(tmp_buf, len, read_len))) { } else if (len != read_len) { @@ -1202,7 +1207,7 @@ class SerializationUtil { } else { str = std::string(tmp_buf); } - free(tmp_buf); + mem_free(tmp_buf); } return ret; } @@ -1220,7 +1225,9 @@ class SerializationUtil { if (RET_FAIL(read_i32(len, in))) { } else { int32_t read_len = 0; - char* tmp_buf = static_cast(malloc(len + 1)); + char* tmp_buf = static_cast( + mem_alloc(len + 1, MOD_DEFAULT)); + if (tmp_buf == nullptr) return E_OOM; tmp_buf[len] = '\0'; if (RET_FAIL(in.read_buf(tmp_buf, len, read_len))) { } else if (len != read_len) { @@ -1228,7 +1235,7 @@ class SerializationUtil { } else { str = std::string(tmp_buf); } - free(tmp_buf); + mem_free(tmp_buf); } return ret; } @@ -1308,7 +1315,7 @@ FORCE_INLINE char* get_bytes_from_bytestream(ByteStream& bs) { return nullptr; } uint32_t size = bs.total_size(); - char* ret_buf = (char*)malloc(size); + char* ret_buf = static_cast(mem_alloc(size, MOD_DEFAULT)); if (ret_buf == nullptr) { return nullptr; } diff --git a/cpp/src/common/allocator/mem_alloc.cc b/cpp/src/common/allocator/mem_alloc.cc index c79e78858..d077483b9 100644 --- a/cpp/src/common/allocator/mem_alloc.cc +++ b/cpp/src/common/allocator/mem_alloc.cc @@ -34,33 +34,30 @@ namespace common { const char* g_mod_names[__LAST_MOD_ID] = { /* 0 */ "DEFAULT", - /* 1 */ "MEMTABLE", - /* 2 */ "SCHEMA", - /* 3 */ "SQL", - /* 4 */ "NET", - /* 5 */ "NET_DATA", - /* 6 */ "TVLIST_DATA", - /* 7 */ "TVLIST_OBJ", - /* 8 */ "TSBLOCK", - /* 9 */ "CONTAINER", - /* 10 */ "TSSTORE_OBJ", - /* 11 */ "FLUSH_TASK_OBJ", - /* 12 */ "PAGE_WRITER_OUTPUT_STREAM", - /* 13 */ "CW_PAGES_DATA", - /* 14 */ "CHUNK_WRITER_OBJ", - /* 15 */ "STATISTIC_OBJ", - /* 16 */ "ENCODER_OBJ", - /* 17 */ "DECODER_OBJ", - /* 18 */ "TSFILE_WRITER_META", - /* 19 */ "TSFILE_WRITE_STREAM", - /* 20 */ "TIMESERIES_INDEX_OBJ", - /* 21 */ "BLOOM_FILTER", - /* 22 */ "OPEN_FILE_OBJ", - /* 23 */ "TSFILE_READER", - /* 24 */ "CHUNK_READER", - /* 25 */ "COMPRESSOR_OBJ", - /* 26 */ "ARRAY", - /* 27 */ "HASH_TABLE", + /* 1 */ "TVLIST_DATA", + /* 2 */ "TSBLOCK", + /* 3 */ "PAGE_WRITER_OUTPUT_STREAM", + /* 4 */ "CW_PAGES_DATA", + /* 5 */ "STATISTIC_OBJ", + /* 6 */ "ENCODER_OBJ", + /* 7 */ "DECODER_OBJ", + /* 8 */ "TSFILE_WRITER_META", + /* 9 */ "TSFILE_WRITE_STREAM", + /* 10 */ "TIMESERIES_INDEX_OBJ", + /* 11 */ "BLOOM_FILTER", + /* 12 */ "TSFILE_READER", + /* 13 */ "CHUNK_READER", + /* 14 */ "COMPRESSOR_OBJ", + /* 15 */ "ARRAY", + /* 16 */ "HASH_TABLE", + /* 17 */ "WRITER_INDEX_NODE", + /* 18 */ "TS2DIFF_OBJ", + /* 19 */ "BITENCODE_OBJ", + /* 20 */ "DICENCODE_OBJ", + /* 21 */ "ZIGZAG_OBJ", + /* 22 */ "DEVICE_META_ITER", + /* 23 */ "DEVICE_TASK_ITER", + /* 24 */ "TABLET", }; // Most modern CPUs (e.g., x86_64, Arm) support at least 8-byte alignment, @@ -97,6 +94,7 @@ void* mem_alloc(uint32_t size, AllocModID mid) { auto high4b = static_cast(header >> 32); *reinterpret_cast(raw) = high4b; *reinterpret_cast(raw + 4) = low4b; + ModStat::get_instance().update_alloc(mid, static_cast(size)); return raw + header_size; } @@ -173,6 +171,19 @@ void ModStat::init() { void ModStat::destroy() { ::free(stat_arr_); } +void ModStat::print_stat() { + if (stat_arr_ == NULL) return; + std::cout << "=== Memory Statistics ===" << std::endl; + for (int i = 0; i < __LAST_MOD_ID; i++) { + int32_t val = ATOMIC_FAA(get_item(i), 0); + if (val != 0) { + const char* name = (i < __LAST_MOD_ID) ? g_mod_names[i] : "UNKNOWN"; + std::cout << " " << name << ": " << val << " bytes" << std::endl; + } + } + std::cout << "=========================" << std::endl; +} + BaseAllocator g_base_allocator; } // end namespace common \ No newline at end of file diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc index 10489f67d..8348e3cfe 100644 --- a/cpp/src/common/tablet.cc +++ b/cpp/src/common/tablet.cc @@ -21,6 +21,7 @@ #include +#include "allocator/alloc_base.h" #include "datatype/date_converter.h" #include "utils/errno_define.h" @@ -30,7 +31,9 @@ namespace storage { int Tablet::init() { ASSERT(timestamps_ == nullptr); - timestamps_ = (int64_t*)malloc(sizeof(int64_t) * max_row_num_); + timestamps_ = static_cast( + common::mem_alloc(sizeof(int64_t) * max_row_num_, common::MOD_TABLET)); + if (timestamps_ == nullptr) return E_OOM; cur_row_size_ = 0; size_t schema_count = schema_vec_->size(); @@ -39,54 +42,58 @@ int Tablet::init() { ins_res = schema_map_.insert( std::make_pair(to_lower(schema_vec_->at(c).measurement_name_), c)); if (!ins_res.second) { - // maybe dup measurement_name return E_INVALID_ARG; } } ASSERT(schema_map_.size() == schema_count); - value_matrix_ = - (ValueMatrixEntry*)malloc(sizeof(ValueMatrixEntry) * schema_count); + value_matrix_ = static_cast(common::mem_alloc( + sizeof(ValueMatrixEntry) * schema_count, common::MOD_TABLET)); + if (value_matrix_ == nullptr) return E_OOM; for (size_t c = 0; c < schema_count; ++c) { const MeasurementSchema& schema = schema_vec_->at(c); + uint32_t elem_size = get_data_type_size(schema.data_type_); + uint32_t alloc_size = elem_size * max_row_num_; switch (schema.data_type_) { case BOOLEAN: - value_matrix_[c].bool_data = (bool*)malloc( - get_data_type_size(schema.data_type_) * max_row_num_); - memset(value_matrix_[c].bool_data, 0, - get_data_type_size(schema.data_type_) * max_row_num_); + value_matrix_[c].bool_data = static_cast( + common::mem_alloc(alloc_size, common::MOD_TABLET)); + if (value_matrix_[c].bool_data == nullptr) return E_OOM; + memset(value_matrix_[c].bool_data, 0, alloc_size); break; case DATE: case INT32: - value_matrix_[c].int32_data = (int32_t*)malloc( - get_data_type_size(schema.data_type_) * max_row_num_); - memset(value_matrix_[c].int32_data, 0, - get_data_type_size(schema.data_type_) * max_row_num_); + value_matrix_[c].int32_data = static_cast( + common::mem_alloc(alloc_size, common::MOD_TABLET)); + if (value_matrix_[c].int32_data == nullptr) return E_OOM; + memset(value_matrix_[c].int32_data, 0, alloc_size); break; case TIMESTAMP: case INT64: - value_matrix_[c].int64_data = (int64_t*)malloc( - get_data_type_size(schema.data_type_) * max_row_num_); - memset(value_matrix_[c].int64_data, 0, - get_data_type_size(schema.data_type_) * max_row_num_); + value_matrix_[c].int64_data = static_cast( + common::mem_alloc(alloc_size, common::MOD_TABLET)); + if (value_matrix_[c].int64_data == nullptr) return E_OOM; + memset(value_matrix_[c].int64_data, 0, alloc_size); break; case FLOAT: - value_matrix_[c].float_data = (float*)malloc( - get_data_type_size(schema.data_type_) * max_row_num_); - memset(value_matrix_[c].float_data, 0, - get_data_type_size(schema.data_type_) * max_row_num_); + value_matrix_[c].float_data = static_cast( + common::mem_alloc(alloc_size, common::MOD_TABLET)); + if (value_matrix_[c].float_data == nullptr) return E_OOM; + memset(value_matrix_[c].float_data, 0, alloc_size); break; case DOUBLE: - value_matrix_[c].double_data = (double*)malloc( - get_data_type_size(schema.data_type_) * max_row_num_); - memset(value_matrix_[c].double_data, 0, - get_data_type_size(schema.data_type_) * max_row_num_); + value_matrix_[c].double_data = static_cast( + common::mem_alloc(alloc_size, common::MOD_TABLET)); + if (value_matrix_[c].double_data == nullptr) return E_OOM; + memset(value_matrix_[c].double_data, 0, alloc_size); break; case BLOB: case TEXT: case STRING: { value_matrix_[c].string_data = - (common::String*)malloc(sizeof(String) * max_row_num_); + static_cast(common::mem_alloc( + sizeof(String) * max_row_num_, common::MOD_TABLET)); + if (value_matrix_[c].string_data == nullptr) return E_OOM; break; } default: @@ -95,8 +102,11 @@ int Tablet::init() { } } - bitmaps_ = new BitMap[schema_count]; + bitmaps_ = static_cast(common::mem_alloc( + sizeof(BitMap) * schema_count, common::MOD_TABLET)); + if (bitmaps_ == nullptr) return E_OOM; for (size_t c = 0; c < schema_count; c++) { + new (&bitmaps_[c]) BitMap(); bitmaps_[c].init(max_row_num_, false); } return E_OK; @@ -104,7 +114,7 @@ int Tablet::init() { void Tablet::destroy() { if (timestamps_ != nullptr) { - free(timestamps_); + common::mem_free(timestamps_); timestamps_ = nullptr; } @@ -114,36 +124,40 @@ void Tablet::destroy() { switch (schema.data_type_) { case DATE: case INT32: - free(value_matrix_[c].int32_data); + common::mem_free(value_matrix_[c].int32_data); break; case TIMESTAMP: case INT64: - free(value_matrix_[c].int64_data); + common::mem_free(value_matrix_[c].int64_data); break; case FLOAT: - free(value_matrix_[c].float_data); + common::mem_free(value_matrix_[c].float_data); break; case DOUBLE: - free(value_matrix_[c].double_data); + common::mem_free(value_matrix_[c].double_data); break; case BOOLEAN: - free(value_matrix_[c].bool_data); + common::mem_free(value_matrix_[c].bool_data); break; case BLOB: case TEXT: case STRING: - free(value_matrix_[c].string_data); + common::mem_free(value_matrix_[c].string_data); break; default: break; } } - free(value_matrix_); + common::mem_free(value_matrix_); value_matrix_ = nullptr; } if (bitmaps_ != nullptr) { - delete[] bitmaps_; + size_t schema_count = schema_vec_->size(); + for (size_t c = 0; c < schema_count; c++) { + bitmaps_[c].~BitMap(); + } + common::mem_free(bitmaps_); bitmaps_ = nullptr; } } diff --git a/cpp/src/compress/gzip_compressor.h b/cpp/src/compress/gzip_compressor.h index 677b72663..a803107ac 100644 --- a/cpp/src/compress/gzip_compressor.h +++ b/cpp/src/compress/gzip_compressor.h @@ -46,7 +46,7 @@ class GzipCompressor { void destroy() { end_zstream(); } int compress(char* uncompressed_buf, uint32_t uncompressed_buf_len, char*& compressed_buf, uint32_t& compressed_buf_len); - void after_compress(char* compressed_buf) { ::free(compressed_buf); } + void after_compress(char* compressed_buf) { common::mem_free(compressed_buf); } int compress_into_bytestream(char* uncompressed_buf, uint32_t uncompressed_buf_len, common::ByteStream& out); @@ -69,7 +69,7 @@ class GzipDeCompressor { void destroy() { end_zstream(); } int uncompress(char* compressed_buf, uint32_t compressed_buf_len, char*& uncompressed_buf, uint32_t& uncompressed_buf_len); - void after_uncompress(char* uncompressed_buf) { ::free(uncompressed_buf); } + void after_uncompress(char* uncompressed_buf) { common::mem_free(uncompressed_buf); } int decompress_into_bytestream(char* compressed_buf, uint32_t compressed_buf_len, common::ByteStream& out); diff --git a/cpp/src/encoding/int32_rle_encoder.h b/cpp/src/encoding/int32_rle_encoder.h index d9eecf5dd..6b9ae1634 100644 --- a/cpp/src/encoding/int32_rle_encoder.h +++ b/cpp/src/encoding/int32_rle_encoder.h @@ -146,7 +146,7 @@ class Int32RleEncoder : public Encoder { void convert_buffer() { // TODO: put the bytes on the stack instead on the heap unsigned char* bytes = (unsigned char*)common::mem_alloc( - bit_width_, common::MOD_BITENCODE_OBJ); + bit_width_, common::MOD_ENCODER_OBJ); int32_t tmp_buffer[8]; for (int i = 0; i < 8; i++) { tmp_buffer[i] = (int64_t)buffered_values_[i]; diff --git a/cpp/src/encoding/int64_rle_decoder.h b/cpp/src/encoding/int64_rle_decoder.h index b2f85ed1e..25d64b084 100644 --- a/cpp/src/encoding/int64_rle_decoder.h +++ b/cpp/src/encoding/int64_rle_decoder.h @@ -150,9 +150,11 @@ class Int64RleDecoder : public Decoder { void read_bit_packing_buffer(int bit_packed_group_count, int last_bit_packed_num) { if (current_buffer_ != nullptr) { - delete[] current_buffer_; + common::mem_free(current_buffer_); } - current_buffer_ = new int64_t[bit_packed_group_count * 8]; + current_buffer_ = static_cast(common::mem_alloc( + sizeof(int64_t) * bit_packed_group_count * 8, + common::MOD_DECODER_OBJ)); int bytes_to_read = bit_packed_group_count * bit_width_; if (bytes_to_read > (int)byte_cache_.remaining_size()) { bytes_to_read = byte_cache_.remaining_size(); @@ -199,13 +201,13 @@ class Int64RleDecoder : public Decoder { void init_packer() { packer_ = new Int64Packer(bit_width_); } - void destroy() { /* do nothing for BitpackEncoder */ + void destroy() { if (packer_) { delete (packer_); packer_ = nullptr; } if (current_buffer_) { - delete[] current_buffer_; + common::mem_free(current_buffer_); current_buffer_ = nullptr; } if (tmp_buf_) { @@ -221,7 +223,7 @@ class Int64RleDecoder : public Decoder { is_length_and_bitwidth_readed_ = false; current_count_ = 0; if (current_buffer_) { - delete[] current_buffer_; + common::mem_free(current_buffer_); current_buffer_ = nullptr; } if (packer_) { diff --git a/cpp/src/encoding/int64_rle_encoder.h b/cpp/src/encoding/int64_rle_encoder.h index 82fd40f38..edd28180f 100644 --- a/cpp/src/encoding/int64_rle_encoder.h +++ b/cpp/src/encoding/int64_rle_encoder.h @@ -146,7 +146,7 @@ class Int64RleEncoder : public Encoder { void convert_buffer() { // TODO: put the bytes on the stack instead on the heap unsigned char* bytes = (unsigned char*)common::mem_alloc( - bit_width_, common::MOD_BITENCODE_OBJ); + bit_width_, common::MOD_ENCODER_OBJ); int64_t tmp_buffer[8]; for (int i = 0; i < 8; i++) { tmp_buffer[i] = (int64_t)buffered_values_[i]; diff --git a/cpp/src/reader/tsfile_series_scan_iterator.cc b/cpp/src/reader/tsfile_series_scan_iterator.cc index 5e78574e7..75655fb31 100644 --- a/cpp/src/reader/tsfile_series_scan_iterator.cc +++ b/cpp/src/reader/tsfile_series_scan_iterator.cc @@ -96,7 +96,7 @@ int TsFileSeriesScanIterator::init_chunk_reader() { int ret = E_OK; is_aligned_ = itimeseries_index_->get_data_type() == common::VECTOR; if (!is_aligned_) { - void* buf = common::mem_alloc(sizeof(ChunkReader), common::MOD_DEFAULT); + void* buf = common::mem_alloc(sizeof(ChunkReader), common::MOD_CHUNK_READER); chunk_reader_ = new (buf) ChunkReader; chunk_meta_cursor_ = itimeseries_index_->get_chunk_meta_list()->begin(); ChunkMeta* cm = chunk_meta_cursor_.get(); @@ -110,7 +110,7 @@ int TsFileSeriesScanIterator::init_chunk_reader() { } } else { void* buf = - common::mem_alloc(sizeof(AlignedChunkReader), common::MOD_DEFAULT); + common::mem_alloc(sizeof(AlignedChunkReader), common::MOD_CHUNK_READER); chunk_reader_ = new (buf) AlignedChunkReader; time_chunk_meta_cursor_ = itimeseries_index_->get_time_chunk_meta_list()->begin(); diff --git a/cpp/test/writer/table_view/tsfile_writer_table_test.cc b/cpp/test/writer/table_view/tsfile_writer_table_test.cc index 1f8c80ff6..8f1072eee 100644 --- a/cpp/test/writer/table_view/tsfile_writer_table_test.cc +++ b/cpp/test/writer/table_view/tsfile_writer_table_test.cc @@ -1095,4 +1095,56 @@ TEST_F(TsFileWriterTableTest, EncodingConfigIntegration) { delete[] literal; delete[] text_literal; delete table_schema; -} \ No newline at end of file +} + +#ifdef ENABLE_MEM_STAT +TEST_F(TsFileWriterTableTest, MemStatWriteAndVerify) { + TableSchema* table_schema = gen_table_schema(0, 2, 3); + auto tsfile_table_writer = + std::make_shared(&write_file_, table_schema); + + const int num_devices = 10; + const int num_timestamps = 100; + + for (int flush = 0; flush < 3; flush++) { + Tablet tablet = gen_tablet(table_schema, + flush * num_devices * num_timestamps, + num_devices, num_timestamps); + ASSERT_EQ(tsfile_table_writer->write_table(tablet), E_OK); + ASSERT_EQ(tsfile_table_writer->flush(), E_OK); + + std::cout << "--- After flush " << flush << " ---" << std::endl; + ModStat::get_instance().print_stat(); + } + + ASSERT_EQ(tsfile_table_writer->close(), E_OK); + + std::cout << "--- After writer close ---" << std::endl; + ModStat::get_instance().print_stat(); + + TsFileReader reader; + ASSERT_EQ(reader.open(file_name_), E_OK); + ResultSet* result_set = nullptr; + ASSERT_EQ(reader.query(table_schema->get_table_name(), + table_schema->get_measurement_names(), 0, + INT64_MAX, result_set), E_OK); + int row_count = 0; + bool has_next = false; + auto* table_result_set = static_cast(result_set); + while (IS_SUCC(table_result_set->next(has_next)) && has_next) { + row_count++; + } + EXPECT_EQ(row_count, num_devices * num_timestamps * 3); + + std::cout << "--- After read ---" << std::endl; + ModStat::get_instance().print_stat(); + + reader.destroy_query_data_set(table_result_set); + ASSERT_EQ(reader.close(), E_OK); + + std::cout << "--- After cleanup ---" << std::endl; + ModStat::get_instance().print_stat(); + + delete table_schema; +} +#endif \ No newline at end of file From ae36d5ee3d344de9304199ad861ef2a1ffd256d2 Mon Sep 17 00:00:00 2001 From: ColinLee Date: Thu, 19 Mar 2026 09:50:05 +0800 Subject: [PATCH 2/5] fix format. --- cpp/src/common/allocator/byte_stream.h | 12 ++++-------- cpp/src/common/tablet.cc | 4 ++-- cpp/src/compress/gzip_compressor.h | 8 ++++++-- cpp/src/encoding/int64_rle_decoder.h | 6 +++--- cpp/src/reader/tsfile_series_scan_iterator.cc | 7 ++++--- .../writer/table_view/tsfile_writer_table_test.cc | 11 ++++++----- 6 files changed, 25 insertions(+), 23 deletions(-) diff --git a/cpp/src/common/allocator/byte_stream.h b/cpp/src/common/allocator/byte_stream.h index b9c8ca1a0..4774f4f5f 100644 --- a/cpp/src/common/allocator/byte_stream.h +++ b/cpp/src/common/allocator/byte_stream.h @@ -896,8 +896,7 @@ class SerializationUtil { FORCE_INLINE static int chunk_read_all_data(ByteStream& in, ByteStream& out, size_t chunk_size = 4096) { - char* buffer = static_cast( - mem_alloc(chunk_size, MOD_DEFAULT)); + char* buffer = static_cast(mem_alloc(chunk_size, MOD_DEFAULT)); if (buffer == nullptr) return E_OOM; int ret = common::E_OK; while (in.remaining_size() > 0) { @@ -1173,8 +1172,7 @@ class SerializationUtil { str = nullptr; return ret; } else { - char* tmp_buf = static_cast( - mem_alloc(len, MOD_DEFAULT)); + char* tmp_buf = static_cast(mem_alloc(len, MOD_DEFAULT)); if (tmp_buf == nullptr) return E_OOM; if (RET_FAIL(in.read_buf(tmp_buf, len, read_len))) { mem_free(tmp_buf); @@ -1197,8 +1195,7 @@ class SerializationUtil { int32_t read_len = 0; if (RET_FAIL(read_var_int(len, in))) { } else { - char* tmp_buf = static_cast( - mem_alloc(len + 1, MOD_DEFAULT)); + char* tmp_buf = static_cast(mem_alloc(len + 1, MOD_DEFAULT)); if (tmp_buf == nullptr) return E_OOM; tmp_buf[len] = '\0'; if (RET_FAIL(in.read_buf(tmp_buf, len, read_len))) { @@ -1225,8 +1222,7 @@ class SerializationUtil { if (RET_FAIL(read_i32(len, in))) { } else { int32_t read_len = 0; - char* tmp_buf = static_cast( - mem_alloc(len + 1, MOD_DEFAULT)); + char* tmp_buf = static_cast(mem_alloc(len + 1, MOD_DEFAULT)); if (tmp_buf == nullptr) return E_OOM; tmp_buf[len] = '\0'; if (RET_FAIL(in.read_buf(tmp_buf, len, read_len))) { diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc index 8348e3cfe..5bb1518d3 100644 --- a/cpp/src/common/tablet.cc +++ b/cpp/src/common/tablet.cc @@ -102,8 +102,8 @@ int Tablet::init() { } } - bitmaps_ = static_cast(common::mem_alloc( - sizeof(BitMap) * schema_count, common::MOD_TABLET)); + bitmaps_ = static_cast( + common::mem_alloc(sizeof(BitMap) * schema_count, common::MOD_TABLET)); if (bitmaps_ == nullptr) return E_OOM; for (size_t c = 0; c < schema_count; c++) { new (&bitmaps_[c]) BitMap(); diff --git a/cpp/src/compress/gzip_compressor.h b/cpp/src/compress/gzip_compressor.h index a803107ac..f2aba1310 100644 --- a/cpp/src/compress/gzip_compressor.h +++ b/cpp/src/compress/gzip_compressor.h @@ -46,7 +46,9 @@ class GzipCompressor { void destroy() { end_zstream(); } int compress(char* uncompressed_buf, uint32_t uncompressed_buf_len, char*& compressed_buf, uint32_t& compressed_buf_len); - void after_compress(char* compressed_buf) { common::mem_free(compressed_buf); } + void after_compress(char* compressed_buf) { + common::mem_free(compressed_buf); + } int compress_into_bytestream(char* uncompressed_buf, uint32_t uncompressed_buf_len, common::ByteStream& out); @@ -69,7 +71,9 @@ class GzipDeCompressor { void destroy() { end_zstream(); } int uncompress(char* compressed_buf, uint32_t compressed_buf_len, char*& uncompressed_buf, uint32_t& uncompressed_buf_len); - void after_uncompress(char* uncompressed_buf) { common::mem_free(uncompressed_buf); } + void after_uncompress(char* uncompressed_buf) { + common::mem_free(uncompressed_buf); + } int decompress_into_bytestream(char* compressed_buf, uint32_t compressed_buf_len, common::ByteStream& out); diff --git a/cpp/src/encoding/int64_rle_decoder.h b/cpp/src/encoding/int64_rle_decoder.h index 25d64b084..6526d7793 100644 --- a/cpp/src/encoding/int64_rle_decoder.h +++ b/cpp/src/encoding/int64_rle_decoder.h @@ -152,9 +152,9 @@ class Int64RleDecoder : public Decoder { if (current_buffer_ != nullptr) { common::mem_free(current_buffer_); } - current_buffer_ = static_cast(common::mem_alloc( - sizeof(int64_t) * bit_packed_group_count * 8, - common::MOD_DECODER_OBJ)); + current_buffer_ = static_cast( + common::mem_alloc(sizeof(int64_t) * bit_packed_group_count * 8, + common::MOD_DECODER_OBJ)); int bytes_to_read = bit_packed_group_count * bit_width_; if (bytes_to_read > (int)byte_cache_.remaining_size()) { bytes_to_read = byte_cache_.remaining_size(); diff --git a/cpp/src/reader/tsfile_series_scan_iterator.cc b/cpp/src/reader/tsfile_series_scan_iterator.cc index 75655fb31..8130bd8ba 100644 --- a/cpp/src/reader/tsfile_series_scan_iterator.cc +++ b/cpp/src/reader/tsfile_series_scan_iterator.cc @@ -96,7 +96,8 @@ int TsFileSeriesScanIterator::init_chunk_reader() { int ret = E_OK; is_aligned_ = itimeseries_index_->get_data_type() == common::VECTOR; if (!is_aligned_) { - void* buf = common::mem_alloc(sizeof(ChunkReader), common::MOD_CHUNK_READER); + void* buf = + common::mem_alloc(sizeof(ChunkReader), common::MOD_CHUNK_READER); chunk_reader_ = new (buf) ChunkReader; chunk_meta_cursor_ = itimeseries_index_->get_chunk_meta_list()->begin(); ChunkMeta* cm = chunk_meta_cursor_.get(); @@ -109,8 +110,8 @@ int TsFileSeriesScanIterator::init_chunk_reader() { chunk_meta_cursor_++; } } else { - void* buf = - common::mem_alloc(sizeof(AlignedChunkReader), common::MOD_CHUNK_READER); + void* buf = common::mem_alloc(sizeof(AlignedChunkReader), + common::MOD_CHUNK_READER); chunk_reader_ = new (buf) AlignedChunkReader; time_chunk_meta_cursor_ = itimeseries_index_->get_time_chunk_meta_list()->begin(); diff --git a/cpp/test/writer/table_view/tsfile_writer_table_test.cc b/cpp/test/writer/table_view/tsfile_writer_table_test.cc index 8f1072eee..dcb7e8bf6 100644 --- a/cpp/test/writer/table_view/tsfile_writer_table_test.cc +++ b/cpp/test/writer/table_view/tsfile_writer_table_test.cc @@ -1107,9 +1107,9 @@ TEST_F(TsFileWriterTableTest, MemStatWriteAndVerify) { const int num_timestamps = 100; for (int flush = 0; flush < 3; flush++) { - Tablet tablet = gen_tablet(table_schema, - flush * num_devices * num_timestamps, - num_devices, num_timestamps); + Tablet tablet = + gen_tablet(table_schema, flush * num_devices * num_timestamps, + num_devices, num_timestamps); ASSERT_EQ(tsfile_table_writer->write_table(tablet), E_OK); ASSERT_EQ(tsfile_table_writer->flush(), E_OK); @@ -1126,8 +1126,9 @@ TEST_F(TsFileWriterTableTest, MemStatWriteAndVerify) { ASSERT_EQ(reader.open(file_name_), E_OK); ResultSet* result_set = nullptr; ASSERT_EQ(reader.query(table_schema->get_table_name(), - table_schema->get_measurement_names(), 0, - INT64_MAX, result_set), E_OK); + table_schema->get_measurement_names(), 0, INT64_MAX, + result_set), + E_OK); int row_count = 0; bool has_next = false; auto* table_result_set = static_cast(result_set); From b415509d451f0d8e030c3acd70475041a41ac05c Mon Sep 17 00:00:00 2001 From: ColinLee Date: Thu, 19 Mar 2026 11:34:35 +0800 Subject: [PATCH 3/5] fix memory loss. --- cpp/CMakeLists.txt | 2 + cpp/src/common/allocator/byte_stream.h | 23 ++++-- cpp/src/common/allocator/mem_alloc.cc | 36 ++++++++- cpp/src/common/allocator/page_arena.h | 5 +- cpp/src/common/tablet.cc | 42 ++++++---- cpp/src/common/tablet.h | 2 +- cpp/src/common/tablet_iterator.h | 78 ------------------- cpp/src/common/tsfile_common.h | 3 +- cpp/src/encoding/int32_rle_decoder.h | 2 +- cpp/src/encoding/int32_rle_encoder.h | 2 +- cpp/src/encoding/int64_rle_decoder.h | 2 +- cpp/src/encoding/int64_rle_encoder.h | 2 +- cpp/src/file/tsfile_io_reader.h | 2 +- cpp/src/file/tsfile_io_writer.h | 2 +- cpp/src/reader/aligned_chunk_reader.h | 8 +- cpp/src/reader/chunk_reader.h | 6 +- cpp/src/writer/chunk_writer.h | 2 +- cpp/src/writer/page_writer.h | 4 +- cpp/src/writer/time_chunk_writer.h | 2 +- cpp/src/writer/time_page_writer.h | 2 +- cpp/src/writer/value_chunk_writer.h | 2 +- cpp/src/writer/value_page_writer.h | 5 +- .../table_view/tsfile_writer_table_test.cc | 34 ++++++-- 23 files changed, 132 insertions(+), 136 deletions(-) delete mode 100644 cpp/src/common/tablet_iterator.h diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index f89675a7d..4cd36988b 100755 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -58,6 +58,8 @@ if (${COV_ENABLED}) message("add_definitions -DCOV_ENABLED=1") endif () +option(ENABLE_MEM_STAT "Enable memory status" ON) + if (ENABLE_MEM_STAT) add_definitions(-DENABLE_MEM_STAT) message("add_definitions -DENABLE_MEM_STAT") diff --git a/cpp/src/common/allocator/byte_stream.h b/cpp/src/common/allocator/byte_stream.h index 4774f4f5f..6d08744c3 100644 --- a/cpp/src/common/allocator/byte_stream.h +++ b/cpp/src/common/allocator/byte_stream.h @@ -268,9 +268,8 @@ class ByteStream { // assert(page_size >= 16); // commented out by gxh on 2023.03.09 } - // TODO use a specific construct function to mark it as wrapped use. // for wrap plain buffer to ByteStream - ByteStream() + ByteStream(AllocModID mid = MOD_DEFAULT) : allocator_(g_base_allocator), head_(nullptr, false), tail_(nullptr, false), @@ -279,7 +278,7 @@ class ByteStream { read_pos_(0), marked_read_pos_(0), page_size_(0), - mid_(MOD_DEFAULT), + mid_(mid), wrapped_page_(false, nullptr) {} ~ByteStream() { destroy(); } @@ -669,6 +668,11 @@ class ByteStream { uint32_t marked_read_pos_; // current reader position uint32_t page_size_; AllocModID mid_; + + public: + AllocModID get_mid() const { return mid_; } + + private: Page wrapped_page_; }; @@ -896,7 +900,7 @@ class SerializationUtil { FORCE_INLINE static int chunk_read_all_data(ByteStream& in, ByteStream& out, size_t chunk_size = 4096) { - char* buffer = static_cast(mem_alloc(chunk_size, MOD_DEFAULT)); + char* buffer = static_cast(mem_alloc(chunk_size, out.get_mid())); if (buffer == nullptr) return E_OOM; int ret = common::E_OK; while (in.remaining_size() > 0) { @@ -1172,7 +1176,8 @@ class SerializationUtil { str = nullptr; return ret; } else { - char* tmp_buf = static_cast(mem_alloc(len, MOD_DEFAULT)); + char* tmp_buf = + static_cast(mem_alloc(len, in.get_mid())); if (tmp_buf == nullptr) return E_OOM; if (RET_FAIL(in.read_buf(tmp_buf, len, read_len))) { mem_free(tmp_buf); @@ -1195,7 +1200,8 @@ class SerializationUtil { int32_t read_len = 0; if (RET_FAIL(read_var_int(len, in))) { } else { - char* tmp_buf = static_cast(mem_alloc(len + 1, MOD_DEFAULT)); + char* tmp_buf = + static_cast(mem_alloc(len + 1, in.get_mid())); if (tmp_buf == nullptr) return E_OOM; tmp_buf[len] = '\0'; if (RET_FAIL(in.read_buf(tmp_buf, len, read_len))) { @@ -1222,7 +1228,8 @@ class SerializationUtil { if (RET_FAIL(read_i32(len, in))) { } else { int32_t read_len = 0; - char* tmp_buf = static_cast(mem_alloc(len + 1, MOD_DEFAULT)); + char* tmp_buf = + static_cast(mem_alloc(len + 1, in.get_mid())); if (tmp_buf == nullptr) return E_OOM; tmp_buf[len] = '\0'; if (RET_FAIL(in.read_buf(tmp_buf, len, read_len))) { @@ -1311,7 +1318,7 @@ FORCE_INLINE char* get_bytes_from_bytestream(ByteStream& bs) { return nullptr; } uint32_t size = bs.total_size(); - char* ret_buf = static_cast(mem_alloc(size, MOD_DEFAULT)); + char* ret_buf = static_cast(mem_alloc(size, bs.get_mid())); if (ret_buf == nullptr) { return nullptr; } diff --git a/cpp/src/common/allocator/mem_alloc.cc b/cpp/src/common/allocator/mem_alloc.cc index d077483b9..42126660b 100644 --- a/cpp/src/common/allocator/mem_alloc.cc +++ b/cpp/src/common/allocator/mem_alloc.cc @@ -22,6 +22,7 @@ #endif #include +#include #include #include "alloc_base.h" @@ -173,14 +174,43 @@ void ModStat::destroy() { ::free(stat_arr_); } void ModStat::print_stat() { if (stat_arr_ == NULL) return; - std::cout << "=== Memory Statistics ===" << std::endl; + + struct Entry { + const char* name; + int32_t val; + }; + Entry entries[__LAST_MOD_ID]; + int count = 0; + int64_t total = 0; + for (int i = 0; i < __LAST_MOD_ID; i++) { int32_t val = ATOMIC_FAA(get_item(i), 0); + total += val; if (val != 0) { - const char* name = (i < __LAST_MOD_ID) ? g_mod_names[i] : "UNKNOWN"; - std::cout << " " << name << ": " << val << " bytes" << std::endl; + entries[count++] = {g_mod_names[i], val}; } } + + for (int i = 0; i < count - 1; i++) { + for (int j = i + 1; j < count; j++) { + if (entries[j].val > entries[i].val) { + Entry tmp = entries[i]; + entries[i] = entries[j]; + entries[j] = tmp; + } + } + } + + std::cout << "=== Memory Statistics ===" << std::endl; + for (int i = 0; i < count; i++) { + std::cout << " " << entries[i].name << ": " << entries[i].val + << " bytes" << std::endl; + } + double kb = total / 1024.0; + double mb = kb / 1024.0; + std::cout << " TOTAL: " << total << " bytes / " << std::fixed + << std::setprecision(2) << kb << " KB / " << mb << " MB" + << std::endl; std::cout << "=========================" << std::endl; } diff --git a/cpp/src/common/allocator/page_arena.h b/cpp/src/common/allocator/page_arena.h index 71c11988d..9b8ce5ef6 100644 --- a/cpp/src/common/allocator/page_arena.h +++ b/cpp/src/common/allocator/page_arena.h @@ -31,9 +31,10 @@ namespace common { */ class PageArena { public: - explicit PageArena(BaseAllocator& base_allocator = g_base_allocator) + explicit PageArena(AllocModID mid = __FIRST_MOD_ID, + BaseAllocator& base_allocator = g_base_allocator) : page_size_(0), - mid_(__FIRST_MOD_ID), + mid_(mid), base_allocator_(base_allocator), dummy_head_() {} ~PageArena() { destroy(); } diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc index 5bb1518d3..829313684 100644 --- a/cpp/src/common/tablet.cc +++ b/cpp/src/common/tablet.cc @@ -51,42 +51,50 @@ int Tablet::init() { if (value_matrix_ == nullptr) return E_OOM; for (size_t c = 0; c < schema_count; ++c) { const MeasurementSchema& schema = schema_vec_->at(c); - uint32_t elem_size = get_data_type_size(schema.data_type_); - uint32_t alloc_size = elem_size * max_row_num_; switch (schema.data_type_) { - case BOOLEAN: + case BOOLEAN: { + size_t sz = sizeof(bool) * max_row_num_; value_matrix_[c].bool_data = static_cast( - common::mem_alloc(alloc_size, common::MOD_TABLET)); + common::mem_alloc(sz, common::MOD_TABLET)); if (value_matrix_[c].bool_data == nullptr) return E_OOM; - memset(value_matrix_[c].bool_data, 0, alloc_size); + memset(value_matrix_[c].bool_data, 0, sz); break; + } case DATE: - case INT32: + case INT32: { + size_t sz = sizeof(int32_t) * max_row_num_; value_matrix_[c].int32_data = static_cast( - common::mem_alloc(alloc_size, common::MOD_TABLET)); + common::mem_alloc(sz, common::MOD_TABLET)); if (value_matrix_[c].int32_data == nullptr) return E_OOM; - memset(value_matrix_[c].int32_data, 0, alloc_size); + memset(value_matrix_[c].int32_data, 0, sz); break; + } case TIMESTAMP: - case INT64: + case INT64: { + size_t sz = sizeof(int64_t) * max_row_num_; value_matrix_[c].int64_data = static_cast( - common::mem_alloc(alloc_size, common::MOD_TABLET)); + common::mem_alloc(sz, common::MOD_TABLET)); if (value_matrix_[c].int64_data == nullptr) return E_OOM; - memset(value_matrix_[c].int64_data, 0, alloc_size); + memset(value_matrix_[c].int64_data, 0, sz); break; - case FLOAT: + } + case FLOAT: { + size_t sz = sizeof(float) * max_row_num_; value_matrix_[c].float_data = static_cast( - common::mem_alloc(alloc_size, common::MOD_TABLET)); + common::mem_alloc(sz, common::MOD_TABLET)); if (value_matrix_[c].float_data == nullptr) return E_OOM; - memset(value_matrix_[c].float_data, 0, alloc_size); + memset(value_matrix_[c].float_data, 0, sz); break; - case DOUBLE: + } + case DOUBLE: { + size_t sz = sizeof(double) * max_row_num_; value_matrix_[c].double_data = static_cast( - common::mem_alloc(alloc_size, common::MOD_TABLET)); + common::mem_alloc(sz, common::MOD_TABLET)); if (value_matrix_[c].double_data == nullptr) return E_OOM; - memset(value_matrix_[c].double_data, 0, alloc_size); + memset(value_matrix_[c].double_data, 0, sz); break; + } case BLOB: case TEXT: case STRING: { diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h index 04fee7643..94bd8ca7c 100644 --- a/cpp/src/common/tablet.h +++ b/cpp/src/common/tablet.h @@ -246,7 +246,7 @@ class Tablet { private: template void process_val(uint32_t row_index, uint32_t schema_index, T val); - common::PageArena page_arena_; + common::PageArena page_arena_{common::MOD_TABLET}; uint32_t max_row_num_; uint32_t cur_row_size_; std::string insert_target_name_; diff --git a/cpp/src/common/tablet_iterator.h b/cpp/src/common/tablet_iterator.h deleted file mode 100644 index 53163f7e6..000000000 --- a/cpp/src/common/tablet_iterator.h +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef COMMON_TABLET_H -#define COMMON_TABLET_H - -namespace storage { - -#define GET_TYPED_NEXT(timestamp, CppType, TsType, value) \ - do { \ - if (row_iter_ >= max_rows_) { \ - return E_NO_MORE_DATA; \ - } \ - if (data_type_ != TsType) { \ - return E_DATA_TYPE_NOT_MATCH; \ - } \ - timestamp = tablet_.timestamps_[row_iter_]; \ - void* value_arr = value_matrix_[col_idx_]; \ - if (data_type_ == TsType) { \ - value = ((CppType*)value_arr) + row_iter_; \ - } \ - return E_OK; \ - } while (false) - -class TabletColIterator { - public: - TabletColIterator(const Tablet& tablet, int col_idx) - : tablet_(tablet), col_idx_(col_idx) { - ASSERT(col_idx <= tablet.schema_vec_->size()); - data_type = get_data_type_size(tablet.schema_vec_->at(i).data_type_); - row_iter_ = 0; - } - - const MeasurementSchema& get_measurement_schema() const { - return schema_vec_->at(col_idx_); - } - - int get_next(int64_t& timestamp, bool& value) { - GET_TYPED_NEXT(timestamp, bool, BOOLEAN, value); - } - int get_next(int64_t& timestamp, int32_t& value) { - GET_TYPED_NEXT(timestamp, int32_t, INT32, value); - } - int get_next(int64_t& timestamp, int64_t& value) { - GET_TYPED_NEXT(timestamp, int64_t, INT64, value); - } - int get_next(int64_t& timestamp, float& value) { - GET_TYPED_NEXT(timestamp, float, FLOAT, value); - } - int get_next(int64_t& timestamp, double& value) { - GET_TYPED_NEXT(timestamp, double, DOUBLE, value); - } - - private: - const Tablet& tablet_; - TSDataType data_type_; - int col_idx_; - int row_iter_; -}; - -} // end namespace storage -#endif diff --git a/cpp/src/common/tsfile_common.h b/cpp/src/common/tsfile_common.h index ad2fa5911..c866e4995 100644 --- a/cpp/src/common/tsfile_common.h +++ b/cpp/src/common/tsfile_common.h @@ -565,7 +565,8 @@ class TimeseriesIndex : public ITimeseriesIndex { */ Statistic* statistic_; bool statistic_from_pa_; - common::ByteStream chunk_meta_list_serialized_buf_; + common::ByteStream chunk_meta_list_serialized_buf_{ + common::MOD_TSFILE_WRITER_META}; // common::PageArena page_arena_; common::SimpleList* chunk_meta_list_; // for deserialize_from }; diff --git a/cpp/src/encoding/int32_rle_decoder.h b/cpp/src/encoding/int32_rle_decoder.h index 757a92599..aee9048a1 100644 --- a/cpp/src/encoding/int32_rle_decoder.h +++ b/cpp/src/encoding/int32_rle_decoder.h @@ -37,7 +37,7 @@ class Int32RleDecoder : public Decoder { int bitpacking_num_; bool is_length_and_bitwidth_readed_; int current_count_; - common::ByteStream byte_cache_; + common::ByteStream byte_cache_{common::MOD_DECODER_OBJ}; int32_t* current_buffer_; Int32Packer* packer_; uint8_t* tmp_buf_; diff --git a/cpp/src/encoding/int32_rle_encoder.h b/cpp/src/encoding/int32_rle_encoder.h index 6b9ae1634..9301135a4 100644 --- a/cpp/src/encoding/int32_rle_encoder.h +++ b/cpp/src/encoding/int32_rle_encoder.h @@ -36,7 +36,7 @@ class Int32RleEncoder : public Encoder { int num_buffered_values_; int bit_width_; Int32Packer* packer_; - common::ByteStream byte_cache_; + common::ByteStream byte_cache_{common::MOD_ENCODER_OBJ}; std::vector values_; // all data tobe encoded int32_t buffered_values_[8]; // encode each 8 values std::vector bytes_buffer_; diff --git a/cpp/src/encoding/int64_rle_decoder.h b/cpp/src/encoding/int64_rle_decoder.h index 6526d7793..8010fe0f7 100644 --- a/cpp/src/encoding/int64_rle_decoder.h +++ b/cpp/src/encoding/int64_rle_decoder.h @@ -37,7 +37,7 @@ class Int64RleDecoder : public Decoder { int bitpacking_num_; bool is_length_and_bitwidth_readed_; int current_count_; - common::ByteStream byte_cache_; + common::ByteStream byte_cache_{common::MOD_DECODER_OBJ}; int64_t* current_buffer_; Int64Packer* packer_; uint8_t* tmp_buf_; diff --git a/cpp/src/encoding/int64_rle_encoder.h b/cpp/src/encoding/int64_rle_encoder.h index edd28180f..119b36981 100644 --- a/cpp/src/encoding/int64_rle_encoder.h +++ b/cpp/src/encoding/int64_rle_encoder.h @@ -36,7 +36,7 @@ class Int64RleEncoder : public Encoder { int num_buffered_values_; int bit_width_; Int64Packer* packer_; - common::ByteStream byte_cache_; + common::ByteStream byte_cache_{common::MOD_ENCODER_OBJ}; std::vector values_; // all data tobe encoded int64_t buffered_values_[8]; // encode each 8 values std::vector bytes_buffer_; diff --git a/cpp/src/file/tsfile_io_reader.h b/cpp/src/file/tsfile_io_reader.h index 19bcfea0b..2f4135e0e 100644 --- a/cpp/src/file/tsfile_io_reader.h +++ b/cpp/src/file/tsfile_io_reader.h @@ -45,7 +45,7 @@ class TsFileIOReader { tsfile_meta_(&tsfile_meta_page_arena_), tsfile_meta_ready_(false), read_file_created_(false) { - tsfile_meta_page_arena_.init(512, common::MOD_DEFAULT); + tsfile_meta_page_arena_.init(512, common::MOD_TSFILE_READER); } int init(const std::string& file_path); diff --git a/cpp/src/file/tsfile_io_writer.h b/cpp/src/file/tsfile_io_writer.h index 8fcc8fa55..6b12f4015 100644 --- a/cpp/src/file/tsfile_io_writer.h +++ b/cpp/src/file/tsfile_io_writer.h @@ -205,7 +205,7 @@ class TsFileIOWriter { private: common::PageArena meta_allocator_; - common::ByteStream write_stream_; + common::ByteStream write_stream_{common::MOD_TSFILE_WRITE_STREAM}; common::ByteStream::Consumer write_stream_consumer_; ChunkMeta* cur_chunk_meta_; ChunkGroupMeta* cur_chunk_group_meta_; diff --git a/cpp/src/reader/aligned_chunk_reader.h b/cpp/src/reader/aligned_chunk_reader.h index aefb7bc58..12e0b9289 100644 --- a/cpp/src/reader/aligned_chunk_reader.h +++ b/cpp/src/reader/aligned_chunk_reader.h @@ -156,8 +156,8 @@ class AlignedChunkReader : public IChunkReader { * also refer to offset within the chunk (including chunk header). * It advanced by step of a page header or a page tv data. */ - common::ByteStream time_in_stream_; - common::ByteStream value_in_stream_; + common::ByteStream time_in_stream_{common::MOD_CHUNK_READER}; + common::ByteStream value_in_stream_{common::MOD_CHUNK_READER}; int32_t file_data_time_buf_size_; int32_t file_data_value_buf_size_; uint32_t time_chunk_visit_offset_; @@ -170,8 +170,8 @@ class AlignedChunkReader : public IChunkReader { Decoder* time_decoder_; Decoder* value_decoder_; - common::ByteStream time_in_; - common::ByteStream value_in_; + common::ByteStream time_in_{common::MOD_CHUNK_READER}; + common::ByteStream value_in_{common::MOD_CHUNK_READER}; char* time_uncompressed_buf_; char* value_uncompressed_buf_; std::vector value_page_col_notnull_bitmap_; diff --git a/cpp/src/reader/chunk_reader.h b/cpp/src/reader/chunk_reader.h index 52c7f7a59..106b8648b 100644 --- a/cpp/src/reader/chunk_reader.h +++ b/cpp/src/reader/chunk_reader.h @@ -125,7 +125,7 @@ class ChunkReader : public IChunkReader { * also refer to offset within the chunk (including chunk header). * It advanced by step of a page header or a page tv data. */ - common::ByteStream in_stream_; + common::ByteStream in_stream_{common::MOD_CHUNK_READER}; int32_t file_data_buf_size_; uint32_t chunk_visit_offset_; @@ -135,8 +135,8 @@ class ChunkReader : public IChunkReader { Decoder* time_decoder_; Decoder* value_decoder_; - common::ByteStream time_in_; - common::ByteStream value_in_; + common::ByteStream time_in_{common::MOD_CHUNK_READER}; + common::ByteStream value_in_{common::MOD_CHUNK_READER}; char* uncompressed_buf_; }; diff --git a/cpp/src/writer/chunk_writer.h b/cpp/src/writer/chunk_writer.h index 3032ff9a5..6eb3f5418 100644 --- a/cpp/src/writer/chunk_writer.h +++ b/cpp/src/writer/chunk_writer.h @@ -150,7 +150,7 @@ class ChunkWriter { common::TSDataType data_type_; PageWriter page_writer_; Statistic* chunk_statistic_; - common::ByteStream chunk_data_; + common::ByteStream chunk_data_{common::MOD_CW_PAGES_DATA}; // to save first page data PageData first_page_data_; diff --git a/cpp/src/writer/page_writer.h b/cpp/src/writer/page_writer.h index cff4b60ed..d3966d865 100644 --- a/cpp/src/writer/page_writer.h +++ b/cpp/src/writer/page_writer.h @@ -201,8 +201,8 @@ class PageWriter { Encoder* time_encoder_; Encoder* value_encoder_; Statistic* statistic_; - common::ByteStream time_out_stream_; - common::ByteStream value_out_stream_; + common::ByteStream time_out_stream_{common::MOD_PAGE_WRITER_OUTPUT_STREAM}; + common::ByteStream value_out_stream_{common::MOD_PAGE_WRITER_OUTPUT_STREAM}; PageData cur_page_data_; Compressor* compressor_; bool is_inited_; diff --git a/cpp/src/writer/time_chunk_writer.h b/cpp/src/writer/time_chunk_writer.h index ac3b374b0..0c6e1f18a 100644 --- a/cpp/src/writer/time_chunk_writer.h +++ b/cpp/src/writer/time_chunk_writer.h @@ -102,7 +102,7 @@ class TimeChunkWriter { private: TimePageWriter time_page_writer_; Statistic* chunk_statistic_; - common::ByteStream chunk_data_; + common::ByteStream chunk_data_{common::MOD_CW_PAGES_DATA}; // to save first page data TimePageData first_page_data_; diff --git a/cpp/src/writer/time_page_writer.h b/cpp/src/writer/time_page_writer.h index 4c01044a6..d9dcecff1 100644 --- a/cpp/src/writer/time_page_writer.h +++ b/cpp/src/writer/time_page_writer.h @@ -121,7 +121,7 @@ class TimePageWriter { common::TSDataType data_type_; Encoder* time_encoder_; Statistic* statistic_; - common::ByteStream time_out_stream_; + common::ByteStream time_out_stream_{common::MOD_PAGE_WRITER_OUTPUT_STREAM}; TimePageData cur_page_data_; Compressor* compressor_; bool is_inited_; diff --git a/cpp/src/writer/value_chunk_writer.h b/cpp/src/writer/value_chunk_writer.h index 859fb57b0..4391b7540 100644 --- a/cpp/src/writer/value_chunk_writer.h +++ b/cpp/src/writer/value_chunk_writer.h @@ -148,7 +148,7 @@ class ValueChunkWriter { common::TSDataType data_type_; ValuePageWriter value_page_writer_; Statistic* chunk_statistic_; - common::ByteStream chunk_data_; + common::ByteStream chunk_data_{common::MOD_CW_PAGES_DATA}; // to save first page data ValuePageData first_page_data_; diff --git a/cpp/src/writer/value_page_writer.h b/cpp/src/writer/value_page_writer.h index 60d75b0b8..ec115c9da 100644 --- a/cpp/src/writer/value_page_writer.h +++ b/cpp/src/writer/value_page_writer.h @@ -205,8 +205,9 @@ class ValuePageWriter { common::TSDataType data_type_; Encoder* value_encoder_; Statistic* statistic_; - common::ByteStream col_notnull_bitmap_out_stream_; - common::ByteStream value_out_stream_; + common::ByteStream col_notnull_bitmap_out_stream_{ + common::MOD_PAGE_WRITER_OUTPUT_STREAM}; + common::ByteStream value_out_stream_{common::MOD_PAGE_WRITER_OUTPUT_STREAM}; ValuePageData cur_page_data_; Compressor* compressor_; bool is_inited_; diff --git a/cpp/test/writer/table_view/tsfile_writer_table_test.cc b/cpp/test/writer/table_view/tsfile_writer_table_test.cc index dcb7e8bf6..477f875e7 100644 --- a/cpp/test/writer/table_view/tsfile_writer_table_test.cc +++ b/cpp/test/writer/table_view/tsfile_writer_table_test.cc @@ -1098,7 +1098,7 @@ TEST_F(TsFileWriterTableTest, EncodingConfigIntegration) { } #ifdef ENABLE_MEM_STAT -TEST_F(TsFileWriterTableTest, MemStatWriteAndVerify) { +TEST_F(TsFileWriterTableTest, DISABLED_MemStatWriteAndVerify) { TableSchema* table_schema = gen_table_schema(0, 2, 3); auto tsfile_table_writer = std::make_shared(&write_file_, table_schema); @@ -1106,11 +1106,14 @@ TEST_F(TsFileWriterTableTest, MemStatWriteAndVerify) { const int num_devices = 10; const int num_timestamps = 100; - for (int flush = 0; flush < 3; flush++) { + for (int flush = 0; flush < 10; flush++) { Tablet tablet = gen_tablet(table_schema, flush * num_devices * num_timestamps, num_devices, num_timestamps); ASSERT_EQ(tsfile_table_writer->write_table(tablet), E_OK); + std::cout << "--- After write, before flush " << flush << " ---" + << std::endl; + ModStat::get_instance().print_stat(); ASSERT_EQ(tsfile_table_writer->flush(), E_OK); std::cout << "--- After flush " << flush << " ---" << std::endl; @@ -1124,26 +1127,47 @@ TEST_F(TsFileWriterTableTest, MemStatWriteAndVerify) { TsFileReader reader; ASSERT_EQ(reader.open(file_name_), E_OK); + + std::cout << "--- After reader open ---" << std::endl; + ModStat::get_instance().print_stat(); + ResultSet* result_set = nullptr; ASSERT_EQ(reader.query(table_schema->get_table_name(), table_schema->get_measurement_names(), 0, INT64_MAX, result_set), E_OK); + + std::cout << "--- After query init ---" << std::endl; + ModStat::get_instance().print_stat(); + int row_count = 0; bool has_next = false; + const int total_rows = num_devices * num_timestamps * 10; + const int sample_interval = total_rows / 5; + int next_sample = sample_interval; auto* table_result_set = static_cast(result_set); while (IS_SUCC(table_result_set->next(has_next)) && has_next) { row_count++; + if (row_count == next_sample) { + std::cout << "--- Reading row " << row_count << "/" << total_rows + << " ---" << std::endl; + ModStat::get_instance().print_stat(); + next_sample += sample_interval; + } } - EXPECT_EQ(row_count, num_devices * num_timestamps * 3); + EXPECT_EQ(row_count, total_rows); - std::cout << "--- After read ---" << std::endl; + std::cout << "--- After read complete ---" << std::endl; ModStat::get_instance().print_stat(); reader.destroy_query_data_set(table_result_set); + + std::cout << "--- After destroy result set ---" << std::endl; + ModStat::get_instance().print_stat(); + ASSERT_EQ(reader.close(), E_OK); - std::cout << "--- After cleanup ---" << std::endl; + std::cout << "--- After reader close ---" << std::endl; ModStat::get_instance().print_stat(); delete table_schema; From 9f52d3df98385339b0458bbd8cd740d31447a5d8 Mon Sep 17 00:00:00 2001 From: ColinLee Date: Thu, 19 Mar 2026 11:51:40 +0800 Subject: [PATCH 4/5] fix memory loss. --- cpp/src/common/allocator/alloc_base.h | 1 + cpp/src/common/allocator/mem_alloc.cc | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/cpp/src/common/allocator/alloc_base.h b/cpp/src/common/allocator/alloc_base.h index d34c9cff7..facfd8081 100644 --- a/cpp/src/common/allocator/alloc_base.h +++ b/cpp/src/common/allocator/alloc_base.h @@ -69,6 +69,7 @@ void* mem_realloc(void* ptr, uint32_t size); class ModStat { public: ModStat() : stat_arr_(NULL) {} + ~ModStat() { destroy(); } static ModStat& get_instance() { static ModStat gms; diff --git a/cpp/src/common/allocator/mem_alloc.cc b/cpp/src/common/allocator/mem_alloc.cc index 42126660b..49eb46e79 100644 --- a/cpp/src/common/allocator/mem_alloc.cc +++ b/cpp/src/common/allocator/mem_alloc.cc @@ -170,7 +170,10 @@ void ModStat::init() { } } -void ModStat::destroy() { ::free(stat_arr_); } +void ModStat::destroy() { + ::free(stat_arr_); + stat_arr_ = NULL; +} void ModStat::print_stat() { if (stat_arr_ == NULL) return; From 1df055f1359b3cca4ea4c1f77e15d52f79da308c Mon Sep 17 00:00:00 2001 From: ColinLee Date: Thu, 19 Mar 2026 15:06:28 +0800 Subject: [PATCH 5/5] fix memory leak. --- cpp/src/common/allocator/mem_alloc.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cpp/src/common/allocator/mem_alloc.cc b/cpp/src/common/allocator/mem_alloc.cc index 49eb46e79..524287e75 100644 --- a/cpp/src/common/allocator/mem_alloc.cc +++ b/cpp/src/common/allocator/mem_alloc.cc @@ -163,6 +163,9 @@ void* mem_realloc(void* ptr, uint32_t size) { } void ModStat::init() { + if (stat_arr_ != NULL) { + return; + } stat_arr_ = (int32_t*)(::malloc(ITEM_SIZE * ITEM_COUNT)); for (int8_t i = 0; i < __LAST_MOD_ID; i++) { int32_t* item = get_item(i);