Conversation
| page_size_ = buf_len; | ||
| page_mask_ = buf_len - 1; |
There was a problem hiding this comment.
Is it guaranteed that page_size_ is always a power of 2? (page_mask_ = buf_len - 1 is only a valid mask in that case.)
| auto* sc = new StringColumn(); | ||
| sc->init(max_row_num_, max_row_num_ * 32); | ||
| value_matrix_[c].string_col = sc; |
There was a problem hiding this comment.
Where does the 32 come from?
| int Tablet::set_timestamps(const int64_t* timestamps, uint32_t count) { | ||
| if (err_code_ != E_OK) return err_code_; | ||
| ASSERT(timestamps_ != NULL); | ||
| if (UNLIKELY(count > static_cast<uint32_t>(max_row_num_))) { | ||
| return E_OUT_OF_RANGE; | ||
| } | ||
| std::memcpy(timestamps_, timestamps, count * sizeof(int64_t)); | ||
| cur_row_size_ = std::max(count, cur_row_size_); | ||
| return E_OK; | ||
| } | ||
|
|
||
| int Tablet::set_column_values(uint32_t schema_index, const void* data, | ||
| const uint8_t* bitmap, uint32_t count) { | ||
| if (err_code_ != E_OK) return err_code_; | ||
| if (UNLIKELY(schema_index >= schema_vec_->size())) return E_OUT_OF_RANGE; | ||
| if (UNLIKELY(count > static_cast<uint32_t>(max_row_num_))) | ||
| return E_OUT_OF_RANGE; | ||
|
|
There was a problem hiding this comment.
This duplicates #747.
Refer to the comments in that PR and discuss with @hongzhi-gao to decide where to put the changes.
| for (auto col_idx : id_column_indexes_) { | ||
| const StringColumn& sc = *value_matrix_[col_idx].string_col; | ||
| const uint32_t* off = sc.offsets; | ||
| const char* buf = sc.buffer; | ||
| for (uint32_t i = 1; i < row_count; i++) { | ||
| if (boundary[i >> 6] & (1ULL << (i & 63))) continue; | ||
| uint32_t len_a = off[i] - off[i - 1]; | ||
| uint32_t len_b = off[i + 1] - off[i]; | ||
| if (len_a != len_b || | ||
| (len_a > 0 && | ||
| memcmp(buf + off[i - 1], buf + off[i], len_a) != 0)) { | ||
| boundary[i >> 6] |= (1ULL << (i & 63)); | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Rename id_column_indexes_ -> tag_column_indexes.
Consider scanning the indexes in reverse order, because in general the later tags are more likely to change frequently.
E.g., devices are more likely to have orders like:
beijing.haidian.wf0001.wt0001
beijing.haidian.wf0001.wt0002
beijing.haidian.wf0001.wt0003
beijing.haidian.wf0002.wt0001
beijing.haidian.wf0002.wt0002
beijing.haidian.wf0002.wt0003
Also, the loop could break out early once the number of boundaries set equals row_count.
Would we benefit from maintaining the boundaries while the tag columns are being inserted?
| std::vector<uint32_t> result; | ||
| for (uint32_t w = 0; w < nwords; w++) { | ||
| uint64_t bits = boundary[w]; | ||
| while (bits) { | ||
| uint32_t bit = __builtin_ctzll(bits); | ||
| uint32_t idx = w * 64 + bit; | ||
| if (idx > 0 && idx < row_count) { | ||
| result.push_back(idx); | ||
| } | ||
| bits &= bits - 1; // clear lowest set bit | ||
| } | ||
| } | ||
| return result; |
There was a problem hiding this comment.
Please add some comments here, preferably with a few examples.
| // Collect column tasks for parallel execution | ||
| struct ColTask { | ||
| ValueChunkWriter* writer; | ||
| uint32_t col_idx; | ||
| }; | ||
| std::vector<ColTask> tasks; |
There was a problem hiding this comment.
This may not be desirable when resources are limited.
Better to add a compilation option or configuration for this.
| if (tasks.size() >= 2) { | ||
| // Launch time column + value columns in parallel via thread pool | ||
| auto time_future = thread_pool_.submit( | ||
| [this, time_chunk_writer, &tablet, si, ei]() { | ||
| return time_write_column_batch(time_chunk_writer, | ||
| tablet, si, ei); | ||
| }); | ||
|
|
||
| std::vector<std::future<int>> val_futures; | ||
| for (size_t t = 0; t < tasks.size(); t++) { | ||
| auto& task = tasks[t]; | ||
| val_futures.push_back(thread_pool_.submit( | ||
| [this, &task, &tablet, si, ei]() { | ||
| return value_write_column_batch( | ||
| task.writer, tablet, task.col_idx, si, ei); | ||
| })); | ||
| } | ||
|
|
||
| // Wait for all and check errors | ||
| ret = time_future.get(); | ||
| if (ret != E_OK) return ret; | ||
| for (auto& f : val_futures) { | ||
| int r = f.get(); | ||
| if (r != E_OK && ret == E_OK) ret = r; | ||
| } | ||
| if (ret != E_OK) return ret; | ||
| } else { |
There was a problem hiding this comment.
Consider always encoding the time column in the local thread to reduce overhead.
|
|
||
| uint32_t seg_start = 0; | ||
| for (uint32_t b : boundaries) { | ||
| std::shared_ptr<IDeviceID> dev_id(tablet.get_device_id(seg_start)); |
There was a problem hiding this comment.
Consider adding an IDeviceID implementation that points directly to a row in a tablet.
| if (col_notnull_bitmap.test(r)) { | ||
| has_null = true; |
There was a problem hiding this comment.
col_notnull_bitmap -> col_null_bitmap
| bool write_file_created_; | ||
| bool io_writer_owned_; // false when init(RestorableTsFileIOWriter*) | ||
| bool table_aligned_ = true; | ||
| common::ThreadPool thread_pool_{6}; |
There was a problem hiding this comment.
Add an item to the global configuration and use it here.
No description provided.