1 change: 1 addition & 0 deletions .github/workflows/iceberg_spark_test.yml
@@ -48,6 +48,7 @@ on:

env:
RUST_VERSION: stable
RUST_BACKTRACE: 1

jobs:
# Build native library once and share with all test jobs
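Note: the `RUST_BACKTRACE: 1` line added to this and the following workflows is what makes a native panic in CI print a stack trace. As a hedged, self-contained sketch (not Comet code), the variable controls both the default panic hook output and `std::backtrace::Backtrace::capture()`:

```rust
use std::backtrace::{Backtrace, BacktraceStatus};

fn main() {
    // Backtrace::capture() honors RUST_BACKTRACE / RUST_LIB_BACKTRACE:
    // with RUST_BACKTRACE=1 it records frames, otherwise it stays disabled.
    let bt = Backtrace::capture();
    match bt.status() {
        BacktraceStatus::Captured => println!("backtrace captured:\n{bt}"),
        _ => println!("backtraces disabled; set RUST_BACKTRACE=1"),
    }

    // The default panic hook also prints a backtrace when RUST_BACKTRACE=1,
    // which is what makes CI panic logs actionable.
    panic!("demo panic");
}
```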
1 change: 1 addition & 0 deletions .github/workflows/pr_benchmark_check.yml
@@ -41,6 +41,7 @@ on:

env:
RUST_VERSION: stable
RUST_BACKTRACE: 1

jobs:
benchmark-check:
1 change: 1 addition & 0 deletions .github/workflows/pr_build_linux.yml
@@ -48,6 +48,7 @@ on:

env:
RUST_VERSION: stable
RUST_BACKTRACE: 1

jobs:

1 change: 1 addition & 0 deletions .github/workflows/pr_build_macos.yml
@@ -48,6 +48,7 @@ on:

env:
RUST_VERSION: stable
RUST_BACKTRACE: 1

jobs:

1 change: 1 addition & 0 deletions .github/workflows/spark_sql_test.yml
@@ -54,6 +54,7 @@ on:

env:
RUST_VERSION: stable
RUST_BACKTRACE: 1

jobs:

1 change: 1 addition & 0 deletions .github/workflows/spark_sql_test_native_datafusion.yml
@@ -28,6 +28,7 @@ on:

env:
RUST_VERSION: stable
RUST_BACKTRACE: 1

jobs:
spark-sql-catalyst-native-datafusion:
1 change: 1 addition & 0 deletions .github/workflows/spark_sql_test_native_iceberg_compat.yml
@@ -28,6 +28,7 @@ on:

env:
RUST_VERSION: stable
RUST_BACKTRACE: 1

jobs:
spark-sql-catalyst-native-iceberg-compat:
2 changes: 2 additions & 0 deletions native/Cargo.toml
@@ -70,4 +70,6 @@ strip = "debuginfo"
inherits = "release"
lto = false # Skip LTO for faster linking
codegen-units = 16 # Parallel codegen (faster compile, slightly larger binary)
debug-assertions = true
panic = "unwind" # Allow panics to be caught and logged across FFI boundary
# overflow-checks inherited as false from release
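Keeping `panic = "unwind"` is what makes the panic-catching below possible at all: with `panic = "abort"`, `std::panic::catch_unwind` never gets a chance to run, and the process dies before anything can be logged. A minimal sketch of the idea, using a hypothetical entry-point name and signature that are not Comet's:

```rust
use std::panic;

// Hypothetical FFI-style entry point (illustrative only). With panic = "unwind",
// a panic in the body can be converted into an error code instead of unwinding
// into the JVM / C caller, which would be undefined behavior. With
// panic = "abort", catch_unwind never catches anything.
#[no_mangle]
pub extern "C" fn native_entry_point() -> i32 {
    let result = panic::catch_unwind(|| {
        // ... real work would go here ...
        42
    });
    match result {
        Ok(v) => v,
        Err(payload) => {
            let msg = payload
                .downcast_ref::<&str>()
                .map(|s| s.to_string())
                .or_else(|| payload.downcast_ref::<String>().cloned())
                .unwrap_or_else(|| "unknown panic".to_string());
            eprintln!("caught native panic: {msg}");
            -1
        }
    }
}
```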
5 changes: 4 additions & 1 deletion native/core/src/errors.rs
@@ -145,7 +145,10 @@ pub enum CometError {
}

pub fn init() {
std::panic::set_hook(Box::new(|_panic_info| {
std::panic::set_hook(Box::new(|panic_info| {
// Log the panic message and location to stderr so it is visible in CI logs
// even if the exception message is lost crossing the FFI boundary
eprintln!("Comet native panic: {panic_info}");
Comment on lines +149 to +151 (Member Author): This is the main change for improving reporting of panics

// Capture the backtrace for a panic
*PANIC_BACKTRACE.lock().unwrap() =
Some(std::backtrace::Backtrace::force_capture().to_string());
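For context, here is a self-contained sketch of the hook pattern this hunk extends; the storage name mirrors `PANIC_BACKTRACE` from the diff, but the rest is illustrative:

```rust
use std::backtrace::Backtrace;
use std::panic;
use std::sync::Mutex;

// Stand-in for Comet's PANIC_BACKTRACE storage (illustrative only).
static PANIC_BACKTRACE: Mutex<Option<String>> = Mutex::new(None);

fn install_panic_hook() {
    panic::set_hook(Box::new(|panic_info| {
        // Print message + location to stderr immediately, so the information
        // survives even if the error text is lost crossing the FFI boundary.
        eprintln!("native panic: {panic_info}");
        // force_capture() ignores RUST_BACKTRACE, so a backtrace is always
        // available for attaching to whatever error surfaces later.
        *PANIC_BACKTRACE.lock().unwrap() = Some(Backtrace::force_capture().to_string());
    }));
}

fn main() {
    install_panic_hook();
    let _ = panic::catch_unwind(|| panic!("boom"));
    println!("backtrace stored: {}", PANIC_BACKTRACE.lock().unwrap().is_some());
}
```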
28 changes: 24 additions & 4 deletions native/core/src/execution/jni_api.rs
@@ -29,7 +29,7 @@ use crate::{
use arrow::array::{Array, RecordBatch, UInt32Array};
use arrow::compute::{take, TakeOptions};
use arrow::datatypes::DataType as ArrowDataType;
use datafusion::common::{Result as DataFusionResult, ScalarValue};
use datafusion::common::{DataFusionError, Result as DataFusionResult, ScalarValue};
use datafusion::execution::disk_manager::DiskManagerMode;
use datafusion::execution::memory_pool::MemoryPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
@@ -59,6 +59,7 @@ use datafusion_spark::function::string::concat::SparkConcat;
use datafusion_spark::function::string::space::SparkSpace;
use futures::poll;
use futures::stream::StreamExt;
use futures::FutureExt;
use jni::objects::JByteBuffer;
use jni::sys::{jlongArray, JNI_FALSE};
use jni::{
@@ -570,10 +571,29 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
let (tx, rx) = mpsc::channel(2);
let mut stream = stream;
get_runtime().spawn(async move {
while let Some(batch) = stream.next().await {
if tx.send(batch).await.is_err() {
break;
let result = std::panic::AssertUnwindSafe(async {
Comment (Member Author): These changes are for catching panics that happen in tokio threads

while let Some(batch) = stream.next().await {
if tx.send(batch).await.is_err() {
break;
}
}
})
.catch_unwind()
.await;

if let Err(panic) = result {
let msg = match panic.downcast_ref::<&str>() {
Some(s) => s.to_string(),
None => match panic.downcast_ref::<String>() {
Some(s) => s.clone(),
None => "unknown panic".to_string(),
},
};
let _ = tx
.send(Err(DataFusionError::Execution(format!(
"native panic: {msg}"
))))
.await;
}
});
exec_context.batch_receiver = Some(rx);
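To see the panic-forwarding pattern in isolation: without `catch_unwind`, a panic inside a spawned tokio task is swallowed by the task's `JoinHandle`, the sender is dropped, and the receiver only observes a silently closed channel. A hedged, standalone sketch assuming the `tokio` and `futures` crates (the channel payload type is made up):

```rust
use futures::FutureExt;
use std::panic::AssertUnwindSafe;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Result<i32, String>>(2);

    tokio::spawn(async move {
        // The inner async block borrows `tx`; AssertUnwindSafe + catch_unwind
        // turn any panic inside it into an Err value instead of losing it.
        let result = AssertUnwindSafe(async {
            tx.send(Ok(1)).await.ok();
            panic!("worker blew up");
        })
        .catch_unwind()
        .await;

        if let Err(payload) = result {
            let msg = payload
                .downcast_ref::<&str>()
                .map(|s| s.to_string())
                .or_else(|| payload.downcast_ref::<String>().cloned())
                .unwrap_or_else(|| "unknown panic".to_string());
            // Forward the panic as a regular error so the consumer sees why
            // the stream ended instead of just a closed channel.
            let _ = tx.send(Err(format!("native panic: {msg}"))).await;
        }
    });

    while let Some(item) = rx.recv().await {
        println!("{item:?}");
    }
}
```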
2 changes: 1 addition & 1 deletion native/core/src/execution/shuffle/spark_unsafe/list.rs
@@ -90,7 +90,7 @@ impl SparkUnsafeArray {
unsafe {
let mask: i64 = 1i64 << (index & 0x3f);
let word_offset = (self.row_addr + 8 + (((index >> 6) as i64) << 3)) as *const i64;
let word: i64 = *word_offset;
let word: i64 = word_offset.read_unaligned();
(word & mask) != 0
}
}
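A hedged sketch of why `read_unaligned` matters here (the buffer and offset below are made up): Spark's unsafe row and array layouts are byte-oriented, so an `i64` word in the buffer need not sit on an 8-byte boundary, and dereferencing a misaligned `*const i64` is undefined behavior even when it appears to work on x86:

```rust
fn main() {
    // A byte buffer with no particular alignment guarantee for inner words.
    let buf = vec![0u8; 32];
    let misaligned = unsafe { buf.as_ptr().add(1) } as *const i64;

    // `*misaligned` would be undefined behavior on a pointer that is not
    // 8-byte aligned (Miri reports it); read_unaligned performs the same
    // load without the alignment requirement.
    let word = unsafe { misaligned.read_unaligned() };
    assert_eq!(word, 0);
}
```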
7 changes: 3 additions & 4 deletions native/core/src/execution/shuffle/spark_unsafe/row.rs
@@ -290,7 +290,7 @@ impl SparkUnsafeRow {
unsafe {
let mask: i64 = 1i64 << (index & 0x3f);
let word_offset = (self.row_addr + (((index >> 6) as i64) << 3)) as *const i64;
let word: i64 = *word_offset;
let word: i64 = word_offset.read_unaligned();
(word & mask) != 0
}
}
@@ -303,8 +303,8 @@ impl SparkUnsafeRow {
unsafe {
let mask: i64 = 1i64 << (index & 0x3f);
let word_offset = (self.row_addr + (((index >> 6) as i64) << 3)) as *mut i64;
let word: i64 = *word_offset;
*word_offset = word & !mask;
let word: i64 = word_offset.read_unaligned();
word_offset.write_unaligned(word & !mask);
}
}
}
@@ -968,7 +968,6 @@ mod test {
}

#[test]
#[cfg_attr(miri, ignore)] // Unaligned memory access in SparkUnsafeRow
fn test_append_null_struct_field_to_struct_builder() {
let data_type = DataType::Struct(Fields::from(vec![
Field::new("a", DataType::Boolean, true),
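The same concern applies to the write path, which is presumably why the `#[cfg_attr(miri, ignore)]` escape hatch above can be dropped: Miri flags misaligned plain loads and stores, while the unaligned read/write pair is well-defined. A small illustrative sketch of the read-modify-write bitmap update (addresses and bit positions are made up):

```rust
// Illustrative read-modify-write of a null-bitmap word at an arbitrary byte
// address, mirroring the set/unset logic above without assuming alignment.
unsafe fn clear_bit(bitmap_addr: i64, index: usize) {
    let mask: i64 = 1i64 << (index & 0x3f);
    let word_ptr = (bitmap_addr + (((index >> 6) as i64) << 3)) as *mut i64;
    // SAFETY: caller guarantees the word at this address is readable and writable.
    unsafe {
        let word = word_ptr.read_unaligned();
        word_ptr.write_unaligned(word & !mask);
    }
}

fn main() {
    let mut buf = vec![0xffu8; 24];
    // Deliberately misaligned base address (offset 1) to show that the
    // unaligned read/write pair still behaves correctly.
    let base = unsafe { buf.as_mut_ptr().add(1) } as i64;
    unsafe { clear_bit(base, 3) };
    assert_eq!(buf[1] & 0b1000, 0); // bit 3 of the first word is now clear
}
```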