-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Optimize the evaluation of date_part(<col>) == <constant> when pushed down #19733
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
d94889a
4aa7f4e
2329c12
7ac8325
7a3e8b3
fbd5dcc
d920735
d3318ff
9fb245b
5ffb704
372f704
2fdc14c
c2b0cd3
a0b6564
0a24d60
86b7627
0158662
b491d4f
59235de
9ae434e
9f845e7
08ef1f1
57f6c4c
e4dc727
f36257e
6bceb43
13f1164
d902f65
6992c8f
b456a22
3669fb9
bb6625f
aeafe1a
62b0841
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,7 @@ use std::any::Any; | |
| use std::str::FromStr; | ||
| use std::sync::Arc; | ||
|
|
||
| use arrow::array::timezone::Tz; | ||
| use arrow::array::{Array, ArrayRef, Float64Array, Int32Array}; | ||
| use arrow::compute::kernels::cast_utils::IntervalUnit; | ||
| use arrow::compute::{DatePart, binary, date_part}; | ||
|
|
@@ -27,8 +28,10 @@ use arrow::datatypes::DataType::{ | |
| }; | ||
| use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second}; | ||
| use arrow::datatypes::{ | ||
| DataType, Field, FieldRef, IntervalUnit as ArrowIntervalUnit, TimeUnit, | ||
| DataType, Date32Type, Date64Type, Field, FieldRef, IntervalUnit as ArrowIntervalUnit, | ||
| TimeUnit, | ||
| }; | ||
| use chrono::{Datelike, NaiveDate, TimeZone, Utc}; | ||
| use datafusion_common::types::{NativeType, logical_date}; | ||
|
|
||
| use datafusion_common::{ | ||
|
|
@@ -44,9 +47,11 @@ use datafusion_common::{ | |
| types::logical_string, | ||
| utils::take_function_args, | ||
| }; | ||
| use datafusion_expr::preimage::PreimageResult; | ||
| use datafusion_expr::simplify::SimplifyContext; | ||
| use datafusion_expr::{ | ||
| ColumnarValue, Documentation, ReturnFieldArgs, ScalarUDFImpl, Signature, | ||
| TypeSignature, Volatility, | ||
| ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarUDFImpl, Signature, | ||
| TypeSignature, Volatility, interval_arithmetic, | ||
| }; | ||
| use datafusion_expr_common::signature::{Coercion, TypeSignatureClass}; | ||
| use datafusion_macros::user_doc; | ||
|
|
@@ -237,6 +242,71 @@ impl ScalarUDFImpl for DatePartFunc { | |
| }) | ||
| } | ||
|
|
||
| // Only casting the year is supported since pruning other IntervalUnit is not possible | ||
| // date_part(col, YEAR) = 2024 => col >= '2024-01-01' and col < '2025-01-01' | ||
| // But for anything less than YEAR simplifying is not possible without specifying the bigger interval | ||
| // date_part(col, MONTH) = 1 => col = '2023-01-01' or col = '2024-01-01' or ... or col = '3000-01-01' | ||
| fn preimage( | ||
| &self, | ||
| args: &[Expr], | ||
| lit_expr: &Expr, | ||
| info: &SimplifyContext, | ||
| ) -> Result<PreimageResult> { | ||
| let [part, col_expr] = take_function_args(self.name(), args)?; | ||
|
|
||
| // Get the interval unit from the part argument | ||
| let interval_unit = part | ||
| .as_literal() | ||
| .and_then(|sv| sv.try_as_str().flatten()) | ||
| .map(part_normalization) | ||
| .and_then(|s| IntervalUnit::from_str(s).ok()); | ||
|
|
||
| // only support extracting year | ||
| match interval_unit { | ||
| Some(IntervalUnit::Year) => (), | ||
| _ => return Ok(PreimageResult::None), | ||
| } | ||
|
|
||
| // Check if the argument is a literal (e.g. date_part(YEAR, col) = 2024) | ||
| let Some(argument_literal) = lit_expr.as_literal() else { | ||
| return Ok(PreimageResult::None); | ||
| }; | ||
|
|
||
| // Extract i32 year from Scalar value | ||
| let year = match argument_literal { | ||
| ScalarValue::Int32(Some(y)) => *y, | ||
| _ => return Ok(PreimageResult::None), | ||
| }; | ||
|
|
||
| // Can only extract year from Date32/64 and Timestamp column | ||
| let target_type = match info.get_data_type(col_expr)? { | ||
| Date32 | Date64 | Timestamp(_, _) => &info.get_data_type(col_expr)?, | ||
| _ => return Ok(PreimageResult::None), | ||
| }; | ||
|
|
||
| // Compute the Interval bounds | ||
| let Some(start_time) = NaiveDate::from_ymd_opt(year, 1, 1) else { | ||
| return Ok(PreimageResult::None); | ||
| }; | ||
| let Some(end_time) = start_time.with_year(year + 1) else { | ||
| return Ok(PreimageResult::None); | ||
| }; | ||
|
|
||
| // Convert to ScalarValues | ||
| let (Some(lower), Some(upper)) = ( | ||
| date_to_scalar(start_time, target_type), | ||
| date_to_scalar(end_time, target_type), | ||
| ) else { | ||
| return Ok(PreimageResult::None); | ||
| }; | ||
| let interval = Box::new(interval_arithmetic::Interval::try_new(lower, upper)?); | ||
|
|
||
| Ok(PreimageResult::Range { | ||
| expr: col_expr.clone(), | ||
| interval, | ||
| }) | ||
| } | ||
|
|
||
| fn aliases(&self) -> &[String] { | ||
| &self.aliases | ||
| } | ||
|
|
@@ -251,6 +321,52 @@ fn is_epoch(part: &str) -> bool { | |
| matches!(part.to_lowercase().as_str(), "epoch") | ||
| } | ||
|
|
||
| fn date_to_scalar(date: NaiveDate, target_type: &DataType) -> Option<ScalarValue> { | ||
| Some(match target_type { | ||
| Date32 => ScalarValue::Date32(Some(Date32Type::from_naive_date(date))), | ||
| Date64 => ScalarValue::Date64(Some(Date64Type::from_naive_date(date))), | ||
|
|
||
| Timestamp(unit, tz_opt) => { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It feels to me like this code should be able to re-use the conversion functions in arrow rather than re-implementing them here For example Date32 => ScalarValue::Date32(Some(Date32Type::from_naive_date(date))),
Date64 => ScalarValue::Date64(Some(Date64Type::from_naive_date(date))),
...I didn't have a chance to figure out how to do it for Timestamp, but it seems like there should be a function like this for the timestamps too -- for example https://docs.rs/arrow/latest/arrow/array/types/struct.TimestampSecondType.html and https://docs.rs/arrow/latest/arrow/datatypes/trait.ArrowTimestampType.html Maybe something like TimestampSecondType::make_value(date)(We will have to figure out timestamps)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed date32/64 62b0841 As for timestamp: dyn TimestampType::make_value() is using a |
||
| let naive_midnight = date.and_hms_opt(0, 0, 0)?; | ||
|
|
||
| let utc_dt = if let Some(tz_str) = tz_opt { | ||
| let tz: Tz = tz_str.parse().ok()?; | ||
|
|
||
| let local = tz.from_local_datetime(&naive_midnight); | ||
|
|
||
| let local_dt = match local { | ||
| chrono::offset::LocalResult::Single(dt) => dt, | ||
| chrono::offset::LocalResult::Ambiguous(dt1, _dt2) => dt1, | ||
| chrono::offset::LocalResult::None => local.earliest()?, | ||
| }; | ||
|
|
||
| local_dt.with_timezone(&Utc) | ||
| } else { | ||
| Utc.from_utc_datetime(&naive_midnight) | ||
| }; | ||
|
|
||
| match unit { | ||
| Second => { | ||
| ScalarValue::TimestampSecond(Some(utc_dt.timestamp()), tz_opt.clone()) | ||
| } | ||
| Millisecond => ScalarValue::TimestampMillisecond( | ||
| Some(utc_dt.timestamp_millis()), | ||
| tz_opt.clone(), | ||
| ), | ||
| Microsecond => ScalarValue::TimestampMicrosecond( | ||
| Some(utc_dt.timestamp_micros()), | ||
| tz_opt.clone(), | ||
| ), | ||
| Nanosecond => ScalarValue::TimestampNanosecond( | ||
| Some(utc_dt.timestamp_nanos_opt()?), | ||
| tz_opt.clone(), | ||
| ), | ||
| } | ||
| } | ||
| _ => return None, | ||
| }) | ||
| } | ||
|
|
||
| // Try to remove quote if exist, if the quote is invalid, return original string and let the downstream function handle the error | ||
| fn part_normalization(part: &str) -> &str { | ||
| part.strip_prefix(|c| c == '\'' || c == '\"') | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding a section to
docs/source/library-user-guide/functions/adding-udfs.mdexplaining:date_part)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a PR for
preimagedoc improvement here #20008.I am however, not sure that this doc needs to explain
preimage. I think the doc's goal is to be a very minimal guide on adding and registering a function. There is also no mention ofsimplifytoo.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that the API docs is probably adequate. We could potentially add a note to
adding-udfs.mdthat says something generic like "The ScalarUDFImpl has additional methods that support specialized optimizations such aspreimage-- see the API documentation for additional details"There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A separate PR should do.