Labels: bug (Something isn't working)
Description
Describe the bug
Calling an async UDF directly on the output of another async UDF, e.g. `async_example(async_example(1))`, produces an internal error:
Internal error: async functions should not be called directly
The two calls do not have to use the same async UDF. Nesting one async UDF inside a different async UDF produces the same error.
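The key point is the nesting: the inner call has to finish before the outer call has an input to work on. Purely as an illustration, the toy model below (plain Rust, no DataFusion types) flattens nested async calls innermost-first into a list of stages. Each stage's argument refers only to a literal or to the output column of an earlier stage. `Expr`, `Stage`, `plan_stages` and the second UDF name `other_async` are hypothetical. This is not DataFusion's planner, only a sketch of what "evaluated in succession" means for a nested expression.

```rust
/// Toy expression tree (hypothetical, not DataFusion's `Expr`).
#[derive(Debug, Clone, PartialEq)]
pub enum Expr {
    /// A literal integer, e.g. the `1` in `async_example(1)`.
    Lit(i64),
    /// A reference to the output column of an earlier stage.
    Col(usize),
    /// A call to a named async function with a single argument.
    AsyncCall { name: String, arg: Box<Expr> },
}

/// One evaluation stage: call `name` on `arg`. The result becomes column `i`,
/// where `i` is this stage's index in the stage list.
#[derive(Debug, Clone, PartialEq)]
pub struct Stage {
    pub name: String,
    pub arg: Expr,
}

/// Replaces every async call in `expr` with a column reference, pushing the
/// calls onto `stages` innermost-first. Returns the rewritten expression.
pub fn plan_stages(expr: Expr, stages: &mut Vec<Stage>) -> Expr {
    match expr {
        Expr::Lit(v) => Expr::Lit(v),
        Expr::Col(c) => Expr::Col(c),
        Expr::AsyncCall { name, arg } => {
            // Plan the argument first so any inner async call gets an earlier stage.
            let arg = plan_stages(*arg, stages);
            stages.push(Stage { name, arg });
            Expr::Col(stages.len() - 1)
        }
    }
}
```

For `async_example(other_async(1))` this yields two stages. The first calls `other_async` on the literal `1`. The second calls `async_example` on column 0, and the whole expression becomes column 1.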
To Reproduce
The following code is a minimal example, adapted from the async UDF example in this repo:
```rust
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::{
    arrow::{
        array::{ArrayRef, Int64Array},
        datatypes::DataType,
    },
    common::{cast::as_int64_array, not_impl_err, utils::take_function_args},
    error::Result,
    logical_expr::{
        ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature,
        async_udf::{AsyncScalarUDF, AsyncScalarUDFImpl},
    },
    prelude::SessionContext,
};

#[derive(Debug, PartialEq, Eq, Hash)]
pub struct AsyncExample {
    signature: Signature,
}

impl ScalarUDFImpl for AsyncExample {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn name(&self) -> &str {
        "async_example"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Int64)
    }

    fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        // Synchronous entry point; only the async path below is expected to run.
        not_impl_err!("async_udf was called without async")
    }
}

#[async_trait]
impl AsyncScalarUDFImpl for AsyncExample {
    async fn invoke_async_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        // Adds 1 to every non-null Int64 input; nulls stay null.
        let args = ColumnarValue::values_to_arrays(&args.args)?;
        let [input_column] = take_function_args(self.name(), args)?;
        let input_column = as_int64_array(&input_column)?;
        let results: Int64Array = input_column.iter().map(|i| i.map(|i| i + 1)).collect();
        Ok(ColumnarValue::Array(Arc::new(results) as ArrayRef))
    }
}

#[tokio::main]
async fn main() {
    let ctx = SessionContext::new();
    // Wrap the impl in AsyncScalarUDF so it is registered as an async function.
    ctx.register_udf(
        AsyncScalarUDF::new(Arc::new(AsyncExample {
            signature: Signature::exact(
                vec![DataType::Int64],
                datafusion::logical_expr::Volatility::Immutable,
            ),
        }))
        .into_scalar_udf(),
    );

    // Works
    let df = ctx.sql("SELECT async_example(1)").await.unwrap();
    df.collect().await.unwrap();

    // Does not work
    let df = ctx
        .sql("SELECT async_example(async_example(1))")
        .await
        .unwrap();
    if let Err(err) = df.collect().await {
        eprintln!("{err}");
    }
}
```

A backtrace for this example can be found below.
Expected behavior
Both UDFs should simply be evaluated in succession: first the inner call, then the outer call on its result. If this is not supported yet, a more specific error message would be useful.
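For reference, the expected result follows directly from the UDF body in the reproduction, which adds 1 to every non-null input, so `async_example(async_example(1))` should return 3. The stdlib-only sketch below models this outside DataFusion. `async_example` here is a hypothetical stand-in that works on `Vec<Option<i64>>` rather than an Arrow array. `nested` awaits the inner call to completion before starting the outer one. `block_on` is a minimal hand-written executor, included only so the example needs no `tokio`.

```rust
use std::future::Future;
use std::pin::pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Hypothetical stand-in for the UDF body: adds 1 to every non-null value
/// and keeps nulls as nulls (mirrors `i.map(|i| i + 1)` in the reproduction).
pub async fn async_example(input: Vec<Option<i64>>) -> Vec<Option<i64>> {
    input.into_iter().map(|v| v.map(|x| x + 1)).collect()
}

/// Evaluates `async_example(async_example(input))` in succession:
/// the inner call completes before the outer call starts.
pub async fn nested(input: Vec<Option<i64>>) -> Vec<Option<i64>> {
    let inner = async_example(input).await;
    async_example(inner).await
}

// Waker whose operations do nothing; sufficient for futures that never wait.
fn noop_raw_waker() -> RawWaker {
    fn clone(_: *const ()) -> RawWaker {
        noop_raw_waker()
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    RawWaker::new(std::ptr::null(), &VTABLE)
}

/// Polls `fut` until it completes. It busy-waits on `Pending`, which is
/// acceptable here only because the futures above never return `Pending`.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    // SAFETY: the vtable functions ignore the data pointer, so null is fine.
    let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

For example, `block_on(nested(vec![Some(1)]))` evaluates to `vec![Some(3)]`, which is the value `SELECT async_example(async_example(1))` would be expected to return.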
Additional context
With the `backtrace` feature of this crate enabled, the code above produces the following output:
I'm using datafusion 52.1.0.