From a22ea25124a91b47b4f1e868845afaab2c7cfc71 Mon Sep 17 00:00:00 2001 From: andresvalle Date: Wed, 17 Sep 2025 01:49:52 -0600 Subject: [PATCH 1/3] fix: PyArrow expects columnar data --- lago_data_model/reader.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/lago_data_model/reader.py b/lago_data_model/reader.py index 6a9e97a..c637c62 100644 --- a/lago_data_model/reader.py +++ b/lago_data_model/reader.py @@ -37,25 +37,28 @@ def save_as_parquet_streaming(self, output_folder=".", prefix="instrument_readin ]) writer = None - chunk_data = [] + tbp_values = [] + readings_values = [] try: for record in self._parse_stream(): - chunk_data.append(record) + tbp_values.append(record['TBP']) + readings_values.append(record['readings']) - if len(chunk_data) >= self.chunk_size: - table = pa.table(chunk_data, schema=schema) + if len(tbp_values) >= self.chunk_size: + table = pa.table([tbp_values, readings_values], schema=schema) if writer is None: # TODO: Do we want to use snappy here? gzip seems like a better fit writer = pq.ParquetWriter(output_path, schema, compression='snappy') writer.write_table(table) - chunk_data = [] + tbp_values = [] + readings_values = [] # Write remaining data - if chunk_data: - table = pa.table(chunk_data, schema=schema) + if tbp_values: + table = pa.table([tbp_values, readings_values], schema=schema) if writer is None: writer = pq.ParquetWriter(output_path, schema, compression='snappy') writer.write_table(table) From 84845cd2c6faf3f78931913f3f1ab11ebd4d9dde Mon Sep 17 00:00:00 2001 From: andresvalle Date: Wed, 17 Sep 2025 01:50:43 -0600 Subject: [PATCH 2/3] feat: check for output_dir --- lago_data_model/reader.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lago_data_model/reader.py b/lago_data_model/reader.py index c637c62..639b6b5 100644 --- a/lago_data_model/reader.py +++ b/lago_data_model/reader.py @@ -1,3 +1,4 @@ +import os import pyarrow as pa import pyarrow.parquet as pq from tqdm import tqdm @@ -28,6 +29,8 @@ def _parse_stream(self): yield {'TBP': current_tbp, 'readings': current_readings} def save_as_parquet_streaming(self, output_folder=".", prefix="instrument_readings"): + # Create the output dir if it doesn't exist + os.makedirs(output_folder, exist_ok=True) timestamp = datetime.now().strftime("%Y%m%d%H%M") output_path = f"{output_folder}/{prefix}__{timestamp}.parquet" From 0927500322fb4de07f5917d165fc158c96019cbd Mon Sep 17 00:00:00 2001 From: andresvalle Date: Wed, 17 Sep 2025 01:51:18 -0600 Subject: [PATCH 3/3] refactor: increased default chunk size --- lago_data_model/reader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lago_data_model/reader.py b/lago_data_model/reader.py index 639b6b5..870eb0c 100644 --- a/lago_data_model/reader.py +++ b/lago_data_model/reader.py @@ -5,7 +5,7 @@ from datetime import datetime class LagFileReader: - def __init__(self, file_path, chunk_size=1000): + def __init__(self, file_path, chunk_size=5000): self.file_path = file_path self.chunk_size = chunk_size