diff --git a/.github/workflows/iceberg_spark_test.yml b/.github/workflows/iceberg_spark_test.yml
index eae8281070..ec764279a6 100644
--- a/.github/workflows/iceberg_spark_test.yml
+++ b/.github/workflows/iceberg_spark_test.yml
@@ -48,6 +48,7 @@ on:
 
 env:
   RUST_VERSION: stable
+  RUST_BACKTRACE: 1
 
 jobs:
   # Build native library once and share with all test jobs
diff --git a/.github/workflows/pr_benchmark_check.yml b/.github/workflows/pr_benchmark_check.yml
index f3af4d7817..6376a3548f 100644
--- a/.github/workflows/pr_benchmark_check.yml
+++ b/.github/workflows/pr_benchmark_check.yml
@@ -41,6 +41,7 @@ on:
 
 env:
   RUST_VERSION: stable
+  RUST_BACKTRACE: 1
 
 jobs:
   benchmark-check:
diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml
index 0c7d3fc3e0..dfb63a8800 100644
--- a/.github/workflows/pr_build_linux.yml
+++ b/.github/workflows/pr_build_linux.yml
@@ -48,6 +48,7 @@ on:
 
 env:
   RUST_VERSION: stable
+  RUST_BACKTRACE: 1
 
 jobs:
 
diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml
index fb4dfa9754..7a57b6334c 100644
--- a/.github/workflows/pr_build_macos.yml
+++ b/.github/workflows/pr_build_macos.yml
@@ -48,6 +48,7 @@ on:
 
 env:
   RUST_VERSION: stable
+  RUST_BACKTRACE: 1
 
 jobs:
 
diff --git a/.github/workflows/spark_sql_test.yml b/.github/workflows/spark_sql_test.yml
index 91632d04d4..4d777cda87 100644
--- a/.github/workflows/spark_sql_test.yml
+++ b/.github/workflows/spark_sql_test.yml
@@ -54,6 +54,7 @@ on:
 
 env:
   RUST_VERSION: stable
+  RUST_BACKTRACE: 1
 
 jobs:
 
diff --git a/.github/workflows/spark_sql_test_native_datafusion.yml b/.github/workflows/spark_sql_test_native_datafusion.yml
index d5165a3786..8df9d4801e 100644
--- a/.github/workflows/spark_sql_test_native_datafusion.yml
+++ b/.github/workflows/spark_sql_test_native_datafusion.yml
@@ -28,6 +28,7 @@ on:
 
 env:
   RUST_VERSION: stable
+  RUST_BACKTRACE: 1
 
 jobs:
   spark-sql-catalyst-native-datafusion:
diff --git a/.github/workflows/spark_sql_test_native_iceberg_compat.yml b/.github/workflows/spark_sql_test_native_iceberg_compat.yml
index 100460385f..af84e8556d 100644
--- a/.github/workflows/spark_sql_test_native_iceberg_compat.yml
+++ b/.github/workflows/spark_sql_test_native_iceberg_compat.yml
@@ -28,6 +28,7 @@ on:
 
 env:
   RUST_VERSION: stable
+  RUST_BACKTRACE: 1
 
 jobs:
   spark-sql-catalyst-native-iceberg-compat:
diff --git a/native/Cargo.toml b/native/Cargo.toml
index d5a6aeabc9..95504b97bf 100644
--- a/native/Cargo.toml
+++ b/native/Cargo.toml
@@ -70,4 +70,6 @@ strip = "debuginfo"
 inherits = "release"
 lto = false # Skip LTO for faster linking
 codegen-units = 16 # Parallel codegen (faster compile, slightly larger binary)
+debug-assertions = true
+panic = "unwind" # Allow panics to be caught and logged across FFI boundary
 # overflow-checks inherited as false from release
diff --git a/native/core/src/errors.rs b/native/core/src/errors.rs
index ecac7af94e..a33f5d932c 100644
--- a/native/core/src/errors.rs
+++ b/native/core/src/errors.rs
@@ -145,7 +145,10 @@ pub enum CometError {
 }
 
 pub fn init() {
-    std::panic::set_hook(Box::new(|_panic_info| {
+    std::panic::set_hook(Box::new(|panic_info| {
+        // Log the panic message and location to stderr so it is visible in CI logs
+        // even if the exception message is lost crossing the FFI boundary
+        eprintln!("Comet native panic: {panic_info}");
         // Capture the backtrace for a panic
         *PANIC_BACKTRACE.lock().unwrap() =
             Some(std::backtrace::Backtrace::force_capture().to_string());
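The errors.rs hunk above installs a custom panic hook so panic details reach stderr before
the payload crosses the FFI boundary. A minimal standalone sketch of that pattern, with an
illustrative LAST_BACKTRACE slot standing in for Comet's PANIC_BACKTRACE global:

    use std::backtrace::Backtrace;
    use std::sync::Mutex;

    // Illustrative stand-in for Comet's PANIC_BACKTRACE global.
    static LAST_BACKTRACE: Mutex<Option<String>> = Mutex::new(None);

    fn install_hook() {
        std::panic::set_hook(Box::new(|panic_info| {
            // The Display impl includes both the message and the source
            // location, e.g. "panicked at src/lib.rs:10:5: boom".
            eprintln!("Comet native panic: {panic_info}");
            // force_capture walks the stack even when RUST_BACKTRACE is unset.
            *LAST_BACKTRACE.lock().unwrap() =
                Some(Backtrace::force_capture().to_string());
        }));
    }

    fn main() {
        install_hook();
        let _ = std::panic::catch_unwind(|| panic!("boom"));
        assert!(LAST_BACKTRACE.lock().unwrap().is_some());
    }
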
diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index 1030e30aaf..f5931b7dc7 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -29,7 +29,7 @@ use crate::{
 use arrow::array::{Array, RecordBatch, UInt32Array};
 use arrow::compute::{take, TakeOptions};
 use arrow::datatypes::DataType as ArrowDataType;
-use datafusion::common::{Result as DataFusionResult, ScalarValue};
+use datafusion::common::{DataFusionError, Result as DataFusionResult, ScalarValue};
 use datafusion::execution::disk_manager::DiskManagerMode;
 use datafusion::execution::memory_pool::MemoryPool;
 use datafusion::execution::runtime_env::RuntimeEnvBuilder;
@@ -59,6 +59,7 @@ use datafusion_spark::function::string::concat::SparkConcat;
 use datafusion_spark::function::string::space::SparkSpace;
 use futures::poll;
 use futures::stream::StreamExt;
+use futures::FutureExt;
 use jni::objects::JByteBuffer;
 use jni::sys::{jlongArray, JNI_FALSE};
 use jni::{
@@ -570,10 +571,29 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
         let (tx, rx) = mpsc::channel(2);
         let mut stream = stream;
         get_runtime().spawn(async move {
-            while let Some(batch) = stream.next().await {
-                if tx.send(batch).await.is_err() {
-                    break;
+            let result = std::panic::AssertUnwindSafe(async {
+                while let Some(batch) = stream.next().await {
+                    if tx.send(batch).await.is_err() {
+                        break;
+                    }
                 }
-            }
+            })
+            .catch_unwind()
+            .await;
+
+            if let Err(panic) = result {
+                let msg = match panic.downcast_ref::<&str>() {
+                    Some(s) => s.to_string(),
+                    None => match panic.downcast_ref::<String>() {
+                        Some(s) => s.clone(),
+                        None => "unknown panic".to_string(),
+                    },
+                };
+                let _ = tx
+                    .send(Err(DataFusionError::Execution(format!(
+                        "native panic: {msg}"
+                    ))))
+                    .await;
+            }
         });
         exec_context.batch_receiver = Some(rx);
diff --git a/native/core/src/execution/shuffle/spark_unsafe/list.rs b/native/core/src/execution/shuffle/spark_unsafe/list.rs
index cc72813946..259ff29a79 100644
--- a/native/core/src/execution/shuffle/spark_unsafe/list.rs
+++ b/native/core/src/execution/shuffle/spark_unsafe/list.rs
@@ -90,7 +90,7 @@ impl SparkUnsafeArray {
         unsafe {
             let mask: i64 = 1i64 << (index & 0x3f);
             let word_offset = (self.row_addr + 8 + (((index >> 6) as i64) << 3)) as *const i64;
-            let word: i64 = *word_offset;
+            let word: i64 = word_offset.read_unaligned();
             (word & mask) != 0
         }
     }
diff --git a/native/core/src/execution/shuffle/spark_unsafe/row.rs b/native/core/src/execution/shuffle/spark_unsafe/row.rs
index 02daf6a34f..fe80d56d9a 100644
--- a/native/core/src/execution/shuffle/spark_unsafe/row.rs
+++ b/native/core/src/execution/shuffle/spark_unsafe/row.rs
@@ -290,7 +290,7 @@ impl SparkUnsafeRow {
         unsafe {
             let mask: i64 = 1i64 << (index & 0x3f);
             let word_offset = (self.row_addr + (((index >> 6) as i64) << 3)) as *const i64;
-            let word: i64 = *word_offset;
+            let word: i64 = word_offset.read_unaligned();
             (word & mask) != 0
         }
     }
@@ -303,8 +303,8 @@ impl SparkUnsafeRow {
         unsafe {
             let mask: i64 = 1i64 << (index & 0x3f);
             let word_offset = (self.row_addr + (((index >> 6) as i64) << 3)) as *mut i64;
-            let word: i64 = *word_offset;
-            *word_offset = word & !mask;
+            let word: i64 = word_offset.read_unaligned();
+            word_offset.write_unaligned(word & !mask);
         }
     }
 }
@@ -968,7 +968,6 @@ mod test {
     }
 
     #[test]
-    #[cfg_attr(miri, ignore)] // Unaligned memory access in SparkUnsafeRow
     fn test_append_null_struct_field_to_struct_builder() {
         let data_type = DataType::Struct(Fields::from(vec![
             Field::new("a", DataType::Boolean, true),
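The jni_api.rs hunk works because a panic payload is a Box<dyn Any + Send>: panic!("literal")
carries a &'static str, while a formatted panic carries a String, so both downcasts are needed
to recover the message. A minimal sketch of the same catch-and-report pattern, assuming a
hypothetical failing task and futures' block_on in place of Comet's runtime:

    use futures::executor::block_on;
    use futures::FutureExt;

    // Hypothetical task standing in for the batch-forwarding loop.
    async fn compute() -> i64 {
        panic!("boom: {}", 42); // formatted panic => payload is a String
    }

    fn main() {
        // AssertUnwindSafe is needed because catch_unwind requires the
        // future to be UnwindSafe; we only inspect the payload afterwards.
        let result = block_on(std::panic::AssertUnwindSafe(compute()).catch_unwind());

        if let Err(panic) = result {
            let msg = panic
                .downcast_ref::<&str>()
                .map(|s| s.to_string())
                .or_else(|| panic.downcast_ref::<String>().cloned())
                .unwrap_or_else(|| "unknown panic".to_string());
            eprintln!("native panic: {msg}"); // surfaced instead of a silent hang
        }
    }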
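The read_unaligned changes (and the removal of the Miri ignore) address the fact that Spark
row memory is a raw byte buffer: an i64 null-bitset word can start at any byte offset, and
dereferencing an unaligned *const i64 is undefined behavior even when it appears to work.
A small self-contained demonstration of the well-defined alternative:

    fn main() {
        // A byte buffer standing in for UnsafeRow backing memory, with an
        // i64 word stored at byte offset 1, i.e. not 8-byte aligned.
        let mut buf = [0u8; 16];
        buf[1..9].copy_from_slice(&0x0102_0304_0506_0708i64.to_ne_bytes());

        let ptr = unsafe { buf.as_ptr().add(1) } as *const i64;

        // `*ptr` here would be UB (Miri rejects it) because the address is
        // not aligned for i64; read_unaligned performs a byte-wise load
        // that is defined on any address.
        let word = unsafe { ptr.read_unaligned() };
        assert_eq!(word, 0x0102_0304_0506_0708);
    }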