Merged
4 changes: 2 additions & 2 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -312,9 +312,9 @@ object CometConf extends ShimCometConf {
"Whether to enable native columnar to row conversion. When enabled, Comet will use " +
"native Rust code to convert Arrow columnar data to Spark UnsafeRow format instead " +
"of the JVM implementation. This can improve performance for queries that need to " +
"convert between columnar and row formats. This is an experimental feature.")
"convert between columnar and row formats.")
.booleanConf
- .createWithDefault(false)
+ .createWithDefault(true)

val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled")
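With the default now true, jobs that need the old behavior can still fall back to the JVM implementation by setting spark.comet.exec.columnarToRow.native.enabled=false (the key shown in the updated scaladoc later in this diff).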
10 changes: 10 additions & 0 deletions native/core/src/execution/columnar_to_row.rs
@@ -624,6 +624,16 @@ impl<'a> TypedElements<'a> {
let values_slice = $arr.values();
let byte_len = num_elements * $elem_size;
let src_start = start_idx * $elem_size;
+ debug_assert!(
+     src_start + byte_len <= values_slice.len() * $elem_size,
+     "bulk_copy_range: source slice out of bounds: src_start={}, byte_len={}, values_len={}, elem_size={}",
+     src_start, byte_len, values_slice.len() * $elem_size, $elem_size
+ );
+ debug_assert!(
+     elements_start + byte_len <= buffer.len(),
+     "bulk_copy_range: destination slice out of bounds: elements_start={}, byte_len={}, buffer_len={}",
+     elements_start, byte_len, buffer.len()
+ );
let src_bytes = unsafe {
std::slice::from_raw_parts(
(values_slice.as_ptr() as *const u8).add(src_start),
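The two added assertions guard the unsafe raw-parts copy that follows them. As a standalone illustration of the same pattern, here is a minimal sketch assuming a plain function over i64 values (the function and its signature are hypothetical; the real code is a macro generic over element types):

use std::mem::size_of;

/// Copy `num_elements` elements, starting at `start_idx`, from `values`
/// into `buffer` at byte offset `dst_start`.
fn bulk_copy_range(
    values: &[i64],
    start_idx: usize,
    num_elements: usize,
    buffer: &mut [u8],
    dst_start: usize,
) {
    let elem_size = size_of::<i64>();
    let byte_len = num_elements * elem_size;
    let src_start = start_idx * elem_size;

    // Same invariants as the diff's debug_assert!s: both byte ranges must
    // lie inside their backing slices before any raw-pointer arithmetic.
    debug_assert!(src_start + byte_len <= values.len() * elem_size);
    debug_assert!(dst_start + byte_len <= buffer.len());

    // Reinterpret the source elements as raw bytes and copy them in bulk.
    let src_bytes = unsafe {
        std::slice::from_raw_parts((values.as_ptr() as *const u8).add(src_start), byte_len)
    };
    buffer[dst_start..dst_start + byte_len].copy_from_slice(src_bytes);
}

fn main() {
    let values = vec![1i64, 2, 3, 4];
    let mut buffer = vec![0u8; 16];
    bulk_copy_range(&values, 1, 2, &mut buffer, 0); // copies elements 2 and 3
    assert_eq!(&buffer[0..8], &2i64.to_ne_bytes());
}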
18 changes: 18 additions & 0 deletions native/core/src/execution/jni_api.rs
@@ -972,6 +972,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_columnarToRowConvert(
) -> jni::sys::jobject {
try_unwrap_or_throw(&e, |mut env| {
// Get the context
+ debug_assert!(c2r_handle != 0, "columnarToRowConvert: c2r_handle is null");
Contributor: can c2r_handle be negative?

Member Author: jlong is a signed 64-bit integer, so yes, c2r_handle can technically be negative, since memory addresses above 2^63 appear as negative values when stored in a signed long. However, the only invalid value we need to check for is 0 (null pointer). Negative values are valid memory addresses on 64-bit systems.

Member Author: Sorry for the AI response. Claude didn't ask me first 😞
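A tiny self-contained illustration of the point above (a hypothetical example, not code from this PR): a pointer stored in a signed 64-bit handle may come back negative, yet it round-trips losslessly, so 0 is the only handle value worth rejecting.

fn main() {
    let ptr: *mut u32 = Box::into_raw(Box::new(42u32));

    // Store the pointer in a signed 64-bit "handle", as JNI's jlong does.
    // Addresses above 2^63 show up as negative i64 values but lose nothing.
    let handle: i64 = ptr as i64;

    // Round-trip back to a pointer: the bit pattern is preserved exactly.
    let back = handle as *mut u32;
    assert_eq!(back, ptr);
    assert_ne!(handle, 0); // null is the only invalid handle value

    unsafe { drop(Box::from_raw(back)) }; // reclaim the allocation
}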

let ctx = (c2r_handle as *mut ColumnarToRowContext)
.as_mut()
.ok_or_else(|| CometError::Internal("Null columnar to row context".to_string()))?;
@@ -989,6 +990,17 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_columnarToRowConvert(
let array_ptr = array_addrs_elements[i] as *mut FFI_ArrowArray;
let schema_ptr = schema_addrs_elements[i] as *mut FFI_ArrowSchema;

+ debug_assert!(
+     !array_ptr.is_null(),
+     "columnarToRowConvert: null array pointer at index {}",
+     i
+ );
+ debug_assert!(
+     !schema_ptr.is_null(),
+     "columnarToRowConvert: null schema pointer at index {}",
+     i
+ );
+
// Take ownership of the FFI structures
let ffi_array = std::ptr::read(array_ptr);
let ffi_schema = std::ptr::read(schema_ptr);
@@ -1001,6 +1013,11 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_columnarToRowConvert(
}

// Convert columnar to row
+ debug_assert!(
+     num_rows >= 0,
+     "columnarToRowConvert: num_rows is negative: {}",
+     num_rows
+ );
let (buffer_ptr, offsets, lengths) = ctx.convert(&arrays, num_rows as usize)?;

// Create Java int arrays for offsets and lengths
@@ -1037,6 +1054,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_columnarToRowClose(
c2r_handle: jlong,
) {
try_unwrap_or_throw(&e, |_env| {
+ debug_assert!(c2r_handle != 0, "columnarToRowClose: c2r_handle is null");
Contributor: same

if c2r_handle != 0 {
let _ctx: Box<ColumnarToRowContext> =
Box::from_raw(c2r_handle as *mut ColumnarToRowContext);
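For context, a minimal sketch of the handle lifecycle the close path above relies on (the creation side shown here is an assumption for illustration; only the close logic mirrors the diff): the context is handed to the JVM as a raw pointer and reclaimed exactly once by Box::from_raw.

struct ColumnarToRowContext {
    buffer: Vec<u8>, // row output buffer (simplified)
}

/// Leak a freshly allocated context to the caller as an opaque i64 handle.
fn create_handle() -> i64 {
    let ctx = Box::new(ColumnarToRowContext { buffer: Vec::new() });
    Box::into_raw(ctx) as i64 // ownership now lives on the JVM side
}

/// Rebuild the Box from the raw handle and drop it, freeing the context.
/// Safety: `handle` must come from `create_handle` and not be closed twice.
unsafe fn close_handle(handle: i64) {
    debug_assert!(handle != 0, "close_handle: handle is null");
    if handle != 0 {
        drop(Box::from_raw(handle as *mut ColumnarToRowContext));
    }
}

fn main() {
    let handle = create_handle();
    unsafe { close_handle(handle) };
}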
@@ -45,8 +45,8 @@ import org.apache.comet.{CometConf, NativeColumnarToRowConverter}
* Native implementation of ColumnarToRowExec that converts Arrow columnar data to Spark UnsafeRow
* format using Rust.
*
- * This is an experimental feature that can be enabled by setting
- * `spark.comet.columnarToRow.native.enabled=true`.
+ * This feature is enabled by default and can be disabled by setting
+ * `spark.comet.exec.columnarToRow.native.enabled=false`.
*
* Benefits over the JVM implementation:
* - Zero-copy for variable-length types (strings, binary)
@@ -7,7 +7,7 @@ TakeOrderedAndProject
: : +- BroadcastHashJoin
: : :- Filter
: : : +- HashAggregate
- : : : +- CometColumnarToRow
+ : : : +- CometNativeColumnarToRow
: : : +- CometColumnarExchange
: : : +- HashAggregate
: : : +- Project
@@ -17,23 +17,23 @@ TakeOrderedAndProject
: : : : +- Scan parquet spark_catalog.default.store_returns [COMET: Native DataFusion scan does not support subqueries/dynamic pruning]
: : : : +- SubqueryBroadcast
: : : : +- BroadcastExchange
- : : : : +- CometColumnarToRow
+ : : : : +- CometNativeColumnarToRow
: : : : +- CometProject
: : : : +- CometFilter
: : : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : : +- BroadcastExchange
- : : : +- CometColumnarToRow
+ : : : +- CometNativeColumnarToRow
: : : +- CometProject
: : : +- CometFilter
: : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : +- BroadcastExchange
: : +- Filter
: : +- HashAggregate
- : : +- CometColumnarToRow
+ : : +- CometNativeColumnarToRow
: : +- CometColumnarExchange
: : +- HashAggregate
: : +- HashAggregate
- : : +- CometColumnarToRow
+ : : +- CometNativeColumnarToRow
: : +- CometColumnarExchange
: : +- HashAggregate
: : +- Project
@@ -43,17 +43,17 @@ TakeOrderedAndProject
: : : +- Scan parquet spark_catalog.default.store_returns [COMET: Native DataFusion scan does not support subqueries/dynamic pruning]
: : : +- ReusedSubquery
: : +- BroadcastExchange
- : : +- CometColumnarToRow
+ : : +- CometNativeColumnarToRow
: : +- CometProject
: : +- CometFilter
: : +- CometNativeScan parquet spark_catalog.default.date_dim
: +- BroadcastExchange
- : +- CometColumnarToRow
+ : +- CometNativeColumnarToRow
: +- CometProject
: +- CometFilter
: +- CometNativeScan parquet spark_catalog.default.store
+- BroadcastExchange
- +- CometColumnarToRow
+ +- CometNativeColumnarToRow
+- CometProject
+- CometFilter
+- CometNativeScan parquet spark_catalog.default.customer
@@ -16,7 +16,7 @@ CometColumnarToRow
: : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.store_returns
: : : : +- SubqueryBroadcast
: : : : +- BroadcastExchange
- : : : : +- CometColumnarToRow
+ : : : : +- CometNativeColumnarToRow
: : : : +- CometProject
: : : : +- CometFilter
: : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim
@@ -1,6 +1,6 @@
TakeOrderedAndProject
+- HashAggregate
- +- CometColumnarToRow
+ +- CometNativeColumnarToRow
+- CometColumnarExchange
+- HashAggregate
+- Project
@@ -12,7 +12,7 @@ TakeOrderedAndProject
: : +- BroadcastHashJoin
: : :- BroadcastHashJoin
: : : :- BroadcastHashJoin
- : : : : :- CometColumnarToRow
+ : : : : :- CometNativeColumnarToRow
: : : : : +- CometFilter
: : : : : +- CometNativeScan parquet spark_catalog.default.customer
: : : : +- BroadcastExchange
@@ -22,12 +22,12 @@ TakeOrderedAndProject
: : : : : +- Scan parquet spark_catalog.default.store_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning]
: : : : : +- SubqueryBroadcast
: : : : : +- BroadcastExchange
- : : : : : +- CometColumnarToRow
+ : : : : : +- CometNativeColumnarToRow
: : : : : +- CometProject
: : : : : +- CometFilter
: : : : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : : : +- BroadcastExchange
- : : : : +- CometColumnarToRow
+ : : : : +- CometNativeColumnarToRow
: : : : +- CometProject
: : : : +- CometFilter
: : : : +- CometNativeScan parquet spark_catalog.default.date_dim
@@ -38,7 +38,7 @@ TakeOrderedAndProject
: : : : +- Scan parquet spark_catalog.default.web_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning]
: : : : +- ReusedSubquery
: : : +- BroadcastExchange
- : : : +- CometColumnarToRow
+ : : : +- CometNativeColumnarToRow
: : : +- CometProject
: : : +- CometFilter
: : : +- CometNativeScan parquet spark_catalog.default.date_dim
@@ -49,17 +49,17 @@ TakeOrderedAndProject
: : : +- Scan parquet spark_catalog.default.catalog_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning]
: : : +- ReusedSubquery
: : +- BroadcastExchange
- : : +- CometColumnarToRow
+ : : +- CometNativeColumnarToRow
: : +- CometProject
: : +- CometFilter
: : +- CometNativeScan parquet spark_catalog.default.date_dim
: +- BroadcastExchange
- : +- CometColumnarToRow
+ : +- CometNativeColumnarToRow
: +- CometProject
: +- CometFilter
: +- CometNativeScan parquet spark_catalog.default.customer_address
+- BroadcastExchange
- +- CometColumnarToRow
+ +- CometNativeColumnarToRow
+- CometProject
+- CometFilter
+- CometNativeScan parquet spark_catalog.default.customer_demographics
@@ -21,7 +21,7 @@ TakeOrderedAndProject
: : : : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.store_sales
: : : : : +- SubqueryBroadcast
: : : : : +- BroadcastExchange
- : : : : : +- CometColumnarToRow
+ : : : : : +- CometNativeColumnarToRow
: : : : : +- CometProject
: : : : : +- CometFilter
: : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim
@@ -50,12 +50,12 @@ TakeOrderedAndProject
: : +- CometFilter
: : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim
: +- BroadcastExchange
- : +- CometColumnarToRow
+ : +- CometNativeColumnarToRow
: +- CometProject
: +- CometFilter
: +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer_address
+- BroadcastExchange
- +- CometColumnarToRow
+ +- CometNativeColumnarToRow
+- CometProject
+- CometFilter
+- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer_demographics