diff --git a/apps/spark-3.5/src/test/java/com/linkedin/openhouse/catalog/e2e/SparkMoRFunctionalTest.java b/apps/spark-3.5/src/test/java/com/linkedin/openhouse/catalog/e2e/SparkMoRFunctionalTest.java
index 2f309a952..9d010d246 100644
--- a/apps/spark-3.5/src/test/java/com/linkedin/openhouse/catalog/e2e/SparkMoRFunctionalTest.java
+++ b/apps/spark-3.5/src/test/java/com/linkedin/openhouse/catalog/e2e/SparkMoRFunctionalTest.java
@@ -26,6 +26,7 @@
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.util.ArrayUtil;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
@@ -195,6 +196,92 @@ public void testCompactionCanRemoveEqualityDeleteFiles() throws NoSuchTableExcep
     assertThat(stats.getNumCurrentSnapshotEqualityDeleteFiles()).isEqualTo(0L);
   }
 
+  @Test
+  public void testBudgetedRewriteUsesDataLengthForTaskGrouping() throws NoSuchTableException {
+    createAndInitTable("id int, data string");
+
+    // Create 4 separate data files by appending individually
+    for (int i = 0; i < 4; i++) {
+      List<SimpleRecord> records =
+          Arrays.asList(
+              new SimpleRecord(i * 2, "data_" + i), new SimpleRecord(i * 2 + 1, "data_" + i));
+      ops.spark()
+          .createDataset(records, Encoders.bean(SimpleRecord.class))
+          .coalesce(1)
+          .writeTo(tableName)
+          .append();
+    }
+
+    // Delete one row from each data file to produce partition-scoped position delete files.
+    // In an unpartitioned table, all position deletes are in the same partition and thus
+    // associated with ALL data files, inflating each task's sizeBytes relative to its length.
+    for (int i = 0; i < 4; i++) {
+      sql("DELETE FROM %s WHERE id = %d", tableName, i * 2);
+    }
+
+    Table table = ops.getTable(tableName);
+
+    // Verify we have 4 data files and position delete files
+    List<Object[]> dataFileCountResult = sql("SELECT count(*) FROM %s.data_files", tableName);
+    assertThat((long) dataFileCountResult.get(0)[0]).isEqualTo(4L);
+
+    List<Object[]> deleteFileCountResult = sql("SELECT count(*) FROM %s.delete_files", tableName);
+    assertThat((long) deleteFileCountResult.get(0)[0]).isGreaterThanOrEqualTo(4L);
+
+    // Compute budget as half of total data file size (by file_size_in_bytes from metadata,
+    // excluding delete file sizes). If the old sizeBytes-based grouping was used, each task
+    // would appear much larger (data + all partition-scoped delete files), and the budget
+    // would cover fewer files.
+    List<Object[]> totalSizeResult =
+        sql("SELECT sum(file_size_in_bytes) FROM %s.data_files", tableName);
+    long totalDataSize = (long) totalSizeResult.get(0)[0];
+    // add margin to total data size, file sizes are roughly the same but can vary by a few bytes
+    long margin = totalDataSize / 10;
+    long halfBudget = totalDataSize / 2 + margin;
+
+    // Set target-file-size-bytes to the total size of 2 data files. With the length-based
+    // grouping fix (linkedin/iceberg#233), the 2 rewritten data files are grouped into a
+    // single task and merged into 1 output file. If sizeBytes (data + all partition-scoped
+    // delete files) was used instead, each task would appear much larger than the target,
+    // preventing them from being grouped together and producing 2 separate output files.
+    long targetSize = halfBudget;
+
+    log.info(
+        "Budgeted rewrite test: totalDataSize={}, halfBudget={}, targetSize={}",
+        totalDataSize,
+        halfBudget,
+        targetSize);
+
+    // Use SparkActions directly instead of ops.rewriteDataFiles() because this test requires
+    // fine-grained control over budget options (MAX_TOTAL_FILES_SIZE_BYTES, target-file-size-bytes)
+    // that are not exposed through the Operations API.
+    RewriteDataFiles.Result result =
+        SparkActions.get(ops.spark())
+            .rewriteDataFiles(table)
+            .binPack()
+            .option(RewriteDataFiles.MAX_TOTAL_FILES_SIZE_BYTES, Long.toString(halfBudget))
+            .option("target-file-size-bytes", Long.toString(targetSize))
+            .option("min-file-size-bytes", "1")
+            .option("max-file-size-bytes", Long.toString(targetSize * 2))
+            .option("min-input-files", "1")
+            .option("delete-file-threshold", "0")
+            .execute();
+
+    // Budget covers exactly half the data files by length.
+    Assertions.assertEquals(2, result.rewrittenDataFilesCount());
+    // With length-based grouping, the 2 data files (total size = targetSize) fit in one group
+    // and merge into 1 output file. With sizeBytes-based grouping, each file's perceived size
+    // would be data_length + totalDeleteSize, far exceeding the target, so they would be
+    // placed in separate groups producing 2 output files instead.
+    Assertions.assertEquals(1, result.addedDataFilesCount());
+
+    // Verify data correctness: only odd-numbered IDs remain (even IDs were deleted)
+    List<Object[]> expected =
+        Arrays.asList(row(1, "data_0"), row(3, "data_1"), row(5, "data_2"), row(7, "data_3"));
+    List<Object[]> actual = sql("SELECT * FROM %s ORDER BY id ASC", tableName);
+    assertThat(actual).containsExactlyElementsOf(expected);
+  }
+
   private void writeEqDeleteRecord(Table table, String delCol, Object delVal) {
     List<Integer> equalityFieldIds = Lists.newArrayList(table.schema().findField(delCol).fieldId());
     Schema eqDeleteRowSchema = table.schema().select(delCol);
diff --git a/build.gradle b/build.gradle
index 3fc6e6937..f0d31da00 100644
--- a/build.gradle
+++ b/build.gradle
@@ -31,7 +31,7 @@ ext {
   ok_http3_version = "4.11.0"
   junit_version = "5.11.0"
   iceberg_1_2_version = "1.2.0.11"
-  iceberg_1_5_version = "1.5.2.7"
+  iceberg_1_5_version = "1.5.2.8"
   otel_agent_version = "2.12.0" // Bundles OTel SDK 1.47.0
   otel_annotations_version = "2.12.0" // Match agent version
 }