diff --git a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
index e2dbcb61e9..a6c7aa3bbe 100644
--- a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
@@ -26,6 +26,7 @@
 import org.apache.iceberg.BaseScanTaskGroup;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MergeableScanTask;
@@ -109,9 +110,49 @@ public static <T extends ScanTask> List<ScanTaskGroup<T>> planTaskGroups(
         planTaskGroups(CloseableIterable.withNoopClose(tasks), splitSize, lookback, openFileCost));
   }
 
+  /**
+   * Plans task groups using data-only file size (ContentScanTask::length) as the weight, ignoring
+   * delete file sizes. For non-ContentScanTask tasks, falls back to sizeBytes().
+   */
+  public static <T extends ScanTask> List<ScanTaskGroup<T>> planTaskGroupsWithDataSize(
+      List<T> tasks, long splitSize, int lookback, long openFileCost) {
+    Function<T, Long> weightFunc =
+        task -> {
+          long dataSize =
+              task instanceof ContentScanTask
+                  ? ((ContentScanTask<?>) task).length()
+                  : task.sizeBytes();
+          return Math.max(dataSize, task.filesCount() * openFileCost);
+        };
+    return planTaskGroups(tasks, splitSize, lookback, openFileCost, weightFunc);
+  }
+
+  public static <T extends ScanTask> List<ScanTaskGroup<T>> planTaskGroups(
+      List<T> tasks,
+      long splitSize,
+      int lookback,
+      long openFileCost,
+      Function<T, Long> weightFunc) {
+    return Lists.newArrayList(
+        planTaskGroups(
+            CloseableIterable.withNoopClose(tasks), splitSize, lookback, openFileCost, weightFunc));
+  }
+
   @SuppressWarnings("unchecked")
   public static <T extends ScanTask> CloseableIterable<ScanTaskGroup<T>> planTaskGroups(
       CloseableIterable<T> tasks, long splitSize, int lookback, long openFileCost) {
+    Function<T, Long> defaultWeightFunc =
+        task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost);
+    return planTaskGroups(tasks, splitSize, lookback, openFileCost, defaultWeightFunc);
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <T extends ScanTask> CloseableIterable<ScanTaskGroup<T>> planTaskGroups(
+      CloseableIterable<T> tasks,
+      long splitSize,
+      int lookback,
+      long openFileCost,
+      Function<T, Long> weightFunc) {
 
     validatePlanningArguments(splitSize, lookback, openFileCost);
 
@@ -129,9 +170,6 @@ public static <T extends ScanTask> CloseableIterable<ScanTaskGroup<T>> planTaskGroups(
               }),
           tasks);
 
-    Function<T, Long> weightFunc =
-        task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost);
-
     return CloseableIterable.transform(
         CloseableIterable.combine(
             new BinPacking.PackingIterable<>(splitTasks, splitSize, lookback, weightFunc, true),
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index dc26660f98..9fd6c8dec6 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -200,9 +200,9 @@ StructLikeMap<List<List<FileScanTask>>> planFileGroups(long startingSnapshotId)
     long pickedTasksSizeBytes = 0;
     int pickedTasksCount = 0;
     for (FileScanTask task : tasks) {
-      if (pickedTasksSizeBytes + task.sizeBytes() <= maxTotalFilesSizeBytes) {
+      if (pickedTasksSizeBytes + task.length() <= maxTotalFilesSizeBytes) {
         pickedTasksCount++;
-        pickedTasksSizeBytes += task.sizeBytes();
+        pickedTasksSizeBytes += task.length();
       } else {
         break;
       }
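Reviewer note: the effect of the weight change is easiest to see with a standalone sketch. This is illustration only, not part of the change; all sizes are made-up numbers, and the one Iceberg fact assumed is that FileScanTask.sizeBytes() adds attached delete file sizes on top of length(), which the new test below also relies on.

// Standalone sketch (plain Java, no Iceberg types) comparing the two weights.
public class WeightSketch {
  public static void main(String[] args) {
    long openFileCost = 4;  // per-file weight floor, in MB for readability
    long dataLength = 100;  // ContentScanTask::length -> data bytes only
    long deleteBytes = 60;  // delete files attached to the task
    long splitSize = 200;   // target group size

    // Old default weight: sizeBytes() = data + deletes.
    long oldWeight = Math.max(dataLength + deleteBytes, 1 * openFileCost); // 160
    // New data-only weight: deletes ignored.
    long newWeight = Math.max(dataLength, 1 * openFileCost); // 100

    // Two identical files: one group under the new weight, two under the old.
    System.out.println("old fits one group: " + (2 * oldWeight <= splitSize)); // false (320 > 200)
    System.out.println("new fits one group: " + (2 * newWeight <= splitSize)); // true  (200 <= 200)
  }
}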
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
index fd299ade7f..260cd98cae 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
@@ -59,7 +59,8 @@ protected List<ScanTaskGroup<ScanTask>> taskGroups() {
               table(),
               taskSetId);
 
-      this.taskGroups = TableScanUtil.planTaskGroups(tasks, splitSize, splitLookback, openFileCost);
+      this.taskGroups =
+          TableScanUtil.planTaskGroupsWithDataSize(tasks, splitSize, splitLookback, openFileCost);
     }
     return taskGroups;
   }
@@ -86,7 +87,7 @@ && readSchema().equals(that.readSchema())
   @Override
   public int hashCode() {
     return Objects.hash(
-        table().name(), taskSetId, readSchema(), splitSize, splitSize, openFileCost);
+        table().name(), taskSetId, readSchema(), splitSize, splitLookback, openFileCost);
   }
 
   @Override
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
index 6ce2ce6238..e6b0b455bc 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
@@ -20,16 +20,24 @@
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Files;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.data.FileHelpers;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.CatalogTestBase;
 import org.apache.iceberg.spark.ScanTaskSetManager;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.Pair;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -122,4 +130,67 @@ public void testTaskSetPlanning() throws NoSuchTableException, IOException {
         .isEqualTo(1);
     }
   }
+
+  @TestTemplate
+  public void testDataOnlyWeightIsDefault() throws NoSuchTableException, IOException {
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+
+    List<SimpleRecord> records =
+        ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+    df.coalesce(1).writeTo(tableName).append();
+    df.coalesce(1).writeTo(tableName).append();
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    // get data files to write position deletes against them
+    List<DataFile> dataFiles = Lists.newArrayList();
+    try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
+      for (FileScanTask task : fileScanTasks) {
+        dataFiles.add(task.file());
+      }
+    }
+
+    assertThat(dataFiles).as("Should have 2 data files").hasSize(2);
files").hasSize(2); + + // write position deletes for each data file to inflate sizeBytes() + for (DataFile dataFile : dataFiles) { + List> deletes = Lists.newArrayList(); + deletes.add(Pair.of(dataFile.path(), 0L)); + Pair result = + FileHelpers.writeDeleteFile( + table, Files.localOutput(File.createTempFile("junit", null, temp.toFile())), deletes); + table.newRowDelta().addDeletes(result.first()).commit(); + } + + table.refresh(); + + // scan with deletes to get inflated sizeBytes + try (CloseableIterable fileScanTasks = table.newScan().planFiles()) { + ScanTaskSetManager taskSetManager = ScanTaskSetManager.get(); + String setID = UUID.randomUUID().toString(); + List tasks = ImmutableList.copyOf(fileScanTasks); + taskSetManager.stageTasks(table, setID, tasks); + + long dataOnlySize = tasks.stream().mapToLong(FileScanTask::length).sum(); + long totalSizeBytes = tasks.stream().mapToLong(FileScanTask::sizeBytes).sum(); + assertThat(totalSizeBytes) + .as("sizeBytes should be larger than data-only length due to delete files") + .isGreaterThan(dataOnlySize); + + // data-only weight is used by default: both files should fit in one partition + // because their data-only sizes sum to exactly dataOnlySize + Dataset scanDF = + spark + .read() + .format("iceberg") + .option(SparkReadOptions.SCAN_TASK_SET_ID, setID) + .option(SparkReadOptions.SPLIT_SIZE, dataOnlySize) + .option(SparkReadOptions.FILE_OPEN_COST, "0") + .load(tableName); + assertThat(scanDF.javaRDD().getNumPartitions()) + .as("Data-only weight should pack both files into 1 partition") + .isEqualTo(1); + } + } }