44 changes: 41 additions & 3 deletions core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
@@ -26,6 +26,7 @@
import org.apache.iceberg.BaseScanTaskGroup;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ContentScanTask;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MergeableScanTask;
@@ -109,9 +110,49 @@ public static <T extends ScanTask> List<ScanTaskGroup<T>> planTaskGroups(
planTaskGroups(CloseableIterable.withNoopClose(tasks), splitSize, lookback, openFileCost));
}

/**
* Plans task groups using only the data file size ({@link ContentScanTask#length()}) as the task
* weight, ignoring delete file sizes. Tasks that are not {@link ContentScanTask} instances fall
* back to {@link ScanTask#sizeBytes()}.
*/
public static <T extends ScanTask> List<ScanTaskGroup<T>> planTaskGroupsWithDataSize(
List<T> tasks, long splitSize, int lookback, long openFileCost) {
Function<T, Long> weightFunc =
task -> {
long dataSize =
task instanceof ContentScanTask
? ((ContentScanTask<?>) task).length()
: task.sizeBytes();
return Math.max(dataSize, task.filesCount() * openFileCost);
};
return planTaskGroups(tasks, splitSize, lookback, openFileCost, weightFunc);
}

public static <T extends ScanTask> List<ScanTaskGroup<T>> planTaskGroups(
List<T> tasks,
long splitSize,
int lookback,
long openFileCost,
Function<T, Long> weightFunc) {
return Lists.newArrayList(
planTaskGroups(
CloseableIterable.withNoopClose(tasks), splitSize, lookback, openFileCost, weightFunc));
}

@SuppressWarnings("unchecked")
public static <T extends ScanTask> CloseableIterable<ScanTaskGroup<T>> planTaskGroups(
CloseableIterable<T> tasks, long splitSize, int lookback, long openFileCost) {
Function<T, Long> defaultWeightFunc =
task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost);
return planTaskGroups(tasks, splitSize, lookback, openFileCost, defaultWeightFunc);
}

@SuppressWarnings("unchecked")
public static <T extends ScanTask> CloseableIterable<ScanTaskGroup<T>> planTaskGroups(
CloseableIterable<T> tasks,
long splitSize,
int lookback,
long openFileCost,
Function<T, Long> weightFunc) {

validatePlanningArguments(splitSize, lookback, openFileCost);

@@ -129,9 +170,6 @@ public static <T extends ScanTask> CloseableIterable<ScanTaskGroup<T>> planTaskGroups(
}),
tasks);

Function<T, Long> weightFunc =
task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost);

return CloseableIterable.transform(
CloseableIterable.combine(
new BinPacking.PackingIterable<>(splitTasks, splitSize, lookback, weightFunc, true),
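The diff above separates the grouping weight from the packing logic: the existing 4-arg overloads keep the sizeBytes()-based default, while planTaskGroupsWithDataSize weighs each task by its data bytes only. A minimal usage sketch of the new overloads (the split size, lookback, and open-file cost below are illustrative values, not taken from this PR):

// Hypothetical usage of the new TableScanUtil overloads; values are illustrative.
import java.util.List;
import java.util.function.Function;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.util.TableScanUtil;

public class DataSizePlanningSketch {
  static List<ScanTaskGroup<FileScanTask>> planByDataSize(List<FileScanTask> tasks) {
    long splitSize = 128L * 1024 * 1024; // 128 MB target group size
    int lookback = 10;
    long openFileCost = 4L * 1024 * 1024; // 4 MB floor per file

    // Convenience helper added in this PR: weight = max(data length, filesCount * openFileCost)
    return TableScanUtil.planTaskGroupsWithDataSize(tasks, splitSize, lookback, openFileCost);
  }

  static List<ScanTaskGroup<FileScanTask>> planWithCustomWeight(List<FileScanTask> tasks) {
    long splitSize = 128L * 1024 * 1024;
    int lookback = 10;
    long openFileCost = 4L * 1024 * 1024;

    // Equivalent call through the new weight-function overload
    Function<FileScanTask, Long> weightFunc =
        task -> Math.max(task.length(), task.filesCount() * openFileCost);
    return TableScanUtil.planTaskGroups(tasks, splitSize, lookback, openFileCost, weightFunc);
  }
}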
@@ -200,9 +200,9 @@ StructLikeMap<List<List<FileScanTask>>> planFileGroups(long startingSnapshotId)
long pickedTasksSizeBytes = 0;
int pickedTasksCount = 0;
for (FileScanTask task : tasks) {
if (pickedTasksSizeBytes + task.sizeBytes() <= maxTotalFilesSizeBytes) {
if (pickedTasksSizeBytes + task.length() <= maxTotalFilesSizeBytes) {
pickedTasksCount++;
pickedTasksSizeBytes += task.sizeBytes();
pickedTasksSizeBytes += task.length();
} else {
break;
}
@@ -37,7 +37,6 @@ class SparkStagedScan extends SparkScan {
private final long splitSize;
private final int splitLookback;
private final long openFileCost;

private List<ScanTaskGroup<ScanTask>> taskGroups = null; // lazy cache of tasks

SparkStagedScan(SparkSession spark, Table table, Schema expectedSchema, SparkReadConf readConf) {
@@ -59,7 +58,8 @@ protected List<ScanTaskGroup<ScanTask>> taskGroups() {
table(),
taskSetId);

this.taskGroups = TableScanUtil.planTaskGroups(tasks, splitSize, splitLookback, openFileCost);
this.taskGroups =
TableScanUtil.planTaskGroupsWithDataSize(tasks, splitSize, splitLookback, openFileCost);
}
return taskGroups;
}
@@ -86,7 +86,7 @@ && readSchema().equals(that.readSchema())
@Override
public int hashCode() {
return Objects.hash(
table().name(), taskSetId, readSchema(), splitSize, splitSize, openFileCost);
table().name(), taskSetId, readSchema(), splitSize, splitLookback, openFileCost);
}

@Override
@@ -20,16 +20,23 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Files;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.CatalogTestBase;
import org.apache.iceberg.spark.ScanTaskSetManager;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.util.Pair;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -122,4 +129,67 @@ public void testTaskSetPlanning() throws NoSuchTableException, IOException {
.isEqualTo(1);
}
}

@TestTemplate
public void testDataOnlyWeightIsDefault() throws NoSuchTableException, IOException {
sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);

List<SimpleRecord> records =
ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
df.coalesce(1).writeTo(tableName).append();
df.coalesce(1).writeTo(tableName).append();

Table table = validationCatalog.loadTable(tableIdent);

// get data files to write position deletes against them
List<DataFile> dataFiles = Lists.newArrayList();
try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
for (FileScanTask task : fileScanTasks) {
dataFiles.add(task.file());
}
}

assertThat(dataFiles).as("Should have 2 data files").hasSize(2);

// write position deletes for each data file to inflate sizeBytes()
for (DataFile dataFile : dataFiles) {
List<Pair<CharSequence, Long>> deletes = Lists.newArrayList();
deletes.add(Pair.of(dataFile.path(), 0L));
Pair<DeleteFile, ?> result =
FileHelpers.writeDeleteFile(
table, Files.localOutput(File.createTempFile("junit", null, temp.toFile())), deletes);
table.newRowDelta().addDeletes(result.first()).commit();
}

table.refresh();

// scan with deletes to get inflated sizeBytes
try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
String setID = UUID.randomUUID().toString();
List<FileScanTask> tasks = ImmutableList.copyOf(fileScanTasks);
taskSetManager.stageTasks(table, setID, tasks);

long dataOnlySize = tasks.stream().mapToLong(FileScanTask::length).sum();
long totalSizeBytes = tasks.stream().mapToLong(FileScanTask::sizeBytes).sum();
assertThat(totalSizeBytes)
.as("sizeBytes should be larger than data-only length due to delete files")
.isGreaterThan(dataOnlySize);

// the staged scan plans with the data-only weight by default: both files should fit in
// one partition because their data-only sizes sum to exactly the split size (dataOnlySize)
Dataset<Row> scanDF =
spark
.read()
.format("iceberg")
.option(SparkReadOptions.SCAN_TASK_SET_ID, setID)
.option(SparkReadOptions.SPLIT_SIZE, dataOnlySize)
.option(SparkReadOptions.FILE_OPEN_COST, "0")
.load(tableName);
assertThat(scanDF.javaRDD().getNumPartitions())
.as("Data-only weight should pack both files into 1 partition")
.isEqualTo(1);
}
}
}
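
For context on the final assertion: with FILE_OPEN_COST at 0, each task's weight under the new staged-scan planning is just its data-only length, so both tasks pack into a single group when the split size equals dataOnlySize, whereas the default sizeBytes() weight also counts the attached delete files and would push the same tasks into separate groups. A hedged sketch of that comparison, assuming tasks like the ones staged above (the group counts in the comments are the expected outcome of the weight arithmetic, not output copied from a run):

// Sketch comparing data-only and default planning on the same tasks; assumes the
// tasks carry positional deletes, as in the test above.
import java.util.List;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.util.TableScanUtil;

class StagedScanWeightComparison {
  static void compare(List<FileScanTask> tasks) {
    long dataOnlySize = tasks.stream().mapToLong(FileScanTask::length).sum();

    // Data-only weights sum exactly to the split size, so a single group is expected
    List<ScanTaskGroup<FileScanTask>> dataOnlyGroups =
        TableScanUtil.planTaskGroupsWithDataSize(tasks, dataOnlySize, 10, 0L);

    // The default weight adds delete file bytes on top of each data length, so the
    // same split size overflows and the tasks are expected to spill into extra groups
    List<ScanTaskGroup<FileScanTask>> defaultGroups =
        TableScanUtil.planTaskGroups(tasks, dataOnlySize, 10, 0L);

    System.out.printf(
        "data-only groups: %d, default groups: %d%n", dataOnlyGroups.size(), defaultGroups.size());
  }
}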