
Experiment #749

Open
ColinLeeo wants to merge 4 commits into develop from experiment

Conversation

@ColinLeeo

No description provided.

Comment on lines 296 to +297
page_size_ = buf_len;
page_mask_ = buf_len - 1;

Is page_size_ guaranteed to be a power of 2? Using buf_len - 1 as page_mask_ is only correct in that case.
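
To illustrate the concern, here is a minimal sketch of a power-of-two check. The helper names `is_power_of_two` and `offset_in_page` are hypothetical and not part of this PR; the second one only shows where masking and modulo diverge.

```cpp
#include <cassert>
#include <cstdint>

// Returns true when n is a non-zero power of two.
inline bool is_power_of_two(uint32_t n) {
    return n != 0 && (n & (n - 1)) == 0;
}

// Hypothetical helper: maps a position to an offset within a page.
// The mask is used only when it is valid; otherwise fall back to modulo.
inline uint32_t offset_in_page(uint32_t pos, uint32_t page_size) {
    assert(page_size != 0);
    if (is_power_of_two(page_size)) {
        return pos & (page_size - 1);
    }
    return pos % page_size;
}
```

With a page size of 3000, `pos & (page_size - 1)` and `pos % page_size` give different results, so either the size should be validated at init time or the mask should not be used.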

Comment on lines +88 to +90
auto* sc = new StringColumn();
sc->init(max_row_num_, max_row_num_ * 32);
value_matrix_[c].string_col = sc;

Where does the 32 come from?

Use mem_alloc?

Comment on lines +169 to +186
int Tablet::set_timestamps(const int64_t* timestamps, uint32_t count) {
    if (err_code_ != E_OK) return err_code_;
    ASSERT(timestamps_ != NULL);
    if (UNLIKELY(count > static_cast<uint32_t>(max_row_num_))) {
        return E_OUT_OF_RANGE;
    }
    std::memcpy(timestamps_, timestamps, count * sizeof(int64_t));
    cur_row_size_ = std::max(count, cur_row_size_);
    return E_OK;
}

int Tablet::set_column_values(uint32_t schema_index, const void* data,
                              const uint8_t* bitmap, uint32_t count) {
    if (err_code_ != E_OK) return err_code_;
    if (UNLIKELY(schema_index >= schema_vec_->size())) return E_OUT_OF_RANGE;
    if (UNLIKELY(count > static_cast<uint32_t>(max_row_num_)))
        return E_OUT_OF_RANGE;

This duplicates #747.

Refer to the comments in that PR and discuss with @hongzhi-gao to decide where to put the changes.

Comment on lines +462 to +476
for (auto col_idx : id_column_indexes_) {
    const StringColumn& sc = *value_matrix_[col_idx].string_col;
    const uint32_t* off = sc.offsets;
    const char* buf = sc.buffer;
    for (uint32_t i = 1; i < row_count; i++) {
        if (boundary[i >> 6] & (1ULL << (i & 63))) continue;
        uint32_t len_a = off[i] - off[i - 1];
        uint32_t len_b = off[i + 1] - off[i];
        if (len_a != len_b ||
            (len_a > 0 &&
             memcmp(buf + off[i - 1], buf + off[i], len_a) != 0)) {
            boundary[i >> 6] |= (1ULL << (i & 63));
        }
    }
}

id_column_indexes_ -> tag_column_indexes.

Consider scanning the indexes in reverse order, because later tags generally change more often.
For example, devices are usually ordered like this:
beijing.haidian.wf0001.wt0001
beijing.haidian.wf0001.wt0002
beijing.haidian.wf0001.wt0003
beijing.haidian.wf0002.wt0001
beijing.haidian.wf0002.wt0002
beijing.haidian.wf0002.wt0003

Also, the scan can stop early once the number of boundaries set equals row_count.

Would it help to maintain the boundaries while the tag columns are being inserted?
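
A rough sketch of the reverse scan with an early exit, under simplifying assumptions: `TagColumns` and `find_boundaries` are hypothetical stand-ins that compare `std::string` values instead of offsets into a shared buffer, and only rows 1 onward are counted as candidates, so the scan stops at row_count - 1 marked rows.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Simplified stand-in for the tablet's tag columns: tags[c][r] is the value
// of tag column c at row r. The real code works on offsets into a buffer.
using TagColumns = std::vector<std::vector<std::string>>;

// Marks row i (i >= 1) as a boundary when any tag differs from row i - 1.
// Columns are visited last-to-first, and the scan stops once every
// candidate row is already marked.
std::vector<uint64_t> find_boundaries(const TagColumns& tags,
                                      uint32_t row_count) {
    std::vector<uint64_t> boundary((row_count + 63) / 64, 0);
    if (row_count < 2) return boundary;
    const uint32_t max_boundaries = row_count - 1;  // rows 1..row_count-1
    uint32_t marked = 0;
    for (size_t c = tags.size(); c-- > 0 && marked < max_boundaries;) {
        const std::vector<std::string>& col = tags[c];
        assert(col.size() >= row_count);
        for (uint32_t i = 1; i < row_count; i++) {
            const uint64_t bit = 1ULL << (i & 63);
            if (boundary[i >> 6] & bit) continue;  // already a boundary
            if (col[i] != col[i - 1]) {
                boundary[i >> 6] |= bit;
                if (++marked == max_boundaries) break;
            }
        }
    }
    return boundary;
}
```

For data ordered like the device list above, the last tag column alone marks most rows, so the earlier columns only need to be compared for the few rows that are still unmarked.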

Comment on lines +479 to +491
std::vector<uint32_t> result;
for (uint32_t w = 0; w < nwords; w++) {
    uint64_t bits = boundary[w];
    while (bits) {
        uint32_t bit = __builtin_ctzll(bits);
        uint32_t idx = w * 64 + bit;
        if (idx > 0 && idx < row_count) {
            result.push_back(idx);
        }
        bits &= bits - 1;  // clear lowest set bit
    }
}
return result;

Please add some comments here, ideally with an example.
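
Something along these lines might work as a starting point. This is a sketch only: the wrapper function `collect_boundaries` is hypothetical, and the comment about row 0 is an assumption based on the segment loop starting at 0. `__builtin_ctzll` is a GCC/Clang builtin.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Converts a boundary bitmap into a sorted list of row indexes.
// Bit b of word w stands for row w * 64 + b. Row 0 is skipped (assumed
// to always start the first segment), and bits at or past row_count
// are ignored.
//
// Example with row_count = 6 and boundary[0] = 0b101001 (bits 0, 3, 5):
//   lowest set bit 0 -> idx 0, skipped
//   lowest set bit 3 -> idx 3, kept
//   lowest set bit 5 -> idx 5, kept
//   result = {3, 5}
std::vector<uint32_t> collect_boundaries(const std::vector<uint64_t>& boundary,
                                         uint32_t row_count) {
    assert(boundary.size() * 64 >= row_count);
    std::vector<uint32_t> result;
    for (uint32_t w = 0; w < boundary.size(); w++) {
        uint64_t bits = boundary[w];
        while (bits) {
            // Index of the lowest set bit in this word.
            uint32_t bit = static_cast<uint32_t>(__builtin_ctzll(bits));
            uint32_t idx = w * 64 + bit;
            if (idx > 0 && idx < row_count) {
                result.push_back(idx);
            }
            bits &= bits - 1;  // clear lowest set bit
        }
    }
    return result;
}
```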

Comment on lines +894 to +899
// Collect column tasks for parallel execution
struct ColTask {
    ValueChunkWriter* writer;
    uint32_t col_idx;
};
std::vector<ColTask> tasks;

Parallel execution may not be desirable when resources are limited.
Please add a compile-time option or a configuration item to control it.

Comment on lines +919 to +945
if (tasks.size() >= 2) {
    // Launch time column + value columns in parallel via thread pool
    auto time_future = thread_pool_.submit(
        [this, time_chunk_writer, &tablet, si, ei]() {
            return time_write_column_batch(time_chunk_writer,
                                           tablet, si, ei);
        });

    std::vector<std::future<int>> val_futures;
    for (size_t t = 0; t < tasks.size(); t++) {
        auto& task = tasks[t];
        val_futures.push_back(thread_pool_.submit(
            [this, &task, &tablet, si, ei]() {
                return value_write_column_batch(
                    task.writer, tablet, task.col_idx, si, ei);
            }));
    }

    // Wait for all and check errors
    ret = time_future.get();
    if (ret != E_OK) return ret;
    for (auto& f : val_futures) {
        int r = f.get();
        if (r != E_OK && ret == E_OK) ret = r;
    }
    if (ret != E_OK) return ret;
} else {

Consider always encoding the time column on the calling thread to reduce overhead.
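
A minimal sketch of that shape, assuming nothing about the project's ThreadPool: it uses `std::async` as a stand-in, and `ColumnTask` and `write_columns` are hypothetical names. Each task returns 0 on success or an error code.

```cpp
#include <cassert>
#include <functional>
#include <future>
#include <vector>

// Hypothetical stand-in for a column write: returns 0 on success.
using ColumnTask = std::function<int()>;

int write_columns(const ColumnTask& time_task,
                  const std::vector<ColumnTask>& value_tasks) {
    assert(time_task != nullptr);

    // Start the value columns first so they overlap with the time column.
    std::vector<std::future<int>> futures;
    futures.reserve(value_tasks.size());
    for (const ColumnTask& task : value_tasks) {
        futures.push_back(std::async(std::launch::async, task));
    }

    // Encode the time column on the calling thread: no task hand-off.
    int ret = time_task();

    // Drain every future before returning, even after an error, so that
    // no task is still running against state owned by the caller.
    for (auto& f : futures) {
        int r = f.get();
        if (r != 0 && ret == 0) ret = r;
    }
    return ret;
}
```

The calling thread would otherwise just block on the futures, so doing the time column there saves one submission and one wake-up per batch.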


uint32_t seg_start = 0;
for (uint32_t b : boundaries) {
    std::shared_ptr<IDeviceID> dev_id(tablet.get_device_id(seg_start));

Consider adding an IDeviceID implementation that points directly to a row in a tablet.

Comment on lines +1326 to +1327
if (col_notnull_bitmap.test(r)) {
    has_null = true;

col_notnull_bitmap -> col_null_bitmap

Please check other places for the same naming issue.

bool write_file_created_;
bool io_writer_owned_; // false when init(RestorableTsFileIOWriter*)
bool table_aligned_ = true;
common::ThreadPool thread_pool_{6};

Add an item to the global configuration and use it here instead of the hard-coded 6.
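
For illustration, such an item might look like the sketch below. Everything here is hypothetical (`WriterConfig`, its field names, `kMaxWriteThreads`, `effective_thread_count`); the project's real configuration struct will differ. It also covers the earlier point about being able to turn parallel writes off.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical configuration items; names and defaults are assumptions.
struct WriterConfig {
    bool parallel_write_enabled = true;
    uint32_t write_thread_count = 6;  // matches the current hard-coded value
};

// Hypothetical sanity cap on the configured thread count.
constexpr uint32_t kMaxWriteThreads = 64;

// Number of worker threads to use for a batch; 0 means "encode everything
// on the calling thread".
inline uint32_t effective_thread_count(const WriterConfig& cfg,
                                       uint32_t column_count) {
    assert(cfg.write_thread_count <= kMaxWriteThreads);
    if (!cfg.parallel_write_enabled || column_count < 2) return 0;
    return std::min(cfg.write_thread_count, column_count);
}
```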
