From a878ad56eb0c17ffd75c50864f9b51ac63016a07 Mon Sep 17 00:00:00 2001 From: ColinLee Date: Sun, 1 Feb 2026 23:46:00 +0800 Subject: [PATCH 1/8] tmp code. support arrow. --- cpp/CMakeLists.txt | 2 +- cpp/src/common/tsblock/vector/vector.h | 4 + cpp/src/cwrapper/CMakeLists.txt | 2 +- cpp/src/cwrapper/arrow_c.cc | 761 ++++++++++++++++++ cpp/src/cwrapper/tsfile_cwrapper.cc | 51 ++ cpp/src/cwrapper/tsfile_cwrapper.h | 65 +- cpp/src/reader/qds_with_timegenerator.h | 1 + cpp/src/reader/qds_without_timegenerator.h | 1 + cpp/src/reader/result_set.h | 4 + cpp/src/reader/table_query_executor.h | 14 +- cpp/src/reader/table_result_set.cc | 34 + cpp/src/reader/table_result_set.h | 10 +- cpp/src/reader/tsfile_reader.cc | 14 +- cpp/src/reader/tsfile_reader.h | 5 +- cpp/test/common/tsblock/arrow_tsblock_test.cc | 334 ++++++++ .../tsfile_reader_table_batch_test.cc | 475 +++++++++++ cpp/third_party/zlib-1.3.1/treebuild.xml | 188 +++-- .../zlib-1.3.1/zlib-1.3.1/treebuild.xml | 188 +++-- python/Untitled | 1 + python/lower_case_name.tsfile | Bin 0 -> 23089 bytes python/requirements.txt | 2 + python/setup.py | 16 +- python/test1.tsfile | Bin 0 -> 23089 bytes .../tests/bench_batch_arrow_vs_dataframe.py | 264 ++++++ python/tests/test_batch_arrow.py | 444 ++++++++++ python/tsfile/tsfile_cpp.pxd | 35 + python/tsfile/tsfile_py_cpp.pxd | 2 + python/tsfile/tsfile_py_cpp.pyx | 27 + python/tsfile/tsfile_reader.pyx | 47 ++ python/tsfile/utils.py | 1 + 30 files changed, 2780 insertions(+), 212 deletions(-) create mode 100644 cpp/src/cwrapper/arrow_c.cc create mode 100644 cpp/test/common/tsblock/arrow_tsblock_test.cc create mode 100644 cpp/test/reader/table_view/tsfile_reader_table_batch_test.cc create mode 100644 python/Untitled create mode 100644 python/lower_case_name.tsfile create mode 100644 python/test1.tsfile create mode 100644 python/tests/bench_batch_arrow_vs_dataframe.py create mode 100644 python/tests/test_batch_arrow.py diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 
c85150d8f..fc35de43c 100755 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -111,7 +111,7 @@ endif () option(BUILD_TEST "Build tests" ON) message("cmake using: BUILD_TEST=${BUILD_TEST}") -option(ENABLE_ANTLR4 "Enable ANTLR4 runtime" ON) +option(ENABLE_ANTLR4 "Enable ANTLR4 runtime" OFF) message("cmake using: ENABLE_ANTLR4=${ENABLE_ANTLR4}") option(ENABLE_SNAPPY "Enable Google Snappy compression" ON) diff --git a/cpp/src/common/tsblock/vector/vector.h b/cpp/src/common/tsblock/vector/vector.h index 20a765967..37a96c543 100644 --- a/cpp/src/common/tsblock/vector/vector.h +++ b/cpp/src/common/tsblock/vector/vector.h @@ -78,6 +78,10 @@ class Vector { FORCE_INLINE bool has_null() { return has_null_; } + FORCE_INLINE common::BitMap& get_bitmap() { return nulls_; } + + FORCE_INLINE common::ByteBuffer& get_value_data() { return values_; } + // We want derived class to have access to base class members, so it is // defined as protected protected: diff --git a/cpp/src/cwrapper/CMakeLists.txt b/cpp/src/cwrapper/CMakeLists.txt index 07f52eb33..d62250bf0 100644 --- a/cpp/src/cwrapper/CMakeLists.txt +++ b/cpp/src/cwrapper/CMakeLists.txt @@ -18,7 +18,7 @@ under the License. ]] message("Running in cwrapper directory") set(CMAKE_POSITION_INDEPENDENT_CODE ON) -set(CWRAPPER_SRC_LIST tsfile_cwrapper.cc) +set(CWRAPPER_SRC_LIST tsfile_cwrapper.cc arrow_c.cc) add_library(cwrapper_obj OBJECT ${CWRAPPER_SRC_LIST}) # install header files diff --git a/cpp/src/cwrapper/arrow_c.cc b/cpp/src/cwrapper/arrow_c.cc new file mode 100644 index 000000000..e7f6c2de5 --- /dev/null +++ b/cpp/src/cwrapper/arrow_c.cc @@ -0,0 +1,761 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include + +#include "common/allocator/alloc_base.h" +#include "common/tsblock/tsblock.h" +#include "common/tsblock/tuple_desc.h" +#include "common/tsblock/vector/vector.h" +#include "cwrapper/tsfile_cwrapper.h" +#include "utils/errno_define.h" + +namespace arrow { + +#define ARROW_FLAG_DICTIONARY_ORDERED 1 +#define ARROW_FLAG_NULLABLE 2 +#define ARROW_FLAG_MAP_KEYS_SORTED 4 + +struct ArrowArrayData { + void** buffers; + size_t n_buffers; +}; + +struct ArrowSchemaData { + std::vector* format_strings; + std::vector* name_strings; + ArrowSchema** children; + size_t n_children; +}; + +struct StructArrayData { + ArrowArray** children; + size_t n_children; +}; + +static const char* GetArrowFormatString(common::TSDataType datatype) { + switch (datatype) { + case common::BOOLEAN: + return "b"; + case common::INT32: + return "i"; + case common::INT64: + return "l"; + case common::TIMESTAMP: // nanosecond, no timezone + return "tsn:"; + case common::FLOAT: + return "f"; + case common::DOUBLE: + return "g"; + case common::TEXT: + case common::STRING: + return "u"; + case common::DATE: + return "tdD"; // date32: days since Unix epoch, stored as int32 + default: + return nullptr; + } +} + +static inline size_t GetNullBitmapSize(int64_t length) { + return (length + 7) / 8; +} + +static void ReleaseArrowArray(ArrowArray* array) { + if (array == nullptr || 
array->private_data == nullptr) { + return; + } + ArrowArrayData* data = static_cast(array->private_data); + if (data->buffers != nullptr) { + for (size_t i = 0; i < data->n_buffers; ++i) { + if (data->buffers[i] != nullptr) { + common::mem_free(data->buffers[i]); + } + } + common::mem_free(data->buffers); + } + common::mem_free(data); + + array->length = 0; + array->null_count = 0; + array->offset = 0; + array->n_buffers = 0; + array->n_children = 0; + array->buffers = nullptr; + array->children = nullptr; + array->dictionary = nullptr; + array->release = nullptr; + array->private_data = nullptr; +} + +static void ReleaseStructArrowArray(ArrowArray* array) { + if (array == nullptr || array->private_data == nullptr) { + return; + } + StructArrayData* data = static_cast(array->private_data); + if (data->children != nullptr) { + for (size_t i = 0; i < data->n_children; ++i) { + if (data->children[i] != nullptr) { + if (data->children[i]->release != nullptr) { + data->children[i]->release(data->children[i]); + } + common::mem_free(data->children[i]); + } + } + common::mem_free(data->children); + } + delete data; + + array->length = 0; + array->null_count = 0; + array->offset = 0; + array->n_buffers = 0; + array->n_children = 0; + array->buffers = nullptr; + array->children = nullptr; + array->dictionary = nullptr; + array->release = nullptr; + array->private_data = nullptr; +} + +static void ReleaseArrowSchema(ArrowSchema* schema) { + if (schema == nullptr || schema->private_data == nullptr) { + return; + } + ArrowSchemaData* data = static_cast(schema->private_data); + + // Release children schemas first + if (data->children != nullptr) { + for (size_t i = 0; i < data->n_children; ++i) { + if (data->children[i] != nullptr) { + if (data->children[i]->release != nullptr) { + data->children[i]->release(data->children[i]); + } + common::mem_free(data->children[i]); + } + } + common::mem_free(data->children); + } + + // Release string storage + if (data->format_strings != 
nullptr) { + delete data->format_strings; + } + if (data->name_strings != nullptr) { + delete data->name_strings; + } + + delete data; + + schema->format = nullptr; + schema->name = nullptr; + schema->metadata = nullptr; + schema->flags = 0; + schema->n_children = 0; + schema->children = nullptr; + schema->dictionary = nullptr; + schema->release = nullptr; + schema->private_data = nullptr; +} + +template +inline int BuildFixedLengthArrowArrayC(common::Vector* vec, uint32_t row_count, + ArrowArray* out_array) { + if (vec == nullptr || out_array == nullptr || row_count == 0) { + return common::E_INVALID_ARG; + } + + bool has_null = vec->has_null(); + size_t type_size = sizeof(CType); + // Arrow C Data Interface: fixed-width types always have 2 buffers + // buffers[0] = validity bitmap (may be NULL if no nulls) + // buffers[1] = values + static constexpr int64_t n_buffers = 2; + + ArrowArrayData* array_data = static_cast( + common::mem_alloc(sizeof(ArrowArrayData), common::MOD_TSBLOCK)); + if (array_data == nullptr) { + return common::E_OOM; + } + + array_data->n_buffers = n_buffers; + array_data->buffers = static_cast( + common::mem_alloc(n_buffers * sizeof(void*), common::MOD_TSBLOCK)); + if (array_data->buffers == nullptr) { + common::mem_free(array_data); + return common::E_OOM; + } + + for (int64_t i = 0; i < n_buffers; ++i) { + array_data->buffers[i] = nullptr; + } + + uint8_t* null_bitmap = nullptr; + if (has_null) { + size_t null_bitmap_size = GetNullBitmapSize(row_count); + null_bitmap = static_cast( + common::mem_alloc(null_bitmap_size, common::MOD_TSBLOCK)); + if (null_bitmap == nullptr) { + common::mem_free(array_data->buffers); + common::mem_free(array_data); + return common::E_OOM; + } + common::BitMap& vec_bitmap = vec->get_bitmap(); + char* vec_bitmap_data = vec_bitmap.get_bitmap(); + for (size_t i = 0; i < null_bitmap_size; ++i) { + null_bitmap[i] = ~static_cast(vec_bitmap_data[i]); + } + array_data->buffers[0] = null_bitmap; + + int64_t null_count = 
0; + for (uint32_t i = 0; i < row_count; ++i) { + if (vec_bitmap.test(i)) { + null_count++; + } + } + out_array->null_count = null_count; + } else { + array_data->buffers[0] = nullptr; + out_array->null_count = 0; + } + + char* vec_data = vec->get_value_data().get_data(); + void* data_buffer = nullptr; + + if (std::is_same::value) { + size_t packed_size = GetNullBitmapSize(row_count); + uint8_t* packed_buffer = static_cast( + common::mem_alloc(packed_size, common::MOD_TSBLOCK)); + if (packed_buffer == nullptr) { + if (null_bitmap != nullptr) { + common::mem_free(null_bitmap); + } + common::mem_free(array_data->buffers); + common::mem_free(array_data); + return common::E_OOM; + } + + std::memset(packed_buffer, 0, packed_size); + + const uint8_t* src = reinterpret_cast(vec_data); + for (uint32_t i = 0; i < row_count; ++i) { + if (src[i] != 0) { + uint32_t byte_idx = i / 8; + uint32_t bit_idx = i % 8; + packed_buffer[byte_idx] |= (1 << bit_idx); + } + } + + data_buffer = packed_buffer; + } else { + size_t data_size = type_size * row_count; + data_buffer = common::mem_alloc(data_size, common::MOD_TSBLOCK); + if (data_buffer == nullptr) { + if (null_bitmap != nullptr) { + common::mem_free(null_bitmap); + } + common::mem_free(array_data->buffers); + common::mem_free(array_data); + return common::E_OOM; + } + std::memcpy(data_buffer, vec_data, data_size); + } + + array_data->buffers[1] = data_buffer; + + out_array->length = row_count; + out_array->offset = 0; + out_array->n_buffers = n_buffers; + out_array->n_children = 0; + out_array->buffers = const_cast(array_data->buffers); + out_array->children = nullptr; + out_array->dictionary = nullptr; + out_array->release = ReleaseArrowArray; + out_array->private_data = array_data; + + return common::E_OK; +} + +static int BuildStringArrowArrayC(common::Vector* vec, uint32_t row_count, + ArrowArray* out_array) { + if (vec == nullptr || out_array == nullptr || row_count == 0) { + return common::E_INVALID_ARG; + } + + bool 
has_null = vec->has_null(); + int64_t n_buffers = 3; + ArrowArrayData* array_data = static_cast( + common::mem_alloc(sizeof(ArrowArrayData), common::MOD_TSBLOCK)); + if (array_data == nullptr) { + return common::E_OOM; + } + + array_data->n_buffers = n_buffers; + array_data->buffers = static_cast( + common::mem_alloc(n_buffers * sizeof(void*), common::MOD_TSBLOCK)); + if (array_data->buffers == nullptr) { + common::mem_free(array_data); + return common::E_OOM; + } + + for (int64_t i = 0; i < n_buffers; ++i) { + array_data->buffers[i] = nullptr; + } + + uint8_t* null_bitmap = nullptr; + if (has_null) { + size_t null_bitmap_size = GetNullBitmapSize(row_count); + null_bitmap = static_cast( + common::mem_alloc(null_bitmap_size, common::MOD_TSBLOCK)); + if (null_bitmap == nullptr) { + common::mem_free(array_data->buffers); + common::mem_free(array_data); + return common::E_OOM; + } + common::BitMap& vec_bitmap = vec->get_bitmap(); + char* vec_bitmap_data = vec_bitmap.get_bitmap(); + for (size_t i = 0; i < null_bitmap_size; ++i) { + null_bitmap[i] = ~static_cast(vec_bitmap_data[i]); + } + array_data->buffers[0] = null_bitmap; + + int64_t null_count = 0; + for (uint32_t i = 0; i < row_count; ++i) { + if (vec_bitmap.test(i)) { + null_count++; + } + } + out_array->null_count = null_count; + } else { + array_data->buffers[0] = nullptr; + out_array->null_count = 0; + } + size_t offsets_size = sizeof(int32_t) * (row_count + 1); + int32_t* offsets = static_cast( + common::mem_alloc(offsets_size, common::MOD_TSBLOCK)); + if (offsets == nullptr) { + if (null_bitmap != nullptr) { + common::mem_free(null_bitmap); + } + common::mem_free(array_data->buffers); + common::mem_free(array_data); + return common::E_OOM; + } + + offsets[0] = 0; + uint32_t current_offset = 0; + char* vec_data = vec->get_value_data().get_data(); + uint32_t vec_offset = 0; + + // 获取 vec_bitmap 用于后续检查 + common::BitMap& vec_bitmap = vec->get_bitmap(); + + for (uint32_t i = 0; i < row_count; ++i) { + if (has_null 
&& vec_bitmap.test(i)) { + offsets[i + 1] = current_offset; + } else { + uint32_t len = 0; + std::memcpy(&len, vec_data + vec_offset, sizeof(uint32_t)); + vec_offset += sizeof(uint32_t); + + current_offset += len; + offsets[i + 1] = current_offset; + vec_offset += len; + } + } + + array_data->buffers[1] = offsets; + + size_t data_size = current_offset; + uint8_t* data_buffer = static_cast( + common::mem_alloc(data_size, common::MOD_TSBLOCK)); + if (data_buffer == nullptr) { + if (null_bitmap != nullptr) { + common::mem_free(null_bitmap); + } + common::mem_free(offsets); + common::mem_free(array_data->buffers); + common::mem_free(array_data); + return common::E_OOM; + } + + vec_offset = 0; + uint32_t data_offset = 0; + for (uint32_t i = 0; i < row_count; ++i) { + if (!has_null || !vec_bitmap.test(i)) { + uint32_t len = 0; + std::memcpy(&len, vec_data + vec_offset, sizeof(uint32_t)); + vec_offset += sizeof(uint32_t); + + if (len > 0) { + std::memcpy(data_buffer + data_offset, vec_data + vec_offset, + len); + data_offset += len; + } + vec_offset += len; + } + } + + array_data->buffers[2] = data_buffer; + + out_array->length = row_count; + out_array->offset = 0; + out_array->n_buffers = n_buffers; + out_array->n_children = 0; + out_array->buffers = const_cast(array_data->buffers); + out_array->children = nullptr; + out_array->dictionary = nullptr; + out_array->release = ReleaseArrowArray; + out_array->private_data = array_data; + + return common::E_OK; +} + +// Convert TsFile YYYYMMDD integer to days since Unix epoch (1970-01-01) +static int32_t YYYYMMDDToDaysSinceEpoch(int32_t yyyymmdd) { + int year = yyyymmdd / 10000; + int month = (yyyymmdd % 10000) / 100; + int day = yyyymmdd % 100; + + std::tm date = {}; + date.tm_year = year - 1900; + date.tm_mon = month - 1; + date.tm_mday = day; + date.tm_hour = 12; + date.tm_isdst = -1; + + std::tm epoch = {}; + epoch.tm_year = 70; + epoch.tm_mon = 0; + epoch.tm_mday = 1; + epoch.tm_hour = 12; + epoch.tm_isdst = -1; + + time_t 
t1 = mktime(&date); + time_t t2 = mktime(&epoch); + return static_cast((t1 - t2) / (60 * 60 * 24)); +} + +static int BuildDateArrowArrayC(common::Vector* vec, uint32_t row_count, + ArrowArray* out_array) { + if (vec == nullptr || out_array == nullptr || row_count == 0) { + return common::E_INVALID_ARG; + } + + bool has_null = vec->has_null(); + static constexpr int64_t n_buffers = 2; + + ArrowArrayData* array_data = static_cast( + common::mem_alloc(sizeof(ArrowArrayData), common::MOD_TSBLOCK)); + if (array_data == nullptr) return common::E_OOM; + + array_data->n_buffers = n_buffers; + array_data->buffers = static_cast( + common::mem_alloc(n_buffers * sizeof(void*), common::MOD_TSBLOCK)); + if (array_data->buffers == nullptr) { + common::mem_free(array_data); + return common::E_OOM; + } + for (int64_t i = 0; i < n_buffers; ++i) array_data->buffers[i] = nullptr; + + common::BitMap& vec_bitmap = vec->get_bitmap(); + uint8_t* null_bitmap = nullptr; + if (has_null) { + size_t null_bitmap_size = GetNullBitmapSize(row_count); + null_bitmap = static_cast( + common::mem_alloc(null_bitmap_size, common::MOD_TSBLOCK)); + if (null_bitmap == nullptr) { + common::mem_free(array_data->buffers); + common::mem_free(array_data); + return common::E_OOM; + } + char* vec_bitmap_data = vec_bitmap.get_bitmap(); + for (size_t i = 0; i < null_bitmap_size; ++i) { + null_bitmap[i] = ~static_cast(vec_bitmap_data[i]); + } + int64_t null_count = 0; + for (uint32_t i = 0; i < row_count; ++i) { + if (vec_bitmap.test(i)) null_count++; + } + out_array->null_count = null_count; + array_data->buffers[0] = null_bitmap; + } else { + out_array->null_count = 0; + array_data->buffers[0] = nullptr; + } + + int32_t* data_buffer = static_cast( + common::mem_alloc(sizeof(int32_t) * row_count, common::MOD_TSBLOCK)); + if (data_buffer == nullptr) { + if (null_bitmap) common::mem_free(null_bitmap); + common::mem_free(array_data->buffers); + common::mem_free(array_data); + return common::E_OOM; + } + + char* 
vec_data = vec->get_value_data().get_data(); + for (uint32_t i = 0; i < row_count; ++i) { + if (has_null && vec_bitmap.test(i)) { + data_buffer[i] = 0; + } else { + int32_t yyyymmdd = 0; + std::memcpy(&yyyymmdd, vec_data + i * sizeof(int32_t), + sizeof(int32_t)); + data_buffer[i] = YYYYMMDDToDaysSinceEpoch(yyyymmdd); + } + } + + array_data->buffers[1] = data_buffer; + out_array->length = row_count; + out_array->offset = 0; + out_array->n_buffers = n_buffers; + out_array->n_children = 0; + out_array->buffers = const_cast(array_data->buffers); + out_array->children = nullptr; + out_array->dictionary = nullptr; + out_array->release = ReleaseArrowArray; + out_array->private_data = array_data; + return common::E_OK; +} + +// Helper function to build ArrowArray for a single column +static int BuildColumnArrowArray(common::Vector* vec, uint32_t row_count, + ArrowArray* out_array) { + if (vec == nullptr || out_array == nullptr || row_count == 0) { + return common::E_INVALID_ARG; + } + + common::TSDataType data_type = vec->get_vector_type(); + const char* format = GetArrowFormatString(data_type); + if (format == nullptr) { + return common::E_TYPE_NOT_SUPPORTED; + } + + int ret = common::E_OK; + switch (data_type) { + case common::BOOLEAN: + ret = BuildFixedLengthArrowArrayC(vec, row_count, out_array); + break; + case common::INT32: + ret = + BuildFixedLengthArrowArrayC(vec, row_count, out_array); + break; + case common::DATE: + ret = BuildDateArrowArrayC(vec, row_count, out_array); + break; + case common::INT64: + case common::TIMESTAMP: + ret = + BuildFixedLengthArrowArrayC(vec, row_count, out_array); + break; + case common::FLOAT: + ret = BuildFixedLengthArrowArrayC(vec, row_count, out_array); + break; + case common::DOUBLE: + ret = + BuildFixedLengthArrowArrayC(vec, row_count, out_array); + break; + case common::TEXT: + case common::STRING: + ret = BuildStringArrowArrayC(vec, row_count, out_array); + break; + default: + return common::E_TYPE_NOT_SUPPORTED; + } + return 
ret; +} + +// Build ArrowSchema for a single column +static int BuildColumnArrowSchema(common::TSDataType data_type, + const std::string& column_name, + ArrowSchema* out_schema) { + if (out_schema == nullptr) { + return common::E_INVALID_ARG; + } + + const char* format = GetArrowFormatString(data_type); + if (format == nullptr) { + return common::E_TYPE_NOT_SUPPORTED; + } + + ArrowSchemaData* schema_data = new ArrowSchemaData(); + schema_data->format_strings = new std::vector(); + schema_data->name_strings = new std::vector(); + schema_data->children = nullptr; + schema_data->n_children = 0; + + schema_data->format_strings->push_back(format); + schema_data->name_strings->push_back(column_name); + + out_schema->format = schema_data->format_strings->back().c_str(); + out_schema->name = schema_data->name_strings->back().c_str(); + out_schema->metadata = nullptr; + out_schema->flags = ARROW_FLAG_NULLABLE; + out_schema->n_children = 0; + out_schema->children = nullptr; + out_schema->dictionary = nullptr; + out_schema->release = ReleaseArrowSchema; + out_schema->private_data = schema_data; + + return common::E_OK; +} + +int TsBlockToArrowStruct(common::TsBlock& tsblock, ArrowArray* out_array, + ArrowSchema* out_schema) { + if (out_array == nullptr || out_schema == nullptr) { + return common::E_INVALID_ARG; + } + + uint32_t row_count = tsblock.get_row_count(); + uint32_t column_count = tsblock.get_column_count(); + common::TupleDesc* tuple_desc = tsblock.get_tuple_desc(); + + if (row_count == 0 || column_count == 0) { + return common::E_INVALID_ARG; + } + + // Build ArrowSchema for struct type + ArrowSchemaData* schema_data = new ArrowSchemaData(); + schema_data->format_strings = new std::vector(); + schema_data->name_strings = new std::vector(); + schema_data->n_children = column_count; + schema_data->children = static_cast(common::mem_alloc( + column_count * sizeof(ArrowSchema*), common::MOD_TSBLOCK)); + if (schema_data->children == nullptr) { + delete 
schema_data->format_strings; + delete schema_data->name_strings; + delete schema_data; + return common::E_OOM; + } + + // Store format string for struct type + schema_data->format_strings->push_back("+s"); + schema_data->name_strings->push_back(""); + + // Build schema for each column + for (uint32_t i = 0; i < column_count; ++i) { + schema_data->children[i] = static_cast( + common::mem_alloc(sizeof(ArrowSchema), common::MOD_TSBLOCK)); + if (schema_data->children[i] == nullptr) { + for (uint32_t j = 0; j < i; ++j) { + if (schema_data->children[j] != nullptr && + schema_data->children[j]->release != nullptr) { + schema_data->children[j]->release(schema_data->children[j]); + } + } + common::mem_free(schema_data->children); + delete schema_data->format_strings; + delete schema_data->name_strings; + delete schema_data; + return common::E_OOM; + } + + common::TSDataType col_type = tuple_desc->get_column_type(i); + std::string col_name = tuple_desc->get_column_name(i); + + int ret = BuildColumnArrowSchema(col_type, col_name, + schema_data->children[i]); + if (ret != common::E_OK) { + for (uint32_t j = 0; j <= i; ++j) { + if (schema_data->children[j] != nullptr && + schema_data->children[j]->release != nullptr) { + schema_data->children[j]->release(schema_data->children[j]); + } + } + common::mem_free(schema_data->children); + delete schema_data->format_strings; + delete schema_data->name_strings; + delete schema_data; + return ret; + } + } + + out_schema->format = schema_data->format_strings->at(0).c_str(); + out_schema->name = schema_data->name_strings->at(0).c_str(); + out_schema->metadata = nullptr; + out_schema->flags = 0; + out_schema->n_children = column_count; + out_schema->children = schema_data->children; + out_schema->dictionary = nullptr; + out_schema->release = ReleaseArrowSchema; + out_schema->private_data = schema_data; + + ArrowArray** children_arrays = static_cast(common::mem_alloc( + column_count * sizeof(ArrowArray*), common::MOD_TSBLOCK)); + if 
(children_arrays == nullptr) { + ReleaseArrowSchema(out_schema); + return common::E_OOM; + } + + for (uint32_t i = 0; i < column_count; ++i) { + children_arrays[i] = static_cast( + common::mem_alloc(sizeof(ArrowArray), common::MOD_TSBLOCK)); + if (children_arrays[i] == nullptr) { + for (uint32_t j = 0; j < i; ++j) { + if (children_arrays[j] != nullptr && + children_arrays[j]->release != nullptr) { + children_arrays[j]->release(children_arrays[j]); + } + } + common::mem_free(children_arrays); + ReleaseArrowSchema(out_schema); + return common::E_OOM; + } + + common::Vector* vec = tsblock.get_vector(i); + int ret = BuildColumnArrowArray(vec, row_count, children_arrays[i]); + if (ret != common::E_OK) { + for (uint32_t j = 0; j <= i; ++j) { + if (children_arrays[j] != nullptr && + children_arrays[j]->release != nullptr) { + children_arrays[j]->release(children_arrays[j]); + } + } + common::mem_free(children_arrays); + ReleaseArrowSchema(out_schema); + return ret; + } + } + + StructArrayData* struct_data = new StructArrayData(); + struct_data->children = children_arrays; + struct_data->n_children = column_count; + + // Arrow C Data Interface: struct type requires n_buffers = 1 (validity + // bitmap) buffers[0] may be NULL if there are no nulls at the struct level + static const void* struct_buffers[1] = {nullptr}; + + out_array->length = row_count; + out_array->null_count = 0; // struct itself is never null + out_array->offset = 0; + out_array->n_buffers = 1; + out_array->n_children = column_count; + out_array->buffers = struct_buffers; + out_array->children = children_arrays; + out_array->dictionary = nullptr; + out_array->release = ReleaseStructArrowArray; + out_array->private_data = struct_data; + + return common::E_OK; +} + +} // namespace arrow diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc index fbcf4e6f1..298f27f0a 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.cc +++ b/cpp/src/cwrapper/tsfile_cwrapper.cc @@ -24,13 +24,21 @@ 
#include #include +#include #include #include "common/tablet.h" #include "reader/result_set.h" +#include "reader/table_result_set.h" #include "reader/tsfile_reader.h" #include "writer/tsfile_writer.h" +// Forward declaration for arrow namespace function (defined in arrow_c.cc) +namespace arrow { +int TsBlockToArrowStruct(common::TsBlock& tsblock, ArrowArray* out_array, + ArrowSchema* out_schema); +} + #ifdef __cplusplus extern "C" { #endif @@ -361,6 +369,21 @@ ResultSet tsfile_query_table_on_tree(TsFileReader reader, char** columns, return table_result_set; } +ResultSet tsfile_query_table_batch(TsFileReader reader, const char* table_name, + char** columns, uint32_t column_num, + Timestamp start_time, Timestamp end_time, + int batch_size, ERRNO* err_code) { + auto* r = static_cast(reader); + storage::ResultSet* table_result_set = nullptr; + std::vector column_names; + for (uint32_t i = 0; i < column_num; i++) { + column_names.emplace_back(columns[i]); + } + *err_code = r->query(table_name, column_names, start_time, end_time, + table_result_set, batch_size); + return table_result_set; +} + bool tsfile_result_set_next(ResultSet result_set, ERRNO* err_code) { auto* r = static_cast(result_set); bool has_next = true; @@ -373,6 +396,34 @@ bool tsfile_result_set_next(ResultSet result_set, ERRNO* err_code) { return has_next; } +ERRNO tsfile_result_set_get_next_tsblock_as_arrow(ResultSet result_set, + ArrowArray* out_array, + ArrowSchema* out_schema) { + if (result_set == nullptr || out_array == nullptr || + out_schema == nullptr) { + return common::E_INVALID_ARG; + } + + auto* r = static_cast(result_set); + auto* table_result_set = dynamic_cast(r); + if (table_result_set == nullptr) { + return common::E_INVALID_ARG; + } + + common::TsBlock* tsblock = nullptr; + int ret = table_result_set->get_next_tsblock(tsblock); + if (ret != common::E_OK) { + return ret; + } + + if (tsblock == nullptr) { + return common::E_NO_MORE_DATA; + } + + ret = arrow::TsBlockToArrowStruct(*tsblock, 
out_array, out_schema); + return ret; +} + #define TSFILE_RESULT_SET_GET_VALUE_BY_NAME_DEF(type) \ type tsfile_result_set_get_value_by_name_##type(ResultSet result_set, \ const char* column_name) { \ diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h b/cpp/src/cwrapper/tsfile_cwrapper.h index 643b4e52b..b04e32c26 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.h +++ b/cpp/src/cwrapper/tsfile_cwrapper.h @@ -20,6 +20,7 @@ #ifndef SRC_CWRAPPER_TSFILE_CWRAPPER_H_ #define SRC_CWRAPPER_TSFILE_CWRAPPER_H_ #ifdef __cplusplus + extern "C" { #endif @@ -124,6 +125,39 @@ typedef void* TsRecord; typedef void* ResultSet; +typedef struct arrow_schema { + // Array type description + const char* format; + const char* name; + const char* metadata; + int64_t flags; + int64_t n_children; + struct arrow_schema** children; + struct arrow_schema* dictionary; + + // Release callback + void (*release)(struct arrow_schema*); + // Opaque producer-specific data + void* private_data; +} ArrowSchema; + +typedef struct arrow_array { + // Array data description + int64_t length; + int64_t null_count; + int64_t offset; + int64_t n_buffers; + int64_t n_children; + const void** buffers; + struct arrow_array** children; + struct arrow_array* dictionary; + + // Release callback + void (*release)(struct arrow_array*); + // Opaque producer-specific data + void* private_data; +} ArrowArray; + typedef int32_t ERRNO; typedef int64_t Timestamp; @@ -444,11 +478,15 @@ ResultSet tsfile_query_table(TsFileReader reader, const char* table_name, ResultSet tsfile_query_table_on_tree(TsFileReader reader, char** columns, uint32_t column_num, Timestamp start_time, Timestamp end_time, ERRNO* err_code); +ResultSet tsfile_query_table_batch(TsFileReader reader, const char* table_name, + char** columns, uint32_t column_num, + Timestamp start_time, Timestamp end_time, + int batch_size, ERRNO* err_code); // ResultSet tsfile_reader_query_device(TsFileReader reader, // const char* device_name, -// char** sensor_name, uint32_t 
sensor_num, -// Timestamp start_time, Timestamp -// end_time); +// char** sensor_name, uint32_t +// sensor_num, Timestamp start_time, +// Timestamp end_time); /** * @brief Check and fetch the next row in the ResultSet. @@ -458,6 +496,27 @@ ResultSet tsfile_query_table_on_tree(TsFileReader reader, char** columns, */ bool tsfile_result_set_next(ResultSet result_set, ERRNO* error_code); +/** + * @brief Gets the next TsBlock from batch ResultSet and converts it to Arrow + * format. + * + * @param result_set [in] Valid ResultSet handle from batch query + * (tsfile_query_table_batch). + * @param out_array [out] Pointer to ArrowArray pointer. Will be set to the + * converted Arrow array. + * @param out_schema [out] Pointer to ArrowSchema pointer. Will be set to the + * converted Arrow schema. + * @return ERRNO - E_OK(0) on success, E_NO_MORE_DATA if no more blocks, or + * other error codes. + * @note Caller should release ArrowArray and ArrowSchema by calling their + * release callbacks when done. + * @note This function should only be called on ResultSet obtained from + * tsfile_query_table_batch with batch_size > 0. + */ +ERRNO tsfile_result_set_get_next_tsblock_as_arrow(ResultSet result_set, + ArrowArray* out_array, + ArrowSchema* out_schema); + /** * @brief Gets value from current row by column name (generic types). 
* diff --git a/cpp/src/reader/qds_with_timegenerator.h b/cpp/src/reader/qds_with_timegenerator.h index 52892df14..c0651f0b1 100644 --- a/cpp/src/reader/qds_with_timegenerator.h +++ b/cpp/src/reader/qds_with_timegenerator.h @@ -123,6 +123,7 @@ class QDSWithTimeGenerator : public ResultSet { bool is_null(uint32_t column_index); RowRecord* get_row_record(); std::shared_ptr get_metadata(); + int get_next_tsblock(common::TsBlock*& block) { return common::E_OK; }; private: int construct_node_tree(Expression* expr, Node*& node); diff --git a/cpp/src/reader/qds_without_timegenerator.h b/cpp/src/reader/qds_without_timegenerator.h index 0619fa673..f949e04b5 100644 --- a/cpp/src/reader/qds_without_timegenerator.h +++ b/cpp/src/reader/qds_without_timegenerator.h @@ -48,6 +48,7 @@ class QDSWithoutTimeGenerator : public ResultSet { bool is_null(uint32_t column_index); RowRecord* get_row_record(); std::shared_ptr get_metadata(); + int get_next_tsblock(common::TsBlock*& block) { return common::E_OK; }; private: int get_next_tsblock(uint32_t index, bool alloc_mem); diff --git a/cpp/src/reader/result_set.h b/cpp/src/reader/result_set.h index 87303cef4..26f3a3fa9 100644 --- a/cpp/src/reader/result_set.h +++ b/cpp/src/reader/result_set.h @@ -25,6 +25,7 @@ #include #include "common/row_record.h" +#include "common/tsblock/tsblock.h" namespace storage { /** @@ -155,6 +156,9 @@ class ResultSet : std::enable_shared_from_this { ASSERT(column_index >= 0 && column_index < row_record->get_col_num()); return row_record->get_field(column_index)->get_value(); } + + virtual int get_next_tsblock(common::TsBlock*& block) = 0; + /** * @brief Get the row record of the result set * diff --git a/cpp/src/reader/table_query_executor.h b/cpp/src/reader/table_query_executor.h index 974e6b45b..76427d7e8 100644 --- a/cpp/src/reader/table_query_executor.h +++ b/cpp/src/reader/table_query_executor.h @@ -44,13 +44,20 @@ class TableQueryExecutor { : meta_data_querier_(meta_data_querier), 
tsfile_io_reader_(tsfile_io_reader), table_query_ordering_(table_query_ordering), - block_size_(block_size) {} - TableQueryExecutor(ReadFile* read_file) { + block_size_(block_size), + batch_mode_(false) {} + TableQueryExecutor(ReadFile* read_file, const int batch_size = 0) { tsfile_io_reader_ = new TsFileIOReader(); tsfile_io_reader_->init(read_file); meta_data_querier_ = new MetadataQuerier(tsfile_io_reader_); table_query_ordering_ = TableQueryOrdering::DEVICE; - block_size_ = 1024; + if (batch_size == 0) { + block_size_ = 1024; + batch_mode_ = false; + } else { + block_size_ = batch_size; + batch_mode_ = true; + } } ~TableQueryExecutor() { if (meta_data_querier_ != nullptr) { @@ -76,6 +83,7 @@ class TableQueryExecutor { TsFileIOReader* tsfile_io_reader_; TableQueryOrdering table_query_ordering_; int32_t block_size_; + bool batch_mode_; }; } // namespace storage diff --git a/cpp/src/reader/table_result_set.cc b/cpp/src/reader/table_result_set.cc index aeeefb463..9a4281f12 100644 --- a/cpp/src/reader/table_result_set.cc +++ b/cpp/src/reader/table_result_set.cc @@ -37,7 +37,12 @@ void TableResultSet::init() { TableResultSet::~TableResultSet() { close(); } int TableResultSet::next(bool& has_next) { + if (batch_mode_) { + return tsblock_reader_->has_next(has_next); + } + int ret = common::E_OK; + while (row_iterator_ == nullptr || !row_iterator_->has_next()) { if (RET_FAIL(tsblock_reader_->has_next(has_next))) { return ret; @@ -103,6 +108,35 @@ std::shared_ptr TableResultSet::get_metadata() { return result_set_metadata_; } +int TableResultSet::get_next_tsblock(common::TsBlock*& block) { + int ret = common::E_OK; + block = nullptr; + + if (!batch_mode_) { + return common::E_INVALID_ARG; + } + + bool has_next = false; + if (RET_FAIL(tsblock_reader_->has_next(has_next))) { + return common::E_NO_MORE_DATA; + } + + if (!has_next) { + return common::E_NO_MORE_DATA; + } + + if (RET_FAIL(tsblock_reader_->next(tsblock_))) { + return common::E_NO_MORE_DATA; + } + + if (tsblock_ 
== nullptr) { + return common::E_NO_MORE_DATA; + } + + block = tsblock_; + return common::E_OK; +} + void TableResultSet::close() { tsblock_reader_->close(); pa_.destroy(); diff --git a/cpp/src/reader/table_result_set.h b/cpp/src/reader/table_result_set.h index 4192f7c2f..251c8029c 100644 --- a/cpp/src/reader/table_result_set.h +++ b/cpp/src/reader/table_result_set.h @@ -28,19 +28,22 @@ class TableResultSet : public ResultSet { public: explicit TableResultSet(std::unique_ptr tsblock_reader, std::vector column_names, - std::vector data_types) + std::vector data_types, + bool batch_mode = false) : tsblock_reader_(std::move(tsblock_reader)), column_names_(column_names), - data_types_(data_types) { + data_types_(data_types), + batch_mode_(batch_mode) { init(); } - ~TableResultSet(); + ~TableResultSet() override; int next(bool& has_next) override; bool is_null(const std::string& column_name) override; bool is_null(uint32_t column_index) override; RowRecord* get_row_record() override; std::shared_ptr get_metadata() override; void close() override; + int get_next_tsblock(common::TsBlock*& block) override; private: void init(); @@ -51,6 +54,7 @@ class TableResultSet : public ResultSet { std::vector> tsblock_readers_; std::vector column_names_; std::vector data_types_; + const bool batch_mode_; }; } // namespace storage #endif // TABLE_RESULT_SET_H \ No newline at end of file diff --git a/cpp/src/reader/tsfile_reader.cc b/cpp/src/reader/tsfile_reader.cc index f97570885..436f262e8 100644 --- a/cpp/src/reader/tsfile_reader.cc +++ b/cpp/src/reader/tsfile_reader.cc @@ -42,7 +42,6 @@ int TsFileReader::open(const std::string& file_path) { } else if (RET_FAIL(tsfile_executor_->init(read_file_))) { std::cout << "filed to init " << ret << std::endl; } - table_query_executor_ = new storage::TableQueryExecutor(read_file_); return ret; } @@ -87,15 +86,16 @@ int TsFileReader::query(std::vector& path_list, int64_t start_time, int TsFileReader::query(const std::string& table_name, const 
std::vector& columns_names, int64_t start_time, int64_t end_time, - ResultSet*& result_set) { + ResultSet*& result_set, int batch_size) { return this->query(table_name, columns_names, start_time, end_time, - result_set, nullptr); + result_set, nullptr, batch_size); } int TsFileReader::query(const std::string& table_name, const std::vector& columns_names, int64_t start_time, int64_t end_time, - ResultSet*& result_set, Filter* tag_filter) { + ResultSet*& result_set, Filter* tag_filter, + int batch_size) { int ret = E_OK; TsFileMeta* tsfile_meta = tsfile_executor_->get_tsfile_meta(); if (tsfile_meta == nullptr) { @@ -108,6 +108,9 @@ int TsFileReader::query(const std::string& table_name, } Filter* time_filter = new TimeBetween(start_time, end_time, false); + if (table_query_executor_ == nullptr) { + table_query_executor_ = new TableQueryExecutor(read_file_, batch_size); + } ret = table_query_executor_->query(to_lower(table_name), columns_names, time_filter, tag_filter, nullptr, result_set); @@ -185,6 +188,9 @@ int TsFileReader::query_table_on_tree( columns_names[i] = "col_" + std::to_string(i); } Filter* time_filter = new TimeBetween(star_time, end_time, false); + if (table_query_executor_ == nullptr) { + table_query_executor_ = new TableQueryExecutor(read_file_); + } ret = table_query_executor_->query_on_tree( satisfied_device_ids, columns_names, measurement_names_to_query, time_filter, result_set); diff --git a/cpp/src/reader/tsfile_reader.h b/cpp/src/reader/tsfile_reader.h index 8a6ba2264..526a96351 100644 --- a/cpp/src/reader/tsfile_reader.h +++ b/cpp/src/reader/tsfile_reader.h @@ -95,7 +95,7 @@ class TsFileReader { */ int query(const std::string& table_name, const std::vector& columns_names, int64_t start_time, - int64_t end_time, ResultSet*& result_set); + int64_t end_time, ResultSet*& result_set, int batch_size = 0); /** * @brief query the tsfile by the table name, columns names, start time @@ -111,7 +111,8 @@ class TsFileReader { */ int query(const std::string& 
table_name, const std::vector& columns_names, int64_t start_time, - int64_t end_time, ResultSet*& result_set, Filter* tag_filter); + int64_t end_time, ResultSet*& result_set, Filter* tag_filter, + int batch_size = 0); int query_table_on_tree(const std::vector& measurement_names, int64_t star_time, int64_t end_time, diff --git a/cpp/test/common/tsblock/arrow_tsblock_test.cc b/cpp/test/common/tsblock/arrow_tsblock_test.cc new file mode 100644 index 000000000..123efb59f --- /dev/null +++ b/cpp/test/common/tsblock/arrow_tsblock_test.cc @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +#include + +#include + +#include "common/tsblock/tsblock.h" +#include "cwrapper/tsfile_cwrapper.h" +#include "utils/db_utils.h" + +// Forward declarations for arrow namespace (functions are defined in +// arrow_c.cc) +namespace arrow { +// Type aliases for Arrow types (defined in tsfile_cwrapper.h) +using ArrowArray = ::ArrowArray; +using ArrowSchema = ::ArrowSchema; +#define ARROW_FLAG_DICTIONARY_ORDERED 1 +#define ARROW_FLAG_NULLABLE 2 +#define ARROW_FLAG_MAP_KEYS_SORTED 4 + +// Function declaration (defined in arrow_c.cc) +int TsBlockToArrowStruct(common::TsBlock& tsblock, ArrowArray* out_array, + ArrowSchema* out_schema); +} // namespace arrow + +static void VerifyArrowSchema( + const ::arrow::ArrowSchema* schema, + const std::vector& expected_names, + const std::vector& expected_formats) { + ASSERT_NE(schema, nullptr); + EXPECT_STREQ(schema->format, "+s"); + EXPECT_EQ(schema->n_children, expected_names.size()); + ASSERT_NE(schema->children, nullptr); + + for (size_t i = 0; i < expected_names.size(); ++i) { + const arrow::ArrowSchema* child = schema->children[i]; + ASSERT_NE(child, nullptr); + EXPECT_STREQ(child->name, expected_names[i].c_str()); + EXPECT_STREQ(child->format, expected_formats[i]); + EXPECT_EQ(child->flags, ARROW_FLAG_NULLABLE); + } +} + +static void VerifyArrowArrayData(const arrow::ArrowArray* array, + uint32_t expected_length) { + ASSERT_NE(array, nullptr); + EXPECT_EQ(array->length, expected_length); + EXPECT_EQ(array->n_children, 3); + ASSERT_NE(array->children, nullptr); +} + +TEST(ArrowTsBlockTest, NormalTsBlock_NoNulls) { + common::TupleDesc tuple_desc; + common::ColumnSchema col1("int_col", common::INT32, common::SNAPPY, + common::RLE); + common::ColumnSchema col2("double_col", common::DOUBLE, common::SNAPPY, + common::RLE); + common::ColumnSchema col3("string_col", common::STRING, common::SNAPPY, + common::RLE); + tuple_desc.push_back(col1); + tuple_desc.push_back(col2); + tuple_desc.push_back(col3); + + common::TsBlock 
tsblock(&tuple_desc, 10); + ASSERT_EQ(tsblock.init(), common::E_OK); + + common::RowAppender row_appender(&tsblock); + + for (int i = 0; i < 5; ++i) { + ASSERT_TRUE(row_appender.add_row()); + + int32_t int_val = 100 + i; + row_appender.append(0, reinterpret_cast(&int_val), + sizeof(int32_t)); + double double_val = 3.14 + i; + row_appender.append(1, reinterpret_cast(&double_val), + sizeof(double)); + std::string str_val = "test" + std::to_string(i); + row_appender.append(2, str_val.c_str(), str_val.length()); + } + + EXPECT_EQ(tsblock.get_row_count(), 5); + + arrow::ArrowArray array; + arrow::ArrowSchema schema; + int ret = arrow::TsBlockToArrowStruct(tsblock, &array, &schema); + ASSERT_EQ(ret, common::E_OK); + + std::vector expected_names = {"int_col", "double_col", + "string_col"}; + std::vector expected_formats = {"i", "g", "u"}; + VerifyArrowSchema(&schema, expected_names, expected_formats); + + VerifyArrowArrayData(&array, 5); + + ASSERT_NE(array.children, nullptr); + ASSERT_NE(array.children[0], nullptr); + ASSERT_NE(array.children[1], nullptr); + ASSERT_NE(array.children[2], nullptr); + + const ArrowArray* int_array = array.children[0]; + EXPECT_EQ(int_array->length, 5); + EXPECT_EQ(int_array->null_count, 0); + ASSERT_NE(int_array->buffers, nullptr); + const int32_t* int_data = reinterpret_cast( + int_array->buffers[int_array->n_buffers - 1]); + for (int i = 0; i < 5; ++i) { + EXPECT_EQ(int_data[i], 100 + i); + } + + const arrow::ArrowArray* double_array = array.children[1]; + EXPECT_EQ(double_array->length, 5); + EXPECT_EQ(double_array->null_count, 0); + const double* double_data = reinterpret_cast( + double_array->buffers[double_array->n_buffers - 1]); + for (int i = 0; i < 5; ++i) { + EXPECT_DOUBLE_EQ(double_data[i], 3.14 + i); + } + const arrow::ArrowArray* string_array = array.children[2]; + EXPECT_EQ(string_array->length, 5); + EXPECT_EQ(string_array->null_count, 0); + ASSERT_NE(string_array->buffers, nullptr); + const int32_t* offsets = + 
reinterpret_cast(string_array->buffers[1]); + const char* string_data = + reinterpret_cast(string_array->buffers[2]); + + for (int i = 0; i < 5; ++i) { + int32_t start = offsets[i]; + int32_t end = offsets[i + 1]; + std::string expected_str = "test" + std::to_string(i); + std::string actual_str(string_data + start, end - start); + EXPECT_EQ(actual_str, expected_str); + } + + if (array.release != nullptr) { + array.release(&array); + } + if (schema.release != nullptr) { + schema.release(&schema); + } +} + +TEST(ArrowTsBlockTest, TsBlock_WithNulls) { + common::TupleDesc tuple_desc; + common::ColumnSchema col1("int_col", common::INT32, common::SNAPPY, + common::RLE); + common::ColumnSchema col2("double_col", common::DOUBLE, common::SNAPPY, + common::RLE); + common::ColumnSchema col3("string_col", common::STRING, common::SNAPPY, + common::RLE); + tuple_desc.push_back(col1); + tuple_desc.push_back(col2); + tuple_desc.push_back(col3); + + common::TsBlock tsblock(&tuple_desc, 10); + ASSERT_EQ(tsblock.init(), common::E_OK); + + common::RowAppender row_appender(&tsblock); + for (int i = 0; i < 5; ++i) { + ASSERT_TRUE(row_appender.add_row()); + + if (i == 1) { + row_appender.append_null(0); + row_appender.append_null(1); + row_appender.append_null(2); + } else if (i == 3) { + row_appender.append_null(0); + double double_val = 3.14 + i; + row_appender.append(1, reinterpret_cast(&double_val), + sizeof(double)); + std::string str_val = "test" + std::to_string(i); + row_appender.append(2, str_val.c_str(), str_val.length()); + } else { + int32_t int_val = 100 + i; + row_appender.append(0, reinterpret_cast(&int_val), + sizeof(int32_t)); + double double_val = 3.14 + i; + row_appender.append(1, reinterpret_cast(&double_val), + sizeof(double)); + std::string str_val = "test" + std::to_string(i); + row_appender.append(2, str_val.c_str(), str_val.length()); + } + } + + EXPECT_EQ(tsblock.get_row_count(), 5); + + arrow::ArrowArray array; + arrow::ArrowSchema schema; + int ret = 
arrow::TsBlockToArrowStruct(tsblock, &array, &schema); + ASSERT_EQ(ret, common::E_OK); + + std::vector expected_names = {"int_col", "double_col", + "string_col"}; + std::vector expected_formats = {"i", "g", "u"}; + VerifyArrowSchema(&schema, expected_names, expected_formats); + + VerifyArrowArrayData(&array, 5); + + const arrow::ArrowArray* int_array = array.children[0]; + EXPECT_EQ(int_array->null_count, 2); + + const arrow::ArrowArray* double_array = array.children[1]; + EXPECT_EQ(double_array->null_count, 1); + + const arrow::ArrowArray* string_array = array.children[2]; + EXPECT_EQ(string_array->null_count, 1); + + ASSERT_NE(int_array->buffers[0], nullptr); + const uint8_t* null_bitmap = + reinterpret_cast(int_array->buffers[0]); + EXPECT_FALSE(null_bitmap[0] & (1 << 1)); + EXPECT_FALSE(null_bitmap[0] & (1 << 3)); + EXPECT_TRUE(null_bitmap[0] & (1 << 0)); + EXPECT_TRUE(null_bitmap[0] & (1 << 2)); + EXPECT_TRUE(null_bitmap[0] & (1 << 4)); + + const int32_t* int_data = reinterpret_cast( + int_array->buffers[int_array->n_buffers - 1]); + EXPECT_NE(int_data, nullptr); + if (array.release != nullptr) { + array.release(&array); + } + if (schema.release != nullptr) { + schema.release(&schema); + } +} + +TEST(ArrowTsBlockTest, TsBlock_EdgeCases) { + { + common::TupleDesc tuple_desc; + common::ColumnSchema col1("single_col", common::INT64, common::SNAPPY, + common::RLE); + tuple_desc.push_back(col1); + + common::TsBlock tsblock(&tuple_desc, 5); + ASSERT_EQ(tsblock.init(), common::E_OK); + + common::RowAppender row_appender(&tsblock); + for (int i = 0; i < 3; ++i) { + ASSERT_TRUE(row_appender.add_row()); + int64_t val = 1000 + i; + row_appender.append(0, reinterpret_cast(&val), + sizeof(int64_t)); + } + + arrow::ArrowArray array; + arrow::ArrowSchema schema; + int ret = arrow::TsBlockToArrowStruct(tsblock, &array, &schema); + ASSERT_EQ(ret, common::E_OK); + + EXPECT_STREQ(schema.format, "+s"); + EXPECT_EQ(schema.n_children, 1); + EXPECT_STREQ(schema.children[0]->name, 
"single_col"); + EXPECT_STREQ(schema.children[0]->format, "l"); + + EXPECT_EQ(array.length, 3); + EXPECT_EQ(array.n_children, 1); + if (array.release != nullptr) { + array.release(&array); + } + if (schema.release != nullptr) { + schema.release(&schema); + } + } + + { + common::TupleDesc tuple_desc; + common::ColumnSchema col1("int_col", common::INT32, common::SNAPPY, + common::RLE); + common::ColumnSchema col2("double_col", common::DOUBLE, common::SNAPPY, + common::RLE); + tuple_desc.push_back(col1); + tuple_desc.push_back(col2); + + const int row_count = 1000; + common::TsBlock tsblock(&tuple_desc, row_count); + ASSERT_EQ(tsblock.init(), common::E_OK); + + common::RowAppender row_appender(&tsblock); + for (int i = 0; i < row_count; ++i) { + ASSERT_TRUE(row_appender.add_row()); + int32_t int_val = i; + row_appender.append(0, reinterpret_cast(&int_val), + sizeof(int32_t)); + double double_val = i * 0.5; + row_appender.append(1, reinterpret_cast(&double_val), + sizeof(double)); + } + + arrow::ArrowArray array; + arrow::ArrowSchema schema; + int ret = arrow::TsBlockToArrowStruct(tsblock, &array, &schema); + ASSERT_EQ(ret, common::E_OK); + + EXPECT_EQ(array.length, row_count); + EXPECT_EQ(array.n_children, 2); + + const arrow::ArrowArray* int_array = array.children[0]; + const int32_t* int_data = + reinterpret_cast(int_array->buffers[1]); + EXPECT_EQ(int_data[0], 0); + EXPECT_EQ(int_data[row_count - 1], row_count - 1); + + const arrow::ArrowArray* double_array = array.children[1]; + const double* double_data = + reinterpret_cast(double_array->buffers[1]); + EXPECT_DOUBLE_EQ(double_data[0], 0.0); + EXPECT_DOUBLE_EQ(double_data[row_count - 1], (row_count - 1) * 0.5); + + if (array.release != nullptr) { + array.release(&array); + } + if (schema.release != nullptr) { + schema.release(&schema); + } + } +} diff --git a/cpp/test/reader/table_view/tsfile_reader_table_batch_test.cc b/cpp/test/reader/table_view/tsfile_reader_table_batch_test.cc new file mode 100644 index 
000000000..ece8ea7bf --- /dev/null +++ b/cpp/test/reader/table_view/tsfile_reader_table_batch_test.cc @@ -0,0 +1,475 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include + +#include +#include +#include + +#include "common/record.h" +#include "common/schema.h" +#include "common/tablet.h" +#include "file/tsfile_io_writer.h" +#include "file/write_file.h" +#include "reader/table_result_set.h" +#include "reader/tsfile_reader.h" +#include "writer/chunk_writer.h" +#include "writer/tsfile_table_writer.h" + +using namespace storage; +using namespace common; + +class TsFileTableReaderBatchTest : public ::testing::Test { + protected: + void SetUp() override { + libtsfile_init(); + file_name_ = std::string("tsfile_reader_table_batch_test_") + + generate_random_string(10) + std::string(".tsfile"); + remove(file_name_.c_str()); + int flags = O_WRONLY | O_CREAT | O_TRUNC; +#ifdef _WIN32 + flags |= O_BINARY; +#endif + mode_t mode = 0666; + write_file_.create(file_name_, flags, mode); + } + void TearDown() override { + remove(file_name_.c_str()); + libtsfile_destroy(); + } + std::string file_name_; + WriteFile write_file_; + + public: + static std::string generate_random_string(int length) { + std::random_device rd; + 
std::mt19937 gen(rd()); + std::uniform_int_distribution<> dis(0, 61); + + const std::string chars = + "0123456789" + "abcdefghijklmnopqrstuvwxyz" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + + std::string random_string; + + for (int i = 0; i < length; ++i) { + random_string += chars[dis(gen)]; + } + + return random_string; + } + + static TableSchema* gen_table_schema_no_tag() { + // Generate table schema with only FIELD columns (no TAG columns) + std::vector measurement_schemas; + std::vector column_categories; + int measurement_schema_num = 5; // 5 field columns + for (int i = 0; i < measurement_schema_num; i++) { + measurement_schemas.emplace_back(new MeasurementSchema( + "s" + std::to_string(i), TSDataType::INT64, TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + column_categories.emplace_back(ColumnCategory::FIELD); + } + return new TableSchema("testTableNoTag", measurement_schemas, + column_categories); + } + + static storage::Tablet gen_tablet_no_tag(TableSchema* table_schema, + int num_rows) { + // Generate tablet without tags (only field columns) + storage::Tablet tablet(table_schema->get_table_name(), + table_schema->get_measurement_names(), + table_schema->get_data_types(), + table_schema->get_column_categories(), num_rows); + + for (int i = 0; i < num_rows; i++) { + tablet.add_timestamp(i, i); + auto column_schemas = table_schema->get_measurement_schemas(); + for (const auto& column_schema : column_schemas) { + if (column_schema->data_type_ == TSDataType::INT64) { + tablet.add_value(i, column_schema->measurement_name_, + static_cast(i)); + } + } + } + return tablet; + } + + static TableSchema* gen_table_schema() { + std::vector measurement_schemas; + std::vector column_categories; + int id_schema_num = 2; + int measurement_schema_num = 3; + for (int i = 0; i < id_schema_num; i++) { + measurement_schemas.emplace_back(new MeasurementSchema( + "id" + std::to_string(i), TSDataType::STRING, TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + 
column_categories.emplace_back(ColumnCategory::TAG); + } + for (int i = 0; i < measurement_schema_num; i++) { + measurement_schemas.emplace_back(new MeasurementSchema( + "s" + std::to_string(i), TSDataType::INT64, TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + column_categories.emplace_back(ColumnCategory::FIELD); + } + return new TableSchema("testTable", measurement_schemas, + column_categories); + } + + static storage::Tablet gen_tablet(TableSchema* table_schema, int offset, + int device_num, + int num_timestamp_per_device = 10) { + storage::Tablet tablet(table_schema->get_table_name(), + table_schema->get_measurement_names(), + table_schema->get_data_types(), + table_schema->get_column_categories(), + device_num * num_timestamp_per_device); + + char* literal = new char[std::strlen("device_id") + 1]; + std::strcpy(literal, "device_id"); + String literal_str(literal, std::strlen("device_id")); + for (int i = 0; i < device_num; i++) { + for (int l = 0; l < num_timestamp_per_device; l++) { + int row_index = i * num_timestamp_per_device + l; + tablet.add_timestamp(row_index, row_index); + auto column_schemas = table_schema->get_measurement_schemas(); + for (const auto& column_schema : column_schemas) { + switch (column_schema->data_type_) { + case TSDataType::INT64: + tablet.add_value(row_index, + column_schema->measurement_name_, + static_cast(i)); + break; + case TSDataType::STRING: + tablet.add_value(row_index, + column_schema->measurement_name_, + literal_str); + break; + default: + break; + } + } + } + } + delete[] literal; + return tablet; + } +}; + +TEST_F(TsFileTableReaderBatchTest, BatchQueryWithSmallBatchSize) { + auto table_schema = gen_table_schema(); + auto tsfile_table_writer_ = + std::make_shared(&write_file_, table_schema); + + const int device_num = 2; + const int points_per_device = 50; + auto tablet = gen_tablet(table_schema, 0, device_num, points_per_device); + ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK); + 
ASSERT_EQ(tsfile_table_writer_->flush(), common::E_OK); + ASSERT_EQ(tsfile_table_writer_->close(), common::E_OK); + + storage::TsFileReader reader; + int ret = reader.open(file_name_); + ASSERT_EQ(ret, common::E_OK); + + ResultSet* tmp_result_set = nullptr; + const int batch_size = 20; + ret = reader.query(table_schema->get_table_name(), + table_schema->get_measurement_names(), 0, 1000000000000, + tmp_result_set, batch_size); + ASSERT_EQ(ret, common::E_OK); + ASSERT_NE(tmp_result_set, nullptr); + + auto* table_result_set = dynamic_cast(tmp_result_set); + ASSERT_NE(table_result_set, nullptr); + + int total_rows = 0; + int block_count = 0; + common::TsBlock* block = nullptr; + + char* literal = new char[std::strlen("device_id") + 1]; + std::strcpy(literal, "device_id"); + String expected_string(literal, std::strlen("device_id")); + std::vector int64_sums(3, 0); + std::cout << "begin to start" << std::endl; + while ((ret = table_result_set->get_next_tsblock(block)) == common::E_OK) { + ASSERT_NE(block, nullptr); + block_count++; + uint32_t row_count = block->get_row_count(); + total_rows += row_count; + ASSERT_EQ(row_count, batch_size); + + common::RowIterator row_iterator(block); + while (row_iterator.has_next()) { + uint32_t len; + bool null; + + int int64_col_idx = 0; + for (uint32_t col_idx = 1; + col_idx < row_iterator.get_column_count(); ++col_idx) { + const char* value = row_iterator.read(col_idx, &len, &null); + ASSERT_FALSE(null); + TSDataType data_type = row_iterator.get_data_type(col_idx); + if (data_type == TSDataType::INT64) { + int64_t int_val = *reinterpret_cast(value); + int64_sums[int64_col_idx] += int_val; + int64_col_idx++; + std::cout << "to add" << int_val << std::endl; + } else if (data_type == TSDataType::STRING) { + String str_value(value, len); + ASSERT_EQ(str_value.compare(expected_string), 0); + } + } + row_iterator.next(); + } + } + std::cout << "finish with ret" << ret << std::endl; + std::cout << "check finished" << std::endl; + 
EXPECT_EQ(total_rows, device_num * points_per_device); + EXPECT_GT(block_count, 1); + for (size_t i = 0; i < int64_sums.size(); i++) { + EXPECT_EQ(int64_sums[i], 50); + } + + delete[] literal; + + reader.destroy_query_data_set(table_result_set); + ASSERT_EQ(reader.close(), common::E_OK); + delete table_schema; +} + +TEST_F(TsFileTableReaderBatchTest, BatchQueryWithLargeBatchSize) { + auto table_schema = gen_table_schema(); + auto tsfile_table_writer_ = + std::make_shared(&write_file_, table_schema); + + const int device_num = 1; + const int points_per_device = 120; + auto tablet = gen_tablet(table_schema, 0, device_num, points_per_device); + ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK); + ASSERT_EQ(tsfile_table_writer_->flush(), common::E_OK); + ASSERT_EQ(tsfile_table_writer_->close(), common::E_OK); + + storage::TsFileReader reader; + int ret = reader.open(file_name_); + ASSERT_EQ(ret, common::E_OK); + + ResultSet* tmp_result_set = nullptr; + const int batch_size = 100; + ret = reader.query(table_schema->get_table_name(), + table_schema->get_measurement_names(), 0, 1000000000000, + tmp_result_set, batch_size); + ASSERT_EQ(ret, common::E_OK); + ASSERT_NE(tmp_result_set, nullptr); + + auto* table_result_set = dynamic_cast(tmp_result_set); + ASSERT_NE(table_result_set, nullptr); + + int total_rows = 0; + int block_count = 0; + common::TsBlock* block = nullptr; + + while (table_result_set->get_next_tsblock(block) == common::E_OK) { + ASSERT_NE(block, nullptr); + block_count++; + uint32_t row_count = block->get_row_count(); + total_rows += row_count; + + ASSERT_EQ(row_count, block_count == 1 ? 
batch_size : 20); + } + + EXPECT_EQ(total_rows, device_num * points_per_device); + EXPECT_GE(block_count, 2); + + reader.destroy_query_data_set(table_result_set); + ASSERT_EQ(reader.close(), common::E_OK); + delete table_schema; +} + +TEST_F(TsFileTableReaderBatchTest, BatchQueryVerifyDataCorrectness) { + auto table_schema = gen_table_schema(); + auto tsfile_table_writer_ = + std::make_shared(&write_file_, table_schema); + + const int device_num = 1; + const int points_per_device = 30; + auto tablet = gen_tablet(table_schema, 0, device_num, points_per_device); + ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK); + ASSERT_EQ(tsfile_table_writer_->flush(), common::E_OK); + ASSERT_EQ(tsfile_table_writer_->close(), common::E_OK); + + storage::TsFileReader reader; + int ret = reader.open(file_name_); + ASSERT_EQ(ret, common::E_OK); + + ResultSet* tmp_result_set = nullptr; + const int batch_size = 10; + ret = reader.query(table_schema->get_table_name(), + table_schema->get_measurement_names(), 0, 1000000000000, + tmp_result_set, batch_size); + ASSERT_EQ(ret, common::E_OK); + ASSERT_NE(tmp_result_set, nullptr); + + auto* table_result_set = dynamic_cast(tmp_result_set); + ASSERT_NE(table_result_set, nullptr); + + int expected_timestamp = 0; + common::TsBlock* block = nullptr; + + while (table_result_set->get_next_tsblock(block) == common::E_OK) { + ASSERT_NE(block, nullptr); + + common::RowIterator row_iterator(block); + while (row_iterator.has_next()) { + uint32_t len; + bool null; + int64_t timestamp = *reinterpret_cast( + row_iterator.read(0, &len, &null)); + ASSERT_FALSE(null); + EXPECT_EQ(timestamp, expected_timestamp); + + for (uint32_t col_idx = 2; + col_idx < row_iterator.get_column_count(); ++col_idx) { + const char* value = row_iterator.read(col_idx, &len, &null); + if (!null && row_iterator.get_data_type(col_idx) == INT64) { + int64_t int_val = *reinterpret_cast(value); + EXPECT_EQ(int_val, 0); + } + } + row_iterator.next(); + 
expected_timestamp++; + } + } + + EXPECT_EQ(expected_timestamp, device_num * points_per_device); + + reader.destroy_query_data_set(table_result_set); + ASSERT_EQ(reader.close(), common::E_OK); + delete table_schema; +} + +TEST_F(TsFileTableReaderBatchTest, PerformanceComparisonSinglePointVsBatch) { + // Create table schema without tags (only fields) + auto table_schema = gen_table_schema_no_tag(); + auto tsfile_table_writer_ = + std::make_shared(&write_file_, table_schema); + + // Write a large amount of data + const int total_rows = 1000000; + auto tablet = gen_tablet_no_tag(table_schema, total_rows); + ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK); + ASSERT_EQ(tsfile_table_writer_->flush(), common::E_OK); + ASSERT_EQ(tsfile_table_writer_->close(), common::E_OK); + + // Test 1: Single point query (using next() method) + { + storage::TsFileReader reader; + int ret = reader.open(file_name_); + ASSERT_EQ(ret, common::E_OK); + + ResultSet* tmp_result_set = nullptr; + // Single point query: don't specify batch_size (or use 0) + auto start_time = std::chrono::high_resolution_clock::now(); + + ret = reader.query(table_schema->get_table_name(), + table_schema->get_measurement_names(), 0, + 1000000000000, tmp_result_set); + ASSERT_EQ(ret, common::E_OK); + ASSERT_NE(tmp_result_set, nullptr); + + auto* table_result_set = dynamic_cast(tmp_result_set); + ASSERT_NE(table_result_set, nullptr); + + int total_rows_read = 0; + bool has_next = false; + + // Use next() method for single point query + while (IS_SUCC(table_result_set->next(has_next)) && has_next) { + total_rows_read++; + } + + auto end_time = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast( + end_time - start_time); + + EXPECT_EQ(total_rows_read, total_rows); + std::cout << "\n=== Single Point Query (using next() method) ===" + << std::endl; + std::cout << "Total rows read: " << total_rows_read << std::endl; + std::cout << "Time taken: " << duration.count() 
<< " ms" << std::endl; + std::cout << "Throughput: " + << (total_rows_read * 5 * 1000.0 / duration.count()) + << " rows/sec" << std::endl; + + reader.destroy_query_data_set(table_result_set); + ASSERT_EQ(reader.close(), common::E_OK); + } + + // // Test 2: Batch query (batch_size = 1000) + // { + // storage::TsFileReader reader; + // int ret = reader.open(file_name_); + // ASSERT_EQ(ret, common::E_OK); + // + // ResultSet* tmp_result_set = nullptr; + // const int batch_size = 10000; // Batch query + // auto start_time = std::chrono::high_resolution_clock::now(); + // + // ret = reader.query(table_schema->get_table_name(), + // table_schema->get_measurement_names(), 0, + // 1000000000000, tmp_result_set, batch_size); + // ASSERT_EQ(ret, common::E_OK); + // ASSERT_NE(tmp_result_set, nullptr); + // + // auto* table_result_set = + // dynamic_cast(tmp_result_set); + // ASSERT_NE(table_result_set, nullptr); + // + // int total_rows_read = 0; + // common::TsBlock* block = nullptr; + // int block_count = 0; + // + // while ((ret = table_result_set->get_next_tsblock(block)) == + // common::E_OK) { + // ASSERT_NE(block, nullptr); + // block_count++; + // total_rows_read += block->get_row_count(); + // } + // + // auto end_time = std::chrono::high_resolution_clock::now(); + // auto duration = + // std::chrono::duration_cast( + // end_time - start_time); + // + // EXPECT_EQ(total_rows_read, total_rows); + // std::cout << "\n=== Batch Query (batch_size=10000) ===" << std::endl; + // std::cout << "Total rows read: " << total_rows_read << std::endl; + // std::cout << "Block count: " << block_count << std::endl; + // std::cout << "Time taken: " << duration.count() << " ms" << + // std::endl; std::cout << "Throughput: " + // << (total_rows_read * 5 * 1000.0 / duration.count()) + // << " rows/sec" << std::endl; + // + // reader.destroy_query_data_set(table_result_set); + // ASSERT_EQ(reader.close(), common::E_OK); + // } + + delete table_schema; +} diff --git 
a/cpp/third_party/zlib-1.3.1/treebuild.xml b/cpp/third_party/zlib-1.3.1/treebuild.xml index 930b00be4..8e030572a 100644 --- a/cpp/third_party/zlib-1.3.1/treebuild.xml +++ b/cpp/third_party/zlib-1.3.1/treebuild.xml @@ -1,103 +1,99 @@ - + - zip compression library - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + zip compression library + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + zip compression library + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/cpp/third_party/zlib-1.3.1/zlib-1.3.1/treebuild.xml b/cpp/third_party/zlib-1.3.1/zlib-1.3.1/treebuild.xml deleted file mode 100644 index 8e030572a..000000000 --- a/cpp/third_party/zlib-1.3.1/zlib-1.3.1/treebuild.xml +++ /dev/null @@ -1,112 +0,0 @@ - - - - zip compression library - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/python/setup.py b/python/setup.py index 40fe4e51a..5dd751f0d 100644 --- a/python/setup.py +++ b/python/setup.py @@ -89,8 +89,8 @@ def copy_tsfile_header(source): tsfile_include_dir = os.path.join(project_dir, "tsfile", "include") -extra_compile_args = ["-O0", "-g3", 
"-fno-omit-frame-pointer"] -extra_link_args = ["-g"] +extra_compile_args = ["-O2"] +extra_link_args = [] ext_modules_tsfile = [ # utils: from python to c or c to python. @@ -103,7 +103,7 @@ def copy_tsfile_header(source): runtime_library_dirs=[tsfile_shared_dir] if system != "Windows" else None, define_macros=[("CYTHON_TRACE_NOGIL", "1")], extra_compile_args=( - ["-std=c++11", "-O0", "-g3", "-fno-omit-frame-pointer"] if system != "Windows" else ["-std=c++11", + ["-std=c++11", "-O2"] if system != "Windows" else ["-std=c++11", "-DMS_WIN64"] ), language="c++", @@ -119,7 +119,7 @@ def copy_tsfile_header(source): runtime_library_dirs=[tsfile_shared_dir] if system != "Windows" else None, define_macros=[("CYTHON_TRACE_NOGIL", "1")], extra_compile_args=( - ["-std=c++11", "-O0", "-g3", "-fno-omit-frame-pointer"] if system != "Windows" else ["-std=c++11", + ["-std=c++11", "-O2"] if system != "Windows" else ["-std=c++11", "-DMS_WIN64"] ), language="c++", @@ -135,7 +135,7 @@ def copy_tsfile_header(source): define_macros=[("CYTHON_TRACE_NOGIL", "1")], runtime_library_dirs=[tsfile_shared_dir] if system != "Windows" else None, extra_compile_args=( - ["-std=c++11", "-O0", "-g3", "-fno-omit-frame-pointer"] if system != "Windows" else ["-std=c++11", "-DMS_WIN64"] + ["-std=c++11", "-O2"] if system != "Windows" else ["-std=c++11", "-DMS_WIN64"] ), language="c++", ) From 5fec95b5fbd8b4362a73e21ae13dfecf56aaa6c0 Mon Sep 17 00:00:00 2001 From: ColinLee Date: Wed, 18 Mar 2026 23:45:17 +0800 Subject: [PATCH 6/8] fix some comment. 
--- cpp/src/common/container/bit_map.h | 26 +- cpp/src/common/tablet.cc | 5 +- cpp/src/cwrapper/arrow_c.cc | 350 +++++++-------------- cpp/src/cwrapper/tsfile_cwrapper.cc | 2 +- cpp/src/reader/qds_with_timegenerator.h | 1 + cpp/src/reader/qds_without_timegenerator.h | 1 + python/tsfile/tsfile_table_writer.py | 14 +- 7 files changed, 144 insertions(+), 255 deletions(-) diff --git a/cpp/src/common/container/bit_map.h b/cpp/src/common/container/bit_map.h index 30eda899b..4d29550ec 100644 --- a/cpp/src/common/container/bit_map.h +++ b/cpp/src/common/container/bit_map.h @@ -55,17 +55,7 @@ class BitMap { *start_addr = (*start_addr) & (~bit_mask); } - // Clear all bits in [0, count) to zero (mark all as non-null). - FORCE_INLINE void clear_all(uint32_t count) { - uint32_t full_bytes = count >> 3; - if (full_bytes > 0) { - memset(bitmap_, 0x00, full_bytes); - } - // Clear remaining bits in the last partial byte - for (uint32_t i = full_bytes << 3; i < count; i++) { - clear(i); - } - } + FORCE_INLINE void clear_all() { memset(bitmap_, 0x00, size_); } FORCE_INLINE bool test(uint32_t index) { uint32_t offset = index >> 3; @@ -76,9 +66,21 @@ class BitMap { return (*start_addr & bit_mask); } + // Count the number of bits set to 1 (i.e., number of null entries). + // __builtin_popcount is supported by GCC, Clang, and MinGW on Windows. + // TODO: add MSVC support if needed (e.g. __popcnt or manual bit count). 
+ FORCE_INLINE uint32_t count_set_bits() const { + uint32_t count = 0; + const uint8_t* p = reinterpret_cast(bitmap_); + for (uint32_t i = 0; i < size_; i++) { + count += __builtin_popcount(p[i]); + } + return count; + } + FORCE_INLINE uint32_t get_size() { return size_; } - FORCE_INLINE char* get_bitmap() { return bitmap_; } // for debug + FORCE_INLINE char* get_bitmap() { return bitmap_; } private: FORCE_INLINE uint8_t get_bit_mask(uint32_t index) { diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc index 10a40ad58..148c5ee52 100644 --- a/cpp/src/common/tablet.cc +++ b/cpp/src/common/tablet.cc @@ -221,9 +221,10 @@ int Tablet::set_column_values(uint32_t schema_index, const void* data, if (null_bitmap == nullptr) { // All valid: bulk copy + mark all as non-null std::memcpy(dst, data, count * elem_size); - bitmaps_[schema_index].clear_all(count); + bitmaps_[schema_index].clear_all(); } else { - // Bulk copy all data (null positions will have garbage but won't be read). + // Bulk copy all data (null positions will have garbage but won't be + // read). std::memcpy(dst, data, count * elem_size); // Convert Arrow bitmap (1=valid, 0=null) to TsFile bitmap (1=null, diff --git a/cpp/src/cwrapper/arrow_c.cc b/cpp/src/cwrapper/arrow_c.cc index 2ff74185b..99a80b612 100644 --- a/cpp/src/cwrapper/arrow_c.cc +++ b/cpp/src/cwrapper/arrow_c.cc @@ -114,6 +114,32 @@ static const char* GetArrowFormatString(common::TSDataType datatype) { static size_t GetNullBitmapSize(int64_t length) { return (length + 7) / 8; } +// Build Arrow validity bitmap from TsFile Vector and store it in +// array_data->buffers[0]. Sets out_array->null_count. +// Returns E_OK on success, E_OOM on allocation failure. 
+static int BuildNullBitmap(common::Vector* vec, uint32_t row_count, + ArrowArrayData* array_data, ArrowArray* out_array) { + if (!vec->has_null()) { + array_data->buffers[0] = nullptr; + out_array->null_count = 0; + return common::E_OK; + } + size_t null_bitmap_size = GetNullBitmapSize(row_count); + uint8_t* null_bitmap = static_cast( + common::mem_alloc(null_bitmap_size, common::MOD_TSBLOCK)); + if (null_bitmap == nullptr) { + return common::E_OOM; + } + common::BitMap& vec_bitmap = vec->get_bitmap(); + char* vec_bitmap_data = vec_bitmap.get_bitmap(); + for (size_t i = 0; i < null_bitmap_size; ++i) { + null_bitmap[i] = ~static_cast(vec_bitmap_data[i]); + } + array_data->buffers[0] = null_bitmap; + out_array->null_count = vec_bitmap.count_set_bits(); + return common::E_OK; +} + // Reset all fields of an ArrowArray to zero/null after releasing. static void ResetArrowArray(ArrowArray* array) { array->length = 0; @@ -214,6 +240,36 @@ static void ReleaseArrowSchema(ArrowSchema* schema) { schema->private_data = nullptr; } +static ArrowArrayData* AllocArrowArrayData(int64_t n_buffers) { + ArrowArrayData* data = static_cast( + common::mem_alloc(sizeof(ArrowArrayData), common::MOD_TSBLOCK)); + if (data == nullptr) return nullptr; + data->n_buffers = n_buffers; + data->buffers = static_cast( + common::mem_alloc(n_buffers * sizeof(void*), common::MOD_TSBLOCK)); + if (data->buffers == nullptr) { + common::mem_free(data); + return nullptr; + } + for (int64_t i = 0; i < n_buffers; ++i) { + data->buffers[i] = nullptr; + } + return data; +} + +static void FinalizeArrowArray(ArrowArray* out_array, + ArrowArrayData* array_data, uint32_t row_count) { + out_array->length = row_count; + out_array->offset = 0; + out_array->n_buffers = array_data->n_buffers; + out_array->n_children = 0; + out_array->buffers = const_cast(array_data->buffers); + out_array->children = nullptr; + out_array->dictionary = nullptr; + out_array->release = ReleaseArrowArray; + out_array->private_data = 
array_data; +} + template inline int BuildFixedLengthArrowArrayC(common::Vector* vec, uint32_t row_count, ArrowArray* out_array) { @@ -223,55 +279,14 @@ inline int BuildFixedLengthArrowArrayC(common::Vector* vec, uint32_t row_count, bool has_null = vec->has_null(); size_t type_size = sizeof(CType); - // Arrow C Data Interface: fixed-width types always have 2 buffers - // buffers[0] = validity bitmap (may be NULL if no nulls) - // buffers[1] = values - static constexpr int64_t n_buffers = 2; - ArrowArrayData* array_data = static_cast( - common::mem_alloc(sizeof(ArrowArrayData), common::MOD_TSBLOCK)); - if (array_data == nullptr) { - return common::E_OOM; - } + ArrowArrayData* array_data = AllocArrowArrayData(2); + if (array_data == nullptr) return common::E_OOM; - array_data->n_buffers = n_buffers; - array_data->buffers = static_cast( - common::mem_alloc(n_buffers * sizeof(void*), common::MOD_TSBLOCK)); - if (array_data->buffers == nullptr) { + int bm_ret = BuildNullBitmap(vec, row_count, array_data, out_array); + if (bm_ret != common::E_OK) { FreeArrowArrayData(array_data); - return common::E_OOM; - } - - for (int64_t i = 0; i < n_buffers; ++i) { - array_data->buffers[i] = nullptr; - } - - uint8_t* null_bitmap = nullptr; - if (has_null) { - size_t null_bitmap_size = GetNullBitmapSize(row_count); - null_bitmap = static_cast( - common::mem_alloc(null_bitmap_size, common::MOD_TSBLOCK)); - if (null_bitmap == nullptr) { - FreeArrowArrayData(array_data); - return common::E_OOM; - } - common::BitMap& vec_bitmap = vec->get_bitmap(); - char* vec_bitmap_data = vec_bitmap.get_bitmap(); - for (size_t i = 0; i < null_bitmap_size; ++i) { - null_bitmap[i] = ~static_cast(vec_bitmap_data[i]); - } - array_data->buffers[0] = null_bitmap; - - int64_t null_count = 0; - for (uint32_t i = 0; i < row_count; ++i) { - if (vec_bitmap.test(i)) { - null_count++; - } - } - out_array->null_count = null_count; - } else { - array_data->buffers[0] = nullptr; - out_array->null_count = 0; + return 
bm_ret; } char* vec_data = vec->get_value_data().get_data(); @@ -282,9 +297,6 @@ inline int BuildFixedLengthArrowArrayC(common::Vector* vec, uint32_t row_count, uint8_t* packed_buffer = static_cast( common::mem_alloc(packed_size, common::MOD_TSBLOCK)); if (packed_buffer == nullptr) { - if (null_bitmap != nullptr) { - common::mem_free(null_bitmap); - } FreeArrowArrayData(array_data); return common::E_OOM; } @@ -313,9 +325,6 @@ inline int BuildFixedLengthArrowArrayC(common::Vector* vec, uint32_t row_count, size_t data_size = type_size * row_count; data_buffer = common::mem_alloc(data_size, common::MOD_TSBLOCK); if (data_buffer == nullptr) { - if (null_bitmap != nullptr) { - common::mem_free(null_bitmap); - } FreeArrowArrayData(array_data); return common::E_OOM; } @@ -343,17 +352,7 @@ inline int BuildFixedLengthArrowArrayC(common::Vector* vec, uint32_t row_count, } array_data->buffers[1] = data_buffer; - - out_array->length = row_count; - out_array->offset = 0; - out_array->n_buffers = n_buffers; - out_array->n_children = 0; - out_array->buffers = const_cast(array_data->buffers); - out_array->children = nullptr; - out_array->dictionary = nullptr; - out_array->release = ReleaseArrowArray; - out_array->private_data = array_data; - + FinalizeArrowArray(out_array, array_data, row_count); return common::E_OK; } @@ -364,59 +363,20 @@ static int BuildStringArrowArrayC(common::Vector* vec, uint32_t row_count, } bool has_null = vec->has_null(); - int64_t n_buffers = 3; - ArrowArrayData* array_data = static_cast( - common::mem_alloc(sizeof(ArrowArrayData), common::MOD_TSBLOCK)); - if (array_data == nullptr) { - return common::E_OOM; - } - array_data->n_buffers = n_buffers; - array_data->buffers = static_cast( - common::mem_alloc(n_buffers * sizeof(void*), common::MOD_TSBLOCK)); - if (array_data->buffers == nullptr) { - FreeArrowArrayData(array_data); - return common::E_OOM; - } + ArrowArrayData* array_data = AllocArrowArrayData(3); + if (array_data == nullptr) return 
common::E_OOM; - for (int64_t i = 0; i < n_buffers; ++i) { - array_data->buffers[i] = nullptr; + int bm_ret = BuildNullBitmap(vec, row_count, array_data, out_array); + if (bm_ret != common::E_OK) { + FreeArrowArrayData(array_data); + return bm_ret; } - uint8_t* null_bitmap = nullptr; - if (has_null) { - size_t null_bitmap_size = GetNullBitmapSize(row_count); - null_bitmap = static_cast( - common::mem_alloc(null_bitmap_size, common::MOD_TSBLOCK)); - if (null_bitmap == nullptr) { - FreeArrowArrayData(array_data); - return common::E_OOM; - } - common::BitMap& vec_bitmap = vec->get_bitmap(); - char* vec_bitmap_data = vec_bitmap.get_bitmap(); - for (size_t i = 0; i < null_bitmap_size; ++i) { - null_bitmap[i] = ~static_cast(vec_bitmap_data[i]); - } - array_data->buffers[0] = null_bitmap; - - int64_t null_count = 0; - for (uint32_t i = 0; i < row_count; ++i) { - if (vec_bitmap.test(i)) { - null_count++; - } - } - out_array->null_count = null_count; - } else { - array_data->buffers[0] = nullptr; - out_array->null_count = 0; - } size_t offsets_size = sizeof(int32_t) * (row_count + 1); int32_t* offsets = static_cast( common::mem_alloc(offsets_size, common::MOD_TSBLOCK)); if (offsets == nullptr) { - if (null_bitmap != nullptr) { - common::mem_free(null_bitmap); - } FreeArrowArrayData(array_data); return common::E_OOM; } @@ -459,17 +419,7 @@ static int BuildStringArrowArrayC(common::Vector* vec, uint32_t row_count, } array_data->buffers[1] = offsets; array_data->buffers[2] = data_buffer; - - out_array->length = row_count; - out_array->offset = 0; - out_array->n_buffers = n_buffers; - out_array->n_children = 0; - out_array->buffers = const_cast(array_data->buffers); - out_array->children = nullptr; - out_array->dictionary = nullptr; - out_array->release = ReleaseArrowArray; - out_array->private_data = array_data; - + FinalizeArrowArray(out_array, array_data, row_count); return common::E_OK; } @@ -480,50 +430,20 @@ static int BuildDateArrowArrayC(common::Vector* vec, uint32_t 
row_count, } bool has_null = vec->has_null(); - static constexpr int64_t n_buffers = 2; - ArrowArrayData* array_data = static_cast( - common::mem_alloc(sizeof(ArrowArrayData), common::MOD_TSBLOCK)); + ArrowArrayData* array_data = AllocArrowArrayData(2); if (array_data == nullptr) return common::E_OOM; - array_data->n_buffers = n_buffers; - array_data->buffers = static_cast( - common::mem_alloc(n_buffers * sizeof(void*), common::MOD_TSBLOCK)); - if (array_data->buffers == nullptr) { - FreeArrowArrayData(array_data); - return common::E_OOM; - } - for (int64_t i = 0; i < n_buffers; ++i) array_data->buffers[i] = nullptr; - common::BitMap& vec_bitmap = vec->get_bitmap(); - uint8_t* null_bitmap = nullptr; - if (has_null) { - size_t null_bitmap_size = GetNullBitmapSize(row_count); - null_bitmap = static_cast( - common::mem_alloc(null_bitmap_size, common::MOD_TSBLOCK)); - if (null_bitmap == nullptr) { - FreeArrowArrayData(array_data); - return common::E_OOM; - } - char* vec_bitmap_data = vec_bitmap.get_bitmap(); - for (size_t i = 0; i < null_bitmap_size; ++i) { - null_bitmap[i] = ~static_cast(vec_bitmap_data[i]); - } - int64_t null_count = 0; - for (uint32_t i = 0; i < row_count; ++i) { - if (vec_bitmap.test(i)) null_count++; - } - out_array->null_count = null_count; - array_data->buffers[0] = null_bitmap; - } else { - out_array->null_count = 0; - array_data->buffers[0] = nullptr; + int bm_ret = BuildNullBitmap(vec, row_count, array_data, out_array); + if (bm_ret != common::E_OK) { + FreeArrowArrayData(array_data); + return bm_ret; } int32_t* data_buffer = static_cast( common::mem_alloc(sizeof(int32_t) * row_count, common::MOD_TSBLOCK)); if (data_buffer == nullptr) { - if (null_bitmap) common::mem_free(null_bitmap); FreeArrowArrayData(array_data); return common::E_OOM; } @@ -542,15 +462,7 @@ static int BuildDateArrowArrayC(common::Vector* vec, uint32_t row_count, } array_data->buffers[1] = data_buffer; - out_array->length = row_count; - out_array->offset = 0; - 
out_array->n_buffers = n_buffers; - out_array->n_children = 0; - out_array->buffers = const_cast(array_data->buffers); - out_array->children = nullptr; - out_array->dictionary = nullptr; - out_array->release = ReleaseArrowArray; - out_array->private_data = array_data; + FinalizeArrowArray(out_array, array_data, row_count); return common::E_OK; } @@ -776,11 +688,13 @@ static common::TSDataType ArrowFormatToDataType(const char* format) { } // Convert Arrow C Data Interface struct array to storage::Tablet. -// The timestamp column (format "tsn:") is used as tablet timestamps; -// all other columns become tablet data columns. +// time_col_index specifies which column in the Arrow struct to use as the +// timestamp column. +// All other columns become data columns in the Tablet. // reg_schema: optional registered TableSchema; when provided its column types // are used in the Tablet (so they match the writer's registered schema -// exactly). Arrow format strings are still used to decode the actual buffers. +// exactly). +// Arrow format strings are still used to decode the actual buffers. 
int ArrowStructToTablet(const char* table_name, const ArrowArray* in_array, const ArrowSchema* in_schema, const storage::TableSchema* reg_schema, @@ -792,9 +706,9 @@ int ArrowStructToTablet(const char* table_name, const ArrowArray* in_array, int64_t n_cols = in_schema->n_children; if (n_rows <= 0 || n_cols == 0) return common::E_INVALID_ARG; - // time_col_index >= 0: use the specified column as time column - // time_col_index < 0: auto-detect by Arrow format "tsn:" (TIMESTAMP) - int time_col_idx = time_col_index; + if (time_col_index < 0 || time_col_index >= n_cols) + return common::E_INVALID_ARG; + std::vector col_names; std::vector col_types; std::vector read_modes; @@ -806,30 +720,25 @@ int ArrowStructToTablet(const char* table_name, const ArrowArray* in_array, } for (int64_t i = 0; i < n_cols; i++) { + if (static_cast(i) == time_col_index) continue; const ArrowSchema* child = in_schema->children[i]; common::TSDataType read_mode = ArrowFormatToDataType(child->format); if (read_mode == common::INVALID_DATATYPE) return common::E_TYPE_NOT_SUPPORTED; - // Skip the time column (either explicitly specified or auto-detected) - if (static_cast(i) == time_col_idx || - (time_col_idx < 0 && read_mode == common::TIMESTAMP)) { - time_col_idx = static_cast(i); - } else { - std::string col_name = child->name ? child->name : ""; - common::TSDataType col_type = read_mode; - if (reg_schema) { - int reg_idx = const_cast(reg_schema) - ->find_column_index(col_name); - if (reg_idx >= 0 && - reg_idx < static_cast(reg_data_types.size())) { - col_type = reg_data_types[reg_idx]; - } + std::string col_name = child->name ? 
child->name : ""; + common::TSDataType col_type = read_mode; + if (reg_schema) { + int reg_idx = const_cast(reg_schema) + ->find_column_index(col_name); + if (reg_idx >= 0 && + reg_idx < static_cast(reg_data_types.size())) { + col_type = reg_data_types[reg_idx]; } - col_names.emplace_back(std::move(col_name)); - col_types.push_back(col_type); - read_modes.push_back(read_mode); - data_col_indices.push_back(static_cast(i)); } + col_names.emplace_back(std::move(col_name)); + col_types.push_back(col_type); + read_modes.push_back(read_mode); + data_col_indices.push_back(static_cast(i)); } if (col_names.empty()) return common::E_INVALID_ARG; @@ -844,15 +753,11 @@ int ArrowStructToTablet(const char* table_name, const ArrowArray* in_array, } // Fill timestamps from the time column - if (time_col_idx >= 0) { - const ArrowArray* ts_arr = in_array->children[time_col_idx]; - const int64_t* ts_buf = static_cast(ts_arr->buffers[1]); - int64_t off = ts_arr->offset; - for (int64_t r = 0; r < n_rows; r++) { - if (ArrowIsValid(ts_arr, r)) - tablet->add_timestamp(static_cast(r), - ts_buf[off + r]); - } + { + const ArrowArray* ts_arr = in_array->children[time_col_index]; + const int64_t* ts_buf = + static_cast(ts_arr->buffers[1]) + ts_arr->offset; + tablet->set_timestamps(ts_buf, static_cast(n_rows)); } // Fill data columns from Arrow children (use read_modes to decode buffers) @@ -862,9 +767,13 @@ int ArrowStructToTablet(const char* table_name, const ArrowArray* in_array, uint32_t tcol = static_cast(ci); int64_t off = col_arr->offset; + const uint8_t* validity = + (col_arr->null_count > 0 && col_arr->buffers[0] != nullptr) + ? 
static_cast(col_arr->buffers[0]) + : nullptr; + switch (dtype) { case common::BOOLEAN: { - // Arrow boolean: bit-packed in buffers[1] const uint8_t* vals = static_cast(col_arr->buffers[1]); for (int64_t r = 0; r < n_rows; r++) { @@ -875,44 +784,12 @@ int ArrowStructToTablet(const char* table_name, const ArrowArray* in_array, } break; } - case common::INT32: { - const int32_t* vals = - static_cast(col_arr->buffers[1]); - for (int64_t r = 0; r < n_rows; r++) { - if (ArrowIsValid(col_arr, r)) - tablet->add_value(static_cast(r), - tcol, vals[off + r]); - } - break; - } - case common::INT64: { - const int64_t* vals = - static_cast(col_arr->buffers[1]); - for (int64_t r = 0; r < n_rows; r++) { - if (ArrowIsValid(col_arr, r)) - tablet->add_value(static_cast(r), - tcol, vals[off + r]); - } - break; - } - case common::FLOAT: { - const float* vals = - static_cast(col_arr->buffers[1]); - for (int64_t r = 0; r < n_rows; r++) { - if (ArrowIsValid(col_arr, r)) - tablet->add_value(static_cast(r), tcol, - vals[off + r]); - } - break; - } + case common::INT32: + case common::INT64: + case common::FLOAT: case common::DOUBLE: { - const double* vals = - static_cast(col_arr->buffers[1]); - for (int64_t r = 0; r < n_rows; r++) { - if (ArrowIsValid(col_arr, r)) - tablet->add_value(static_cast(r), - tcol, vals[off + r]); - } + tablet->set_column_values(tcol, col_arr->buffers[1], validity, + static_cast(n_rows)); break; } case common::DATE: { @@ -930,9 +807,8 @@ int ArrowStructToTablet(const char* table_name, const ArrowArray* in_array, break; } case common::TEXT: - case common::STRING: { - // Arrow UTF-8 string: buffers[1]=int32 offsets, buffers[2]=char - // data + case common::STRING: + case common::BLOB: { const int32_t* offsets = static_cast(col_arr->buffers[1]); const char* data = diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc index f27db4e18..620b8392f 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.cc +++ b/cpp/src/cwrapper/tsfile_cwrapper.cc @@ 
-40,7 +40,7 @@ int TsBlockToArrowStruct(common::TsBlock& tsblock, ArrowArray* out_array, int ArrowStructToTablet(const char* table_name, const ArrowArray* in_array, const ArrowSchema* in_schema, const storage::TableSchema* reg_schema, - storage::Tablet** out_tablet, int time_col_index = -1); + storage::Tablet** out_tablet, int time_col_index); } // namespace arrow #ifdef __cplusplus diff --git a/cpp/src/reader/qds_with_timegenerator.h b/cpp/src/reader/qds_with_timegenerator.h index 596b11366..52892df14 100644 --- a/cpp/src/reader/qds_with_timegenerator.h +++ b/cpp/src/reader/qds_with_timegenerator.h @@ -123,6 +123,7 @@ class QDSWithTimeGenerator : public ResultSet { bool is_null(uint32_t column_index); RowRecord* get_row_record(); std::shared_ptr get_metadata(); + private: int construct_node_tree(Expression* expr, Node*& node); diff --git a/cpp/src/reader/qds_without_timegenerator.h b/cpp/src/reader/qds_without_timegenerator.h index 44479d50a..0619fa673 100644 --- a/cpp/src/reader/qds_without_timegenerator.h +++ b/cpp/src/reader/qds_without_timegenerator.h @@ -48,6 +48,7 @@ class QDSWithoutTimeGenerator : public ResultSet { bool is_null(uint32_t column_index); RowRecord* get_row_record(); std::shared_ptr get_metadata(); + private: int get_next_tsblock(uint32_t index, bool alloc_mem); diff --git a/python/tsfile/tsfile_table_writer.py b/python/tsfile/tsfile_table_writer.py index 1db3d94db..cfd817fec 100644 --- a/python/tsfile/tsfile_table_writer.py +++ b/python/tsfile/tsfile_table_writer.py @@ -187,11 +187,19 @@ def write_arrow_batch(self, data): Write a PyArrow RecordBatch or Table into tsfile using Arrow C Data Interface for efficient batch writing without Python-level row loops. :param data: pyarrow.RecordBatch or pyarrow.Table. Must include a - timestamp-typed column (pa.timestamp) which is used as the row - timestamps. All other columns must match the registered schema. + timestamp column. All other columns must match the registered schema. 
:return: no return value. """ - self.writer.write_arrow_batch(self.tableSchema.get_table_name(), data) + time_col = self.tableSchema.get_time_column() + if time_col is not None: + time_col_name = time_col.get_column_name() + else: + time_col_name = "time" + + time_col_index = data.schema.get_field_index(time_col_name) + if time_col_index < 0: + raise ValueError(f"Time column '{time_col_name}' not found in Arrow schema.") + self.writer.write_arrow_batch(self.tableSchema.get_table_name(), data, time_col_index) def close(self): """ From c8e4aa954b0f821c622dc2b4c34353240733ddcd Mon Sep 17 00:00:00 2001 From: ColinLee Date: Wed, 18 Mar 2026 23:57:32 +0800 Subject: [PATCH 7/8] fix comment. --- cpp/src/cwrapper/arrow_c.cc | 2 +- cpp/src/reader/table_query_executor.h | 2 +- cpp/src/reader/tsfile_reader.h | 4 +++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/cpp/src/cwrapper/arrow_c.cc b/cpp/src/cwrapper/arrow_c.cc index 99a80b612..fc3a81f79 100644 --- a/cpp/src/cwrapper/arrow_c.cc +++ b/cpp/src/cwrapper/arrow_c.cc @@ -743,7 +743,7 @@ int ArrowStructToTablet(const char* table_name, const ArrowArray* in_array, if (col_names.empty()) return common::E_INVALID_ARG; - std::string tname = table_name ? table_name : ""; + std::string tname = table_name ? 
table_name : "default_table"; auto* tablet = new storage::Tablet(tname, &col_names, &col_types, static_cast(n_rows)); if (tablet->err_code_ != common::E_OK) { diff --git a/cpp/src/reader/table_query_executor.h b/cpp/src/reader/table_query_executor.h index 0f2642f82..718947e5a 100644 --- a/cpp/src/reader/table_query_executor.h +++ b/cpp/src/reader/table_query_executor.h @@ -46,7 +46,7 @@ class TableQueryExecutor { table_query_ordering_(table_query_ordering), block_size_(block_size), return_mode_(RETURN_ROW) {} - TableQueryExecutor(ReadFile* read_file, const int batch_size = 0) { + TableQueryExecutor(ReadFile* read_file, const int batch_size = -1) { tsfile_io_reader_ = new TsFileIOReader(); tsfile_io_reader_->init(read_file); meta_data_querier_ = new MetadataQuerier(tsfile_io_reader_); diff --git a/cpp/src/reader/tsfile_reader.h b/cpp/src/reader/tsfile_reader.h index 526a96351..60ebfc98b 100644 --- a/cpp/src/reader/tsfile_reader.h +++ b/cpp/src/reader/tsfile_reader.h @@ -92,10 +92,12 @@ class TsFileReader { * @param [in] start_time the start time * @param [in] end_time the end time * @param [out] result_set the result set + * @param [in] batch_size <= 0 means row-by-row return mode, + * > 0 means return TsBlock with the specified block size. */ int query(const std::string& table_name, const std::vector& columns_names, int64_t start_time, - int64_t end_time, ResultSet*& result_set, int batch_size = 0); + int64_t end_time, ResultSet*& result_set, int batch_size = -1); /** * @brief query the tsfile by the table name, columns names, start time From 12bbcba3c8359ba538387e1d417cf676a0efd77f Mon Sep 17 00:00:00 2001 From: ColinLee Date: Thu, 19 Mar 2026 16:36:54 +0800 Subject: [PATCH 8/8] fix comment. 
--- cpp/src/common/container/bit_map.h | 27 ++++ cpp/src/common/tablet.cc | 21 +--- cpp/src/common/tablet.h | 19 ++- cpp/src/cwrapper/arrow_c.cc | 178 ++++++++++++++++++++--------- cpp/src/cwrapper/tsfile_cwrapper.h | 3 +- 5 files changed, 172 insertions(+), 76 deletions(-) diff --git a/cpp/src/common/container/bit_map.h b/cpp/src/common/container/bit_map.h index 4d29550ec..7c60a1ea3 100644 --- a/cpp/src/common/container/bit_map.h +++ b/cpp/src/common/container/bit_map.h @@ -78,6 +78,33 @@ class BitMap { return count; } + // Find the next set bit (null position) at or after @from, + // within [0, total_bits). Returns total_bits if none found. + // Skips zero bytes in bulk so cost is proportional to the number + // of null bytes, not total rows. + FORCE_INLINE uint32_t next_set_bit(uint32_t from, + uint32_t total_bits) const { + if (from >= total_bits) return total_bits; + const uint8_t* p = reinterpret_cast<const uint8_t*>(bitmap_); + uint32_t byte_idx = from >> 3; + // Check remaining bits in the first (partial) byte + uint8_t byte_val = p[byte_idx] >> (from & 7); + if (byte_val) { + // Clamp: padding bits past total_bits may be set in this byte. + uint32_t first_pos = from + __builtin_ctz(byte_val); + return first_pos < total_bits ? first_pos : total_bits; + } + // Scan subsequent full bytes, skipping zeros + const uint32_t byte_end = (total_bits + 7) >> 3; + for (++byte_idx; byte_idx < byte_end; ++byte_idx) { + if (p[byte_idx]) { + uint32_t pos = (byte_idx << 3) + __builtin_ctz(p[byte_idx]); + return pos < total_bits ?
pos : total_bits; + } + } + return total_bits; + } + FORCE_INLINE uint32_t get_size() { return size_; } FORCE_INLINE char* get_bitmap() { return bitmap_; } diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc index 148c5ee52..68bdb898d 100644 --- a/cpp/src/common/tablet.cc +++ b/cpp/src/common/tablet.cc @@ -177,7 +177,7 @@ int Tablet::set_timestamps(const int64_t* timestamps, uint32_t count) { } int Tablet::set_column_values(uint32_t schema_index, const void* data, - const uint8_t* null_bitmap, uint32_t count) { + const uint8_t* bitmap, uint32_t count) { if (err_code_ != E_OK) { return err_code_; } @@ -218,7 +218,7 @@ int Tablet::set_column_values(uint32_t schema_index, const void* data, return E_TYPE_NOT_SUPPORTED; } - if (null_bitmap == nullptr) { + if (bitmap == nullptr) { // All valid: bulk copy + mark all as non-null std::memcpy(dst, data, count * elem_size); bitmaps_[schema_index].clear_all(); @@ -227,20 +227,11 @@ int Tablet::set_column_values(uint32_t schema_index, const void* data, // read). std::memcpy(dst, data, count * elem_size); - // Convert Arrow bitmap (1=valid, 0=null) to TsFile bitmap (1=null, - // 0=valid) by inverting and writing directly. + // bitmap uses TsFile convention (1=null, 0=valid), same as + // internal BitMap, so copy directly.
char* tsfile_bm = bitmaps_[schema_index].get_bitmap(); - uint32_t full_bytes = count / 8; - for (uint32_t i = 0; i < full_bytes; i++) { - tsfile_bm[i] = ~static_cast(null_bitmap[i]); - } - // Handle remaining bits in the last partial byte - for (uint32_t i = full_bytes * 8; i < count; i++) { - bool valid = (null_bitmap[i / 8] >> (i % 8)) & 1; - if (valid) { - bitmaps_[schema_index].clear(i); - } - } + uint32_t bm_bytes = (count + 7) / 8; + std::memcpy(tsfile_bm, bitmap, bm_bytes); } cur_row_size_ = std::max(count, cur_row_size_); return E_OK; diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h index 9b818a271..02691087d 100644 --- a/cpp/src/common/tablet.h +++ b/cpp/src/common/tablet.h @@ -181,13 +181,24 @@ class Tablet { */ int add_timestamp(uint32_t row_index, int64_t timestamp); + /** + * @brief Bulk copy timestamps into the tablet. + * + * @param timestamps Pointer to an array of timestamp values. + * @param count Number of timestamps to copy. Must be <= max_row_num. + * If count > cur_row_size_, cur_row_size_ is updated to count, + * so that subsequent operations know how many rows are populated. + * @return Returns 0 on success, or a non-zero error code on failure + * (E_OUT_OF_RANGE if count > max_row_num). + */ int set_timestamps(const int64_t* timestamps, uint32_t count); - // Bulk copy fixed-length column data. If null_bitmap is nullptr, all rows - // are non-null. Otherwise bit=1 means valid, bit=0 means null (Arrow - // convention). + // Bulk copy fixed-length column data. If bitmap is nullptr, all rows are + // non-null. Otherwise bit=1 means null, bit=0 means valid (same as TsFile + // BitMap convention). Callers using other conventions (e.g. Arrow, where + // 1=valid) must invert before calling. 
int set_column_values(uint32_t schema_index, const void* data, - const uint8_t* null_bitmap, uint32_t count); + const uint8_t* bitmap, uint32_t count); void* get_value(int row_index, uint32_t schema_index, common::TSDataType& data_type) const; diff --git a/cpp/src/cwrapper/arrow_c.cc b/cpp/src/cwrapper/arrow_c.cc index fc3a81f79..6f56cfc6a 100644 --- a/cpp/src/cwrapper/arrow_c.cc +++ b/cpp/src/cwrapper/arrow_c.cc @@ -73,8 +73,8 @@ struct ArrowArrayData { // Owns format/name strings and children schemas. // Stored in ArrowSchema.private_data; freed by ReleaseArrowSchema. struct ArrowSchemaData { - std::vector* format_strings; - std::vector* name_strings; + std::string format_string; + std::string name_string; ArrowSchema** children; size_t n_children; }; @@ -218,8 +218,6 @@ static void FreeArrowSchemaData(ArrowSchemaData* data) { } common::mem_free(data->children); } - delete data->format_strings; - delete data->name_strings; delete data; } @@ -305,19 +303,32 @@ inline int BuildFixedLengthArrowArrayC(common::Vector* vec, uint32_t row_count, // Vector stores booleans as one byte each, densely packed // (null rows have no entry). Scatter into Arrow bit-packed format. - common::BitMap& bm = vec->get_bitmap(); - uint32_t src_idx = 0; + // Use next_set_bit to skip null rows without per-row bitmap testing. 
const uint8_t* src = reinterpret_cast(vec_data); - for (uint32_t i = 0; i < row_count; ++i) { - if (has_null && bm.test(i)) { - continue; // null row, no data in value buffer + uint32_t src_idx = 0; + if (has_null) { + common::BitMap& bm = vec->get_bitmap(); + uint32_t pos = 0; + while (pos < row_count) { + uint32_t null_pos = bm.next_set_bit(pos, row_count); + // Process non-null run [pos, null_pos) + for (uint32_t i = pos; i < null_pos; ++i) { + if (src[src_idx] != 0) { + packed_buffer[i / 8] |= (1 << (i & 7)); + } + src_idx++; + } + if (null_pos >= row_count) break; + // Skip null row (no source data, packed_buffer already zeroed) + pos = null_pos + 1; } - if (src[src_idx] != 0) { - uint32_t byte_idx = i / 8; - uint32_t bit_idx = i % 8; - packed_buffer[byte_idx] |= (1 << bit_idx); + } else { + for (uint32_t i = 0; i < row_count; ++i) { + if (src[src_idx] != 0) { + packed_buffer[i / 8] |= (1 << (i & 7)); + } + src_idx++; } - src_idx++; } data_buffer = packed_buffer; @@ -332,18 +343,26 @@ inline int BuildFixedLengthArrowArrayC(common::Vector* vec, uint32_t row_count, if (has_null) { // Value buffer is densely packed (no slots for null rows). // Scatter non-null values into their correct Arrow positions. + // Use next_set_bit to jump between null positions and bulk-copy + // contiguous non-null runs in between. 
common::BitMap& bm = vec->get_bitmap(); + char* dst = static_cast(data_buffer); uint32_t src_offset = 0; - for (uint32_t i = 0; i < row_count; ++i) { - if (bm.test(i)) { - // null row: write zero placeholder in Arrow buffer - std::memset(static_cast(data_buffer) + i * type_size, - 0, type_size); - } else { - std::memcpy(static_cast(data_buffer) + i * type_size, - vec_data + src_offset, type_size); - src_offset += type_size; + uint32_t pos = 0; + + while (pos < row_count) { + uint32_t null_pos = bm.next_set_bit(pos, row_count); + // Copy the non-null run [pos, null_pos) + if (null_pos > pos) { + uint32_t run = null_pos - pos; + std::memcpy(dst + pos * type_size, vec_data + src_offset, + run * type_size); + src_offset += run * type_size; } + if (null_pos >= row_count) break; + // Zero-fill the null slot + std::memset(dst + null_pos * type_size, 0, type_size); + pos = null_pos + 1; } } else { // No nulls: value buffer is dense and complete, direct copy @@ -400,22 +419,44 @@ static int BuildStringArrowArrayC(common::Vector* vec, uint32_t row_count, } // Single pass: build offsets and copy string data together. + // Use next_set_bit to skip null rows without per-row bitmap testing. 
offsets[0] = 0; uint32_t data_offset = 0; - for (uint32_t i = 0; i < row_count; ++i) { - if (has_null && vec_bitmap.test(i)) { - offsets[i + 1] = data_offset; - continue; + if (has_null) { + uint32_t pos = 0; + while (pos < row_count) { + uint32_t null_pos = vec_bitmap.next_set_bit(pos, row_count); + // Process non-null run [pos, null_pos) + for (uint32_t i = pos; i < null_pos; ++i) { + uint32_t len = 0; + std::memcpy(&len, vec_data + vec_offset, sizeof(uint32_t)); + vec_offset += sizeof(uint32_t); + if (len > 0) { + std::memcpy(data_buffer + data_offset, + vec_data + vec_offset, len); + } + vec_offset += len; + data_offset += len; + offsets[i + 1] = data_offset; + } + if (null_pos >= row_count) break; + // Null row: no source data, offset stays the same + offsets[null_pos + 1] = data_offset; + pos = null_pos + 1; } - uint32_t len = 0; - std::memcpy(&len, vec_data + vec_offset, sizeof(uint32_t)); - vec_offset += sizeof(uint32_t); - if (len > 0) { - std::memcpy(data_buffer + data_offset, vec_data + vec_offset, len); + } else { + for (uint32_t i = 0; i < row_count; ++i) { + uint32_t len = 0; + std::memcpy(&len, vec_data + vec_offset, sizeof(uint32_t)); + vec_offset += sizeof(uint32_t); + if (len > 0) { + std::memcpy(data_buffer + data_offset, vec_data + vec_offset, + len); + } + vec_offset += len; + data_offset += len; + offsets[i + 1] = data_offset; } - vec_offset += len; - data_offset += len; - offsets[i + 1] = data_offset; } array_data->buffers[1] = offsets; array_data->buffers[2] = data_buffer; @@ -448,12 +489,27 @@ static int BuildDateArrowArrayC(common::Vector* vec, uint32_t row_count, return common::E_OOM; } + // Use next_set_bit to skip null rows without per-row bitmap testing. 
char* vec_data = vec->get_value_data().get_data(); uint32_t src_offset = 0; - for (uint32_t i = 0; i < row_count; ++i) { - if (has_null && vec_bitmap.test(i)) { - data_buffer[i] = 0; - } else { + if (has_null) { + uint32_t pos = 0; + while (pos < row_count) { + uint32_t null_pos = vec_bitmap.next_set_bit(pos, row_count); + // Process non-null run [pos, null_pos) + for (uint32_t i = pos; i < null_pos; ++i) { + int32_t yyyymmdd = 0; + std::memcpy(&yyyymmdd, vec_data + src_offset, sizeof(int32_t)); + src_offset += sizeof(int32_t); + data_buffer[i] = common::YYYYMMDDToDaysSinceEpoch(yyyymmdd); + } + if (null_pos >= row_count) break; + // Null row: zero fill + data_buffer[null_pos] = 0; + pos = null_pos + 1; + } + } else { + for (uint32_t i = 0; i < row_count; ++i) { int32_t yyyymmdd = 0; std::memcpy(&yyyymmdd, vec_data + src_offset, sizeof(int32_t)); src_offset += sizeof(int32_t); @@ -527,16 +583,13 @@ static int BuildColumnArrowSchema(common::TSDataType data_type, } ArrowSchemaData* schema_data = new ArrowSchemaData(); - schema_data->format_strings = new std::vector(); - schema_data->name_strings = new std::vector(); + schema_data->format_string = format; + schema_data->name_string = column_name; schema_data->children = nullptr; schema_data->n_children = 0; - schema_data->format_strings->push_back(format); - schema_data->name_strings->push_back(column_name); - - out_schema->format = schema_data->format_strings->back().c_str(); - out_schema->name = schema_data->name_strings->back().c_str(); + out_schema->format = schema_data->format_string.c_str(); + out_schema->name = schema_data->name_string.c_str(); out_schema->metadata = nullptr; out_schema->flags = ARROW_FLAG_NULLABLE; out_schema->n_children = 0; @@ -564,8 +617,8 @@ int TsBlockToArrowStruct(common::TsBlock& tsblock, ArrowArray* out_array, // Build ArrowSchema for struct type ArrowSchemaData* schema_data = new ArrowSchemaData(); - schema_data->format_strings = new std::vector(); - schema_data->name_strings = new 
std::vector(); + schema_data->format_string = "+s"; + schema_data->name_string = ""; schema_data->n_children = column_count; schema_data->children = static_cast(common::mem_alloc( column_count * sizeof(ArrowSchema*), common::MOD_TSBLOCK)); @@ -578,10 +631,6 @@ int TsBlockToArrowStruct(common::TsBlock& tsblock, ArrowArray* out_array, schema_data->children[i] = nullptr; } - // Store format string for struct type - schema_data->format_strings->push_back("+s"); - schema_data->name_strings->push_back(""); - // Build schema for each column for (uint32_t i = 0; i < column_count; ++i) { schema_data->children[i] = static_cast( @@ -603,8 +652,8 @@ int TsBlockToArrowStruct(common::TsBlock& tsblock, ArrowArray* out_array, } } - out_schema->format = schema_data->format_strings->at(0).c_str(); - out_schema->name = schema_data->name_strings->at(0).c_str(); + out_schema->format = schema_data->format_string.c_str(); + out_schema->name = schema_data->name_string.c_str(); out_schema->metadata = nullptr; out_schema->flags = 0; out_schema->n_children = column_count; @@ -788,8 +837,27 @@ int ArrowStructToTablet(const char* table_name, const ArrowArray* in_array, case common::INT64: case common::FLOAT: case common::DOUBLE: { - tablet->set_column_values(tcol, col_arr->buffers[1], validity, + // Invert Arrow bitmap (1=valid) to TsFile bitmap (1=null) + const uint8_t* null_bm = nullptr; + uint8_t* inverted_bm = nullptr; + if (validity != nullptr) { + uint32_t bm_bytes = (static_cast(n_rows) + 7) / 8; + inverted_bm = static_cast( + common::mem_alloc(bm_bytes, common::MOD_TSBLOCK)); + if (inverted_bm == nullptr) { + delete tablet; + return common::E_OOM; + } + for (uint32_t b = 0; b < bm_bytes; b++) { + inverted_bm[b] = ~validity[b]; + } + null_bm = inverted_bm; + } + tablet->set_column_values(tcol, col_arr->buffers[1], null_bm, static_cast(n_rows)); + if (inverted_bm != nullptr) { + common::mem_free(inverted_bm); + } break; } case common::DATE: { diff --git 
a/cpp/src/cwrapper/tsfile_cwrapper.h b/cpp/src/cwrapper/tsfile_cwrapper.h index b496cb12f..49de2b5b2 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.h +++ b/cpp/src/cwrapper/tsfile_cwrapper.h @@ -720,8 +720,7 @@ ERRNO _tsfile_writer_write_table(TsFileWriter writer, Tablet tablet); // Write Arrow C Data Interface batch into a table (Arrow -> Tablet -> write). // time_col_index: index of the time column in the Arrow struct. -// >= 0: use the specified column as the time column. -// < 0: auto-detect by Arrow format "tsn:" (TIMESTAMP type). +// Caller should determine the correct time_col_index before calling. ERRNO _tsfile_writer_write_arrow_table(TsFileWriter writer, const char* table_name, ArrowArray* array, ArrowSchema* schema,