Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
7eb6248
Proof of concept German-style strings support. Quite a few CometFuzzT…
mbutrovich Jun 12, 2025
6ed9013
Hashing support.
mbutrovich Jun 12, 2025
a90edc0
Checkpoint. TPC-H runs.
mbutrovich Jun 13, 2025
a265924
Checkpoint.
mbutrovich Jun 14, 2025
03f0b89
JVM shuffle support.
mbutrovich Aug 1, 2025
bc17c27
Better cast and literal null support.
mbutrovich Aug 1, 2025
c9f9d16
Merge branch 'main' into german_style_strings
mbutrovich Aug 20, 2025
a8cbfb7
Compact RecordBatches with *View types before sending to IPC for shuf…
mbutrovich Aug 21, 2025
9a479f1
Compact before FFI.
mbutrovich Aug 21, 2025
5b76d5e
scalar subquery view support. change bloom_filter_agg and bloom_filte…
mbutrovich Aug 21, 2025
757e019
Merge branch 'main' into german_style_strings
mbutrovich Sep 29, 2025
0d513ac
Merge branch 'main' into german_style_strings
mbutrovich Oct 23, 2025
8790b1f
Merge branch 'main' into german_style_strings
mbutrovich Nov 4, 2025
1121fe2
Merge branch 'main' into german_style_strings
mbutrovich Dec 18, 2025
cc6c7cc
Merge in main.
mbutrovich Dec 18, 2025
de64165
Merge branch 'main' into german_style_strings
mbutrovich Apr 27, 2026
0af932a
Delete .claude/settings.local.json
mbutrovich Apr 27, 2026
74ccec0
Add native columnar to row support.
mbutrovich Apr 27, 2026
31a22ef
Fix fuzz suites.
mbutrovich Apr 27, 2026
3d29871
fix casting int to utf8view
mbutrovich Apr 27, 2026
58c84b0
CometCastSuite passes.
mbutrovich Apr 27, 2026
95e04de
Remove printlns.
mbutrovich Apr 27, 2026
ca8cbe6
Merge remote-tracking branch 'origin/german_style_strings' into germa…
mbutrovich Apr 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 81 additions & 32 deletions common/src/main/java/org/apache/comet/vector/CometPlainVector.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.UUID;

import org.apache.arrow.c.CDataDictionaryProvider;
Expand All @@ -33,7 +34,7 @@
/** A column vector whose elements are plainly decoded. */
public class CometPlainVector extends CometDecodedVector {
private final long valueBufferAddress;
private final boolean isBaseFixedWidthVector;
private CometVectorType CometVectorType = null;

private byte booleanByteCache;
private int booleanByteCacheIndex = -1;
Expand All @@ -58,7 +59,17 @@ public CometPlainVector(
this.valueBufferAddress = vector.getDataBuffer().memoryAddress();
}

isBaseFixedWidthVector = valueVector instanceof BaseFixedWidthVector;
if (valueVector instanceof BaseFixedWidthVector) {
this.CometVectorType = org.apache.comet.vector.CometVectorType.BaseFixedWidthVector;
} else if (valueVector instanceof BaseVariableWidthVector) {
this.CometVectorType = org.apache.comet.vector.CometVectorType.BaseVariableWidthVector;
} else if (valueVector instanceof ViewVarBinaryVector) {
this.CometVectorType = org.apache.comet.vector.CometVectorType.ViewVarBinaryVector;
} else if (valueVector instanceof ViewVarCharVector) {
this.CometVectorType = org.apache.comet.vector.CometVectorType.ViewVarCharVector;
} else {
throw new RuntimeException("Unsupported binary vector type: " + valueVector.getName());
}
this.isReused = isReused;
}

