diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 8b1dd9b044..3513b8b495 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -310,9 +310,10 @@ object CometConf extends ShimCometConf { val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled") .category(CATEGORY_ENABLE_EXEC) - .doc("Experimental support for Sort Merge Join with filter") + .doc("Support for Sort Merge Join with filter. " + + "Deprecated: this config will be removed in a future release.") .booleanConf - .createWithDefault(false) + .createWithDefault(true) val COMET_TRACING_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.tracing.enabled") .category(CATEGORY_TUNING) diff --git a/native/Cargo.lock b/native/Cargo.lock index 6cb814c51c..cd52b5dbf8 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -23,7 +23,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d122413f284cf2d62fb1b7db97e02edb8cda96d769b16e443a4f6195e35662b0" dependencies = [ - "crypto-common", + "crypto-common 0.1.7", "generic-array", ] @@ -184,7 +184,7 @@ checksum = "36fa98bc79671c7981272d91a8753a928ff6a1cd8e4f20a44c45bd5d313840bf" dependencies = [ "bigdecimal", "bon", - "digest", + "digest 0.10.7", "log", "miniz_oxide", "num-bigint", @@ -357,6 +357,7 @@ dependencies = [ "arrow-select", "flatbuffers", "lz4_flex", + "zstd", ] [[package]] @@ -492,9 +493,9 @@ dependencies = [ [[package]] name = "async-compression" -version = "0.4.41" +version = "0.4.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0f9ee0f6e02ffd7ad5816e9464499fba7b3effd01123b515c41d1697c43dad1" +checksum = "e79b3f8a79cccc2898f31920fc69f304859b3bd567490f75ebf51ae1c792a9ac" dependencies = [ "compression-codecs", "compression-core", @@ -626,9 +627,9 @@ checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" [[package]] name = "aws-config" -version = "1.8.15" +version = "1.8.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11493b0bad143270fb8ad284a096dd529ba91924c5409adeac856cc1bf047dbc" +checksum = "50f156acdd2cf55f5aa53ee416c4ac851cf1222694506c0b1f78c85695e9ca9d" dependencies = [ "aws-credential-types", "aws-runtime", @@ -646,7 +647,7 @@ dependencies = [ "fastrand", "hex", "http 1.4.0", - "sha1", + "sha1 0.10.6", "time", "tokio", "tracing", @@ -690,9 +691,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.7.2" +version = "1.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fc0651c57e384202e47153c1260b84a9936e19803d747615edf199dc3b98d17" +checksum = "5dcd93c82209ac7413532388067dce79be5a8780c1786e5fae3df22e4dee2864" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -715,9 +716,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.97.0" +version = "1.98.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9aadc669e184501caaa6beafb28c6267fc1baef0810fb58f9b205485ca3f2567" +checksum = "d69c77aafa20460c68b6b3213c84f6423b6e76dbf89accd3e1789a686ffd9489" dependencies = [ "aws-credential-types", "aws-runtime", @@ -739,9 +740,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.99.0" +version = "1.100.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"1342a7db8f358d3de0aed2007a0b54e875458e39848d54cc1d46700b2bfcb0a8" +checksum = "1c7e7b09346d5ca22a2a08267555843a6a0127fb20d8964cb6ecfb8fdb190225" dependencies = [ "aws-credential-types", "aws-runtime", @@ -763,9 +764,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.101.0" +version = "1.103.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab41ad64e4051ecabeea802d6a17845a91e83287e1dd249e6963ea1ba78c428a" +checksum = "c2249b81a2e73a8027c41c378463a81ec39b8510f184f2caab87de912af0f49b" dependencies = [ "aws-credential-types", "aws-runtime", @@ -788,9 +789,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.4.2" +version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0b660013a6683ab23797778e21f1f854744fdf05f68204b4cca4c8c04b5d1f4" +checksum = "68dc0b907359b120170613b5c09ccc61304eac3998ff6274b97d93ee6490115a" dependencies = [ "aws-credential-types", "aws-smithy-http", @@ -799,11 +800,11 @@ dependencies = [ "bytes", "form_urlencoded", "hex", - "hmac", + "hmac 0.13.0", "http 0.2.12", "http 1.4.0", "percent-encoding", - "sha2", + "sha2 0.11.0", "time", "tracing", ] @@ -980,9 +981,9 @@ dependencies = [ [[package]] name = "aws-types" -version = "1.3.14" +version = "1.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47c8323699dd9b3c8d5b3c13051ae9cdef58fd179957c882f8374dd8725962d9" +checksum = "2f4bbcaa9304ea40902d3d5f42a0428d1bd895a2b0f6999436fb279ffddc58ac" dependencies = [ "aws-credential-types", "aws-smithy-async", @@ -1120,14 +1121,14 @@ version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "46502ad458c9a52b69d4d4d32775c788b7a1b85e8bc9d482d92250fc0e3f8efe" dependencies = [ - "digest", + "digest 0.10.7", ] [[package]] name = "blake3" -version = "1.8.4" +version = "1.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d2d5991425dfd0785aed03aedcf0b321d61975c9b5b3689c774a2610ae0b51e" +checksum = "0aa83c34e62843d924f905e0f5c866eb1dd6545fc4d719e803d9ba6030371fce" dependencies = [ "arrayref", "arrayvec", @@ -1146,6 +1147,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block-buffer" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdd35008169921d80bc60d3d0ab416eecb028c4cd653352907921d95084790be" +dependencies = [ + "hybrid-array", +] + [[package]] name = "block-padding" version = "0.3.3" @@ -1284,9 +1294,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.60" +version = "1.2.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43c5703da9466b66a946814e1adf53ea2c90f10063b86290cc9eb67ce3478a20" +checksum = "d16d90359e986641506914ba71350897565610e87ce0ad9e6f28569db3dd5c6d" dependencies = [ "find-msvc-tools", "jobserver", @@ -1389,7 +1399,7 @@ version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" dependencies = [ - "crypto-common", + "crypto-common 0.1.7", "inout", ] @@ -1453,6 +1463,12 @@ dependencies = [ "cc", ] +[[package]] +name = "cmov" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f88a43d011fc4a6876cb7344703e297c71dda42494fee094d5f7c76bf13f746" + [[package]] name = "colorchoice" version = "1.0.5" @@ -1481,9 +1497,9 @@ dependencies = [ [[package]] name = "compression-codecs" -version = "0.4.37" +version = "0.4.38" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb7b51a7d9c967fc26773061ba86150f19c50c0d65c887cb1fbe295fd16619b7" +checksum = "ce2548391e9c1929c21bf6aa2680af86fe4c1b33e6cea9ac1cfeec0bd11218cf" dependencies = [ "bzip2", "compression-core", @@ -1496,9 +1512,9 @@ dependencies = [ [[package]] name = "compression-core" -version = "0.4.31" +version = "0.4.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75984efb6ed102a0d42db99afb6c1948f0380d1d91808d5529916e6c08b49d8d" +checksum = "cc14f565cf027a105f7a44ccf9e5b424348421a1d8952a8fc9d499d313107789" [[package]] name = "concurrent-queue" @@ -1515,6 +1531,12 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "const-oid" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6ef517f0926dd24a1582492c791b6a4818a4d94e789a334894aa15b0d12f55c" + [[package]] name = "const-random" version = "0.1.18" @@ -1559,9 +1581,9 @@ checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" [[package]] name = "cpp_demangle" -version = "0.5.1" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0667304c32ea56cb4cd6d2d7c0cfe9a2f8041229db8c033af7f8d69492429def" +checksum = "f2bb79cb74d735044c972aae58ed0aaa9a837e85b01106a54c39e42e97f62253" dependencies = [ "cfg-if", ] @@ -1688,6 +1710,15 @@ dependencies = [ "typenum", ] +[[package]] +name = "crypto-common" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77727bb15fa921304124b128af125e7e3b968275d1b108b379190264f4423710" +dependencies = [ + "hybrid-array", +] + [[package]] name = "csv" version = "1.4.0" @@ -1734,6 +1765,15 @@ dependencies = [ "cipher", ] +[[package]] +name = "ctutils" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d5515a3834141de9eafb9717ad39eea8247b5674e6066c404e8c4b365d2a29e" +dependencies = [ + "cmov", +] + [[package]] name = "darling" version = "0.20.11" @@ -1820,13 +1860,11 @@ dependencies = [ [[package]] name = "datafusion" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93db0e623840612f7f2cd757f7e8a8922064192363732c88692e0870016e141b" +source = "git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" dependencies = [ "arrow", "arrow-schema", "async-trait", - "bytes", "chrono", "datafusion-catalog", "datafusion-catalog-listing", @@ -1859,8 +1897,6 @@ dependencies = [ "object_store", "parking_lot", "parquet", - "rand 0.9.4", - "regex", "sqlparser", "tempfile", "tokio", @@ -1871,8 +1907,7 @@ dependencies = [ [[package]] name = "datafusion-catalog" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37cefde60b26a7f4ff61e9d2ff2833322f91df2b568d7238afe67bde5bdffb66" +source = "git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" dependencies = [ "arrow", "async-trait", @@ -1896,8 +1931,7 @@ dependencies = [ [[package]] name = "datafusion-catalog-listing" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17e112307715d6a7a331111a4c2330ff54bc237183511c319e3708a4cff431fb" +source = 
"git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" dependencies = [ "arrow", "async-trait", @@ -2090,15 +2124,15 @@ dependencies = [ [[package]] name = "datafusion-common" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d72a11ca44a95e1081870d3abb80c717496e8a7acb467a1d3e932bb636af5cc2" +source = "git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" dependencies = [ - "ahash", "arrow", "arrow-ipc", + "arrow-schema", "chrono", + "foldhash 0.2.0", "half", - "hashbrown 0.16.1", + "hashbrown 0.17.0", "hex", "indexmap 2.14.0", "itertools 0.14.0", @@ -2106,17 +2140,16 @@ dependencies = [ "log", "object_store", "parquet", - "paste", "sqlparser", "tokio", + "uuid", "web-time", ] [[package]] name = "datafusion-common-runtime" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89f4afaed29670ec4fd6053643adc749fe3f4bc9d1ce1b8c5679b22c67d12def" +source = "git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" dependencies = [ "futures", "log", @@ -2126,8 +2159,7 @@ dependencies = [ [[package]] name = "datafusion-datasource" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9fb386e1691355355a96419978a0022b7947b44d4a24a6ea99f00b6b485cbb6" +source = "git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" dependencies = [ "arrow", "async-compression", @@ -2151,6 +2183,7 @@ dependencies = [ "liblzma", "log", "object_store", + "parking_lot", "rand 0.9.4", "tokio", "tokio-util", @@ -2161,8 +2194,7 @@ dependencies = [ [[package]] name = "datafusion-datasource-arrow" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffa6c52cfed0734c5f93754d1c0175f558175248bf686c944fb05c373e5fc096" +source = "git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" dependencies = [ "arrow", "arrow-ipc", @@ -2185,8 +2217,7 @@ dependencies = [ [[package]] name = "datafusion-datasource-csv" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "503f29e0582c1fc189578d665ff57d9300da1f80c282777d7eb67bb79fb8cdca" +source = "git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" dependencies = [ "arrow", "async-trait", @@ -2208,8 +2239,7 @@ dependencies = [ [[package]] name = "datafusion-datasource-json" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e33804749abc8d0c8cb7473228483cb8070e524c6f6086ee1b85a64debe2b3d2" +source = "git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" dependencies = [ "arrow", "async-trait", @@ -2224,7 +2254,6 @@ dependencies = [ "datafusion-session", "futures", "object_store", - "serde_json", "tokio", "tokio-stream", ] @@ -2232,8 +2261,7 @@ dependencies = [ [[package]] name = "datafusion-datasource-parquet" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32a8e0365e0e08e8ff94d912f0ababcf9065a1a304018ba90b1fc83c855b4997" +source = "git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" dependencies = [ "arrow", "async-trait", @@ -2243,6 +2271,7 @@ dependencies = [ 
"datafusion-datasource", "datafusion-execution", "datafusion-expr", + "datafusion-functions", "datafusion-functions-aggregate-common", "datafusion-physical-expr", "datafusion-physical-expr-adapter", @@ -2262,19 +2291,16 @@ dependencies = [ [[package]] name = "datafusion-doc" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8de6ac0df1662b9148ad3c987978b32cbec7c772f199b1d53520c8fa764a87ee" +source = "git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" [[package]] name = "datafusion-execution" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c03c7fbdaefcca4ef6ffe425a5fc2325763bfb426599bb0bf4536466efabe709" +source = "git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" dependencies = [ "arrow", "arrow-buffer", "async-trait", - "chrono", "dashmap", "datafusion-common", "datafusion-expr", @@ -2292,10 +2318,10 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "574b9b6977fedbd2a611cbff12e5caf90f31640ad9dc5870f152836d94bad0dd" +source = "git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" dependencies = [ "arrow", + "arrow-schema", "async-trait", "chrono", "datafusion-common", @@ -2306,7 +2332,6 @@ dependencies = [ "datafusion-physical-expr-common", "indexmap 2.14.0", "itertools 0.14.0", - "paste", "serde_json", "sqlparser", ] @@ -2314,21 +2339,18 @@ dependencies = [ [[package]] name = "datafusion-expr-common" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d7c3adf3db8bf61e92eb90cb659c8e8b734593a8f7c8e12a843c7ddba24b87e" +source = "git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" dependencies = [ "arrow", "datafusion-common", "indexmap 2.14.0", "itertools 0.14.0", - "paste", ] [[package]] name = "datafusion-functions" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f28aa4e10384e782774b10e72aca4d93ef7b31aa653095d9d4536b0a3dbc51b6" +source = "git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" dependencies = [ "arrow", "arrow-buffer", @@ -2351,18 +2373,15 @@ dependencies = [ "num-traits", "rand 0.9.4", "regex", - "sha2", - "unicode-segmentation", + "sha2 0.10.9", "uuid", ] [[package]] name = "datafusion-functions-aggregate" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00aa6217e56098ba84e0a338176fe52f0a84cca398021512c6c8c5eff806d0ad" +source = "git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" dependencies = [ - "ahash", "arrow", "datafusion-common", "datafusion-doc", @@ -2372,19 +2391,17 @@ dependencies = [ "datafusion-macros", "datafusion-physical-expr", "datafusion-physical-expr-common", + "foldhash 0.2.0", "half", "log", "num-traits", - "paste", ] [[package]] name = "datafusion-functions-aggregate-common" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b511250349407db7c43832ab2de63f5557b19a20dfd236b39ca2c04468b50d47" +source = "git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" dependencies = [ - "ahash", "arrow", 
"datafusion-common", "datafusion-expr-common", @@ -2394,8 +2411,7 @@ dependencies = [ [[package]] name = "datafusion-functions-nested" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef13a858e20d50f0a9bb5e96e7ac82b4e7597f247515bccca4fdd2992df0212a" +source = "git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" dependencies = [ "arrow", "arrow-ord", @@ -2409,18 +2425,17 @@ dependencies = [ "datafusion-functions-aggregate-common", "datafusion-macros", "datafusion-physical-expr-common", - "hashbrown 0.16.1", + "hashbrown 0.17.0", "itertools 0.14.0", "itoa", "log", - "paste", + "memchr", ] [[package]] name = "datafusion-functions-table" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72b40d3f5bbb3905f9ccb1ce9485a9595c77b69758a7c24d3ba79e334ff51e7e" +source = "git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" dependencies = [ "arrow", "async-trait", @@ -2429,14 +2444,12 @@ dependencies = [ "datafusion-expr", "datafusion-physical-plan", "parking_lot", - "paste", ] [[package]] name = "datafusion-functions-window" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4e88ec9d57c9b685d02f58bfee7be62d72610430ddcedb82a08e5d9925dbfb6" +source = "git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" dependencies = [ "arrow", "datafusion-common", @@ -2447,14 +2460,12 @@ dependencies = [ "datafusion-physical-expr", "datafusion-physical-expr-common", "log", - "paste", ] [[package]] name = "datafusion-functions-window-common" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8307bb93519b1a91913723a1130cfafeee3f72200d870d88e91a6fc5470ede5c" +source = "git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" dependencies = [ "datafusion-common", "datafusion-physical-expr-common", @@ -2463,8 +2474,7 @@ dependencies = [ [[package]] name = "datafusion-macros" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e367e6a71051d0ebdd29b2f85d12059b38b1d1f172c6906e80016da662226bd" +source = "git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" dependencies = [ "datafusion-doc", "quote", @@ -2474,8 +2484,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e929015451a67f77d9d8b727b2bf3a40c4445fdef6cdc53281d7d97c76888ace" +source = "git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" dependencies = [ "arrow", "chrono", @@ -2493,10 +2502,8 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b1e68aba7a4b350401cfdf25a3d6f989ad898a7410164afe9ca52080244cb59" +source = "git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" dependencies = [ - "ahash", "arrow", "datafusion-common", "datafusion-expr", @@ -2504,11 +2511,10 @@ dependencies = [ "datafusion-functions-aggregate-common", "datafusion-physical-expr-common", "half", - "hashbrown 0.16.1", + "hashbrown 0.17.0", "indexmap 2.14.0", "itertools 0.14.0", 
"parking_lot", - "paste", "petgraph", "tokio", ] @@ -2516,8 +2522,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-adapter" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea22315f33cf2e0adc104e8ec42e285f6ed93998d565c65e82fec6a9ee9f9db4" +source = "git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" dependencies = [ "arrow", "datafusion-common", @@ -2531,15 +2536,13 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b04b45ea8ad3ac2d78f2ea2a76053e06591c9629c7a603eda16c10649ecf4362" +source = "git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" dependencies = [ - "ahash", "arrow", "chrono", "datafusion-common", "datafusion-expr-common", - "hashbrown 0.16.1", + "hashbrown 0.17.0", "indexmap 2.14.0", "itertools 0.14.0", "parking_lot", @@ -2548,8 +2551,7 @@ dependencies = [ [[package]] name = "datafusion-physical-optimizer" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7cb13397809a425918f608dfe8653f332015a3e330004ab191b4404187238b95" +source = "git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" dependencies = [ "arrow", "datafusion-common", @@ -2566,11 +2568,10 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5edc023675791af9d5fb4cc4c24abf5f7bd3bd4dcf9e5bd90ea1eff6976dcc79" +source = "git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" dependencies = [ - "ahash", "arrow", + "arrow-data", "arrow-ord", "arrow-schema", "async-trait", @@ -2585,7 +2586,7 @@ dependencies = [ "datafusion-physical-expr-common", "futures", "half", - "hashbrown 0.16.1", + "hashbrown 0.17.0", "indexmap 2.14.0", "itertools 0.14.0", "log", @@ -2598,8 +2599,7 @@ dependencies = [ [[package]] name = "datafusion-pruning" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac8c76860e355616555081cab5968cec1af7a80701ff374510860bcd567e365a" +source = "git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" dependencies = [ "arrow", "datafusion-common", @@ -2608,15 +2608,13 @@ dependencies = [ "datafusion-physical-expr", "datafusion-physical-expr-common", "datafusion-physical-plan", - "itertools 0.14.0", "log", ] [[package]] name = "datafusion-session" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5412111aa48e2424ba926112e192f7a6b7e4ccb450145d25ce5ede9f19dc491e" +source = "git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" dependencies = [ "async-trait", "datafusion-common", @@ -2629,8 +2627,7 @@ dependencies = [ [[package]] name = "datafusion-spark" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e059dcf8544da0d6598d0235be3cc29c209094a5976b2e4822e4a2cf91c2b5c5" +source = "git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" dependencies = [ "arrow", "bigdecimal", @@ -2645,19 +2642,19 @@ dependencies = [ "datafusion-functions-aggregate", 
"datafusion-functions-nested", "log", + "num-traits", "percent-encoding", "rand 0.9.4", "serde_json", - "sha1", - "sha2", + "sha1 0.11.0", + "sha2 0.10.9", "url", ] [[package]] name = "datafusion-sql" version = "53.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa0d133ddf8b9b3b872acac900157f783e7b879fe9a6bccf389abebbfac45ec1" +source = "git+https://github.com/comphead/arrow-datafusion?branch=pr-21679#4f11db6661e436eb994aec482a8239e14e5734ff" dependencies = [ "arrow", "bigdecimal", @@ -2686,7 +2683,7 @@ version = "0.7.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e7c1832837b905bbfb5101e07cc24c8deddf52f93225eee6ead5f4d63d53ddcb" dependencies = [ - "const-oid", + "const-oid 0.9.6", "pem-rfc7468", "zeroize", ] @@ -2766,12 +2763,24 @@ version = "0.10.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ - "block-buffer", - "const-oid", - "crypto-common", + "block-buffer 0.10.4", + "const-oid 0.9.6", + "crypto-common 0.1.7", "subtle", ] +[[package]] +name = "digest" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4850db49bf08e663084f7fb5c87d202ef91a3907271aff24a94eb97ff039153c" +dependencies = [ + "block-buffer 0.12.0", + "const-oid 0.10.2", + "crypto-common 0.2.1", + "ctutils", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -3268,17 +3277,17 @@ name = "hashbrown" version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" -dependencies = [ - "allocator-api2", - "equivalent", - "foldhash 0.2.0", -] [[package]] name = "hashbrown" version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4f467dd6dccf739c208452f8014c75c18bb8301b050ad1cfb27153803edb0f51" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash 0.2.0", +] [[package]] name = "hdfs-sys" @@ -3328,7 +3337,16 @@ version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" dependencies = [ - "digest", + "digest 0.10.7", +] + +[[package]] +name = "hmac" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6303bc9732ae41b04cb554b844a762b4115a61bfaa81e3e83050991eeb56863f" +dependencies = [ + "digest 0.11.2", ] [[package]] @@ -3407,6 +3425,15 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424" +[[package]] +name = "hybrid-array" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d46837a0ed51fe95bd3b05de33cd64a1ee88fc797477ca48446872504507c5" +dependencies = [ + "typenum", +] + [[package]] name = "hyper" version = "1.9.0" @@ -3673,9 +3700,9 @@ dependencies = [ [[package]] name = "idna_adapter" -version = "1.2.1" +version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3acae9609540aa318d1bc588455225fb2085b9ed0c4f6bd0d9d5bcd86f1a0344" +checksum = "cb68373c0d6620ef8105e855e7745e18b0d00d3bdb07fb532e434244cdb9a714" dependencies = [ "icu_normalizer", "icu_properties", @@ -3815,9 +3842,9 @@ dependencies = [ [[package]] name = "jiff" -version = "0.2.23" +version = "0.2.24" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a3546dc96b6d42c5f24902af9e2538e82e39ad350b0c766eb3fbf2d8f3d8359" +checksum = "f00b5dbd620d61dfdcb6007c9c1f6054ebd75319f163d886a9055cec1155073d" dependencies = [ "jiff-static", "jiff-tzdb-platform", @@ -3832,9 +3859,9 @@ dependencies = [ [[package]] name = "jiff-static" -version = "0.2.23" +version = "0.2.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a8c8b344124222efd714b73bb41f8b5120b27a7cc1c75593a6ff768d9d05aa4" +checksum = "e000de030ff8022ea1da3f466fbb0f3a809f5e51ed31f6dd931c35181ad8e6d7" dependencies = [ "proc-macro2", "quote", @@ -4064,9 +4091,9 @@ checksum = "b3a6a8c165077efc8f3a971534c50ea6a1a18b329ef4a66e897a7e3a1494565f" [[package]] name = "libc" -version = "0.2.185" +version = "0.2.186" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52ff2c0fe9bc6cb6b14a0592c2ff4fa9ceb83eea9db979b0487cd054946a2b8f" +checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66" [[package]] name = "libloading" @@ -4106,12 +4133,11 @@ checksum = "b6d2cec3eae94f9f509c767b45932f1ada8350c4bdb85af2fcab4a3c14807981" [[package]] name = "libmimalloc-sys" -version = "0.1.44" +version = "0.1.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "667f4fec20f29dfc6bc7357c582d91796c169ad7e2fce709468aefeb2c099870" +checksum = "2d1eacfa31c33ec25e873c136ba5669f00f9866d0688bea7be4d3f7e43067df6" dependencies = [ "cc", - "libc", ] [[package]] @@ -4208,7 +4234,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" dependencies = [ "cfg-if", - "digest", + "digest 0.10.7", ] [[package]] @@ -4237,9 +4263,9 @@ dependencies = [ [[package]] name = "mimalloc" -version = "0.1.48" +version = "0.1.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1ee66a4b64c74f4ef288bcbb9192ad9c3feaad75193129ac8509af543894fd8" +checksum = "b3627c4272df786b9260cabaa46aec1d59c93ede723d4c3ef646c503816b0640" dependencies = [ "libmimalloc-sys", ] @@ -4587,7 +4613,7 @@ dependencies = [ "percent-encoding", "quick-xml 0.38.4", "reqsign-core", - "reqwest 0.13.2", + "reqwest 0.13.3", "serde", "serde_json", "tokio", @@ -4815,8 +4841,8 @@ version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ed6a7761f76e3b9f92dfb0a60a6a6477c61024b775147ff0973a02653abaf2" dependencies = [ - "digest", - "hmac", + "digest 0.10.7", + "hmac 0.12.1", ] [[package]] @@ -4945,7 +4971,7 @@ dependencies = [ "der", "pbkdf2", "scrypt", - "sha2", + "sha2 0.10.9", "spki", ] @@ -5471,7 +5497,7 @@ dependencies = [ "form_urlencoded", "getrandom 0.2.17", "hex", - "hmac", + "hmac 0.12.1", "home", "http 1.4.0", "jsonwebtoken", @@ -5485,8 +5511,8 @@ dependencies = [ "rust-ini", "serde", "serde_json", - "sha1", - "sha2", + "sha1 0.10.6", + "sha2 0.10.9", "tokio", ] @@ -5502,13 +5528,13 @@ dependencies = [ "form_urlencoded", "futures", "hex", - "hmac", + "hmac 0.12.1", "http 1.4.0", "jiff", "log", "percent-encoding", - "sha1", - "sha2", + "sha1 0.10.6", + "sha2 0.10.9", "windows-sys 0.61.2", ] @@ -5557,9 +5583,9 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.13.2" +version = "0.13.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab3f43e3283ab1488b624b44b0e988d0acea0b3214e694730a055cb6b2efa801" +checksum = "62e0021ea2c22aed41653bc7e1419abb2c97e038ff2c33d0e1309e49a97deec0" dependencies = [ "base64", "bytes", 
@@ -5618,9 +5644,9 @@ dependencies = [ [[package]] name = "roaring" -version = "0.11.3" +version = "0.11.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ba9ce64a8f45d7fc86358410bb1a82e8c987504c0d4900e9141d69a9f26c885" +checksum = "1dedc5658c6ecb3bdb5ef5f3295bb9253f42dcf3fd1402c03f6b1f7659c3c4a9" dependencies = [ "bytemuck", "byteorder", @@ -5632,15 +5658,15 @@ version = "0.9.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b8573f03f5883dcaebdfcf4725caa1ecb9c15b2ef50c43a07b816e06799bb12d" dependencies = [ - "const-oid", - "digest", + "const-oid 0.9.6", + "digest 0.10.7", "num-bigint-dig", "num-integer", "num-traits", "pkcs1", "pkcs8", "rand_core 0.6.4", - "sha2", + "sha2 0.10.9", "signature", "spki", "subtle", @@ -5712,9 +5738,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.38" +version = "0.23.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69f9466fb2c14ea04357e91413efb882e2a6d4a406e625449bc0a5d360d53a21" +checksum = "7c2c118cb077cca2822033836dfb1b975355dfb784b5e8da48f7b6c5db74e60e" dependencies = [ "aws-lc-rs", "once_cell", @@ -5739,9 +5765,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.14.0" +version = "1.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be040f8b0a225e40375822a563fa9524378b9d63112f53e19ffff34df5d33fdd" +checksum = "30a7197ae7eb376e574fe940d068c30fe0462554a3ddbe4eca7838e049c937a9" dependencies = [ "web-time", "zeroize", @@ -5749,13 +5775,13 @@ dependencies = [ [[package]] name = "rustls-platform-verifier" -version = "0.6.2" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d99feebc72bae7ab76ba994bb5e121b8d83d910ca40b36e0921f53becc41784" +checksum = "26d1e2536ce4f35f4846aa13bff16bd0ff40157cdb14cc056c7b14ba41233ba0" dependencies = [ "core-foundation", "core-foundation-sys", - "jni 0.21.1", + "jni 0.22.4", "log", "once_cell", "rustls", @@ -5776,9 +5802,9 @@ checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f" [[package]] name = "rustls-webpki" -version = "0.103.12" +version = "0.103.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8279bb85272c9f10811ae6a6c547ff594d6a7f3c6c6b02ee9726d1d0dcfcdd06" +checksum = "61c429a8649f110dddef65e2a5ad240f747e85f7758a6bccc7e5777bd33f756e" dependencies = [ "aws-lc-rs", "ring", @@ -5863,7 +5889,7 @@ checksum = "0516a385866c09368f0b5bcd1caff3366aace790fcd46e2bb032697bb172fd1f" dependencies = [ "pbkdf2", "salsa20", - "sha2", + "sha2 0.10.9", ] [[package]] @@ -6049,7 +6075,18 @@ checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" dependencies = [ "cfg-if", "cpufeatures 0.2.17", - "digest", + "digest 0.10.7", +] + +[[package]] +name = "sha1" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aacc4cc499359472b4abe1bf11d0b12e688af9a805fa5e3016f9a386dc2d0214" +dependencies = [ + "cfg-if", + "cpufeatures 0.3.0", + "digest 0.11.2", ] [[package]] @@ -6060,7 +6097,18 @@ checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" dependencies = [ "cfg-if", "cpufeatures 0.2.17", - "digest", + "digest 0.10.7", +] + +[[package]] +name = "sha2" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "446ba717509524cb3f22f17ecc096f10f4822d76ab5c0b9822c5f9c284e825f4" +dependencies = [ + "cfg-if", + "cpufeatures 0.3.0", + "digest 0.11.2", ] 
[[package]] @@ -6085,7 +6133,7 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" dependencies = [ - "digest", + "digest 0.10.7", "rand_core 0.6.4", ] @@ -6250,9 +6298,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "symbolic-common" -version = "12.18.1" +version = "12.18.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1f3cdeaae6779ecba2567f20bf7716718b8c4ce6717c9def4ced18786bb11ea" +checksum = "332615d90111d8eeaf86a84dc9bbe9f65d0d8c5cf11b4caccedc37754eb0dcfd" dependencies = [ "debugid", "memmap2", @@ -6262,9 +6310,9 @@ dependencies = [ [[package]] name = "symbolic-demangle" -version = "12.18.1" +version = "12.18.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "672c6ad9cb8fce6a1283cc9df9070073cccad00ae241b80e3686328a64e3523b" +checksum = "912017718eb4d21930546245af9a3475c9dccf15675a5c215664e76621afc471" dependencies = [ "cpp_demangle", "rustc-demangle", @@ -6690,9 +6738,9 @@ dependencies = [ [[package]] name = "typenum" -version = "1.19.0" +version = "1.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" +checksum = "40ce102ab67701b8526c123c1bab5cbe42d7040ccfd0f64af1a385808d2f43de" [[package]] name = "typetag" @@ -6748,7 +6796,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc1de2c688dc15305988b563c3854064043356019f97a4b46276fe734c4f07ea" dependencies = [ - "crypto-common", + "crypto-common 0.1.7", "subtle", ] @@ -6860,11 +6908,11 @@ checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" [[package]] name = "wasip2" -version = "1.0.2+wasi-0.2.9" +version = "1.0.3+wasi-0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9517f9239f02c069db75e65f174b3da828fe5f5b945c4dd26bd25d89c03ebcf5" +checksum = "20064672db26d7cdc89c7798c48a0fdfac8213434a1186e5ef29fd560ae223d6" dependencies = [ - "wit-bindgen", + "wit-bindgen 0.57.1", ] [[package]] @@ -6873,7 +6921,7 @@ version = "0.4.0+wasi-0.3.0-rc-2026-01-06" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" dependencies = [ - "wit-bindgen", + "wit-bindgen 0.51.0", ] [[package]] @@ -7371,6 +7419,12 @@ dependencies = [ "wit-bindgen-rust-macro", ] +[[package]] +name = "wit-bindgen" +version = "0.57.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ebf944e87a7c253233ad6766e082e3cd714b5d03812acc24c318f549614536e" + [[package]] name = "wit-bindgen-core" version = "0.51.0" diff --git a/native/Cargo.toml b/native/Cargo.toml index 0de5807a97..e4a9b7efd2 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -38,10 +38,10 @@ arrow = { version = "58.1.0", features = ["prettyprint", "ffi", "chrono-tz"] } async-trait = { version = "0.1" } bytes = { version = "1.11.1" } parquet = { version = "58.1.0", default-features = false, features = ["experimental"] } -datafusion = { version = "53.1.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] } -datafusion-datasource = { version = "53.1.0" } -datafusion-physical-expr-adapter = { version = "53.1.0" } -datafusion-spark = { version = "53.1.0", features = ["core"] } +datafusion = { git = 
"https://github.com/comphead/arrow-datafusion", branch = "pr-21679", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] } +datafusion-datasource = { git = "https://github.com/comphead/arrow-datafusion", branch = "pr-21679" } +datafusion-physical-expr-adapter = { git = "https://github.com/comphead/arrow-datafusion", branch = "pr-21679" } +datafusion-spark = { git = "https://github.com/comphead/arrow-datafusion", branch = "pr-21679", features = ["core"] } datafusion-comet-spark-expr = { path = "spark-expr" } datafusion-comet-common = { path = "common" } datafusion-comet-jni-bridge = { path = "jni-bridge" } diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index d54a03b7b6..cba8ce340e 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -91,7 +91,7 @@ jni = { version = "0.22.4", features = ["invocation"] } lazy_static = "1.4" assertables = "9" hex = "0.4.3" -datafusion-functions-nested = { version = "53.1.0" } +datafusion-functions-nested = { git = "https://github.com/comphead/arrow-datafusion", branch = "pr-21679" } [features] backtrace = ["datafusion/backtrace"] diff --git a/native/core/src/execution/expressions/arithmetic.rs b/native/core/src/execution/expressions/arithmetic.rs index 320532d773..8d4c59a010 100644 --- a/native/core/src/execution/expressions/arithmetic.rs +++ b/native/core/src/execution/expressions/arithmetic.rs @@ -77,10 +77,6 @@ impl Hash for CheckedBinaryExpr { } impl PhysicalExpr for CheckedBinaryExpr { - fn as_any(&self) -> &dyn Any { - self - } - fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result { self.child.fmt_sql(f) } diff --git a/native/core/src/execution/expressions/strings.rs b/native/core/src/execution/expressions/strings.rs index 7219395963..4a7c44cc3e 100644 --- a/native/core/src/execution/expressions/strings.rs +++ b/native/core/src/execution/expressions/strings.rs @@ -91,7 +91,7 @@ impl ExpressionBuilder for RlikeBuilder { let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?; - match right.as_any().downcast_ref::().unwrap().value() { + match right.downcast_ref::().unwrap().value() { ScalarValue::Utf8(Some(pattern)) => Ok(Arc::new(RLike::try_new(left, pattern)?)), _ => Err(ExecutionError::GeneralError( "RLike only supports scalar patterns".to_string(), diff --git a/native/core/src/execution/expressions/subquery.rs b/native/core/src/execution/expressions/subquery.rs index 9272ede60c..fc7b8104d2 100644 --- a/native/core/src/execution/expressions/subquery.rs +++ b/native/core/src/execution/expressions/subquery.rs @@ -29,7 +29,6 @@ use jni::{ sys::{jboolean, jbyte, jint, jlong, jshort}, }; use std::{ - any::Any, fmt::{Display, Formatter}, hash::Hash, sync::Arc, @@ -63,10 +62,6 @@ impl Display for Subquery { } impl PhysicalExpr for Subquery { - fn as_any(&self) -> &dyn Any { - self - } - fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result { Display::fmt(self, f) } diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 395703c0f9..10634c3d78 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -35,6 +35,7 @@ use datafusion::common::{DataFusionError, Result as DataFusionResult, ScalarValu use datafusion::execution::disk_manager::DiskManagerMode; use datafusion::execution::memory_pool::MemoryPool; use datafusion::execution::runtime_env::RuntimeEnvBuilder; +use 
datafusion::execution::FunctionRegistry; use datafusion::logical_expr::ScalarUDF; use datafusion::{ execution::disk_manager::DiskManagerBuilder, @@ -53,6 +54,7 @@ use datafusion_spark::function::datetime::next_day::SparkNextDay; use datafusion_spark::function::hash::crc32::SparkCrc32; use datafusion_spark::function::hash::sha1::SparkSha1; use datafusion_spark::function::hash::sha2::SparkSha2; +use datafusion_spark::function::lambda::array_exists_higher_order_function; use datafusion_spark::function::map::map_from_entries::MapFromEntries; use datafusion_spark::function::math::expm1::SparkExpm1; use datafusion_spark::function::math::hex::SparkHex; @@ -532,6 +534,7 @@ fn prepare_datafusion_session_context( datafusion::functions_nested::register_all(&mut session_ctx)?; register_datafusion_spark_function(&session_ctx); + session_ctx.register_higher_order_function(array_exists_higher_order_function())?; // Must be the last one to override existing functions with the same name datafusion_comet_spark_expr::register_all_comet_functions(&mut session_ctx)?; diff --git a/native/core/src/execution/memory_pools/fair_pool.rs b/native/core/src/execution/memory_pools/fair_pool.rs index e4a7ceab54..347c3d8ef6 100644 --- a/native/core/src/execution/memory_pools/fair_pool.rs +++ b/native/core/src/execution/memory_pools/fair_pool.rs @@ -16,7 +16,7 @@ // under the License. use std::{ - fmt::{Debug, Formatter, Result as FmtResult}, + fmt::{Debug, Display, Formatter, Result as FmtResult}, sync::Arc, }; @@ -83,10 +83,25 @@ impl CometFairMemoryPool { } } +impl Display for CometFairMemoryPool { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + let state = self.state.lock(); + write!( + f, + "CometFairMemoryPool(pool_size={}, used={}, num={})", + self.pool_size, state.used, state.num + ) + } +} + unsafe impl Send for CometFairMemoryPool {} unsafe impl Sync for CometFairMemoryPool {} impl MemoryPool for CometFairMemoryPool { + fn name(&self) -> &str { + "CometFairMemoryPool" + } + fn register(&self, _: &MemoryConsumer) { let mut state = self.state.lock(); state.num = state diff --git a/native/core/src/execution/memory_pools/logging_pool.rs b/native/core/src/execution/memory_pools/logging_pool.rs index c23672d01a..4e7b68a728 100644 --- a/native/core/src/execution/memory_pools/logging_pool.rs +++ b/native/core/src/execution/memory_pools/logging_pool.rs @@ -19,6 +19,7 @@ use datafusion::execution::memory_pool::{ MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation, }; use log::{info, warn}; +use std::fmt; use std::sync::Arc; #[derive(Debug)] @@ -27,6 +28,12 @@ pub(crate) struct LoggingMemoryPool { pool: Arc, } +impl fmt::Display for LoggingMemoryPool { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "LoggingMemoryPool(task={})", self.task_attempt_id) + } +} + impl LoggingMemoryPool { pub fn new(task_attempt_id: u64, pool: Arc) -> Self { Self { @@ -37,6 +44,10 @@ impl LoggingMemoryPool { } impl MemoryPool for LoggingMemoryPool { + fn name(&self) -> &str { + "LoggingMemoryPool" + } + fn register(&self, consumer: &MemoryConsumer) { info!( "[Task {}] MemoryPool[{}].register()", diff --git a/native/core/src/execution/memory_pools/unified_pool.rs b/native/core/src/execution/memory_pools/unified_pool.rs index f34418ee94..e72c734d04 100644 --- a/native/core/src/execution/memory_pools/unified_pool.rs +++ b/native/core/src/execution/memory_pools/unified_pool.rs @@ -16,7 +16,7 @@ // under the License. 
use std::{ - fmt::{Debug, Formatter, Result as FmtResult}, + fmt::{Debug, Display, Formatter, Result as FmtResult}, sync::{ atomic::{AtomicUsize, Ordering::Relaxed}, Arc, @@ -90,10 +90,24 @@ impl Drop for CometUnifiedMemoryPool { } } +impl Display for CometUnifiedMemoryPool { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!( + f, + "CometUnifiedMemoryPool(used={})", + self.used.load(Relaxed) + ) + } +} + unsafe impl Send for CometUnifiedMemoryPool {} unsafe impl Sync for CometUnifiedMemoryPool {} impl MemoryPool for CometUnifiedMemoryPool { + fn name(&self) -> &str { + "CometUnifiedMemoryPool" + } + fn grow(&self, reservation: &MemoryReservation, additional: usize) { self.try_grow(reservation, additional).unwrap(); } diff --git a/native/core/src/execution/metrics/utils.rs b/native/core/src/execution/metrics/utils.rs index eb7e10bfc9..8547c0c3ca 100644 --- a/native/core/src/execution/metrics/utils.rs +++ b/native/core/src/execution/metrics/utils.rs @@ -68,9 +68,13 @@ pub(crate) fn to_native_metric_node( Some(metrics.aggregate_by_name()) }; - // add metrics + // Aggregate metrics by name using DataFusion's aggregate_by_name(), which + // correctly handles duplicate metric names (e.g. BaselineMetrics registered + // by both FileStream and ParquetMorselizer on the same ExecutionPlanMetricsSet). + // The additional_native_plans branch below already does this. node_metrics .unwrap_or_default() + .aggregate_by_name() .iter() .map(|m| m.value()) .map(|m| (m.name(), m.as_usize() as i64)) diff --git a/native/core/src/execution/operators/expand.rs b/native/core/src/execution/operators/expand.rs index e06fab23ec..e2677d226d 100644 --- a/native/core/src/execution/operators/expand.rs +++ b/native/core/src/execution/operators/expand.rs @@ -17,6 +17,7 @@ use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::datatypes::SchemaRef; +use datafusion::common::tree_node::TreeNodeRecursion; use datafusion::common::DataFusionError; use datafusion::physical_expr::{EquivalenceProperties, PhysicalExpr}; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; @@ -29,7 +30,6 @@ use datafusion::{ }; use futures::{Stream, StreamExt}; use std::{ - any::Any, pin::Pin, sync::Arc, task::{Context, Poll}, @@ -91,8 +91,17 @@ impl DisplayAs for ExpandExec { } impl ExecutionPlan for ExpandExec { - fn as_any(&self) -> &dyn Any { - self + fn apply_expressions( + &self, + f: &mut dyn FnMut(&dyn PhysicalExpr) -> datafusion::common::Result, + ) -> datafusion::common::Result { + let mut tnr = TreeNodeRecursion::Continue; + for projection in &self.projections { + for expr in projection { + tnr = tnr.visit_sibling(|| f(expr.as_ref()))?; + } + } + Ok(tnr) } fn schema(&self) -> SchemaRef { diff --git a/native/core/src/execution/operators/iceberg_scan.rs b/native/core/src/execution/operators/iceberg_scan.rs index d217ebc34b..a2dae49742 100644 --- a/native/core/src/execution/operators/iceberg_scan.rs +++ b/native/core/src/execution/operators/iceberg_scan.rs @@ -17,7 +17,6 @@ //! 
Native Iceberg table scan operator using iceberg-rust -use std::any::Any; use std::collections::HashMap; use std::fmt; use std::pin::Pin; @@ -26,6 +25,7 @@ use std::task::{Context, Poll}; use arrow::array::{ArrayRef, RecordBatch, RecordBatchOptions}; use arrow::datatypes::SchemaRef; +use datafusion::common::tree_node::TreeNodeRecursion; use datafusion::common::{DataFusionError, Result as DFResult}; use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; use datafusion::physical_expr::expressions::Column; @@ -108,8 +108,11 @@ impl ExecutionPlan for IcebergScanExec { "IcebergScanExec" } - fn as_any(&self) -> &dyn Any { - self + fn apply_expressions( + &self, + _f: &mut dyn FnMut(&dyn PhysicalExpr) -> DFResult, + ) -> DFResult { + Ok(TreeNodeRecursion::Continue) } fn schema(&self) -> SchemaRef { diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs index 8ba79098d4..f66bc00186 100644 --- a/native/core/src/execution/operators/parquet_writer.rs +++ b/native/core/src/execution/operators/parquet_writer.rs @@ -18,7 +18,6 @@ //! Parquet writer operator for writing RecordBatches to Parquet files use std::{ - any::Any, collections::HashMap, fmt, fmt::{Debug, Formatter}, @@ -38,6 +37,7 @@ use crate::parquet::parquet_support::{create_hdfs_operator, prepare_object_store use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use async_trait::async_trait; +use datafusion::common::tree_node::TreeNodeRecursion; use datafusion::{ error::{DataFusionError, Result}, execution::context::TaskContext, @@ -46,8 +46,8 @@ use datafusion::{ execution_plan::{Boundedness, EmissionType}, metrics::{ExecutionPlanMetricsSet, MetricsSet}, stream::RecordBatchStreamAdapter, - DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, - SendableRecordBatchStream, + DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PhysicalExpr, + PlanProperties, SendableRecordBatchStream, }, }; use futures::TryStreamExt; @@ -404,14 +404,17 @@ impl DisplayAs for ParquetWriterExec { #[async_trait] impl ExecutionPlan for ParquetWriterExec { - fn as_any(&self) -> &dyn Any { - self - } - fn name(&self) -> &str { "ParquetWriterExec" } + fn apply_expressions( + &self, + _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, + ) -> Result { + Ok(TreeNodeRecursion::Continue) + } + fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index 90bb741b5e..9ac9e4586a 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -28,6 +28,7 @@ use arrow::compute::{cast_with_options, take, CastOptions}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::ffi::FFI_ArrowArray; use arrow::ffi::FFI_ArrowSchema; +use datafusion::common::tree_node::TreeNodeRecursion; use datafusion::common::{arrow_datafusion_err, DataFusionError, Result as DataFusionResult}; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::metrics::{ @@ -43,7 +44,6 @@ use itertools::Itertools; use jni::objects::{Global, JObject, JValue}; use std::rc::Rc; use std::{ - any::Any, pin::Pin, sync::{Arc, Mutex}, task::{Context, Poll}, @@ -383,8 +383,11 @@ fn schema_from_data_types(data_types: &[DataType]) -> SchemaRef { } impl ExecutionPlan for ScanExec { - fn as_any(&self) -> &dyn Any { - self + fn 
apply_expressions( + &self, + _f: &mut dyn FnMut(&dyn PhysicalExpr) -> DataFusionResult, + ) -> DataFusionResult { + Ok(TreeNodeRecursion::Continue) } fn schema(&self) -> SchemaRef { diff --git a/native/core/src/execution/operators/shuffle_scan.rs b/native/core/src/execution/operators/shuffle_scan.rs index 92c4dc8780..18ae134484 100644 --- a/native/core/src/execution/operators/shuffle_scan.rs +++ b/native/core/src/execution/operators/shuffle_scan.rs @@ -24,6 +24,7 @@ use crate::{ }; use arrow::array::ArrayRef; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::common::tree_node::TreeNodeRecursion; use datafusion::common::{arrow_datafusion_err, Result as DataFusionResult}; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::metrics::{ @@ -37,7 +38,6 @@ use datafusion::{ use futures::Stream; use jni::objects::{Global, JByteBuffer, JObject}; use std::{ - any::Any, pin::Pin, sync::{Arc, Mutex}, task::{Context, Poll}, @@ -221,8 +221,11 @@ fn schema_from_data_types(data_types: &[DataType]) -> SchemaRef { } impl ExecutionPlan for ShuffleScanExec { - fn as_any(&self) -> &dyn Any { - self + fn apply_expressions( + &self, + _f: &mut dyn FnMut(&dyn PhysicalExpr) -> DataFusionResult, + ) -> DataFusionResult { + Ok(TreeNodeRecursion::Continue) } fn schema(&self) -> SchemaRef { diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index f3df4b522c..0fd209ce59 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -50,10 +50,10 @@ use datafusion::{ logical_expr::Operator as DataFusionOperator, physical_expr::{ expressions::{ - in_list, BinaryExpr, CaseExpr, CastExpr, Column, IsNullExpr, - Literal as DataFusionLiteral, + in_list, BinaryExpr, CaseExpr, CastExpr, Column, IsNullExpr, LambdaExpr, + LambdaVariable, Literal as DataFusionLiteral, }, - PhysicalExpr, PhysicalSortExpr, ScalarFunctionExpr, + HigherOrderFunctionExpr, PhysicalExpr, PhysicalSortExpr, ScalarFunctionExpr, }, physical_plan::{ aggregates::{AggregateMode as DFAggregateMode, PhysicalGroupBy}, @@ -78,6 +78,7 @@ use crate::execution::operators::ExecutionError::GeneralError; use crate::execution::shuffle::{CometPartitioning, CompressionCodec}; use crate::execution::spark_plan::SparkPlan; use crate::parquet::parquet_support::prepare_object_store_with_configs; +use datafusion::common::datatype::FieldExt; use datafusion::common::scalar::ScalarStructBuilder; use datafusion::common::{ tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter}, @@ -87,8 +88,9 @@ use datafusion::datasource::listing::PartitionedFile; use datafusion::logical_expr::type_coercion::functions::fields_with_udf; use datafusion::logical_expr::type_coercion::other::get_coerce_type_for_case_expression; use datafusion::logical_expr::{ - AggregateUDF, ReturnFieldArgs, ScalarUDF, TypeSignature, WindowFrame, WindowFrameBound, - WindowFrameUnits, WindowFunctionDefinition, + AggregateUDF, HigherOrderUDF, LambdaParametersProgress, ReturnFieldArgs, ScalarUDF, + TypeSignature, ValueOrLambda, WindowFrame, WindowFrameBound, WindowFrameUnits, + WindowFunctionDefinition, }; use datafusion::physical_expr::expressions::{Literal, StatsType}; use datafusion::physical_expr::window::WindowExpr; @@ -234,7 +236,6 @@ impl PhysicalPlanner { let literal = self.create_expr(partition_value, Arc::::clone(&empty_schema))?; literal - .as_any() .downcast_ref::() .ok_or_else(|| { GeneralError("Expected literal of partition 
value".to_string()) @@ -425,11 +426,7 @@ impl PhysicalPlanner { // WideDecimalBinaryExpr already handles overflow — skip redundant check // but only if its output type matches CheckOverflow's declared type - if child - .as_any() - .downcast_ref::() - .is_some() - { + if child.downcast_ref::().is_some() { let child_type = child.data_type(&input_schema)?; if child_type == data_type { return Ok(child); @@ -438,7 +435,7 @@ impl PhysicalPlanner { // Fuse Cast(Decimal128→Decimal128) + CheckOverflow into single rescale+check // Only fuse when the Cast target type matches the CheckOverflow output type - if let Some(cast) = child.as_any().downcast_ref::() { + if let Some(cast) = child.downcast_ref::() { if let ( DataType::Decimal128(p_out, s_out), Ok(DataType::Decimal128(_p_in, s_in)), @@ -701,10 +698,142 @@ impl PhysicalPlanner { expr.names.clone(), ))) } + ExprStruct::HigherOrderFunc(hof) => { + self.create_higher_order_func_expr(hof, input_schema) + } + ExprStruct::LambdaFunc(_) => Err(GeneralError( + "LambdaFunc must appear inside a HigherOrderFunc arg list".to_string(), + )), + ExprStruct::LambdaVar(var) => { + let idx = input_schema.index_of(&var.name).map_err(|_| { + GeneralError(format!( + "LambdaVar '{}' not found in enclosing lambda schema", + var.name + )) + })?; + let field = Arc::clone(&input_schema.fields()[idx]); + Ok(Arc::new(LambdaVariable::new(idx, field))) + } expr => Err(GeneralError(format!("Not implemented: {expr:?}"))), } } + /// Plan a higher-order function call. The enum `Slot` holds either an + /// already-planned value arg or the still-unplanned lambda proto so we can + /// delay lambda body planning until we know the parameter types from the + /// UDF. + fn create_higher_order_func_expr( + &self, + hof: &spark_expression::HigherOrderFunc, + input_schema: SchemaRef, + ) -> Result, ExecutionError> { + enum Slot<'a> { + Value(Arc), + Lambda(&'a spark_expression::LambdaFunc), + } + + let udf: Arc = self + .session_ctx + .state() + .higher_order_function(&hof.func) + .map_err(|e| GeneralError(format!("higher-order function '{}': {e}", hof.func)))?; + + let mut slots: Vec> = Vec::with_capacity(hof.args.len()); + let mut arg_fields: Vec>> = + Vec::with_capacity(hof.args.len()); + + for arg in &hof.args { + match arg.expr_struct.as_ref() { + Some(ExprStruct::LambdaFunc(lambda_proto)) => { + slots.push(Slot::Lambda(lambda_proto)); + arg_fields.push(ValueOrLambda::Lambda(None)); + } + _ => { + let phys = self.create_expr(arg, Arc::clone(&input_schema))?; + let field = phys + .return_field(input_schema.as_ref()) + .map_err(|e| GeneralError(e.to_string()))?; + arg_fields.push(ValueOrLambda::Value(field)); + slots.push(Slot::Value(phys)); + } + } + } + + let lambda_param_fields: Vec> = match udf + .lambda_parameters(0, &arg_fields) + .map_err(|e| GeneralError(e.to_string()))? 
+
+    /// Plan a higher-order function call. The enum `Slot` holds either an
+    /// already-planned value arg or the still-unplanned lambda proto so we can
+    /// delay lambda body planning until we know the parameter types from the
+    /// UDF.
+    fn create_higher_order_func_expr(
+        &self,
+        hof: &spark_expression::HigherOrderFunc,
+        input_schema: SchemaRef,
+    ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
+        enum Slot<'a> {
+            Value(Arc<dyn PhysicalExpr>),
+            Lambda(&'a spark_expression::LambdaFunc),
+        }
+
+        let udf: Arc<HigherOrderUDF> = self
+            .session_ctx
+            .state()
+            .higher_order_function(&hof.func)
+            .map_err(|e| GeneralError(format!("higher-order function '{}': {e}", hof.func)))?;
+
+        let mut slots: Vec<Slot<'_>> = Vec::with_capacity(hof.args.len());
+        let mut arg_fields: Vec<ValueOrLambda<Arc<Field>>> =
+            Vec::with_capacity(hof.args.len());
+
+        for arg in &hof.args {
+            match arg.expr_struct.as_ref() {
+                Some(ExprStruct::LambdaFunc(lambda_proto)) => {
+                    slots.push(Slot::Lambda(lambda_proto));
+                    arg_fields.push(ValueOrLambda::Lambda(None));
+                }
+                _ => {
+                    let phys = self.create_expr(arg, Arc::clone(&input_schema))?;
+                    let field = phys
+                        .return_field(input_schema.as_ref())
+                        .map_err(|e| GeneralError(e.to_string()))?;
+                    arg_fields.push(ValueOrLambda::Value(field));
+                    slots.push(Slot::Value(phys));
+                }
+            }
+        }
+
+        let lambda_param_fields: Vec<Vec<FieldRef>> = match udf
+            .lambda_parameters(0, &arg_fields)
+            .map_err(|e| GeneralError(e.to_string()))?
+        {
+            LambdaParametersProgress::Complete(items) => items,
+            LambdaParametersProgress::Partial(_) => {
+                return Err(GeneralError(format!(
+                    "{}: lambda_parameters returned Partial progress; HOFs that depend on lambda return types are not supported yet",
+                    hof.func,
+                )));
+            }
+        };
+
+        let mut lambda_fields_iter = lambda_param_fields.into_iter();
+        let physical_args: Vec<Arc<dyn PhysicalExpr>> = slots
+            .into_iter()
+            .map(|slot| -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
+                match slot {
+                    Slot::Value(expr) => Ok(expr),
+                    Slot::Lambda(lambda_proto) => {
+                        let fields = lambda_fields_iter.next().ok_or_else(|| {
+                            GeneralError(format!(
+                                "{}: lambda_parameters did not provide fields for all lambdas",
+                                hof.func,
+                            ))
+                        })?;
+
+                        if lambda_proto.param_names.len() > fields.len() {
+                            return Err(GeneralError(format!(
+                                "{}: lambda defines {} params but HOF supports only {}",
+                                hof.func,
+                                lambda_proto.param_names.len(),
+                                fields.len(),
+                            )));
+                        }
+
+                        let renamed: Vec<FieldRef> = fields
+                            .into_iter()
+                            .zip(lambda_proto.param_names.iter())
+                            .map(|(f, name)| f.renamed(name.as_str()))
+                            .collect();
+
+                        let lambda_schema: SchemaRef = Arc::new(Schema::new(
+                            renamed
+                                .iter()
+                                .map(|f| f.as_ref().clone())
+                                .collect::<Vec<_>>(),
+                        ));
+
+                        let body_proto = lambda_proto
+                            .body
+                            .as_ref()
+                            .ok_or_else(|| GeneralError("LambdaFunc body missing".to_string()))?;
+                        let body_expr = self.create_expr(body_proto, lambda_schema)?;
+
+                        let lambda =
+                            LambdaExpr::try_new(lambda_proto.param_names.clone(), body_expr)
+                                .map_err(|e| GeneralError(e.to_string()))?;
+                        Ok(Arc::new(lambda))
+                    }
+                }
+            })
+            .collect::<Result<_, _>>()?;
+
+        let config_options = Arc::new(ConfigOptions::default());
+        let hof_expr = HigherOrderFunctionExpr::try_new_with_schema(
+            udf,
+            physical_args,
+            input_schema.as_ref(),
+            config_options,
+        )
+        .map_err(|e| GeneralError(e.to_string()))?;
+        Ok(Arc::new(hof_expr))
+    }
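A standalone sketch of the two-phase idea in `create_higher_order_func_expr`, using toy string "plans" instead of `PhysicalExpr`s: value arguments are planned immediately, lambdas are parked in a slot, the function reports lambda parameter types (the hard-coded list below stands in for what `lambda_parameters` would return), and only then are lambda bodies planned:

```rust
#[derive(Debug)]
enum Slot<'a> {
    Value(String),        // an already-"planned" value argument
    Lambda(&'a [&'a str]), // unplanned lambda: just its parameter names
}

fn plan_higher_order(func: &str, args: Vec<Slot<'_>>) -> Result<String, String> {
    // Phase 1 stand-in: what an array_exists-like HOF would report for its lambda.
    let lambda_param_types: Vec<Vec<&str>> = vec![vec!["element: Int32"]];

    // Phase 2: plan each parked lambda against the reported parameter types.
    let mut param_types = lambda_param_types.into_iter();
    let planned: Vec<String> = args
        .into_iter()
        .map(|slot| -> Result<String, String> {
            match slot {
                Slot::Value(v) => Ok(v),
                Slot::Lambda(names) => {
                    let fields = param_types
                        .next()
                        .ok_or_else(|| format!("{func}: missing lambda parameter types"))?;
                    if names.len() > fields.len() {
                        return Err(format!("{func}: too many lambda parameters"));
                    }
                    Ok(format!("lambda({})", fields.join(", ")))
                }
            }
        })
        .collect::<Result<_, _>>()?;
    Ok(format!("{func}({})", planned.join(", ")))
}

fn main() {
    let out = plan_higher_order(
        "array_exists",
        vec![Slot::Value("col(arr)".to_string()), Slot::Lambda(&["x"])],
    );
    // Ok("array_exists(col(arr), lambda(element: Int32))")
    println!("{out:?}");
}
```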
 
     /// Create a DataFusion physical sort expression from Spark physical expression
     fn create_sort_expr<'a>(
         &'a self,
@@ -1185,12 +1314,10 @@ impl PhysicalPlanner {
             .iter()
             .map(|expr| {
                 let literal = self.create_expr(expr, Arc::clone(&required_schema))?;
-                let df_literal = literal
-                    .as_any()
-                    .downcast_ref::<DataFusionLiteral>()
-                    .ok_or_else(|| {
-                        GeneralError("Expected literal of default value.".to_string())
-                    })?;
+                let df_literal =
+                    literal.downcast_ref::<DataFusionLiteral>().ok_or_else(|| {
+                        GeneralError("Expected literal of default value.".to_string())
+                    })?;
                 Ok(df_literal.value().clone())
             })
             .collect();
@@ -1723,7 +1850,7 @@ impl PhysicalPlanner {
             hash_join.as_ref().swap_inputs(PartitionMode::Partitioned)?;
 
         let mut additional_native_plans = vec![];
-        if swapped_hash_join.as_any().is::<ProjectionExec>() {
+        if swapped_hash_join.is::<ProjectionExec>() {
             // a projection was added to the hash join
             additional_native_plans.push(Arc::clone(swapped_hash_join.children()[0]));
         }
@@ -2576,8 +2703,7 @@ impl PhysicalPlanner {
                     &boundary_row.partition_bounds[col_idx],
                     Arc::clone(&input_schema),
                 )?;
-                let literal_expr =
-                    expr.as_any().downcast_ref::<Literal>().expect("Literal");
+                let literal_expr = expr.downcast_ref::<Literal>().expect("Literal");
                 col_values.push(literal_expr.value().clone());
             }
         }
@@ -2687,12 +2813,7 @@ impl PhysicalPlanner {
         // TODO this should try and find scalar
         let arguments = args
             .iter()
-            .map(|e| {
-                e.as_ref()
-                    .as_any()
-                    .downcast_ref::<Literal>()
-                    .map(|lit| lit.value())
-            })
+            .map(|e| e.as_ref().downcast_ref::<Literal>().map(|lit| lit.value()))
            .collect::<Vec<_>>();
 
         let args = ReturnFieldArgs {
@@ -2796,7 +2917,7 @@ fn expr_to_columns(
 
     expr.apply(&mut |expr: &Arc<dyn PhysicalExpr>| {
         Ok({
-            if let Some(column) = expr.as_any().downcast_ref::<Column>() {
+            if let Some(column) = expr.downcast_ref::<Column>() {
                 if column.index() > left_field_len + right_field_len {
                     return Err(DataFusionError::Internal(format!(
                         "Column index {} out of range",
@@ -2847,7 +2968,7 @@ impl TreeNodeRewriter for JoinFilterRewriter<'_> {
     type Node = Arc<dyn PhysicalExpr>;
 
     fn f_down(&mut self, node: Self::Node) -> datafusion::common::Result<Transformed<Self::Node>> {
-        if let Some(column) = node.as_any().downcast_ref::<Column>() {
+        if let Some(column) = node.downcast_ref::<Column>() {
             if column.index() < self.left_field_len {
                 // left side
                 let new_index = self
diff --git a/native/core/src/parquet/cast_column.rs b/native/core/src/parquet/cast_column.rs
index 67558b5266..1cc928d1d5 100644
--- a/native/core/src/parquet/cast_column.rs
+++ b/native/core/src/parquet/cast_column.rs
@@ -31,7 +31,6 @@ use datafusion::common::ScalarValue;
 use datafusion::logical_expr::ColumnarValue;
 use datafusion::physical_expr::PhysicalExpr;
 use std::{
-    any::Any,
     fmt::{self, Display},
     hash::Hash,
     sync::Arc,
@@ -250,10 +249,6 @@ impl Display for CometCastColumnExpr {
 }
 
 impl PhysicalExpr for CometCastColumnExpr {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn data_type(&self, _input_schema: &Schema) -> DataFusionResult<DataType> {
         Ok(self.target_field.data_type().clone())
     }
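The mechanical change running through the rest of this diff drops `as_any` in favor of calling `downcast_ref`/`is` directly on trait objects (`expr.downcast_ref::<Column>()`, `plan.is::<ProjectionExec>()`). A hedged sketch of one way such inherent helpers can be provided, assuming `Any` as a supertrait and a toolchain with stable dyn-trait upcasting; the real helpers live inside DataFusion, not in Comet:

```rust
use std::any::Any;
use std::sync::Arc;

trait PhysicalExpr: Any {
    fn fmt_name(&self) -> &str;
}

// Inherent methods on the trait object itself, so call sites need no `as_any`.
impl dyn PhysicalExpr {
    fn downcast_ref<T: PhysicalExpr>(&self) -> Option<&T> {
        let any: &dyn Any = self; // dyn upcasting via the `Any` supertrait
        any.downcast_ref::<T>()
    }
    fn is<T: PhysicalExpr>(&self) -> bool {
        self.downcast_ref::<T>().is_some()
    }
}

struct Column;
impl PhysicalExpr for Column {
    fn fmt_name(&self) -> &str {
        "column"
    }
}

fn main() {
    let e: Arc<dyn PhysicalExpr> = Arc::new(Column);
    assert!(e.is::<Column>());
    assert!(e.downcast_ref::<Column>().is_some());
    assert_eq!(e.fmt_name(), "column");
}
```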
diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs
index af79d9082d..1703f05807 100644
--- a/native/core/src/parquet/schema_adapter.rs
+++ b/native/core/src/parquet/schema_adapter.rs
@@ -216,7 +216,7 @@ impl PhysicalExprAdapter for SparkPhysicalExprAdapter {
         // Walk the expression tree to find Column references
         let mut duplicate_err: Option<DataFusionError> = None;
         let _ = Arc::<dyn PhysicalExpr>::clone(&expr).transform(|e| {
-            if let Some(col) = e.as_any().downcast_ref::<Column>() {
+            if let Some(col) = e.downcast_ref::<Column>() {
                 if let Some((req, matched)) = check_column_duplicate(col.name(), orig_physical) {
                     duplicate_err = Some(DataFusionError::External(Box::new(
@@ -266,7 +266,7 @@ impl PhysicalExprAdapter for SparkPhysicalExprAdapter {
         // the actual parquet stream schema, which uses the original physical names.
         let expr = if let Some(name_map) = &self.logical_to_physical_names {
             expr.transform(|e| {
-                if let Some(col) = e.as_any().downcast_ref::<Column>() {
+                if let Some(col) = e.downcast_ref::<Column>() {
                     if let Some(physical_name) = name_map.get(col.name()) {
                         return Ok(Transformed::yes(Arc::new(Column::new(
                             physical_name,
@@ -295,7 +295,7 @@ impl SparkPhysicalExprAdapter {
         expr: Arc<dyn PhysicalExpr>,
     ) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
         expr.transform(|e| {
-            if let Some(column) = e.as_any().downcast_ref::<Column>() {
+            if let Some(column) = e.downcast_ref::<Column>() {
                 let col_name = column.name();
 
                 // Resolve fields by name because this is the fallback path
@@ -370,21 +370,27 @@ impl SparkPhysicalExprAdapter {
             .data()
     }
 
-    /// Replace CastColumnExpr (DataFusion's cast) with Spark's Cast expression.
+    /// Replace CastExpr (DataFusion's cast) with Spark's Cast expression.
     fn replace_with_spark_cast(
         &self,
         expr: Arc<dyn PhysicalExpr>,
     ) -> DataFusionResult<Transformed<Arc<dyn PhysicalExpr>>> {
-        // Check for CastColumnExpr and replace with spark_expr::Cast
-        // CastColumnExpr is in datafusion_physical_expr::expressions
-        if let Some(cast) = expr
-            .as_any()
-            .downcast_ref::<CastColumnExpr>()
+        // Check for CastExpr and replace with spark_expr::Cast
+        if let Some(cast) = expr.downcast_ref::<CastExpr>()
         {
             let child = Arc::clone(cast.expr());
-            let physical_type = cast.input_field().data_type();
             let target_type = cast.target_field().data_type();
 
+            // Derive input field from the child Column expression and the physical schema
+            let input_field = if let Some(col) = child.downcast_ref::<Column>() {
+                Arc::new(self.physical_file_schema.field(col.index()).clone())
+            } else {
+                // Fallback: synthesize a field from the target field name and child data type
+                let child_type = cast.expr().data_type(&self.physical_file_schema)?;
+                Arc::new(Field::new(cast.target_field().name(), child_type, true))
+            };
+            let physical_type = input_field.data_type();
+
             // For complex nested types (Struct, List, Map), Timestamp timezone
             // mismatches, and Timestamp→Int64 (nanosAsLong), use CometCastColumnExpr
             // with spark_parquet_convert which handles field-name-based selection,
@@ -413,7 +419,7 @@ impl SparkPhysicalExprAdapter {
                 let comet_cast: Arc<dyn PhysicalExpr> = Arc::new(
                     CometCastColumnExpr::new(
                         child,
-                        Arc::clone(cast.input_field()),
+                        input_field,
                         Arc::clone(cast.target_field()),
                         None,
                     )
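The input-field derivation above can be isolated: when the cast child is a column reference, its exact source `Field` comes from the physical file schema by index; anything else gets a synthesized nullable field from the child's data type. A small self-contained sketch using real arrow APIs (the helper name and the `Option<usize>` encoding of "child is Column #i" are mine, for illustration):

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};

/// Pick the input field for a cast: exact schema field for a column child,
/// otherwise a synthesized field named after the cast target.
fn input_field(
    schema: &Schema,
    child_column_index: Option<usize>, // Some(i) when the cast child is Column #i
    child_type: DataType,
    target_name: &str,
) -> Arc<Field> {
    match child_column_index {
        Some(i) => Arc::new(schema.field(i).clone()),
        // Fallback: nullability is assumed, matching the `true` above.
        None => Arc::new(Field::new(target_name, child_type, true)),
    }
}

fn main() {
    let schema = Schema::new(vec![Field::new("ts", DataType::Int64, false)]);
    let f = input_field(&schema, Some(0), DataType::Int64, "ts");
    assert_eq!(f.name(), "ts");
    assert!(!f.is_nullable()); // exact field preserved, including nullability

    let g = input_field(&schema, None, DataType::Utf8, "s");
    assert!(g.is_nullable()); // synthesized fallback is nullable
}
```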
diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto
index f1b598000d..1be50a67f4 100644
--- a/native/proto/src/proto/expr.proto
+++ b/native/proto/src/proto/expr.proto
@@ -90,6 +90,9 @@ message Expr {
     ToCsv to_csv = 67;
     HoursTransform hours_transform = 68;
     ArraysZip arrays_zip = 69;
+    HigherOrderFunc higher_order_func = 70;
+    LambdaFunc lambda_func = 71;
+    LambdaVar lambda_var = 72;
   }
 
   // Optional QueryContext for error reporting (contains SQL text and position)
@@ -503,3 +506,29 @@ message ArraysZip {
   repeated Expr values = 1;
   repeated string names = 2;
 }
+
+// Invocation of a higher-order function registered on the DataFusion session
+// (e.g. array_exists). `args` are interleaved value and lambda arguments in
+// the order the UDF expects. `return_type` is optional; when absent, the
+// native planner infers it from the UDF.
+message HigherOrderFunc {
+  string func = 1;
+  repeated Expr args = 2;
+  DataType return_type = 3;
+}
+
+// A lambda function literal. Only valid as an argument of a HigherOrderFunc.
+// Parameter types come from HigherOrderUDF::lambda_parameters at planning
+// time; only the names are carried here.
+message LambdaFunc {
+  Expr body = 1;
+  repeated string param_names = 2;
+}
+
+// A reference to a lambda parameter by name. Resolved against the synthetic
+// lambda schema when the enclosing LambdaFunc body is planned.
+message LambdaVar {
+  string name = 1;
+  DataType datatype = 2;
+  bool nullable = 3;
+}
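To make the wire shape concrete: for `exists(arr, x -> x > 2)`, the Scala serde later in this diff emits a `HigherOrderFunc` whose first arg is the array value and whose second arg is a `LambdaFunc`; the lambda body refers to `x` through a `LambdaVar`. A toy Rust mirror of those messages (stand-in enums, not the prost-generated types):

```rust
/// Stand-ins mirroring HigherOrderFunc / LambdaFunc / LambdaVar above.
#[derive(Debug)]
enum Expr {
    Column(String),
    IntLit(i64),
    Gt(Box<Expr>, Box<Expr>),
    HigherOrderFunc { func: String, args: Vec<Expr> },
    LambdaFunc { body: Box<Expr>, param_names: Vec<String> },
    LambdaVar { name: String },
}

fn main() {
    // exists(arr, x -> x > 2)
    let tree = Expr::HigherOrderFunc {
        func: "array_exists".into(),
        args: vec![
            Expr::Column("arr".into()), // value argument
            Expr::LambdaFunc {
                param_names: vec!["x".into()],
                body: Box::new(Expr::Gt(
                    Box::new(Expr::LambdaVar { name: "x".into() }),
                    Box::new(Expr::IntLit(2)),
                )),
            },
        ],
    };
    println!("{tree:#?}");
}
```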
diff --git a/native/shuffle/src/shuffle_writer.rs b/native/shuffle/src/shuffle_writer.rs
index 8502c79624..a89abbb1e3 100644
--- a/native/shuffle/src/shuffle_writer.rs
+++ b/native/shuffle/src/shuffle_writer.rs
@@ -25,7 +25,8 @@ use crate::partitioners::{
 use crate::{CometPartitioning, CompressionCodec};
 use async_trait::async_trait;
 use datafusion::common::exec_datafusion_err;
-use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
+use datafusion::common::tree_node::TreeNodeRecursion;
+use datafusion::physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
 use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::physical_plan::EmptyRecordBatchStream;
 use datafusion::{
@@ -40,7 +41,6 @@ use datafusion::{
 };
 use futures::{StreamExt, TryFutureExt, TryStreamExt};
 use std::{
-    any::Any,
     fmt,
     fmt::{Debug, Formatter},
     sync::Arc,
@@ -119,15 +119,17 @@ impl DisplayAs for ShuffleWriterExec {
 
 #[async_trait]
 impl ExecutionPlan for ShuffleWriterExec {
-    /// Return a reference to Any that can be used for downcasting
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn name(&self) -> &str {
         "ShuffleWriterExec"
     }
 
+    fn apply_expressions(
+        &self,
+        _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
+    ) -> Result<TreeNodeRecursion> {
+        Ok(TreeNodeRecursion::Continue)
+    }
+
     fn metrics(&self) -> Option<MetricsSet> {
         Some(self.metrics.clone_inner())
     }
@@ -343,6 +345,7 @@ mod test {
     }
 
     #[tokio::test]
+    #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
    async fn shuffle_partitioner_memory() {
        let batch = create_batch(900);
        assert_eq!(8316, batch.get_array_memory_size()); // Not stable across Arrow versions
diff --git a/native/spark-expr/src/agg_funcs/avg.rs b/native/spark-expr/src/agg_funcs/avg.rs
index 3760b42504..24a9a30991 100644
--- a/native/spark-expr/src/agg_funcs/avg.rs
+++ b/native/spark-expr/src/agg_funcs/avg.rs
@@ -28,7 +28,7 @@ use datafusion::logical_expr::{
     Accumulator, AggregateUDFImpl, EmitTo, GroupsAccumulator, ReversedUDAF, Signature,
 };
 use datafusion::physical_expr::expressions::format_state_name;
-use std::{any::Any, sync::Arc};
+use std::sync::Arc;
 
 use arrow::array::ArrowNativeTypeOp;
 use datafusion::logical_expr::function::{AccumulatorArgs, StateFieldsArgs};
@@ -67,11 +67,6 @@ impl Avg {
 }
 
 impl AggregateUDFImpl for Avg {
-    /// Return a reference to Any that can be used for downcasting
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
         // All numeric types use Float64 accumulation after casting
         match (&self.input_data_type, &self.result_data_type) {
@@ -239,7 +234,7 @@ where
 impl<T, F> GroupsAccumulator for AvgGroupsAccumulator<T, F>
 where
     T: ArrowNumericType + Send,
-    F: Fn(T::Native, i64) -> Result<T::Native> + Send,
+    F: Fn(T::Native, i64) -> Result<T::Native> + Send + 'static,
 {
     fn update_batch(
         &mut self,
diff --git a/native/spark-expr/src/agg_funcs/avg_decimal.rs b/native/spark-expr/src/agg_funcs/avg_decimal.rs
index 9e8a31afa5..2722add556 100644
--- a/native/spark-expr/src/agg_funcs/avg_decimal.rs
+++ b/native/spark-expr/src/agg_funcs/avg_decimal.rs
@@ -28,7 +28,7 @@ use datafusion::logical_expr::{
     Accumulator, AggregateUDFImpl, EmitTo, GroupsAccumulator, ReversedUDAF, Signature,
 };
 use datafusion::physical_expr::expressions::format_state_name;
-use std::{any::Any, sync::Arc};
+use std::sync::Arc;
 
 use crate::utils::{build_bool_state, is_valid_decimal_precision, unlikely};
 use crate::{decimal_sum_overflow_error, EvalMode, SparkErrorWithContext};
@@ -108,11 +108,6 @@ impl AvgDecimal {
 }
 
 impl AggregateUDFImpl for AvgDecimal {
-    /// Return a reference to Any that can be used for downcasting
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
         match (&self.sum_data_type, &self.result_data_type) {
             (Decimal128(sum_precision, sum_scale), Decimal128(target_precision, target_scale)) => {
diff --git a/native/spark-expr/src/agg_funcs/correlation.rs b/native/spark-expr/src/agg_funcs/correlation.rs
index 499ef2aa0c..465294f1ba 100644
--- a/native/spark-expr/src/agg_funcs/correlation.rs
+++ b/native/spark-expr/src/agg_funcs/correlation.rs
@@ -17,7 +17,7 @@
 
 use arrow::compute::{and, filter, is_not_null};
-use std::{any::Any, sync::Arc};
+use std::sync::Arc;
 
 use crate::agg_funcs::covariance::CovarianceAccumulator;
 use crate::agg_funcs::stddev::StddevAccumulator;
@@ -28,7 +28,6 @@ use arrow::{
 };
 use datafusion::common::{Result, ScalarValue};
 use datafusion::logical_expr::function::{AccumulatorArgs, StateFieldsArgs};
-use datafusion::logical_expr::type_coercion::aggregates::NUMERICS;
 use datafusion::logical_expr::{Accumulator, AggregateUDFImpl, Signature, Volatility};
 use datafusion::physical_expr::expressions::format_state_name;
 use datafusion::physical_expr::expressions::StatsType;
@@ -51,18 +50,16 @@ impl Correlation {
         assert!(matches!(data_type, DataType::Float64));
         Self {
             name: name.into(),
-            signature: Signature::uniform(2, NUMERICS.to_vec(), Volatility::Immutable),
+            signature: Signature::exact(
+                vec![DataType::Float64, DataType::Float64],
+                Volatility::Immutable,
+            ),
             null_on_divide_by_zero,
         }
     }
 }
 
 impl AggregateUDFImpl for Correlation {
-    /// Return a reference to Any that can be used for downcasting
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn name(&self) -> &str {
         &self.name
     }
diff --git a/native/spark-expr/src/agg_funcs/covariance.rs b/native/spark-expr/src/agg_funcs/covariance.rs
index 15759eb155..7fa81b9ffc 100644
--- a/native/spark-expr/src/agg_funcs/covariance.rs
+++ b/native/spark-expr/src/agg_funcs/covariance.rs
@@ -25,11 +25,9 @@ use arrow::{
 };
 use datafusion::common::{downcast_value, unwrap_or_internal_err, Result, ScalarValue};
 use datafusion::logical_expr::function::{AccumulatorArgs, StateFieldsArgs};
-use datafusion::logical_expr::type_coercion::aggregates::NUMERICS;
 use datafusion::logical_expr::{Accumulator, AggregateUDFImpl, Signature, Volatility};
 use datafusion::physical_expr::expressions::format_state_name;
 use datafusion::physical_expr::expressions::StatsType;
-use std::any::Any;
 use std::sync::Arc;
 
 /// COVAR_SAMP and COVAR_POP aggregate expression
@@ -65,7 +63,10 @@ impl Covariance {
         assert!(matches!(data_type, DataType::Float64));
         Self {
             name: name.into(),
-            signature: Signature::uniform(2, NUMERICS.to_vec(), Volatility::Immutable),
+            signature: Signature::exact(
+                vec![DataType::Float64, DataType::Float64],
+                Volatility::Immutable,
+            ),
             stats_type,
             null_on_divide_by_zero,
         }
@@ -73,11 +74,6 @@ impl Covariance {
 }
 
 impl AggregateUDFImpl for Covariance {
-    /// Return a reference to Any that can be used for downcasting
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn name(&self) -> &str {
         &self.name
     }
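The Correlation and Covariance changes narrow the advertised signature from "any pair of identical numeric types" to exactly `(Float64, Float64)`, which is all these accumulators ever handle (note the `assert!(matches!(data_type, DataType::Float64))` in both constructors). A minimal sketch of the two forms, using DataFusion's real `Signature` API:

```rust
use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::{Signature, Volatility};

/// Before (removed above): Signature::uniform(2, NUMERICS.to_vec(), ...),
/// i.e. two arguments of any single numeric type.
/// After: exactly two Float64 arguments.
fn correlation_signature() -> Signature {
    Signature::exact(
        vec![DataType::Float64, DataType::Float64],
        Volatility::Immutable,
    )
}

fn main() {
    // Constructing the signature is enough to show the shape; the planner
    // uses it to validate/coerce argument types.
    let _sig = correlation_signature();
}
```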
diff --git a/native/spark-expr/src/agg_funcs/stddev.rs b/native/spark-expr/src/agg_funcs/stddev.rs
index b231b8afa7..2b09339dc1 100644
--- a/native/spark-expr/src/agg_funcs/stddev.rs
+++ b/native/spark-expr/src/agg_funcs/stddev.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{any::Any, sync::Arc};
+use std::sync::Arc;
 
 use crate::agg_funcs::variance::VarianceAccumulator;
 use arrow::datatypes::FieldRef;
@@ -78,11 +78,6 @@ impl Stddev {
 }
 
 impl AggregateUDFImpl for Stddev {
-    /// Return a reference to Any that can be used for downcasting
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn name(&self) -> &str {
         &self.name
     }
diff --git a/native/spark-expr/src/agg_funcs/sum_decimal.rs b/native/spark-expr/src/agg_funcs/sum_decimal.rs
index 46db7f36b3..faa51252f8 100644
--- a/native/spark-expr/src/agg_funcs/sum_decimal.rs
+++ b/native/spark-expr/src/agg_funcs/sum_decimal.rs
@@ -27,7 +27,7 @@ use datafusion::logical_expr::Volatility::Immutable;
 use datafusion::logical_expr::{
     Accumulator, AggregateUDFImpl, EmitTo, GroupsAccumulator, ReversedUDAF, Signature,
 };
-use std::{any::Any, sync::Arc};
+use std::sync::Arc;
 
 #[derive(Debug)]
 pub struct SumDecimal {
@@ -99,10 +99,6 @@ impl SumDecimal {
 }
 
 impl AggregateUDFImpl for SumDecimal {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn accumulator(&self, _args: AccumulatorArgs) -> DFResult<Box<dyn Accumulator>> {
         Ok(Box::new(SumDecimalAccumulator::new(
             self.precision,
diff --git a/native/spark-expr/src/agg_funcs/sum_int.rs b/native/spark-expr/src/agg_funcs/sum_int.rs
index 781528521b..7d1df1568b 100644
--- a/native/spark-expr/src/agg_funcs/sum_int.rs
+++ b/native/spark-expr/src/agg_funcs/sum_int.rs
@@ -29,7 +29,7 @@ use datafusion::logical_expr::Volatility::Immutable;
 use datafusion::logical_expr::{
     Accumulator, AggregateUDFImpl, EmitTo, GroupsAccumulator, ReversedUDAF, Signature,
 };
-use std::{any::Any, sync::Arc};
+use std::sync::Arc;
 
 #[derive(Debug, PartialEq, Eq, Hash)]
 pub struct SumInteger {
@@ -52,10 +52,6 @@ impl SumInteger {
 }
 
 impl AggregateUDFImpl for SumInteger {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn name(&self) -> &str {
         "sum"
     }
diff --git a/native/spark-expr/src/agg_funcs/variance.rs b/native/spark-expr/src/agg_funcs/variance.rs
index c97e664dd6..5f2c249da7 100644
--- a/native/spark-expr/src/agg_funcs/variance.rs
+++ b/native/spark-expr/src/agg_funcs/variance.rs
@@ -26,7 +26,6 @@ use datafusion::logical_expr::Volatility::Immutable;
 use datafusion::logical_expr::{Accumulator, AggregateUDFImpl, Signature};
 use datafusion::physical_expr::expressions::format_state_name;
 use datafusion::physical_expr::expressions::StatsType;
-use std::any::Any;
 use std::sync::Arc;
 
 /// VAR_SAMP and VAR_POP aggregate expression
@@ -71,11 +70,6 @@ impl Variance {
 }
 
 impl AggregateUDFImpl for Variance {
-    /// Return a reference to Any that can be used for downcasting
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn name(&self) -> &str {
         &self.name
     }
diff --git a/native/spark-expr/src/array_funcs/array_compact.rs b/native/spark-expr/src/array_funcs/array_compact.rs
index 4653f966a5..d481242705 100644
--- a/native/spark-expr/src/array_funcs/array_compact.rs
+++ b/native/spark-expr/src/array_funcs/array_compact.rs
@@ -33,7 +33,6 @@ use datafusion::common::{exec_err, utils::take_function_args, Result};
 use datafusion::logical_expr::{
     ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
 };
-use std::any::Any;
 use std::sync::Arc;
 
 #[derive(Debug, PartialEq, Eq, Hash)]
@@ -56,10 +55,6 @@ impl SparkArrayCompact {
 }
 
 impl ScalarUDFImpl for SparkArrayCompact {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn name(&self) -> &str {
         "spark_array_compact"
     }
diff --git a/native/spark-expr/src/array_funcs/array_insert.rs b/native/spark-expr/src/array_funcs/array_insert.rs
index bce00483bc..e638c440fd 100644
--- a/native/spark-expr/src/array_funcs/array_insert.rs
+++ b/native/spark-expr/src/array_funcs/array_insert.rs
@@ -30,7 +30,6 @@ use datafusion::logical_expr::ColumnarValue;
 use datafusion::physical_expr::PhysicalExpr;
 use std::hash::Hash;
 use std::{
-    any::Any,
     fmt::{Debug, Display, Formatter},
     sync::Arc,
 };
@@ -92,10 +91,6 @@ impl ArrayInsert {
 }
 
 impl PhysicalExpr for ArrayInsert {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         Display::fmt(self, f)
     }
diff --git a/native/spark-expr/src/array_funcs/arrays_overlap.rs b/native/spark-expr/src/array_funcs/arrays_overlap.rs
index 662186e614..889a0314df 100644
--- a/native/spark-expr/src/array_funcs/arrays_overlap.rs
+++ b/native/spark-expr/src/array_funcs/arrays_overlap.rs
@@ -39,7 +39,6 @@ use datafusion::common::{exec_err, utils::take_function_args, Result, ScalarValu
 use datafusion::logical_expr::{
     ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
 };
-use std::any::Any;
 use std::sync::Arc;
 
 #[derive(Debug, PartialEq, Eq, Hash)]
@@ -62,10 +61,6 @@ impl SparkArraysOverlap {
 }
 
 impl ScalarUDFImpl for SparkArraysOverlap {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn name(&self) -> &str {
         "spark_arrays_overlap"
     }
diff --git a/native/spark-expr/src/array_funcs/arrays_zip.rs b/native/spark-expr/src/array_funcs/arrays_zip.rs
index 2126eb732c..6f0332d3d8 100644
--- a/native/spark-expr/src/array_funcs/arrays_zip.rs
+++ b/native/spark-expr/src/array_funcs/arrays_zip.rs
@@ -27,7 +27,6 @@ use datafusion::common::cast::{as_fixed_size_list_array, as_large_list_array, as
 use datafusion::common::{exec_err, Result, ScalarValue};
 use datafusion::logical_expr::ColumnarValue;
 use datafusion::physical_expr::PhysicalExpr;
-use std::any::Any;
 use std::fmt::{Display, Formatter};
 use std::sync::Arc;
 // TODO: Reuse functions from DF
@@ -74,10 +73,6 @@ impl Display for SparkArraysZipFunc {
 }
 
 impl PhysicalExpr for SparkArraysZipFunc {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
         let fields = self.fields(input_schema)?;
         Ok(List(Arc::new(Field::new_list_field(
diff --git a/native/spark-expr/src/array_funcs/get_array_struct_fields.rs b/native/spark-expr/src/array_funcs/get_array_struct_fields.rs
index dc05a3b7f0..a873392499 100644
--- a/native/spark-expr/src/array_funcs/get_array_struct_fields.rs
+++ b/native/spark-expr/src/array_funcs/get_array_struct_fields.rs
@@ -27,7 +27,6 @@ use datafusion::logical_expr::ColumnarValue;
 use datafusion::physical_expr::PhysicalExpr;
 use std::hash::Hash;
 use std::{
-    any::Any,
     fmt::{Debug, Display, Formatter},
     sync::Arc,
 };
@@ -75,10 +74,6 @@ impl GetArrayStructFields {
 }
 
 impl PhysicalExpr for GetArrayStructFields {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn data_type(&self, input_schema: &Schema) -> DataFusionResult<DataType> {
         let struct_field = self.child_field(input_schema)?;
         match self.child.data_type(input_schema)? {
diff --git a/native/spark-expr/src/array_funcs/list_extract.rs b/native/spark-expr/src/array_funcs/list_extract.rs
index d3661f496a..83fa23f757 100644
--- a/native/spark-expr/src/array_funcs/list_extract.rs
+++ b/native/spark-expr/src/array_funcs/list_extract.rs
@@ -26,7 +26,6 @@ use datafusion::logical_expr::ColumnarValue;
 use datafusion::physical_expr::PhysicalExpr;
 use std::hash::Hash;
 use std::{
-    any::Any,
     fmt::{Debug, Display, Formatter},
     sync::Arc,
 };
@@ -113,10 +112,6 @@ impl ListExtract {
 }
 
 impl PhysicalExpr for ListExtract {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         Display::fmt(self, f)
     }
diff --git a/native/spark-expr/src/array_funcs/size.rs b/native/spark-expr/src/array_funcs/size.rs
index 9777553341..f206b299d6 100644
--- a/native/spark-expr/src/array_funcs/size.rs
+++ b/native/spark-expr/src/array_funcs/size.rs
@@ -21,7 +21,6 @@ use datafusion::common::{exec_err, DataFusionError, Result as DataFusionResult,
 use datafusion::logical_expr::{
     ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
 };
-use std::any::Any;
 use std::sync::Arc;
 
 /// Spark size() function that returns the size of arrays or maps.
@@ -73,10 +72,6 @@ impl SparkSizeFunc {
 }
 
 impl ScalarUDFImpl for SparkSizeFunc {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn name(&self) -> &str {
         "size"
     }
diff --git a/native/spark-expr/src/bloom_filter/bloom_filter_agg.rs b/native/spark-expr/src/bloom_filter/bloom_filter_agg.rs
index 3436b29201..455e20a019 100644
--- a/native/spark-expr/src/bloom_filter/bloom_filter_agg.rs
+++ b/native/spark-expr/src/bloom_filter/bloom_filter_agg.rs
@@ -17,7 +17,7 @@
 
 use arrow::datatypes::{Field, FieldRef};
 use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility};
-use std::{any::Any, sync::Arc};
+use std::sync::Arc;
 
 use crate::bloom_filter::spark_bloom_filter;
 use crate::bloom_filter::spark_bloom_filter::SparkBloomFilter;
@@ -41,7 +41,7 @@ pub struct BloomFilterAgg {
 
 #[inline]
 fn extract_i32_from_literal(expr: Arc<dyn PhysicalExpr>) -> i32 {
-    match expr.as_any().downcast_ref::<Literal>().unwrap().value() {
+    match (*expr).downcast_ref::<Literal>().unwrap().value() {
         ScalarValue::Int64(scalar_value) => scalar_value.unwrap() as i32,
         _ => {
             unreachable!()
@@ -75,10 +75,6 @@ impl BloomFilterAgg {
 }
 
 impl AggregateUDFImpl for BloomFilterAgg {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn name(&self) -> &str {
         "bloom_filter_agg"
     }
diff --git a/native/spark-expr/src/bloom_filter/bloom_filter_might_contain.rs b/native/spark-expr/src/bloom_filter/bloom_filter_might_contain.rs
index ea246dfb25..66168444d9 100644
--- a/native/spark-expr/src/bloom_filter/bloom_filter_might_contain.rs
+++ b/native/spark-expr/src/bloom_filter/bloom_filter_might_contain.rs
@@ -22,7 +22,6 @@ use datafusion::error::DataFusionError;
 use datafusion::logical_expr::{ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility};
 use datafusion::physical_expr::PhysicalExpr;
 use datafusion::physical_plan::ColumnarValue;
-use std::any::Any;
 use std::sync::Arc;
 
 use crate::bloom_filter::spark_bloom_filter::SparkBloomFilter;
@@ -63,10 +62,6 @@ fn evaluate_bloom_filter(
 }
 
 impl ScalarUDFImpl for BloomFilterMightContain {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn name(&self) -> &str {
         "might_contain"
     }
diff --git a/native/spark-expr/src/comet_scalar_funcs.rs b/native/spark-expr/src/comet_scalar_funcs.rs
index 74e688cd1c..d78d3f5864 100644
--- a/native/spark-expr/src/comet_scalar_funcs.rs
+++ b/native/spark-expr/src/comet_scalar_funcs.rs
@@ -34,7 +34,6 @@ use datafusion::logical_expr::{
     Volatility,
 };
 use datafusion::physical_plan::ColumnarValue;
-use std::any::Any;
 use std::fmt::Debug;
 use std::sync::Arc;
 
@@ -275,10 +274,6 @@ impl CometScalarFunction {
 }
 
 impl ScalarUDFImpl for CometScalarFunction {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn name(&self) -> &str {
         self.name.as_str()
     }
diff --git a/native/spark-expr/src/conditional_funcs/if_expr.rs b/native/spark-expr/src/conditional_funcs/if_expr.rs
index 6b1291fbbe..5e57c54a22 100644
--- a/native/spark-expr/src/conditional_funcs/if_expr.rs
+++ b/native/spark-expr/src/conditional_funcs/if_expr.rs
@@ -24,7 +24,7 @@ use datafusion::logical_expr::ColumnarValue;
 use datafusion::physical_expr::{expressions::CaseExpr, PhysicalExpr};
 use std::fmt::{Display, Formatter};
 use std::hash::Hash;
-use std::{any::Any, sync::Arc};
+use std::sync::Arc;
 
 /// IfExpr is a wrapper around CaseExpr, because `IF(a, b, c)` is semantically equivalent to
 /// `CASE WHEN a THEN b ELSE c END`.
@@ -83,11 +83,6 @@ impl IfExpr {
 }
 
 impl PhysicalExpr for IfExpr {
-    /// Return a reference to Any that can be used for down-casting
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         Display::fmt(self, f)
     }
diff --git a/native/spark-expr/src/conversion_funcs/cast.rs b/native/spark-expr/src/conversion_funcs/cast.rs
index 5f855a36b2..0e35779623 100644
--- a/native/spark-expr/src/conversion_funcs/cast.rs
+++ b/native/spark-expr/src/conversion_funcs/cast.rs
@@ -62,7 +62,6 @@ use datafusion::common::{internal_err, DataFusionError, Result as DataFusionResu
 use datafusion::physical_expr::PhysicalExpr;
 use datafusion::physical_plan::ColumnarValue;
 use std::{
-    any::Any,
     fmt::{Debug, Display, Formatter},
     hash::Hash,
     sync::Arc,
@@ -736,10 +735,6 @@ impl Display for Cast {
 }
 
 impl PhysicalExpr for Cast {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         Display::fmt(self, f)
     }
diff --git a/native/spark-expr/src/csv_funcs/to_csv.rs b/native/spark-expr/src/csv_funcs/to_csv.rs
index f41cb7f918..01fdc901cb 100644
--- a/native/spark-expr/src/csv_funcs/to_csv.rs
+++ b/native/spark-expr/src/csv_funcs/to_csv.rs
@@ -23,7 +23,6 @@ use arrow::datatypes::{DataType, Schema};
 use datafusion::common::Result;
 use datafusion::logical_expr::ColumnarValue;
 use datafusion::physical_expr::PhysicalExpr;
-use std::any::Any;
 use std::fmt::{Display, Formatter};
 use std::hash::Hash;
 use std::sync::Arc;
@@ -77,10 +76,6 @@ impl Display for ToCsv {
 }
 
 impl PhysicalExpr for ToCsv {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn data_type(&self, _: &Schema) -> Result<DataType> {
         Ok(DataType::Utf8)
     }
diff --git a/native/spark-expr/src/datetime_funcs/date_diff.rs b/native/spark-expr/src/datetime_funcs/date_diff.rs
index ca148c103a..be3c1d3552 100644
--- a/native/spark-expr/src/datetime_funcs/date_diff.rs
+++ b/native/spark-expr/src/datetime_funcs/date_diff.rs
@@ -22,7 +22,6 @@ use datafusion::common::{utils::take_function_args, DataFusionError, Result};
 use datafusion::logical_expr::{
     ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
 };
-use std::any::Any;
 use std::sync::Arc;
 
 /// Spark-compatible date_diff function.
@@ -52,10 +51,6 @@ impl Default for SparkDateDiff {
 }
 
 impl ScalarUDFImpl for SparkDateDiff {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn name(&self) -> &str {
         "date_diff"
     }
diff --git a/native/spark-expr/src/datetime_funcs/date_from_unix_date.rs b/native/spark-expr/src/datetime_funcs/date_from_unix_date.rs
index 1c88fc47ab..0e624e6472 100644
--- a/native/spark-expr/src/datetime_funcs/date_from_unix_date.rs
+++ b/native/spark-expr/src/datetime_funcs/date_from_unix_date.rs
@@ -21,7 +21,6 @@ use datafusion::common::{utils::take_function_args, DataFusionError, Result, Sca
 use datafusion::logical_expr::{
     ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
 };
-use std::any::Any;
 use std::sync::Arc;
 
 /// Spark-compatible date_from_unix_date function.
@@ -48,10 +47,6 @@ impl Default for SparkDateFromUnixDate {
 }
 
 impl ScalarUDFImpl for SparkDateFromUnixDate {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn name(&self) -> &str {
         "date_from_unix_date"
     }
diff --git a/native/spark-expr/src/datetime_funcs/date_trunc.rs b/native/spark-expr/src/datetime_funcs/date_trunc.rs
index aeae18e36f..7ceb5234e1 100644
--- a/native/spark-expr/src/datetime_funcs/date_trunc.rs
+++ b/native/spark-expr/src/datetime_funcs/date_trunc.rs
@@ -22,7 +22,6 @@ use datafusion::common::{
 use datafusion::logical_expr::{
     ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
 };
-use std::any::Any;
 
 use crate::kernels::temporal::{date_trunc_array_fmt_dyn, date_trunc_dyn};
 
@@ -51,10 +50,6 @@ impl Default for SparkDateTrunc {
 }
 
 impl ScalarUDFImpl for SparkDateTrunc {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn name(&self) -> &str {
         "date_trunc"
     }
diff --git a/native/spark-expr/src/datetime_funcs/extract_date_part.rs b/native/spark-expr/src/datetime_funcs/extract_date_part.rs
index acb7d2266e..7344a3953a 100644
--- a/native/spark-expr/src/datetime_funcs/extract_date_part.rs
+++ b/native/spark-expr/src/datetime_funcs/extract_date_part.rs
@@ -22,7 +22,7 @@ use datafusion::common::{internal_datafusion_err, DataFusionError};
 use datafusion::logical_expr::{
     ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
 };
-use std::{any::Any, fmt::Debug};
+use std::fmt::Debug;
 
 macro_rules! extract_date_part {
     ($struct_name:ident, $fn_name:expr, $date_part_variant:ident) => {
@@ -44,10 +44,6 @@ macro_rules! extract_date_part {
         }
 
         impl ScalarUDFImpl for $struct_name {
-            fn as_any(&self) -> &dyn Any {
-                self
-            }
-
             fn name(&self) -> &str {
                 $fn_name
             }
diff --git a/native/spark-expr/src/datetime_funcs/hours.rs b/native/spark-expr/src/datetime_funcs/hours.rs
index ea3ef742a4..26ec35ef25 100644
--- a/native/spark-expr/src/datetime_funcs/hours.rs
+++ b/native/spark-expr/src/datetime_funcs/hours.rs
@@ -32,7 +32,7 @@ use datafusion::logical_expr::{
     ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
 };
 use num::integer::div_floor;
-use std::{any::Any, fmt::Debug, sync::Arc};
+use std::{fmt::Debug, sync::Arc};
 
 const MICROS_PER_HOUR: i64 = 3_600_000_000;
 
@@ -56,10 +56,6 @@ impl Default for SparkHoursTransform {
 }
 
 impl ScalarUDFImpl for SparkHoursTransform {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn name(&self) -> &str {
         "hours_transform"
     }
diff --git a/native/spark-expr/src/datetime_funcs/make_date.rs b/native/spark-expr/src/datetime_funcs/make_date.rs
index 58e4108580..ef29431703 100644
--- a/native/spark-expr/src/datetime_funcs/make_date.rs
+++ b/native/spark-expr/src/datetime_funcs/make_date.rs
@@ -23,7 +23,6 @@ use datafusion::common::{utils::take_function_args, DataFusionError, Result};
 use datafusion::logical_expr::{
     ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
 };
-use std::any::Any;
 use std::sync::Arc;
 
 /// Spark-compatible make_date function.
@@ -75,10 +74,6 @@ fn make_date(year: i32, month: i32, day: i32) -> Option<i32> {
 }
 
 impl ScalarUDFImpl for SparkMakeDate {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn name(&self) -> &str {
         "make_date"
     }
diff --git a/native/spark-expr/src/datetime_funcs/timestamp_trunc.rs b/native/spark-expr/src/datetime_funcs/timestamp_trunc.rs
index 2d7a571b76..1e95eb5141 100644
--- a/native/spark-expr/src/datetime_funcs/timestamp_trunc.rs
+++ b/native/spark-expr/src/datetime_funcs/timestamp_trunc.rs
@@ -23,7 +23,6 @@ use datafusion::logical_expr::ColumnarValue;
 use datafusion::physical_expr::PhysicalExpr;
 use std::hash::Hash;
 use std::{
-    any::Any,
     fmt::{Debug, Display, Formatter},
     sync::Arc,
 };
@@ -85,10 +84,6 @@ impl Display for TimestampTruncExpr {
 }
 
 impl PhysicalExpr for TimestampTruncExpr {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         Display::fmt(self, f)
     }
diff --git a/native/spark-expr/src/datetime_funcs/unix_timestamp.rs b/native/spark-expr/src/datetime_funcs/unix_timestamp.rs
index c4f1576293..4b7df90559 100644
--- a/native/spark-expr/src/datetime_funcs/unix_timestamp.rs
+++ b/native/spark-expr/src/datetime_funcs/unix_timestamp.rs
@@ -24,7 +24,7 @@ use datafusion::logical_expr::{
     ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
 };
 use num::integer::div_floor;
-use std::{any::Any, fmt::Debug, sync::Arc};
+use std::{fmt::Debug, sync::Arc};
 
 const MICROS_PER_SECOND: i64 = 1_000_000;
 
@@ -46,10 +46,6 @@ impl SparkUnixTimestamp {
 }
 
 impl ScalarUDFImpl for SparkUnixTimestamp {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn name(&self) -> &str {
         "unix_timestamp"
     }
diff --git a/native/spark-expr/src/json_funcs/from_json.rs b/native/spark-expr/src/json_funcs/from_json.rs
index 685ea3c8ec..eaca6db016 100644
--- a/native/spark-expr/src/json_funcs/from_json.rs
+++ b/native/spark-expr/src/json_funcs/from_json.rs
@@ -86,10 +86,6 @@ impl PartialEq for FromJson {
 }
 
 impl PhysicalExpr for FromJson {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         Display::fmt(self, f)
     }
diff --git a/native/spark-expr/src/json_funcs/to_json.rs b/native/spark-expr/src/json_funcs/to_json.rs
index 3cc827f210..6dcd8bc045 100644
--- a/native/spark-expr/src/json_funcs/to_json.rs
+++ b/native/spark-expr/src/json_funcs/to_json.rs
@@ -79,10 +79,6 @@ impl PartialEq for ToJson {
 }
 
 impl PhysicalExpr for ToJson {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         Display::fmt(self, f)
     }
diff --git a/native/spark-expr/src/math_funcs/internal/checkoverflow.rs b/native/spark-expr/src/math_funcs/internal/checkoverflow.rs
index f1fb9c2f02..f867fe0c4d 100644
--- a/native/spark-expr/src/math_funcs/internal/checkoverflow.rs
+++ b/native/spark-expr/src/math_funcs/internal/checkoverflow.rs
@@ -28,7 +28,6 @@ use std::hash::Hash;
 
 use crate::SparkError;
 use std::{
-    any::Any,
     fmt::{Display, Formatter},
     sync::Arc,
 };
@@ -91,10 +90,6 @@ impl Display for CheckOverflow {
 }
 
 impl PhysicalExpr for CheckOverflow {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         Display::fmt(self, f)
     }
@@ -274,9 +269,6 @@ mod tests {
     }
 
     impl PhysicalExpr for ScalarChild {
-        fn as_any(&self) -> &dyn Any {
-            self
-        }
         fn data_type(&self, _: &Schema) -> datafusion::common::Result<DataType> {
             Ok(DataType::Decimal128(self.1, self.2))
         }
diff --git a/native/spark-expr/src/math_funcs/internal/decimal_rescale_check.rs b/native/spark-expr/src/math_funcs/internal/decimal_rescale_check.rs
index 1322404951..667f76939a 100644
--- a/native/spark-expr/src/math_funcs/internal/decimal_rescale_check.rs
+++ b/native/spark-expr/src/math_funcs/internal/decimal_rescale_check.rs
@@ -29,7 +29,6 @@ use datafusion::logical_expr::ColumnarValue;
 use datafusion::physical_expr::PhysicalExpr;
 use std::hash::Hash;
 use std::{
-    any::Any,
     fmt::{Display, Formatter},
     sync::Arc,
 };
@@ -154,10 +153,6 @@ fn rescale_and_check(
 }
 
 impl PhysicalExpr for DecimalRescaleCheckOverflow {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         Display::fmt(self, f)
     }
@@ -392,9 +387,6 @@ mod tests {
     }
 
     impl PhysicalExpr for ScalarChild {
-        fn as_any(&self) -> &dyn Any {
-            self
-        }
         fn data_type(&self, _: &Schema) -> datafusion::common::Result<DataType> {
             Ok(DataType::Decimal128(self.1, self.2))
         }
diff --git a/native/spark-expr/src/math_funcs/internal/normalize_nan.rs b/native/spark-expr/src/math_funcs/internal/normalize_nan.rs
index b3838f64f4..165ae3acc9 100644
--- a/native/spark-expr/src/math_funcs/internal/normalize_nan.rs
+++ b/native/spark-expr/src/math_funcs/internal/normalize_nan.rs
@@ -26,7 +26,6 @@ use datafusion::logical_expr::ColumnarValue;
 use datafusion::physical_expr::PhysicalExpr;
 use std::hash::Hash;
 use std::{
-    any::Any,
     fmt::{Display, Formatter},
     sync::Arc,
 };
@@ -57,10 +56,6 @@ impl NormalizeNaNAndZero {
 }
 
 impl PhysicalExpr for NormalizeNaNAndZero {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         Display::fmt(self, f)
     }
diff --git a/native/spark-expr/src/math_funcs/negative.rs b/native/spark-expr/src/math_funcs/negative.rs
index a268894086..650fa401ef 100644
--- a/native/spark-expr/src/math_funcs/negative.rs
+++ b/native/spark-expr/src/math_funcs/negative.rs
@@ -29,7 +29,7 @@ use datafusion::{
 };
 use std::fmt::{Display, Formatter};
 use std::hash::Hash;
-use std::{any::Any, sync::Arc};
+use std::sync::Arc;
 
 pub fn create_negate_expr(
     expr: Arc<dyn PhysicalExpr>,
@@ -96,11 +96,6 @@ impl std::fmt::Display for NegativeExpr {
 }
 
 impl PhysicalExpr for NegativeExpr {
-    /// Return a reference to Any that can be used for downcasting
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
         self.arg.data_type(input_schema)
     }
diff --git a/native/spark-expr/src/math_funcs/wide_decimal_binary_expr.rs b/native/spark-expr/src/math_funcs/wide_decimal_binary_expr.rs
index 644252b46b..d2479857af 100644
--- a/native/spark-expr/src/math_funcs/wide_decimal_binary_expr.rs
+++ b/native/spark-expr/src/math_funcs/wide_decimal_binary_expr.rs
@@ -31,7 +31,7 @@ use datafusion::logical_expr::ColumnarValue;
 use datafusion::physical_expr::PhysicalExpr;
 use std::fmt::{Display, Formatter};
 use std::hash::Hash;
-use std::{any::Any, sync::Arc};
+use std::sync::Arc;
 
 /// The arithmetic operation to perform.
 #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
@@ -165,10 +165,6 @@ fn max_for_precision(precision: u8) -> i256 {
 }
 
 impl PhysicalExpr for WideDecimalBinaryExpr {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
         Ok(DataType::Decimal128(
             self.output_precision,
diff --git a/native/spark-expr/src/nondetermenistic_funcs/monotonically_increasing_id.rs b/native/spark-expr/src/nondetermenistic_funcs/monotonically_increasing_id.rs
index 49a5066a38..e7404e00c1 100644
--- a/native/spark-expr/src/nondetermenistic_funcs/monotonically_increasing_id.rs
+++ b/native/spark-expr/src/nondetermenistic_funcs/monotonically_increasing_id.rs
@@ -20,7 +20,6 @@ use arrow::datatypes::{DataType, Schema};
 use datafusion::common::Result;
 use datafusion::logical_expr::ColumnarValue;
 use datafusion::physical_expr::PhysicalExpr;
-use std::any::Any;
 use std::fmt::{Debug, Display, Formatter};
 use std::hash::{Hash, Hasher};
 use std::sync::atomic::{AtomicI64, Ordering};
@@ -66,10 +65,6 @@ impl Hash for MonotonicallyIncreasingId {
 }
 
 impl PhysicalExpr for MonotonicallyIncreasingId {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
         let start = self
             .current_offset
diff --git a/native/spark-expr/src/nondetermenistic_funcs/rand.rs b/native/spark-expr/src/nondetermenistic_funcs/rand.rs
index e23a83d84e..8f07f37344 100644
--- a/native/spark-expr/src/nondetermenistic_funcs/rand.rs
+++ b/native/spark-expr/src/nondetermenistic_funcs/rand.rs
@@ -23,7 +23,6 @@ use arrow::datatypes::{DataType, Schema};
 use datafusion::common::Result;
 use datafusion::logical_expr::ColumnarValue;
 use datafusion::physical_expr::PhysicalExpr;
-use std::any::Any;
 use std::fmt::{Display, Formatter};
 use std::hash::{Hash, Hasher};
 use std::sync::{Arc, Mutex};
@@ -120,10 +119,6 @@ impl Hash for RandExpr {
 }
 
 impl PhysicalExpr for RandExpr {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
         Ok(DataType::Float64)
     }
diff --git a/native/spark-expr/src/nondetermenistic_funcs/randn.rs b/native/spark-expr/src/nondetermenistic_funcs/randn.rs
index 40fafedc20..7d50d24811 100644
--- a/native/spark-expr/src/nondetermenistic_funcs/randn.rs
+++ b/native/spark-expr/src/nondetermenistic_funcs/randn.rs
@@ -22,7 +22,6 @@ use arrow::array::RecordBatch;
 use arrow::datatypes::{DataType, Schema};
 use datafusion::logical_expr::ColumnarValue;
 use datafusion::physical_expr::PhysicalExpr;
-use std::any::Any;
 use std::fmt::{Display, Formatter};
 use std::hash::{Hash, Hasher};
 use std::sync::{Arc, Mutex};
@@ -131,10 +130,6 @@ impl Hash for RandnExpr {
 }
 
 impl PhysicalExpr for RandnExpr {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn data_type(&self, _input_schema: &Schema) -> datafusion::common::Result<DataType> {
         Ok(DataType::Float64)
     }
diff --git a/native/spark-expr/src/predicate_funcs/rlike.rs b/native/spark-expr/src/predicate_funcs/rlike.rs
index ed5970a6a2..ee005dd1ac 100644
--- a/native/spark-expr/src/predicate_funcs/rlike.rs
+++ b/native/spark-expr/src/predicate_funcs/rlike.rs
@@ -25,7 +25,6 @@ use datafusion::common::{internal_err, Result, ScalarValue};
 use datafusion::physical_expr::PhysicalExpr;
 use datafusion::physical_plan::ColumnarValue;
 use regex::Regex;
-use std::any::Any;
 use std::fmt::{Display, Formatter};
 use std::hash::{Hash, Hasher};
 use std::sync::Arc;
@@ -102,10 +101,6 @@ impl Display for RLike {
 }
 
 impl PhysicalExpr for RLike {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
         Ok(DataType::Boolean)
     }
diff --git a/native/spark-expr/src/string_funcs/contains.rs b/native/spark-expr/src/string_funcs/contains.rs
index bc34ce9cba..537227efdf 100644
--- a/native/spark-expr/src/string_funcs/contains.rs
+++ b/native/spark-expr/src/string_funcs/contains.rs
@@ -27,7 +27,6 @@ use datafusion::common::{exec_err, Result, ScalarValue};
 use datafusion::logical_expr::{
     ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
 };
-use std::any::Any;
 use std::sync::Arc;
 
 /// Spark-optimized contains function.
@@ -53,10 +52,6 @@ impl SparkContains {
 }
 
 impl ScalarUDFImpl for SparkContains {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn name(&self) -> &str {
         "contains"
     }
diff --git a/native/spark-expr/src/string_funcs/substring.rs b/native/spark-expr/src/string_funcs/substring.rs
index e6f11fc39a..e8c3f12229 100644
--- a/native/spark-expr/src/string_funcs/substring.rs
+++ b/native/spark-expr/src/string_funcs/substring.rs
@@ -24,7 +24,6 @@ use datafusion::common::DataFusionError;
 use datafusion::logical_expr::ColumnarValue;
 use datafusion::physical_expr::PhysicalExpr;
 use std::{
-    any::Any,
     fmt::{Display, Formatter},
     hash::Hash,
     sync::Arc,
@@ -68,10 +67,6 @@ impl Display for SubstringExpr {
 }
 
 impl PhysicalExpr for SubstringExpr {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         Display::fmt(self, f)
     }
diff --git a/native/spark-expr/src/struct_funcs/create_named_struct.rs b/native/spark-expr/src/struct_funcs/create_named_struct.rs
index 70e03ad0c0..1a63cb1cc9 100644
--- a/native/spark-expr/src/struct_funcs/create_named_struct.rs
+++ b/native/spark-expr/src/struct_funcs/create_named_struct.rs
@@ -22,7 +22,6 @@ use datafusion::common::Result as DataFusionResult;
 use datafusion::logical_expr::ColumnarValue;
 use datafusion::physical_expr::PhysicalExpr;
 use std::{
-    any::Any,
     fmt::{Display, Formatter},
     hash::Hash,
     sync::Arc,
@@ -53,10 +52,6 @@ impl CreateNamedStruct {
 }
 
 impl PhysicalExpr for CreateNamedStruct {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         Display::fmt(self, f)
     }
diff --git a/native/spark-expr/src/struct_funcs/get_struct_field.rs b/native/spark-expr/src/struct_funcs/get_struct_field.rs
index 7929cea483..6684967c3a 100644
--- a/native/spark-expr/src/struct_funcs/get_struct_field.rs
+++ b/native/spark-expr/src/struct_funcs/get_struct_field.rs
@@ -22,7 +22,6 @@ use datafusion::common::{DataFusionError, Result as DataFusionResult, ScalarValu
 use datafusion::logical_expr::ColumnarValue;
 use datafusion::physical_expr::PhysicalExpr;
 use std::{
-    any::Any,
     fmt::{Display, Formatter},
     hash::Hash,
     sync::Arc,
@@ -62,10 +61,6 @@ impl GetStructField {
 }
 
 impl PhysicalExpr for GetStructField {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
     fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         Display::fmt(self, f)
     }
diff --git a/native/spark-expr/src/unbound.rs b/native/spark-expr/src/unbound.rs
index cf0adafa91..69187dbc02 100644
--- a/native/spark-expr/src/unbound.rs
+++ b/native/spark-expr/src/unbound.rs
@@ -59,11 +59,6 @@ impl std::fmt::Display for UnboundColumn {
 }
 
 impl PhysicalExpr for UnboundColumn {
-    /// Return a reference to Any that can be used for downcasting
-    fn as_any(&self) -> &dyn std::any::Any {
-        self
-    }
-
     fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         Display::fmt(self, f)
     }
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 768e4e0ed6..e43e876470 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -52,6 +52,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
       classOf[ArrayContains] -> CometArrayContains,
       classOf[ArrayDistinct] -> CometScalarFunction("array_distinct"),
       classOf[ArrayExcept] -> CometArrayExcept,
+      classOf[ArrayExists] -> CometArrayExists,
       classOf[ArrayFilter] -> CometArrayFilter,
       classOf[ArrayInsert] -> CometArrayInsert,
       classOf[ArrayIntersect] -> CometArrayIntersect,
@@ -70,6 +71,10 @@ object QueryPlanSerde extends Logging with CometExprShim {
       classOf[Size] -> CometSize,
       classOf[ArraysZip] -> CometArraysZip)
 
+  private val lambdaExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
+    classOf[LambdaFunction] -> CometLambdaFunction,
+    classOf[NamedLambdaVariable] -> CometNamedLambdaVariable)
+
   private val conditionalExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] =
     Map(classOf[CaseWhen] -> CometCaseWhen, classOf[If] -> CometIf)
 
@@ -252,7 +257,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
       mathExpressions ++ hashExpressions ++ stringExpressions ++ conditionalExpressions ++
       mapExpressions ++ predicateExpressions ++ structExpressions ++ bitwiseExpressions ++
       miscExpressions ++ arrayExpressions ++
-      temporalExpressions ++ conversionExpressions
+      lambdaExpressions ++ temporalExpressions ++ conversionExpressions
 
   /**
    * Mapping of Spark aggregate expression class to Comet expression handler.
diff --git a/spark/src/main/scala/org/apache/comet/serde/lambda.scala b/spark/src/main/scala/org/apache/comet/serde/lambda.scala
new file mode 100644
index 0000000000..0f1d1a4812
--- /dev/null
+++ b/spark/src/main/scala/org/apache/comet/serde/lambda.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.serde
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.sql.catalyst.expressions.{ArrayExists, Attribute, LambdaFunction, NamedLambdaVariable}
+
+import org.apache.comet.CometSparkSessionExtensions.withInfo
+import org.apache.comet.serde.QueryPlanSerde._
+
+object CometArrayExists extends CometExpressionSerde[ArrayExists] {
+
+  override def getSupportLevel(expr: ArrayExists): SupportLevel = {
+    if (!expr.followThreeValuedLogic) {
+      Unsupported(
+        Some(
+          "legacy ArrayExists two-valued logic is not supported; " +
+            "set spark.sql.legacy.followThreeValuedLogicInArrayExists=true"))
+    } else {
+      Compatible()
+    }
+  }
+
+  override def convert(
+      expr: ArrayExists,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    val argumentExpr = exprToProto(expr.argument, inputs, binding)
+    val functionExpr = exprToProtoInternal(expr.function, inputs, binding)
+
+    if (argumentExpr.isEmpty || functionExpr.isEmpty) {
+      withInfo(expr, expr.argument, expr.function)
+      return None
+    }
+
+    val builder = ExprOuterClass.HigherOrderFunc
+      .newBuilder()
+      .setFunc("array_exists")
+      .addArgs(argumentExpr.get)
+      .addArgs(functionExpr.get)
+
+    Some(ExprOuterClass.Expr.newBuilder().setHigherOrderFunc(builder).build())
+  }
+}
+
+object CometLambdaFunction extends CometExpressionSerde[LambdaFunction] {
+
+  override def convert(
+      expr: LambdaFunction,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    val params = expr.arguments.collect { case v: NamedLambdaVariable => v.name }
+    if (params.length != expr.arguments.length) {
+      withInfo(expr, "lambda arguments must be NamedLambdaVariables")
+      return None
+    }
+
+    val bodyExpr = exprToProtoInternal(expr.function, inputs, binding)
+    if (bodyExpr.isEmpty) {
+      withInfo(expr, expr.function)
+      return None
+    }
+
+    val builder = ExprOuterClass.LambdaFunc
+      .newBuilder()
+      .setBody(bodyExpr.get)
+      .addAllParamNames(params.asJava)
+
+    Some(ExprOuterClass.Expr.newBuilder().setLambdaFunc(builder).build())
+  }
+}
+
+object CometNamedLambdaVariable extends CometExpressionSerde[NamedLambdaVariable] {
+
+  override def convert(
+      expr: NamedLambdaVariable,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    serializeDataType(expr.dataType).map { dt =>
+      val builder = ExprOuterClass.LambdaVar
+        .newBuilder()
+        .setName(expr.name)
+        .setDatatype(dt)
+        .setNullable(expr.nullable)
+      ExprOuterClass.Expr.newBuilder().setLambdaVar(builder).build()
+    }
+  }
+}
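The SQL test file below exercises Spark's three-valued `exists` semantics, which the comments in it spell out: true if any element satisfies the predicate, otherwise NULL if the predicate was NULL for some element, otherwise false. A self-contained toy model of those rules (simplified: the predicate is assumed strict, NULL element in means NULL out, which covers the comparison predicates in the test but not `s IS NULL`-style lambdas):

```rust
/// Spark-style three-valued exists over an optional array of optional elements.
fn exists<T, F: Fn(&T) -> bool>(arr: Option<&[Option<T>]>, pred: F) -> Option<bool> {
    let arr = arr?; // NULL array -> NULL result
    let mut saw_null = false;
    for item in arr {
        match item {
            Some(v) if pred(v) => return Some(true), // any TRUE wins
            Some(_) => {}
            None => saw_null = true, // strict predicate: NULL element -> NULL
        }
    }
    if saw_null {
        None // no TRUE, at least one NULL -> NULL
    } else {
        Some(false)
    }
}

fn main() {
    let a = [Some(1), None, Some(3)];
    // exists(array(1, NULL, 3), x -> x > 5) -> NULL
    assert_eq!(exists(Some(&a[..]), |x| *x > 5), None);
    // exists(array(1, NULL, 3), x -> x > 2) -> true
    assert_eq!(exists(Some(&a[..]), |x| *x > 2), Some(true));
    // exists(array(), p) -> false; exists(NULL, p) -> NULL
    let empty: [Option<i32>; 0] = [];
    assert_eq!(exists(Some(&empty[..]), |x| *x > 0), Some(false));
    assert_eq!(exists::<i32, _>(None, |x| *x > 0), None);
}
```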
diff --git a/spark/src/test/resources/sql-tests/expressions/array/array_exists.sql b/spark/src/test/resources/sql-tests/expressions/array/array_exists.sql
new file mode 100644
index 0000000000..02c7907061
--- /dev/null
+++ b/spark/src/test/resources/sql-tests/expressions/array/array_exists.sql
@@ -0,0 +1,61 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- ConfigMatrix: parquet.enable.dictionary=false,true
+
+statement
+CREATE TABLE test_array_exists(arr array<int>, sarr array<string>) USING parquet
+
+statement
+INSERT INTO test_array_exists VALUES
+  (array(1, 2, 3, 4, 5), array('a', 'b', 'c')),
+  (array(-1, 0, 1), array('foo')),
+  (array(10), array('bar', 'baz')),
+  (array(1, NULL, 3), array('x', NULL, 'y')),
+  (array(), array()),
+  (NULL, NULL)
+
+-- predicate matches at least one element / none / empty / NULL row
+query
+SELECT exists(arr, x -> x > 2) FROM test_array_exists
+
+query
+SELECT exists(arr, x -> x >= 0) FROM test_array_exists
+
+query
+SELECT exists(arr, x -> x < -10) FROM test_array_exists
+
+-- three-valued logic: array with NULL element, predicate is NULL on NULL element
+-- no element evaluates to true, at least one is NULL -> result NULL
+query
+SELECT exists(array(1, cast(NULL as int), 3), x -> x > 5)
+
+-- same shape but an element satisfies the predicate -> result true
+query
+SELECT exists(array(1, cast(NULL as int), 3), x -> x > 2)
+
+-- string elements
+query
+SELECT exists(sarr, s -> s = 'foo') FROM test_array_exists
+
+query
+SELECT exists(sarr, s -> s IS NULL) FROM test_array_exists
+
+-- literal array, predicate that is NULL for every element -> result NULL
+query
+SELECT exists(array(1, 2, 3), x -> x = cast(NULL as int))
+
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
index 49fbe10c30..c96332b66b 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
@@ -443,4 +443,40 @@ class CometJoinSuite extends CometTestBase {
       }
     }
   }
+
+  // Reproducer for SPARK-43113: full outer SMJ with a join filter that references
+  // a nullable column should not match when the filter evaluates to NULL.
+  test("SPARK-43113: Full outer SMJ with NULL in join filter") {
+    withTempView("l", "r") {
+      // testData2: (a, b) — all non-null
+      Seq((1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2))
+        .toDF("a", "b")
+        .createOrReplaceTempView("l")
+
+      // testData3: (a, b) — b is nullable
+      Seq((1, None), (2, Some(2)))
+        .toDF("a", "b")
+        .createOrReplaceTempView("r")
+
+      val query =
+        """select /*+ MERGE(r) */ *
+          |from l
+          |full outer join r
+          |on l.a = r.a
+          |and l.b < (r.b + 1)
+          |and l.b < (r.a + 1)""".stripMargin
+
+      val expected = Seq(
+        (Some(1), Some(1), None, None),
+        (Some(1), Some(2), None, None),
+        (None, None, Some(1), None),
+        (Some(2), Some(1), Some(2), Some(2)),
+        (Some(2), Some(2), Some(2), Some(2)),
+        (Some(3), Some(1), None, None),
+        (Some(3), Some(2), None, None)).toDF("a", "b", "a", "b")
+
+      val df = sql(query)
+      checkAnswer(df, expected)
+    }
+  }
 }
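The join test pins down the semantics the sort-merge-join-with-filter path (now enabled by default, per the config change) must honor: a join filter that evaluates to NULL, here `l.b < (r.b + 1)` with `r.b` NULL, rejects the pair just like FALSE, and the full outer join then emits both sides unmatched. A toy model of that predicate semantics (not Comet's implementation; the helper and its argument encoding are illustrative):

```rust
/// SQL join-filter semantics: a candidate pair survives only when the
/// filter is TRUE; FALSE and NULL (unknown) both reject it.
fn filter_passes(l_b: i32, r_a: i32, r_b: Option<i32>) -> bool {
    // l.b < (r.b + 1): a NULL r.b makes the comparison NULL.
    let c1 = r_b.map(|b| l_b < b + 1);
    // l.b < (r.a + 1): both sides non-null in this schema.
    let c2 = Some(l_b < r_a + 1);
    // Three-valued AND, then "is TRUE": only (TRUE, TRUE) matches.
    matches!((c1, c2), (Some(true), Some(true)))
}

fn main() {
    // l = (1, 1) vs r = (1, NULL): keys match but the filter is NULL,
    // so the full outer join emits (1, 1, NULL, NULL) and (NULL, NULL, 1, NULL).
    assert!(!filter_passes(1, 1, None));
    // l = (2, 1) vs r = (2, 2): 1 < 3 and 1 < 3, a real match.
    assert!(filter_passes(1, 2, Some(2)));
}
```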