Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
-- Bounded memory-shape reproducer for list columns -> unnest expansion ->
-- GROUP BY row id -> ordered array_agg.
--
-- The reported issue used about 20,000 rows and 2,000 list elements
-- per row (~40M expanded rows). Defaults below are intentionally small
-- (10K expanded rows) so the benchmark is safe to smoke-test locally;
-- set env vars to scale up:
-- UNNEST_ARRAY_AGG_ROWS=20000 UNNEST_ARRAY_AGG_LIST_LEN=2000

name Q01

group unnest_array_agg
subgroup ordered_array_agg

assert B
WITH base AS (
SELECT
value AS row_id,
range(0, ${UNNEST_ARRAY_AGG_LIST_LEN:-100}) AS vals,
range(0, ${UNNEST_ARRAY_AGG_LIST_LEN:-100}) AS val_idx
FROM range(0, ${UNNEST_ARRAY_AGG_ROWS:-100})
), expanded AS (
SELECT row_id, unnest(vals) AS val, unnest(val_idx) AS idx
FROM base
)
SELECT COUNT(*) = ${UNNEST_ARRAY_AGG_ROWS:-100} * ${UNNEST_ARRAY_AGG_LIST_LEN:-100}
FROM expanded;
----
true

run
WITH base AS (
SELECT
value AS row_id,
range(0, ${UNNEST_ARRAY_AGG_LIST_LEN:-100}) AS vals,
range(0, ${UNNEST_ARRAY_AGG_LIST_LEN:-100}) AS val_idx
FROM range(0, ${UNNEST_ARRAY_AGG_ROWS:-100})
), expanded AS (
SELECT row_id, unnest(vals) AS val, unnest(val_idx) AS idx
FROM base
)
SELECT row_id, array_agg(val ORDER BY idx) AS vals
FROM expanded
GROUP BY row_id;
96 changes: 96 additions & 0 deletions datafusion/sqllogictest/test_files/unnest_array_agg_repro.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Bounded reproducer for high-memory query shape:
# list columns -> unnest row expansion -> GROUP BY row id -> ordered array_agg.

statement ok
CREATE TABLE unnest_array_agg_repro AS
SELECT
value AS row_id,
range(0, 4) AS vals,
range(0, 4) AS val_idx
FROM range(0, 3);

# Three input rows with four elements each expand to twelve rows.
query I
SELECT COUNT(*)
FROM (
SELECT row_id, unnest(vals) AS val, unnest(val_idx) AS idx
FROM unnest_array_agg_repro
) expanded;
----
12

# Regroup each expanded row id and force ordered aggregate state.
query I?
SELECT row_id, array_agg(val ORDER BY idx) AS vals
FROM (
SELECT row_id, unnest(vals) AS val, unnest(val_idx) AS idx
FROM unnest_array_agg_repro
) expanded
GROUP BY row_id
ORDER BY row_id;
----
0 [0, 1, 2, 3]
1 [0, 1, 2, 3]
2 [0, 1, 2, 3]

# Capture the plan shape for the reproducer: UnnestExec feeds AggregateExec with ordered array_agg.
query TT
EXPLAIN VERBOSE
SELECT row_id, array_agg(val ORDER BY idx) AS vals
FROM (
SELECT row_id, unnest(vals) AS val, unnest(val_idx) AS idx
FROM unnest_array_agg_repro
) expanded
GROUP BY row_id
ORDER BY row_id;
----
initial_logical_plan
01)Sort: expanded.row_id ASC NULLS LAST
02)--Projection: expanded.row_id, array_agg(expanded.val) ORDER BY [expanded.idx ASC NULLS LAST] AS vals
03)----Aggregate: groupBy=[[expanded.row_id]], aggr=[[array_agg(expanded.val) ORDER BY [expanded.idx ASC NULLS LAST]]]
04)------SubqueryAlias: expanded
05)--------Projection: unnest_array_agg_repro.row_id, __unnest_placeholder(unnest_array_agg_repro.vals,depth=1) AS UNNEST(unnest_array_agg_repro.vals) AS val, __unnest_placeholder(unnest_array_agg_repro.val_idx,depth=1) AS UNNEST(unnest_array_agg_repro.val_idx) AS idx
06)----------Unnest: lists[__unnest_placeholder(unnest_array_agg_repro.vals)|depth=1, __unnest_placeholder(unnest_array_agg_repro.val_idx)|depth=1] structs[]
<slt:ignore>
initial_physical_plan
01)SortExec: expr=[row_id@0 ASC NULLS LAST], preserve_partitioning=[false]
02)--ProjectionExec: expr=[row_id@0 as row_id, array_agg(expanded.val) ORDER BY [expanded.idx ASC NULLS LAST]@1 as vals]
03)----AggregateExec: mode=FinalPartitioned, gby=[row_id@0 as row_id], aggr=[array_agg(expanded.val) ORDER BY [expanded.idx ASC NULLS LAST]]
04)------AggregateExec: mode=Partial, gby=[row_id@0 as row_id], aggr=[array_agg(expanded.val) ORDER BY [expanded.idx ASC NULLS LAST]]
05)--------ProjectionExec: expr=[row_id@0 as row_id, __unnest_placeholder(unnest_array_agg_repro.vals,depth=1)@1 as val, __unnest_placeholder(unnest_array_agg_repro.val_idx,depth=1)@2 as idx]
06)----------UnnestExec
<slt:ignore>
physical_plan
01)SortPreservingMergeExec: [row_id@0 ASC NULLS LAST]
02)--SortExec: expr=[row_id@0 ASC NULLS LAST], preserve_partitioning=[true]
03)----ProjectionExec: expr=[row_id@0 as row_id, array_agg(expanded.val) ORDER BY [expanded.idx ASC NULLS LAST]@1 as vals]
04)------AggregateExec: mode=FinalPartitioned, gby=[row_id@0 as row_id], aggr=[array_agg(expanded.val) ORDER BY [expanded.idx ASC NULLS LAST]]
05)--------RepartitionExec: partitioning=Hash([row_id@0], 4), input_partitions=4
06)----------AggregateExec: mode=Partial, gby=[row_id@0 as row_id], aggr=[array_agg(expanded.val) ORDER BY [expanded.idx ASC NULLS LAST]]
07)------------SortExec: expr=[idx@2 ASC NULLS LAST], preserve_partitioning=[true]
08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)----------------ProjectionExec: expr=[row_id@0 as row_id, __unnest_placeholder(unnest_array_agg_repro.vals,depth=1)@1 as val, __unnest_placeholder(unnest_array_agg_repro.val_idx,depth=1)@2 as idx]
10)------------------UnnestExec
<slt:ignore>
physical_plan_with_schema
01)SortPreservingMergeExec: [row_id@0 ASC NULLS LAST], schema=[row_id:Int64, vals:List(Int64);N]
<slt:ignore>
05)--------RepartitionExec: partitioning=Hash([row_id@0], 4), input_partitions=4, schema=[row_id:Int64, array_agg(expanded.val) ORDER BY [expanded.idx ASC NULLS LAST][array_agg]:List(Int64);N, array_agg(expanded.val) ORDER BY [expanded.idx ASC NULLS LAST][array_agg_orderings]:List(Struct("idx@2": Int64))]
<slt:ignore>
Loading