Skip to content

UPD: Migrate to parquet files #2

@vkhodygo

Description

@vkhodygo

I didn't want to invest too much of my time in getting this working, so I asked Claude to rework that code snippet a few times, iteratively improving the prompt:

Dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-hadoop</artifactId>
        <version>1.13.1</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.30</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.dataformat</groupId>
        <artifactId>jackson-dataformat-yaml</artifactId>
        <version>2.15.3</version>
    </dependency>
</dependencies>

Schema

version: "1.0"
table: "population_data"
fields:
  - name: "year"
    type: "uint16"
    nullable: false
    description: "Year of the population record"
  
  - name: "country"
    type: "string"
    nullable: false
    description: "Country identifier"
  
  - name: "demographics"
    type: "struct"
    nullable: true
    fields:
      - name: "ageGroup"
        type: "uint8"
        nullable: false
      - name: "gender"
        type: "string"
        nullable: false
      - name: "count"
        type: "uint32"
        nullable: false

  - name: "metadata"
    type: "struct"
    nullable: true
    fields:
      - name: "source"
        type: "string"
        nullable: true
      - name: "lastUpdated"
        type: "timestamp"
        nullable: true

Schema configuration

// schema configuration classes
package com.example.schema;

import lombok.Getter;
import lombok.Setter;
import java.util.List;

/**
 * Root of the YAML schema document: schema version, target table name,
 * and the ordered list of top-level field definitions.
 *
 * Plain accessors are written out explicitly; they are byte-for-byte
 * equivalent to what Lombok's @Getter/@Setter would generate.
 */
public class SchemaConfig {
    private String version;
    private String table;
    private List<FieldConfig> fields;

    public String getVersion() { return version; }
    public void setVersion(String version) { this.version = version; }

    public String getTable() { return table; }
    public void setTable(String table) { this.table = table; }

    public List<FieldConfig> getFields() { return fields; }
    public void setFields(List<FieldConfig> fields) { this.fields = fields; }
}

/**
 * One field of the schema: name, logical type ("uint16", "string",
 * "struct", ...), nullability, optional description, and — for
 * type "struct" — the nested child fields.
 *
 * NOTE(review): this is a second public class in the same compilation
 * unit as SchemaConfig; Java requires one public top-level class per
 * file — confirm these actually live in separate files.
 */
public class FieldConfig {
    private String name;
    private String type;
    private boolean nullable;
    private String description;
    private List<FieldConfig> fields; // non-null only for nested "struct" fields

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public String getType() { return type; }
    public void setType(String type) { this.type = type; }

    public boolean isNullable() { return nullable; }
    public void setNullable(boolean nullable) { this.nullable = nullable; }

    public String getDescription() { return description; }
    public void setDescription(String description) { this.description = description; }

    public List<FieldConfig> getFields() { return fields; }
    public void setFields(List<FieldConfig> fields) { this.fields = fields; }
}
// data type mapping utility
package com.example.util;

import com.example.schema.FieldConfig;

import java.util.Locale;

import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

/**
 * Maps the YAML schema's logical type names onto Parquet primitive types.
 *
 * NOTE(review): unsigned types are widened (uint8/uint16 -> INT32,
 * uint32 -> INT64) because Parquet's Java primitives are signed; consider
 * logical-type annotations (LogicalTypeAnnotation.intType(32, false)) to
 * preserve the unsigned semantics instead of plain widening.
 */
public final class ParquetTypeMapper {

    private ParquetTypeMapper() {
        // static utility — no instances
    }

    /**
     * Builds a named Parquet primitive field from a schema field definition,
     * honouring its nullability (optional vs. required repetition).
     *
     * @param field schema field definition (name, type, nullable)
     * @return the Parquet {@link Type} for this field
     * @throws IllegalArgumentException if the field's type name is unsupported
     */
    public static Type mapField(FieldConfig field) {
        var primitive = mapDataType(field.getType());
        // Pick the repetition once, instead of building a "required" field
        // and throwing it away whenever the field turns out to be nullable.
        var builder = field.isNullable()
                ? Types.optional(primitive)
                : Types.required(primitive);
        return builder.named(field.getName());
    }

    /**
     * Translates a YAML type name (case-insensitive, locale-independent)
     * to a Parquet primitive type.
     */
    private static PrimitiveTypeName mapDataType(String type) {
        return switch (type.toLowerCase(Locale.ROOT)) {
            case "uint8", "uint16" -> PrimitiveTypeName.INT32;
            case "uint32" -> PrimitiveTypeName.INT64;
            case "string" -> PrimitiveTypeName.BINARY;
            // INT96 timestamps are deprecated in Parquet; prefer INT64 with a
            // TIMESTAMP logical type annotation. Kept for wire compatibility.
            case "timestamp" -> PrimitiveTypeName.INT96;
            case "double" -> PrimitiveTypeName.DOUBLE;
            case "boolean" -> PrimitiveTypeName.BOOLEAN;
            default -> throw new IllegalArgumentException("Unsupported type: " + type);
        };
    }
}
// Main Parquet writer implementation
package com.example;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import java.io.File;
import java.io.IOException;
import java.util.List;

/**
 * Writes population records to a Parquet file, using a schema loaded from
 * a YAML definition (see SchemaConfig / FieldConfig).
 */
@Slf4j
public class PopulationDataParquetWriter {
    private final SchemaConfig schemaConfig;
    private final String outputPath;

    /**
     * Loads the YAML schema from {@code schemaPath}; records are later
     * written to {@code outputPath}.
     *
     * @throws IOException if the schema file cannot be read or parsed
     */
    public PopulationDataParquetWriter(String schemaPath, String outputPath) throws IOException {
        var mapper = new ObjectMapper(new YAMLFactory());
        this.schemaConfig = mapper.readValue(new File(schemaPath), SchemaConfig.class);
        this.outputPath = outputPath;
    }

