diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc index f2d09e53b..eaaaaf841 100644 --- a/cpp/src/common/tablet.cc +++ b/cpp/src/common/tablet.cc @@ -98,10 +98,9 @@ int Tablet::init() { case BLOB: case TEXT: case STRING: { - value_matrix_[c].string_data = - static_cast(common::mem_alloc( - sizeof(String) * max_row_num_, common::MOD_TABLET)); - if (value_matrix_[c].string_data == nullptr) return E_OOM; + auto* sc = new StringColumn(); + sc->init(max_row_num_, max_row_num_ * 32); + value_matrix_[c].string_col = sc; break; } default: @@ -117,6 +116,7 @@ int Tablet::init() { new (&bitmaps_[c]) BitMap(); bitmaps_[c].init(max_row_num_, false); } + return E_OK; } @@ -150,7 +150,8 @@ void Tablet::destroy() { case BLOB: case TEXT: case STRING: - common::mem_free(value_matrix_[c].string_data); + value_matrix_[c].string_col->destroy(); + delete value_matrix_[c].string_col; break; default: break; @@ -293,8 +294,7 @@ void* Tablet::get_value(int row_index, uint32_t schema_index, return &double_values[row_index]; } case STRING: { - auto string_values = column_values.string_data; - return &string_values[row_index]; + return &column_values.string_col->get_string_view(row_index); } default: return nullptr; @@ -304,8 +304,8 @@ void* Tablet::get_value(int row_index, uint32_t schema_index, template <> void Tablet::process_val(uint32_t row_index, uint32_t schema_index, common::String str) { - value_matrix_[schema_index].string_data[row_index].dup_from(str, - page_arena_); + value_matrix_[schema_index].string_col->append(row_index, str.buf_, + str.len_); bitmaps_[schema_index].clear(row_index); /* mark as non-null */ } @@ -444,6 +444,57 @@ void Tablet::set_column_categories( } } +void Tablet::reset_string_columns() { + size_t schema_count = schema_vec_->size(); + for (size_t c = 0; c < schema_count; c++) { + const MeasurementSchema& schema = schema_vec_->at(c); + if (schema.data_type_ == STRING || schema.data_type_ == TEXT || + schema.data_type_ == BLOB) { + value_matrix_[c].string_col->reset(); + } + } +} + +std::vector Tablet::find_all_device_boundaries() const { + const uint32_t row_count = get_cur_row_size(); + if (row_count <= 1) return {}; + + // Use uint64_t bitmap instead of vector for faster set/test/scan. + const uint32_t nwords = (row_count + 63) / 64; + std::vector boundary(nwords, 0); + + for (auto col_idx : id_column_indexes_) { + const StringColumn& sc = *value_matrix_[col_idx].string_col; + const uint32_t* off = sc.offsets; + const char* buf = sc.buffer; + for (uint32_t i = 1; i < row_count; i++) { + if (boundary[i >> 6] & (1ULL << (i & 63))) continue; + uint32_t len_a = off[i] - off[i - 1]; + uint32_t len_b = off[i + 1] - off[i]; + if (len_a != len_b || + (len_a > 0 && + memcmp(buf + off[i - 1], buf + off[i], len_a) != 0)) { + boundary[i >> 6] |= (1ULL << (i & 63)); + } + } + } + + // Collect boundary positions using bitscan + std::vector result; + for (uint32_t w = 0; w < nwords; w++) { + uint64_t bits = boundary[w]; + while (bits) { + uint32_t bit = __builtin_ctzll(bits); + uint32_t idx = w * 64 + bit; + if (idx > 0 && idx < row_count) { + result.push_back(idx); + } + bits &= bits - 1; // clear lowest set bit + } + } + return result; +} + std::shared_ptr Tablet::get_device_id(int i) const { std::vector id_array; id_array.push_back(new std::string(insert_target_name_)); diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h index e47aa5c42..b498cc866 100644 --- a/cpp/src/common/tablet.h +++ b/cpp/src/common/tablet.h @@ -46,6 +46,70 @@ class TabletColIterator; * with their associated metadata such as column names and types. */ class Tablet { + // Arrow-style string column: offsets + contiguous buffer. + // string[i] = buffer + offsets[i], len = offsets[i+1] - offsets[i] + struct StringColumn { + uint32_t* offsets; // length: max_rows + 1 + char* buffer; // contiguous string data + uint32_t buf_capacity; // allocated buffer size + uint32_t buf_used; // bytes written so far + + StringColumn() + : offsets(nullptr), buffer(nullptr), buf_capacity(0), buf_used(0) {} + + void init(uint32_t max_rows, uint32_t init_buf_capacity) { + offsets = (uint32_t*)common::mem_alloc( + sizeof(uint32_t) * (max_rows + 1), common::MOD_DEFAULT); + offsets[0] = 0; + buf_capacity = init_buf_capacity; + buffer = + (char*)common::mem_alloc(buf_capacity, common::MOD_DEFAULT); + buf_used = 0; + } + + void destroy() { + if (offsets) common::mem_free(offsets); + offsets = nullptr; + if (buffer) common::mem_free(buffer); + buffer = nullptr; + buf_capacity = buf_used = 0; + } + + void reset() { + buf_used = 0; + if (offsets) offsets[0] = 0; + } + + void append(uint32_t row, const char* data, uint32_t len) { + // Grow buffer if needed + if (buf_used + len > buf_capacity) { + buf_capacity = buf_capacity * 2 + len; + buffer = (char*)common::mem_realloc(buffer, buf_capacity); + } + memcpy(buffer + buf_used, data, len); + offsets[row] = buf_used; + offsets[row + 1] = buf_used + len; + buf_used += len; + } + + const char* get_str(uint32_t row) const { + return buffer + offsets[row]; + } + uint32_t get_len(uint32_t row) const { + return offsets[row + 1] - offsets[row]; + } + // Return a String view for a given row. The returned reference is + // valid until the next call to get_string_view on this column. + common::String& get_string_view(uint32_t row) { + view_cache_.buf_ = buffer + offsets[row]; + view_cache_.len_ = offsets[row + 1] - offsets[row]; + return view_cache_; + } + + private: + common::String view_cache_; + }; + struct ValueMatrixEntry { union { int32_t* int32_data; @@ -53,7 +117,7 @@ class Tablet { float* float_data; double* double_data; bool* bool_data; - common::String* string_data; + StringColumn* string_col; }; }; @@ -220,6 +284,7 @@ class Tablet { void set_column_categories( const std::vector& column_categories); std::shared_ptr get_device_id(int i) const; + std::vector find_all_device_boundaries() const; /** * @brief Template function to add a value of type T to the specified row * and column by name. @@ -253,6 +318,8 @@ class Tablet { schema_map_ = schema_map; } + void reset_string_columns(); + friend class TabletColIterator; friend class TsFileWriter; friend struct MeasurementNamesFromTablet; @@ -265,7 +332,6 @@ class Tablet { private: template void process_val(uint32_t row_index, uint32_t schema_index, T val); - common::PageArena page_arena_{common::MOD_TABLET}; uint32_t max_row_num_; uint32_t cur_row_size_; std::string insert_target_name_; diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc index 85a816ef6..2afd983eb 100644 --- a/cpp/src/writer/tsfile_writer.cc +++ b/cpp/src/writer/tsfile_writer.cc @@ -942,6 +942,9 @@ int TsFileWriter::write_table(Tablet& tablet) { } } record_count_since_last_flush_ += tablet.cur_row_size_; + // Reset string column buffers so the tablet can be reused for the next + // batch without accumulating memory across writes. + tablet.reset_string_columns(); ret = check_memory_size_and_may_flush_chunks(); return ret; } @@ -949,10 +952,10 @@ int TsFileWriter::write_table(Tablet& tablet) { std::vector, int>> TsFileWriter::split_tablet_by_device(const Tablet& tablet) { std::vector, int>> result; - std::shared_ptr last_device_id = - std::make_shared("last_device_id"); + if (tablet.id_column_indexes_.empty()) { - result.emplace_back(std::move(last_device_id), 0); + auto sentinel = std::make_shared("last_device_id"); + result.emplace_back(std::move(sentinel), 0); std::vector id_array; id_array.push_back(new std::string(tablet.insert_target_name_)); auto res = std::make_shared(id_array); @@ -961,14 +964,22 @@ TsFileWriter::split_tablet_by_device(const Tablet& tablet) { return result; } - for (uint32_t i = 0; i < tablet.get_cur_row_size(); i++) { - std::shared_ptr cur_device_id(tablet.get_device_id(i)); - if (*cur_device_id != *last_device_id) { - result.emplace_back(std::move(last_device_id), i); - last_device_id = std::move(cur_device_id); - } + const uint32_t row_count = tablet.get_cur_row_size(); + if (row_count == 0) return result; + + auto sentinel = std::make_shared("last_device_id"); + result.emplace_back(std::move(sentinel), 0); + + auto boundaries = tablet.find_all_device_boundaries(); + + uint32_t seg_start = 0; + for (uint32_t b : boundaries) { + std::shared_ptr dev_id(tablet.get_device_id(seg_start)); + result.emplace_back(std::move(dev_id), b); + seg_start = b; } - result.emplace_back(std::move(last_device_id), tablet.get_cur_row_size()); + std::shared_ptr last_id(tablet.get_device_id(seg_start)); + result.emplace_back(std::move(last_id), row_count); return result; } @@ -1004,7 +1015,7 @@ int TsFileWriter::write_column(ChunkWriter* chunk_writer, const Tablet& tablet, col_notnull_bitmap, start_idx, end_idx); } else if (data_type == common::STRING) { ret = - write_typed_column(chunk_writer, timestamps, col_values.string_data, + write_typed_column(chunk_writer, timestamps, col_values.string_col, col_notnull_bitmap, start_idx, end_idx); } else { ASSERT(false); @@ -1069,8 +1080,8 @@ int TsFileWriter::value_write_column(ValueChunkWriter* value_chunk_writer, case common::TEXT: case common::BLOB: ret = write_typed_column(value_chunk_writer, timestamps, - (common::String*)col_values.string_data, - col_notnull_bitmap, start_idx, end_idx); + col_values.string_col, col_notnull_bitmap, + start_idx, end_idx); break; default: ret = E_NOT_SUPPORT; @@ -1148,10 +1159,21 @@ int TsFileWriter::write_typed_column(ChunkWriter* chunk_writer, int TsFileWriter::write_typed_column(ChunkWriter* chunk_writer, int64_t* timestamps, - common::String* col_values, + Tablet::StringColumn* string_col, BitMap& col_notnull_bitmap, uint32_t start_idx, uint32_t end_idx) { - DO_WRITE_TYPED_COLUMN(); + int ret = E_OK; + for (uint32_t r = start_idx; r < end_idx; r++) { + if (LIKELY(!col_notnull_bitmap.test(r))) { + common::String val( + string_col->buffer + string_col->offsets[r], + string_col->offsets[r + 1] - string_col->offsets[r]); + if (RET_FAIL(chunk_writer->write(timestamps[r], val))) { + return ret; + } + } + } + return ret; } int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer, @@ -1191,10 +1213,25 @@ int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer, int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer, int64_t* timestamps, - common::String* col_values, + Tablet::StringColumn* string_col, common::BitMap& col_notnull_bitmap, uint32_t start_idx, uint32_t end_idx) { - DO_VALUE_WRITE_TYPED_COLUMN(); + int ret = E_OK; + for (uint32_t r = start_idx; r < end_idx; r++) { + common::String val(string_col->buffer + string_col->offsets[r], + string_col->offsets[r + 1] - string_col->offsets[r]); + if (LIKELY(col_notnull_bitmap.test(r))) { + if (RET_FAIL(value_chunk_writer->write(timestamps[r], val, true))) { + return ret; + } + } else { + if (RET_FAIL( + value_chunk_writer->write(timestamps[r], val, false))) { + return ret; + } + } + } + return ret; } // TODO make sure ret is meaningful to SDK user diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h index ec8fe3f44..3c5ff25ce 100644 --- a/cpp/src/writer/tsfile_writer.h +++ b/cpp/src/writer/tsfile_writer.h @@ -139,7 +139,7 @@ class TsFileWriter { common::BitMap& col_notnull_bitmap, uint32_t start_idx, uint32_t end_idx); int write_typed_column(ChunkWriter* chunk_writer, int64_t* timestamps, - common::String* col_values, + Tablet::StringColumn* string_col, common::BitMap& col_notnull_bitmap, uint32_t start_idx, uint32_t end_idx); @@ -200,7 +200,8 @@ class TsFileWriter { common::BitMap& col_notnull_bitmap, uint32_t start_idx, uint32_t end_idx); int write_typed_column(ValueChunkWriter* value_chunk_writer, - int64_t* timestamps, common::String* col_values, + int64_t* timestamps, + Tablet::StringColumn* string_col, common::BitMap& col_notnull_bitmap, uint32_t start_idx, uint32_t end_idx);