From 254b9b072ed1426aeb45cf0fb34847b437e59af2 Mon Sep 17 00:00:00 2001 From: Alexander Rhee Date: Wed, 16 Jul 2025 16:52:49 -0700 Subject: [PATCH 1/3] handle empty EarliestCommitToRetain --- .../xtable/hudi/HudiConversionSource.java | 22 ++ .../xtable/hudi/TestHudiConversionSource.java | 253 ++++++++++++++++++ 2 files changed, 275 insertions(+) create mode 100644 xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionSource.java diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java index cb65c3411..5d6334695 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java @@ -39,6 +39,7 @@ import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.util.Option; +import com.google.common.base.Strings; import com.google.common.collect.Iterators; import com.google.common.collect.PeekingIterator; @@ -184,11 +185,32 @@ private boolean isAffectedByCleanupProcess(Instant instant) { TimelineMetadataUtils.deserializeHoodieCleanMetadata( metaClient.getActiveTimeline().getInstantDetails(lastCleanInstant.get()).get()); String earliestCommitToRetain = cleanMetadata.getEarliestCommitToRetain(); + if (Strings.isNullOrEmpty(earliestCommitToRetain)) { + return cleanInstantsOccurredSinceLastSyncedInstant(instant); + } Instant earliestCommitToRetainInstant = HudiInstantUtils.parseFromInstantTime(earliestCommitToRetain); return earliestCommitToRetainInstant.isAfter(instant); } + // When clean instants have empty earliestCommitToRetain, trigger full snapshot sync if any + // clean instants occurred after the last synced instant to err on the side of caution + private boolean cleanInstantsOccurredSinceLastSyncedInstant(Instant instant) { + String lastSyncedCommitTime = HudiInstantUtils.convertInstantToCommit(instant); + List cleanInstantsAfterLastSync = + metaClient + .getActiveTimeline() + .getCleanerTimeline() + .filterCompletedInstants() + .filter( + cleanInstant -> + HoodieTimeline.compareTimestamps( + cleanInstant.getTimestamp(), GREATER_THAN, lastSyncedCommitTime)) + .getInstants(); + + return !cleanInstantsAfterLastSync.isEmpty(); + } + private CommitsPair getCompletedAndPendingCommitsForInstants(List lastPendingInstants) { List lastPendingHoodieInstants = getCommitsForInstants(lastPendingInstants); List lastPendingHoodieInstantsCompleted = diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionSource.java new file mode 100644 index 000000000..09c579ab4 --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionSource.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.hudi; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.Test; + +import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; + +class TestHudiConversionSource { + + @Test + void testIsIncrementalSyncSafeFromWithNullEarliestCommitToRetainNoCleanInstants() + throws Exception { + // Mock the dependencies + HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class); + HoodieActiveTimeline mockActiveTimeline = mock(HoodieActiveTimeline.class); + HoodieTimeline mockCleanerTimeline = mock(HoodieTimeline.class); + HoodieTimeline mockFilteredCleanerTimeline = mock(HoodieTimeline.class); + HoodieTimeline mockCompletedCommitsTimeline = mock(HoodieTimeline.class); + HoodieInstant mockCleanInstant = mock(HoodieInstant.class); + HoodieInstant mockCommitInstant = mock(HoodieInstant.class); + HoodieCleanMetadata mockCleanMetadata = mock(HoodieCleanMetadata.class); + + // Set up the mock chain for cleaner timeline + when(mockMetaClient.getActiveTimeline()).thenReturn(mockActiveTimeline); + + // Mock the Hadoop configuration to prevent NPE in HudiDataFileExtractor + Configuration hadoopConf = new Configuration(); + hadoopConf.addResource("core-default.xml"); + when(mockMetaClient.getHadoopConf()).thenReturn(hadoopConf); + + // Mock table config to prevent NPE in HudiDataFileExtractor + org.apache.hudi.common.table.HoodieTableConfig mockTableConfig = + mock(org.apache.hudi.common.table.HoodieTableConfig.class); + when(mockMetaClient.getTableConfig()).thenReturn(mockTableConfig); + when(mockTableConfig.isMetadataTableAvailable()).thenReturn(false); + when(mockActiveTimeline.getCleanerTimeline()).thenReturn(mockCleanerTimeline); + when(mockCleanerTimeline.filterCompletedInstants()).thenReturn(mockCleanerTimeline); + when(mockCleanerTimeline.lastInstant()).thenReturn(Option.of(mockCleanInstant)); + when(mockActiveTimeline.deserializeInstantContent(mockCleanInstant, HoodieCleanMetadata.class)) + .thenReturn(mockCleanMetadata); + + // Set up the key behavior: earliestCommitToRetain is null + when(mockCleanMetadata.getEarliestCommitToRetain()).thenReturn(null); + + // Set up mocks for handleEmptyEarliestCommitToRetain - no clean instants after last sync + when(mockCleanerTimeline.filter(any())).thenReturn(mockFilteredCleanerTimeline); + when(mockFilteredCleanerTimeline.getInstants()).thenReturn(Collections.emptyList()); + + // Set up the mock chain for commit timeline (for doesCommitExistsAsOfInstant) + when(mockActiveTimeline.filterCompletedInstants()).thenReturn(mockCompletedCommitsTimeline); + when(mockCompletedCommitsTimeline.findInstantsBeforeOrEquals(any(String.class))) + .thenReturn(mockCompletedCommitsTimeline); + when(mockCompletedCommitsTimeline.lastInstant()).thenReturn(Option.of(mockCommitInstant)); + + // Create the HudiConversionSource with proper Configuration + Configuration conf = new Configuration(); + conf.addResource("core-default.xml"); + conf.addResource("core-site.xml"); + conf.addResource("hdfs-default.xml"); + conf.addResource("hdfs-site.xml"); + HudiConversionSource hudiConversionSource = + new HudiConversionSource( + mockMetaClient, mock(HudiSourcePartitionSpecExtractor.class), conf, new Properties()); + + // Test that isIncrementalSyncSafeFrom returns true when earliestCommitToRetain is null + // and no clean instants after last sync + Instant testInstant = Instant.now().minusSeconds(3600); // 1 hour ago + boolean result = hudiConversionSource.isIncrementalSyncSafeFrom(testInstant); + + // This should return true because when earliestCommitToRetain is null and no clean instants + // after last sync, + // handleEmptyEarliestCommitToRetain returns false, making isAffectedByCleanupProcess return + // false + assertTrue( + result, + "isIncrementalSyncSafeFrom should return true when earliestCommitToRetain is null and no clean instants after last sync"); + } + + @Test + void testIsIncrementalSyncSafeFromWithNullEarliestCommitToRetainWithCleanInstants() + throws Exception { + // Mock the dependencies + HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class); + HoodieActiveTimeline mockActiveTimeline = mock(HoodieActiveTimeline.class); + HoodieTimeline mockCleanerTimeline = mock(HoodieTimeline.class); + HoodieTimeline mockFilteredCleanerTimeline = mock(HoodieTimeline.class); + HoodieTimeline mockCompletedCommitsTimeline = mock(HoodieTimeline.class); + HoodieInstant mockCleanInstant = mock(HoodieInstant.class); + HoodieInstant mockCleanInstantAfterSync = mock(HoodieInstant.class); + HoodieInstant mockCommitInstant = mock(HoodieInstant.class); + HoodieCleanMetadata mockCleanMetadata = mock(HoodieCleanMetadata.class); + + // Set up the mock chain for cleaner timeline + when(mockMetaClient.getActiveTimeline()).thenReturn(mockActiveTimeline); + + // Mock the Hadoop configuration to prevent NPE in HudiDataFileExtractor + Configuration hadoopConf = new Configuration(); + hadoopConf.addResource("core-default.xml"); + when(mockMetaClient.getHadoopConf()).thenReturn(hadoopConf); + + // Mock table config to prevent NPE in HudiDataFileExtractor + org.apache.hudi.common.table.HoodieTableConfig mockTableConfig = + mock(org.apache.hudi.common.table.HoodieTableConfig.class); + when(mockMetaClient.getTableConfig()).thenReturn(mockTableConfig); + when(mockTableConfig.isMetadataTableAvailable()).thenReturn(false); + when(mockActiveTimeline.getCleanerTimeline()).thenReturn(mockCleanerTimeline); + when(mockCleanerTimeline.filterCompletedInstants()).thenReturn(mockCleanerTimeline); + when(mockCleanerTimeline.lastInstant()).thenReturn(Option.of(mockCleanInstant)); + when(mockActiveTimeline.deserializeInstantContent(mockCleanInstant, HoodieCleanMetadata.class)) + .thenReturn(mockCleanMetadata); + + // Set up the key behavior: earliestCommitToRetain is null + when(mockCleanMetadata.getEarliestCommitToRetain()).thenReturn(null); + + // Set up mocks for handleEmptyEarliestCommitToRetain - clean instants exist after last sync + when(mockCleanerTimeline.filter(any())).thenReturn(mockFilteredCleanerTimeline); + when(mockFilteredCleanerTimeline.getInstants()) + .thenReturn(Arrays.asList(mockCleanInstantAfterSync)); + + // Set up the mock chain for commit timeline (for doesCommitExistsAsOfInstant) + when(mockActiveTimeline.filterCompletedInstants()).thenReturn(mockCompletedCommitsTimeline); + when(mockCompletedCommitsTimeline.findInstantsBeforeOrEquals(any(String.class))) + .thenReturn(mockCompletedCommitsTimeline); + when(mockCompletedCommitsTimeline.lastInstant()).thenReturn(Option.of(mockCommitInstant)); + + // Create the HudiConversionSource with proper Configuration + Configuration conf = new Configuration(); + conf.addResource("core-default.xml"); + conf.addResource("core-site.xml"); + conf.addResource("hdfs-default.xml"); + conf.addResource("hdfs-site.xml"); + HudiConversionSource hudiConversionSource = + new HudiConversionSource( + mockMetaClient, mock(HudiSourcePartitionSpecExtractor.class), conf, new Properties()); + + // Test that isIncrementalSyncSafeFrom returns false when earliestCommitToRetain is null + // but clean instants exist after last sync + Instant testInstant = Instant.now().minusSeconds(3600); // 1 hour ago + boolean result = hudiConversionSource.isIncrementalSyncSafeFrom(testInstant); + + // This should return false because when earliestCommitToRetain is null and clean instants exist + // after last sync, + // handleEmptyEarliestCommitToRetain returns true, making isAffectedByCleanupProcess return true + assertFalse( + result, + "isIncrementalSyncSafeFrom should return false when earliestCommitToRetain is null but clean instants exist after last sync"); + } + + @Test + void testIsIncrementalSyncSafeFromWithEmptyEarliestCommitToRetainWithCleanInstants() + throws Exception { + // Mock the dependencies + HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class); + HoodieActiveTimeline mockActiveTimeline = mock(HoodieActiveTimeline.class); + HoodieTimeline mockCleanerTimeline = mock(HoodieTimeline.class); + HoodieTimeline mockFilteredCleanerTimeline = mock(HoodieTimeline.class); + HoodieTimeline mockCompletedCommitsTimeline = mock(HoodieTimeline.class); + HoodieInstant mockCleanInstant = mock(HoodieInstant.class); + HoodieInstant mockCleanInstantAfterSync = mock(HoodieInstant.class); + HoodieInstant mockCommitInstant = mock(HoodieInstant.class); + HoodieCleanMetadata mockCleanMetadata = mock(HoodieCleanMetadata.class); + + // Set up the mock chain for cleaner timeline + when(mockMetaClient.getActiveTimeline()).thenReturn(mockActiveTimeline); + + // Mock the Hadoop configuration to prevent NPE in HudiDataFileExtractor + Configuration hadoopConf = new Configuration(); + hadoopConf.addResource("core-default.xml"); + when(mockMetaClient.getHadoopConf()).thenReturn(hadoopConf); + + // Mock table config to prevent NPE in HudiDataFileExtractor + org.apache.hudi.common.table.HoodieTableConfig mockTableConfig = + mock(org.apache.hudi.common.table.HoodieTableConfig.class); + when(mockMetaClient.getTableConfig()).thenReturn(mockTableConfig); + when(mockTableConfig.isMetadataTableAvailable()).thenReturn(false); + when(mockActiveTimeline.getCleanerTimeline()).thenReturn(mockCleanerTimeline); + when(mockCleanerTimeline.filterCompletedInstants()).thenReturn(mockCleanerTimeline); + when(mockCleanerTimeline.lastInstant()).thenReturn(Option.of(mockCleanInstant)); + when(mockActiveTimeline.deserializeInstantContent(mockCleanInstant, HoodieCleanMetadata.class)) + .thenReturn(mockCleanMetadata); + + // Set up the key behavior: earliestCommitToRetain is empty string + when(mockCleanMetadata.getEarliestCommitToRetain()).thenReturn(""); + + // Set up mocks for handleEmptyEarliestCommitToRetain - clean instants exist after last sync + when(mockCleanerTimeline.filter(any())).thenReturn(mockFilteredCleanerTimeline); + when(mockFilteredCleanerTimeline.getInstants()) + .thenReturn(Arrays.asList(mockCleanInstantAfterSync)); + + // Set up the mock chain for commit timeline (for doesCommitExistsAsOfInstant) + when(mockActiveTimeline.filterCompletedInstants()).thenReturn(mockCompletedCommitsTimeline); + when(mockCompletedCommitsTimeline.findInstantsBeforeOrEquals(any(String.class))) + .thenReturn(mockCompletedCommitsTimeline); + when(mockCompletedCommitsTimeline.lastInstant()).thenReturn(Option.of(mockCommitInstant)); + + // Create the HudiConversionSource with proper Configuration + Configuration conf = new Configuration(); + conf.addResource("core-default.xml"); + conf.addResource("core-site.xml"); + conf.addResource("hdfs-default.xml"); + conf.addResource("hdfs-site.xml"); + HudiConversionSource hudiConversionSource = + new HudiConversionSource( + mockMetaClient, mock(HudiSourcePartitionSpecExtractor.class), conf, new Properties()); + + // Test that isIncrementalSyncSafeFrom returns false when earliestCommitToRetain is + // empty/whitespace + // and clean instants exist after last sync + Instant testInstant = Instant.now().minusSeconds(3600); // 1 hour ago + boolean result = hudiConversionSource.isIncrementalSyncSafeFrom(testInstant); + + // This should return false because when earliestCommitToRetain is empty/whitespace and clean + // instants exist after last sync, + // handleEmptyEarliestCommitToRetain returns true, making isAffectedByCleanupProcess return true + assertFalse( + result, + "isIncrementalSyncSafeFrom should return false when earliestCommitToRetain is empty/whitespace and clean instants exist after last sync"); + } +} From 1d8f91e03711eec2457df8267bc6bc1f23f3c928 Mon Sep 17 00:00:00 2001 From: Vinish Reddy Date: Tue, 7 Apr 2026 17:49:21 -0700 Subject: [PATCH 2/3] Fix build failures: qualify GREATER_THAN and fix test mocks --- .../xtable/hudi/HudiConversionSource.java | 4 +- .../xtable/hudi/TestHudiConversionSource.java | 163 ++++++------------ 2 files changed, 52 insertions(+), 115 deletions(-) diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java index 5d6334695..3d9c18997 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java @@ -205,7 +205,9 @@ private boolean cleanInstantsOccurredSinceLastSyncedInstant(Instant instant) { .filter( cleanInstant -> HoodieTimeline.compareTimestamps( - cleanInstant.getTimestamp(), GREATER_THAN, lastSyncedCommitTime)) + cleanInstant.getTimestamp(), + HoodieTimeline.GREATER_THAN, + lastSyncedCommitTime)) .getInstants(); return !cleanInstantsAfterLastSync.isEmpty(); diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionSource.java index 09c579ab4..9b9236b8c 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionSource.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionSource.java @@ -27,24 +27,39 @@ import java.time.Instant; import java.util.Arrays; import java.util.Collections; -import java.util.Properties; +import java.util.HashMap; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.Test; import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.util.Option; class TestHudiConversionSource { + private static byte[] serializeCleanMetadata(String earliestCommitToRetain) throws Exception { + HoodieCleanMetadata metadata = + HoodieCleanMetadata.newBuilder() + .setStartCleanTime("000") + .setTimeTakenInMillis(0L) + .setTotalFilesDeleted(0) + .setEarliestCommitToRetain(earliestCommitToRetain) + .setBootstrapPartitionMetadata(new HashMap<>()) + .setPartitionMetadata(new HashMap<>()) + .build(); + return TimelineMetadataUtils.serializeCleanMetadata(metadata).get(); + } + @Test void testIsIncrementalSyncSafeFromWithNullEarliestCommitToRetainNoCleanInstants() throws Exception { - // Mock the dependencies HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class); HoodieActiveTimeline mockActiveTimeline = mock(HoodieActiveTimeline.class); HoodieTimeline mockCleanerTimeline = mock(HoodieTimeline.class); @@ -52,68 +67,41 @@ void testIsIncrementalSyncSafeFromWithNullEarliestCommitToRetainNoCleanInstants( HoodieTimeline mockCompletedCommitsTimeline = mock(HoodieTimeline.class); HoodieInstant mockCleanInstant = mock(HoodieInstant.class); HoodieInstant mockCommitInstant = mock(HoodieInstant.class); - HoodieCleanMetadata mockCleanMetadata = mock(HoodieCleanMetadata.class); - // Set up the mock chain for cleaner timeline + HoodieTableConfig mockTableConfig = mock(HoodieTableConfig.class); when(mockMetaClient.getActiveTimeline()).thenReturn(mockActiveTimeline); - - // Mock the Hadoop configuration to prevent NPE in HudiDataFileExtractor - Configuration hadoopConf = new Configuration(); - hadoopConf.addResource("core-default.xml"); - when(mockMetaClient.getHadoopConf()).thenReturn(hadoopConf); - - // Mock table config to prevent NPE in HudiDataFileExtractor - org.apache.hudi.common.table.HoodieTableConfig mockTableConfig = - mock(org.apache.hudi.common.table.HoodieTableConfig.class); + when(mockMetaClient.getHadoopConf()).thenReturn(new Configuration()); when(mockMetaClient.getTableConfig()).thenReturn(mockTableConfig); when(mockTableConfig.isMetadataTableAvailable()).thenReturn(false); + when(mockMetaClient.getBasePathV2()).thenReturn(new Path("/tmp/test-table")); when(mockActiveTimeline.getCleanerTimeline()).thenReturn(mockCleanerTimeline); when(mockCleanerTimeline.filterCompletedInstants()).thenReturn(mockCleanerTimeline); when(mockCleanerTimeline.lastInstant()).thenReturn(Option.of(mockCleanInstant)); - when(mockActiveTimeline.deserializeInstantContent(mockCleanInstant, HoodieCleanMetadata.class)) - .thenReturn(mockCleanMetadata); + // Use empty string — Strings.isNullOrEmpty("") is true, same behavior as null + when(mockActiveTimeline.getInstantDetails(mockCleanInstant)) + .thenReturn(Option.of(serializeCleanMetadata(""))); - // Set up the key behavior: earliestCommitToRetain is null - when(mockCleanMetadata.getEarliestCommitToRetain()).thenReturn(null); - - // Set up mocks for handleEmptyEarliestCommitToRetain - no clean instants after last sync when(mockCleanerTimeline.filter(any())).thenReturn(mockFilteredCleanerTimeline); when(mockFilteredCleanerTimeline.getInstants()).thenReturn(Collections.emptyList()); - // Set up the mock chain for commit timeline (for doesCommitExistsAsOfInstant) when(mockActiveTimeline.filterCompletedInstants()).thenReturn(mockCompletedCommitsTimeline); when(mockCompletedCommitsTimeline.findInstantsBeforeOrEquals(any(String.class))) .thenReturn(mockCompletedCommitsTimeline); + when(mockCommitInstant.getTimestamp()).thenReturn("20200101120000000"); when(mockCompletedCommitsTimeline.lastInstant()).thenReturn(Option.of(mockCommitInstant)); - // Create the HudiConversionSource with proper Configuration - Configuration conf = new Configuration(); - conf.addResource("core-default.xml"); - conf.addResource("core-site.xml"); - conf.addResource("hdfs-default.xml"); - conf.addResource("hdfs-site.xml"); HudiConversionSource hudiConversionSource = - new HudiConversionSource( - mockMetaClient, mock(HudiSourcePartitionSpecExtractor.class), conf, new Properties()); - - // Test that isIncrementalSyncSafeFrom returns true when earliestCommitToRetain is null - // and no clean instants after last sync - Instant testInstant = Instant.now().minusSeconds(3600); // 1 hour ago - boolean result = hudiConversionSource.isIncrementalSyncSafeFrom(testInstant); - - // This should return true because when earliestCommitToRetain is null and no clean instants - // after last sync, - // handleEmptyEarliestCommitToRetain returns false, making isAffectedByCleanupProcess return - // false + new HudiConversionSource(mockMetaClient, mock(HudiSourcePartitionSpecExtractor.class)); + + Instant testInstant = Instant.now().minusSeconds(3600); assertTrue( - result, + hudiConversionSource.isIncrementalSyncSafeFrom(testInstant), "isIncrementalSyncSafeFrom should return true when earliestCommitToRetain is null and no clean instants after last sync"); } @Test void testIsIncrementalSyncSafeFromWithNullEarliestCommitToRetainWithCleanInstants() throws Exception { - // Mock the dependencies HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class); HoodieActiveTimeline mockActiveTimeline = mock(HoodieActiveTimeline.class); HoodieTimeline mockCleanerTimeline = mock(HoodieTimeline.class); @@ -122,68 +110,42 @@ void testIsIncrementalSyncSafeFromWithNullEarliestCommitToRetainWithCleanInstant HoodieInstant mockCleanInstant = mock(HoodieInstant.class); HoodieInstant mockCleanInstantAfterSync = mock(HoodieInstant.class); HoodieInstant mockCommitInstant = mock(HoodieInstant.class); - HoodieCleanMetadata mockCleanMetadata = mock(HoodieCleanMetadata.class); - // Set up the mock chain for cleaner timeline + HoodieTableConfig mockTableConfig = mock(HoodieTableConfig.class); when(mockMetaClient.getActiveTimeline()).thenReturn(mockActiveTimeline); - - // Mock the Hadoop configuration to prevent NPE in HudiDataFileExtractor - Configuration hadoopConf = new Configuration(); - hadoopConf.addResource("core-default.xml"); - when(mockMetaClient.getHadoopConf()).thenReturn(hadoopConf); - - // Mock table config to prevent NPE in HudiDataFileExtractor - org.apache.hudi.common.table.HoodieTableConfig mockTableConfig = - mock(org.apache.hudi.common.table.HoodieTableConfig.class); + when(mockMetaClient.getHadoopConf()).thenReturn(new Configuration()); when(mockMetaClient.getTableConfig()).thenReturn(mockTableConfig); when(mockTableConfig.isMetadataTableAvailable()).thenReturn(false); + when(mockMetaClient.getBasePathV2()).thenReturn(new Path("/tmp/test-table")); when(mockActiveTimeline.getCleanerTimeline()).thenReturn(mockCleanerTimeline); when(mockCleanerTimeline.filterCompletedInstants()).thenReturn(mockCleanerTimeline); when(mockCleanerTimeline.lastInstant()).thenReturn(Option.of(mockCleanInstant)); - when(mockActiveTimeline.deserializeInstantContent(mockCleanInstant, HoodieCleanMetadata.class)) - .thenReturn(mockCleanMetadata); - - // Set up the key behavior: earliestCommitToRetain is null - when(mockCleanMetadata.getEarliestCommitToRetain()).thenReturn(null); + // Use empty string — Strings.isNullOrEmpty("") is true, same behavior as null + when(mockActiveTimeline.getInstantDetails(mockCleanInstant)) + .thenReturn(Option.of(serializeCleanMetadata(""))); - // Set up mocks for handleEmptyEarliestCommitToRetain - clean instants exist after last sync when(mockCleanerTimeline.filter(any())).thenReturn(mockFilteredCleanerTimeline); when(mockFilteredCleanerTimeline.getInstants()) .thenReturn(Arrays.asList(mockCleanInstantAfterSync)); - // Set up the mock chain for commit timeline (for doesCommitExistsAsOfInstant) when(mockActiveTimeline.filterCompletedInstants()).thenReturn(mockCompletedCommitsTimeline); when(mockCompletedCommitsTimeline.findInstantsBeforeOrEquals(any(String.class))) .thenReturn(mockCompletedCommitsTimeline); + when(mockCommitInstant.getTimestamp()).thenReturn("20200101120000000"); when(mockCompletedCommitsTimeline.lastInstant()).thenReturn(Option.of(mockCommitInstant)); - // Create the HudiConversionSource with proper Configuration - Configuration conf = new Configuration(); - conf.addResource("core-default.xml"); - conf.addResource("core-site.xml"); - conf.addResource("hdfs-default.xml"); - conf.addResource("hdfs-site.xml"); HudiConversionSource hudiConversionSource = - new HudiConversionSource( - mockMetaClient, mock(HudiSourcePartitionSpecExtractor.class), conf, new Properties()); + new HudiConversionSource(mockMetaClient, mock(HudiSourcePartitionSpecExtractor.class)); - // Test that isIncrementalSyncSafeFrom returns false when earliestCommitToRetain is null - // but clean instants exist after last sync - Instant testInstant = Instant.now().minusSeconds(3600); // 1 hour ago - boolean result = hudiConversionSource.isIncrementalSyncSafeFrom(testInstant); - - // This should return false because when earliestCommitToRetain is null and clean instants exist - // after last sync, - // handleEmptyEarliestCommitToRetain returns true, making isAffectedByCleanupProcess return true + Instant testInstant = Instant.now().minusSeconds(3600); assertFalse( - result, + hudiConversionSource.isIncrementalSyncSafeFrom(testInstant), "isIncrementalSyncSafeFrom should return false when earliestCommitToRetain is null but clean instants exist after last sync"); } @Test void testIsIncrementalSyncSafeFromWithEmptyEarliestCommitToRetainWithCleanInstants() throws Exception { - // Mock the dependencies HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class); HoodieActiveTimeline mockActiveTimeline = mock(HoodieActiveTimeline.class); HoodieTimeline mockCleanerTimeline = mock(HoodieTimeline.class); @@ -192,62 +154,35 @@ void testIsIncrementalSyncSafeFromWithEmptyEarliestCommitToRetainWithCleanInstan HoodieInstant mockCleanInstant = mock(HoodieInstant.class); HoodieInstant mockCleanInstantAfterSync = mock(HoodieInstant.class); HoodieInstant mockCommitInstant = mock(HoodieInstant.class); - HoodieCleanMetadata mockCleanMetadata = mock(HoodieCleanMetadata.class); - // Set up the mock chain for cleaner timeline + HoodieTableConfig mockTableConfig = mock(HoodieTableConfig.class); when(mockMetaClient.getActiveTimeline()).thenReturn(mockActiveTimeline); - - // Mock the Hadoop configuration to prevent NPE in HudiDataFileExtractor - Configuration hadoopConf = new Configuration(); - hadoopConf.addResource("core-default.xml"); - when(mockMetaClient.getHadoopConf()).thenReturn(hadoopConf); - - // Mock table config to prevent NPE in HudiDataFileExtractor - org.apache.hudi.common.table.HoodieTableConfig mockTableConfig = - mock(org.apache.hudi.common.table.HoodieTableConfig.class); + when(mockMetaClient.getHadoopConf()).thenReturn(new Configuration()); when(mockMetaClient.getTableConfig()).thenReturn(mockTableConfig); when(mockTableConfig.isMetadataTableAvailable()).thenReturn(false); + when(mockMetaClient.getBasePathV2()).thenReturn(new Path("/tmp/test-table")); when(mockActiveTimeline.getCleanerTimeline()).thenReturn(mockCleanerTimeline); when(mockCleanerTimeline.filterCompletedInstants()).thenReturn(mockCleanerTimeline); when(mockCleanerTimeline.lastInstant()).thenReturn(Option.of(mockCleanInstant)); - when(mockActiveTimeline.deserializeInstantContent(mockCleanInstant, HoodieCleanMetadata.class)) - .thenReturn(mockCleanMetadata); - - // Set up the key behavior: earliestCommitToRetain is empty string - when(mockCleanMetadata.getEarliestCommitToRetain()).thenReturn(""); + when(mockActiveTimeline.getInstantDetails(mockCleanInstant)) + .thenReturn(Option.of(serializeCleanMetadata(""))); - // Set up mocks for handleEmptyEarliestCommitToRetain - clean instants exist after last sync when(mockCleanerTimeline.filter(any())).thenReturn(mockFilteredCleanerTimeline); when(mockFilteredCleanerTimeline.getInstants()) .thenReturn(Arrays.asList(mockCleanInstantAfterSync)); - // Set up the mock chain for commit timeline (for doesCommitExistsAsOfInstant) when(mockActiveTimeline.filterCompletedInstants()).thenReturn(mockCompletedCommitsTimeline); when(mockCompletedCommitsTimeline.findInstantsBeforeOrEquals(any(String.class))) .thenReturn(mockCompletedCommitsTimeline); + when(mockCommitInstant.getTimestamp()).thenReturn("20200101120000000"); when(mockCompletedCommitsTimeline.lastInstant()).thenReturn(Option.of(mockCommitInstant)); - // Create the HudiConversionSource with proper Configuration - Configuration conf = new Configuration(); - conf.addResource("core-default.xml"); - conf.addResource("core-site.xml"); - conf.addResource("hdfs-default.xml"); - conf.addResource("hdfs-site.xml"); HudiConversionSource hudiConversionSource = - new HudiConversionSource( - mockMetaClient, mock(HudiSourcePartitionSpecExtractor.class), conf, new Properties()); - - // Test that isIncrementalSyncSafeFrom returns false when earliestCommitToRetain is - // empty/whitespace - // and clean instants exist after last sync - Instant testInstant = Instant.now().minusSeconds(3600); // 1 hour ago - boolean result = hudiConversionSource.isIncrementalSyncSafeFrom(testInstant); - - // This should return false because when earliestCommitToRetain is empty/whitespace and clean - // instants exist after last sync, - // handleEmptyEarliestCommitToRetain returns true, making isAffectedByCleanupProcess return true + new HudiConversionSource(mockMetaClient, mock(HudiSourcePartitionSpecExtractor.class)); + + Instant testInstant = Instant.now().minusSeconds(3600); assertFalse( - result, - "isIncrementalSyncSafeFrom should return false when earliestCommitToRetain is empty/whitespace and clean instants exist after last sync"); + hudiConversionSource.isIncrementalSyncSafeFrom(testInstant), + "isIncrementalSyncSafeFrom should return false when earliestCommitToRetain is empty and clean instants exist after last sync"); } } From 3d59b9ff9e1fb4da0c002df8a7d02935b6081217 Mon Sep 17 00:00:00 2001 From: Vinish Reddy Date: Tue, 7 Apr 2026 21:45:04 -0700 Subject: [PATCH 3/3] Use PathBasedPartitionSpecExtractor to align with main branch rename --- .../java/org/apache/xtable/hudi/HudiConversionSource.java | 4 ++-- .../org/apache/xtable/hudi/TestHudiConversionSource.java | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java index 3d9c18997..4edcbed5d 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java @@ -60,14 +60,14 @@ public class HudiConversionSource implements ConversionSource { public HudiConversionSource( HoodieTableMetaClient metaClient, - HudiSourcePartitionSpecExtractor sourcePartitionSpecExtractor) { + PathBasedPartitionSpecExtractor sourcePartitionSpecExtractor) { this.metaClient = metaClient; this.tableExtractor = new HudiTableExtractor(new HudiSchemaExtractor(), sourcePartitionSpecExtractor); this.dataFileExtractor = new HudiDataFileExtractor( metaClient, - new HudiPartitionValuesExtractor( + new PathBasedPartitionValuesExtractor( sourcePartitionSpecExtractor.getPathToPartitionFieldFormat()), new HudiFileStatsExtractor(metaClient)); } diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionSource.java index 9b9236b8c..956fe29bb 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionSource.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionSource.java @@ -91,7 +91,7 @@ void testIsIncrementalSyncSafeFromWithNullEarliestCommitToRetainNoCleanInstants( when(mockCompletedCommitsTimeline.lastInstant()).thenReturn(Option.of(mockCommitInstant)); HudiConversionSource hudiConversionSource = - new HudiConversionSource(mockMetaClient, mock(HudiSourcePartitionSpecExtractor.class)); + new HudiConversionSource(mockMetaClient, mock(PathBasedPartitionSpecExtractor.class)); Instant testInstant = Instant.now().minusSeconds(3600); assertTrue( @@ -135,7 +135,7 @@ void testIsIncrementalSyncSafeFromWithNullEarliestCommitToRetainWithCleanInstant when(mockCompletedCommitsTimeline.lastInstant()).thenReturn(Option.of(mockCommitInstant)); HudiConversionSource hudiConversionSource = - new HudiConversionSource(mockMetaClient, mock(HudiSourcePartitionSpecExtractor.class)); + new HudiConversionSource(mockMetaClient, mock(PathBasedPartitionSpecExtractor.class)); Instant testInstant = Instant.now().minusSeconds(3600); assertFalse( @@ -178,7 +178,7 @@ void testIsIncrementalSyncSafeFromWithEmptyEarliestCommitToRetainWithCleanInstan when(mockCompletedCommitsTimeline.lastInstant()).thenReturn(Option.of(mockCommitInstant)); HudiConversionSource hudiConversionSource = - new HudiConversionSource(mockMetaClient, mock(HudiSourcePartitionSpecExtractor.class)); + new HudiConversionSource(mockMetaClient, mock(PathBasedPartitionSpecExtractor.class)); Instant testInstant = Instant.now().minusSeconds(3600); assertFalse(