Nesting async UDF calls causes an internal error #20031

@andreashgk

Describe the bug

Directly calling an async UDF on the output of another async UDF, for example async_example(async_example(1)), produces an internal error:

Internal error: async functions should not be called directly

The two functions do not have to be the same async UDF to trigger this error.

To Reproduce

The following code is a minimal example, loosely adapted from the async UDF example in this repo:

use std::sync::Arc;

use async_trait::async_trait;
use datafusion::{
    arrow::{
        array::{ArrayRef, Int64Array},
        datatypes::DataType,
    },
    common::{cast::as_int64_array, not_impl_err, utils::take_function_args},
    error::Result,
    logical_expr::{
        ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature,
        async_udf::{AsyncScalarUDF, AsyncScalarUDFImpl},
    },
    prelude::SessionContext,
};

#[derive(Debug, PartialEq, Eq, Hash)]
pub struct AsyncExample {
    signature: Signature,
}

impl ScalarUDFImpl for AsyncExample {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn name(&self) -> &str {
        "async_example"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Int64)
    }

    fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        not_impl_err!("async_udf was called without async")
    }
}

#[async_trait]
impl AsyncScalarUDFImpl for AsyncExample {
    async fn invoke_async_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        let args = ColumnarValue::values_to_arrays(&args.args)?;
        let [input_column] = take_function_args(self.name(), args)?;

        let input_column = as_int64_array(&input_column)?;

        let results: Int64Array = input_column.iter().map(|i| i.map(|i| i + 1)).collect();
        Ok(ColumnarValue::Array(Arc::new(results) as ArrayRef))
    }
}

#[tokio::main]
async fn main() {
    let ctx = SessionContext::new();
    ctx.register_udf(
        AsyncScalarUDF::new(Arc::new(AsyncExample {
            signature: Signature::exact(
                vec![DataType::Int64],
                datafusion::logical_expr::Volatility::Immutable,
            ),
        }))
        .into_scalar_udf(),
    );

    // Works: a single async UDF call
    let df = ctx.sql("SELECT async_example(1)").await.unwrap();
    df.collect().await.unwrap();
    // Does not work: nesting async UDF calls fails with the internal error
    let df = ctx
        .sql("SELECT async_example(async_example(1))")
        .await
        .unwrap();
    if let Err(err) = df.collect().await {
        eprintln!("{err}");
    }
}

A backtrace for this example can be found below.

Expected behavior

Both UDFs should simply be evaluated in succession. If this is not yet supported, a more specific error message would be useful.
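To make the expected semantics concrete, here is a minimal sketch in plain Rust that does not use DataFusion at all. The names `Expr`, `eval`, and `has_nested_async` are hypothetical and exist only for this illustration; they are not part of DataFusion's API and say nothing about how the planner actually handles async UDFs. The sketch models the example UDF (add 1 to a non-null input), evaluates nested calls inner-first, and shows the kind of check that could back a more specific error message.

```rust
// Toy model only: `Expr`, `eval`, and `has_nested_async` are hypothetical
// names for illustration and are NOT part of DataFusion's API.
#[derive(Debug)]
enum Expr {
    /// An integer literal that may be NULL.
    Literal(Option<i64>),
    /// A call to the example async UDF, which adds 1 to a non-null input.
    AsyncExample(Box<Expr>),
}

/// Evaluate bottom-up: the inner call finishes before the outer call runs.
fn eval(expr: &Expr) -> Option<i64> {
    match expr {
        Expr::Literal(v) => *v,
        Expr::AsyncExample(arg) => eval(arg).map(|i| i + 1),
    }
}

/// Report whether any async call takes another async call as its direct argument.
fn has_nested_async(expr: &Expr) -> bool {
    match expr {
        Expr::Literal(_) => false,
        Expr::AsyncExample(arg) => {
            matches!(arg.as_ref(), Expr::AsyncExample(_)) || has_nested_async(arg)
        }
    }
}

fn main() {
    // Models async_example(1)
    let single = Expr::AsyncExample(Box::new(Expr::Literal(Some(1))));
    // Models async_example(async_example(1))
    let nested = Expr::AsyncExample(Box::new(Expr::AsyncExample(Box::new(
        Expr::Literal(Some(1)),
    ))));

    println!("{:?}", eval(&single)); // prints Some(2)
    println!("{:?}", eval(&nested)); // prints Some(3)
    println!("{}", has_nested_async(&nested)); // prints true
}
```

Under this model, the nested query from the reproduction would return 3. If nesting cannot be supported, a check along the lines of `has_nested_async` could be used to reject the query with a message that names the nested async call instead of the generic internal error.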

Additional context

With the backtrace feature of this crate enabled, the above code produces the following output:

backtrace.txt

I'm using datafusion 52.1.0.

Labels: bug (Something isn't working)