    /**
     * Streams the CSV input file row-by-row into the Parquet output.
     *
     * NOTE(review): minStartYear/maxStartYear are accepted but never used —
     * either filter rows by year here or drop the parameters.
     * NOTE(review): CSVReader/FileReader are not imported in this snippet
     * (opencsv?), and FileReader uses the platform charset — confirm the
     * input encoding and pass UTF-8 explicitly.
     * NOTE(review): convertToParquetRecord is not defined anywhere in this
     * snippet — it still needs an implementation.
     *
     * @throws IOException on any read or write failure
     */
    public void writePopulationData(String country, String inputFileName, int minStartYear, int maxStartYear) throws IOException {
        var schema = buildParquetSchema();

        try (var writer = createParquetWriter(schema)) {
            // Read CSV data similar to the original DataParser
            try (var reader = new CSVReader(new FileReader(inputFileName))) {
                String[] line;
                while ((line = reader.readNext()) != null) {
                    var record = convertToParquetRecord(line, schema);
                    writer.write(record);
                }
            }
        }
    }

    /**
     * Renders the YAML schema into Parquet message syntax and parses it
     * into a {@link MessageType}.
     */
    private MessageType buildParquetSchema() {
        var schemaBuilder = new StringBuilder();
        schemaBuilder.append("message ").append(schemaConfig.getTable()).append(" {\n");

        for (var field : schemaConfig.getFields()) {
            appendField(schemaBuilder, field, 1);
        }

        schemaBuilder.append("}");
        return MessageTypeParser.parseMessageType(schemaBuilder.toString());
    }

    /**
     * Appends one field (recursing into struct groups) in Parquet message
     * syntax. Primitive field lines must end with ';' for
     * MessageTypeParser to accept them.
     */
    private void appendField(StringBuilder builder, FieldConfig field, int depth) {
        var indent = "  ".repeat(depth);

        if ("struct".equals(field.getType())) {
            // Honour the field's nullability instead of hard-coding "optional".
            builder.append(indent)
                   .append(field.isNullable() ? "optional group " : "required group ")
                   .append(field.getName())
                   .append(" {\n");

            for (var nestedField : field.getFields()) {
                appendField(builder, nestedField, depth + 1);
            }

            builder.append(indent).append("}\n");
        } else {
            // mapField() already renders the repetition ("optional"/"required"),
            // type and name, so do not prepend a second repetition keyword here —
            // the original emitted invalid text like "optional optional int32 year".
            builder.append(indent)
                   .append(ParquetTypeMapper.mapField(field))
                   .append(";\n");
        }
    }

    /**
     * Configures the Parquet writer: Snappy compression, dictionary
     * encoding, schema validation, default row-group/page sizes.
     *
     * NOTE(review): parquet-hadoop 1.13 has no public
     * ParquetWriter.builder(Path) factory — use ExampleParquetWriter.builder
     * (with Group records) or a custom WriteSupport; confirm this compiles.
     */
    private ParquetWriter<?> createParquetWriter(MessageType schema) throws IOException {
        return ParquetWriter.builder(new Path(outputPath))
            .withSchema(schema)
            .withCompressionCodec(CompressionCodecName.SNAPPY)
            .withRowGroupSize(ParquetWriter.DEFAULT_BLOCK_SIZE)
            .withPageSize(ParquetWriter.DEFAULT_PAGE_SIZE)
            .withDictionaryPageSize(ParquetWriter.DEFAULT_PAGE_SIZE)
            .withDictionaryEncoding(true)
            .withValidation(true)
            .withWriterVersion(ParquetWriter.DEFAULT_WRITER_VERSION)
            .withConf(new Configuration())
            .build();
    }
}
// Updated main code replacing the original H2 database generation
/**
 * Replaces the original H2 database generation: writes the initial
 * population for one country to a Parquet file.
 */
@Slf4j // the original used `log` without declaring any logger — this class would not compile
public class DataGenerator {

    /**
     * Generates ./output/population_&lt;country&gt;.parquet from the configured
     * CSV input, using schema.yml as the Parquet schema definition.
     *
     * @throws RuntimeException wrapping any failure (schema load, I/O, write)
     */
    public static void generatePopulationData(Country country) {
        try {
            var schemaPath = "schema.yml";
            var outputPath = "./output/population_" + country + ".parquet";

            var writer = new PopulationDataParquetWriter(schemaPath, outputPath);
            // writePopulationData declares its first parameter as String, so
            // convert explicitly instead of passing the Country object itself
            // (the original call would not compile).
            writer.writePopulationData(
                country.toString(),
                Parameters.getPopulationInitialisationInputFileName(),
                Parameters.getMinStartYear(),
                Parameters.getMaxStartYear()
            );

        } catch (Exception e) {
            log.error("Failed to generate population data", e);
            throw new RuntimeException("Failed to generate population data", e);
        }
    }
}

There are other implementations; I can add them for comparison as well. I didn't validate any of this, but it seems to be legit.

Data types can and will cause some problems. For example, to store UInt32 you can only employ Int64; this barely affects the actual file size as most of the data are redundant zeroes anyway, and they are compressed very well. However, reading into memory will blow up the dataset size.

One way to deal with this in this particular case is to use Int32, most likely all data values are well below the MAX_INT32.

One more issue here is that the number of data types in the Java implementation is very limited. For the initialisation stage this is less relevant, but it will affect the output size. I suggest passing the output directly to an R script, which can further decrease the dataset size by employing more compact data types such as Float16.

Metadata

Metadata

Labels

enhancementNew feature or request

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions