Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.util.ArrayUtil;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
Expand Down Expand Up @@ -195,6 +196,92 @@ public void testCompactionCanRemoveEqualityDeleteFiles() throws NoSuchTableExcep
assertThat(stats.getNumCurrentSnapshotEqualityDeleteFiles()).isEqualTo(0L);
}

@Test
public void testBudgetedRewriteUsesDataLengthForTaskGrouping() throws NoSuchTableException {
  createAndInitTable("id int, data string");

  // Create 4 separate data files by appending individually.
  for (int i = 0; i < 4; i++) {
    List<SimpleRecord> records =
        Arrays.asList(
            new SimpleRecord(i * 2, "data_" + i), new SimpleRecord(i * 2 + 1, "data_" + i));
    ops.spark()
        .createDataset(records, Encoders.bean(SimpleRecord.class))
        .coalesce(1)
        .writeTo(tableName)
        .append();
  }

  // Delete one row from each data file to produce partition-scoped position delete files.
  // In an unpartitioned table, all position deletes are in the same partition and thus
  // associated with ALL data files, inflating each task's sizeBytes relative to its length.
  for (int i = 0; i < 4; i++) {
    sql("DELETE FROM %s WHERE id = %d", tableName, i * 2);
  }

  Table table = ops.getTable(tableName);

  // Verify we have 4 data files and position delete files.
  List<Object[]> dataFileCountResult = sql("SELECT count(*) FROM %s.data_files", tableName);
  assertThat((long) dataFileCountResult.get(0)[0]).isEqualTo(4L);

  List<Object[]> deleteFileCountResult = sql("SELECT count(*) FROM %s.delete_files", tableName);
  assertThat((long) deleteFileCountResult.get(0)[0]).isGreaterThanOrEqualTo(4L);

  // Compute budget as half of total data file size (by file_size_in_bytes from metadata,
  // excluding delete file sizes). If the old sizeBytes-based grouping was used, each task
  // would appear much larger (data + all partition-scoped delete files), and the budget
  // would cover fewer files.
  List<Object[]> totalSizeResult =
      sql("SELECT sum(file_size_in_bytes) FROM %s.data_files", tableName);
  long totalDataSize = (long) totalSizeResult.get(0)[0];
  // Add margin to total data size; file sizes are roughly the same but can vary by a few bytes.
  long margin = totalDataSize / 10;
  long halfBudget = totalDataSize / 2 + margin;

  // Set target-file-size-bytes to the total size of 2 data files. With the length-based
  // grouping fix (linkedin/iceberg#233), the 2 rewritten data files are grouped into a
  // single task and merged into 1 output file. If sizeBytes (data + all partition-scoped
  // delete files) was used instead, each task would appear much larger than the target,
  // preventing them from being grouped together and producing 2 separate output files.
  long targetSize = halfBudget;

  log.info(
      "Budgeted rewrite test: totalDataSize={}, halfBudget={}, targetSize={}",
      totalDataSize,
      halfBudget,
      targetSize);

  // Use SparkActions directly instead of ops.rewriteDataFiles() because this test requires
  // fine-grained control over budget options (MAX_TOTAL_FILES_SIZE_BYTES, target-file-size-bytes)
  // that are not exposed through the Operations API.
  RewriteDataFiles.Result result =
      SparkActions.get(ops.spark())
          .rewriteDataFiles(table)
          .binPack()
          .option(RewriteDataFiles.MAX_TOTAL_FILES_SIZE_BYTES, Long.toString(halfBudget))
          .option("target-file-size-bytes", Long.toString(targetSize))
          .option("min-file-size-bytes", "1")
          .option("max-file-size-bytes", Long.toString(targetSize * 2))
          .option("min-input-files", "1")
          .option("delete-file-threshold", "0")
          .execute();

  // Budget covers exactly half the data files by length.
  // NOTE: use AssertJ assertThat for consistency with the rest of this test class
  // (the original mixed JUnit Assertions.assertEquals with AssertJ assertThat).
  assertThat(result.rewrittenDataFilesCount()).isEqualTo(2);
  // With length-based grouping, the 2 data files (total size = targetSize) fit in one group
  // and merge into 1 output file. With sizeBytes-based grouping, each file's perceived size
  // would be data_length + totalDeleteSize, far exceeding the target, so they would be
  // placed in separate groups producing 2 output files instead.
  assertThat(result.addedDataFilesCount()).isEqualTo(1);

  // Verify data correctness: only odd-numbered IDs remain (even IDs were deleted).
  List<Object[]> expected =
      Arrays.asList(row(1, "data_0"), row(3, "data_1"), row(5, "data_2"), row(7, "data_3"));
  List<Object[]> actual = sql("SELECT * FROM %s ORDER BY id ASC", tableName);
  assertThat(actual).containsExactlyElementsOf(expected);
}

private void writeEqDeleteRecord(Table table, String delCol, Object delVal) {
List<Integer> equalityFieldIds = Lists.newArrayList(table.schema().findField(delCol).fieldId());
Schema eqDeleteRowSchema = table.schema().select(delCol);
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ ext {
ok_http3_version = "4.11.0"
junit_version = "5.11.0"
iceberg_1_2_version = "1.2.0.11"
iceberg_1_5_version = "1.5.2.7"
iceberg_1_5_version = "1.5.2.8"
otel_agent_version = "2.12.0" // Bundles OTel SDK 1.47.0
otel_annotations_version = "2.12.0" // Match agent version
}
Expand Down