diff --git a/lago_data_model/reader.py b/lago_data_model/reader.py index 6a9e97a..870eb0c 100644 --- a/lago_data_model/reader.py +++ b/lago_data_model/reader.py @@ -1,10 +1,11 @@ +import os import pyarrow as pa import pyarrow.parquet as pq from tqdm import tqdm from datetime import datetime class LagFileReader: - def __init__(self, file_path, chunk_size=1000): + def __init__(self, file_path, chunk_size=5000): self.file_path = file_path self.chunk_size = chunk_size @@ -28,6 +29,8 @@ def _parse_stream(self): yield {'TBP': current_tbp, 'readings': current_readings} def save_as_parquet_streaming(self, output_folder=".", prefix="instrument_readings"): + # Create the output dir if it doesn't exist + os.makedirs(output_folder, exist_ok=True) timestamp = datetime.now().strftime("%Y%m%d%H%M") output_path = f"{output_folder}/{prefix}__{timestamp}.parquet" @@ -37,25 +40,28 @@ def save_as_parquet_streaming(self, output_folder=".", prefix="instrument_readin ]) writer = None - chunk_data = [] + tbp_values = [] + readings_values = [] try: for record in self._parse_stream(): - chunk_data.append(record) + tbp_values.append(record['TBP']) + readings_values.append(record['readings']) - if len(chunk_data) >= self.chunk_size: - table = pa.table(chunk_data, schema=schema) + if len(tbp_values) >= self.chunk_size: + table = pa.table([tbp_values, readings_values], schema=schema) if writer is None: # TODO: Do we want to use snappy here? gzip seems like a better fit writer = pq.ParquetWriter(output_path, schema, compression='snappy') writer.write_table(table) - chunk_data = [] + tbp_values = [] + readings_values = [] # Write remaining data - if chunk_data: - table = pa.table(chunk_data, schema=schema) + if tbp_values: + table = pa.table([tbp_values, readings_values], schema=schema) if writer is None: writer = pq.ParquetWriter(output_path, schema, compression='snappy') writer.write_table(table)