From 48e4cc4ee18ef2c0ce809f4f435e36105d478897 Mon Sep 17 00:00:00 2001
From: Stas Pak
Date: Wed, 4 Mar 2026 21:33:49 -0800
Subject: [PATCH 1/5] Fix bin-pack compaction producing undersized output files

SparkStagedScan uses task.sizeBytes() (data + delete files) as the weight
function for bin-packing, but the split size passed from
SparkBinPackDataRewriter is computed using ContentScanTask::length
(data-only). This mismatch causes each Spark partition to hold less actual
data than intended, producing output files smaller than
target-file-size-bytes.

Add a weight function parameter to TableScanUtil.planTaskGroups() and a
USE_DATA_ONLY_WEIGHT read option that SparkBinPackDataRewriter sets to
signal SparkStagedScan to use ContentScanTask::length as the bin-packing
weight, consistent with how splitSize was computed.

Also fix a pre-existing bug in SparkStagedScan.hashCode() where splitSize
was used twice instead of including splitLookback.

Co-Authored-By: Claude Opus 4.6
---
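Reviewer note (outside the commit message): a minimal sketch of the size
mismatch this patch fixes. The numbers are made up; plain longs stand in
for ScanTask sizes, and the 128 MB target mirrors a typical
target-file-size-bytes setting.

    // WeightMismatchSketch.java -- illustrative only, not Iceberg code.
    class WeightMismatchSketch {
      public static void main(String[] args) {
        long dataLength = 64L << 20; // ContentScanTask::length, data bytes only
        long deleteSize = 40L << 20; // delete files attached to the task
        long sizeBytes = dataLength + deleteSize; // ScanTask::sizeBytes

        // SparkBinPackDataRewriter derives splitSize from data-only input
        // size: two such files against a 128 MB target file size.
        long splitSize = 2 * dataLength;

        // Default weight: 104 MB + 104 MB > 128 MB, so bin-packing puts the
        // two tasks in separate groups and each output file lands at ~64 MB.
        System.out.println(sizeBytes + sizeBytes <= splitSize); // false

        // Data-only weight: 64 MB + 64 MB <= 128 MB, one group, ~128 MB file.
        System.out.println(dataLength + dataLength <= splitSize); // true
      }
    }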
 .../apache/iceberg/util/TableScanUtil.java    | 26 +++++-
 .../apache/iceberg/spark/SparkReadConf.java   |  8 ++
 .../iceberg/spark/SparkReadOptions.java       |  5 ++
 .../actions/SparkBinPackDataRewriter.java     |  1 +
 .../iceberg/spark/source/SparkStagedScan.java | 32 ++++++-
 .../spark/source/TestSparkStagedScan.java     | 85 +++++++++++++++++++
 6 files changed, 151 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
index e2dbcb61e9..219d09cc8f 100644
--- a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
@@ -109,9 +109,32 @@ public static <T extends ScanTask> List<ScanTaskGroup<T>> planTaskGroups(
         planTaskGroups(CloseableIterable.withNoopClose(tasks), splitSize, lookback, openFileCost));
   }
 
+  public static <T extends ScanTask> List<ScanTaskGroup<T>> planTaskGroups(
+      List<T> tasks,
+      long splitSize,
+      int lookback,
+      long openFileCost,
+      Function<T, Long> weightFunc) {
+    return Lists.newArrayList(
+        planTaskGroups(
+            CloseableIterable.withNoopClose(tasks), splitSize, lookback, openFileCost, weightFunc));
+  }
+
   @SuppressWarnings("unchecked")
   public static <T extends ScanTask> CloseableIterable<ScanTaskGroup<T>> planTaskGroups(
       CloseableIterable<T> tasks, long splitSize, int lookback, long openFileCost) {
+    Function<T, Long> defaultWeightFunc =
+        task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost);
+    return planTaskGroups(tasks, splitSize, lookback, openFileCost, defaultWeightFunc);
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <T extends ScanTask> CloseableIterable<ScanTaskGroup<T>> planTaskGroups(
+      CloseableIterable<T> tasks,
+      long splitSize,
+      int lookback,
+      long openFileCost,
+      Function<T, Long> weightFunc) {
 
     validatePlanningArguments(splitSize, lookback, openFileCost);
 
@@ -129,9 +152,6 @@ public static <T extends ScanTask> CloseableIterable<ScanTaskGroup<T>> planTaskG
             }),
         tasks);
 
-    Function<T, Long> weightFunc =
-        task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost);
-
     return CloseableIterable.transform(
         CloseableIterable.combine(
             new BinPacking.PackingIterable<>(splitTasks, splitSize, lookback, weightFunc, true),
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index ae4795908e..500450a3ec 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -288,6 +288,14 @@ public boolean adaptiveSplitSizeEnabled() {
         .parse();
   }
 
+  public boolean useDataOnlyWeight() {
+    return confParser
+        .booleanConf()
+        .option(SparkReadOptions.USE_DATA_ONLY_WEIGHT)
+        .defaultValue(false)
+        .parse();
+  }
+
   public int parallelism() {
     int defaultParallelism = spark.sparkContext().defaultParallelism();
     int numShufflePartitions = spark.sessionState().conf().numShufflePartitions();
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
index 17f2bfee69..b8d2065fdc 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
@@ -95,4 +95,9 @@ private SparkReadOptions() {}
   public static final String TIMESTAMP_AS_OF = "timestampAsOf";
 
   public static final String AGGREGATE_PUSH_DOWN_ENABLED = "aggregate-push-down-enabled";
+
+  // Use data-only size (ContentScanTask::length) as weight for task group planning
+  // instead of the default sizeBytes() which includes delete file sizes.
+  // Used by bin-pack compaction to produce correctly-sized output files.
+  public static final String USE_DATA_ONLY_WEIGHT = "use-data-only-weight";
 }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java
index 9a96f44ebd..0b22c9620c 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java
@@ -49,6 +49,7 @@ protected void doRewrite(String groupId, List<FileScanTask> group) {
             .option(SparkReadOptions.SCAN_TASK_SET_ID, groupId)
             .option(SparkReadOptions.SPLIT_SIZE, splitSize(inputSize(group)))
             .option(SparkReadOptions.FILE_OPEN_COST, "0")
+            .option(SparkReadOptions.USE_DATA_ONLY_WEIGHT, "true")
             .load(groupId);
 
     // write the packed data into new files where each split becomes a new file
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
index fd299ade7f..300560a36d 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
@@ -20,6 +20,8 @@
 
 import java.util.List;
 import java.util.Objects;
+import java.util.function.Function;
+import org.apache.iceberg.ContentScanTask;
 import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
@@ -37,6 +39,7 @@ class SparkStagedScan extends SparkScan {
   private final long splitSize;
   private final int splitLookback;
   private final long openFileCost;
+  private final boolean useDataOnlyWeight;
 
   private List<ScanTaskGroup<ScanTask>> taskGroups = null; // lazy cache of tasks
 
@@ -46,6 +49,7 @@ class SparkStagedScan extends SparkScan {
     this.splitSize = readConf.splitSize();
     this.splitLookback = readConf.splitLookback();
     this.openFileCost = readConf.splitOpenFileCost();
+    this.useDataOnlyWeight = readConf.useDataOnlyWeight();
   }
 
   @Override
@@ -59,7 +63,22 @@ protected List<ScanTaskGroup<ScanTask>> taskGroups() {
               table(),
               taskSetId);
 
-      this.taskGroups = TableScanUtil.planTaskGroups(tasks, splitSize, splitLookback, openFileCost);
+      if (useDataOnlyWeight) {
+        Function<ScanTask, Long> weightFunc =
+            task -> {
+              long dataSize =
+                  task instanceof ContentScanTask
+                      ? ((ContentScanTask<?>) task).length()
+                      : task.sizeBytes();
+              return Math.max(dataSize, task.filesCount() * openFileCost);
+            };
+        this.taskGroups =
+            TableScanUtil.planTaskGroups(
+                tasks, splitSize, splitLookback, openFileCost, weightFunc);
+      } else {
+        this.taskGroups =
+            TableScanUtil.planTaskGroups(tasks, splitSize, splitLookback, openFileCost);
+      }
     }
     return taskGroups;
   }
@@ -80,13 +99,20 @@ public boolean equals(Object other) {
         && readSchema().equals(that.readSchema())
         && Objects.equals(splitSize, that.splitSize)
         && Objects.equals(splitLookback, that.splitLookback)
-        && Objects.equals(openFileCost, that.openFileCost);
+        && Objects.equals(openFileCost, that.openFileCost)
+        && useDataOnlyWeight == that.useDataOnlyWeight;
   }
 
   @Override
   public int hashCode() {
     return Objects.hash(
-        table().name(), taskSetId, readSchema(), splitSize, splitSize, openFileCost);
+        table().name(),
+        taskSetId,
+        readSchema(),
+        splitSize,
+        splitLookback,
+        openFileCost,
+        useDataOnlyWeight);
   }
 
   @Override
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
index 6ce2ce6238..a7c4c6ea9e 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
@@ -20,16 +20,23 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Files;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.data.FileHelpers;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.CatalogTestBase;
 import org.apache.iceberg.spark.ScanTaskSetManager;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.util.Pair;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -122,4 +129,82 @@ public void testTaskSetPlanning() throws NoSuchTableException, IOException {
           .isEqualTo(1);
     }
   }
+
+  @TestTemplate
+  public void testDataOnlyWeightTaskGroupPlanning() throws NoSuchTableException, IOException {
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+
+    List<SimpleRecord> records =
+        ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+    df.coalesce(1).writeTo(tableName).append();
+    df.coalesce(1).writeTo(tableName).append();
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    // get data files to write position deletes against them
+    List<DataFile> dataFiles = Lists.newArrayList();
+    try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
+      for (FileScanTask task : fileScanTasks) {
+        dataFiles.add(task.file());
+      }
+    }
+
+    assertThat(dataFiles).as("Should have 2 data files").hasSize(2);
+
+    // write position deletes for each data file to inflate sizeBytes()
+    for (DataFile dataFile : dataFiles) {
+      List<Pair<CharSequence, Long>> deletes = Lists.newArrayList();
+      deletes.add(Pair.of(dataFile.path(), 0L));
+      Pair<DeleteFile, ?> result =
+          FileHelpers.writeDeleteFile(
+              table, Files.localOutput(File.createTempFile("junit", null, temp.toFile())), deletes);
+      table.newRowDelta().addDeletes(result.first()).commit();
+    }
+
+    table.refresh();
+
+    // scan with deletes to get inflated sizeBytes
+    try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
+      ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
+      String setID = UUID.randomUUID().toString();
+      List<FileScanTask> tasks = ImmutableList.copyOf(fileScanTasks);
+      taskSetManager.stageTasks(table, setID, tasks);
+
+      long dataOnlySize = tasks.stream().mapToLong(FileScanTask::length).sum();
+      long totalSizeBytes = tasks.stream().mapToLong(FileScanTask::sizeBytes).sum();
+      assertThat(totalSizeBytes)
+          .as("sizeBytes should be larger than data-only length due to delete files")
+          .isGreaterThan(dataOnlySize);
+
+      // with data-only weight: both files should fit in one partition
+      // because their data-only sizes sum to exactly dataOnlySize
+      Dataset<Row> dataOnlyDF =
+          spark
+              .read()
+              .format("iceberg")
+              .option(SparkReadOptions.SCAN_TASK_SET_ID, setID)
+              .option(SparkReadOptions.SPLIT_SIZE, dataOnlySize)
+              .option(SparkReadOptions.FILE_OPEN_COST, "0")
+              .option(SparkReadOptions.USE_DATA_ONLY_WEIGHT, "true")
+              .load(tableName);
+      assertThat(dataOnlyDF.javaRDD().getNumPartitions())
+          .as("Data-only weight should pack both files into 1 partition")
+          .isEqualTo(1);
+
+      // without data-only weight: inflated sizeBytes should cause 2 partitions
+      // because the sizeBytes of both files together exceeds dataOnlySize
+      Dataset<Row> defaultDF =
+          spark
+              .read()
+              .format("iceberg")
+              .option(SparkReadOptions.SCAN_TASK_SET_ID, setID)
+              .option(SparkReadOptions.SPLIT_SIZE, dataOnlySize)
+              .option(SparkReadOptions.FILE_OPEN_COST, "0")
+              .load(tableName);
+      assertThat(defaultDF.javaRDD().getNumPartitions())
+          .as("Default weight (with deletes) should produce more partitions")
+          .isGreaterThan(1);
+    }
+  }
 }

From 2ac47ac200211d803b8d5606703ab056981450ea Mon Sep 17 00:00:00 2001
From: Stas Pak
Date: Wed, 4 Mar 2026 21:46:39 -0800
Subject: [PATCH 2/5] Style fix

---
 .../java/org/apache/iceberg/spark/source/SparkStagedScan.java | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
index 300560a36d..4a6944461b 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
@@ -73,8 +73,7 @@ protected List<ScanTaskGroup<ScanTask>> taskGroups() {
               return Math.max(dataSize, task.filesCount() * openFileCost);
             };
         this.taskGroups =
-            TableScanUtil.planTaskGroups(
-                tasks, splitSize, splitLookback, openFileCost, weightFunc);
+            TableScanUtil.planTaskGroups(tasks, splitSize, splitLookback, openFileCost, weightFunc);
       } else {
         this.taskGroups =
             TableScanUtil.planTaskGroups(tasks, splitSize, splitLookback, openFileCost);
From 151dc53d83126a01c741a6f03ec760cccfda122e Mon Sep 17 00:00:00 2001
From: Stas Pak
Date: Wed, 4 Mar 2026 21:49:27 -0800
Subject: [PATCH 3/5] Use length instead of sizeBytes when truncating the task
 list to fit into the budget

---
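Reviewer note: made-up numbers showing how sizeBytes() under-fills the
group budget (max-file-group-size-bytes) when candidate tasks carry
deletes. The 1 GiB budget and task sizes are illustrative only.

    // BudgetSketch.java -- illustrative only, not Iceberg code.
    class BudgetSketch {
      public static void main(String[] args) {
        long budget = 1024L << 20;   // 1 GiB group budget
        long length = 100L << 20;    // FileScanTask::length, data only
        long sizeBytes = 130L << 20; // length plus 30 MiB of deletes

        System.out.println(budget / sizeBytes); // 7  -> sizeBytes() admits 7 tasks
        System.out.println(budget / length);    // 10 -> length() admits all 10
      }
    }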
 .../iceberg/spark/actions/RewriteDataFilesSparkAction.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index dc26660f98..9fd6c8dec6 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -200,9 +200,9 @@ StructLikeMap<List<List<FileScanTask>>> planFileGroups(long startingSnapshotId)
       long pickedTasksSizeBytes = 0;
       int pickedTasksCount = 0;
       for (FileScanTask task : tasks) {
-        if (pickedTasksSizeBytes + task.sizeBytes() <= maxTotalFilesSizeBytes) {
+        if (pickedTasksSizeBytes + task.length() <= maxTotalFilesSizeBytes) {
           pickedTasksCount++;
-          pickedTasksSizeBytes += task.sizeBytes();
+          pickedTasksSizeBytes += task.length();
         } else {
           break;
         }

From 048fcd0068576af96bc61b11d1adf3a64b876f32 Mon Sep 17 00:00:00 2001
From: Stas Pak
Date: Thu, 5 Mar 2026 12:33:41 -0800
Subject: [PATCH 4/5] Always use data-only weight in SparkStagedScan

SparkStagedScan is only reachable via rewrite procedures, so the opt-in
USE_DATA_ONLY_WEIGHT flag and branching are unnecessary. Use
ContentScanTask::length as the weight function unconditionally and remove
the read option, config method, and caller flag.

Co-Authored-By: Claude Opus 4.6
---
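Reviewer note: the now-unconditional weight, restated outside the diff as a
standalone helper for readability (a sketch; the `weigh` name is mine, not
in the patch). Since SparkBinPackDataRewriter passes FILE_OPEN_COST "0",
the weight reduces to the data-only length for content tasks.

    // Sketch of the weight SparkStagedScan applies after this patch.
    static long weigh(org.apache.iceberg.ScanTask task, long openFileCost) {
      long dataSize =
          task instanceof org.apache.iceberg.ContentScanTask
              ? ((org.apache.iceberg.ContentScanTask<?>) task).length() // data only
              : task.sizeBytes(); // non-content tasks keep the old behavior
      return Math.max(dataSize, task.filesCount() * openFileCost);
    }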
 .../apache/iceberg/spark/SparkReadConf.java   |  8 ----
 .../iceberg/spark/SparkReadOptions.java       |  5 ---
 .../actions/SparkBinPackDataRewriter.java     |  1 -
 .../iceberg/spark/source/SparkStagedScan.java | 39 ++++++-------------
 .../spark/source/TestSparkStagedScan.java     | 23 ++---------
 5 files changed, 16 insertions(+), 60 deletions(-)

diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index 500450a3ec..ae4795908e 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -288,14 +288,6 @@ public boolean adaptiveSplitSizeEnabled() {
         .parse();
   }
 
-  public boolean useDataOnlyWeight() {
-    return confParser
-        .booleanConf()
-        .option(SparkReadOptions.USE_DATA_ONLY_WEIGHT)
-        .defaultValue(false)
-        .parse();
-  }
-
   public int parallelism() {
     int defaultParallelism = spark.sparkContext().defaultParallelism();
     int numShufflePartitions = spark.sessionState().conf().numShufflePartitions();
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
index b8d2065fdc..17f2bfee69 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
@@ -95,9 +95,4 @@ private SparkReadOptions() {}
   public static final String TIMESTAMP_AS_OF = "timestampAsOf";
 
   public static final String AGGREGATE_PUSH_DOWN_ENABLED = "aggregate-push-down-enabled";
-
-  // Use data-only size (ContentScanTask::length) as weight for task group planning
-  // instead of the default sizeBytes() which includes delete file sizes.
-  // Used by bin-pack compaction to produce correctly-sized output files.
-  public static final String USE_DATA_ONLY_WEIGHT = "use-data-only-weight";
 }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java
index 0b22c9620c..9a96f44ebd 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java
@@ -49,7 +49,6 @@ protected void doRewrite(String groupId, List<FileScanTask> group) {
             .option(SparkReadOptions.SCAN_TASK_SET_ID, groupId)
             .option(SparkReadOptions.SPLIT_SIZE, splitSize(inputSize(group)))
             .option(SparkReadOptions.FILE_OPEN_COST, "0")
-            .option(SparkReadOptions.USE_DATA_ONLY_WEIGHT, "true")
             .load(groupId);
 
     // write the packed data into new files where each split becomes a new file
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
index 4a6944461b..b11380bd0d 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
@@ -39,8 +39,6 @@ class SparkStagedScan extends SparkScan {
   private final long splitSize;
   private final int splitLookback;
   private final long openFileCost;
-  private final boolean useDataOnlyWeight;
-
   private List<ScanTaskGroup<ScanTask>> taskGroups = null; // lazy cache of tasks
 
   SparkStagedScan(SparkSession spark, Table table, Schema expectedSchema, SparkReadConf readConf) {
@@ -49,7 +47,6 @@ class SparkStagedScan extends SparkScan {
     this.splitSize = readConf.splitSize();
     this.splitLookback = readConf.splitLookback();
     this.openFileCost = readConf.splitOpenFileCost();
-    this.useDataOnlyWeight = readConf.useDataOnlyWeight();
   }
 
   @Override
@@ -63,21 +60,16 @@ protected List<ScanTaskGroup<ScanTask>> taskGroups() {
               table(),
               taskSetId);
 
-      if (useDataOnlyWeight) {
-        Function<ScanTask, Long> weightFunc =
-            task -> {
-              long dataSize =
-                  task instanceof ContentScanTask
-                      ? ((ContentScanTask<?>) task).length()
-                      : task.sizeBytes();
-              return Math.max(dataSize, task.filesCount() * openFileCost);
-            };
-        this.taskGroups =
-            TableScanUtil.planTaskGroups(tasks, splitSize, splitLookback, openFileCost, weightFunc);
-      } else {
-        this.taskGroups =
-            TableScanUtil.planTaskGroups(tasks, splitSize, splitLookback, openFileCost);
-      }
+      Function<ScanTask, Long> weightFunc =
+          task -> {
+            long dataSize =
+                task instanceof ContentScanTask
+                    ? ((ContentScanTask<?>) task).length()
+                    : task.sizeBytes();
+            return Math.max(dataSize, task.filesCount() * openFileCost);
+          };
+      this.taskGroups =
+          TableScanUtil.planTaskGroups(tasks, splitSize, splitLookback, openFileCost, weightFunc);
     }
     return taskGroups;
   }
@@ -98,20 +90,13 @@ public boolean equals(Object other) {
         && readSchema().equals(that.readSchema())
         && Objects.equals(splitSize, that.splitSize)
         && Objects.equals(splitLookback, that.splitLookback)
-        && Objects.equals(openFileCost, that.openFileCost)
-        && useDataOnlyWeight == that.useDataOnlyWeight;
+        && Objects.equals(openFileCost, that.openFileCost);
   }
 
   @Override
   public int hashCode() {
     return Objects.hash(
-        table().name(),
-        taskSetId,
-        readSchema(),
-        splitSize,
-        splitLookback,
-        openFileCost,
-        useDataOnlyWeight);
+        table().name(), taskSetId, readSchema(), splitSize, splitLookback, openFileCost);
   }
 
   @Override
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
index a7c4c6ea9e..e6b0b455bc 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
@@ -131,7 +131,7 @@ public void testTaskSetPlanning() throws NoSuchTableException, IOException {
   }
 
   @TestTemplate
-  public void testDataOnlyWeightTaskGroupPlanning() throws NoSuchTableException, IOException {
+  public void testDataOnlyWeightIsDefault() throws NoSuchTableException, IOException {
     sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
 
     List<SimpleRecord> records =
@@ -177,34 +177,19 @@ public void testDataOnlyWeightTaskGroupPlanning() throws NoSuchTableException, IOException {
          .as("sizeBytes should be larger than data-only length due to delete files")
          .isGreaterThan(dataOnlySize);
 
-      // with data-only weight: both files should fit in one partition
+      // data-only weight is used by default: both files should fit in one partition
       // because their data-only sizes sum to exactly dataOnlySize
-      Dataset<Row> dataOnlyDF =
+      Dataset<Row> scanDF =
           spark
               .read()
               .format("iceberg")
               .option(SparkReadOptions.SCAN_TASK_SET_ID, setID)
               .option(SparkReadOptions.SPLIT_SIZE, dataOnlySize)
               .option(SparkReadOptions.FILE_OPEN_COST, "0")
-              .option(SparkReadOptions.USE_DATA_ONLY_WEIGHT, "true")
               .load(tableName);
-      assertThat(dataOnlyDF.javaRDD().getNumPartitions())
+      assertThat(scanDF.javaRDD().getNumPartitions())
           .as("Data-only weight should pack both files into 1 partition")
           .isEqualTo(1);
-
-      // without data-only weight: inflated sizeBytes should cause 2 partitions
-      // because the sizeBytes of both files together exceeds dataOnlySize
-      Dataset<Row> defaultDF =
-          spark
-              .read()
-              .format("iceberg")
-              .option(SparkReadOptions.SCAN_TASK_SET_ID, setID)
-              .option(SparkReadOptions.SPLIT_SIZE, dataOnlySize)
-              .option(SparkReadOptions.FILE_OPEN_COST, "0")
-              .load(tableName);
-      assertThat(defaultDF.javaRDD().getNumPartitions())
-          .as("Default weight (with deletes) should produce more partitions")
-          .isGreaterThan(1);
     }
   }
 }

From 038ba429be93c1d03979b2f1cc5661bc31257390 Mon Sep 17 00:00:00 2001
From: Stas Pak
Date: Thu, 5 Mar 2026 12:55:00 -0800
Subject: [PATCH 5/5] Move data-only weight function into TableScanUtil

Add planTaskGroupsWithDataSize() to TableScanUtil that uses
ContentScanTask::length instead of sizeBytes() as the bin-packing weight.
This avoids exposing weight calculation internals in SparkStagedScan.

Co-Authored-By: Claude Opus 4.6
---
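Reviewer note: expected call shape for the new helper (a sketch; `tasks`,
`splitSize`, `splitLookback`, and `openFileCost` stand for the caller's
staged tasks and read options, as in SparkStagedScan below):

    List<ScanTaskGroup<ScanTask>> groups =
        TableScanUtil.planTaskGroupsWithDataSize(
            tasks, splitSize, splitLookback, openFileCost);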
 .../org/apache/iceberg/util/TableScanUtil.java     | 18 ++++++++++++++++++
 .../iceberg/spark/source/SparkStagedScan.java      | 12 +-----------
 2 files changed, 19 insertions(+), 11 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
index 219d09cc8f..a6c7aa3bbe 100644
--- a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
@@ -26,6 +26,7 @@
 import org.apache.iceberg.BaseScanTaskGroup;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MergeableScanTask;
@@ -109,6 +110,23 @@ public static <T extends ScanTask> List<ScanTaskGroup<T>> planTaskGroups(
         planTaskGroups(CloseableIterable.withNoopClose(tasks), splitSize, lookback, openFileCost));
   }
 
+  /**
+   * Plans task groups using data-only file size (ContentScanTask::length) as the weight, ignoring
+   * delete file sizes. For non-ContentScanTask tasks, falls back to sizeBytes().
+   */
+  public static <T extends ScanTask> List<ScanTaskGroup<T>> planTaskGroupsWithDataSize(
+      List<T> tasks, long splitSize, int lookback, long openFileCost) {
+    Function<T, Long> weightFunc =
+        task -> {
+          long dataSize =
+              task instanceof ContentScanTask
+                  ? ((ContentScanTask<?>) task).length()
+                  : task.sizeBytes();
+          return Math.max(dataSize, task.filesCount() * openFileCost);
+        };
+    return planTaskGroups(tasks, splitSize, lookback, openFileCost, weightFunc);
+  }
+
   public static <T extends ScanTask> List<ScanTaskGroup<T>> planTaskGroups(
       List<T> tasks,
       long splitSize,
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
index b11380bd0d..260cd98cae 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
@@ -20,8 +20,6 @@
 
 import java.util.List;
 import java.util.Objects;
-import java.util.function.Function;
-import org.apache.iceberg.ContentScanTask;
 import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
@@ -60,16 +58,8 @@ protected List<ScanTaskGroup<ScanTask>> taskGroups() {
               table(),
               taskSetId);
 
-      Function<ScanTask, Long> weightFunc =
-          task -> {
-            long dataSize =
-                task instanceof ContentScanTask
-                    ? ((ContentScanTask<?>) task).length()
-                    : task.sizeBytes();
-            return Math.max(dataSize, task.filesCount() * openFileCost);
-          };
       this.taskGroups =
-          TableScanUtil.planTaskGroups(tasks, splitSize, splitLookback, openFileCost, weightFunc);
+          TableScanUtil.planTaskGroupsWithDataSize(tasks, splitSize, splitLookback, openFileCost);
     }
     return taskGroups;
   }
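Series note: end-to-end, these patches mean a plain bin-pack rewrite on a
delete-heavy table should now produce files near the target size. A sketch
using the public actions API (`spark` and `table` are placeholders; the
512 MB target is arbitrary):

    SparkActions.get(spark)
        .rewriteDataFiles(table)
        .option("target-file-size-bytes", String.valueOf(512L << 20))
        .execute();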