Expand Down Expand Up @@ -124,25 +135,46 @@ public double getDouble(int rowId) {
@Override
public UTF8String getUTF8String(int rowId) {
if (isNullAt(rowId)) return null;
if (!isBaseFixedWidthVector) {
BaseVariableWidthVector varWidthVector = (BaseVariableWidthVector) valueVector;
long offsetBufferAddress = varWidthVector.getOffsetBuffer().memoryAddress();
int offset = Platform.getInt(null, offsetBufferAddress + rowId * 4L);
int length = Platform.getInt(null, offsetBufferAddress + (rowId + 1L) * 4L) - offset;
return UTF8String.fromAddress(null, valueBufferAddress + offset, length);
} else {
BaseFixedWidthVector fixedWidthVector = (BaseFixedWidthVector) valueVector;
int length = fixedWidthVector.getTypeWidth();
int offset = rowId * length;
byte[] result = new byte[length];
Platform.copyMemory(
null, valueBufferAddress + offset, result, Platform.BYTE_ARRAY_OFFSET, length);

if (!isUuid) {
return UTF8String.fromBytes(result);
} else {
return UTF8String.fromString(convertToUuid(result).toString());
}
switch (CometVectorType) {
case BaseVariableWidthVector:
{
BaseVariableWidthVector varWidthVector = (BaseVariableWidthVector) valueVector;
long offsetBufferAddress = varWidthVector.getOffsetBuffer().memoryAddress();
int offset = Platform.getInt(null, offsetBufferAddress + rowId * 4L);
int length = Platform.getInt(null, offsetBufferAddress + (rowId + 1L) * 4L) - offset;
return UTF8String.fromAddress(null, valueBufferAddress + offset, length);
}
case BaseFixedWidthVector:
{
BaseFixedWidthVector fixedWidthVector = (BaseFixedWidthVector) valueVector;
int length = fixedWidthVector.getTypeWidth();
int offset = rowId * length;
byte[] result = new byte[length];
Platform.copyMemory(
null, valueBufferAddress + offset, result, Platform.BYTE_ARRAY_OFFSET, length);

if (!isUuid) {
return UTF8String.fromBytes(result);
} else {
return UTF8String.fromString(convertToUuid(result).toString());
}
}
case ViewVarCharVector:
{
ViewVarCharVector viewVarCharVector = (ViewVarCharVector) valueVector;
int length = viewVarCharVector.getValueLength(rowId);
byte[] objectBytes = viewVarCharVector.get(rowId);
if (!isUuid) {
return UTF8String.fromBytes(Arrays.copyOf(objectBytes, length));
} else {
return UTF8String.fromString(
convertToUuid(Arrays.copyOf(objectBytes, length)).toString());
}
}
default:
{
throw new RuntimeException("Unsupported binary vector type: " + valueVector.getName());
}
}
}

Expand All @@ -151,17 +183,34 @@ public byte[] getBinary(int rowId) {
if (isNullAt(rowId)) return null;
int offset;
int length;
if (valueVector instanceof BaseVariableWidthVector) {
BaseVariableWidthVector varWidthVector = (BaseVariableWidthVector) valueVector;
long offsetBufferAddress = varWidthVector.getOffsetBuffer().memoryAddress();
offset = Platform.getInt(null, offsetBufferAddress + rowId * 4L);
length = Platform.getInt(null, offsetBufferAddress + (rowId + 1L) * 4L) - offset;
} else if (valueVector instanceof BaseFixedWidthVector) {
BaseFixedWidthVector fixedWidthVector = (BaseFixedWidthVector) valueVector;
length = fixedWidthVector.getTypeWidth();
offset = rowId * length;
} else {
throw new RuntimeException("Unsupported binary vector type: " + valueVector.getName());

switch (CometVectorType) {
case BaseFixedWidthVector:
{
BaseFixedWidthVector fixedWidthVector = (BaseFixedWidthVector) valueVector;
length = fixedWidthVector.getTypeWidth();
offset = rowId * length;
break;
}
case BaseVariableWidthVector:
{
BaseVariableWidthVector varWidthVector = (BaseVariableWidthVector) valueVector;
long offsetBufferAddress = varWidthVector.getOffsetBuffer().memoryAddress();
offset = Platform.getInt(null, offsetBufferAddress + rowId * 4L);
length = Platform.getInt(null, offsetBufferAddress + (rowId + 1L) * 4L) - offset;
break;
}
case ViewVarBinaryVector:
{
ViewVarBinaryVector viewVarBinaryVector = (ViewVarBinaryVector) valueVector;
length = viewVarBinaryVector.getValueLength(rowId);
byte[] objectBytes = viewVarBinaryVector.get(rowId);
return Arrays.copyOf(objectBytes, length);
}
default:
{
throw new RuntimeException("Unsupported binary vector type: " + valueVector.getName());
}
}
byte[] result = new byte[length];
Platform.copyMemory(
Expand Down
27 changes: 27 additions & 0 deletions common/src/main/java/org/apache/comet/vector/CometVectorType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.vector;

/**
 * Tags the kind of Arrow value vector that backs a {@code CometPlainVector}.
 *
 * <p>{@code CometPlainVector} assigns one of these constants in its constructor, based on an
 * {@code instanceof} check of the wrapped vector, and then switches on it in its accessors
 * instead of repeating the {@code instanceof} checks on every call.
 *
 * <p>The constant names intentionally mirror the Arrow class names they stand for.
 */
public enum CometVectorType {
/** The wrapped vector is an Arrow {@code BaseFixedWidthVector}. */
BaseFixedWidthVector,
/** The wrapped vector is an Arrow {@code BaseVariableWidthVector}. */
BaseVariableWidthVector,
/** The wrapped vector is an Arrow {@code ViewVarBinaryVector}. */
ViewVarBinaryVector,
/** The wrapped vector is an Arrow {@code ViewVarCharVector}. */
ViewVarCharVector,
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ object Utils extends CometTypeShim with Logging {
case float: ArrowType.FloatingPoint if float.getPrecision == FloatingPointPrecision.DOUBLE =>
DoubleType
case ArrowType.Utf8.INSTANCE => StringType
case ArrowType.Utf8View.INSTANCE => StringType
case ArrowType.BinaryView.INSTANCE => BinaryType
case ArrowType.Binary.INSTANCE => BinaryType
case _: ArrowType.FixedSizeBinary => BinaryType
case d: ArrowType.Decimal => DecimalType(d.getPrecision, d.getScale)
Expand Down Expand Up @@ -393,7 +395,7 @@ object Utils extends CometTypeShim with Logging {
_: BigIntVector | _: Float4Vector | _: Float8Vector | _: VarCharVector |
_: DecimalVector | _: DateDayVector | _: TimeStampMicroTZVector | _: VarBinaryVector |
_: FixedSizeBinaryVector | _: TimeStampMicroVector | _: StructVector | _: ListVector |
_: MapVector | _: NullVector) =>
_: MapVector | _: ViewVarCharVector | _: ViewVarBinaryVector | _: NullVector) =>
v.asInstanceOf[FieldVector]
case _ =>
throw new SparkException(s"Unsupported Arrow Vector for $reason: ${valueVector.getClass}")
Expand Down
59 changes: 58 additions & 1 deletion native/core/src/execution/columnar_to_row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,10 @@ enum TypedArray<'a> {
Decimal128(&'a Decimal128Array, u8), // array + precision
String(&'a StringArray),
LargeString(&'a LargeStringArray),
StringView(&'a StringViewArray),
Binary(&'a BinaryArray),
LargeBinary(&'a LargeBinaryArray),
BinaryView(&'a BinaryViewArray),
FixedSizeBinary(&'a FixedSizeBinaryArray),
Struct(
&'a StructArray,
Expand Down Expand Up @@ -214,6 +216,14 @@ impl<'a> TypedArray<'a> {
array,
LargeBinaryArray
)?)),
DataType::Utf8View => Ok(TypedArray::StringView(downcast_array!(
array,
StringViewArray
)?)),
DataType::BinaryView => Ok(TypedArray::BinaryView(downcast_array!(
array,
BinaryViewArray
)?)),
DataType::FixedSizeBinary(_) => Ok(TypedArray::FixedSizeBinary(downcast_array!(
array,
FixedSizeBinaryArray
Expand Down Expand Up @@ -270,8 +280,10 @@ impl<'a> TypedArray<'a> {
Decimal128,
String,
LargeString,
StringView,
Binary,
LargeBinary,
BinaryView,
FixedSizeBinary,
Struct,
List,
Expand Down Expand Up @@ -332,8 +344,12 @@ impl<'a> TypedArray<'a> {
TypedArray::LargeString(arr) => {
Ok(write_bytes_padded(buffer, arr.value(row_idx).as_bytes()))
}
TypedArray::StringView(arr) => {
Ok(write_bytes_padded(buffer, arr.value(row_idx).as_bytes()))
}
TypedArray::Binary(arr) => Ok(write_bytes_padded(buffer, arr.value(row_idx))),
TypedArray::LargeBinary(arr) => Ok(write_bytes_padded(buffer, arr.value(row_idx))),
TypedArray::BinaryView(arr) => Ok(write_bytes_padded(buffer, arr.value(row_idx))),
TypedArray::FixedSizeBinary(arr) => Ok(write_bytes_padded(buffer, arr.value(row_idx))),
TypedArray::Decimal128(arr, precision) if *precision > MAX_LONG_DIGITS => {
let bytes = i128_to_spark_decimal_bytes(arr.value(row_idx));
Expand Down Expand Up @@ -383,8 +399,10 @@ enum TypedElements<'a> {
Decimal128(&'a Decimal128Array, u8),
String(&'a StringArray),
LargeString(&'a LargeStringArray),
StringView(&'a StringViewArray),
Binary(&'a BinaryArray),
LargeBinary(&'a LargeBinaryArray),
BinaryView(&'a BinaryViewArray),
FixedSizeBinary(&'a FixedSizeBinaryArray),
// For nested types, fall back to ArrayRef
Other(&'a ArrayRef, DataType),
Expand All @@ -409,6 +427,8 @@ impl<'a> TypedElements<'a> {
(DataType::LargeUtf8, LargeString, LargeStringArray),
(DataType::Binary, Binary, BinaryArray),
(DataType::LargeBinary, LargeBinary, LargeBinaryArray),
(DataType::Utf8View, StringView, StringViewArray),
(DataType::BinaryView, BinaryView, BinaryViewArray),
);

// Handle special cases that need extra processing
Expand Down Expand Up @@ -482,8 +502,10 @@ impl<'a> TypedElements<'a> {
Decimal128,
String,
LargeString,
StringView,
Binary,
LargeBinary,
BinaryView,
FixedSizeBinary,
Other
]
Expand Down Expand Up @@ -533,8 +555,12 @@ impl<'a> TypedElements<'a> {
TypedElements::LargeString(arr) => {
Ok(write_bytes_padded(buffer, arr.value(idx).as_bytes()))
}
TypedElements::StringView(arr) => {
Ok(write_bytes_padded(buffer, arr.value(idx).as_bytes()))
}
TypedElements::Binary(arr) => Ok(write_bytes_padded(buffer, arr.value(idx))),
TypedElements::LargeBinary(arr) => Ok(write_bytes_padded(buffer, arr.value(idx))),
TypedElements::BinaryView(arr) => Ok(write_bytes_padded(buffer, arr.value(idx))),
TypedElements::FixedSizeBinary(arr) => Ok(write_bytes_padded(buffer, arr.value(idx))),
TypedElements::Decimal128(arr, precision) if *precision > MAX_LONG_DIGITS => {
let bytes = i128_to_spark_decimal_bytes(arr.value(idx));
Expand Down Expand Up @@ -771,6 +797,36 @@ impl<'a> TypedElements<'a> {
}
}
}
TypedElements::StringView(arr) => {
for i in 0..num_elements {
let src_idx = start_idx + i;
if arr.is_null(src_idx) {
set_null_bit(buffer, null_bitset_start, i);
} else {
let len = write_bytes_padded(buffer, arr.value(src_idx).as_bytes());
let data_offset = buffer.len() - round_up_to_8(len) - array_start;
let offset_and_len = ((data_offset as i64) << 32) | (len as i64);
let slot_offset = elements_start + i * 8;
buffer[slot_offset..slot_offset + 8]
.copy_from_slice(&offset_and_len.to_le_bytes());
}
}
}
TypedElements::BinaryView(arr) => {
for i in 0..num_elements {
let src_idx = start_idx + i;
if arr.is_null(src_idx) {
set_null_bit(buffer, null_bitset_start, i);
} else {
let len = write_bytes_padded(buffer, arr.value(src_idx));
let data_offset = buffer.len() - round_up_to_8(len) - array_start;
let offset_and_len = ((data_offset as i64) << 32) | (len as i64);
let slot_offset = elements_start + i * 8;
buffer[slot_offset..slot_offset + 8]
.copy_from_slice(&offset_and_len.to_le_bytes());
}
}
}
TypedElements::Other(arr, element_type) => {
// Fall back to old method for nested types
for i in 0..num_elements {
Expand Down Expand Up @@ -984,7 +1040,8 @@ impl ColumnarToRowContext {
self.write_row_typed(&typed_arrays, &var_len_indices, row_idx)?;

let row_end = self.buffer.len();
self.lengths.push((row_end - row_start) as i32);
let row_len = row_end - row_start;
self.lengths.push(row_len as i32);
}

Ok((self.buffer.as_ptr(), &self.offsets, &self.lengths))
Expand Down
2 changes: 1 addition & 1 deletion native/core/src/execution/expressions/strings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl ExpressionBuilder for RlikeBuilder {
let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?;

match right.as_any().downcast_ref::<Literal>().unwrap().value() {
ScalarValue::Utf8(Some(pattern)) => Ok(Arc::new(RLike::try_new(left, pattern)?)),
ScalarValue::Utf8View(Some(pattern)) => Ok(Arc::new(RLike::try_new(left, pattern)?)),
_ => Err(ExecutionError::GeneralError(
"RLike only supports scalar patterns".to_string(),
)),
Expand Down
19 changes: 19 additions & 0 deletions native/core/src/execution/expressions/subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,16 @@ impl PhysicalExpr for Subquery {
.unwrap();
Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some(string))))
}
DataType::Utf8View => {
let string = jni_static_call!(env,
comet_exec.get_string(self.exec_context_id, self.id) -> StringWrapper
)?;

let string = JString::from_raw(env, string.get().as_raw())
.try_to_string(env)
.unwrap();
Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(Some(string))))
}
DataType::Binary => {
let bytes = jni_static_call!(env,
comet_exec.get_binary(self.exec_context_id, self.id) -> BinaryWrapper
Expand All @@ -184,6 +194,15 @@ impl PhysicalExpr for Subquery {

Ok(ColumnarValue::Scalar(ScalarValue::Binary(Some(slice))))
}
DataType::BinaryView => {
let bytes = jni_static_call!(env,
comet_exec.get_binary(self.exec_context_id, self.id) -> BinaryWrapper
)?;
let bytes = JByteArray::from_raw(env, bytes.get().as_raw());
let slice = env.convert_byte_array(bytes).unwrap();

Ok(ColumnarValue::Scalar(ScalarValue::BinaryView(Some(slice))))
}
_ => internal_err!("Unsupported scalar subquery data type {:?}", self.data_type),
}
})
Expand Down
Loading
Loading