Add lambda substrait support#21193
Conversation
|
👋 Hello from @substrait-io. Great to see the core lambda PR has gotten through! Once this PR is in a ready to review state and is rebased off of main, I will be more than happy to help review it 🙂 |
|
Thanks @benbellick, I will open this tonight. Besides rebasing, I believe it misses some tests (I tested with sqllogictests only) |
|
@benbellick this is ready for review. Failing CI correctly detects breaking changes but apparently fails to create a comment here with the changes summary |
|
Great! I will try and find some time to take a look tomorrow :) Thanks for working on this! |
|
I updated your branch with main since the fix for detect breaking changes was resolved now, sorry for the trouble |
benbellick
left a comment
There was a problem hiding this comment.
It overall looks good to me! I left a few comments on some stylistic things but the only thing that I would particularly like to see is just the tests for consumer / producer independently. Thanks!
| /// Default implementation of lambda related methods of the [SubstraitConsumer] trait | ||
| /// | ||
| /// Can be embedded into a custom [SubstraitConsumer] to implement them | ||
| pub struct DefaultSubstraitLambdaConsumer { |
There was a problem hiding this comment.
Is there a reason this is public? This feels like an implementation detail of the default lambda-handling logic. What about:
| pub struct DefaultSubstraitLambdaConsumer { | |
| struct LambdaConsumerState { |
There was a problem hiding this comment.
The existing required methods for trait SubstraitConsumer are trivial to implement, but that's not the case for the newly added lambda methods. This is a just a convenience to custom implementations which don't want to customize the default lambda handling, should I remove it?
struct CustomSubstraitConsumer {
extensions: Arc<Extensions>,
state: Arc<SessionState>,
// You can reuse existing consumer code related to lambdas
lambda_consumer: DefaultSubstraitLambdaConsumer,
}
#[async_trait]
impl SubstraitConsumer for CustomSubstraitConsumer {
async fn resolve_table_ref(
&self,
table_ref: &TableReference,
) -> Result<Option<Arc<dyn TableProvider>>> {
let table = table_ref.table().to_string();
let schema = self.state.schema_for_ref(table_ref.clone())?;
let table_provider = schema.table(&table).await?;
Ok(table_provider)
}
fn get_extensions(&self) -> &Extensions {
self.extensions.as_ref()
}
fn get_function_registry(&self) -> &impl FunctionRegistry {
self.state.as_ref()
}
fn with_lambda_parameters(
&self,
lambda_parameters: &[Type],
input_schema: &DFSchema,
) -> datafusion::common::Result<(Vec<String>, Self)> {
let (names, lambda_consumer) = self.lambda_consumer.with_lambda_parameters(
self,
lambda_parameters,
input_schema,
)?;
Ok((
names,
Self {
extensions: self.extensions.clone(),
state: self.state.clone(),
lambda_consumer,
},
))
}
fn lambda_variable(
&self,
steps_out: usize,
field_idx: usize,
) -> datafusion::common::Result<Expr> {
self.lambda_consumer.lambda_variable(steps_out, field_idx)
}
}There was a problem hiding this comment.
Ah I see now, I appreciate the explanation. That makes a lot of sense then!
AFAICT the strategy for outer schemas is to provide default impls so that consumer implementors who don't care about the behavior can ignore their existence, but then will encounter a runtime error if they are used:
I'm wondering if we should do the same thing for lambdas? Rather than enforce that implementors must implement the lambda-handling fns, they could instead optionally ignore them, resulting in an error if lambda expressions are encountered.
What do you think? I am not so particular here TBH. Ultimately my goal is to validate that the translation itself is correct, and I am happy to leave API concerns to the project maintainers :)
There was a problem hiding this comment.
Ah, I agree, they definetively should have a default impl returning an error to avoid a breaking change, thank you 2c23439
But even then, I still believe a public DefaultSubstraitLambdaConsumer is convenient, but yes, let's leave that for that maintainers, thanks
| /// Default implementation of lambda related methods of the [SubstraitProducer] trait | ||
| /// | ||
| /// Can be embedded into a custom [SubstraitProducer] to implement them | ||
| pub struct DefaultSubstraitLambdaProducer { |
There was a problem hiding this comment.
Same comment as on the consumer side. I wonder if we can just keep this private, since its usage in implementing this producer is an implementation detail.
There was a problem hiding this comment.
Same reason for the consumer. What get's decided there (#21193 (comment)) I'll also apply here
struct CustomSubstraitProducer {
extensions: Extensions,
state: Arc<SessionState>,
// You can reuse existing producer code related to lambdas
lambda_producer: DefaultSubstraitLambdaProducer,
}
impl SubstraitProducer for CustomSubstraitProducer {
fn register_function(&mut self, signature: String) -> u32 {
self.extensions.register_function(&signature)
}
fn register_type(&mut self, type_name: String) -> u32 {
self.extensions.register_type(&type_name)
}
fn get_extensions(self) -> Extensions {
self.extensions
}
fn push_lambda_parameters(
&mut self,
lambda_parameters: Vec<FieldRef>,
) -> datafusion::common::Result<()> {
let lambda_parameters_map = lambda_parameters_map(self, lambda_parameters)?;
self.lambda_producer
.push_lambda_parameters(lambda_parameters_map);
Ok(())
}
fn pop_lambda_parameters(&mut self) -> datafusion::common::Result<()> {
self.lambda_producer.pop_lambda_parameters()
}
fn lambda_variable(&self, name: &str) -> datafusion::common::Result<(u32, i32)> {
self.lambda_producer.lambda_variable(name)
}
fn lambda_parameter_type(
&self,
name: &str,
) -> datafusion::common::Result<substrait::proto::Type> {
self.lambda_producer.lambda_parameter_type(name)
}
}There was a problem hiding this comment.
These tests are great!
One additional thing that might be useful is a small number of tests that exercise the producer and consumer independently. The roundtrip tests verify that the producer and consumer are internally consistent with each other, but they don’t make it as obvious what Substrait representation we expect to support.
There is some precedent for both styles:
- Consumer-side tests that load Substrait JSON and convert it to a DataFusion plan:
- Producer-side tests that call
to_substrait_planand inspect the generated proto:datafusion/datafusion/substrait/tests/cases/serialize.rs
Lines 114 to 126 in fa9ada3
It might be nice to add one or two similar tests for lambdas, so the expected Substrait shape for Lambda / LambdaParameterReference is documented by the tests.
There was a problem hiding this comment.
@benbellick From the substrait translation point of view, do you think these tests are enough, so we can ping those who also reviewed other lambda PR's?
There was a problem hiding this comment.
Let me take one more pass at this today. Thanks!
| /// Returns a new instance of this consumer which includes the given `lambda_parameters` and the names they got assigned | ||
| /// | ||
| /// Note for custom implementations it's possible to embed a [DefaultSubstraitLambdaConsumer] and forward this method to it | ||
| fn with_lambda_parameters( | ||
| &self, | ||
| lambda_parameters: &[Type], | ||
| input_schema: &DFSchema, | ||
| ) -> datafusion::common::Result<(Vec<String>, Self)>; |
There was a problem hiding this comment.
@benbellick I usually follows the pattern of existing methods, like push/pop_outer_schema from #20439. This is my first time dealing with substrait, so I may be wrong, but I didn't followed this pattern (using push/pop_lambda_parameter) because it modifies &self via a RwLock and I'm note sure this couldn't lead to conflicts if the same consumer is used to consume two different plans at the same time in different threads. If that's not the case I can change this to use push/pop_lambda_parameter as well.
There was a problem hiding this comment.
Ah, that is an interesting point... Is it expected/supported for the same SubstraitConsumer instance to be used concurrently?
If not, then I think it would be simpler and consistent to model lambda scope the same way as outer schemas.
If yes, then the scoped-consumer approach here makes sense, but it seems like the existing push_outer_schema / pop_outer_schema stack may have the same interleaving issue and should probably be addressed separately.
There was a problem hiding this comment.
I'll confirm with the maintainer who ends up reviewing this, but SubstraitConsumer is both Send + Sync and all it's methods take &self. Since the default consumer is cheap to create, and I expect most/all custom ones to be cheap as well, I guess it's mostly due to async and to easily embed it into other structures which should also implement Send + Sync than to allow efficient concurrent usage.
I won't expect any consumer to be used concurrently for performance, but, since it can be used, I think it's possible that it's/will be used concurrently incidentally as the easier/natural way within a given codebase
benbellick
left a comment
There was a problem hiding this comment.
Few more comments but on the whole this looks good to me! Thanks
| { | ||
| "version": { | ||
| "minorNumber": 85, | ||
| "producer": "datafusion" | ||
| }, | ||
| "extensions": [ | ||
| { | ||
| "extensionFunction": { | ||
| "extensionUrnReference": 4294967295, | ||
| "functionAnchor": 2, | ||
| "name": "array_transform2" | ||
| } | ||
| }, | ||
| { | ||
| "extensionFunction": { | ||
| "extensionUrnReference": 4294967295, | ||
| "name": "make_array" | ||
| } | ||
| }, | ||
| { | ||
| "extensionFunction": { | ||
| "extensionUrnReference": 4294967295, | ||
| "functionAnchor": 3, | ||
| "name": "array_concat" | ||
| } | ||
| }, | ||
| { | ||
| "extensionFunction": { | ||
| "extensionUrnReference": 4294967295, | ||
| "functionAnchor": 1, | ||
| "name": "multiply" | ||
| } | ||
| } | ||
| ], | ||
| "relations": [ | ||
| { | ||
| "root": { | ||
| "input": { | ||
| "project": { | ||
| "common": { | ||
| "emit": { | ||
| "outputMapping": [ | ||
| 1 | ||
| ] | ||
| } | ||
| }, | ||
| "input": { | ||
| "read": { | ||
| "baseSchema": { | ||
| "names": [ | ||
| "p1" | ||
| ], | ||
| "struct": { | ||
| "types": [ | ||
| { | ||
| "i64": { | ||
| "nullability": "NULLABILITY_NULLABLE" | ||
| } | ||
| } | ||
| ], | ||
| "nullability": "NULLABILITY_REQUIRED" | ||
| } | ||
| }, | ||
| "projection": { | ||
| "select": { | ||
| "structItems": [ | ||
| {} | ||
| ] | ||
| } | ||
| }, | ||
| "namedTable": { | ||
| "names": [ | ||
| "data3" | ||
| ] | ||
| } | ||
| } | ||
| }, | ||
| "expressions": [ | ||
| { | ||
| "scalarFunction": { | ||
| "functionReference": 2, | ||
| "arguments": [ | ||
| { | ||
| "value": { | ||
| "scalarFunction": { | ||
| "arguments": [ | ||
| { | ||
| "value": { | ||
| "scalarFunction": { | ||
| "arguments": [ | ||
| { | ||
| "value": { | ||
| "selection": { | ||
| "directReference": { | ||
| "structField": {} | ||
| }, | ||
| "rootReference": {} | ||
| } | ||
| } | ||
| } | ||
| ] | ||
| } | ||
| } | ||
| } | ||
| ] | ||
| } | ||
| } | ||
| }, | ||
| { | ||
| "value": { | ||
| "lambda": { | ||
| "parameters": { | ||
| "types": [ | ||
| { | ||
| "list": { | ||
| "type": { | ||
| "i64": { | ||
| "nullability": "NULLABILITY_NULLABLE" | ||
| } | ||
| }, | ||
| "nullability": "NULLABILITY_NULLABLE" | ||
| } | ||
| }, | ||
| { | ||
| "i64": { | ||
| "nullability": "NULLABILITY_NULLABLE" | ||
| } | ||
| } | ||
| ], | ||
| "nullability": "NULLABILITY_REQUIRED" | ||
| }, | ||
| "body": { | ||
| "scalarFunction": { | ||
| "functionReference": 3, | ||
| "arguments": [ | ||
| { | ||
| "value": { | ||
| "scalarFunction": { | ||
| "functionReference": 2, | ||
| "arguments": [ | ||
| { | ||
| "value": { | ||
| "selection": { | ||
| "directReference": { | ||
| "structField": {} | ||
| }, | ||
| "lambdaParameterReference": {} | ||
| } | ||
| } | ||
| }, | ||
| { | ||
| "value": { | ||
| "lambda": { | ||
| "parameters": { | ||
| "types": [ | ||
| { | ||
| "i64": { | ||
| "nullability": "NULLABILITY_NULLABLE" | ||
| } | ||
| }, | ||
| { | ||
| "i64": { | ||
| "nullability": "NULLABILITY_NULLABLE" | ||
| } | ||
| } | ||
| ], | ||
| "nullability": "NULLABILITY_REQUIRED" | ||
| }, | ||
| "body": { | ||
| "scalarFunction": { | ||
| "functionReference": 1, | ||
| "arguments": [ | ||
| { | ||
| "value": { | ||
| "scalarFunction": { | ||
| "functionReference": 1, | ||
| "arguments": [ | ||
| { | ||
| "value": { | ||
| "selection": { | ||
| "directReference": { | ||
| "structField": {} | ||
| }, | ||
| "lambdaParameterReference": {} | ||
| } | ||
| } | ||
| }, | ||
| { | ||
| "value": { | ||
| "selection": { | ||
| "directReference": { | ||
| "structField": { | ||
| "field": 1 | ||
| } | ||
| }, | ||
| "lambdaParameterReference": { | ||
| "stepsOut": 1 | ||
| } | ||
| } | ||
| } | ||
| } | ||
| ] | ||
| } | ||
| } | ||
| }, | ||
| { | ||
| "value": { | ||
| "selection": { | ||
| "directReference": { | ||
| "structField": { | ||
| "field": 1 | ||
| } | ||
| }, | ||
| "lambdaParameterReference": {} | ||
| } | ||
| } | ||
| } | ||
| ] | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
| ] | ||
| } | ||
| } | ||
| }, | ||
| { | ||
| "value": { | ||
| "scalarFunction": { | ||
| "functionReference": 2, | ||
| "arguments": [ | ||
| { | ||
| "value": { | ||
| "selection": { | ||
| "directReference": { | ||
| "structField": {} | ||
| }, | ||
| "lambdaParameterReference": {} | ||
| } | ||
| } | ||
| }, | ||
| { | ||
| "value": { | ||
| "lambda": { | ||
| "parameters": { | ||
| "types": [ | ||
| { | ||
| "i64": { | ||
| "nullability": "NULLABILITY_NULLABLE" | ||
| } | ||
| }, | ||
| { | ||
| "i64": { | ||
| "nullability": "NULLABILITY_NULLABLE" | ||
| } | ||
| } | ||
| ], | ||
| "nullability": "NULLABILITY_REQUIRED" | ||
| }, | ||
| "body": { | ||
| "scalarFunction": { | ||
| "functionReference": 1, | ||
| "arguments": [ | ||
| { | ||
| "value": { | ||
| "scalarFunction": { | ||
| "functionReference": 1, | ||
| "arguments": [ | ||
| { | ||
| "value": { | ||
| "selection": { | ||
| "directReference": { | ||
| "structField": {} | ||
| }, | ||
| "lambdaParameterReference": {} | ||
| } | ||
| } | ||
| }, | ||
| { | ||
| "value": { | ||
| "selection": { | ||
| "directReference": { | ||
| "structField": { | ||
| "field": 1 | ||
| } | ||
| }, | ||
| "lambdaParameterReference": { | ||
| "stepsOut": 1 | ||
| } | ||
| } | ||
| } | ||
| } | ||
| ] | ||
| } | ||
| } | ||
| }, | ||
| { | ||
| "value": { | ||
| "selection": { | ||
| "directReference": { | ||
| "structField": { | ||
| "field": 1 | ||
| } | ||
| }, | ||
| "lambdaParameterReference": {} | ||
| } | ||
| } | ||
| } | ||
| ] | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
| ] | ||
| } | ||
| } | ||
| } | ||
| ] | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
| ] | ||
| } | ||
| } | ||
| ] | ||
| } | ||
| }, | ||
| "names": [ | ||
| "array_transform2(make_array(make_array(data3.p1)),(v, i) -> array_concat(array_transform2(v,(v, j) -> v * i * j),array_transform2(v,(v, j) -> v * i * j)))" | ||
| ] | ||
| } | ||
| } | ||
| ] | ||
| } No newline at end of file |
There was a problem hiding this comment.
There are a few things that were wrong with this plan:
- missing URN declarations
- missing
outputTypein scalar function invocations
The Substrait validation in DataFusion does not catch these issues, but in the interest of keeping checked-in Substrait fixtures structurally valid, I think it is better to use this updated plan.
There are a few other validity issues I noticed, but I don't think this PR needs to solve the broader DataFusion Substrait extension story:
- Function names in Substrait extension declarations should be signatures, e.g.
add:i8_i8rather than justadd. DataFusion has permissive handling for this on the consumer side, but I think that should be fixed separately. - Some referenced functions here, such as
array_transform2,make_array, andarray_concat, do not correspond to extension YAML declarations. That also seems like a broader DataFusion Substrait issue and can be resolved later.
| { | |
| "version": { | |
| "minorNumber": 85, | |
| "producer": "datafusion" | |
| }, | |
| "extensions": [ | |
| { | |
| "extensionFunction": { | |
| "extensionUrnReference": 4294967295, | |
| "functionAnchor": 2, | |
| "name": "array_transform2" | |
| } | |
| }, | |
| { | |
| "extensionFunction": { | |
| "extensionUrnReference": 4294967295, | |
| "name": "make_array" | |
| } | |
| }, | |
| { | |
| "extensionFunction": { | |
| "extensionUrnReference": 4294967295, | |
| "functionAnchor": 3, | |
| "name": "array_concat" | |
| } | |
| }, | |
| { | |
| "extensionFunction": { | |
| "extensionUrnReference": 4294967295, | |
| "functionAnchor": 1, | |
| "name": "multiply" | |
| } | |
| } | |
| ], | |
| "relations": [ | |
| { | |
| "root": { | |
| "input": { | |
| "project": { | |
| "common": { | |
| "emit": { | |
| "outputMapping": [ | |
| 1 | |
| ] | |
| } | |
| }, | |
| "input": { | |
| "read": { | |
| "baseSchema": { | |
| "names": [ | |
| "p1" | |
| ], | |
| "struct": { | |
| "types": [ | |
| { | |
| "i64": { | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| } | |
| ], | |
| "nullability": "NULLABILITY_REQUIRED" | |
| } | |
| }, | |
| "projection": { | |
| "select": { | |
| "structItems": [ | |
| {} | |
| ] | |
| } | |
| }, | |
| "namedTable": { | |
| "names": [ | |
| "data3" | |
| ] | |
| } | |
| } | |
| }, | |
| "expressions": [ | |
| { | |
| "scalarFunction": { | |
| "functionReference": 2, | |
| "arguments": [ | |
| { | |
| "value": { | |
| "scalarFunction": { | |
| "arguments": [ | |
| { | |
| "value": { | |
| "scalarFunction": { | |
| "arguments": [ | |
| { | |
| "value": { | |
| "selection": { | |
| "directReference": { | |
| "structField": {} | |
| }, | |
| "rootReference": {} | |
| } | |
| } | |
| } | |
| ] | |
| } | |
| } | |
| } | |
| ] | |
| } | |
| } | |
| }, | |
| { | |
| "value": { | |
| "lambda": { | |
| "parameters": { | |
| "types": [ | |
| { | |
| "list": { | |
| "type": { | |
| "i64": { | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| }, | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| }, | |
| { | |
| "i64": { | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| } | |
| ], | |
| "nullability": "NULLABILITY_REQUIRED" | |
| }, | |
| "body": { | |
| "scalarFunction": { | |
| "functionReference": 3, | |
| "arguments": [ | |
| { | |
| "value": { | |
| "scalarFunction": { | |
| "functionReference": 2, | |
| "arguments": [ | |
| { | |
| "value": { | |
| "selection": { | |
| "directReference": { | |
| "structField": {} | |
| }, | |
| "lambdaParameterReference": {} | |
| } | |
| } | |
| }, | |
| { | |
| "value": { | |
| "lambda": { | |
| "parameters": { | |
| "types": [ | |
| { | |
| "i64": { | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| }, | |
| { | |
| "i64": { | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| } | |
| ], | |
| "nullability": "NULLABILITY_REQUIRED" | |
| }, | |
| "body": { | |
| "scalarFunction": { | |
| "functionReference": 1, | |
| "arguments": [ | |
| { | |
| "value": { | |
| "scalarFunction": { | |
| "functionReference": 1, | |
| "arguments": [ | |
| { | |
| "value": { | |
| "selection": { | |
| "directReference": { | |
| "structField": {} | |
| }, | |
| "lambdaParameterReference": {} | |
| } | |
| } | |
| }, | |
| { | |
| "value": { | |
| "selection": { | |
| "directReference": { | |
| "structField": { | |
| "field": 1 | |
| } | |
| }, | |
| "lambdaParameterReference": { | |
| "stepsOut": 1 | |
| } | |
| } | |
| } | |
| } | |
| ] | |
| } | |
| } | |
| }, | |
| { | |
| "value": { | |
| "selection": { | |
| "directReference": { | |
| "structField": { | |
| "field": 1 | |
| } | |
| }, | |
| "lambdaParameterReference": {} | |
| } | |
| } | |
| } | |
| ] | |
| } | |
| } | |
| } | |
| } | |
| } | |
| ] | |
| } | |
| } | |
| }, | |
| { | |
| "value": { | |
| "scalarFunction": { | |
| "functionReference": 2, | |
| "arguments": [ | |
| { | |
| "value": { | |
| "selection": { | |
| "directReference": { | |
| "structField": {} | |
| }, | |
| "lambdaParameterReference": {} | |
| } | |
| } | |
| }, | |
| { | |
| "value": { | |
| "lambda": { | |
| "parameters": { | |
| "types": [ | |
| { | |
| "i64": { | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| }, | |
| { | |
| "i64": { | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| } | |
| ], | |
| "nullability": "NULLABILITY_REQUIRED" | |
| }, | |
| "body": { | |
| "scalarFunction": { | |
| "functionReference": 1, | |
| "arguments": [ | |
| { | |
| "value": { | |
| "scalarFunction": { | |
| "functionReference": 1, | |
| "arguments": [ | |
| { | |
| "value": { | |
| "selection": { | |
| "directReference": { | |
| "structField": {} | |
| }, | |
| "lambdaParameterReference": {} | |
| } | |
| } | |
| }, | |
| { | |
| "value": { | |
| "selection": { | |
| "directReference": { | |
| "structField": { | |
| "field": 1 | |
| } | |
| }, | |
| "lambdaParameterReference": { | |
| "stepsOut": 1 | |
| } | |
| } | |
| } | |
| } | |
| ] | |
| } | |
| } | |
| }, | |
| { | |
| "value": { | |
| "selection": { | |
| "directReference": { | |
| "structField": { | |
| "field": 1 | |
| } | |
| }, | |
| "lambdaParameterReference": {} | |
| } | |
| } | |
| } | |
| ] | |
| } | |
| } | |
| } | |
| } | |
| } | |
| ] | |
| } | |
| } | |
| } | |
| ] | |
| } | |
| } | |
| } | |
| } | |
| } | |
| ] | |
| } | |
| } | |
| ] | |
| } | |
| }, | |
| "names": [ | |
| "array_transform2(make_array(make_array(data3.p1)),(v, i) -> array_concat(array_transform2(v,(v, j) -> v * i * j),array_transform2(v,(v, j) -> v * i * j)))" | |
| ] | |
| } | |
| } | |
| ] | |
| } | |
| { | |
| "version": { | |
| "minorNumber": 85, | |
| "producer": "datafusion" | |
| }, | |
| "extensions": [ | |
| { | |
| "extensionFunction": { | |
| "extensionUrnReference": 2, | |
| "functionAnchor": 2, | |
| "name": "array_transform2" | |
| } | |
| }, | |
| { | |
| "extensionFunction": { | |
| "extensionUrnReference": 2, | |
| "name": "make_array" | |
| } | |
| }, | |
| { | |
| "extensionFunction": { | |
| "extensionUrnReference": 2, | |
| "functionAnchor": 3, | |
| "name": "array_concat" | |
| } | |
| }, | |
| { | |
| "extensionFunction": { | |
| "extensionUrnReference": 1, | |
| "functionAnchor": 1, | |
| "name": "multiply" | |
| } | |
| } | |
| ], | |
| "relations": [ | |
| { | |
| "root": { | |
| "input": { | |
| "project": { | |
| "common": { | |
| "emit": { | |
| "outputMapping": [ | |
| 1 | |
| ] | |
| } | |
| }, | |
| "input": { | |
| "read": { | |
| "baseSchema": { | |
| "names": [ | |
| "p1" | |
| ], | |
| "struct": { | |
| "types": [ | |
| { | |
| "i64": { | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| } | |
| ], | |
| "nullability": "NULLABILITY_REQUIRED" | |
| } | |
| }, | |
| "projection": { | |
| "select": { | |
| "structItems": [ | |
| {} | |
| ] | |
| } | |
| }, | |
| "namedTable": { | |
| "names": [ | |
| "data3" | |
| ] | |
| } | |
| } | |
| }, | |
| "expressions": [ | |
| { | |
| "scalarFunction": { | |
| "functionReference": 2, | |
| "arguments": [ | |
| { | |
| "value": { | |
| "scalarFunction": { | |
| "arguments": [ | |
| { | |
| "value": { | |
| "scalarFunction": { | |
| "arguments": [ | |
| { | |
| "value": { | |
| "selection": { | |
| "directReference": { | |
| "structField": {} | |
| }, | |
| "rootReference": {} | |
| } | |
| } | |
| } | |
| ], | |
| "outputType": { | |
| "list": { | |
| "type": { | |
| "i64": { | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| }, | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| } | |
| } | |
| } | |
| } | |
| ], | |
| "outputType": { | |
| "list": { | |
| "type": { | |
| "list": { | |
| "type": { | |
| "i64": { | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| }, | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| }, | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| } | |
| } | |
| } | |
| }, | |
| { | |
| "value": { | |
| "lambda": { | |
| "parameters": { | |
| "types": [ | |
| { | |
| "list": { | |
| "type": { | |
| "i64": { | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| }, | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| }, | |
| { | |
| "i64": { | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| } | |
| ], | |
| "nullability": "NULLABILITY_REQUIRED" | |
| }, | |
| "body": { | |
| "scalarFunction": { | |
| "functionReference": 3, | |
| "arguments": [ | |
| { | |
| "value": { | |
| "scalarFunction": { | |
| "functionReference": 2, | |
| "arguments": [ | |
| { | |
| "value": { | |
| "selection": { | |
| "directReference": { | |
| "structField": {} | |
| }, | |
| "lambdaParameterReference": {} | |
| } | |
| } | |
| }, | |
| { | |
| "value": { | |
| "lambda": { | |
| "parameters": { | |
| "types": [ | |
| { | |
| "i64": { | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| }, | |
| { | |
| "i64": { | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| } | |
| ], | |
| "nullability": "NULLABILITY_REQUIRED" | |
| }, | |
| "body": { | |
| "scalarFunction": { | |
| "functionReference": 1, | |
| "arguments": [ | |
| { | |
| "value": { | |
| "scalarFunction": { | |
| "functionReference": 1, | |
| "arguments": [ | |
| { | |
| "value": { | |
| "selection": { | |
| "directReference": { | |
| "structField": {} | |
| }, | |
| "lambdaParameterReference": {} | |
| } | |
| } | |
| }, | |
| { | |
| "value": { | |
| "selection": { | |
| "directReference": { | |
| "structField": { | |
| "field": 1 | |
| } | |
| }, | |
| "lambdaParameterReference": { | |
| "stepsOut": 1 | |
| } | |
| } | |
| } | |
| } | |
| ], | |
| "outputType": { | |
| "i64": { | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| } | |
| } | |
| } | |
| }, | |
| { | |
| "value": { | |
| "selection": { | |
| "directReference": { | |
| "structField": { | |
| "field": 1 | |
| } | |
| }, | |
| "lambdaParameterReference": {} | |
| } | |
| } | |
| } | |
| ], | |
| "outputType": { | |
| "i64": { | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| } | |
| } | |
| } | |
| } | |
| } | |
| } | |
| ], | |
| "outputType": { | |
| "list": { | |
| "type": { | |
| "i64": { | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| }, | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| } | |
| } | |
| } | |
| }, | |
| { | |
| "value": { | |
| "scalarFunction": { | |
| "functionReference": 2, | |
| "arguments": [ | |
| { | |
| "value": { | |
| "selection": { | |
| "directReference": { | |
| "structField": {} | |
| }, | |
| "lambdaParameterReference": {} | |
| } | |
| } | |
| }, | |
| { | |
| "value": { | |
| "lambda": { | |
| "parameters": { | |
| "types": [ | |
| { | |
| "i64": { | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| }, | |
| { | |
| "i64": { | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| } | |
| ], | |
| "nullability": "NULLABILITY_REQUIRED" | |
| }, | |
| "body": { | |
| "scalarFunction": { | |
| "functionReference": 1, | |
| "arguments": [ | |
| { | |
| "value": { | |
| "scalarFunction": { | |
| "functionReference": 1, | |
| "arguments": [ | |
| { | |
| "value": { | |
| "selection": { | |
| "directReference": { | |
| "structField": {} | |
| }, | |
| "lambdaParameterReference": {} | |
| } | |
| } | |
| }, | |
| { | |
| "value": { | |
| "selection": { | |
| "directReference": { | |
| "structField": { | |
| "field": 1 | |
| } | |
| }, | |
| "lambdaParameterReference": { | |
| "stepsOut": 1 | |
| } | |
| } | |
| } | |
| } | |
| ], | |
| "outputType": { | |
| "i64": { | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| } | |
| } | |
| } | |
| }, | |
| { | |
| "value": { | |
| "selection": { | |
| "directReference": { | |
| "structField": { | |
| "field": 1 | |
| } | |
| }, | |
| "lambdaParameterReference": {} | |
| } | |
| } | |
| } | |
| ], | |
| "outputType": { | |
| "i64": { | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| } | |
| } | |
| } | |
| } | |
| } | |
| } | |
| ], | |
| "outputType": { | |
| "list": { | |
| "type": { | |
| "i64": { | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| }, | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| } | |
| } | |
| } | |
| } | |
| ], | |
| "outputType": { | |
| "list": { | |
| "type": { | |
| "i64": { | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| }, | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| } | |
| } | |
| } | |
| } | |
| } | |
| } | |
| ], | |
| "outputType": { | |
| "list": { | |
| "type": { | |
| "list": { | |
| "type": { | |
| "i64": { | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| }, | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| }, | |
| "nullability": "NULLABILITY_NULLABLE" | |
| } | |
| } | |
| } | |
| } | |
| ] | |
| } | |
| }, | |
| "names": [ | |
| "array_transform2(make_array(make_array(data3.p1)),(v, i) -> array_concat(array_transform2(v,(v, j) -> v * i * j),array_transform2(v,(v, j) -> v * i * j)))" | |
| ] | |
| } | |
| } | |
| ], | |
| "extensionUrns": [ | |
| { | |
| "extensionUrnAnchor": 1, | |
| "urn": "extension:io.substrait:functions_arithmetic" | |
| }, | |
| { | |
| "extensionUrnAnchor": 2, | |
| "urn": "extension:io.substrait:functions_list" | |
| } | |
| ] | |
| } | |
There was a problem hiding this comment.
Thank you for the patch. I realized now that I could have checked with the validator/others consumers instead of trusting the datafusion producer, my bad.
There was a problem hiding this comment.
Ah no worries, there isn't really a canonical validator in substrait so it isn't obvious how to do that IMO.
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn higher_order_function42() -> Result<()> { |
There was a problem hiding this comment.
I don't dispute that the implementation is correct, but this test is tough to read.
My understanding is that roundtrip tests generally catch most consistency issues, and so the goal of tests here is to specifically validate the things which could be incorrect but difficult to validate via roundtrip tests.
What do you think about focusing exclusively on the lambda parameter refs? Something like:
#[tokio::test]
async fn serialize_nested_lambda_references() -> Result<()> {
let ctx = higher_order_function_ctx().await?;
let df = ctx
.sql(
"SELECT array_transform2(
[[data3.p1]],
(v, i) -> array_transform2(v, (v, j) -> v * i * j)
) FROM data3",
)
.await?;
let datafusion_plan = df.into_optimized_plan()?;
let plan = to_substrait_plan(&datafusion_plan, &ctx.state())?
.as_ref()
.clone();
assert_eq!(
lambda_param_refs_in_expression(project_expression(&plan, 0)),
vec![
// inner array_transform2 argument: outer v
(0, 0),
// inner lambda body: v * i * j
(0, 0),
(1, 1),
(0, 1),
]
);
Ok(())
}There was a problem hiding this comment.
I think it's great, thank you. I just kept the second inner lambda, and checked the lambda parameters as well 83e261f
| })?; | ||
|
|
||
| let named_lambda_parameters = | ||
| std::iter::zip(&l.params, lambda_parameters) |
There was a problem hiding this comment.
What happens if the length of l.params and lambda_parameters differ? Can this happen?
There was a problem hiding this comment.
Is a error if the expression define more parameters (l.params) than the function reports as supported (lambda_parameters), thanks 347bf12
The inverse just means that not all supported parameters were used
There was a problem hiding this comment.
Should we include tests here to check that the following two things result in an error?
- missing parameters or body
- invalid steps_out or field index
Or somewhere else if you think it would be more appropriate.
Co-authored-by: Ben Bellick <36523439+benbellick@users.noreply.github.com>
benbellick
left a comment
There was a problem hiding this comment.
LGTM, great work! Excited to get this in 🚀
Feel free to summon the maintainers now.
| let Some(var) = lambda_parameters.get(field_idx) else { | ||
| return substrait_err!( | ||
| "No lambda field at index {field_idx}, got only {}", | ||
| "At lambda {steps_out} steps out, no field at index {field_idx}, got only {}", |
There was a problem hiding this comment.
nice improved error msg!
| Ok(ctx) | ||
| } | ||
|
|
||
| fn collect_lambda_ref( |
There was a problem hiding this comment.
Might be nice to put a comment here just to make it obvious to future readers what exactly this is doing.
Thanks for the review @benbellick |
Which issue does this PR close?
Part of #21172
Rationale for this change
Substrait support wasn't implemented in the core lambda support to reduce PR size
What changes are included in this PR?
Substrait consuming and producing of higher-order functions, lambdas and lambda variables
Are these changes tested?
Unit tests added to
datafusion/substrait/tests/cases/roundtrip_logical_plan.rsAre there any user-facing changes?
Yes, there are breaking changes, new methods without default implementation have been added to
SubstraitConsumerandSubstraitProducer