diff --git a/cpp/src/common/allocator/byte_stream.h b/cpp/src/common/allocator/byte_stream.h index 4e1029ea4..7abc2b4f0 100644 --- a/cpp/src/common/allocator/byte_stream.h +++ b/cpp/src/common/allocator/byte_stream.h @@ -253,6 +253,8 @@ class ByteStream { }; public: + static const uint32_t DEFAULT_PAGE_SIZE = 1024; + ByteStream(uint32_t page_size, AllocModID mid, bool enable_atomic = false, BaseAllocator& allocator = g_base_allocator) : allocator_(allocator), @@ -263,10 +265,9 @@ class ByteStream { read_pos_(0), marked_read_pos_(0), page_size_(page_size), + page_mask_(page_size - 1), mid_(mid), - wrapped_page_(false, nullptr) { - // assert(page_size >= 16); // commented out by gxh on 2023.03.09 - } + wrapped_page_(false, nullptr) {} // TODO use a specific construct function to mark it as wrapped use. // for wrap plain buffer to ByteStream @@ -279,6 +280,7 @@ class ByteStream { read_pos_(0), marked_read_pos_(0), page_size_(0), + page_mask_(0), mid_(MOD_DEFAULT), wrapped_page_(false, nullptr) {} @@ -292,6 +294,7 @@ class ByteStream { wrapped_page_.buf_ = (uint8_t*)buf; page_size_ = buf_len; + page_mask_ = buf_len - 1; head_.store(&wrapped_page_); tail_.store(&wrapped_page_); total_size_.store(buf_len); @@ -340,6 +343,7 @@ class ByteStream { // never used TODO void shallow_clone_from(ByteStream& other) { this->page_size_ = other.page_size_; + this->page_mask_ = other.page_mask_; this->mid_ = other.mid_; this->head_.store(other.head_.load()); this->tail_.store(other.tail_.load()); @@ -366,10 +370,10 @@ class ByteStream { std::cout << "write_buf error " << ret << std::endl; return ret; } - uint32_t remainder = page_size_ - (total_size_.load() % page_size_); + uint32_t remainder = page_size_ - (total_size_.load() & page_mask_); uint32_t copy_len = remainder < (len - write_len) ? remainder : (len - write_len); - memcpy(tail_.load()->buf_ + total_size_.load() % page_size_, + memcpy(tail_.load()->buf_ + (total_size_.load() & page_mask_), buf + write_len, copy_len); total_size_.atomic_aaf(copy_len); write_len += copy_len; @@ -390,11 +394,11 @@ class ByteStream { if (RET_FAIL(check_space())) { return ret; } - uint32_t remainder = page_size_ - (read_pos_ % page_size_); + uint32_t remainder = page_size_ - (read_pos_ & page_mask_); uint32_t copy_len = remainder < want_len_limited - read_len ? remainder : want_len_limited - read_len; - memcpy(buf + read_len, read_page_->buf_ + (read_pos_ % page_size_), + memcpy(buf + read_len, read_page_->buf_ + (read_pos_ & page_mask_), copy_len); read_len += copy_len; read_pos_ += copy_len; @@ -446,16 +450,17 @@ class ByteStream { return b; } b.buf_ = - (char*)(tail_.load()->buf_ + (total_size_.load() % page_size_)); - b.len_ = page_size_ - (total_size_.load() % page_size_); + (char*)(tail_.load()->buf_ + (total_size_.load() & page_mask_)); + b.len_ = page_size_ - (total_size_.load() & page_mask_); return b; } void buffer_used(uint32_t used_bytes) { ASSERT(used_bytes >= 1); // would not span page - ASSERT((total_size_.load() / page_size_) == - ((total_size_.load() + used_bytes - 1) / page_size_)); + ASSERT(page_size_ == 0 || + (total_size_.load() / page_size_) == + ((total_size_.load() + used_bytes - 1) / page_size_)); total_size_.atomic_aaf(used_bytes); } @@ -471,7 +476,7 @@ class ByteStream { if (RET_FAIL(prepare_space())) { return ret; } - uint32_t remainder = page_size_ - (total_size_.load() % page_size_); + uint32_t remainder = page_size_ - (total_size_.load() & page_mask_); uint32_t step = remainder < (len - advanced) ? remainder : (len - advanced); total_size_.atomic_aaf(step); @@ -501,8 +506,8 @@ class ByteStream { if (cur_ != nullptr) { b.buf_ = (char*)cur_->buf_; if (cur_ == end_ && - host_.total_size_.load() % host_.page_size_ != 0) { - b.len_ = host_.total_size_.load() % host_.page_size_; + (host_.total_size_.load() & host_.page_mask_) != 0) { + b.len_ = host_.total_size_.load() & host_.page_mask_; } else { b.len_ = host_.page_size_; } @@ -559,7 +564,7 @@ class ByteStream { while (true) { if (cur_ == host_end) { - if (host_total_size % host_.page_size_ == 0) { + if ((host_total_size & host_.page_mask_) == 0) { if (read_offset_within_cur_page_ == host_.page_size_) { return b; } else { @@ -573,15 +578,15 @@ class ByteStream { } } else { if (read_offset_within_cur_page_ == - (host_total_size % host_.page_size_)) { + (host_total_size & host_.page_mask_)) { return b; } else { b.buf_ = ((char*)(cur_->buf_)) + read_offset_within_cur_page_; - b.len_ = (host_total_size % host_.page_size_) - + b.len_ = (host_total_size & host_.page_mask_) - read_offset_within_cur_page_; read_offset_within_cur_page_ = - (host_total_size % host_.page_size_); + (host_total_size & host_.page_mask_); total_end_offset_ += b.len_; return b; } @@ -611,7 +616,7 @@ class ByteStream { FORCE_INLINE int prepare_space() { int ret = common::E_OK; if (UNLIKELY(tail_.load() == nullptr || - total_size_.load() % page_size_ == 0)) { + (total_size_.load() & page_mask_) == 0)) { Page* p = nullptr; if (RET_FAIL(alloc_page(p))) { return ret; @@ -628,7 +633,7 @@ class ByteStream { } if (UNLIKELY(read_page_ == nullptr)) { read_page_ = head_.load(); - } else if (UNLIKELY(read_pos_ % page_size_ == 0)) { + } else if (UNLIKELY((read_pos_ & page_mask_) == 0)) { read_page_ = read_page_->next_.load(); } if (UNLIKELY(read_page_ == nullptr)) { @@ -668,6 +673,7 @@ class ByteStream { uint32_t read_pos_; // current reader position uint32_t marked_read_pos_; // current reader position uint32_t page_size_; + uint32_t page_mask_; // page_size_ - 1, for bitwise AND instead of modulo AllocModID mid_; Page wrapped_page_; }; diff --git a/cpp/src/common/container/bit_map.h b/cpp/src/common/container/bit_map.h index 356932d14..ea4fd1a5b 100644 --- a/cpp/src/common/container/bit_map.h +++ b/cpp/src/common/container/bit_map.h @@ -64,6 +64,9 @@ class BitMap { return (*start_addr & bit_mask); } + // Set all bits to 0 (all non-null in TsFile convention where bit=1 is null) + FORCE_INLINE void clear_all() { memset(bitmap_, 0x00, size_); } + FORCE_INLINE uint32_t get_size() { return size_; } FORCE_INLINE char* get_bitmap() { return bitmap_; } // for debug diff --git a/cpp/src/common/statistic.h b/cpp/src/common/statistic.h index f6d53c206..7128b43b8 100644 --- a/cpp/src/common/statistic.h +++ b/cpp/src/common/statistic.h @@ -22,12 +22,18 @@ #include +#include #include #include "common/allocator/alloc_base.h" #include "common/allocator/byte_stream.h" #include "common/db_common.h" +#if defined(__ARM_NEON) || defined(__ARM_NEON__) +#include +#define TSFILE_HAS_NEON 1 +#endif + namespace storage { /* @@ -176,6 +182,48 @@ class Statistic { } virtual FORCE_INLINE void update(int64_t time) { ASSERT(false); } + virtual void update_time_batch(const int64_t* timestamps, uint32_t count) { + for (uint32_t i = 0; i < count; i++) { + update(timestamps[i]); + } + } + virtual void update_batch(const int64_t* timestamps, + const bool* values, uint32_t count) { + for (uint32_t i = 0; i < count; i++) { + update(timestamps[i], values[i]); + } + } + virtual void update_batch(const int64_t* timestamps, + const int32_t* values, uint32_t count) { + for (uint32_t i = 0; i < count; i++) { + update(timestamps[i], values[i]); + } + } + virtual void update_batch(const int64_t* timestamps, + const int64_t* values, uint32_t count) { + for (uint32_t i = 0; i < count; i++) { + update(timestamps[i], values[i]); + } + } + virtual void update_batch(const int64_t* timestamps, + const float* values, uint32_t count) { + for (uint32_t i = 0; i < count; i++) { + update(timestamps[i], values[i]); + } + } + virtual void update_batch(const int64_t* timestamps, + const double* values, uint32_t count) { + for (uint32_t i = 0; i < count; i++) { + update(timestamps[i], values[i]); + } + } + virtual void update_batch(const int64_t* timestamps, + const common::String* values, uint32_t count) { + for (uint32_t i = 0; i < count; i++) { + update(timestamps[i], values[i]); + } + } + virtual int serialize_to(common::ByteStream& out) { int ret = common::E_OK; if (RET_FAIL(common::SerializationUtil::write_var_uint(count_, out))) { @@ -638,6 +686,32 @@ class Int32Statistic : public Statistic { NUM_STAT_UPDATE(time, value); } + void update_batch(const int64_t* timestamps, const int32_t* values, + uint32_t count) override { + if (count == 0) return; + uint32_t start = 0; + if (count_ == 0) { + start_time_ = timestamps[0]; + end_time_ = timestamps[0]; + first_value_ = values[0]; + last_value_ = values[0]; + min_value_ = values[0]; + max_value_ = values[0]; + sum_value_ = (int64_t)values[0]; + count_ = 1; + start = 1; + } + for (uint32_t i = start; i < count; i++) { + if (timestamps[i] < start_time_) start_time_ = timestamps[i]; + if (timestamps[i] > end_time_) end_time_ = timestamps[i]; + if (values[i] < min_value_) min_value_ = values[i]; + if (values[i] > max_value_) max_value_ = values[i]; + sum_value_ += (int64_t)values[i]; + } + last_value_ = values[count - 1]; + count_ += (count - start); + } + FORCE_INLINE common::TSDataType get_type() { return common::INT32; } int serialize_typed_stat(common::ByteStream& out) { @@ -738,6 +812,60 @@ class Int64Statistic : public Statistic { NUM_STAT_UPDATE(time, value); } + void update_batch(const int64_t* timestamps, const int64_t* values, + uint32_t count) override { + if (count == 0) return; + uint32_t start = 0; + if (count_ == 0) { + start_time_ = timestamps[0]; + end_time_ = timestamps[0]; + first_value_ = values[0]; + last_value_ = values[0]; + min_value_ = values[0]; + max_value_ = values[0]; + sum_value_ = (double)values[0]; + count_ = 1; + start = 1; + } + // Timestamps are monotonic (verified by TimePageWriter), + // so only first/last matter for start_time_/end_time_. + if (count > start) { + if (timestamps[start] < start_time_) + start_time_ = timestamps[start]; + if (timestamps[count - 1] > end_time_) + end_time_ = timestamps[count - 1]; + } + uint32_t i = start; +#if TSFILE_HAS_NEON + { + int64x2_t vmin = vdupq_n_s64(min_value_); + int64x2_t vmax = vdupq_n_s64(max_value_); + float64x2_t vsum = vdupq_n_f64(0.0); + for (; i + 2 <= count; i += 2) { + int64x2_t v = vld1q_s64(&values[i]); + // min/max via compare+select (no vminq_s64 in NEON) + uint64x2_t lt = vcltq_s64(v, vmin); + vmin = vbslq_s64(lt, v, vmin); + uint64x2_t gt = vcgtq_s64(v, vmax); + vmax = vbslq_s64(gt, v, vmax); + vsum = vaddq_f64(vsum, vcvtq_f64_s64(v)); + } + min_value_ = std::min(vgetq_lane_s64(vmin, 0), + vgetq_lane_s64(vmin, 1)); + max_value_ = std::max(vgetq_lane_s64(vmax, 0), + vgetq_lane_s64(vmax, 1)); + sum_value_ += vgetq_lane_f64(vsum, 0) + vgetq_lane_f64(vsum, 1); + } +#endif + for (; i < count; i++) { + if (values[i] < min_value_) min_value_ = values[i]; + if (values[i] > max_value_) max_value_ = values[i]; + sum_value_ += (double)values[i]; + } + last_value_ = values[count - 1]; + count_ += (count - start); + } + FORCE_INLINE common::TSDataType get_type() { return common::INT64; } int serialize_typed_stat(common::ByteStream& out) { @@ -904,6 +1032,55 @@ class DoubleStatistic : public Statistic { NUM_STAT_UPDATE(time, value); } + void update_batch(const int64_t* timestamps, const double* values, + uint32_t count) override { + if (count == 0) return; + uint32_t start = 0; + if (count_ == 0) { + start_time_ = timestamps[0]; + end_time_ = timestamps[0]; + first_value_ = values[0]; + last_value_ = values[0]; + min_value_ = values[0]; + max_value_ = values[0]; + sum_value_ = values[0]; + count_ = 1; + start = 1; + } + if (count > start) { + if (timestamps[start] < start_time_) + start_time_ = timestamps[start]; + if (timestamps[count - 1] > end_time_) + end_time_ = timestamps[count - 1]; + } + uint32_t i = start; +#if TSFILE_HAS_NEON + { + float64x2_t vmin = vdupq_n_f64(min_value_); + float64x2_t vmax = vdupq_n_f64(max_value_); + float64x2_t vsum = vdupq_n_f64(0.0); + for (; i + 2 <= count; i += 2) { + float64x2_t v = vld1q_f64(&values[i]); + vmin = vminq_f64(vmin, v); + vmax = vmaxq_f64(vmax, v); + vsum = vaddq_f64(vsum, v); + } + min_value_ = std::min(vgetq_lane_f64(vmin, 0), + vgetq_lane_f64(vmin, 1)); + max_value_ = std::max(vgetq_lane_f64(vmax, 0), + vgetq_lane_f64(vmax, 1)); + sum_value_ += vgetq_lane_f64(vsum, 0) + vgetq_lane_f64(vsum, 1); + } +#endif + for (; i < count; i++) { + if (values[i] < min_value_) min_value_ = values[i]; + if (values[i] > max_value_) max_value_ = values[i]; + sum_value_ += values[i]; + } + last_value_ = values[count - 1]; + count_ += (count - start); + } + FORCE_INLINE common::TSDataType get_type() { return common::DOUBLE; } int serialize_typed_stat(common::ByteStream& out) { @@ -971,6 +1148,21 @@ class TimeStatistic : public Statistic { count_++; } + void update_time_batch(const int64_t* timestamps, + uint32_t count) override { + if (count == 0) return; + if (count_ == 0) { + start_time_ = timestamps[0]; + end_time_ = timestamps[0]; + } + // Timestamps are already verified monotonic in TimePageWriter, + // so first element is min candidate and last is max candidate. + if (timestamps[0] < start_time_) start_time_ = timestamps[0]; + if (timestamps[count - 1] > end_time_) + end_time_ = timestamps[count - 1]; + count_ += count; + } + FORCE_INLINE common::TSDataType get_type() { return common::VECTOR; } int serialize_typed_stat(common::ByteStream& out) { return common::E_OK; } diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc index 10489f67d..71e189d3c 100644 --- a/cpp/src/common/tablet.cc +++ b/cpp/src/common/tablet.cc @@ -85,8 +85,9 @@ int Tablet::init() { case BLOB: case TEXT: case STRING: { - value_matrix_[c].string_data = - (common::String*)malloc(sizeof(String) * max_row_num_); + auto* sc = new StringColumn(); + sc->init(max_row_num_, max_row_num_ * 32); + value_matrix_[c].string_col = sc; break; } default: @@ -99,6 +100,7 @@ int Tablet::init() { for (size_t c = 0; c < schema_count; c++) { bitmaps_[c].init(max_row_num_, false); } + return E_OK; } @@ -132,7 +134,8 @@ void Tablet::destroy() { case BLOB: case TEXT: case STRING: - free(value_matrix_[c].string_data); + value_matrix_[c].string_col->destroy(); + delete value_matrix_[c].string_col; break; default: break; @@ -163,6 +166,96 @@ int Tablet::add_timestamp(uint32_t row_index, int64_t timestamp) { return E_OK; } +int Tablet::set_timestamps(const int64_t* timestamps, uint32_t count) { + if (err_code_ != E_OK) return err_code_; + ASSERT(timestamps_ != NULL); + if (UNLIKELY(count > static_cast(max_row_num_))) { + return E_OUT_OF_RANGE; + } + std::memcpy(timestamps_, timestamps, count * sizeof(int64_t)); + cur_row_size_ = std::max(count, cur_row_size_); + return E_OK; +} + +int Tablet::set_column_values(uint32_t schema_index, const void* data, + const uint8_t* bitmap, uint32_t count) { + if (err_code_ != E_OK) return err_code_; + if (UNLIKELY(schema_index >= schema_vec_->size())) return E_OUT_OF_RANGE; + if (UNLIKELY(count > static_cast(max_row_num_))) + return E_OUT_OF_RANGE; + + const MeasurementSchema& schema = schema_vec_->at(schema_index); + size_t elem_size = 0; + void* dst = nullptr; + switch (schema.data_type_) { + case BOOLEAN: + elem_size = sizeof(bool); + dst = value_matrix_[schema_index].bool_data; + break; + case DATE: + case INT32: + elem_size = sizeof(int32_t); + dst = value_matrix_[schema_index].int32_data; + break; + case TIMESTAMP: + case INT64: + elem_size = sizeof(int64_t); + dst = value_matrix_[schema_index].int64_data; + break; + case FLOAT: + elem_size = sizeof(float); + dst = value_matrix_[schema_index].float_data; + break; + case DOUBLE: + elem_size = sizeof(double); + dst = value_matrix_[schema_index].double_data; + break; + default: + return E_NOT_SUPPORT; + } + + std::memcpy(dst, data, count * elem_size); + if (bitmap == nullptr) { + bitmaps_[schema_index].clear_all(); + } else { + char* tsfile_bm = bitmaps_[schema_index].get_bitmap(); + uint32_t bm_bytes = (count + 7) / 8; + std::memcpy(tsfile_bm, bitmap, bm_bytes); + } + cur_row_size_ = std::max(count, cur_row_size_); + return E_OK; +} + +int Tablet::set_column_string_repeated(uint32_t schema_index, const char* str, + uint32_t str_len, uint32_t count) { + if (err_code_ != E_OK) return err_code_; + if (UNLIKELY(schema_index >= schema_vec_->size())) return E_OUT_OF_RANGE; + if (UNLIKELY(count > static_cast(max_row_num_))) + return E_OUT_OF_RANGE; + + StringColumn* sc = value_matrix_[schema_index].string_col; + if (sc == nullptr) return E_INVALID_ARG; + + // Pre-allocate buffer for all identical strings + uint32_t total_bytes = str_len * count; + if (total_bytes > sc->buf_capacity) { + sc->buf_capacity = total_bytes; + sc->buffer = (char*)mem_realloc(sc->buffer, sc->buf_capacity); + } + + // Fill offsets and buffer in bulk + for (uint32_t i = 0; i < count; i++) { + sc->offsets[i] = i * str_len; + memcpy(sc->buffer + i * str_len, str, str_len); + } + sc->offsets[count] = total_bytes; + sc->buf_used = total_bytes; + + bitmaps_[schema_index].clear_all(); + cur_row_size_ = std::max(count, cur_row_size_); + return E_OK; +} + void* Tablet::get_value(int row_index, uint32_t schema_index, common::TSDataType& data_type) const { if (UNLIKELY(schema_index >= schema_vec_->size())) { @@ -197,8 +290,7 @@ void* Tablet::get_value(int row_index, uint32_t schema_index, return &double_values[row_index]; } case STRING: { - auto string_values = column_values.string_data; - return &string_values[row_index]; + return &column_values.string_col->get_string_view(row_index); } default: return nullptr; @@ -208,8 +300,8 @@ void* Tablet::get_value(int row_index, uint32_t schema_index, template <> void Tablet::process_val(uint32_t row_index, uint32_t schema_index, common::String str) { - value_matrix_[schema_index].string_data[row_index].dup_from(str, - page_arena_); + value_matrix_[schema_index].string_col->append(row_index, str.buf_, + str.len_); bitmaps_[schema_index].clear(row_index); /* mark as non-null */ } @@ -348,6 +440,57 @@ void Tablet::set_column_categories( } } +void Tablet::reset_string_columns() { + size_t schema_count = schema_vec_->size(); + for (size_t c = 0; c < schema_count; c++) { + const MeasurementSchema& schema = schema_vec_->at(c); + if (schema.data_type_ == STRING || schema.data_type_ == TEXT || + schema.data_type_ == BLOB) { + value_matrix_[c].string_col->reset(); + } + } +} + +std::vector Tablet::find_all_device_boundaries() const { + const uint32_t row_count = get_cur_row_size(); + if (row_count <= 1) return {}; + + // Use uint64_t bitmap instead of vector for faster set/test/scan. + const uint32_t nwords = (row_count + 63) / 64; + std::vector boundary(nwords, 0); + + for (auto col_idx : id_column_indexes_) { + const StringColumn& sc = *value_matrix_[col_idx].string_col; + const uint32_t* off = sc.offsets; + const char* buf = sc.buffer; + for (uint32_t i = 1; i < row_count; i++) { + if (boundary[i >> 6] & (1ULL << (i & 63))) continue; + uint32_t len_a = off[i] - off[i - 1]; + uint32_t len_b = off[i + 1] - off[i]; + if (len_a != len_b || + (len_a > 0 && + memcmp(buf + off[i - 1], buf + off[i], len_a) != 0)) { + boundary[i >> 6] |= (1ULL << (i & 63)); + } + } + } + + // Collect boundary positions using bitscan + std::vector result; + for (uint32_t w = 0; w < nwords; w++) { + uint64_t bits = boundary[w]; + while (bits) { + uint32_t bit = __builtin_ctzll(bits); + uint32_t idx = w * 64 + bit; + if (idx > 0 && idx < row_count) { + result.push_back(idx); + } + bits &= bits - 1; // clear lowest set bit + } + } + return result; +} + std::shared_ptr Tablet::get_device_id(int i) const { std::vector id_array; id_array.push_back(new std::string(insert_target_name_)); diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h index 04fee7643..9770db246 100644 --- a/cpp/src/common/tablet.h +++ b/cpp/src/common/tablet.h @@ -46,6 +46,70 @@ class TabletColIterator; * with their associated metadata such as column names and types. */ class Tablet { + // Arrow-style string column: offsets + contiguous buffer. + // string[i] = buffer + offsets[i], len = offsets[i+1] - offsets[i] + struct StringColumn { + uint32_t* offsets; // length: max_rows + 1 + char* buffer; // contiguous string data + uint32_t buf_capacity; // allocated buffer size + uint32_t buf_used; // bytes written so far + + StringColumn() + : offsets(nullptr), buffer(nullptr), buf_capacity(0), buf_used(0) {} + + void init(uint32_t max_rows, uint32_t init_buf_capacity) { + offsets = (uint32_t*)common::mem_alloc( + sizeof(uint32_t) * (max_rows + 1), common::MOD_DEFAULT); + offsets[0] = 0; + buf_capacity = init_buf_capacity; + buffer = + (char*)common::mem_alloc(buf_capacity, common::MOD_DEFAULT); + buf_used = 0; + } + + void destroy() { + if (offsets) common::mem_free(offsets); + offsets = nullptr; + if (buffer) common::mem_free(buffer); + buffer = nullptr; + buf_capacity = buf_used = 0; + } + + void reset() { + buf_used = 0; + if (offsets) offsets[0] = 0; + } + + void append(uint32_t row, const char* data, uint32_t len) { + // Grow buffer if needed + if (buf_used + len > buf_capacity) { + buf_capacity = buf_capacity * 2 + len; + buffer = (char*)common::mem_realloc(buffer, buf_capacity); + } + memcpy(buffer + buf_used, data, len); + offsets[row] = buf_used; + offsets[row + 1] = buf_used + len; + buf_used += len; + } + + const char* get_str(uint32_t row) const { + return buffer + offsets[row]; + } + uint32_t get_len(uint32_t row) const { + return offsets[row + 1] - offsets[row]; + } + // Return a String view for a given row. The returned reference is + // valid until the next call to get_string_view on this column. + common::String& get_string_view(uint32_t row) { + view_cache_.buf_ = buffer + offsets[row]; + view_cache_.len_ = offsets[row + 1] - offsets[row]; + return view_cache_; + } + + private: + common::String view_cache_; + }; + struct ValueMatrixEntry { union { int32_t* int32_data; @@ -53,7 +117,7 @@ class Tablet { float* float_data; double* double_data; bool* bool_data; - common::String* string_data; + StringColumn* string_col; }; }; @@ -181,6 +245,18 @@ class Tablet { */ int add_timestamp(uint32_t row_index, int64_t timestamp); + // Bulk copy timestamps via memcpy. count must be <= max_row_num_. + int set_timestamps(const int64_t* timestamps, uint32_t count); + + // Bulk copy fixed-length column data. bitmap=nullptr means all non-null. + // bitmap uses TsFile convention: bit=1 is null, bit=0 is valid. + int set_column_values(uint32_t schema_index, const void* data, + const uint8_t* bitmap, uint32_t count); + + // Bulk fill a STRING column with the same value for all rows. + int set_column_string_repeated(uint32_t schema_index, const char* str, + uint32_t str_len, uint32_t count); + void* get_value(int row_index, uint32_t schema_index, common::TSDataType& data_type) const; /** @@ -201,6 +277,7 @@ class Tablet { void set_column_categories( const std::vector& column_categories); std::shared_ptr get_device_id(int i) const; + std::vector find_all_device_boundaries() const; /** * @brief Template function to add a value of type T to the specified row * and column by name. @@ -234,6 +311,8 @@ class Tablet { schema_map_ = schema_map; } + void reset_string_columns(); + friend class TabletColIterator; friend class TsFileWriter; friend struct MeasurementNamesFromTablet; @@ -246,7 +325,6 @@ class Tablet { private: template void process_val(uint32_t row_index, uint32_t schema_index, T val); - common::PageArena page_arena_; uint32_t max_row_num_; uint32_t cur_row_size_; std::string insert_target_name_; diff --git a/cpp/src/common/thread_pool.h b/cpp/src/common/thread_pool.h new file mode 100644 index 000000000..a2f728722 --- /dev/null +++ b/cpp/src/common/thread_pool.h @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef COMMON_THREAD_POOL_H +#define COMMON_THREAD_POOL_H + +#include +#include +#include +#include +#include +#include +#include + +namespace common { + +class ThreadPool { + public: + explicit ThreadPool(size_t num_threads) : stop_(false) { + for (size_t i = 0; i < num_threads; i++) { + workers_.emplace_back([this] { + for (;;) { + std::function task; + { + std::unique_lock lock(mutex_); + cv_.wait(lock, + [this] { return stop_ || !tasks_.empty(); }); + if (stop_ && tasks_.empty()) return; + task = std::move(tasks_.front()); + tasks_.pop(); + } + task(); + } + }); + } + } + + ~ThreadPool() { + { + std::unique_lock lock(mutex_); + stop_ = true; + } + cv_.notify_all(); + for (auto& w : workers_) w.join(); + } + + template + std::future::type> submit(F&& f) { + using RetType = typename std::result_of::type; + auto task = std::make_shared>( + std::forward(f)); + std::future result = task->get_future(); + { + std::unique_lock lock(mutex_); + tasks_.emplace([task]() { (*task)(); }); + } + cv_.notify_one(); + return result; + } + + private: + std::vector workers_; + std::queue> tasks_; + std::mutex mutex_; + std::condition_variable cv_; + bool stop_; +}; + +} // namespace common + +#endif // COMMON_THREAD_POOL_H diff --git a/cpp/src/encoding/encoder.h b/cpp/src/encoding/encoder.h index 921686446..f34a05e64 100644 --- a/cpp/src/encoding/encoder.h +++ b/cpp/src/encoding/encoder.h @@ -48,6 +48,62 @@ class Encoder { * @return the maximal size of possible memory occupied by current encoder */ virtual int get_max_byte_size() = 0; + + /* + * Batch encoding interfaces. + * Default implementations fall back to per-value encode(). + * Subclasses may override for better performance. + */ + virtual int encode_batch(const bool* values, uint32_t count, + common::ByteStream& out_stream) { + int ret = common::E_OK; + for (uint32_t i = 0; i < count; i++) { + if (RET_FAIL(encode(values[i], out_stream))) { + return ret; + } + } + return ret; + } + virtual int encode_batch(const int32_t* values, uint32_t count, + common::ByteStream& out_stream) { + int ret = common::E_OK; + for (uint32_t i = 0; i < count; i++) { + if (RET_FAIL(encode(values[i], out_stream))) { + return ret; + } + } + return ret; + } + virtual int encode_batch(const int64_t* values, uint32_t count, + common::ByteStream& out_stream) { + int ret = common::E_OK; + for (uint32_t i = 0; i < count; i++) { + if (RET_FAIL(encode(values[i], out_stream))) { + return ret; + } + } + return ret; + } + virtual int encode_batch(const float* values, uint32_t count, + common::ByteStream& out_stream) { + int ret = common::E_OK; + for (uint32_t i = 0; i < count; i++) { + if (RET_FAIL(encode(values[i], out_stream))) { + return ret; + } + } + return ret; + } + virtual int encode_batch(const double* values, uint32_t count, + common::ByteStream& out_stream) { + int ret = common::E_OK; + for (uint32_t i = 0; i < count; i++) { + if (RET_FAIL(encode(values[i], out_stream))) { + return ret; + } + } + return ret; + } }; } // end namespace storage diff --git a/cpp/src/encoding/plain_encoder.h b/cpp/src/encoding/plain_encoder.h index b768c9bf0..687fa8916 100644 --- a/cpp/src/encoding/plain_encoder.h +++ b/cpp/src/encoding/plain_encoder.h @@ -20,8 +20,15 @@ #ifndef ENCODING_PLAIN_ENCODER_H #define ENCODING_PLAIN_ENCODER_H +#include + #include "encoder.h" +#if defined(__ARM_NEON) || defined(__ARM_NEON__) +#include +#define TSFILE_HAS_NEON 1 +#endif + namespace storage { class PlainEncoder : public Encoder { @@ -64,6 +71,101 @@ class PlainEncoder : public Encoder { } int get_max_byte_size() { return 0; } + + // Optimized batch encoding: directly byte-swap into ByteStream page buffer. + // Avoids per-value write_buf overhead entirely — only calls acquire_buf() + // once per page boundary crossing. + int encode_batch(const int64_t* values, uint32_t count, + common::ByteStream& out_stream) override { + if (count == 0) return common::E_OK; + uint32_t offset = 0; + while (offset < count) { + common::ByteStream::Buffer buf = out_stream.acquire_buf(); + if (UNLIKELY(buf.buf_ == nullptr)) return common::E_OOM; + // How many int64 values fit in the remaining page space? + uint32_t capacity = buf.len_ / 8; + if (capacity == 0) { + // Page has < 8 bytes left, fall back to write_buf for this one + return Encoder::encode_batch(values + offset, count - offset, + out_stream); + } + uint32_t batch = std::min(count - offset, capacity); + uint8_t* dst = (uint8_t*)buf.buf_; + const int64_t* src = values + offset; + uint32_t i = 0; +#if TSFILE_HAS_NEON + // NEON: byte-reverse 2 x int64 per iteration + for (; i + 2 <= batch; i += 2) { + uint8x16_t v = vld1q_u8((const uint8_t*)&src[i]); + v = vrev64q_u8(v); + vst1q_u8(dst, v); + dst += 16; + } +#endif + // Scalar tail + for (; i < batch; i++) { + uint64_t v = (uint64_t)src[i]; + dst[0] = (uint8_t)(v >> 56); + dst[1] = (uint8_t)(v >> 48); + dst[2] = (uint8_t)(v >> 40); + dst[3] = (uint8_t)(v >> 32); + dst[4] = (uint8_t)(v >> 24); + dst[5] = (uint8_t)(v >> 16); + dst[6] = (uint8_t)(v >> 8); + dst[7] = (uint8_t)(v); + dst += 8; + } + out_stream.buffer_used(batch * 8); + offset += batch; + } + return common::E_OK; + } + + int encode_batch(const double* values, uint32_t count, + common::ByteStream& out_stream) override { + return encode_batch(reinterpret_cast(values), count, + out_stream); + } + + int encode_batch(const float* values, uint32_t count, + common::ByteStream& out_stream) override { + if (count == 0) return common::E_OK; + uint32_t offset = 0; + while (offset < count) { + common::ByteStream::Buffer buf = out_stream.acquire_buf(); + if (UNLIKELY(buf.buf_ == nullptr)) return common::E_OOM; + uint32_t capacity = buf.len_ / 4; + if (capacity == 0) { + return Encoder::encode_batch(values + offset, count - offset, + out_stream); + } + uint32_t batch = std::min(count - offset, capacity); + uint8_t* dst = (uint8_t*)buf.buf_; + const float* src = values + offset; + uint32_t i = 0; +#if TSFILE_HAS_NEON + // NEON: byte-reverse 4 x float (32-bit) per iteration + for (; i + 4 <= batch; i += 4) { + uint8x16_t v = vld1q_u8((const uint8_t*)&src[i]); + v = vrev32q_u8(v); + vst1q_u8(dst, v); + dst += 16; + } +#endif + for (; i < batch; i++) { + uint32_t v; + memcpy(&v, &src[i], sizeof(float)); + dst[0] = (uint8_t)(v >> 24); + dst[1] = (uint8_t)(v >> 16); + dst[2] = (uint8_t)(v >> 8); + dst[3] = (uint8_t)(v); + dst += 4; + } + out_stream.buffer_used(batch * 4); + offset += batch; + } + return common::E_OK; + } }; } // end namespace storage diff --git a/cpp/src/writer/chunk_writer.h b/cpp/src/writer/chunk_writer.h index 3032ff9a5..e88747531 100644 --- a/cpp/src/writer/chunk_writer.h +++ b/cpp/src/writer/chunk_writer.h @@ -103,6 +103,36 @@ class ChunkWriter { CW_DO_WRITE_FOR_TYPE(); } + template + int write_batch(const int64_t* timestamps, const T* values, + uint32_t count) { + int ret = common::E_OK; + uint32_t offset = 0; + while (offset < count) { + uint32_t cur_points = page_writer_.get_point_numer(); + uint32_t page_remaining = + common::g_config_value_.page_writer_max_point_num_ - cur_points; + if (page_remaining == 0) { + if (RET_FAIL(seal_cur_page(false))) { + return ret; + } + page_remaining = + common::g_config_value_.page_writer_max_point_num_; + } + uint32_t batch_size = std::min(count - offset, page_remaining); + if (RET_FAIL(page_writer_.write_batch(timestamps + offset, + values + offset, + batch_size))) { + return ret; + } + offset += batch_size; + if (RET_FAIL(seal_cur_page_if_full())) { + return ret; + } + } + return ret; + } + int end_encode_chunk(); common::ByteStream& get_chunk_data() { return chunk_data_; } Statistic* get_chunk_statistic() { return chunk_statistic_; } diff --git a/cpp/src/writer/page_writer.h b/cpp/src/writer/page_writer.h index cff4b60ed..f34e7de2f 100644 --- a/cpp/src/writer/page_writer.h +++ b/cpp/src/writer/page_writer.h @@ -150,6 +150,21 @@ class PageWriter { PW_DO_WRITE_FOR_TYPE(); } + template + FORCE_INLINE int write_batch(const int64_t* timestamps, const T* values, + uint32_t count) { + int ret = common::E_OK; + if (count == 0) return ret; + if (RET_FAIL(time_encoder_->encode_batch(timestamps, count, + time_out_stream_))) { + } else if (RET_FAIL(value_encoder_->encode_batch(values, count, + value_out_stream_))) { + } else { + statistic_->update_batch(timestamps, values, count); + } + return ret; + } + FORCE_INLINE uint32_t get_point_numer() const { return statistic_->count_; } FORCE_INLINE uint32_t get_time_out_stream_size() const { return time_out_stream_.total_size(); @@ -194,7 +209,7 @@ class PageWriter { private: // static const uint32_t OUT_STREAM_PAGE_SIZE = 48; - static const uint32_t OUT_STREAM_PAGE_SIZE = 1024; + static const uint32_t OUT_STREAM_PAGE_SIZE = 65536; private: common::TSDataType data_type_; diff --git a/cpp/src/writer/time_chunk_writer.h b/cpp/src/writer/time_chunk_writer.h index ac3b374b0..c071551bb 100644 --- a/cpp/src/writer/time_chunk_writer.h +++ b/cpp/src/writer/time_chunk_writer.h @@ -63,6 +63,33 @@ class TimeChunkWriter { return ret; } + int write_batch(const int64_t* timestamps, uint32_t count) { + int ret = common::E_OK; + uint32_t offset = 0; + while (offset < count) { + uint32_t cur_points = time_page_writer_.get_point_numer(); + uint32_t page_remaining = + common::g_config_value_.page_writer_max_point_num_ - cur_points; + if (page_remaining == 0) { + if (RET_FAIL(seal_cur_page(false))) { + return ret; + } + page_remaining = + common::g_config_value_.page_writer_max_point_num_; + } + uint32_t batch_size = std::min(count - offset, page_remaining); + if (RET_FAIL(time_page_writer_.write_batch(timestamps + offset, + batch_size))) { + return ret; + } + offset += batch_size; + if (RET_FAIL(seal_cur_page_if_full())) { + return ret; + } + } + return ret; + } + int end_encode_chunk(); common::ByteStream& get_chunk_data() { return chunk_data_; } Statistic* get_chunk_statistic() { return chunk_statistic_; } diff --git a/cpp/src/writer/time_page_writer.h b/cpp/src/writer/time_page_writer.h index 4c01044a6..7b749f7b2 100644 --- a/cpp/src/writer/time_page_writer.h +++ b/cpp/src/writer/time_page_writer.h @@ -84,6 +84,28 @@ class TimePageWriter { return ret; } + int write_batch(const int64_t* timestamps, uint32_t count) { + int ret = common::E_OK; + if (count == 0) return ret; + // Check order: first timestamp vs existing end_time + if (statistic_->count_ != 0 && is_inited_ && + timestamps[0] <= statistic_->end_time_) { + return common::E_OUT_OF_ORDER; + } + // Check monotonicity within batch + for (uint32_t i = 1; i < count; i++) { + if (timestamps[i] <= timestamps[i - 1]) { + return common::E_OUT_OF_ORDER; + } + } + if (RET_FAIL(time_encoder_->encode_batch(timestamps, count, + time_out_stream_))) { + } else { + statistic_->update_time_batch(timestamps, count); + } + return ret; + } + FORCE_INLINE uint32_t get_point_numer() const { return statistic_->count_; } FORCE_INLINE uint32_t get_time_out_stream_size() const { return time_out_stream_.total_size(); @@ -115,7 +137,7 @@ class TimePageWriter { common::ByteStream& pages_data); private: - static const uint32_t OUT_STREAM_PAGE_SIZE = 1024; + static const uint32_t OUT_STREAM_PAGE_SIZE = 65536; private: common::TSDataType data_type_; diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc index 9a087a82f..a861b08b4 100644 --- a/cpp/src/writer/tsfile_writer.cc +++ b/cpp/src/writer/tsfile_writer.cc @@ -796,15 +796,16 @@ int TsFileWriter::write_tablet_aligned(const Tablet& tablet) { data_types))) { return ret; } - time_write_column(time_chunk_writer, tablet); + time_write_column_batch(time_chunk_writer, tablet, 0, + tablet.get_cur_row_size()); ASSERT(value_chunk_writers.size() == tablet.get_column_count()); for (uint32_t c = 0; c < value_chunk_writers.size(); c++) { ValueChunkWriter* value_chunk_writer = value_chunk_writers[c]; if (IS_NULL(value_chunk_writer)) { continue; } - if (RET_FAIL(value_write_column(value_chunk_writer, tablet, c, 0, - tablet.get_cur_row_size()))) { + if (RET_FAIL(value_write_column_batch(value_chunk_writer, tablet, c, 0, + tablet.get_cur_row_size()))) { return ret; } } @@ -827,7 +828,8 @@ int TsFileWriter::write_tablet(const Tablet& tablet) { if (IS_NULL(chunk_writer)) { continue; } - if (RET_FAIL(write_column(chunk_writer, tablet, c))) { + if (RET_FAIL(write_column_batch(chunk_writer, tablet, c, 0, + tablet.max_row_num_))) { return ret; } } @@ -888,28 +890,72 @@ int TsFileWriter::write_table(Tablet& tablet) { value_chunk_writers))) { return ret; } - for (int i = start_idx; i < end_idx; i++) { - if (RET_FAIL(time_chunk_writer->write(tablet.timestamps_[i]))) { - return ret; - } - } + + // Collect column tasks for parallel execution + struct ColTask { + ValueChunkWriter* writer; + uint32_t col_idx; + }; + std::vector tasks; uint32_t field_col_count = 0; for (uint32_t i = 0; i < tablet.get_column_count(); ++i) { if (tablet.column_categories_[i] == common::ColumnCategory::FIELD) { - ValueChunkWriter* value_chunk_writer = + ValueChunkWriter* vcw = value_chunk_writers[field_col_count]; - if (IS_NULL(value_chunk_writer)) { - continue; + if (!IS_NULL(vcw)) { + tasks.push_back({vcw, i}); } + field_col_count++; + } + } - if (RET_FAIL(value_write_column(value_chunk_writer, tablet, - i, start_idx, end_idx))) { + // Parallel encode: time column + all value columns concurrently. + // Each ChunkWriter has its own Encoder, Statistic, ByteStream — + // zero shared state, no locks needed. + const uint32_t si = start_idx; + const uint32_t ei = end_idx; + + if (tasks.size() >= 2) { + // Launch time column + value columns in parallel via thread pool + auto time_future = thread_pool_.submit( + [this, time_chunk_writer, &tablet, si, ei]() { + return time_write_column_batch(time_chunk_writer, + tablet, si, ei); + }); + + std::vector> val_futures; + for (size_t t = 0; t < tasks.size(); t++) { + auto& task = tasks[t]; + val_futures.push_back(thread_pool_.submit( + [this, &task, &tablet, si, ei]() { + return value_write_column_batch( + task.writer, tablet, task.col_idx, si, ei); + })); + } + + // Wait for all and check errors + ret = time_future.get(); + if (ret != E_OK) return ret; + for (auto& f : val_futures) { + int r = f.get(); + if (r != E_OK && ret == E_OK) ret = r; + } + if (ret != E_OK) return ret; + } else { + // Too few columns to justify thread overhead, run serially + if (RET_FAIL(time_write_column_batch(time_chunk_writer, + tablet, si, ei))) { + return ret; + } + for (auto& task : tasks) { + if (RET_FAIL(value_write_column_batch( + task.writer, tablet, task.col_idx, si, ei))) { return ret; } - field_col_count++; } } + start_idx = end_idx; } else { MeasurementNamesFromTablet mnames_getter(tablet); @@ -920,20 +966,43 @@ int TsFileWriter::write_table(Tablet& tablet) { return ret; } ASSERT(chunk_writers.size() == tablet.get_column_count()); - for (uint32_t c = 0; c < chunk_writers.size(); c++) { - ChunkWriter* chunk_writer = chunk_writers[c]; - if (IS_NULL(chunk_writer)) { - continue; + + // Parallel encode for non-aligned path + if (chunk_writers.size() >= 2) { + const uint32_t si = start_idx; + const uint32_t ei = device_id_end_index_pair.second; + std::vector> futures; + for (uint32_t c = 0; c < chunk_writers.size(); c++) { + ChunkWriter* cw = chunk_writers[c]; + if (IS_NULL(cw)) continue; + futures.push_back(thread_pool_.submit( + [this, cw, &tablet, c, si, ei]() { + return write_column_batch(cw, tablet, c, si, ei); + })); } - if (RET_FAIL(write_column(chunk_writer, tablet, c, start_idx, - device_id_end_index_pair.second))) { - return ret; + for (auto& f : futures) { + int r = f.get(); + if (r != E_OK && ret == E_OK) ret = r; + } + if (ret != E_OK) return ret; + } else { + for (uint32_t c = 0; c < chunk_writers.size(); c++) { + ChunkWriter* chunk_writer = chunk_writers[c]; + if (IS_NULL(chunk_writer)) continue; + if (RET_FAIL(write_column_batch( + chunk_writer, tablet, c, start_idx, + device_id_end_index_pair.second))) { + return ret; + } } } start_idx = device_id_end_index_pair.second; } } record_count_since_last_flush_ += tablet.cur_row_size_; + // Reset string column buffers so the tablet can be reused for the next + // batch without accumulating memory across writes. + tablet.reset_string_columns(); ret = check_memory_size_and_may_flush_chunks(); return ret; } @@ -941,10 +1010,10 @@ int TsFileWriter::write_table(Tablet& tablet) { std::vector, int>> TsFileWriter::split_tablet_by_device(const Tablet& tablet) { std::vector, int>> result; - std::shared_ptr last_device_id = - std::make_shared("last_device_id"); + if (tablet.id_column_indexes_.empty()) { - result.emplace_back(std::move(last_device_id), 0); + auto sentinel = std::make_shared("last_device_id"); + result.emplace_back(std::move(sentinel), 0); std::vector id_array; id_array.push_back(new std::string(tablet.insert_target_name_)); auto res = std::make_shared(id_array); @@ -953,14 +1022,22 @@ TsFileWriter::split_tablet_by_device(const Tablet& tablet) { return result; } - for (uint32_t i = 0; i < tablet.get_cur_row_size(); i++) { - std::shared_ptr cur_device_id(tablet.get_device_id(i)); - if (*cur_device_id != *last_device_id) { - result.emplace_back(std::move(last_device_id), i); - last_device_id = std::move(cur_device_id); - } + const uint32_t row_count = tablet.get_cur_row_size(); + if (row_count == 0) return result; + + auto sentinel = std::make_shared("last_device_id"); + result.emplace_back(std::move(sentinel), 0); + + auto boundaries = tablet.find_all_device_boundaries(); + + uint32_t seg_start = 0; + for (uint32_t b : boundaries) { + std::shared_ptr dev_id(tablet.get_device_id(seg_start)); + result.emplace_back(std::move(dev_id), b); + seg_start = b; } - result.emplace_back(std::move(last_device_id), tablet.get_cur_row_size()); + std::shared_ptr last_id(tablet.get_device_id(seg_start)); + result.emplace_back(std::move(last_id), row_count); return result; } @@ -996,7 +1073,7 @@ int TsFileWriter::write_column(ChunkWriter* chunk_writer, const Tablet& tablet, col_notnull_bitmap, start_idx, end_idx); } else if (data_type == common::STRING) { ret = - write_typed_column(chunk_writer, timestamps, col_values.string_data, + write_typed_column(chunk_writer, timestamps, col_values.string_col, col_notnull_bitmap, start_idx, end_idx); } else { ASSERT(false); @@ -1061,8 +1138,8 @@ int TsFileWriter::value_write_column(ValueChunkWriter* value_chunk_writer, case common::TEXT: case common::BLOB: ret = write_typed_column(value_chunk_writer, timestamps, - (common::String*)col_values.string_data, - col_notnull_bitmap, start_idx, end_idx); + col_values.string_col, col_notnull_bitmap, + start_idx, end_idx); break; default: ret = E_NOT_SUPPORT; @@ -1140,10 +1217,21 @@ int TsFileWriter::write_typed_column(ChunkWriter* chunk_writer, int TsFileWriter::write_typed_column(ChunkWriter* chunk_writer, int64_t* timestamps, - common::String* col_values, + Tablet::StringColumn* string_col, BitMap& col_notnull_bitmap, uint32_t start_idx, uint32_t end_idx) { - DO_WRITE_TYPED_COLUMN(); + int ret = E_OK; + for (uint32_t r = start_idx; r < end_idx; r++) { + if (LIKELY(!col_notnull_bitmap.test(r))) { + common::String val( + string_col->buffer + string_col->offsets[r], + string_col->offsets[r + 1] - string_col->offsets[r]); + if (RET_FAIL(chunk_writer->write(timestamps[r], val))) { + return ret; + } + } + } + return ret; } int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer, @@ -1183,10 +1271,160 @@ int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer, int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer, int64_t* timestamps, - common::String* col_values, + Tablet::StringColumn* string_col, common::BitMap& col_notnull_bitmap, uint32_t start_idx, uint32_t end_idx) { - DO_VALUE_WRITE_TYPED_COLUMN(); + int ret = E_OK; + for (uint32_t r = start_idx; r < end_idx; r++) { + common::String val(string_col->buffer + string_col->offsets[r], + string_col->offsets[r + 1] - string_col->offsets[r]); + if (LIKELY(col_notnull_bitmap.test(r))) { + if (RET_FAIL(value_chunk_writer->write(timestamps[r], val, true))) { + return ret; + } + } else { + if (RET_FAIL( + value_chunk_writer->write(timestamps[r], val, false))) { + return ret; + } + } + } + return ret; +} + +int TsFileWriter::time_write_column_batch(TimeChunkWriter* time_chunk_writer, + const Tablet& tablet, + uint32_t start_idx, + uint32_t end_idx) { + int64_t* timestamps = tablet.timestamps_; + int ret = E_OK; + if (IS_NULL(time_chunk_writer) || IS_NULL(timestamps)) { + return E_INVALID_ARG; + } + end_idx = std::min(end_idx, tablet.max_row_num_); + uint32_t count = end_idx - start_idx; + if (count == 0) return ret; + return time_chunk_writer->write_batch(timestamps + start_idx, count); +} + +int TsFileWriter::write_column_batch(ChunkWriter* chunk_writer, + const Tablet& tablet, int col_idx, + uint32_t start_idx, uint32_t end_idx) { + int ret = E_OK; + common::TSDataType data_type = tablet.schema_vec_->at(col_idx).data_type_; + int64_t* timestamps = tablet.timestamps_; + Tablet::ValueMatrixEntry col_values = tablet.value_matrix_[col_idx]; + BitMap& col_notnull_bitmap = tablet.bitmaps_[col_idx]; + end_idx = std::min(end_idx, tablet.max_row_num_); + uint32_t count = end_idx - start_idx; + if (count == 0) return ret; + + // For non-aligned write, bitmap bit=0 means not null. + // We need to check if there are any nulls. + bool has_null = false; + for (uint32_t r = start_idx; r < end_idx; r++) { + if (col_notnull_bitmap.test(r)) { + has_null = true; + break; + } + } + + if (!has_null) { + // Fast path: no nulls, batch write directly + switch (data_type) { + case common::BOOLEAN: + ret = chunk_writer->write_batch(timestamps + start_idx, + col_values.bool_data + start_idx, + count); + break; + case common::INT32: + case common::DATE: + ret = chunk_writer->write_batch(timestamps + start_idx, + col_values.int32_data + start_idx, + count); + break; + case common::INT64: + case common::TIMESTAMP: + ret = chunk_writer->write_batch(timestamps + start_idx, + col_values.int64_data + start_idx, + count); + break; + case common::FLOAT: + ret = chunk_writer->write_batch(timestamps + start_idx, + col_values.float_data + start_idx, + count); + break; + case common::DOUBLE: + ret = chunk_writer->write_batch(timestamps + start_idx, + col_values.double_data + start_idx, + count); + break; + default: + // Fall back to per-row for STRING and other types + ret = write_column(chunk_writer, tablet, col_idx, start_idx, + end_idx); + break; + } + } else { + // Slow path: has nulls, fall back to per-row + ret = write_column(chunk_writer, tablet, col_idx, start_idx, end_idx); + } + return ret; +} + +int TsFileWriter::value_write_column_batch(ValueChunkWriter* value_chunk_writer, + const Tablet& tablet, int col_idx, + uint32_t start_idx, + uint32_t end_idx) { + int ret = E_OK; + common::TSDataType data_type = tablet.schema_vec_->at(col_idx).data_type_; + int64_t* timestamps = tablet.timestamps_; + Tablet::ValueMatrixEntry col_values = tablet.value_matrix_[col_idx]; + BitMap& col_notnull_bitmap = tablet.bitmaps_[col_idx]; + end_idx = std::min(end_idx, tablet.max_row_num_); + uint32_t count = end_idx - start_idx; + if (count == 0) return ret; + + switch (data_type) { + case common::BOOLEAN: + ret = value_chunk_writer->write_batch( + timestamps, col_values.bool_data, col_notnull_bitmap, + start_idx, count); + break; + case common::DATE: + case common::INT32: + ret = value_chunk_writer->write_batch( + timestamps, col_values.int32_data, col_notnull_bitmap, + start_idx, count); + break; + case common::TIMESTAMP: + case common::INT64: + ret = value_chunk_writer->write_batch( + timestamps, col_values.int64_data, col_notnull_bitmap, + start_idx, count); + break; + case common::FLOAT: + ret = value_chunk_writer->write_batch( + timestamps, col_values.float_data, col_notnull_bitmap, + start_idx, count); + break; + case common::DOUBLE: + ret = value_chunk_writer->write_batch( + timestamps, col_values.double_data, col_notnull_bitmap, + start_idx, count); + break; + case common::STRING: + case common::TEXT: + case common::BLOB: + // String types: fall back to per-row for now + ret = value_write_column(value_chunk_writer, tablet, col_idx, + start_idx, end_idx); + break; + default: + ret = E_NOT_SUPPORT; + break; + } + return ret; } // TODO make sure ret is meaningful to SDK user diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h index 85c47db7f..19a7c7a52 100644 --- a/cpp/src/writer/tsfile_writer.h +++ b/cpp/src/writer/tsfile_writer.h @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -32,6 +33,7 @@ #include "common/record.h" #include "common/schema.h" #include "common/tablet.h" +#include "common/thread_pool.h" namespace storage { class WriteFile; @@ -137,7 +139,7 @@ class TsFileWriter { common::BitMap& col_notnull_bitmap, uint32_t start_idx, uint32_t end_idx); int write_typed_column(ChunkWriter* chunk_writer, int64_t* timestamps, - common::String* col_values, + Tablet::StringColumn* string_col, common::BitMap& col_notnull_bitmap, uint32_t start_idx, uint32_t end_idx); @@ -187,6 +189,7 @@ class TsFileWriter { bool write_file_created_; bool io_writer_owned_; // false when init(RestorableTsFileIOWriter*) bool table_aligned_ = true; + common::ThreadPool thread_pool_{6}; int write_typed_column(ValueChunkWriter* value_chunk_writer, int64_t* timestamps, bool* col_values, @@ -198,7 +201,8 @@ class TsFileWriter { common::BitMap& col_notnull_bitmap, uint32_t start_idx, uint32_t end_idx); int write_typed_column(ValueChunkWriter* value_chunk_writer, - int64_t* timestamps, common::String* col_values, + int64_t* timestamps, + Tablet::StringColumn* string_col, common::BitMap& col_notnull_bitmap, uint32_t start_idx, uint32_t end_idx); @@ -220,6 +224,16 @@ class TsFileWriter { int value_write_column(ValueChunkWriter* value_chunk_writer, const Tablet& tablet, int col_idx, uint32_t start_idx, uint32_t end_idx); + + int write_column_batch(storage::ChunkWriter* chunk_writer, + const Tablet& tablet, int col_idx, + uint32_t start_idx, uint32_t end_idx); + int time_write_column_batch(TimeChunkWriter* time_chunk_writer, + const Tablet& tablet, uint32_t start_idx, + uint32_t end_idx); + int value_write_column_batch(ValueChunkWriter* value_chunk_writer, + const Tablet& tablet, int col_idx, + uint32_t start_idx, uint32_t end_idx); }; } // end namespace storage diff --git a/cpp/src/writer/value_chunk_writer.h b/cpp/src/writer/value_chunk_writer.h index 859fb57b0..afe94760c 100644 --- a/cpp/src/writer/value_chunk_writer.h +++ b/cpp/src/writer/value_chunk_writer.h @@ -109,6 +109,37 @@ class ValueChunkWriter { VCW_DO_WRITE_FOR_TYPE(isnull); } + template + int write_batch(const int64_t* timestamps, const T* values, + const common::BitMap& col_notnull_bitmap, + uint32_t start_idx, uint32_t count) { + int ret = common::E_OK; + uint32_t offset = 0; + while (offset < count) { + uint32_t cur_points = value_page_writer_.get_point_numer(); + uint32_t page_remaining = + common::g_config_value_.page_writer_max_point_num_ - cur_points; + if (page_remaining == 0) { + if (RET_FAIL(seal_cur_page(false))) { + return ret; + } + page_remaining = + common::g_config_value_.page_writer_max_point_num_; + } + uint32_t batch_size = std::min(count - offset, page_remaining); + if (RET_FAIL(value_page_writer_.write_batch( + timestamps, values, col_notnull_bitmap, + start_idx + offset, batch_size))) { + return ret; + } + offset += batch_size; + if (RET_FAIL(seal_cur_page_if_full())) { + return ret; + } + } + return ret; + } + int end_encode_chunk(); common::ByteStream& get_chunk_data() { return chunk_data_; } Statistic* get_chunk_statistic() { return chunk_statistic_; } diff --git a/cpp/src/writer/value_page_writer.h b/cpp/src/writer/value_page_writer.h index 60d75b0b8..d229bfd57 100644 --- a/cpp/src/writer/value_page_writer.h +++ b/cpp/src/writer/value_page_writer.h @@ -151,6 +151,63 @@ class ValuePageWriter { VPW_DO_WRITE_FOR_TYPE(isnull); } + // Batch write for aligned/table model. + // In the tablet bitmap: bit=1 means null, bit=0 means not null. + // In VPW_DO_WRITE_FOR_TYPE: ISNULL=true skips encoding. + // So: tablet bitmap.test(r)=true -> isnull=true (null value) + // tablet bitmap.test(r)=false -> isnull=false (valid value) + template + int write_batch(const int64_t* timestamps, const T* values, + const common::BitMap& col_notnull_bitmap, + uint32_t start_idx, uint32_t count) { + int ret = common::E_OK; + if (count == 0) return ret; + + uint32_t valid_count = 0; + for (uint32_t i = 0; i < count; i++) { + uint32_t row = start_idx + i; + if ((size_ / 8) + 1 > col_notnull_bitmap_.size()) { + col_notnull_bitmap_.push_back(0); + } + // bit=1 in tablet bitmap means null; bit=0 means not null + bool is_null = + const_cast(col_notnull_bitmap).test(row); + if (!is_null) { + // Mark as not-null in page bitmap + col_notnull_bitmap_[size_ / 8] |= (MASK >> (size_ % 8)); + valid_count++; + } + size_++; + } + + if (valid_count == 0) return ret; + + // If all values are valid, we can encode the batch directly + if (valid_count == count) { + if (RET_FAIL(value_encoder_->encode_batch(values + start_idx, + count, + value_out_stream_))) { + return ret; + } + statistic_->update_batch(timestamps + start_idx, + values + start_idx, count); + } else { + // Encode only non-null values one by one + for (uint32_t i = 0; i < count; i++) { + uint32_t row = start_idx + i; + if (!const_cast(col_notnull_bitmap) + .test(row)) { + if (RET_FAIL(value_encoder_->encode(values[row], + value_out_stream_))) { + return ret; + } + statistic_->update(timestamps[row], values[row]); + } + } + } + return ret; + } + FORCE_INLINE uint32_t get_point_numer() const { return statistic_->count_; } FORCE_INLINE uint32_t get_col_notnull_bitmap_out_stream_size() const { return col_notnull_bitmap_out_stream_.total_size(); @@ -199,7 +256,7 @@ class ValuePageWriter { common::ByteStream& pages_data); private: - static const uint32_t OUT_STREAM_PAGE_SIZE = 1024; + static const uint32_t OUT_STREAM_PAGE_SIZE = 65536; private: common::TSDataType data_type_;