From b2ce32371a53ec03c533fd0f3e080b18449e7bb7 Mon Sep 17 00:00:00 2001 From: ColinLee Date: Fri, 20 Mar 2026 00:36:10 +0800 Subject: [PATCH 1/4] refine tabet writing. --- cpp/src/common/tablet.cc | 71 ++++++++++++++++++++++++++++-- cpp/src/common/tablet.h | 67 +++++++++++++++++++++++++++- cpp/src/writer/tsfile_writer.cc | 77 +++++++++++++++++++++++++-------- cpp/src/writer/tsfile_writer.h | 5 ++- 4 files changed, 196 insertions(+), 24 deletions(-) diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc index f2d09e53b..6af3b31eb 100644 --- a/cpp/src/common/tablet.cc +++ b/cpp/src/common/tablet.cc @@ -98,10 +98,16 @@ int Tablet::init() { case BLOB: case TEXT: case STRING: { +<<<<<<< HEAD value_matrix_[c].string_data = static_cast(common::mem_alloc( sizeof(String) * max_row_num_, common::MOD_TABLET)); if (value_matrix_[c].string_data == nullptr) return E_OOM; +======= + auto* sc = new StringColumn(); + sc->init(max_row_num_, max_row_num_ * 32); + value_matrix_[c].string_col = sc; +>>>>>>> 3322a03c (refine tabet writing.) break; } default: @@ -117,6 +123,7 @@ int Tablet::init() { new (&bitmaps_[c]) BitMap(); bitmaps_[c].init(max_row_num_, false); } + return E_OK; } @@ -150,7 +157,12 @@ void Tablet::destroy() { case BLOB: case TEXT: case STRING: +<<<<<<< HEAD common::mem_free(value_matrix_[c].string_data); +======= + value_matrix_[c].string_col->destroy(); + delete value_matrix_[c].string_col; +>>>>>>> 3322a03c (refine tabet writing.) break; default: break; @@ -293,8 +305,7 @@ void* Tablet::get_value(int row_index, uint32_t schema_index, return &double_values[row_index]; } case STRING: { - auto string_values = column_values.string_data; - return &string_values[row_index]; + return &column_values.string_col->get_string_view(row_index); } default: return nullptr; @@ -304,8 +315,8 @@ void* Tablet::get_value(int row_index, uint32_t schema_index, template <> void Tablet::process_val(uint32_t row_index, uint32_t schema_index, common::String str) { - value_matrix_[schema_index].string_data[row_index].dup_from(str, - page_arena_); + value_matrix_[schema_index].string_col->append(row_index, str.buf_, + str.len_); bitmaps_[schema_index].clear(row_index); /* mark as non-null */ } @@ -444,6 +455,58 @@ void Tablet::set_column_categories( } } +void Tablet::reset_string_columns() { + size_t schema_count = schema_vec_->size(); + for (size_t c = 0; c < schema_count; c++) { + const MeasurementSchema& schema = schema_vec_->at(c); + if (schema.data_type_ == STRING || schema.data_type_ == TEXT || + schema.data_type_ == BLOB) { + value_matrix_[c].string_col->reset(); + } + } + page_arena_.reset(); +} + +std::vector Tablet::find_all_device_boundaries() const { + const uint32_t row_count = get_cur_row_size(); + if (row_count <= 1) return {}; + + // Use uint64_t bitmap instead of vector for faster set/test/scan. + const uint32_t nwords = (row_count + 63) / 64; + std::vector boundary(nwords, 0); + + for (auto col_idx : id_column_indexes_) { + const StringColumn& sc = *value_matrix_[col_idx].string_col; + const uint32_t* off = sc.offsets; + const char* buf = sc.buffer; + for (uint32_t i = 1; i < row_count; i++) { + if (boundary[i >> 6] & (1ULL << (i & 63))) continue; + uint32_t len_a = off[i] - off[i - 1]; + uint32_t len_b = off[i + 1] - off[i]; + if (len_a != len_b || + (len_a > 0 && + memcmp(buf + off[i - 1], buf + off[i], len_a) != 0)) { + boundary[i >> 6] |= (1ULL << (i & 63)); + } + } + } + + // Collect boundary positions using bitscan + std::vector result; + for (uint32_t w = 0; w < nwords; w++) { + uint64_t bits = boundary[w]; + while (bits) { + uint32_t bit = __builtin_ctzll(bits); + uint32_t idx = w * 64 + bit; + if (idx > 0 && idx < row_count) { + result.push_back(idx); + } + bits &= bits - 1; // clear lowest set bit + } + } + return result; +} + std::shared_ptr Tablet::get_device_id(int i) const { std::vector id_array; id_array.push_back(new std::string(insert_target_name_)); diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h index e47aa5c42..a81dd99f7 100644 --- a/cpp/src/common/tablet.h +++ b/cpp/src/common/tablet.h @@ -46,6 +46,68 @@ class TabletColIterator; * with their associated metadata such as column names and types. */ class Tablet { + // Arrow-style string column: offsets + contiguous buffer. + // string[i] = buffer + offsets[i], len = offsets[i+1] - offsets[i] + struct StringColumn { + uint32_t* offsets; // length: max_rows + 1 + char* buffer; // contiguous string data + uint32_t buf_capacity; // allocated buffer size + uint32_t buf_used; // bytes written so far + + StringColumn() + : offsets(nullptr), buffer(nullptr), buf_capacity(0), buf_used(0) {} + + void init(uint32_t max_rows, uint32_t init_buf_capacity) { + offsets = (uint32_t*)malloc(sizeof(uint32_t) * (max_rows + 1)); + offsets[0] = 0; + buf_capacity = init_buf_capacity; + buffer = (char*)malloc(buf_capacity); + buf_used = 0; + } + + void destroy() { + free(offsets); + offsets = nullptr; + free(buffer); + buffer = nullptr; + buf_capacity = buf_used = 0; + } + + void reset() { + buf_used = 0; + if (offsets) offsets[0] = 0; + } + + void append(uint32_t row, const char* data, uint32_t len) { + // Grow buffer if needed + while (buf_used + len > buf_capacity) { + buf_capacity = buf_capacity * 2 + len; + buffer = (char*)realloc(buffer, buf_capacity); + } + memcpy(buffer + buf_used, data, len); + offsets[row] = buf_used; + offsets[row + 1] = buf_used + len; + buf_used += len; + } + + const char* get_str(uint32_t row) const { + return buffer + offsets[row]; + } + uint32_t get_len(uint32_t row) const { + return offsets[row + 1] - offsets[row]; + } + // Return a String view for a given row. The returned reference is + // valid until the next call to get_string_view on this column. + common::String& get_string_view(uint32_t row) { + view_cache_.buf_ = buffer + offsets[row]; + view_cache_.len_ = offsets[row + 1] - offsets[row]; + return view_cache_; + } + + private: + common::String view_cache_; + }; + struct ValueMatrixEntry { union { int32_t* int32_data; @@ -53,7 +115,7 @@ class Tablet { float* float_data; double* double_data; bool* bool_data; - common::String* string_data; + StringColumn* string_col; }; }; @@ -220,6 +282,7 @@ class Tablet { void set_column_categories( const std::vector& column_categories); std::shared_ptr get_device_id(int i) const; + std::vector find_all_device_boundaries() const; /** * @brief Template function to add a value of type T to the specified row * and column by name. @@ -253,6 +316,8 @@ class Tablet { schema_map_ = schema_map; } + void reset_string_columns(); + friend class TabletColIterator; friend class TsFileWriter; friend struct MeasurementNamesFromTablet; diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc index 85a816ef6..0c336b138 100644 --- a/cpp/src/writer/tsfile_writer.cc +++ b/cpp/src/writer/tsfile_writer.cc @@ -942,6 +942,10 @@ int TsFileWriter::write_table(Tablet& tablet) { } } record_count_since_last_flush_ += tablet.cur_row_size_; + // Release string memory accumulated during this write. + // The page_arena_ holds all dup_from'd String buffers which are no longer + // needed after the data has been encoded into chunks. + tablet.reset_string_columns(); ret = check_memory_size_and_may_flush_chunks(); return ret; } @@ -949,10 +953,11 @@ int TsFileWriter::write_table(Tablet& tablet) { std::vector, int>> TsFileWriter::split_tablet_by_device(const Tablet& tablet) { std::vector, int>> result; - std::shared_ptr last_device_id = - std::make_shared("last_device_id"); + if (tablet.id_column_indexes_.empty()) { - result.emplace_back(std::move(last_device_id), 0); + // No ID columns — entire tablet is one device + auto sentinel = std::make_shared("last_device_id"); + result.emplace_back(std::move(sentinel), 0); std::vector id_array; id_array.push_back(new std::string(tablet.insert_target_name_)); auto res = std::make_shared(id_array); @@ -961,14 +966,26 @@ TsFileWriter::split_tablet_by_device(const Tablet& tablet) { return result; } - for (uint32_t i = 0; i < tablet.get_cur_row_size(); i++) { - std::shared_ptr cur_device_id(tablet.get_device_id(i)); - if (*cur_device_id != *last_device_id) { - result.emplace_back(std::move(last_device_id), i); - last_device_id = std::move(cur_device_id); - } + const uint32_t row_count = tablet.get_cur_row_size(); + if (row_count == 0) return result; + + // Sentinel entry (end_idx == 0, will be skipped by caller) + auto sentinel = std::make_shared("last_device_id"); + result.emplace_back(std::move(sentinel), 0); + + // Column-oriented scan: find all boundaries, then construct DeviceID + // only once per device segment. + auto boundaries = tablet.find_all_device_boundaries(); + + uint32_t seg_start = 0; + for (uint32_t b : boundaries) { + std::shared_ptr dev_id(tablet.get_device_id(seg_start)); + result.emplace_back(std::move(dev_id), b); + seg_start = b; } - result.emplace_back(std::move(last_device_id), tablet.get_cur_row_size()); + // Last segment + std::shared_ptr last_id(tablet.get_device_id(seg_start)); + result.emplace_back(std::move(last_id), row_count); return result; } @@ -1004,7 +1021,7 @@ int TsFileWriter::write_column(ChunkWriter* chunk_writer, const Tablet& tablet, col_notnull_bitmap, start_idx, end_idx); } else if (data_type == common::STRING) { ret = - write_typed_column(chunk_writer, timestamps, col_values.string_data, + write_typed_column(chunk_writer, timestamps, col_values.string_col, col_notnull_bitmap, start_idx, end_idx); } else { ASSERT(false); @@ -1069,8 +1086,8 @@ int TsFileWriter::value_write_column(ValueChunkWriter* value_chunk_writer, case common::TEXT: case common::BLOB: ret = write_typed_column(value_chunk_writer, timestamps, - (common::String*)col_values.string_data, - col_notnull_bitmap, start_idx, end_idx); + col_values.string_col, col_notnull_bitmap, + start_idx, end_idx); break; default: ret = E_NOT_SUPPORT; @@ -1148,10 +1165,21 @@ int TsFileWriter::write_typed_column(ChunkWriter* chunk_writer, int TsFileWriter::write_typed_column(ChunkWriter* chunk_writer, int64_t* timestamps, - common::String* col_values, + Tablet::StringColumn* string_col, BitMap& col_notnull_bitmap, uint32_t start_idx, uint32_t end_idx) { - DO_WRITE_TYPED_COLUMN(); + int ret = E_OK; + for (uint32_t r = start_idx; r < end_idx; r++) { + if (LIKELY(!col_notnull_bitmap.test(r))) { + common::String val( + string_col->buffer + string_col->offsets[r], + string_col->offsets[r + 1] - string_col->offsets[r]); + if (RET_FAIL(chunk_writer->write(timestamps[r], val))) { + return ret; + } + } + } + return ret; } int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer, @@ -1191,10 +1219,25 @@ int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer, int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer, int64_t* timestamps, - common::String* col_values, + Tablet::StringColumn* string_col, common::BitMap& col_notnull_bitmap, uint32_t start_idx, uint32_t end_idx) { - DO_VALUE_WRITE_TYPED_COLUMN(); + int ret = E_OK; + for (uint32_t r = start_idx; r < end_idx; r++) { + common::String val(string_col->buffer + string_col->offsets[r], + string_col->offsets[r + 1] - string_col->offsets[r]); + if (LIKELY(col_notnull_bitmap.test(r))) { + if (RET_FAIL(value_chunk_writer->write(timestamps[r], val, true))) { + return ret; + } + } else { + if (RET_FAIL( + value_chunk_writer->write(timestamps[r], val, false))) { + return ret; + } + } + } + return ret; } // TODO make sure ret is meaningful to SDK user diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h index ec8fe3f44..3c5ff25ce 100644 --- a/cpp/src/writer/tsfile_writer.h +++ b/cpp/src/writer/tsfile_writer.h @@ -139,7 +139,7 @@ class TsFileWriter { common::BitMap& col_notnull_bitmap, uint32_t start_idx, uint32_t end_idx); int write_typed_column(ChunkWriter* chunk_writer, int64_t* timestamps, - common::String* col_values, + Tablet::StringColumn* string_col, common::BitMap& col_notnull_bitmap, uint32_t start_idx, uint32_t end_idx); @@ -200,7 +200,8 @@ class TsFileWriter { common::BitMap& col_notnull_bitmap, uint32_t start_idx, uint32_t end_idx); int write_typed_column(ValueChunkWriter* value_chunk_writer, - int64_t* timestamps, common::String* col_values, + int64_t* timestamps, + Tablet::StringColumn* string_col, common::BitMap& col_notnull_bitmap, uint32_t start_idx, uint32_t end_idx); From 24590afbbf9f5d54e6868dbe29ec7882bf7b86d0 Mon Sep 17 00:00:00 2001 From: ColinLee Date: Fri, 20 Mar 2026 08:42:34 +0800 Subject: [PATCH 2/4] fix some code. --- cpp/src/common/tablet.cc | 1 - cpp/src/common/tablet.h | 14 +++++++------- cpp/src/writer/tsfile_writer.cc | 10 ++-------- 3 files changed, 9 insertions(+), 16 deletions(-) diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc index 6af3b31eb..0b11385d2 100644 --- a/cpp/src/common/tablet.cc +++ b/cpp/src/common/tablet.cc @@ -464,7 +464,6 @@ void Tablet::reset_string_columns() { value_matrix_[c].string_col->reset(); } } - page_arena_.reset(); } std::vector Tablet::find_all_device_boundaries() const { diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h index a81dd99f7..b2259a7e5 100644 --- a/cpp/src/common/tablet.h +++ b/cpp/src/common/tablet.h @@ -58,17 +58,18 @@ class Tablet { : offsets(nullptr), buffer(nullptr), buf_capacity(0), buf_used(0) {} void init(uint32_t max_rows, uint32_t init_buf_capacity) { - offsets = (uint32_t*)malloc(sizeof(uint32_t) * (max_rows + 1)); + offsets = (uint32_t*)common::mem_alloc( + sizeof(uint32_t) * (max_rows + 1), common::MOD_DEFAULT); offsets[0] = 0; buf_capacity = init_buf_capacity; - buffer = (char*)malloc(buf_capacity); + buffer = (char*)common::mem_alloc(buf_capacity, common::MOD_DEFAULT); buf_used = 0; } void destroy() { - free(offsets); + if (offsets) common::mem_free(offsets); offsets = nullptr; - free(buffer); + if (buffer) common::mem_free(buffer); buffer = nullptr; buf_capacity = buf_used = 0; } @@ -80,9 +81,9 @@ class Tablet { void append(uint32_t row, const char* data, uint32_t len) { // Grow buffer if needed - while (buf_used + len > buf_capacity) { + if (buf_used + len > buf_capacity) { buf_capacity = buf_capacity * 2 + len; - buffer = (char*)realloc(buffer, buf_capacity); + buffer = (char*)common::mem_realloc(buffer, buf_capacity); } memcpy(buffer + buf_used, data, len); offsets[row] = buf_used; @@ -330,7 +331,6 @@ class Tablet { private: template void process_val(uint32_t row_index, uint32_t schema_index, T val); - common::PageArena page_arena_{common::MOD_TABLET}; uint32_t max_row_num_; uint32_t cur_row_size_; std::string insert_target_name_; diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc index 0c336b138..2afd983eb 100644 --- a/cpp/src/writer/tsfile_writer.cc +++ b/cpp/src/writer/tsfile_writer.cc @@ -942,9 +942,8 @@ int TsFileWriter::write_table(Tablet& tablet) { } } record_count_since_last_flush_ += tablet.cur_row_size_; - // Release string memory accumulated during this write. - // The page_arena_ holds all dup_from'd String buffers which are no longer - // needed after the data has been encoded into chunks. + // Reset string column buffers so the tablet can be reused for the next + // batch without accumulating memory across writes. tablet.reset_string_columns(); ret = check_memory_size_and_may_flush_chunks(); return ret; @@ -955,7 +954,6 @@ TsFileWriter::split_tablet_by_device(const Tablet& tablet) { std::vector, int>> result; if (tablet.id_column_indexes_.empty()) { - // No ID columns — entire tablet is one device auto sentinel = std::make_shared("last_device_id"); result.emplace_back(std::move(sentinel), 0); std::vector id_array; @@ -969,12 +967,9 @@ TsFileWriter::split_tablet_by_device(const Tablet& tablet) { const uint32_t row_count = tablet.get_cur_row_size(); if (row_count == 0) return result; - // Sentinel entry (end_idx == 0, will be skipped by caller) auto sentinel = std::make_shared("last_device_id"); result.emplace_back(std::move(sentinel), 0); - // Column-oriented scan: find all boundaries, then construct DeviceID - // only once per device segment. auto boundaries = tablet.find_all_device_boundaries(); uint32_t seg_start = 0; @@ -983,7 +978,6 @@ TsFileWriter::split_tablet_by_device(const Tablet& tablet) { result.emplace_back(std::move(dev_id), b); seg_start = b; } - // Last segment std::shared_ptr last_id(tablet.get_device_id(seg_start)); result.emplace_back(std::move(last_id), row_count); return result; From 7cca9aa7043905731f58b3906de89e60ababef58 Mon Sep 17 00:00:00 2001 From: ColinLee Date: Fri, 20 Mar 2026 08:51:34 +0800 Subject: [PATCH 3/4] fix format. --- cpp/src/common/tablet.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h index b2259a7e5..b498cc866 100644 --- a/cpp/src/common/tablet.h +++ b/cpp/src/common/tablet.h @@ -62,7 +62,8 @@ class Tablet { sizeof(uint32_t) * (max_rows + 1), common::MOD_DEFAULT); offsets[0] = 0; buf_capacity = init_buf_capacity; - buffer = (char*)common::mem_alloc(buf_capacity, common::MOD_DEFAULT); + buffer = + (char*)common::mem_alloc(buf_capacity, common::MOD_DEFAULT); buf_used = 0; } From dbf86df8db85c6062b0b7283d1515ff1a97eb2e5 Mon Sep 17 00:00:00 2001 From: ColinLee Date: Wed, 25 Mar 2026 16:01:59 +0800 Subject: [PATCH 4/4] fix conflict. --- cpp/src/common/tablet.cc | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc index 0b11385d2..eaaaaf841 100644 --- a/cpp/src/common/tablet.cc +++ b/cpp/src/common/tablet.cc @@ -98,16 +98,9 @@ int Tablet::init() { case BLOB: case TEXT: case STRING: { -<<<<<<< HEAD - value_matrix_[c].string_data = - static_cast(common::mem_alloc( - sizeof(String) * max_row_num_, common::MOD_TABLET)); - if (value_matrix_[c].string_data == nullptr) return E_OOM; -======= auto* sc = new StringColumn(); sc->init(max_row_num_, max_row_num_ * 32); value_matrix_[c].string_col = sc; ->>>>>>> 3322a03c (refine tabet writing.) break; } default: @@ -157,12 +150,8 @@ void Tablet::destroy() { case BLOB: case TEXT: case STRING: -<<<<<<< HEAD - common::mem_free(value_matrix_[c].string_data); -======= value_matrix_[c].string_col->destroy(); delete value_matrix_[c].string_col; ->>>>>>> 3322a03c (refine tabet writing.) break; default: break;