diff --git a/.gitignore b/.gitignore
index 8f2e34e3f..d35f80845 100644
--- a/.gitignore
+++ b/.gitignore
@@ -43,3 +43,5 @@ cpp/third_party/zlib-1.3.1
 .vscode/
 build/*
+cpp/third_party/zlib-1.3.1/treebuild.xml
+cpp/third_party/zlib-1.3.1/zlib-1.3.1/treebuild.xml
diff --git a/cpp/src/common/container/bit_map.h b/cpp/src/common/container/bit_map.h
index 356932d14..7c60a1ea3 100644
--- a/cpp/src/common/container/bit_map.h
+++ b/cpp/src/common/container/bit_map.h
@@ -55,6 +55,8 @@ class BitMap {
         *start_addr = (*start_addr) & (~bit_mask);
     }
 
+    FORCE_INLINE void clear_all() { memset(bitmap_, 0x00, size_); }
+
     FORCE_INLINE bool test(uint32_t index) {
         uint32_t offset = index >> 3;
         ASSERT(offset < size_);
@@ -64,9 +66,46 @@ class BitMap {
         return (*start_addr & bit_mask);
     }
 
+    // Count the number of bits set to 1 (i.e., number of null entries).
+    // __builtin_popcount is supported by GCC, Clang, and MinGW on Windows.
+    // TODO: add MSVC support if needed (e.g. __popcnt or manual bit count).
+    FORCE_INLINE uint32_t count_set_bits() const {
+        uint32_t count = 0;
+        const uint8_t* p = reinterpret_cast<const uint8_t*>(bitmap_);
+        for (uint32_t i = 0; i < size_; i++) {
+            count += __builtin_popcount(p[i]);
+        }
+        return count;
+    }
+
+    // Find the next set bit (null position) at or after @from,
+    // within [0, total_bits). Returns total_bits if none found.
+    // Skips zero bytes in bulk so cost is proportional to the number
+    // of null bytes, not total rows.
+    FORCE_INLINE uint32_t next_set_bit(uint32_t from,
+                                       uint32_t total_bits) const {
+        if (from >= total_bits) return total_bits;
+        const uint8_t* p = reinterpret_cast<const uint8_t*>(bitmap_);
+        uint32_t byte_idx = from >> 3;
+        // Check remaining bits in the first (partial) byte
+        uint8_t byte_val = p[byte_idx] >> (from & 7);
+        if (byte_val) {
+            return from + __builtin_ctz(byte_val);
+        }
+        // Scan subsequent full bytes, skipping zeros
+        const uint32_t byte_end = (total_bits + 7) >> 3;
+        for (++byte_idx; byte_idx < byte_end; ++byte_idx) {
+            if (p[byte_idx]) {
+                uint32_t pos = (byte_idx << 3) + __builtin_ctz(p[byte_idx]);
+                return pos < total_bits ? pos : total_bits;
+            }
+        }
+        return total_bits;
+    }
+
     FORCE_INLINE uint32_t get_size() { return size_; }
 
-    FORCE_INLINE char* get_bitmap() { return bitmap_; }  // for debug
+    FORCE_INLINE char* get_bitmap() { return bitmap_; }
 
    private:
     FORCE_INLINE uint8_t get_bit_mask(uint32_t index) {
diff --git a/cpp/src/common/container/byte_buffer.h b/cpp/src/common/container/byte_buffer.h
index 922013496..88006dac6 100644
--- a/cpp/src/common/container/byte_buffer.h
+++ b/cpp/src/common/container/byte_buffer.h
@@ -118,6 +118,8 @@ class ByteBuffer {
 
     FORCE_INLINE char* get_data() { return data_; }
 
+    FORCE_INLINE uint32_t get_data_size() const { return real_data_size_; }
+
    private:
     char* data_;
     uint8_t variable_type_len_;
diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc
index 10489f67d..68bdb898d 100644
--- a/cpp/src/common/tablet.cc
+++ b/cpp/src/common/tablet.cc
@@ -163,6 +163,80 @@ int Tablet::add_timestamp(uint32_t row_index, int64_t timestamp) {
     return E_OK;
 }
 
+int Tablet::set_timestamps(const int64_t* timestamps, uint32_t count) {
+    if (err_code_ != E_OK) {
+        return err_code_;
+    }
+    ASSERT(timestamps_ != NULL);
+    if (UNLIKELY(count > static_cast<uint32_t>(max_row_num_))) {
+        return E_OUT_OF_RANGE;
+    }
+    std::memcpy(timestamps_, timestamps, count * sizeof(int64_t));
+    cur_row_size_ = std::max(count, cur_row_size_);
+    return E_OK;
+}
+
+int Tablet::set_column_values(uint32_t schema_index, const void* data,
+                              const uint8_t* bitmap, uint32_t count) {
+    if (err_code_ != E_OK) {
+        return err_code_;
+    }
+    if (UNLIKELY(schema_index >= schema_vec_->size())) {
+        return E_OUT_OF_RANGE;
+    }
+    if (UNLIKELY(count > static_cast<uint32_t>(max_row_num_))) {
+        return E_OUT_OF_RANGE;
+    }
+
+    const MeasurementSchema& schema = schema_vec_->at(schema_index);
+    size_t elem_size = 0;
+    void* dst = nullptr;
+    switch (schema.data_type_) {
+        case BOOLEAN:
+            elem_size = sizeof(bool);
+            dst = value_matrix_[schema_index].bool_data;
+            break;
+        case DATE:
+        case INT32:
+            elem_size = sizeof(int32_t);
+            dst = value_matrix_[schema_index].int32_data;
+            break;
+        case TIMESTAMP:
+        case INT64:
+            elem_size = sizeof(int64_t);
+            dst = value_matrix_[schema_index].int64_data;
+            break;
+        case FLOAT:
+            elem_size = sizeof(float);
+            dst = value_matrix_[schema_index].float_data;
+            break;
+        case DOUBLE:
+            elem_size = sizeof(double);
+            dst = value_matrix_[schema_index].double_data;
+            break;
+        default:
+            return E_TYPE_NOT_SUPPORTED;
+    }
+
+    if (bitmap == nullptr) {
+        // All valid: bulk copy + mark all as non-null
+        std::memcpy(dst, data, count * elem_size);
+        bitmaps_[schema_index].clear_all();
+    } else {
+        // Bulk copy all data (null positions will have garbage but won't be
+        // read).
+        std::memcpy(dst, data, count * elem_size);
+
+        // bitmap uses TsFile convention (1=null, 0=valid), same as
+        // internal BitMap, so copy directly.
+        char* tsfile_bm = bitmaps_[schema_index].get_bitmap();
+        uint32_t bm_bytes = (count + 7) / 8;
+        std::memcpy(tsfile_bm, bitmap, bm_bytes);
+    }
+    cur_row_size_ = std::max(count, cur_row_size_);
+    return E_OK;
+}
+
 void* Tablet::get_value(int row_index, uint32_t schema_index,
                         common::TSDataType& data_type) const {
     if (UNLIKELY(schema_index >= schema_vec_->size())) {
diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h
index 04fee7643..02691087d 100644
--- a/cpp/src/common/tablet.h
+++ b/cpp/src/common/tablet.h
@@ -181,6 +181,25 @@ class Tablet {
      */
     int add_timestamp(uint32_t row_index, int64_t timestamp);
 
