From 21c1e7bb6689d22866fabe9859e52ef88aeac1cd Mon Sep 17 00:00:00 2001 From: Stas Pak Date: Fri, 6 Mar 2026 15:40:05 -0800 Subject: [PATCH 1/2] Upgrade iceberg-1.5.2.8 to include compaction improvement (linkedin/iceberg#233), add integration test Upgrades iceberg from 1.5.2.7 to 1.5.2.8 which includes a fix for budgeted rewrite task grouping to use data file length instead of sizeBytes (linkedin/iceberg#233). Adds an integration test that validates the corrected behavior. --- .../catalog/e2e/SparkMoRFunctionalTest.java | 85 +++++++++++++++++++ build.gradle | 2 +- 2 files changed, 86 insertions(+), 1 deletion(-) diff --git a/apps/spark-3.5/src/test/java/com/linkedin/openhouse/catalog/e2e/SparkMoRFunctionalTest.java b/apps/spark-3.5/src/test/java/com/linkedin/openhouse/catalog/e2e/SparkMoRFunctionalTest.java index 2f309a952..8e32f2ab1 100644 --- a/apps/spark-3.5/src/test/java/com/linkedin/openhouse/catalog/e2e/SparkMoRFunctionalTest.java +++ b/apps/spark-3.5/src/test/java/com/linkedin/openhouse/catalog/e2e/SparkMoRFunctionalTest.java @@ -26,6 +26,7 @@ import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.actions.SparkActions; import org.apache.iceberg.util.ArrayUtil; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; @@ -195,6 +196,90 @@ public void testCompactionCanRemoveEqualityDeleteFiles() throws NoSuchTableExcep assertThat(stats.getNumCurrentSnapshotEqualityDeleteFiles()).isEqualTo(0L); } + @Test + public void testBudgetedRewriteUsesDataLengthForTaskGrouping() throws NoSuchTableException { + createAndInitTable("id int, data string"); + + // Create 4 separate data files by appending individually + for (int i = 0; i < 4; i++) { + List records = + Arrays.asList( + new SimpleRecord(i * 2, "data_" + i), new SimpleRecord(i * 2 + 1, "data_" + i)); + ops.spark() + 
.createDataset(records, Encoders.bean(SimpleRecord.class)) + .coalesce(1) + .writeTo(tableName) + .append(); + } + + // Delete one row from each data file to produce partition-scoped position delete files. + // In an unpartitioned table, all position deletes are in the same partition and thus + // associated with ALL data files, inflating each task's sizeBytes relative to its length. + for (int i = 0; i < 4; i++) { + sql("DELETE FROM %s WHERE id = %d", tableName, i * 2); + } + + Table table = ops.getTable(tableName); + + // Verify we have 4 data files and position delete files + List dataFileCountResult = sql("SELECT count(*) FROM %s.data_files", tableName); + assertThat((long) dataFileCountResult.get(0)[0]).isEqualTo(4L); + + List deleteFileCountResult = sql("SELECT count(*) FROM %s.delete_files", tableName); + assertThat((long) deleteFileCountResult.get(0)[0]).isGreaterThanOrEqualTo(4L); + + // Compute budget as half of total data file size (by file_size_in_bytes from metadata, + // excluding delete file sizes). If the old sizeBytes-based grouping was used, each task + // would appear much larger (data + all partition-scoped delete files), and the budget + // would cover fewer files. + List totalSizeResult = + sql("SELECT sum(file_size_in_bytes) FROM %s.data_files", tableName); + long totalDataSize = (long) totalSizeResult.get(0)[0]; + long halfBudget = totalDataSize / 2; + + // Set target-file-size-bytes to the total size of 2 data files. With the length-based + // grouping fix (linkedin/iceberg#233), the 2 rewritten data files are grouped into a + // single task and merged into 1 output file. If sizeBytes (data + all partition-scoped + // delete files) was used instead, each task would appear much larger than the target, + // preventing them from being grouped together and producing 2 separate output files. 
+ long targetSize = halfBudget; + + log.info( + "Budgeted rewrite test: totalDataSize={}, halfBudget={}, targetSize={}", + totalDataSize, + halfBudget, + targetSize); + + // Use SparkActions directly instead of ops.rewriteDataFiles() because this test requires + // fine-grained control over budget options (MAX_TOTAL_FILES_SIZE_BYTES, target-file-size-bytes) + // that are not exposed through the Operations API. + RewriteDataFiles.Result result = + SparkActions.get(ops.spark()) + .rewriteDataFiles(table) + .binPack() + .option(RewriteDataFiles.MAX_TOTAL_FILES_SIZE_BYTES, Long.toString(halfBudget)) + .option("target-file-size-bytes", Long.toString(targetSize)) + .option("min-file-size-bytes", "1") + .option("max-file-size-bytes", Long.toString(targetSize * 2)) + .option("min-input-files", "1") + .option("delete-file-threshold", "0") + .execute(); + + // Budget covers exactly half the data files by length. + Assertions.assertEquals(2, result.rewrittenDataFilesCount()); + // With length-based grouping, the 2 data files (total size = targetSize) fit in one group + // and merge into 1 output file. With sizeBytes-based grouping, each file's perceived size + // would be data_length + totalDeleteSize, far exceeding the target, so they would be + // placed in separate groups producing 2 output files instead. 
+ Assertions.assertEquals(1, result.addedDataFilesCount()); + + // Verify data correctness: only odd-numbered IDs remain (even IDs were deleted) + List expected = + Arrays.asList(row(1, "data_0"), row(3, "data_1"), row(5, "data_2"), row(7, "data_3")); + List actual = sql("SELECT * FROM %s ORDER BY id ASC", tableName); + assertThat(actual).containsExactlyElementsOf(expected); + } + private void writeEqDeleteRecord(Table table, String delCol, Object delVal) { List equalityFieldIds = Lists.newArrayList(table.schema().findField(delCol).fieldId()); Schema eqDeleteRowSchema = table.schema().select(delCol); diff --git a/build.gradle b/build.gradle index 3fc6e6937..f0d31da00 100644 --- a/build.gradle +++ b/build.gradle @@ -31,7 +31,7 @@ ext { ok_http3_version = "4.11.0" junit_version = "5.11.0" iceberg_1_2_version = "1.2.0.11" - iceberg_1_5_version = "1.5.2.7" + iceberg_1_5_version = "1.5.2.8" otel_agent_version = "2.12.0" // Bundles OTel SDK 1.47.0 otel_annotations_version = "2.12.0" // Match agent version } From 0ae052feb8bb73cce6e35e5a913cbc8f14986e88 Mon Sep 17 00:00:00 2001 From: Stas Pak Date: Mon, 9 Mar 2026 09:59:38 -0700 Subject: [PATCH 2/2] Address comments --- .../openhouse/catalog/e2e/SparkMoRFunctionalTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/apps/spark-3.5/src/test/java/com/linkedin/openhouse/catalog/e2e/SparkMoRFunctionalTest.java b/apps/spark-3.5/src/test/java/com/linkedin/openhouse/catalog/e2e/SparkMoRFunctionalTest.java index 8e32f2ab1..9d010d246 100644 --- a/apps/spark-3.5/src/test/java/com/linkedin/openhouse/catalog/e2e/SparkMoRFunctionalTest.java +++ b/apps/spark-3.5/src/test/java/com/linkedin/openhouse/catalog/e2e/SparkMoRFunctionalTest.java @@ -235,7 +235,9 @@ public void testBudgetedRewriteUsesDataLengthForTaskGrouping() throws NoSuchTabl List totalSizeResult = sql("SELECT sum(file_size_in_bytes) FROM %s.data_files", tableName); long totalDataSize = (long) totalSizeResult.get(0)[0]; - long halfBudget = 
totalDataSize / 2;
+    // Add a margin to the half-size budget; file sizes are roughly equal but can vary by a few bytes.
+    long margin = totalDataSize / 10;
+    long halfBudget = totalDataSize / 2 + margin;
 
     // Set target-file-size-bytes to the total size of 2 data files. With the length-based
     // grouping fix (linkedin/iceberg#233), the 2 rewritten data files are grouped into a