Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import lombok.AllArgsConstructor;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

import org.apache.hadoop.fs.Path;
import org.apache.parquet.io.api.Binary;
Expand All @@ -57,6 +58,7 @@
import org.apache.xtable.model.storage.InternalDataFile;

/** Responsible for Column stats extraction for Hudi. */
@Slf4j
@AllArgsConstructor
public class HudiFileStatsExtractor {
/*
Expand Down Expand Up @@ -91,15 +93,19 @@ public Stream<InternalDataFile> addStatsToFiles(
&& metaClient
.getTableConfig()
.isMetadataPartitionAvailable(MetadataPartitionType.COLUMN_STATS);
final Map<String, InternalField> nameFieldMap =
final Map<String, InternalField> parquetNameFieldMap =
schema.getAllFields().stream()
.collect(
Collectors.toMap(
field -> getFieldNameForStats(field, useMetadataTableColStats),
Function.identity()));
return useMetadataTableColStats
? computeColumnStatsFromMetadataTable(metadataTable, files, nameFieldMap)
: computeColumnStatsFromParquetFooters(files, nameFieldMap);
Collectors.toMap(field -> getFieldNameForStats(field, false), Function.identity()));
if (!useMetadataTableColStats) {
return computeColumnStatsFromParquetFooters(files, parquetNameFieldMap);
}
final Map<String, InternalField> metadataNameFieldMap =
schema.getAllFields().stream()
.collect(
Collectors.toMap(field -> getFieldNameForStats(field, true), Function.identity()));
return computeColumnStatsFromMetadataTable(
metadataTable, files, metadataNameFieldMap, parquetNameFieldMap);
}

private Stream<InternalDataFile> computeColumnStatsFromParquetFooters(
Expand All @@ -124,7 +130,8 @@ private Pair<String, String> getPartitionAndFileName(String path) {
private Stream<InternalDataFile> computeColumnStatsFromMetadataTable(
HoodieTableMetadata metadataTable,
Stream<InternalDataFile> files,
Map<String, InternalField> nameFieldMap) {
Map<String, InternalField> nameFieldMap,
Map<String, InternalField> parquetNameFieldMap) {
Map<Pair<String, String>, InternalDataFile> filePathsToDataFile =
files.collect(
Collectors.toMap(
Expand All @@ -151,20 +158,45 @@ private Stream<InternalDataFile> computeColumnStatsFromMetadataTable(
Map.Entry::getKey,
Collectors.mapping(
Map.Entry::getValue, CustomCollectors.toList(nameFieldMap.size()))));
return filePathsToDataFile.entrySet().stream()
.map(
pathToDataFile -> {
Pair<String, String> filePath = pathToDataFile.getKey();
InternalDataFile file = pathToDataFile.getValue();
List<Pair<InternalField, HoodieMetadataColumnStats>> fileStats =
stats.getOrDefault(filePath, Collections.emptyList());
List<ColumnStat> columnStats =
fileStats.stream()
.map(pair -> getColumnStatFromHudiStat(pair.getLeft(), pair.getRight()))
.collect(CustomCollectors.toList(fileStats.size()));
long recordCount = getMaxFromColumnStats(columnStats).orElse(0L);
return file.toBuilder().columnStats(columnStats).recordCount(recordCount).build();
});
List<InternalDataFile> withStats = new ArrayList<>();
List<InternalDataFile> filesWithoutStats = new ArrayList<>();
for (Map.Entry<Pair<String, String>, InternalDataFile> pathToDataFile :
filePathsToDataFile.entrySet()) {
Optional<InternalDataFile> enriched = tryEnrichWithMetadataStats(pathToDataFile, stats);
if (enriched.isPresent()) {
withStats.add(enriched.get());
} else {
filesWithoutStats.add(pathToDataFile.getValue());
}
}
if (!filesWithoutStats.isEmpty()) {
log.warn(
"{} file(s) had no column stats in the metadata table for table {}; falling back to parquet footers",
filesWithoutStats.size(),
metaClient.getBasePathV2());
withStats.addAll(
computeColumnStatsFromParquetFooters(filesWithoutStats.stream(), parquetNameFieldMap)
.collect(Collectors.toList()));
}
return withStats.stream();
}

/**
 * Builds an enriched {@link InternalDataFile} from the metadata-table column stats gathered for a
 * single file, or returns {@link Optional#empty()} when the metadata table held no stats for that
 * file so the caller can fall back to another stats source.
 */
private Optional<InternalDataFile> tryEnrichWithMetadataStats(
    Map.Entry<Pair<String, String>, InternalDataFile> pathToDataFile,
    Map<Pair<String, String>, List<Pair<InternalField, HoodieMetadataColumnStats>>> stats) {
  List<Pair<InternalField, HoodieMetadataColumnStats>> statsForFile =
      stats.getOrDefault(pathToDataFile.getKey(), Collections.emptyList());
  if (statsForFile.isEmpty()) {
    // Nothing recorded for this file in the metadata table; signal the caller to fall back.
    return Optional.empty();
  }
  List<ColumnStat> convertedStats =
      statsForFile.stream()
          .map(entry -> getColumnStatFromHudiStat(entry.getLeft(), entry.getRight()))
          .collect(CustomCollectors.toList(statsForFile.size()));
  long recordCount = getMaxFromColumnStats(convertedStats).orElse(0L);
  InternalDataFile enriched =
      pathToDataFile
          .getValue()
          .toBuilder()
          .columnStats(convertedStats)
          .recordCount(recordCount)
          .build();
  return Optional.of(enriched);
}

private Optional<Long> getMaxFromColumnStats(List<ColumnStat> columnStats) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@
package org.apache.xtable.hudi;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
Expand All @@ -33,6 +38,7 @@
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -53,16 +59,20 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.MetadataPartitionType;

import org.apache.xtable.GenericTable;
import org.apache.xtable.TestJavaHudiTable;
Expand Down Expand Up @@ -199,6 +209,131 @@ void columnStatsWithoutMetadataTable(@TempDir Path tempDir) throws IOException {
validateOutput(output);
}

@Test
void columnStatsWithMetadataTableMissingFallsBackToParquetFooters(@TempDir Path tempDir) {
  // One real parquet file on disk, but the (mocked) metadata table claims the COLUMN_STATS
  // partition is available while returning no stats at all — the extractor must fall back to
  // reading the parquet footer.
  List<InternalDataFile> inputFiles = generateInputFiles(tempDir, 1);

  HoodieTableMetadata metadataTable = mock(HoodieTableMetadata.class);
  when(metadataTable.getColumnStats(any(), anyString())).thenReturn(Collections.emptyMap());

  HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
  when(tableConfig.isMetadataPartitionAvailable(MetadataPartitionType.COLUMN_STATS))
      .thenReturn(true);

  HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
  when(metaClient.getTableConfig()).thenReturn(tableConfig);
  when(metaClient.getBasePathV2()).thenReturn(new org.apache.hadoop.fs.Path(tempDir.toUri()));
  when(metaClient.getHadoopConf()).thenReturn(configuration);

  List<InternalDataFile> output =
      new HudiFileStatsExtractor(metaClient)
          .addStatsToFiles(metadataTable, inputFiles.stream(), schema)
          .collect(Collectors.toList());

  // Footer-derived stats must match what the non-metadata-table path produces.
  validateOutput(output);
}

/**
 * Two parquet files exist on disk, but the mocked metadata table only has column stats for the
 * first one. The extractor should serve the first file from the metadata table and fall back to
 * parquet footers for the second.
 */
@Test
void columnStatsWithMetadataTablePartialMissingFallsBackToParquetFooters(@TempDir Path tempDir) {
  List<InternalDataFile> inputFiles = generateInputFiles(tempDir, 2);
  InternalDataFile fileWithStats = inputFiles.get(0);
  InternalDataFile fileWithoutStats = inputFiles.get(1);

  // Key shape (partitionPath, fileName); empty partition path for an unpartitioned layout.
  Pair<String, String> fileWithStatsPair =
      Pair.of("", new org.apache.hadoop.fs.Path(fileWithStats.getPhysicalPath()).getName());

  HoodieTableConfig mockTableConfig = mock(HoodieTableConfig.class);
  when(mockTableConfig.isMetadataPartitionAvailable(MetadataPartitionType.COLUMN_STATS))
      .thenReturn(true);

  HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
  when(mockMetaClient.getHadoopConf()).thenReturn(configuration);
  when(mockMetaClient.getBasePathV2()).thenReturn(new org.apache.hadoop.fs.Path(tempDir.toUri()));
  when(mockMetaClient.getTableConfig()).thenReturn(mockTableConfig);

  // Metadata table only returns stats for fileWithStats; fileWithoutStats is missing entirely
  HoodieTableMetadata mockMetadataTable = mock(HoodieTableMetadata.class);
  when(mockMetadataTable.getColumnStats(any(), anyString()))
      .thenAnswer(
          invocation -> {
            String fieldName = invocation.getArgument(1);
            // Only the "long_field" column yields a stat, and only for the first file;
            // every other column lookup behaves as if the metadata table had no entry.
            if (fieldName.equals("long_field")) {
              Map<Pair<String, String>, HoodieMetadataColumnStats> statsMap = new HashMap<>();
              statsMap.put(
                  fileWithStatsPair,
                  HoodieMetadataColumnStats.newBuilder()
                      .setFileName(fileWithStatsPair.getRight())
                      .setColumnName(fieldName)
                      .setValueCount(2L)
                      .setNullCount(1L)
                      .setTotalSize(16L)
                      .setIsDeleted(false)
                      .build());
              return statsMap;
            }
            return Collections.emptyMap();
          });

  HudiFileStatsExtractor extractor = new HudiFileStatsExtractor(mockMetaClient);
  List<InternalDataFile> output =
      extractor
          .addStatsToFiles(mockMetadataTable, inputFiles.stream(), schema)
          .collect(Collectors.toList());

  assertEquals(2, output.size());
  // fileWithoutStats must have fallen back to parquet footers and have full stats
  InternalDataFile fromFooter =
      output.stream()
          .filter(f -> f.getPhysicalPath().equals(fileWithoutStats.getPhysicalPath()))
          .findFirst()
          .orElseThrow(() -> new RuntimeException("missing file in output"));
  assertFalse(fromFooter.getColumnStats().isEmpty());
  // Record count of 2 matches the number of records written per file by generateInputFiles.
  assertEquals(2, fromFooter.getRecordCount());
  // fileWithStats came from metadata table (only has long_field stat)
  InternalDataFile fromMeta =
      output.stream()
          .filter(f -> f.getPhysicalPath().equals(fileWithStats.getPhysicalPath()))
          .findFirst()
          .orElseThrow(() -> new RuntimeException("missing file in output"));
  assertEquals(1, fromMeta.getColumnStats().size());
  // setValueCount(2L) above flows through as the record count for the metadata-table path.
  assertEquals(2L, fromMeta.getRecordCount());
  // verify parquet fallback was invoked for exactly 1 file (fileWithoutStats), not fileWithStats
  // NOTE(review): this assumes the footer-reading path calls getHadoopConf() exactly once per
  // fallback invocation (stubbing calls are not counted by Mockito's verify) — confirm against
  // computeColumnStatsFromParquetFooters if that implementation changes.
  verify(mockMetaClient, times(1)).getHadoopConf();
}

/**
 * Writes {@code numFiles} parquet files under {@code tempDir} (each containing the shared test
 * records) and returns matching {@link InternalDataFile} descriptors with empty column stats,
 * ready to be enriched by the extractor under test.
 *
 * @param tempDir directory the parquet files are written into
 * @param numFiles number of parquet files to generate
 * @return one {@link InternalDataFile} per generated file, in creation order
 */
private List<InternalDataFile> generateInputFiles(Path tempDir, int numFiles) {
  GenericData genericData = GenericData.get();
  // Register the decimal conversion so decimal logical-type fields in the records are written.
  genericData.addLogicalTypeConversion(new Conversions.DecimalConversion());
  // Presize: the number of files is known up front.
  List<InternalDataFile> files = new ArrayList<>(numFiles);
  for (int i = 0; i < numFiles; i++) {
    Path file = tempDir.resolve(String.format("tmp-%d.parquet", i));
    try (ParquetWriter<GenericRecord> writer =
        AvroParquetWriter.<GenericRecord>builder(
                HadoopOutputFile.fromPath(
                    new org.apache.hadoop.fs.Path(file.toUri()), configuration))
            .withSchema(AVRO_SCHEMA)
            .withDataModel(genericData)
            .build()) {
      for (GenericRecord record : getRecords()) {
        writer.write(record);
      }
    } catch (IOException e) {
      // Surface as unchecked so test methods don't need a throws clause.
      throw new RuntimeException(e);
    }
    files.add(
        InternalDataFile.builder()
            .physicalPath(file.toString())
            .columnStats(Collections.emptyList())
            .fileFormat(FileFormat.APACHE_PARQUET)
            .lastModified(1234L)
            .fileSizeBytes(4321L)
            .recordCount(0)
            .build());
  }
  return files;
}

private void validateOutput(List<InternalDataFile> output) {
assertEquals(1, output.size());
InternalDataFile fileWithStats = output.get(0);
Expand Down