+    /**
+     * @brief Bulk copy timestamps into the tablet.
+     *
+     * @param timestamps Pointer to an array of timestamp values.
+     * @param count Number of timestamps to copy. Must be <= max_row_num.
+     * If count > cur_row_size_, cur_row_size_ is updated to count,
+     * so that subsequent operations know how many rows are populated.
+     * @return Returns 0 on success, or a non-zero error code on failure
+     * (E_OUT_OF_RANGE if count > max_row_num).
+     */
+    int set_timestamps(const int64_t* timestamps, uint32_t count);
+
+    // Bulk copy fixed-length column data. If bitmap is nullptr, all rows are
+    // non-null. Otherwise bit=1 means null, bit=0 means valid (same as TsFile
+    // BitMap convention). Callers using other conventions (e.g. Arrow, where
+    // 1=valid) must invert before calling.
+    int set_column_values(uint32_t schema_index, const void* data,
+                          const uint8_t* bitmap, uint32_t count);
+
     void* get_value(int row_index, uint32_t schema_index,
                     common::TSDataType& data_type) const;
 
     /**
diff --git a/cpp/src/common/tsblock/vector/vector.h b/cpp/src/common/tsblock/vector/vector.h
index 20a765967..37a96c543 100644
--- a/cpp/src/common/tsblock/vector/vector.h
+++ b/cpp/src/common/tsblock/vector/vector.h
@@ -78,6 +78,10 @@ class Vector {
 
     FORCE_INLINE bool has_null() { return has_null_; }
 
+    FORCE_INLINE common::BitMap& get_bitmap() { return nulls_; }
+
+    FORCE_INLINE common::ByteBuffer& get_value_data() { return values_; }
+
     // We want derived class to have access to base class members, so it is
     // defined as protected
    protected:
diff --git a/cpp/src/cwrapper/CMakeLists.txt b/cpp/src/cwrapper/CMakeLists.txt
index 07f52eb33..d62250bf0 100644
--- a/cpp/src/cwrapper/CMakeLists.txt
+++ b/cpp/src/cwrapper/CMakeLists.txt
@@ -18,7 +18,7 @@ under the License.
 ]]
 message("Running in cwrapper directory")
 set(CMAKE_POSITION_INDEPENDENT_CODE ON)
-set(CWRAPPER_SRC_LIST tsfile_cwrapper.cc)
+set(CWRAPPER_SRC_LIST tsfile_cwrapper.cc arrow_c.cc)
 add_library(cwrapper_obj OBJECT ${CWRAPPER_SRC_LIST})
 # install header files
diff --git a/cpp/src/cwrapper/arrow_c.cc b/cpp/src/cwrapper/arrow_c.cc
new file mode 100644
index 000000000..6f56cfc6a
--- /dev/null
+++ b/cpp/src/cwrapper/arrow_c.cc
@@ -0,0 +1,903 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdint>
+#include <cstring>
+#include <string>
+#include <vector>
+
+#include "common/allocator/alloc_base.h"
+#include "common/tablet.h"
+#include "common/tsblock/tsblock.h"
+#include "common/tsblock/tuple_desc.h"
+#include "common/tsblock/vector/vector.h"
+#include "cwrapper/tsfile_cwrapper.h"
+#include "utils/date_utils.h"
+#include "utils/errno_define.h"
+
+namespace arrow {
+
+#define ARROW_FLAG_DICTIONARY_ORDERED 1
+#define ARROW_FLAG_NULLABLE 2
+#define ARROW_FLAG_MAP_KEYS_SORTED 4
+
+// Arrow C Data Interface: a table is represented as a paired ArrowSchema +
+// ArrowArray (struct type). The schema describes the column headers, and the
+// struct array holds per-column data arrays.
+//
+// ArrowSchema ("+s")           ArrowArray (struct, length=N)
+// ┌──────────────────┐         ┌──────────────────────────────────────┐
+// │ children[0]:     │         │ children[0]: ArrowArray (time)       │
+// │   name="time"    │         │   buffers[0] = null bitmap           │
+// │   format="tsn:"  │         │   buffers[1] = [t0, t1, t2, ...]     │
+// ├──────────────────┤         ├──────────────────────────────────────┤
+// │ children[1]:     │         │ children[1]: ArrowArray (col_a)      │
+// │   name="col_a"   │         │   buffers[0] = null bitmap           │
+// │   format="i"     │         │   buffers[1] = [10, 20, NULL, ...]   │
+// ├──────────────────┤         ├──────────────────────────────────────┤
+// │ children[2]:     │         │ children[2]: ArrowArray (col_b)      │
+// │   name="col_b"   │         │   buffers[0] = null bitmap           │
+// │   format="g"     │         │   buffers[1] = [1.1, 2.2, 3.3, ...]  │
+// └──────────────────┘         └──────────────────────────────────────┘
+//   (table header)                       (table data)
+//
+// Memory ownership: each ArrowArray/ArrowSchema stores a private_data pointer
+// to a producer-owned struct (ArrowArrayData / ArrowSchemaData /
+// StructArrayData) that holds the actual allocated memory. The release()
+// callback frees it. This design allows safe cross-library transfer (e.g. to
+// PyArrow).
+
+// Owns the buffers array and each buffer pointer within it.
+// Stored in ArrowArray.private_data; freed by ReleaseArrowArray.
+struct ArrowArrayData {
+    void** buffers;
+    size_t n_buffers;
+};
+
+// Owns format/name strings and children schemas.
+// Stored in ArrowSchema.private_data; freed by ReleaseArrowSchema.
+struct ArrowSchemaData {
+    std::string format_string;
+    std::string name_string;
+    ArrowSchema** children;
+    size_t n_children;
+};
+
+// Owns children arrays for struct-type ArrowArray.
+// Stored in ArrowArray.private_data; freed by ReleaseStructArrowArray.
+struct StructArrayData {
+    ArrowArray** children;
+    size_t n_children;
+};
+
+static const char* GetArrowFormatString(common::TSDataType datatype) {
+    switch (datatype) {
+        case common::BOOLEAN:
+            return "b";
+        case common::INT32:
+            return "i";
+        case common::INT64:
+            return "l";
+        case common::TIMESTAMP:  // nanosecond, no timezone
+            return "tsn:";
+        case common::FLOAT:
+            return "f";
+        case common::DOUBLE:
+            return "g";
+        case common::TEXT:
+        case common::STRING:
+            return "u";
+        case common::BLOB:
+            return "z";
+        case common::DATE:
+            return "tdD";  // date32: days since Unix epoch, stored as int32
+        default:
+            return nullptr;
+    }
+}
+
+static size_t GetNullBitmapSize(int64_t length) { return (length + 7) / 8; }
+
+// Build Arrow validity bitmap from TsFile Vector and store it in
+// array_data->buffers[0]. Sets out_array->null_count.
+// Returns E_OK on success, E_OOM on allocation failure.
+static int BuildNullBitmap(common::Vector* vec, uint32_t row_count,
+                           ArrowArrayData* array_data, ArrowArray* out_array) {
+    if (!vec->has_null()) {
+        array_data->buffers[0] = nullptr;
+        out_array->null_count = 0;
+        return common::E_OK;
+    }
+    size_t null_bitmap_size = GetNullBitmapSize(row_count);
+    uint8_t* null_bitmap = static_cast<uint8_t*>(
+        common::mem_alloc(null_bitmap_size, common::MOD_TSBLOCK));
+    if (null_bitmap == nullptr) {
+        return common::E_OOM;
+    }
+    common::BitMap& vec_bitmap = vec->get_bitmap();
+    char* vec_bitmap_data = vec_bitmap.get_bitmap();
+    for (size_t i = 0; i < null_bitmap_size; ++i) {
+        null_bitmap[i] = ~static_cast<uint8_t>(vec_bitmap_data[i]);
+    }
+    array_data->buffers[0] = null_bitmap;
+    out_array->null_count = vec_bitmap.count_set_bits();
+    return common::E_OK;
+}
+
+// Reset all fields of an ArrowArray to zero/null after releasing.
+static void ResetArrowArray(ArrowArray* array) {
+    array->length = 0;
+    array->null_count = 0;
+    array->offset = 0;
+    array->n_buffers = 0;
+    array->n_children = 0;
+    array->buffers = nullptr;
+    array->children = nullptr;
+    array->dictionary = nullptr;
+    array->release = nullptr;
+    array->private_data = nullptr;
+}
+
+// Release children arrays: call each child's release(), then free the pointer.
+// Used by both ReleaseStructArrowArray and error cleanup paths.
+static void ReleaseArrowChildren(ArrowArray** children, size_t n_children) {
+    if (children == nullptr) return;
+    for (size_t i = 0; i < n_children; ++i) {
+        if (children[i] != nullptr) {
+            if (children[i]->release != nullptr) {
+                children[i]->release(children[i]);
+            }
+            common::mem_free(children[i]);
+        }
+    }
+    common::mem_free(children);
+}
+
+// Free an ArrowArrayData and all buffers it owns.
+static void FreeArrowArrayData(ArrowArrayData* data) {
+    if (data == nullptr) return;
+    if (data->buffers != nullptr) {
+        for (size_t i = 0; i < data->n_buffers; ++i) {
+            if (data->buffers[i] != nullptr) {
+                common::mem_free(data->buffers[i]);
+            }
+        }
+        common::mem_free(data->buffers);
+    }
+    common::mem_free(data);
+}
+
+// Release a single-column ArrowArray (owns buffers via ArrowArrayData).
+static void ReleaseArrowArray(ArrowArray* array) {
+    if (array == nullptr || array->private_data == nullptr) {
+        return;
+    }
+    FreeArrowArrayData(static_cast<ArrowArrayData*>(array->private_data));
+    ResetArrowArray(array);
+}
+
+// Release a struct-level ArrowArray (owns children via StructArrayData).
+static void ReleaseStructArrowArray(ArrowArray* array) {
+    if (array == nullptr || array->private_data == nullptr) {
+        return;
+    }
+    StructArrayData* data = static_cast<StructArrayData*>(array->private_data);
+    ReleaseArrowChildren(data->children, data->n_children);
+    delete data;
+    ResetArrowArray(array);
+}
+
+// Release an ArrowSchema (owns strings and children via ArrowSchemaData).
+// Free an ArrowSchemaData and all resources it owns (children, strings).
+static void FreeArrowSchemaData(ArrowSchemaData* data) {
+    if (data == nullptr) return;
+    if (data->children != nullptr) {
+        for (size_t i = 0; i < data->n_children; ++i) {
+            if (data->children[i] != nullptr) {
+                if (data->children[i]->release != nullptr) {
+                    data->children[i]->release(data->children[i]);
+                }
+                common::mem_free(data->children[i]);
+            }
+        }
+        common::mem_free(data->children);
+    }
+    delete data;
+}
+
+static void ReleaseArrowSchema(ArrowSchema* schema) {
+    if (schema == nullptr || schema->private_data == nullptr) {
+        return;
+    }
+    FreeArrowSchemaData(static_cast<ArrowSchemaData*>(schema->private_data));
+
+    schema->format = nullptr;
+    schema->name = nullptr;
+    schema->metadata = nullptr;
+    schema->flags = 0;
+    schema->n_children = 0;
+    schema->children = nullptr;
+    schema->dictionary = nullptr;
+    schema->release = nullptr;
+    schema->private_data = nullptr;
+}
+
+static ArrowArrayData* AllocArrowArrayData(int64_t n_buffers) {
+    ArrowArrayData* data = static_cast<ArrowArrayData*>(
+        common::mem_alloc(sizeof(ArrowArrayData), common::MOD_TSBLOCK));
+    if (data == nullptr) return nullptr;
+    data->n_buffers = n_buffers;
+    data->buffers = static_cast<void**>(
+        common::mem_alloc(n_buffers * sizeof(void*), common::MOD_TSBLOCK));
+    if (data->buffers == nullptr) {
+        common::mem_free(data);
+        return nullptr;
+    }
+    for (int64_t i = 0; i < n_buffers; ++i) {
+        data->buffers[i] = nullptr;
+    }
+    return data;
+}
+
+static void FinalizeArrowArray(ArrowArray* out_array,
+                               ArrowArrayData* array_data, uint32_t row_count) {
+    out_array->length = row_count;
+    out_array->offset = 0;
+    out_array->n_buffers = array_data->n_buffers;
+    out_array->n_children = 0;
+    out_array->buffers = const_cast<const void**>(array_data->buffers);
+    out_array->children = nullptr;
+    out_array->dictionary = nullptr;
+    out_array->release = ReleaseArrowArray;
+    out_array->private_data = array_data;
+}
+
+template <typename CType>
+inline int BuildFixedLengthArrowArrayC(common::Vector* vec, uint32_t row_count,
+                                       ArrowArray* out_array) {
+    if (vec == nullptr || out_array == nullptr || row_count == 0) {
+        return common::E_INVALID_ARG;
+    }
+
+    bool has_null = vec->has_null();
+    size_t type_size = sizeof(CType);
+
+    ArrowArrayData* array_data = AllocArrowArrayData(2);
+    if (array_data == nullptr) return common::E_OOM;
+
+    int bm_ret = BuildNullBitmap(vec, row_count, array_data, out_array);
+    if (bm_ret != common::E_OK) {
+        FreeArrowArrayData(array_data);
+        return bm_ret;
+    }
+
+    char* vec_data = vec->get_value_data().get_data();
+    void* data_buffer = nullptr;
+
+    if (std::is_same<CType, bool>::value) {
+        size_t packed_size = GetNullBitmapSize(row_count);
+        uint8_t* packed_buffer = static_cast<uint8_t*>(
+            common::mem_alloc(packed_size, common::MOD_TSBLOCK));
+        if (packed_buffer == nullptr) {
+            FreeArrowArrayData(array_data);
+            return common::E_OOM;
+        }
+
+        std::memset(packed_buffer, 0, packed_size);
+
+        // Vector stores booleans as one byte each, densely packed
+        // (null rows have no entry). Scatter into Arrow bit-packed format.
+        // Use next_set_bit to skip null rows without per-row bitmap testing.
+        const uint8_t* src = reinterpret_cast<const uint8_t*>(vec_data);
+        uint32_t src_idx = 0;
+        if (has_null) {
+            common::BitMap& bm = vec->get_bitmap();
+            uint32_t pos = 0;
+            while (pos < row_count) {
+                uint32_t null_pos = bm.next_set_bit(pos, row_count);
+                // Process non-null run [pos, null_pos)
+                for (uint32_t i = pos; i < null_pos; ++i) {
+                    if (src[src_idx] != 0) {
+                        packed_buffer[i / 8] |= (1 << (i & 7));
+                    }
+                    src_idx++;
+                }
+                if (null_pos >= row_count) break;
+                // Skip null row (no source data, packed_buffer already zeroed)
+                pos = null_pos + 1;
+            }
+        } else {
+            for (uint32_t i = 0; i < row_count; ++i) {
+                if (src[src_idx] != 0) {
+                    packed_buffer[i / 8] |= (1 << (i & 7));
+                }
+                src_idx++;
+            }
+        }
+
+        data_buffer = packed_buffer;
+    } else {
+        size_t data_size = type_size * row_count;
+        data_buffer = common::mem_alloc(data_size, common::MOD_TSBLOCK);
+        if (data_buffer == nullptr) {
+            FreeArrowArrayData(array_data);
+            return common::E_OOM;
+        }
+
+        if (has_null) {
+            // Value buffer is densely packed (no slots for null rows).
+            // Scatter non-null values into their correct Arrow positions.
+            // Use next_set_bit to jump between null positions and bulk-copy
+            // contiguous non-null runs in between.
+            common::BitMap& bm = vec->get_bitmap();
+            char* dst = static_cast<char*>(data_buffer);
+            uint32_t src_offset = 0;
+            uint32_t pos = 0;
+
+            while (pos < row_count) {
+                uint32_t null_pos = bm.next_set_bit(pos, row_count);
+                // Copy the non-null run [pos, null_pos)
+                if (null_pos > pos) {
+                    uint32_t run = null_pos - pos;
+                    std::memcpy(dst + pos * type_size, vec_data + src_offset,
+                                run * type_size);
+                    src_offset += run * type_size;
+                }
+                if (null_pos >= row_count) break;
+                // Zero-fill the null slot
+                std::memset(dst + null_pos * type_size, 0, type_size);
+                pos = null_pos + 1;
+            }
+        } else {
+            // No nulls: value buffer is dense and complete, direct copy
+            std::memcpy(data_buffer, vec_data, data_size);
+        }
+    }
+
+    array_data->buffers[1] = data_buffer;
+    FinalizeArrowArray(out_array, array_data, row_count);
+    return common::E_OK;
+}
+
+static int BuildStringArrowArrayC(common::Vector* vec, uint32_t row_count,
+                                  ArrowArray* out_array) {
+    if (vec == nullptr || out_array == nullptr || row_count == 0) {
+        return common::E_INVALID_ARG;
+    }
+
+    bool has_null = vec->has_null();
+
+    ArrowArrayData* array_data = AllocArrowArrayData(3);
+    if (array_data == nullptr) return common::E_OOM;
+
+    int bm_ret = BuildNullBitmap(vec, row_count, array_data, out_array);
+    if (bm_ret != common::E_OK) {
+        FreeArrowArrayData(array_data);
+        return bm_ret;
+    }
+
+    size_t offsets_size = sizeof(int32_t) * (row_count + 1);
+    int32_t* offsets = static_cast<int32_t*>(
+        common::mem_alloc(offsets_size, common::MOD_TSBLOCK));
+    if (offsets == nullptr) {
+        FreeArrowArrayData(array_data);
+        return common::E_OOM;
+    }
+
+    // Total string data = vec buffer bytes - length prefixes of non-null rows.
+    uint32_t nonnull_count =
+        row_count - static_cast<uint32_t>(out_array->null_count);
+    common::ByteBuffer& value_buf = vec->get_value_data();
+    char* vec_data = value_buf.get_data();
+    uint32_t vec_offset = 0;
+    common::BitMap& vec_bitmap = vec->get_bitmap();
+    uint32_t total_data_size =
+        value_buf.get_data_size() - nonnull_count * sizeof(uint32_t);
+
+    uint8_t* data_buffer = static_cast<uint8_t*>(common::mem_alloc(
+        total_data_size > 0 ? total_data_size : 1, common::MOD_TSBLOCK));
+    if (data_buffer == nullptr) {
+        common::mem_free(offsets);
+        FreeArrowArrayData(array_data);
+        return common::E_OOM;
+    }
+
+    // Single pass: build offsets and copy string data together.
+    // Use next_set_bit to skip null rows without per-row bitmap testing.
+    offsets[0] = 0;
+    uint32_t data_offset = 0;
+    if (has_null) {
+        uint32_t pos = 0;
+        while (pos < row_count) {
+            uint32_t null_pos = vec_bitmap.next_set_bit(pos, row_count);
+            // Process non-null run [pos, null_pos)
+            for (uint32_t i = pos; i < null_pos; ++i) {
+                uint32_t len = 0;
+                std::memcpy(&len, vec_data + vec_offset, sizeof(uint32_t));
+                vec_offset += sizeof(uint32_t);
+                if (len > 0) {
+                    std::memcpy(data_buffer + data_offset,
+                                vec_data + vec_offset, len);
+                }
+                vec_offset += len;
+                data_offset += len;
+                offsets[i + 1] = data_offset;
+            }
+            if (null_pos >= row_count) break;
+            // Null row: no source data, offset stays the same
+            offsets[null_pos + 1] = data_offset;
+            pos = null_pos + 1;
+        }
+    } else {
+        for (uint32_t i = 0; i < row_count; ++i) {
+            uint32_t len = 0;
+            std::memcpy(&len, vec_data + vec_offset, sizeof(uint32_t));
+            vec_offset += sizeof(uint32_t);
+            if (len > 0) {
+                std::memcpy(data_buffer + data_offset, vec_data + vec_offset,
+                            len);
+            }
+            vec_offset += len;
+            data_offset += len;
+            offsets[i + 1] = data_offset;
+        }
+    }
+    array_data->buffers[1] = offsets;
+    array_data->buffers[2] = data_buffer;
+    FinalizeArrowArray(out_array, array_data, row_count);
+    return common::E_OK;
+}
+
+static int BuildDateArrowArrayC(common::Vector* vec, uint32_t row_count,
+                                ArrowArray* out_array) {
+    if (vec == nullptr || out_array == nullptr || row_count == 0) {
+        return common::E_INVALID_ARG;
+    }
+
+    bool has_null = vec->has_null();
+
+    ArrowArrayData* array_data = AllocArrowArrayData(2);
+    if (array_data == nullptr) return common::E_OOM;
+
+    common::BitMap& vec_bitmap = vec->get_bitmap();
+    int bm_ret = BuildNullBitmap(vec, row_count, array_data, out_array);
+    if (bm_ret != common::E_OK) {
+        FreeArrowArrayData(array_data);
+        return bm_ret;
+    }
+
+    int32_t* data_buffer = static_cast<int32_t*>(
+        common::mem_alloc(sizeof(int32_t) * row_count, common::MOD_TSBLOCK));
+    if (data_buffer == nullptr) {
+        FreeArrowArrayData(array_data);
+        return common::E_OOM;
+    }
+
+    // Use next_set_bit to skip null rows without per-row bitmap testing.
+    char* vec_data = vec->get_value_data().get_data();
+    uint32_t src_offset = 0;
+    if (has_null) {
+        uint32_t pos = 0;
+        while (pos < row_count) {
+            uint32_t null_pos = vec_bitmap.next_set_bit(pos, row_count);
+            // Process non-null run [pos, null_pos)
+            for (uint32_t i = pos; i < null_pos; ++i) {
+                int32_t yyyymmdd = 0;
+                std::memcpy(&yyyymmdd, vec_data + src_offset, sizeof(int32_t));
+                src_offset += sizeof(int32_t);
+                data_buffer[i] = common::YYYYMMDDToDaysSinceEpoch(yyyymmdd);
+            }
+            if (null_pos >= row_count) break;
+            // Null row: zero fill
+            data_buffer[null_pos] = 0;
+            pos = null_pos + 1;
+        }
+    } else {
+        for (uint32_t i = 0; i < row_count; ++i) {
+            int32_t yyyymmdd = 0;
+            std::memcpy(&yyyymmdd, vec_data + src_offset, sizeof(int32_t));
+            src_offset += sizeof(int32_t);
+            data_buffer[i] = common::YYYYMMDDToDaysSinceEpoch(yyyymmdd);
+        }
+    }
+
+    array_data->buffers[1] = data_buffer;
+    FinalizeArrowArray(out_array, array_data, row_count);
+    return common::E_OK;
+}
+
+static int BuildColumnArrowArray(common::Vector* vec, uint32_t row_count,
+                                 ArrowArray* out_array) {
+    if (vec == nullptr || out_array == nullptr || row_count == 0) {
+        return common::E_INVALID_ARG;
+    }
+
+    common::TSDataType data_type = vec->get_vector_type();
+    const char* format = GetArrowFormatString(data_type);
+    if (format == nullptr) {
+        return common::E_TYPE_NOT_SUPPORTED;
+    }
+
+    int ret = common::E_OK;
+    switch (data_type) {
+        case common::BOOLEAN:
+            ret = BuildFixedLengthArrowArrayC<bool>(vec, row_count, out_array);
+            break;
+        case common::INT32:
+            ret =
+                BuildFixedLengthArrowArrayC<int32_t>(vec, row_count, out_array);
+            break;
+        case common::DATE:
+            ret = BuildDateArrowArrayC(vec, row_count, out_array);
+            break;
+        case common::INT64:
+        case common::TIMESTAMP:
+            ret =
+                BuildFixedLengthArrowArrayC<int64_t>(vec, row_count, out_array);
+            break;
+        case common::FLOAT:
+            ret = BuildFixedLengthArrowArrayC<float>(vec, row_count, out_array);
+            break;
+        case common::DOUBLE:
+            ret =
+                BuildFixedLengthArrowArrayC<double>(vec, row_count, out_array);
+            break;
+        case common::TEXT:
+        case common::STRING:
+        case common::BLOB:
+            ret = BuildStringArrowArrayC(vec, row_count, out_array);
+            break;
+        default:
+            return common::E_TYPE_NOT_SUPPORTED;
+    }
+    return ret;
+}
+
+// Build ArrowSchema for a single column
+static int BuildColumnArrowSchema(common::TSDataType data_type,
+                                  const std::string& column_name,
+                                  ArrowSchema* out_schema) {
+    if (out_schema == nullptr) {
+        return common::E_INVALID_ARG;
+    }
+
+    const char* format = GetArrowFormatString(data_type);
+    if (format == nullptr) {
+        return common::E_TYPE_NOT_SUPPORTED;
+    }
+
+    ArrowSchemaData* schema_data = new ArrowSchemaData();
+    schema_data->format_string = format;
+    schema_data->name_string = column_name;
+    schema_data->children = nullptr;
+    schema_data->n_children = 0;
+
+    out_schema->format = schema_data->format_string.c_str();
+    out_schema->name = schema_data->name_string.c_str();
+    out_schema->metadata = nullptr;
+    out_schema->flags = ARROW_FLAG_NULLABLE;
+    out_schema->n_children = 0;
+    out_schema->children = nullptr;
+    out_schema->dictionary = nullptr;
+    out_schema->release = ReleaseArrowSchema;
+    out_schema->private_data = schema_data;
+
+    return common::E_OK;
+}
+
+int TsBlockToArrowStruct(common::TsBlock& tsblock, ArrowArray* out_array,
+                         ArrowSchema* out_schema) {
+    if (out_array == nullptr || out_schema == nullptr) {
+        return common::E_INVALID_ARG;
+    }
+
+    uint32_t row_count = tsblock.get_row_count();
+    uint32_t column_count = tsblock.get_column_count();
+    common::TupleDesc* tuple_desc = tsblock.get_tuple_desc();
+
+    if (row_count == 0 || column_count == 0) {
+        return common::E_INVALID_ARG;
+    }
+
+    // Build ArrowSchema for struct type
+    ArrowSchemaData* schema_data = new ArrowSchemaData();
+    schema_data->format_string = "+s";
+    schema_data->name_string = "";
+    schema_data->n_children = column_count;
+    schema_data->children = static_cast<ArrowSchema**>(common::mem_alloc(
+        column_count * sizeof(ArrowSchema*), common::MOD_TSBLOCK));
+    if (schema_data->children == nullptr) {
+        FreeArrowSchemaData(schema_data);
+        return common::E_OOM;
+    }
+
+    for (uint32_t i = 0; i < column_count; ++i) {
+        schema_data->children[i] = nullptr;
+    }
+
+    // Build schema for each column
+    for (uint32_t i = 0; i < column_count; ++i) {
+        schema_data->children[i] = static_cast<ArrowSchema*>(
+            common::mem_alloc(sizeof(ArrowSchema), common::MOD_TSBLOCK));
+        if (schema_data->children[i] == nullptr) {
+            FreeArrowSchemaData(schema_data);
+            return common::E_OOM;
+        }
+        schema_data->children[i]->release = nullptr;
+
+        common::TSDataType col_type = tuple_desc->get_column_type(i);
+        std::string col_name = tuple_desc->get_column_name(i);
+
+        int ret = BuildColumnArrowSchema(col_type, col_name,
+                                         schema_data->children[i]);
+        if (ret != common::E_OK) {
+            FreeArrowSchemaData(schema_data);
+            return ret;
+        }
+    }
+
+    out_schema->format = schema_data->format_string.c_str();
+    out_schema->name = schema_data->name_string.c_str();
+    out_schema->metadata = nullptr;
+    out_schema->flags = 0;
+    out_schema->n_children = column_count;
+    out_schema->children = schema_data->children;
+    out_schema->dictionary = nullptr;
+    out_schema->release = ReleaseArrowSchema;
+    out_schema->private_data = schema_data;
+
+    ArrowArray** children_arrays = static_cast<ArrowArray**>(common::mem_alloc(
+        column_count * sizeof(ArrowArray*), common::MOD_TSBLOCK));
+    if (children_arrays == nullptr) {
+        ReleaseArrowSchema(out_schema);
+        return common::E_OOM;
+    }
+
+    for (uint32_t i = 0; i < column_count; ++i) {
+        children_arrays[i] = nullptr;
+    }
+
+    for (uint32_t i = 0; i < column_count; ++i) {
+        children_arrays[i] = static_cast<ArrowArray*>(
+            common::mem_alloc(sizeof(ArrowArray), common::MOD_TSBLOCK));
+        if (children_arrays[i] == nullptr) {
+            ReleaseArrowChildren(children_arrays, column_count);
+            ReleaseArrowSchema(out_schema);
+            return common::E_OOM;
+        }
+        children_arrays[i]->release = nullptr;
+
+        common::Vector* vec = tsblock.get_vector(i);
+        int ret = BuildColumnArrowArray(vec, row_count, children_arrays[i]);
+        if (ret != common::E_OK) {
+            ReleaseArrowChildren(children_arrays, column_count);
+            ReleaseArrowSchema(out_schema);
+            return ret;
+        }
+    }
+
+    StructArrayData* struct_data = new StructArrayData();
+    struct_data->children = children_arrays;
+    struct_data->n_children = column_count;
+
+    // Arrow C Data Interface: struct type requires n_buffers = 1 (validity
+    // bitmap) buffers[0] may be NULL if there are no nulls at the struct level
+    static const void* struct_buffers[1] = {nullptr};
+
+    out_array->length = row_count;
+    out_array->null_count = 0;  // struct itself is never null
+    out_array->offset = 0;
+    out_array->n_buffers = 1;
+    out_array->n_children = column_count;
+    out_array->buffers = struct_buffers;
+    out_array->children = children_arrays;
+    out_array->dictionary = nullptr;
+    out_array->release = ReleaseStructArrowArray;
+    out_array->private_data = struct_data;
+
+    return common::E_OK;
+}
+
+// Check if Arrow row is valid (non-null) based on validity bitmap
+static bool ArrowIsValid(const ArrowArray* arr, int64_t row) {
+    if (arr->null_count == 0 || arr->buffers[0] == nullptr) return true;
+    int64_t bit_idx = arr->offset + row;
+    const uint8_t* bitmap = static_cast<const uint8_t*>(arr->buffers[0]);
+    return (bitmap[bit_idx / 8] >> (bit_idx % 8)) & 1;
+}
+
+// Map Arrow format string to TSDataType
+static common::TSDataType ArrowFormatToDataType(const char* format) {
+    if (strcmp(format, "b") == 0) return common::BOOLEAN;
+    if (strcmp(format, "i") == 0) return common::INT32;
+    if (strcmp(format, "l") == 0) return common::INT64;
+    if (strcmp(format, "tsn:") == 0) return common::TIMESTAMP;
+    if (strcmp(format, "f") == 0) return common::FLOAT;
+    if (strcmp(format, "g") == 0) return common::DOUBLE;
+    if (strcmp(format, "u") == 0) return common::TEXT;
+    if (strcmp(format, "z") == 0) return common::BLOB;
+    if (strcmp(format, "tdD") == 0) return common::DATE;
+    return common::INVALID_DATATYPE;
+}
+
+// Convert Arrow C Data Interface struct array to storage::Tablet.
+// time_col_index specifies which column in the Arrow struct to use as the
+// timestamp column.
+// All other columns become data columns in the Tablet.
+// reg_schema: optional registered TableSchema; when provided its column types
+// are used in the Tablet (so they match the writer's registered schema
+// exactly).
+// Arrow format strings are still used to decode the actual buffers.
+int ArrowStructToTablet(const char* table_name, const ArrowArray* in_array,
+                        const ArrowSchema* in_schema,
+                        const storage::TableSchema* reg_schema,
+                        storage::Tablet** out_tablet, int time_col_index) {
+    if (!in_array || !in_schema || !out_tablet) return common::E_INVALID_ARG;
+    if (strcmp(in_schema->format, "+s") != 0) return common::E_INVALID_ARG;
+
+    int64_t n_rows = in_array->length;
+    int64_t n_cols = in_schema->n_children;
+    if (n_rows <= 0 || n_cols == 0) return common::E_INVALID_ARG;
+
+    if (time_col_index < 0 || time_col_index >= n_cols)
+        return common::E_INVALID_ARG;
+
+    std::vector<std::string> col_names;
+    std::vector<common::TSDataType> col_types;
+    std::vector<common::TSDataType> read_modes;
+    std::vector<int> data_col_indices;
+
+    std::vector<common::TSDataType> reg_data_types;
+    if (reg_schema) {
+        reg_data_types = reg_schema->get_data_types();
+    }
+
+    for (int64_t i = 0; i < n_cols; i++) {
+        if (static_cast<int>(i) == time_col_index) continue;
+        const ArrowSchema* child = in_schema->children[i];
+        common::TSDataType read_mode = ArrowFormatToDataType(child->format);
+        if (read_mode == common::INVALID_DATATYPE)
+            return common::E_TYPE_NOT_SUPPORTED;
+        std::string col_name = child->name ? child->name : "";
+        common::TSDataType col_type = read_mode;
+        if (reg_schema) {
+            int reg_idx = const_cast<storage::TableSchema*>(reg_schema)
+                              ->find_column_index(col_name);
+            if (reg_idx >= 0 &&
+                reg_idx < static_cast<int>(reg_data_types.size())) {
+                col_type = reg_data_types[reg_idx];
+            }
+        }
+        col_names.emplace_back(std::move(col_name));
+        col_types.push_back(col_type);
+        read_modes.push_back(read_mode);
+        data_col_indices.push_back(static_cast<int>(i));
+    }
+
+    if (col_names.empty()) return common::E_INVALID_ARG;
+
+    std::string tname = table_name ? table_name : "default_table";
+    auto* tablet = new storage::Tablet(tname, &col_names, &col_types,
+                                       static_cast<uint32_t>(n_rows));
+    if (tablet->err_code_ != common::E_OK) {
+        int err = tablet->err_code_;
+        delete tablet;
+        return err;
+    }
+
+    // Fill timestamps from the time column
+    {
+        const ArrowArray* ts_arr = in_array->children[time_col_index];
+        const int64_t* ts_buf =
+            static_cast<const int64_t*>(ts_arr->buffers[1]) + ts_arr->offset;
+        tablet->set_timestamps(ts_buf, static_cast<uint32_t>(n_rows));
+    }
+
+    // Fill data columns from Arrow children (use read_modes to decode buffers)
+    for (size_t ci = 0; ci < data_col_indices.size(); ci++) {
+        const ArrowArray* col_arr = in_array->children[data_col_indices[ci]];
+        common::TSDataType dtype = read_modes[ci];
+        uint32_t tcol = static_cast<uint32_t>(ci);
+        int64_t off = col_arr->offset;
+
+        const uint8_t* validity =
+            (col_arr->null_count > 0 && col_arr->buffers[0] != nullptr)
+                ? static_cast<const uint8_t*>(col_arr->buffers[0])
+                : nullptr;
+
+        switch (dtype) {
+            case common::BOOLEAN: {
+                const uint8_t* vals =
+                    static_cast<const uint8_t*>(col_arr->buffers[1]);
+                for (int64_t r = 0; r < n_rows; r++) {
+                    if (!ArrowIsValid(col_arr, r)) continue;
+                    int64_t bit = off + r;
+                    bool v = (vals[bit / 8] >> (bit % 8)) & 1;
+                    tablet->add_value(static_cast<uint32_t>(r), tcol, v);
+                }
+                break;
+            }
+            case common::INT32:
+            case common::INT64:
+            case common::FLOAT:
+            case common::DOUBLE: {
+                // Invert Arrow bitmap (1=valid) to TsFile bitmap (1=null)
+                const uint8_t* null_bm = nullptr;
+                uint8_t* inverted_bm = nullptr;
+                if (validity != nullptr) {
+                    uint32_t bm_bytes = (static_cast<uint32_t>(n_rows) + 7) / 8;
+                    inverted_bm = static_cast<uint8_t*>(
+                        common::mem_alloc(bm_bytes, common::MOD_TSBLOCK));
+                    if (inverted_bm == nullptr) {
+                        delete tablet;
+                        return common::E_OOM;
+                    }
+                    for (uint32_t b = 0; b < bm_bytes; b++) {
+                        inverted_bm[b] = ~validity[b];
+                    }
+                    null_bm = inverted_bm;
+                }
+                tablet->set_column_values(tcol, col_arr->buffers[1], null_bm,
+                                          static_cast<uint32_t>(n_rows));
+                if (inverted_bm != nullptr) {
+                    common::mem_free(inverted_bm);
} + break; + } + case common::DATE: { + // Arrow stores date as int32 days-since-epoch; convert to + // YYYYMMDD + const int32_t* vals = + static_cast(col_arr->buffers[1]); + for (int64_t r = 0; r < n_rows; r++) { + if (!ArrowIsValid(col_arr, r)) continue; + int32_t yyyymmdd = + common::DaysSinceEpochToYYYYMMDD(vals[off + r]); + tablet->add_value(static_cast(r), tcol, + yyyymmdd); + } + break; + } + case common::TEXT: + case common::STRING: + case common::BLOB: { + const int32_t* offsets = + static_cast(col_arr->buffers[1]); + const char* data = + static_cast(col_arr->buffers[2]); + for (int64_t r = 0; r < n_rows; r++) { + if (!ArrowIsValid(col_arr, r)) continue; + int32_t start = offsets[off + r]; + int32_t len = offsets[off + r + 1] - start; + tablet->add_value(static_cast(r), tcol, + common::String(data + start, len)); + } + break; + } + default: + delete tablet; + return common::E_TYPE_NOT_SUPPORTED; + } + } + + *out_tablet = tablet; + return common::E_OK; +} + +} // namespace arrow diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc index fbcf4e6f1..620b8392f 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.cc +++ b/cpp/src/cwrapper/tsfile_cwrapper.cc @@ -24,13 +24,25 @@ #include #include +#include #include #include "common/tablet.h" #include "reader/result_set.h" +#include "reader/table_result_set.h" #include "reader/tsfile_reader.h" #include "writer/tsfile_writer.h" +// Forward declarations for arrow namespace functions (defined in arrow_c.cc) +namespace arrow { +int TsBlockToArrowStruct(common::TsBlock& tsblock, ArrowArray* out_array, + ArrowSchema* out_schema); +int ArrowStructToTablet(const char* table_name, const ArrowArray* in_array, + const ArrowSchema* in_schema, + const storage::TableSchema* reg_schema, + storage::Tablet** out_tablet, int time_col_index); +} // namespace arrow + #ifdef __cplusplus extern "C" { #endif @@ -361,6 +373,21 @@ ResultSet tsfile_query_table_on_tree(TsFileReader reader, char** columns, return 
table_result_set; } +ResultSet tsfile_query_table_batch(TsFileReader reader, const char* table_name, + char** columns, uint32_t column_num, + Timestamp start_time, Timestamp end_time, + int batch_size, ERRNO* err_code) { + auto* r = static_cast(reader); + storage::ResultSet* table_result_set = nullptr; + std::vector column_names; + for (uint32_t i = 0; i < column_num; i++) { + column_names.emplace_back(columns[i]); + } + *err_code = r->query(table_name, column_names, start_time, end_time, + table_result_set, batch_size); + return table_result_set; +} + bool tsfile_result_set_next(ResultSet result_set, ERRNO* err_code) { auto* r = static_cast(result_set); bool has_next = true; @@ -373,6 +400,34 @@ bool tsfile_result_set_next(ResultSet result_set, ERRNO* err_code) { return has_next; } +ERRNO tsfile_result_set_get_next_tsblock_as_arrow(ResultSet result_set, + ArrowArray* out_array, + ArrowSchema* out_schema) { + if (result_set == nullptr || out_array == nullptr || + out_schema == nullptr) { + return common::E_INVALID_ARG; + } + + auto* r = static_cast(result_set); + auto* table_result_set = dynamic_cast(r); + if (table_result_set == nullptr) { + return common::E_INVALID_ARG; + } + + common::TsBlock* tsblock = nullptr; + int ret = table_result_set->get_next_tsblock(tsblock); + if (ret != common::E_OK) { + return ret; + } + + if (tsblock == nullptr) { + return common::E_NO_MORE_DATA; + } + + ret = arrow::TsBlockToArrowStruct(*tsblock, out_array, out_schema); + return ret; +} + #define TSFILE_RESULT_SET_GET_VALUE_BY_NAME_DEF(type) \ type tsfile_result_set_get_value_by_name_##type(ResultSet result_set, \ const char* column_name) { \ @@ -744,6 +799,22 @@ ERRNO _tsfile_writer_write_table(TsFileWriter writer, Tablet tablet) { return w->write_table(*tbl); } +ERRNO _tsfile_writer_write_arrow_table(TsFileWriter writer, + const char* table_name, + ArrowArray* array, ArrowSchema* schema, + int time_col_index) { + auto* w = static_cast(writer); + std::shared_ptr reg_schema = + 
w->get_table_schema(table_name ? std::string(table_name) : ""); + storage::Tablet* tablet = nullptr; + int ret = arrow::ArrowStructToTablet( + table_name, array, schema, reg_schema.get(), &tablet, time_col_index); + if (ret != common::E_OK) return ret; + ret = w->write_table(*tablet); + delete tablet; + return ret; +} + ERRNO _tsfile_writer_write_ts_record(TsFileWriter writer, TsRecord data) { auto* w = static_cast(writer); const storage::TsRecord* record = static_cast(data); diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h b/cpp/src/cwrapper/tsfile_cwrapper.h index 643b4e52b..49de2b5b2 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.h +++ b/cpp/src/cwrapper/tsfile_cwrapper.h @@ -20,6 +20,7 @@ #ifndef SRC_CWRAPPER_TSFILE_CWRAPPER_H_ #define SRC_CWRAPPER_TSFILE_CWRAPPER_H_ #ifdef __cplusplus + extern "C" { #endif @@ -124,6 +125,39 @@ typedef void* TsRecord; typedef void* ResultSet; +typedef struct arrow_schema { + // Array type description + const char* format; + const char* name; + const char* metadata; + int64_t flags; + int64_t n_children; + struct arrow_schema** children; + struct arrow_schema* dictionary; + + // Release callback + void (*release)(struct arrow_schema*); + // Opaque producer-specific data + void* private_data; +} ArrowSchema; + +typedef struct arrow_array { + // Array data description + int64_t length; + int64_t null_count; + int64_t offset; + int64_t n_buffers; + int64_t n_children; + const void** buffers; + struct arrow_array** children; + struct arrow_array* dictionary; + + // Release callback + void (*release)(struct arrow_array*); + // Opaque producer-specific data + void* private_data; +} ArrowArray; + typedef int32_t ERRNO; typedef int64_t Timestamp; @@ -444,11 +478,15 @@ ResultSet tsfile_query_table(TsFileReader reader, const char* table_name, ResultSet tsfile_query_table_on_tree(TsFileReader reader, char** columns, uint32_t column_num, Timestamp start_time, Timestamp end_time, ERRNO* err_code); +ResultSet 
tsfile_query_table_batch(TsFileReader reader, const char* table_name, + char** columns, uint32_t column_num, + Timestamp start_time, Timestamp end_time, + int batch_size, ERRNO* err_code); // ResultSet tsfile_reader_query_device(TsFileReader reader, // const char* device_name, -// char** sensor_name, uint32_t sensor_num, -// Timestamp start_time, Timestamp -// end_time); +// char** sensor_name, uint32_t +// sensor_num, Timestamp start_time, +// Timestamp end_time); /** * @brief Check and fetch the next row in the ResultSet. @@ -458,6 +496,27 @@ ResultSet tsfile_query_table_on_tree(TsFileReader reader, char** columns, */ bool tsfile_result_set_next(ResultSet result_set, ERRNO* error_code); +/** + * @brief Gets the next TsBlock from batch ResultSet and converts it to Arrow + * format. + * + * @param result_set [in] Valid ResultSet handle from batch query + * (tsfile_query_table_batch). + * @param out_array [out] Pointer to ArrowArray pointer. Will be set to the + * converted Arrow array. + * @param out_schema [out] Pointer to ArrowSchema pointer. Will be set to the + * converted Arrow schema. + * @return ERRNO - E_OK(0) on success, E_NO_MORE_DATA if no more blocks, or + * other error codes. + * @note Caller should release ArrowArray and ArrowSchema by calling their + * release callbacks when done. + * @note This function should only be called on ResultSet obtained from + * tsfile_query_table_batch with batch_size > 0. + */ +ERRNO tsfile_result_set_get_next_tsblock_as_arrow(ResultSet result_set, + ArrowArray* out_array, + ArrowSchema* out_schema); + /** * @brief Gets value from current row by column name (generic types). * @@ -659,6 +718,14 @@ ERRNO _tsfile_writer_write_tablet(TsFileWriter writer, Tablet tablet); // Write a tablet into a table. ERRNO _tsfile_writer_write_table(TsFileWriter writer, Tablet tablet); +// Write Arrow C Data Interface batch into a table (Arrow -> Tablet -> write). +// time_col_index: index of the time column in the Arrow struct. 
+// Caller should determine the correct time_col_index before calling. +ERRNO _tsfile_writer_write_arrow_table(TsFileWriter writer, + const char* table_name, + ArrowArray* array, ArrowSchema* schema, + int time_col_index); + // Write a row record into a device. ERRNO _tsfile_writer_write_ts_record(TsFileWriter writer, TsRecord record); diff --git a/cpp/src/reader/result_set.h b/cpp/src/reader/result_set.h index 87303cef4..216fced4f 100644 --- a/cpp/src/reader/result_set.h +++ b/cpp/src/reader/result_set.h @@ -25,6 +25,7 @@ #include #include "common/row_record.h" +#include "common/tsblock/tsblock.h" namespace storage { /** @@ -155,6 +156,11 @@ class ResultSet : std::enable_shared_from_this { ASSERT(column_index >= 0 && column_index < row_record->get_col_num()); return row_record->get_field(column_index)->get_value(); } + + virtual int get_next_tsblock(common::TsBlock*& block) { + return common::E_INVALID_ARG; + } + /** * @brief Get the row record of the result set * diff --git a/cpp/src/reader/table_query_executor.cc b/cpp/src/reader/table_query_executor.cc index 2a01a6d5c..30fce4a75 100644 --- a/cpp/src/reader/table_query_executor.cc +++ b/cpp/src/reader/table_query_executor.cc @@ -83,8 +83,9 @@ int TableQueryExecutor::query(const std::string& table_name, ret = common::E_UNSUPPORTED_ORDER; } assert(tsblock_reader != nullptr); - ret_qds = new TableResultSet(std::move(tsblock_reader), - lower_case_column_names, data_types); + ret_qds = + new TableResultSet(std::move(tsblock_reader), lower_case_column_names, + data_types, return_mode_); return ret; } diff --git a/cpp/src/reader/table_query_executor.h b/cpp/src/reader/table_query_executor.h index 974e6b45b..718947e5a 100644 --- a/cpp/src/reader/table_query_executor.h +++ b/cpp/src/reader/table_query_executor.h @@ -44,13 +44,20 @@ class TableQueryExecutor { : meta_data_querier_(meta_data_querier), tsfile_io_reader_(tsfile_io_reader), table_query_ordering_(table_query_ordering), - block_size_(block_size) {} - 
TableQueryExecutor(ReadFile* read_file) { + block_size_(block_size), + return_mode_(RETURN_ROW) {} + TableQueryExecutor(ReadFile* read_file, const int batch_size = -1) { tsfile_io_reader_ = new TsFileIOReader(); tsfile_io_reader_->init(read_file); meta_data_querier_ = new MetadataQuerier(tsfile_io_reader_); table_query_ordering_ = TableQueryOrdering::DEVICE; - block_size_ = 1024; + if (batch_size <= 0) { + block_size_ = 1024; + return_mode_ = RETURN_ROW; + } else { + block_size_ = batch_size; + return_mode_ = RETURN_BATCH; + } } ~TableQueryExecutor() { if (meta_data_querier_ != nullptr) { @@ -76,6 +83,7 @@ class TableQueryExecutor { TsFileIOReader* tsfile_io_reader_; TableQueryOrdering table_query_ordering_; int32_t block_size_; + int return_mode_; }; } // namespace storage diff --git a/cpp/src/reader/table_result_set.cc b/cpp/src/reader/table_result_set.cc index aeeefb463..81b58ce68 100644 --- a/cpp/src/reader/table_result_set.cc +++ b/cpp/src/reader/table_result_set.cc @@ -37,7 +37,12 @@ void TableResultSet::init() { TableResultSet::~TableResultSet() { close(); } int TableResultSet::next(bool& has_next) { + if (return_mode_ != RETURN_ROW) { + return tsblock_reader_->has_next(has_next); + } + int ret = common::E_OK; + while (row_iterator_ == nullptr || !row_iterator_->has_next()) { if (RET_FAIL(tsblock_reader_->has_next(has_next))) { return ret; @@ -103,6 +108,35 @@ std::shared_ptr TableResultSet::get_metadata() { return result_set_metadata_; } +int TableResultSet::get_next_tsblock(common::TsBlock*& block) { + int ret = common::E_OK; + block = nullptr; + + if (return_mode_ == RETURN_ROW) { + return common::E_INVALID_ARG; + } + + bool has_next = false; + if (RET_FAIL(tsblock_reader_->has_next(has_next))) { + return ret; + } + + if (!has_next) { + return common::E_NO_MORE_DATA; + } + + if (RET_FAIL(tsblock_reader_->next(tsblock_))) { + return ret; + } + + if (tsblock_ == nullptr) { + return common::E_NO_MORE_DATA; + } + + block = tsblock_; + return common::E_OK; +} 
+ void TableResultSet::close() { tsblock_reader_->close(); pa_.destroy(); diff --git a/cpp/src/reader/table_result_set.h b/cpp/src/reader/table_result_set.h index 4192f7c2f..072a63f6f 100644 --- a/cpp/src/reader/table_result_set.h +++ b/cpp/src/reader/table_result_set.h @@ -24,23 +24,29 @@ #include "reader/result_set.h" namespace storage { + +enum ReturnMode { RETURN_ROW = 0, RETURN_BATCH = 1 }; + class TableResultSet : public ResultSet { public: explicit TableResultSet(std::unique_ptr tsblock_reader, std::vector column_names, - std::vector data_types) + std::vector data_types, + int return_mode = RETURN_ROW) : tsblock_reader_(std::move(tsblock_reader)), column_names_(column_names), - data_types_(data_types) { + data_types_(data_types), + return_mode_(return_mode) { init(); } - ~TableResultSet(); + ~TableResultSet() override; int next(bool& has_next) override; bool is_null(const std::string& column_name) override; bool is_null(uint32_t column_index) override; RowRecord* get_row_record() override; std::shared_ptr get_metadata() override; void close() override; + int get_next_tsblock(common::TsBlock*& block) override; private: void init(); @@ -51,6 +57,7 @@ class TableResultSet : public ResultSet { std::vector> tsblock_readers_; std::vector column_names_; std::vector data_types_; + const int return_mode_; }; } // namespace storage #endif // TABLE_RESULT_SET_H \ No newline at end of file diff --git a/cpp/src/reader/tsfile_reader.cc b/cpp/src/reader/tsfile_reader.cc index 84188b6a3..03f4bf1d1 100644 --- a/cpp/src/reader/tsfile_reader.cc +++ b/cpp/src/reader/tsfile_reader.cc @@ -44,7 +44,6 @@ int TsFileReader::open(const std::string& file_path) { } else if (RET_FAIL(tsfile_executor_->init(read_file_))) { std::cout << "filed to init " << ret << std::endl; } - table_query_executor_ = new storage::TableQueryExecutor(read_file_); return ret; } @@ -89,15 +88,16 @@ int TsFileReader::query(std::vector& path_list, int64_t start_time, int TsFileReader::query(const std::string& 
table_name, const std::vector& columns_names, int64_t start_time, int64_t end_time, - ResultSet*& result_set) { + ResultSet*& result_set, int batch_size) { return this->query(table_name, columns_names, start_time, end_time, - result_set, nullptr); + result_set, nullptr, batch_size); } int TsFileReader::query(const std::string& table_name, const std::vector& columns_names, int64_t start_time, int64_t end_time, - ResultSet*& result_set, Filter* tag_filter) { + ResultSet*& result_set, Filter* tag_filter, + int batch_size) { int ret = E_OK; TsFileMeta* tsfile_meta = tsfile_executor_->get_tsfile_meta(); if (tsfile_meta == nullptr) { @@ -110,6 +110,9 @@ int TsFileReader::query(const std::string& table_name, } Filter* time_filter = new TimeBetween(start_time, end_time, false); + if (table_query_executor_ == nullptr) { + table_query_executor_ = new TableQueryExecutor(read_file_, batch_size); + } ret = table_query_executor_->query(to_lower(table_name), columns_names, time_filter, tag_filter, nullptr, result_set); @@ -187,6 +190,9 @@ int TsFileReader::query_table_on_tree( columns_names[i] = "col_" + std::to_string(i); } Filter* time_filter = new TimeBetween(star_time, end_time, false); + if (table_query_executor_ == nullptr) { + table_query_executor_ = new TableQueryExecutor(read_file_); + } ret = table_query_executor_->query_on_tree( satisfied_device_ids, columns_names, measurement_names_to_query, time_filter, result_set); diff --git a/cpp/src/reader/tsfile_reader.h b/cpp/src/reader/tsfile_reader.h index 6c8563563..9f5e38ec5 100644 --- a/cpp/src/reader/tsfile_reader.h +++ b/cpp/src/reader/tsfile_reader.h @@ -92,10 +92,12 @@ class TsFileReader { * @param [in] start_time the start time * @param [in] end_time the end time * @param [out] result_set the result set + * @param [in] batch_size <= 0 means row-by-row return mode, + * > 0 means return TsBlock with the specified block size. 
*/ int query(const std::string& table_name, const std::vector& columns_names, int64_t start_time, - int64_t end_time, ResultSet*& result_set); + int64_t end_time, ResultSet*& result_set, int batch_size = -1); /** * @brief query the tsfile by the table name, columns names, start time @@ -111,7 +113,8 @@ class TsFileReader { */ int query(const std::string& table_name, const std::vector& columns_names, int64_t start_time, - int64_t end_time, ResultSet*& result_set, Filter* tag_filter); + int64_t end_time, ResultSet*& result_set, Filter* tag_filter, + int batch_size = 0); int query_table_on_tree(const std::vector& measurement_names, int64_t star_time, int64_t end_time, diff --git a/cpp/src/utils/date_utils.h b/cpp/src/utils/date_utils.h new file mode 100644 index 000000000..77323812e --- /dev/null +++ b/cpp/src/utils/date_utils.h @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef UTILS_DATE_UTILS_H +#define UTILS_DATE_UTILS_H + +#include + +#include + +namespace common { + +// Convert TsFile YYYYMMDD integer to days since Unix epoch (1970-01-01). 
+inline int32_t YYYYMMDDToDaysSinceEpoch(int32_t yyyymmdd) { + int year = yyyymmdd / 10000; + int month = (yyyymmdd % 10000) / 100; + int day = yyyymmdd % 100; + + std::tm date = {}; + date.tm_year = year - 1900; + date.tm_mon = month - 1; + date.tm_mday = day; + date.tm_hour = 12; + date.tm_isdst = -1; + + std::tm epoch = {}; + epoch.tm_year = 70; + epoch.tm_mon = 0; + epoch.tm_mday = 1; + epoch.tm_hour = 12; + epoch.tm_isdst = -1; + + time_t t1 = mktime(&date); + time_t t2 = mktime(&epoch); + return static_cast((t1 - t2) / (60 * 60 * 24)); +} + +// Convert days since Unix epoch back to YYYYMMDD integer format. +inline int32_t DaysSinceEpochToYYYYMMDD(int32_t days) { + std::tm epoch = {}; + epoch.tm_year = 70; + epoch.tm_mon = 0; + epoch.tm_mday = 1; + epoch.tm_hour = 12; + epoch.tm_isdst = -1; + time_t epoch_t = mktime(&epoch); + time_t target_t = epoch_t + static_cast(days) * 24 * 60 * 60; + std::tm* d = localtime(&target_t); + return (d->tm_year + 1900) * 10000 + (d->tm_mon + 1) * 100 + d->tm_mday; +} + +} // namespace common + +#endif // UTILS_DATE_UTILS_H diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc index 9a087a82f..85a816ef6 100644 --- a/cpp/src/writer/tsfile_writer.cc +++ b/cpp/src/writer/tsfile_writer.cc @@ -407,6 +407,14 @@ int TsFileWriter::do_check_and_prepare_tablet(Tablet& tablet) { return common::E_OK; } +std::shared_ptr TsFileWriter::get_table_schema( + const std::string& table_name) const { + auto& schema_map = io_writer_->get_schema()->table_schema_map_; + auto it = schema_map.find(table_name); + if (it == schema_map.end()) return nullptr; + return it->second; +} + template int TsFileWriter::do_check_schema( std::shared_ptr device_id, diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h index 85c47db7f..ec8fe3f44 100644 --- a/cpp/src/writer/tsfile_writer.h +++ b/cpp/src/writer/tsfile_writer.h @@ -92,6 +92,8 @@ class TsFileWriter { TableSchemasMapIter; DeviceSchemasMap* 
get_schema_group_map() { return &schemas_; } + std::shared_ptr get_table_schema( + const std::string& table_name) const; int64_t calculate_mem_size_for_all_group(); int check_memory_size_and_may_flush_chunks(); /* diff --git a/cpp/test/common/tsblock/arrow_tsblock_test.cc b/cpp/test/common/tsblock/arrow_tsblock_test.cc new file mode 100644 index 000000000..123efb59f --- /dev/null +++ b/cpp/test/common/tsblock/arrow_tsblock_test.cc @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +#include + +#include + +#include "common/tsblock/tsblock.h" +#include "cwrapper/tsfile_cwrapper.h" +#include "utils/db_utils.h" + +// Forward declarations for arrow namespace (functions are defined in +// arrow_c.cc) +namespace arrow { +// Type aliases for Arrow types (defined in tsfile_cwrapper.h) +using ArrowArray = ::ArrowArray; +using ArrowSchema = ::ArrowSchema; +#define ARROW_FLAG_DICTIONARY_ORDERED 1 +#define ARROW_FLAG_NULLABLE 2 +#define ARROW_FLAG_MAP_KEYS_SORTED 4 + +// Function declaration (defined in arrow_c.cc) +int TsBlockToArrowStruct(common::TsBlock& tsblock, ArrowArray* out_array, + ArrowSchema* out_schema); +} // namespace arrow + +static void VerifyArrowSchema( + const ::arrow::ArrowSchema* schema, + const std::vector& expected_names, + const std::vector& expected_formats) { + ASSERT_NE(schema, nullptr); + EXPECT_STREQ(schema->format, "+s"); + EXPECT_EQ(schema->n_children, expected_names.size()); + ASSERT_NE(schema->children, nullptr); + + for (size_t i = 0; i < expected_names.size(); ++i) { + const arrow::ArrowSchema* child = schema->children[i]; + ASSERT_NE(child, nullptr); + EXPECT_STREQ(child->name, expected_names[i].c_str()); + EXPECT_STREQ(child->format, expected_formats[i]); + EXPECT_EQ(child->flags, ARROW_FLAG_NULLABLE); + } +} + +static void VerifyArrowArrayData(const arrow::ArrowArray* array, + uint32_t expected_length) { + ASSERT_NE(array, nullptr); + EXPECT_EQ(array->length, expected_length); + EXPECT_EQ(array->n_children, 3); + ASSERT_NE(array->children, nullptr); +} + +TEST(ArrowTsBlockTest, NormalTsBlock_NoNulls) { + common::TupleDesc tuple_desc; + common::ColumnSchema col1("int_col", common::INT32, common::SNAPPY, + common::RLE); + common::ColumnSchema col2("double_col", common::DOUBLE, common::SNAPPY, + common::RLE); + common::ColumnSchema col3("string_col", common::STRING, common::SNAPPY, + common::RLE); + tuple_desc.push_back(col1); + tuple_desc.push_back(col2); + tuple_desc.push_back(col3); + + common::TsBlock 
tsblock(&tuple_desc, 10); + ASSERT_EQ(tsblock.init(), common::E_OK); + + common::RowAppender row_appender(&tsblock); + + for (int i = 0; i < 5; ++i) { + ASSERT_TRUE(row_appender.add_row()); + + int32_t int_val = 100 + i; + row_appender.append(0, reinterpret_cast(&int_val), + sizeof(int32_t)); + double double_val = 3.14 + i; + row_appender.append(1, reinterpret_cast(&double_val), + sizeof(double)); + std::string str_val = "test" + std::to_string(i); + row_appender.append(2, str_val.c_str(), str_val.length()); + } + + EXPECT_EQ(tsblock.get_row_count(), 5); + + arrow::ArrowArray array; + arrow::ArrowSchema schema; + int ret = arrow::TsBlockToArrowStruct(tsblock, &array, &schema); + ASSERT_EQ(ret, common::E_OK); + + std::vector expected_names = {"int_col", "double_col", + "string_col"}; + std::vector expected_formats = {"i", "g", "u"}; + VerifyArrowSchema(&schema, expected_names, expected_formats); + + VerifyArrowArrayData(&array, 5); + + ASSERT_NE(array.children, nullptr); + ASSERT_NE(array.children[0], nullptr); + ASSERT_NE(array.children[1], nullptr); + ASSERT_NE(array.children[2], nullptr); + + const ArrowArray* int_array = array.children[0]; + EXPECT_EQ(int_array->length, 5); + EXPECT_EQ(int_array->null_count, 0); + ASSERT_NE(int_array->buffers, nullptr); + const int32_t* int_data = reinterpret_cast( + int_array->buffers[int_array->n_buffers - 1]); + for (int i = 0; i < 5; ++i) { + EXPECT_EQ(int_data[i], 100 + i); + } + + const arrow::ArrowArray* double_array = array.children[1]; + EXPECT_EQ(double_array->length, 5); + EXPECT_EQ(double_array->null_count, 0); + const double* double_data = reinterpret_cast( + double_array->buffers[double_array->n_buffers - 1]); + for (int i = 0; i < 5; ++i) { + EXPECT_DOUBLE_EQ(double_data[i], 3.14 + i); + } + const arrow::ArrowArray* string_array = array.children[2]; + EXPECT_EQ(string_array->length, 5); + EXPECT_EQ(string_array->null_count, 0); + ASSERT_NE(string_array->buffers, nullptr); + const int32_t* offsets = + 
reinterpret_cast(string_array->buffers[1]); + const char* string_data = + reinterpret_cast(string_array->buffers[2]); + + for (int i = 0; i < 5; ++i) { + int32_t start = offsets[i]; + int32_t end = offsets[i + 1]; + std::string expected_str = "test" + std::to_string(i); + std::string actual_str(string_data + start, end - start); + EXPECT_EQ(actual_str, expected_str); + } + + if (array.release != nullptr) { + array.release(&array); + } + if (schema.release != nullptr) { + schema.release(&schema); + } +} + +TEST(ArrowTsBlockTest, TsBlock_WithNulls) { + common::TupleDesc tuple_desc; + common::ColumnSchema col1("int_col", common::INT32, common::SNAPPY, + common::RLE); + common::ColumnSchema col2("double_col", common::DOUBLE, common::SNAPPY, + common::RLE); + common::ColumnSchema col3("string_col", common::STRING, common::SNAPPY, + common::RLE); + tuple_desc.push_back(col1); + tuple_desc.push_back(col2); + tuple_desc.push_back(col3); + + common::TsBlock tsblock(&tuple_desc, 10); + ASSERT_EQ(tsblock.init(), common::E_OK); + + common::RowAppender row_appender(&tsblock); + for (int i = 0; i < 5; ++i) { + ASSERT_TRUE(row_appender.add_row()); + + if (i == 1) { + row_appender.append_null(0); + row_appender.append_null(1); + row_appender.append_null(2); + } else if (i == 3) { + row_appender.append_null(0); + double double_val = 3.14 + i; + row_appender.append(1, reinterpret_cast(&double_val), + sizeof(double)); + std::string str_val = "test" + std::to_string(i); + row_appender.append(2, str_val.c_str(), str_val.length()); + } else { + int32_t int_val = 100 + i; + row_appender.append(0, reinterpret_cast(&int_val), + sizeof(int32_t)); + double double_val = 3.14 + i; + row_appender.append(1, reinterpret_cast(&double_val), + sizeof(double)); + std::string str_val = "test" + std::to_string(i); + row_appender.append(2, str_val.c_str(), str_val.length()); + } + } + + EXPECT_EQ(tsblock.get_row_count(), 5); + + arrow::ArrowArray array; + arrow::ArrowSchema schema; + int ret = 
arrow::TsBlockToArrowStruct(tsblock, &array, &schema); + ASSERT_EQ(ret, common::E_OK); + + std::vector expected_names = {"int_col", "double_col", + "string_col"}; + std::vector expected_formats = {"i", "g", "u"}; + VerifyArrowSchema(&schema, expected_names, expected_formats); + + VerifyArrowArrayData(&array, 5); + + const arrow::ArrowArray* int_array = array.children[0]; + EXPECT_EQ(int_array->null_count, 2); + + const arrow::ArrowArray* double_array = array.children[1]; + EXPECT_EQ(double_array->null_count, 1); + + const arrow::ArrowArray* string_array = array.children[2]; + EXPECT_EQ(string_array->null_count, 1); + + ASSERT_NE(int_array->buffers[0], nullptr); + const uint8_t* null_bitmap = + reinterpret_cast(int_array->buffers[0]); + EXPECT_FALSE(null_bitmap[0] & (1 << 1)); + EXPECT_FALSE(null_bitmap[0] & (1 << 3)); + EXPECT_TRUE(null_bitmap[0] & (1 << 0)); + EXPECT_TRUE(null_bitmap[0] & (1 << 2)); + EXPECT_TRUE(null_bitmap[0] & (1 << 4)); + + const int32_t* int_data = reinterpret_cast( + int_array->buffers[int_array->n_buffers - 1]); + EXPECT_NE(int_data, nullptr); + if (array.release != nullptr) { + array.release(&array); + } + if (schema.release != nullptr) { + schema.release(&schema); + } +} + +TEST(ArrowTsBlockTest, TsBlock_EdgeCases) { + { + common::TupleDesc tuple_desc; + common::ColumnSchema col1("single_col", common::INT64, common::SNAPPY, + common::RLE); + tuple_desc.push_back(col1); + + common::TsBlock tsblock(&tuple_desc, 5); + ASSERT_EQ(tsblock.init(), common::E_OK); + + common::RowAppender row_appender(&tsblock); + for (int i = 0; i < 3; ++i) { + ASSERT_TRUE(row_appender.add_row()); + int64_t val = 1000 + i; + row_appender.append(0, reinterpret_cast(&val), + sizeof(int64_t)); + } + + arrow::ArrowArray array; + arrow::ArrowSchema schema; + int ret = arrow::TsBlockToArrowStruct(tsblock, &array, &schema); + ASSERT_EQ(ret, common::E_OK); + + EXPECT_STREQ(schema.format, "+s"); + EXPECT_EQ(schema.n_children, 1); + EXPECT_STREQ(schema.children[0]->name, 
"single_col"); + EXPECT_STREQ(schema.children[0]->format, "l"); + + EXPECT_EQ(array.length, 3); + EXPECT_EQ(array.n_children, 1); + if (array.release != nullptr) { + array.release(&array); + } + if (schema.release != nullptr) { + schema.release(&schema); + } + } + + { + common::TupleDesc tuple_desc; + common::ColumnSchema col1("int_col", common::INT32, common::SNAPPY, + common::RLE); + common::ColumnSchema col2("double_col", common::DOUBLE, common::SNAPPY, + common::RLE); + tuple_desc.push_back(col1); + tuple_desc.push_back(col2); + + const int row_count = 1000; + common::TsBlock tsblock(&tuple_desc, row_count); + ASSERT_EQ(tsblock.init(), common::E_OK); + + common::RowAppender row_appender(&tsblock); + for (int i = 0; i < row_count; ++i) { + ASSERT_TRUE(row_appender.add_row()); + int32_t int_val = i; + row_appender.append(0, reinterpret_cast(&int_val), + sizeof(int32_t)); + double double_val = i * 0.5; + row_appender.append(1, reinterpret_cast(&double_val), + sizeof(double)); + } + + arrow::ArrowArray array; + arrow::ArrowSchema schema; + int ret = arrow::TsBlockToArrowStruct(tsblock, &array, &schema); + ASSERT_EQ(ret, common::E_OK); + + EXPECT_EQ(array.length, row_count); + EXPECT_EQ(array.n_children, 2); + + const arrow::ArrowArray* int_array = array.children[0]; + const int32_t* int_data = + reinterpret_cast(int_array->buffers[1]); + EXPECT_EQ(int_data[0], 0); + EXPECT_EQ(int_data[row_count - 1], row_count - 1); + + const arrow::ArrowArray* double_array = array.children[1]; + const double* double_data = + reinterpret_cast(double_array->buffers[1]); + EXPECT_DOUBLE_EQ(double_data[0], 0.0); + EXPECT_DOUBLE_EQ(double_data[row_count - 1], (row_count - 1) * 0.5); + + if (array.release != nullptr) { + array.release(&array); + } + if (schema.release != nullptr) { + schema.release(&schema); + } + } +} diff --git a/cpp/test/reader/table_view/tsfile_reader_table_batch_test.cc b/cpp/test/reader/table_view/tsfile_reader_table_batch_test.cc new file mode 100644 index 
000000000..e115552ec --- /dev/null +++ b/cpp/test/reader/table_view/tsfile_reader_table_batch_test.cc @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include + +#include +#include +#include + +#include "common/record.h" +#include "common/schema.h" +#include "common/tablet.h" +#include "file/tsfile_io_writer.h" +#include "file/write_file.h" +#include "reader/table_result_set.h" +#include "reader/tsfile_reader.h" +#include "writer/chunk_writer.h" +#include "writer/tsfile_table_writer.h" + +using namespace storage; +using namespace common; + +class TsFileTableReaderBatchTest : public ::testing::Test { + protected: + void SetUp() override { + libtsfile_init(); + file_name_ = std::string("tsfile_reader_table_batch_test_") + + generate_random_string(10) + std::string(".tsfile"); + remove(file_name_.c_str()); + int flags = O_WRONLY | O_CREAT | O_TRUNC; +#ifdef _WIN32 + flags |= O_BINARY; +#endif + mode_t mode = 0666; + write_file_.create(file_name_, flags, mode); + } + void TearDown() override { + remove(file_name_.c_str()); + libtsfile_destroy(); + } + std::string file_name_; + WriteFile write_file_; + + public: + static std::string generate_random_string(int length) { + std::random_device rd; + 
std::mt19937 gen(rd()); + std::uniform_int_distribution<> dis(0, 61); + + const std::string chars = + "0123456789" + "abcdefghijklmnopqrstuvwxyz" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + + std::string random_string; + + for (int i = 0; i < length; ++i) { + random_string += chars[dis(gen)]; + } + + return random_string; + } + + static TableSchema* gen_table_schema_no_tag() { + // Generate table schema with only FIELD columns (no TAG columns) + std::vector measurement_schemas; + std::vector column_categories; + int measurement_schema_num = 5; // 5 field columns + for (int i = 0; i < measurement_schema_num; i++) { + measurement_schemas.emplace_back(new MeasurementSchema( + "s" + std::to_string(i), TSDataType::INT64, TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + column_categories.emplace_back(ColumnCategory::FIELD); + } + return new TableSchema("testTableNoTag", measurement_schemas, + column_categories); + } + + static storage::Tablet gen_tablet_no_tag(TableSchema* table_schema, + int num_rows) { + // Generate tablet without tags (only field columns) + storage::Tablet tablet(table_schema->get_table_name(), + table_schema->get_measurement_names(), + table_schema->get_data_types(), + table_schema->get_column_categories(), num_rows); + + for (int i = 0; i < num_rows; i++) { + tablet.add_timestamp(i, i); + auto column_schemas = table_schema->get_measurement_schemas(); + for (const auto& column_schema : column_schemas) { + if (column_schema->data_type_ == TSDataType::INT64) { + tablet.add_value(i, column_schema->measurement_name_, + static_cast(i)); + } + } + } + return tablet; + } + + static TableSchema* gen_table_schema() { + std::vector measurement_schemas; + std::vector column_categories; + int id_schema_num = 2; + int measurement_schema_num = 3; + for (int i = 0; i < id_schema_num; i++) { + measurement_schemas.emplace_back(new MeasurementSchema( + "id" + std::to_string(i), TSDataType::STRING, TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + 
column_categories.emplace_back(ColumnCategory::TAG); + } + for (int i = 0; i < measurement_schema_num; i++) { + measurement_schemas.emplace_back(new MeasurementSchema( + "s" + std::to_string(i), TSDataType::INT64, TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + column_categories.emplace_back(ColumnCategory::FIELD); + } + return new TableSchema("testTable", measurement_schemas, + column_categories); + } + + static storage::Tablet gen_tablet(TableSchema* table_schema, int offset, + int device_num, + int num_timestamp_per_device = 10) { + storage::Tablet tablet(table_schema->get_table_name(), + table_schema->get_measurement_names(), + table_schema->get_data_types(), + table_schema->get_column_categories(), + device_num * num_timestamp_per_device); + + char* literal = new char[std::strlen("device_id") + 1]; + std::strcpy(literal, "device_id"); + String literal_str(literal, std::strlen("device_id")); + for (int i = 0; i < device_num; i++) { + for (int l = 0; l < num_timestamp_per_device; l++) { + int row_index = i * num_timestamp_per_device + l; + tablet.add_timestamp(row_index, row_index); + auto column_schemas = table_schema->get_measurement_schemas(); + for (const auto& column_schema : column_schemas) { + switch (column_schema->data_type_) { + case TSDataType::INT64: + tablet.add_value(row_index, + column_schema->measurement_name_, + static_cast(i)); + break; + case TSDataType::STRING: + tablet.add_value(row_index, + column_schema->measurement_name_, + literal_str); + break; + default: + break; + } + } + } + } + delete[] literal; + return tablet; + } +}; + +TEST_F(TsFileTableReaderBatchTest, BatchQueryWithSmallBatchSize) { + auto table_schema = gen_table_schema(); + auto tsfile_table_writer_ = + std::make_shared(&write_file_, table_schema); + + const int device_num = 2; + const int points_per_device = 50; + auto tablet = gen_tablet(table_schema, 0, device_num, points_per_device); + ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK); + 
ASSERT_EQ(tsfile_table_writer_->flush(), common::E_OK); + ASSERT_EQ(tsfile_table_writer_->close(), common::E_OK); + + storage::TsFileReader reader; + int ret = reader.open(file_name_); + ASSERT_EQ(ret, common::E_OK); + + ResultSet* tmp_result_set = nullptr; + const int batch_size = 20; + ret = reader.query(table_schema->get_table_name(), + table_schema->get_measurement_names(), 0, 1000000000000, + tmp_result_set, batch_size); + ASSERT_EQ(ret, common::E_OK); + ASSERT_NE(tmp_result_set, nullptr); + + auto* table_result_set = dynamic_cast(tmp_result_set); + ASSERT_NE(table_result_set, nullptr); + + int total_rows = 0; + int block_count = 0; + common::TsBlock* block = nullptr; + + char* literal = new char[std::strlen("device_id") + 1]; + std::strcpy(literal, "device_id"); + String expected_string(literal, std::strlen("device_id")); + std::vector int64_sums(3, 0); + while ((ret = table_result_set->get_next_tsblock(block)) == common::E_OK) { + ASSERT_NE(block, nullptr); + block_count++; + uint32_t row_count = block->get_row_count(); + total_rows += row_count; + ASSERT_EQ(row_count, batch_size); + + common::RowIterator row_iterator(block); + while (row_iterator.has_next()) { + uint32_t len; + bool null; + + int int64_col_idx = 0; + for (uint32_t col_idx = 1; + col_idx < row_iterator.get_column_count(); ++col_idx) { + const char* value = row_iterator.read(col_idx, &len, &null); + ASSERT_FALSE(null); + TSDataType data_type = row_iterator.get_data_type(col_idx); + if (data_type == TSDataType::INT64) { + int64_t int_val = *reinterpret_cast(value); + int64_sums[int64_col_idx] += int_val; + int64_col_idx++; + } else if (data_type == TSDataType::STRING) { + String str_value(value, len); + ASSERT_EQ(str_value.compare(expected_string), 0); + } + } + row_iterator.next(); + } + } + EXPECT_EQ(total_rows, device_num * points_per_device); + EXPECT_GT(block_count, 1); + for (size_t i = 0; i < int64_sums.size(); i++) { + EXPECT_EQ(int64_sums[i], 50); + } + + delete[] literal; + + 
reader.destroy_query_data_set(table_result_set); + ASSERT_EQ(reader.close(), common::E_OK); + delete table_schema; +} + +TEST_F(TsFileTableReaderBatchTest, BatchQueryWithLargeBatchSize) { + auto table_schema = gen_table_schema(); + auto tsfile_table_writer_ = + std::make_shared(&write_file_, table_schema); + + const int device_num = 1; + const int points_per_device = 120; + auto tablet = gen_tablet(table_schema, 0, device_num, points_per_device); + ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK); + ASSERT_EQ(tsfile_table_writer_->flush(), common::E_OK); + ASSERT_EQ(tsfile_table_writer_->close(), common::E_OK); + + storage::TsFileReader reader; + int ret = reader.open(file_name_); + ASSERT_EQ(ret, common::E_OK); + + ResultSet* tmp_result_set = nullptr; + const int batch_size = 100; + ret = reader.query(table_schema->get_table_name(), + table_schema->get_measurement_names(), 0, 1000000000000, + tmp_result_set, batch_size); + ASSERT_EQ(ret, common::E_OK); + ASSERT_NE(tmp_result_set, nullptr); + + auto* table_result_set = dynamic_cast(tmp_result_set); + ASSERT_NE(table_result_set, nullptr); + + int total_rows = 0; + int block_count = 0; + common::TsBlock* block = nullptr; + + while (table_result_set->get_next_tsblock(block) == common::E_OK) { + ASSERT_NE(block, nullptr); + block_count++; + uint32_t row_count = block->get_row_count(); + total_rows += row_count; + + ASSERT_EQ(row_count, block_count == 1 ? 
batch_size : 20); + } + + EXPECT_EQ(total_rows, device_num * points_per_device); + EXPECT_GE(block_count, 2); + + reader.destroy_query_data_set(table_result_set); + ASSERT_EQ(reader.close(), common::E_OK); + delete table_schema; +} + +TEST_F(TsFileTableReaderBatchTest, BatchQueryVerifyDataCorrectness) { + auto table_schema = gen_table_schema(); + auto tsfile_table_writer_ = + std::make_shared(&write_file_, table_schema); + + const int device_num = 1; + const int points_per_device = 30; + auto tablet = gen_tablet(table_schema, 0, device_num, points_per_device); + ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK); + ASSERT_EQ(tsfile_table_writer_->flush(), common::E_OK); + ASSERT_EQ(tsfile_table_writer_->close(), common::E_OK); + + storage::TsFileReader reader; + int ret = reader.open(file_name_); + ASSERT_EQ(ret, common::E_OK); + + ResultSet* tmp_result_set = nullptr; + const int batch_size = 10; + ret = reader.query(table_schema->get_table_name(), + table_schema->get_measurement_names(), 0, 1000000000000, + tmp_result_set, batch_size); + ASSERT_EQ(ret, common::E_OK); + ASSERT_NE(tmp_result_set, nullptr); + + auto* table_result_set = dynamic_cast(tmp_result_set); + ASSERT_NE(table_result_set, nullptr); + + int expected_timestamp = 0; + common::TsBlock* block = nullptr; + + while (table_result_set->get_next_tsblock(block) == common::E_OK) { + ASSERT_NE(block, nullptr); + + common::RowIterator row_iterator(block); + while (row_iterator.has_next()) { + uint32_t len; + bool null; + int64_t timestamp = *reinterpret_cast( + row_iterator.read(0, &len, &null)); + ASSERT_FALSE(null); + EXPECT_EQ(timestamp, expected_timestamp); + + for (uint32_t col_idx = 2; + col_idx < row_iterator.get_column_count(); ++col_idx) { + const char* value = row_iterator.read(col_idx, &len, &null); + if (!null && row_iterator.get_data_type(col_idx) == INT64) { + int64_t int_val = *reinterpret_cast(value); + EXPECT_EQ(int_val, 0); + } + } + row_iterator.next(); + 
expected_timestamp++; + } + } + + EXPECT_EQ(expected_timestamp, device_num * points_per_device); + + reader.destroy_query_data_set(table_result_set); + ASSERT_EQ(reader.close(), common::E_OK); + delete table_schema; +} + +TEST_F(TsFileTableReaderBatchTest, PerformanceComparisonSinglePointVsBatch) { + // Create table schema without tags (only fields) + auto table_schema = gen_table_schema_no_tag(); + auto tsfile_table_writer_ = + std::make_shared(&write_file_, table_schema); + + // Write a large amount of data + const int total_rows = 1000000; + auto tablet = gen_tablet_no_tag(table_schema, total_rows); + ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK); + ASSERT_EQ(tsfile_table_writer_->flush(), common::E_OK); + ASSERT_EQ(tsfile_table_writer_->close(), common::E_OK); + + // Test 1: Single point query (using next() method) + { + storage::TsFileReader reader; + int ret = reader.open(file_name_); + ASSERT_EQ(ret, common::E_OK); + + ResultSet* tmp_result_set = nullptr; + // Single point query: don't specify batch_size (or use 0) + auto start_time = std::chrono::high_resolution_clock::now(); + + ret = reader.query(table_schema->get_table_name(), + table_schema->get_measurement_names(), 0, + 1000000000000, tmp_result_set); + ASSERT_EQ(ret, common::E_OK); + ASSERT_NE(tmp_result_set, nullptr); + + auto* table_result_set = dynamic_cast(tmp_result_set); + ASSERT_NE(table_result_set, nullptr); + + int total_rows_read = 0; + bool has_next = false; + + // Use next() method for single point query + while (IS_SUCC(table_result_set->next(has_next)) && has_next) { + total_rows_read++; + } + + auto end_time = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast( + end_time - start_time); + + EXPECT_EQ(total_rows_read, total_rows); + std::cout << "\n=== Single Point Query (using next() method) ===" + << std::endl; + std::cout << "Total rows read: " << total_rows_read << std::endl; + std::cout << "Time taken: " << duration.count() 
<< " ms" << std::endl; + std::cout << "Throughput: " + << (total_rows_read * 5 * 1000.0 / duration.count()) + << " rows/sec" << std::endl; + + reader.destroy_query_data_set(table_result_set); + ASSERT_EQ(reader.close(), common::E_OK); + } + + // Test 2: Batch query (batch_size = 1000) + { + storage::TsFileReader reader; + int ret = reader.open(file_name_); + ASSERT_EQ(ret, common::E_OK); + + ResultSet* tmp_result_set = nullptr; + const int batch_size = 10000; // Batch query + auto start_time = std::chrono::high_resolution_clock::now(); + + ret = reader.query(table_schema->get_table_name(), + table_schema->get_measurement_names(), 0, + 1000000000000, tmp_result_set, batch_size); + ASSERT_EQ(ret, common::E_OK); + ASSERT_NE(tmp_result_set, nullptr); + + auto* table_result_set = dynamic_cast(tmp_result_set); + ASSERT_NE(table_result_set, nullptr); + + int total_rows_read = 0; + common::TsBlock* block = nullptr; + int block_count = 0; + + while ((ret = table_result_set->get_next_tsblock(block)) == + common::E_OK) { + ASSERT_NE(block, nullptr); + block_count++; + total_rows_read += block->get_row_count(); + } + + auto end_time = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast( + end_time - start_time); + + EXPECT_EQ(total_rows_read, total_rows); + std::cout << "\n=== Batch Query (batch_size=10000) ===" << std::endl; + std::cout << "Total rows read: " << total_rows_read << std::endl; + std::cout << "Block count: " << block_count << std::endl; + std::cout << "Time taken: " << duration.count() << " ms" << std::endl; + std::cout << "Throughput: " + << (total_rows_read * 5 * 1000.0 / duration.count()) + << " rows/sec" << std::endl; + + reader.destroy_query_data_set(table_result_set); + ASSERT_EQ(reader.close(), common::E_OK); + } + + delete table_schema; +} diff --git a/python/lower_case_name.tsfile b/python/lower_case_name.tsfile new file mode 100644 index 000000000..d4717671d Binary files /dev/null and b/python/lower_case_name.tsfile 
differ diff --git a/python/requirements.txt b/python/requirements.txt index bcccac286..9ee650389 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -22,4 +22,5 @@ numpy>=2.0.0,<3 pandas==2.2.2 setuptools==78.1.1 wheel==0.46.2 +pyarrow>=8.0.0 diff --git a/python/test1.tsfile b/python/test1.tsfile new file mode 100644 index 000000000..1141f08d9 Binary files /dev/null and b/python/test1.tsfile differ diff --git a/python/tests/bench_batch_arrow_vs_dataframe.py b/python/tests/bench_batch_arrow_vs_dataframe.py new file mode 100644 index 000000000..e1f6c421a --- /dev/null +++ b/python/tests/bench_batch_arrow_vs_dataframe.py @@ -0,0 +1,217 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +""" +Benchmark: query_table_batch + read_arrow_batch vs query_table + read_data_frame. 
+ +Compares throughput and elapsed time when reading the same table data via + - Arrow path: query_table_batch(batch_size=N) then read_arrow_batch() in a loop + - DataFrame path: query_table() then result.next() + read_data_frame(N) in a loop + +Run from project root or python/tests, e.g.: + python -m pytest tests/bench_batch_arrow_vs_dataframe.py -v -s + python tests/bench_batch_arrow_vs_dataframe.py # if run as script +""" + +import os +import sys +import time +from os import remove + +import pandas as pd +import pyarrow as pa +import pytest + +from tsfile import ( + ColumnSchema, + ColumnCategory, + TSDataType, + TableSchema, + TsFileReader, + TsFileTableWriter, +) + +# Default benchmark size +DEFAULT_ROW_COUNT = 50_000 +DEFAULT_BATCH_SIZE = 4096 +DEFAULT_TIMED_ROUNDS = 3 + +BENCH_FILE = "bench_arrow_vs_dataframe.tsfile" +TABLE_NAME = "bench_table" +COLUMNS = ["device", "value1", "value2"] + + +def _ensure_bench_tsfile(file_path: str, row_count: int) -> None: + """Create tsfile with table data if not present. Uses DataFrame for fast data generation.""" + if os.path.exists(file_path): + remove(file_path) + # Build data with pandas/numpy (vectorized, much faster than row-by-row Tablet) + import numpy as np + df = pd.DataFrame({ + "time": np.arange(row_count, dtype=np.int64), + "device": pd.Series([f"device" for i in range(row_count)]), + "value1": np.arange(0, row_count * 10, 10, dtype=np.int64), + "value2": np.arange(row_count, dtype=np.float64) * 1.5, + }) + + table = TableSchema( + TABLE_NAME, + [ + ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("value1", TSDataType.INT64, ColumnCategory.FIELD), + ColumnSchema("value2", TSDataType.DOUBLE, ColumnCategory.FIELD), + ], + ) + with TsFileTableWriter(file_path, table) as writer: + writer.write_dataframe(df) + + +def _read_via_arrow(file_path: str, batch_size: int, end_time: int) -> int: + """Read all rows using query_table_batch + read_arrow_batch. 
Returns total rows.""" + reader = TsFileReader(file_path) + result_set = reader.query_table_batch( + table_name=TABLE_NAME, + column_names=COLUMNS, + start_time=0, + end_time=end_time, + batch_size=batch_size, + ) + total_rows = 0 + try: + while True: + batch = result_set.read_arrow_batch() + if batch is None: + break + total_rows += len(batch) + finally: + result_set.close() + reader.close() + return total_rows + + +def _read_via_dataframe(file_path: str, batch_size: int, end_time: int) -> int: + """Read all rows using query_table + next + read_data_frame. Returns total rows.""" + reader = TsFileReader(file_path) + result_set = reader.query_table( + TABLE_NAME, + COLUMNS, + start_time=0, + end_time=end_time, + ) + total_rows = 0 + try: + while result_set.next(): + df = result_set.read_data_frame(max_row_num=batch_size) + if df is None or len(df) == 0: + break + total_rows += len(df) + finally: + result_set.close() + reader.close() + return total_rows + + +def _run_timed(name: str, func, *args, rounds: int = DEFAULT_TIMED_ROUNDS): + times = [] + for _ in range(rounds): + start = time.perf_counter() + n = func(*args) + elapsed = time.perf_counter() - start + times.append(elapsed) + avg = sum(times) / len(times) + total_rows = n + rows_per_sec = total_rows / avg if avg > 0 else 0 + print(f" {name}: {avg:.3f}s avg ({min(times):.3f}s min) rows={total_rows} {rows_per_sec:.0f} rows/s") + return avg, total_rows + + +def run_benchmark( + row_count: int = DEFAULT_ROW_COUNT, + batch_size: int = DEFAULT_BATCH_SIZE, + timed_rounds: int = DEFAULT_TIMED_ROUNDS, + file_path: str = BENCH_FILE, +): + _ensure_bench_tsfile(file_path, row_count) + end_time = row_count + 1 + + print(f"Benchmark: {row_count} rows, batch_size={batch_size}, timed_rounds={timed_rounds}") + + df_avg, df_rows = _run_timed( + "query_table + read_data_frame", + _read_via_dataframe, + file_path, + batch_size, + end_time, + rounds=timed_rounds, + ) + + arrow_avg, arrow_rows = _run_timed( + "query_table_batch + 
read_arrow_batch", + _read_via_arrow, + file_path, + batch_size, + end_time, + rounds=timed_rounds, + ) + print() + if df_avg > 0: + speedup = arrow_avg / df_avg + print(f" Arrow vs DataFrame time ratio: {speedup:.2f}x ({'Arrow faster' if speedup < 1 else 'DataFrame faster'})") + assert df_rows == row_count, f"DataFrame path row count {df_rows} != {row_count}" + assert arrow_rows == row_count, f"Arrow path row count {arrow_rows} != {row_count}" + + print() + return df_avg, arrow_avg + + +def test_bench_arrow_vs_dataframe_default(): + """Run benchmark with default size (quick sanity check).""" + run_benchmark( + row_count=5_000, + batch_size=1024, + timed_rounds=2, + ) + + +def test_bench_arrow_vs_dataframe_medium(): + """Run benchmark with medium size.""" + run_benchmark( + row_count=DEFAULT_ROW_COUNT, + batch_size=DEFAULT_BATCH_SIZE, + timed_rounds=DEFAULT_TIMED_ROUNDS, + ) + + +def test_bench_arrow_vs_dataframe_large(): + run_benchmark( + row_count=2000_000, + batch_size=8192, + timed_rounds=3, + ) + + +if __name__ == "__main__": + row_count = DEFAULT_ROW_COUNT + batch_size = DEFAULT_BATCH_SIZE + if len(sys.argv) > 1: + row_count = int(sys.argv[1]) + if len(sys.argv) > 2: + batch_size = int(sys.argv[2]) + run_benchmark(row_count=row_count, batch_size=batch_size) + # Clean up bench file when run as script (optional) + if os.path.exists(BENCH_FILE): + os.remove(BENCH_FILE) diff --git a/python/tests/bench_write_arrow_vs_dataframe.py b/python/tests/bench_write_arrow_vs_dataframe.py new file mode 100644 index 000000000..c2f9bedcd --- /dev/null +++ b/python/tests/bench_write_arrow_vs_dataframe.py @@ -0,0 +1,230 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +""" +Benchmark: write_arrow_batch vs write_dataframe. + +Compares write throughput (rows/s) for: + - Arrow path : write_arrow_batch(pa.RecordBatch) + - DataFrame path: write_dataframe(pd.DataFrame) + +Run: + python -m pytest tests/bench_write_arrow_vs_dataframe.py -v -s + python tests/bench_write_arrow_vs_dataframe.py [row_count [batch_size]] +""" + +import os +import sys +import time + +import numpy as np +import pandas as pd +import pyarrow as pa +import pytest + +from tsfile import ( + ColumnCategory, + ColumnSchema, + TableSchema, + TSDataType, + TsFileTableWriter, +) + +# --------------------------------------------------------------------------- +# Config +# --------------------------------------------------------------------------- + +DEFAULT_ROW_COUNT = 100_000 +DEFAULT_BATCH_SIZE = 8_192 +DEFAULT_ROUNDS = 3 + +TABLE_NAME = "bench_table" +BENCH_FILE = "bench_write_arrow.tsfile" + +SCHEMA = TableSchema(TABLE_NAME, [ + ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("v_i64", TSDataType.INT64, ColumnCategory.FIELD), + ColumnSchema("v_f64", TSDataType.DOUBLE, ColumnCategory.FIELD), + ColumnSchema("v_bool", TSDataType.BOOLEAN, ColumnCategory.FIELD), + ColumnSchema("v_str", TSDataType.STRING, ColumnCategory.FIELD), +]) + + +# --------------------------------------------------------------------------- +# Data generation +# 
--------------------------------------------------------------------------- + +def _make_numpy_data(row_count: int): + ts = np.arange(row_count, dtype="int64") + v_i64 = np.arange(row_count, dtype="int64") + v_f64 = np.arange(row_count, dtype="float64") * 1.5 + v_bool = (np.arange(row_count) % 2 == 0) + v_str = [f"s{i}" for i in range(row_count)] + device = ["device0"] * row_count + return ts, device, v_i64, v_f64, v_bool, v_str + + +def _make_arrow_batches(row_count: int, batch_size: int): + ts, device, v_i64, v_f64, v_bool, v_str = _make_numpy_data(row_count) + batches = [] + for start in range(0, row_count, batch_size): + end = min(start + batch_size, row_count) + batches.append(pa.record_batch({ + "time": pa.array(ts[start:end], type=pa.timestamp("ns")), + "device": pa.array(device[start:end], type=pa.string()), + "v_i64": pa.array(v_i64[start:end], type=pa.int64()), + "v_f64": pa.array(v_f64[start:end], type=pa.float64()), + "v_bool": pa.array(v_bool[start:end], type=pa.bool_()), + "v_str": pa.array(v_str[start:end], type=pa.string()), + })) + return batches + + +def _make_dataframe_chunks(row_count: int, batch_size: int): + ts, device, v_i64, v_f64, v_bool, v_str = _make_numpy_data(row_count) + chunks = [] + for start in range(0, row_count, batch_size): + end = min(start + batch_size, row_count) + chunks.append(pd.DataFrame({ + "time": pd.Series(ts[start:end], dtype="int64"), + "device": device[start:end], + "v_i64": pd.Series(v_i64[start:end], dtype="int64"), + "v_f64": pd.Series(v_f64[start:end], dtype="float64"), + "v_bool": pd.Series(v_bool[start:end], dtype="bool"), + "v_str": v_str[start:end], + })) + return chunks + + +# --------------------------------------------------------------------------- +# Benchmark runners +# --------------------------------------------------------------------------- + +def _write_arrow(file_path: str, batches): + schema = TableSchema(TABLE_NAME, [ + ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + 
ColumnSchema("v_i64", TSDataType.INT64, ColumnCategory.FIELD), + ColumnSchema("v_f64", TSDataType.DOUBLE, ColumnCategory.FIELD), + ColumnSchema("v_bool", TSDataType.BOOLEAN, ColumnCategory.FIELD), + ColumnSchema("v_str", TSDataType.STRING, ColumnCategory.FIELD), + ]) + with TsFileTableWriter(file_path, schema) as w: + for batch in batches: + w.write_arrow_batch(batch) + + +def _write_dataframe(file_path: str, chunks): + schema = TableSchema(TABLE_NAME, [ + ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("v_i64", TSDataType.INT64, ColumnCategory.FIELD), + ColumnSchema("v_f64", TSDataType.DOUBLE, ColumnCategory.FIELD), + ColumnSchema("v_bool", TSDataType.BOOLEAN, ColumnCategory.FIELD), + ColumnSchema("v_str", TSDataType.STRING, ColumnCategory.FIELD), + ]) + with TsFileTableWriter(file_path, schema) as w: + for chunk in chunks: + w.write_dataframe(chunk) + + +def _run_timed(label: str, func, *args, rounds: int = DEFAULT_ROUNDS, row_count: int = 0): + times = [] + for _ in range(rounds): + if os.path.exists(BENCH_FILE): + os.remove(BENCH_FILE) + t0 = time.perf_counter() + func(BENCH_FILE, *args) + times.append(time.perf_counter() - t0) + avg = sum(times) / len(times) + best = min(times) + rps = row_count / avg if avg > 0 else 0 + print(f" {label:42s} avg={avg:.3f}s best={best:.3f}s {rps:>10.0f} rows/s") + return avg + + +# --------------------------------------------------------------------------- +# Main benchmark +# --------------------------------------------------------------------------- + +def run_benchmark( + row_count: int = DEFAULT_ROW_COUNT, + batch_size: int = DEFAULT_BATCH_SIZE, + rounds: int = DEFAULT_ROUNDS, +): + print() + print(f"=== write benchmark: {row_count:,} rows, batch_size={batch_size}, rounds={rounds} ===") + + # Pre-build data once (exclude data-preparation time from timing) + arrow_batches = _make_arrow_batches(row_count, batch_size) + df_chunks = _make_dataframe_chunks(row_count, batch_size) + + df_avg = 
_run_timed( + "write_dataframe", + _write_dataframe, df_chunks, + rounds=rounds, row_count=row_count, + ) + arrow_avg = _run_timed( + "write_arrow_batch", + _write_arrow, arrow_batches, + rounds=rounds, row_count=row_count, + ) + + print() + if arrow_avg > 0 and df_avg > 0: + ratio = df_avg / arrow_avg + if ratio >= 1.0: + print(f" Arrow is {ratio:.2f}x faster than DataFrame") + else: + print(f" DataFrame is {1/ratio:.2f}x faster than Arrow") + print() + + if os.path.exists(BENCH_FILE): + os.remove(BENCH_FILE) + + return df_avg, arrow_avg + + +# --------------------------------------------------------------------------- +# Pytest entry points +# --------------------------------------------------------------------------- + +def test_bench_write_arrow_small(): + """Quick sanity check with small data (5 k rows).""" + run_benchmark(row_count=5_000, batch_size=1_024, rounds=2) + + +def test_bench_write_arrow_default(): + """Default benchmark (100 k rows).""" + run_benchmark( + row_count=DEFAULT_ROW_COUNT, + batch_size=DEFAULT_BATCH_SIZE, + rounds=DEFAULT_ROUNDS, + ) + + +def test_bench_write_arrow_large(): + """Large benchmark (1 M rows).""" + run_benchmark(row_count=10_000_000, batch_size=32_384, rounds=3) + + +# --------------------------------------------------------------------------- +# Script entry point +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + row_count = int(sys.argv[1]) if len(sys.argv) > 1 else DEFAULT_ROW_COUNT + batch_size = int(sys.argv[2]) if len(sys.argv) > 2 else DEFAULT_BATCH_SIZE + run_benchmark(row_count=row_count, batch_size=batch_size) diff --git a/python/tests/test_batch_arrow.py b/python/tests/test_batch_arrow.py new file mode 100644 index 000000000..75bd6c4c1 --- /dev/null +++ b/python/tests/test_batch_arrow.py @@ -0,0 +1,444 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +from datetime import date + +import numpy as np +import pandas as pd +import pytest + +from tsfile import ColumnSchema, TableSchema, TSDataType, ColumnCategory +from tsfile import Tablet +from tsfile import TsFileTableWriter, TsFileReader + + +def test_batch_read_arrow_basic(): + file_path = "test_batch_arrow_basic.tsfile" + table = TableSchema( + "test_table", + [ + ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("value1", TSDataType.INT64, ColumnCategory.FIELD), + ColumnSchema("value2", TSDataType.DOUBLE, ColumnCategory.FIELD), + ], + ) + + try: + if os.path.exists(file_path): + os.remove(file_path) + print("t1") + + with TsFileTableWriter(file_path, table) as writer: + tablet = Tablet( + ["device", "value1", "value2"], + [TSDataType.STRING, TSDataType.INT64, TSDataType.DOUBLE], + 1000, + ) + for i in range(1000): + tablet.add_timestamp(i, i) + tablet.add_value_by_name("device", i, f"device_{i}") + tablet.add_value_by_name("value1", i, i * 10) + tablet.add_value_by_name("value2", i, i * 1.5) + writer.write_table(tablet) + try: + import pyarrow as pa + except ImportError: + pytest.skip("pyarrow is not installed") + + reader = TsFileReader(file_path) + result_set = reader.query_table_batch( + table_name="test_table", + column_names=["device", 
"value1", "value2"], + start_time=0, + end_time=1000, + batch_size=256, + ) + + total_rows = 0 + batch_count = 0 + while True: + table = result_set.read_arrow_batch() + if table is None: + break + + batch_count += 1 + assert isinstance(table, pa.Table) + assert len(table) > 0 + total_rows += len(table) + + column_names = table.column_names + assert "time" in column_names + assert "device" in column_names + assert "value1" in column_names + assert "value2" in column_names + + assert total_rows == 1000 + assert batch_count > 0 + + result_set.close() + reader.close() + + finally: + if os.path.exists(file_path): + os.remove(file_path) + + +def test_batch_read_arrow_compare_with_dataframe(): + file_path = "test_batch_arrow_compare.tsfile" + table = TableSchema( + "test_table", + [ + ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("value1", TSDataType.INT32, ColumnCategory.FIELD), + ColumnSchema("value2", TSDataType.FLOAT, ColumnCategory.FIELD), + ColumnSchema("value3", TSDataType.BOOLEAN, ColumnCategory.FIELD), + ], + ) + + try: + if os.path.exists(file_path): + os.remove(file_path) + + with TsFileTableWriter(file_path, table) as writer: + tablet = Tablet( + ["device", "value1", "value2", "value3"], + [TSDataType.STRING, TSDataType.INT32, TSDataType.FLOAT, TSDataType.BOOLEAN], + 500, + ) + for i in range(500): + tablet.add_timestamp(i, i) + tablet.add_value_by_name("device", i, f"device_{i}") + tablet.add_value_by_name("value1", i, i * 2) + tablet.add_value_by_name("value2", i, i * 1.1) + tablet.add_value_by_name("value3", i, i % 2 == 0) + writer.write_table(tablet) + + try: + import pyarrow as pa + except ImportError: + pytest.skip("pyarrow is not installed") + + reader1 = TsFileReader(file_path) + result_set1 = reader1.query_table_batch( + table_name="test_table", + column_names=["device", "value1", "value2", "value3"], + start_time=0, + end_time=500, + batch_size=100, + ) + + arrow_tables = [] + while True: + table = 
result_set1.read_arrow_batch() + if table is None: + break + arrow_tables.append(table) + + if arrow_tables: + combined_arrow_table = pa.concat_tables(arrow_tables) + df_arrow = combined_arrow_table.to_pandas() + else: + df_arrow = pd.DataFrame() + + result_set1.close() + reader1.close() + reader2 = TsFileReader(file_path) + result_set2 = reader2.query_table( + table_name="test_table", + column_names=["device", "value1", "value2", "value3"], + start_time=0, + end_time=500, + ) + + df_traditional = result_set2.read_data_frame(max_row_num=1000) + result_set2.close() + reader2.close() + + assert len(df_arrow) == len(df_traditional) + assert len(df_arrow) == 500 + + for col in ["time", "device", "value1", "value2", "value3"]: + assert col in df_arrow.columns + assert col in df_traditional.columns + + df_arrow_sorted = df_arrow.sort_values("time").reset_index(drop=True) + df_traditional_sorted = df_traditional.sort_values("time").reset_index(drop=True) + + for i in range(len(df_arrow_sorted)): + assert df_arrow_sorted.iloc[i]["time"] == df_traditional_sorted.iloc[i]["time"] + assert df_arrow_sorted.iloc[i]["device"] == df_traditional_sorted.iloc[i]["device"] + assert df_arrow_sorted.iloc[i]["value1"] == df_traditional_sorted.iloc[i]["value1"] + assert abs(df_arrow_sorted.iloc[i]["value2"] - df_traditional_sorted.iloc[i]["value2"]) < 1e-5 + assert df_arrow_sorted.iloc[i]["value3"] == df_traditional_sorted.iloc[i]["value3"] + + finally: + if os.path.exists(file_path): + os.remove(file_path) + + +def test_batch_read_arrow_empty_result(): + file_path = "test_batch_arrow_empty.tsfile" + table = TableSchema( + "test_table", + [ + ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("value", TSDataType.INT64, ColumnCategory.FIELD), + ], + ) + + try: + if os.path.exists(file_path): + os.remove(file_path) + + with TsFileTableWriter(file_path, table) as writer: + tablet = Tablet( + ["device", "value"], + [TSDataType.STRING, TSDataType.INT64], + 10, + ) + 
for i in range(10): + tablet.add_timestamp(i, i) + tablet.add_value_by_name("device", i, f"device_{i}") + tablet.add_value_by_name("value", i, i) + writer.write_table(tablet) + + try: + import pyarrow as pa + except ImportError: + pytest.skip("pyarrow is not installed") + + reader = TsFileReader(file_path) + result_set = reader.query_table_batch( + table_name="test_table", + column_names=["device", "value"], + start_time=1000, + end_time=2000, + batch_size=100, + ) + + table = result_set.read_arrow_batch() + assert table is None + + result_set.close() + reader.close() + + finally: + if os.path.exists(file_path): + os.remove(file_path) + + +def test_batch_read_arrow_time_range(): + + file_path = "test_batch_arrow_time_range.tsfile" + table = TableSchema( + "test_table", + [ + ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("value", TSDataType.INT64, ColumnCategory.FIELD), + ], + ) + + try: + if os.path.exists(file_path): + os.remove(file_path) + + with TsFileTableWriter(file_path, table) as writer: + tablet = Tablet( + ["device", "value"], + [TSDataType.STRING, TSDataType.INT64], + 1000, + ) + for i in range(1000): + tablet.add_timestamp(i, i) + tablet.add_value_by_name("device", i, f"device_{i}") + tablet.add_value_by_name("value", i, i) + writer.write_table(tablet) + + try: + import pyarrow as pa + except ImportError: + pytest.skip("pyarrow is not installed") + + reader = TsFileReader(file_path) + result_set = reader.query_table_batch( + table_name="test_table", + column_names=["device", "value"], + start_time=100, + end_time=199, + batch_size=50, + ) + + total_rows = 0 + while True: + table = result_set.read_arrow_batch() + if table is None: + break + total_rows += len(table) + df = table.to_pandas() + assert df["time"].min() >= 100 + assert df["time"].max() <= 199 + + assert total_rows == 100 + + result_set.close() + reader.close() + + finally: + if os.path.exists(file_path): + os.remove(file_path) + + +def 
test_batch_read_arrow_all_datatypes(): + file_path = "test_batch_arrow_all_datatypes.tsfile" + table = TableSchema( + "test_table", + [ + ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("bool_val", TSDataType.BOOLEAN, ColumnCategory.FIELD), + ColumnSchema("int32_val", TSDataType.INT32, ColumnCategory.FIELD), + ColumnSchema("int64_val", TSDataType.INT64, ColumnCategory.FIELD), + ColumnSchema("float_val", TSDataType.FLOAT, ColumnCategory.FIELD), + ColumnSchema("double_val", TSDataType.DOUBLE, ColumnCategory.FIELD), + ColumnSchema("string_val", TSDataType.STRING, ColumnCategory.FIELD), + ColumnSchema("date_val", TSDataType.DATE, ColumnCategory.FIELD), + ], + ) + + try: + if os.path.exists(file_path): + os.remove(file_path) + + with TsFileTableWriter(file_path, table) as writer: + tablet = Tablet( + ["device", "bool_val", "int32_val", "int64_val", "float_val", "double_val", "string_val", "date_val"], + [ + TSDataType.STRING, + TSDataType.BOOLEAN, + TSDataType.INT32, + TSDataType.INT64, + TSDataType.FLOAT, + TSDataType.DOUBLE, + TSDataType.STRING, + TSDataType.DATE, + ], + 200, + ) + for i in range(200): + tablet.add_timestamp(i, i) + tablet.add_value_by_name("device", i, f"device_{i}") + tablet.add_value_by_name("bool_val", i, i % 2 == 0) + tablet.add_value_by_name("int32_val", i, i * 2) + tablet.add_value_by_name("int64_val", i, i * 3) + tablet.add_value_by_name("float_val", i, i * 1.1) + tablet.add_value_by_name("double_val", i, i * 2.2) + tablet.add_value_by_name("string_val", i, f"string_{i}") + tablet.add_value_by_name("date_val", i, date(2025, 1, (i % 28) + 1)) + writer.write_table(tablet) + + try: + import pyarrow as pa + except ImportError: + pytest.skip("pyarrow is not installed") + + reader = TsFileReader(file_path) + result_set = reader.query_table_batch( + table_name="test_table", + column_names=["device", "bool_val", "int32_val", "int64_val", "float_val", "double_val", "string_val", "date_val"], + start_time=0, + end_time=200, 
+ batch_size=50, + ) + + total_rows = 0 + while True: + table = result_set.read_arrow_batch() + if table is None: + break + + total_rows += len(table) + df = table.to_pandas() + + assert "time" in df.columns + assert "device" in df.columns + assert "bool_val" in df.columns + assert "int32_val" in df.columns + assert "int64_val" in df.columns + assert "float_val" in df.columns + assert "double_val" in df.columns + assert "string_val" in df.columns + assert "date_val" in df.columns + + assert total_rows == 200 + + result_set.close() + reader.close() + + finally: + if os.path.exists(file_path): + os.remove(file_path) + + +def test_batch_read_arrow_no_pyarrow(): + file_path = "test_batch_arrow_no_pyarrow.tsfile" + table = TableSchema( + "test_table", + [ + ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("value", TSDataType.INT64, ColumnCategory.FIELD), + ], + ) + + try: + if os.path.exists(file_path): + os.remove(file_path) + + with TsFileTableWriter(file_path, table) as writer: + tablet = Tablet( + ["device", "value"], + [TSDataType.STRING, TSDataType.INT64], + 10, + ) + for i in range(10): + tablet.add_timestamp(i, i) + tablet.add_value_by_name("device", i, f"device_{i}") + tablet.add_value_by_name("value", i, i) + writer.write_table(tablet) + + reader = TsFileReader(file_path) + result_set = reader.query_table_batch( + table_name="test_table", + column_names=["device", "value"], + start_time=0, + end_time=10, + batch_size=5, + ) + result_set.close() + reader.close() + + finally: + if os.path.exists(file_path): + os.remove(file_path) + + +if __name__ == "__main__": + os.chdir(os.path.dirname(os.path.abspath(__file__))) + pytest.main([ + "test_batch_arrow.py", + "-s", "-v" + ]) diff --git a/python/tests/test_write_arrow.py b/python/tests/test_write_arrow.py new file mode 100644 index 000000000..19c8abc2e --- /dev/null +++ b/python/tests/test_write_arrow.py @@ -0,0 +1,368 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# 
or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +""" +Tests for write_arrow_batch: write PyArrow RecordBatch/Table to tsfile +and verify correctness by reading back. +""" + +import os +from datetime import date + +import numpy as np +import pandas as pd +import pytest + +pa = pytest.importorskip("pyarrow", reason="pyarrow is not installed") + +from tsfile import ColumnCategory, ColumnSchema, TableSchema, TSDataType, TsFileReader +from tsfile import TsFileTableWriter + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_schema(table_name, extra_cols): + """Build a TableSchema with a string TAG 'device' plus the given field cols.""" + return TableSchema( + table_name, + [ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG)] + extra_cols, + ) + + +def _read_all_arrow(file_path, table_name, columns, start=0, end=10**18, batch_size=4096): + """Read all rows from file via read_arrow_batch and return as a pa.Table.""" + reader = TsFileReader(file_path) + rs = reader.query_table_batch( + table_name=table_name, + column_names=columns, + start_time=start, + end_time=end, + batch_size=batch_size, + ) + batches = [] + while True: + batch = rs.read_arrow_batch() + if 
batch is None: + break + batches.append(batch) + rs.close() + reader.close() + if not batches: + return pa.table({}) + return pa.concat_tables(batches) + + +# --------------------------------------------------------------------------- +# Basic write + read-back +# --------------------------------------------------------------------------- + +def test_write_arrow_basic(): + """Write 1 000 rows via write_arrow_batch and verify count + values.""" + path = "test_write_arrow_basic.tsfile" + table_name = "t" + n = 1000 + + schema = _make_schema(table_name, [ + ColumnSchema("value1", TSDataType.INT64, ColumnCategory.FIELD), + ColumnSchema("value2", TSDataType.DOUBLE, ColumnCategory.FIELD), + ]) + + batch = pa.record_batch({ + "time": pa.array(np.arange(n, dtype="int64"), type=pa.timestamp("ns")), + "device": pa.array([f"d{i}" for i in range(n)], type=pa.string()), + "value1": pa.array(np.arange(n, dtype="int64"), type=pa.int64()), + "value2": pa.array(np.arange(n, dtype="float64") * 1.5, type=pa.float64()), + }) + + try: + if os.path.exists(path): + os.remove(path) + with TsFileTableWriter(path, schema) as w: + w.write_arrow_batch(batch) + + result = _read_all_arrow(path, table_name, ["device", "value1", "value2"]) + assert len(result) == n + + df = result.to_pandas().sort_values("time").reset_index(drop=True) + assert list(df["value1"]) == list(range(n)) + assert all(abs(df["value2"].iloc[i] - i * 1.5) < 1e-9 for i in range(n)) + finally: + if os.path.exists(path): + os.remove(path) + + +# --------------------------------------------------------------------------- +# pa.Table input +# --------------------------------------------------------------------------- + +def test_write_arrow_from_table(): + """write_arrow_batch should accept pa.Table (multi-chunk) as well.""" + path = "test_write_arrow_from_table.tsfile" + table_name = "t" + n = 500 + + schema = _make_schema(table_name, [ + ColumnSchema("v", TSDataType.INT32, ColumnCategory.FIELD), + ]) + + tbl = pa.table({ + 
"time": pa.array(np.arange(n, dtype="int64"), type=pa.timestamp("ns")), + "device": pa.array(["dev"] * n, type=pa.string()), + "v": pa.array(np.arange(n, dtype="int32"), type=pa.int32()), + }) + + try: + if os.path.exists(path): + os.remove(path) + with TsFileTableWriter(path, schema) as w: + w.write_arrow_batch(tbl) + + result = _read_all_arrow(path, table_name, ["device", "v"]) + assert len(result) == n + df = result.to_pandas().sort_values("time").reset_index(drop=True) + assert list(df["v"]) == list(range(n)) + finally: + if os.path.exists(path): + os.remove(path) + + +# --------------------------------------------------------------------------- +# Multiple batches +# --------------------------------------------------------------------------- + +def test_write_arrow_multiple_batches(): + """Write several batches sequentially and verify the total row count.""" + path = "test_write_arrow_multi.tsfile" + table_name = "t" + rows_per_batch = 300 + num_batches = 4 + total = rows_per_batch * num_batches + + schema = _make_schema(table_name, [ + ColumnSchema("v", TSDataType.INT64, ColumnCategory.FIELD), + ]) + + try: + if os.path.exists(path): + os.remove(path) + with TsFileTableWriter(path, schema) as w: + for b in range(num_batches): + start_ts = b * rows_per_batch + batch = pa.record_batch({ + "time": pa.array( + np.arange(start_ts, start_ts + rows_per_batch, dtype="int64"), + type=pa.timestamp("ns")), + "device": pa.array(["dev"] * rows_per_batch, type=pa.string()), + "v": pa.array( + np.arange(start_ts, start_ts + rows_per_batch, dtype="int64"), + type=pa.int64()), + }) + w.write_arrow_batch(batch) + + result = _read_all_arrow(path, table_name, ["device", "v"]) + assert len(result) == total + finally: + if os.path.exists(path): + os.remove(path) + + +# --------------------------------------------------------------------------- +# All supported data types +# --------------------------------------------------------------------------- + +def 
test_write_arrow_all_datatypes(): + """Write every supported data type and verify values read back correctly.""" + path = "test_write_arrow_all_types.tsfile" + table_name = "t" + n = 200 + + schema = TableSchema(table_name, [ + ColumnSchema("tag", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("bool_col", TSDataType.BOOLEAN, ColumnCategory.FIELD), + ColumnSchema("int32_col", TSDataType.INT32, ColumnCategory.FIELD), + ColumnSchema("int64_col", TSDataType.INT64, ColumnCategory.FIELD), + ColumnSchema("float_col", TSDataType.FLOAT, ColumnCategory.FIELD), + ColumnSchema("double_col", TSDataType.DOUBLE, ColumnCategory.FIELD), + ColumnSchema("str_col", TSDataType.STRING, ColumnCategory.FIELD), + ColumnSchema("date_col", TSDataType.DATE, ColumnCategory.FIELD), + ]) + + dates_days = [ + (date(2025, 1, (i % 28) + 1) - date(1970, 1, 1)).days for i in range(n) + ] + + batch = pa.record_batch({ + "time": pa.array(np.arange(n, dtype="int64"), type=pa.timestamp("ns")), + "tag": pa.array([f"dev{i}" for i in range(n)], type=pa.string()), + "bool_col": pa.array([i % 2 == 0 for i in range(n)], type=pa.bool_()), + "int32_col": pa.array(np.arange(n, dtype="int32"), type=pa.int32()), + "int64_col": pa.array(np.arange(n, dtype="int64") * 10, type=pa.int64()), + "float_col": pa.array(np.arange(n, dtype="float32") * 0.5, type=pa.float32()), + "double_col": pa.array(np.arange(n, dtype="float64") * 1.1, type=pa.float64()), + "str_col": pa.array([f"s{i}" for i in range(n)], type=pa.string()), + "date_col": pa.array(dates_days, type=pa.date32()), + }) + + try: + if os.path.exists(path): + os.remove(path) + with TsFileTableWriter(path, schema) as w: + w.write_arrow_batch(batch) + + result = _read_all_arrow( + path, table_name, + ["tag", "bool_col", "int32_col", "int64_col", + "float_col", "double_col", "str_col", "date_col"], + ) + assert len(result) == n + df = result.to_pandas().sort_values("time").reset_index(drop=True) + + for col in ["tag", "bool_col", "int32_col", "int64_col", + 
"float_col", "double_col", "str_col", "date_col"]: + assert col in df.columns, f"Column '{col}' missing from result" + + assert list(df["int32_col"]) == list(range(n)) + assert list(df["int64_col"]) == [i * 10 for i in range(n)] + for i in range(n): + assert df["bool_col"].iloc[i] == (i % 2 == 0) + assert abs(df["double_col"].iloc[i] - i * 1.1) < 1e-9 + assert df["str_col"].iloc[i] == f"s{i}" + finally: + if os.path.exists(path): + os.remove(path) + + +# --------------------------------------------------------------------------- +# Parity with write_dataframe +# --------------------------------------------------------------------------- + +def test_write_arrow_parity_with_dataframe(): + """Data written via write_arrow_batch must match data written via write_dataframe.""" + arrow_path = "test_write_arrow_parity_arrow.tsfile" + df_path = "test_write_arrow_parity_df.tsfile" + table_name = "t" + n = 500 + + schema_arrow = TableSchema(table_name, [ + ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("v_i32", TSDataType.INT32, ColumnCategory.FIELD), + ColumnSchema("v_f64", TSDataType.DOUBLE, ColumnCategory.FIELD), + ColumnSchema("v_bool", TSDataType.BOOLEAN, ColumnCategory.FIELD), + ColumnSchema("v_str", TSDataType.STRING, ColumnCategory.FIELD), + ]) + schema_df = TableSchema(table_name, [ + ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("v_i32", TSDataType.INT32, ColumnCategory.FIELD), + ColumnSchema("v_f64", TSDataType.DOUBLE, ColumnCategory.FIELD), + ColumnSchema("v_bool", TSDataType.BOOLEAN, ColumnCategory.FIELD), + ColumnSchema("v_str", TSDataType.STRING, ColumnCategory.FIELD), + ]) + + timestamps = np.arange(n, dtype="int64") + v_i32 = np.arange(n, dtype="int32") + v_f64 = np.arange(n, dtype="float64") * 2.5 + v_bool = np.array([i % 3 == 0 for i in range(n)]) + v_str = [f"row{i}" for i in range(n)] + device = ["dev"] * n + + batch = pa.record_batch({ + "time": pa.array(timestamps, type=pa.timestamp("ns")), 
+ "device": pa.array(device, type=pa.string()), + "v_i32": pa.array(v_i32, type=pa.int32()), + "v_f64": pa.array(v_f64, type=pa.float64()), + "v_bool": pa.array(v_bool, type=pa.bool_()), + "v_str": pa.array(v_str, type=pa.string()), + }) + + dataframe = pd.DataFrame({ + "time": pd.Series(timestamps, dtype="int64"), + "device": device, + "v_i32": pd.Series(v_i32, dtype="int32"), + "v_f64": pd.Series(v_f64, dtype="float64"), + "v_bool": pd.Series(v_bool, dtype="bool"), + "v_str": v_str, + }) + + cols = ["device", "v_i32", "v_f64", "v_bool", "v_str"] + + try: + for p in (arrow_path, df_path): + if os.path.exists(p): + os.remove(p) + + with TsFileTableWriter(arrow_path, schema_arrow) as w: + w.write_arrow_batch(batch) + with TsFileTableWriter(df_path, schema_df) as w: + w.write_dataframe(dataframe) + + result_arrow = _read_all_arrow(arrow_path, table_name, cols).to_pandas() + result_df = _read_all_arrow(df_path, table_name, cols).to_pandas() + + result_arrow = result_arrow.sort_values("time").reset_index(drop=True) + result_df = result_df.sort_values("time").reset_index(drop=True) + + assert len(result_arrow) == len(result_df) == n + + assert list(result_arrow["v_i32"]) == list(result_df["v_i32"]) + assert list(result_arrow["v_str"]) == list(result_df["v_str"]) + assert list(result_arrow["v_bool"]) == list(result_df["v_bool"]) + for i in range(n): + assert abs(result_arrow["v_f64"].iloc[i] - result_df["v_f64"].iloc[i]) < 1e-9 + finally: + for p in (arrow_path, df_path): + if os.path.exists(p): + os.remove(p) + + +# --------------------------------------------------------------------------- +# Large batch +# --------------------------------------------------------------------------- + +def test_write_arrow_large_batch(): + """Write a single large batch (100 k rows) and verify row count.""" + path = "test_write_arrow_large.tsfile" + table_name = "t" + n = 100_000 + + schema = _make_schema(table_name, [ + ColumnSchema("v", TSDataType.DOUBLE, ColumnCategory.FIELD), + ]) + 
+ batch = pa.record_batch({ + "time": pa.array(np.arange(n, dtype="int64"), type=pa.timestamp("ns")), + "device": pa.array(["d"] * n, type=pa.string()), + "v": pa.array(np.random.rand(n), type=pa.float64()), + }) + + try: + if os.path.exists(path): + os.remove(path) + with TsFileTableWriter(path, schema) as w: + w.write_arrow_batch(batch) + + result = _read_all_arrow(path, table_name, ["device", "v"], batch_size=8192) + assert len(result) == n + finally: + if os.path.exists(path): + os.remove(path) + + +if __name__ == "__main__": + os.chdir(os.path.dirname(os.path.abspath(__file__))) + pytest.main([__file__, "-v", "-s"]) diff --git a/python/tsfile/tsfile_cpp.pxd b/python/tsfile/tsfile_cpp.pxd index f90b23089..65681257a 100644 --- a/python/tsfile/tsfile_cpp.pxd +++ b/python/tsfile/tsfile_cpp.pxd @@ -184,6 +184,12 @@ cdef extern from "cwrapper/tsfile_cwrapper.h": int64_t start_time, int64_t end_time, ErrorCode* err_code); + ResultSet tsfile_query_table_batch(TsFileReader reader, + const char * table_name, + char** columns, uint32_t column_num, + int64_t start_time, int64_t end_time, + int batch_size, ErrorCode* err_code); + ResultSet _tsfile_reader_query_device(TsFileReader reader, const char *device_name, char ** sensor_name, uint32_t sensor_num, @@ -213,6 +219,42 @@ cdef extern from "cwrapper/tsfile_cwrapper.h": ResultSetMetaData tsfile_result_set_get_metadata(ResultSet result_set); void free_result_set_meta_data(ResultSetMetaData result_set_meta_data); + # Arrow structures + ctypedef struct ArrowSchema: + const char* format + const char* name + const char* metadata + int64_t flags + int64_t n_children + ArrowSchema** children + ArrowSchema* dictionary + void (*release)(ArrowSchema*) + void* private_data + + ctypedef struct ArrowArray: + int64_t length + int64_t null_count + int64_t offset + int64_t n_buffers + int64_t n_children + const void** buffers + ArrowArray** children + ArrowArray* dictionary + void (*release)(ArrowArray*) + void* private_data + + # Arrow 
batch reading function + ErrorCode tsfile_result_set_get_next_tsblock_as_arrow(ResultSet result_set, + ArrowArray* out_array, + ArrowSchema* out_schema); + + # Arrow batch writing function + ErrorCode _tsfile_writer_write_arrow_table(TsFileWriter writer, + const char* table_name, + ArrowArray* array, + ArrowSchema* schema, + int time_col_index); + cdef extern from "common/config/config.h" namespace "common": diff --git a/python/tsfile/tsfile_py_cpp.pxd b/python/tsfile/tsfile_py_cpp.pxd index 2389aa9a6..cdc4f5c62 100644 --- a/python/tsfile/tsfile_py_cpp.pxd +++ b/python/tsfile/tsfile_py_cpp.pxd @@ -52,6 +52,8 @@ cdef public api ResultSet tsfile_reader_query_table_c(TsFileReader reader, objec int64_t start_time, int64_t end_time) cdef public api ResultSet tsfile_reader_query_table_on_tree_c(TsFileReader reader, object column_list, int64_t start_time, int64_t end_time) +cdef public api ResultSet tsfile_reader_query_table_batch_c(TsFileReader reader, object table_name, object column_list, + int64_t start_time, int64_t end_time, int batch_size) cdef public api ResultSet tsfile_reader_query_paths_c(TsFileReader reader, object device_name, object sensor_list, int64_t start_time, int64_t end_time) cdef public api object get_table_schema(TsFileReader reader, object table_name) diff --git a/python/tsfile/tsfile_py_cpp.pyx b/python/tsfile/tsfile_py_cpp.pyx index 3ca79a2a1..374a56eb7 100644 --- a/python/tsfile/tsfile_py_cpp.pyx +++ b/python/tsfile/tsfile_py_cpp.pyx @@ -749,6 +749,33 @@ cdef ResultSet tsfile_reader_query_table_on_tree_c(TsFileReader reader, object c free( columns) columns = NULL +cdef ResultSet tsfile_reader_query_table_batch_c(TsFileReader reader, object table_name, object column_list, + int64_t start_time, int64_t end_time, int batch_size): + cdef ResultSet result + cdef int column_num = len(column_list) + cdef bytes table_name_bytes = PyUnicode_AsUTF8String(table_name) + cdef const char * table_name_c = table_name_bytes + cdef char** columns = 
malloc(sizeof(char *) * column_num) + cdef int i + cdef ErrorCode code = 0 + if columns == NULL: + raise MemoryError("Failed to allocate memory for columns") + try: + for i in range(column_num): + columns[i] = strdup(( column_list[i]).encode('utf-8')) + if columns[i] == NULL: + raise MemoryError("Failed to allocate memory for column name") + result = tsfile_query_table_batch(reader, table_name_c, columns, column_num, start_time, end_time, batch_size, &code) + check_error(code) + return result + finally: + if columns != NULL: + for i in range(column_num): + free( columns[i]) + columns[i] = NULL + free( columns) + columns = NULL + cdef ResultSet tsfile_reader_query_paths_c(TsFileReader reader, object device_name, object sensor_list, int64_t start_time, int64_t end_time): diff --git a/python/tsfile/tsfile_reader.pyx b/python/tsfile/tsfile_reader.pyx index 4476d24dc..44ffd8604 100644 --- a/python/tsfile/tsfile_reader.pyx +++ b/python/tsfile/tsfile_reader.pyx @@ -25,6 +25,9 @@ import pandas as pd from libc.stdint cimport INT64_MIN, INT64_MAX from libc.string cimport strlen from cpython.bytes cimport PyBytes_FromStringAndSize +from libc.string cimport memset +import pyarrow as pa +from libc.stdint cimport INT64_MIN, INT64_MAX, uintptr_t from tsfile.schema import TSDataType as TSDataTypePy from .date_utils import parse_int_to_date @@ -143,6 +146,40 @@ cdef class ResultSetPy: df = df.astype(data_type_dict) return df + def read_arrow_batch(self): + self.check_result_set_invalid() + + cdef ArrowArray arrow_array + cdef ArrowSchema arrow_schema + cdef ErrorCode code = 0 + cdef ErrorCode err_code = 0 + + memset(&arrow_array, 0, sizeof(ArrowArray)) + memset(&arrow_schema, 0, sizeof(ArrowSchema)) + + code = tsfile_result_set_get_next_tsblock_as_arrow(self.result, &arrow_array, &arrow_schema) + + if code == 21: # E_NO_MORE_DATA + return None + if code != 0: + check_error(code) + + if arrow_schema.release == NULL or arrow_array.release == NULL: + raise RuntimeError("Arrow 
conversion returned invalid schema or array") + + try: + schema_ptr = &arrow_schema + array_ptr = &arrow_array + batch = pa.RecordBatch._import_from_c(array_ptr, schema_ptr) + table = pa.Table.from_batches([batch]) + return table + except Exception as e: + if arrow_array.release != NULL: + arrow_array.release(&arrow_array) + if arrow_schema.release != NULL: + arrow_schema.release(&arrow_schema) + raise e + def get_value_by_index(self, index : int): """ Get value by index from query result set. @@ -294,6 +331,18 @@ cdef class TsFileReaderPy: self.activate_result_set_list.add(pyresult) return pyresult + def query_table_batch(self, table_name : str, column_names : List[str], + start_time : int = INT64_MIN, end_time : int = INT64_MAX, + batch_size : int = 1024) -> ResultSetPy: + cdef ResultSet result; + result = tsfile_reader_query_table_batch_c(self.reader, table_name.lower(), + [column_name.lower() for column_name in column_names], + start_time, end_time, batch_size) + pyresult = ResultSetPy(self) + pyresult.init_c(result, table_name) + self.activate_result_set_list.add(pyresult) + return pyresult + def query_table_on_tree(self, column_names : List[str], start_time : int = INT64_MIN, end_time : int = INT64_MAX) -> ResultSetPy: """ diff --git a/python/tsfile/tsfile_table_writer.py b/python/tsfile/tsfile_table_writer.py index a8f7805d3..cfd817fec 100644 --- a/python/tsfile/tsfile_table_writer.py +++ b/python/tsfile/tsfile_table_writer.py @@ -182,6 +182,25 @@ def write_dataframe(self, dataframe: pd.DataFrame): self.writer.write_dataframe(self.tableSchema.get_table_name(), dataframe, self.tableSchema) + def write_arrow_batch(self, data): + """ + Write a PyArrow RecordBatch or Table into tsfile using Arrow C Data + Interface for efficient batch writing without Python-level row loops. + :param data: pyarrow.RecordBatch or pyarrow.Table. Must include a + timestamp column. All other columns must match the registered schema. + :return: no return value. 
+ """ + time_col = self.tableSchema.get_time_column() + if time_col is not None: + time_col_name = time_col.get_column_name() + else: + time_col_name = "time" + + time_col_index = data.schema.get_field_index(time_col_name) + if time_col_index < 0: + raise ValueError(f"Time column '{time_col_name}' not found in Arrow schema.") + self.writer.write_arrow_batch(self.tableSchema.get_table_name(), data, time_col_index) + def close(self): """ Close TsFileTableWriter and will flush data automatically. diff --git a/python/tsfile/tsfile_writer.pyx b/python/tsfile/tsfile_writer.pyx index 4826ef72d..30a29d3bb 100644 --- a/python/tsfile/tsfile_writer.pyx +++ b/python/tsfile/tsfile_writer.pyx @@ -16,11 +16,14 @@ # under the License. # import pandas +import pyarrow as pa from tsfile.row_record import RowRecord from tsfile.schema import TableSchema as TableSchemaPy from tsfile.schema import TimeseriesSchema as TimeseriesSchemaPy, DeviceSchema as DeviceSchemaPy from tsfile.tablet import Tablet as TabletPy +from libc.string cimport memset +from libc.stdint cimport uintptr_t from .tsfile_cpp cimport * from .tsfile_py_cpp cimport * @@ -122,6 +125,43 @@ cdef class TsFileWriterPy: finally: free_c_tablet(ctablet) + def write_arrow_batch(self, table_name: str, data, time_col_index: int = -1): + """ + Write an Arrow RecordBatch or Table into tsfile using Arrow C Data + Interface for efficient batch writing without Python-level row loops. + table_name: target table name (must be registered) + data: pyarrow.RecordBatch or pyarrow.Table + time_col_index: index of the time column in the Arrow schema. + >= 0: use the specified column as the time column. + < 0: auto-detect by Arrow timestamp type (default). 
+ """ + if isinstance(data, pa.Table): + data = data.combine_chunks().to_batches() + if not data: + return + data = data[0] + + cdef ArrowArray arrow_array + cdef ArrowSchema arrow_schema + cdef ErrorCode errno + memset(&arrow_array, 0, sizeof(ArrowArray)) + memset(&arrow_schema, 0, sizeof(ArrowSchema)) + + cdef uintptr_t array_ptr = &arrow_array + cdef uintptr_t schema_ptr = &arrow_schema + data._export_to_c(array_ptr, schema_ptr) + + cdef bytes tname = table_name.lower().encode('utf-8') + try: + errno = _tsfile_writer_write_arrow_table( + self.writer, tname, &arrow_array, &arrow_schema, time_col_index) + check_error(errno) + finally: + if arrow_array.release != NULL: + arrow_array.release(&arrow_array) + if arrow_schema.release != NULL: + arrow_schema.release(&arrow_schema) + cpdef close(self): """ Flush data and Close tsfile writer. diff --git a/python/tsfile/utils.py b/python/tsfile/utils.py index 723707bd1..1d9a89975 100644 --- a/python/tsfile/utils.py +++ b/python/tsfile/utils.py @@ -21,6 +21,7 @@ import numpy as np import pandas as pd from pandas.core.dtypes.common import is_integer_dtype, is_object_dtype +from pandas.core.interchange.dataframe_protocol import DataFrame from tsfile import ColumnSchema, TableSchema, ColumnCategory, TSDataType, TIME_COLUMN from tsfile.exceptions import TableNotExistError, ColumnNotExistError