Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 80 additions & 2 deletions crates/arroyo-connectors/src/single_file_custom/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use arroyo_operator::SourceFinishType;
use arroyo_rpc::formats::{BadData, Format, Framing};
use arroyo_rpc::grpc::rpc::{StopMode, TableConfig};
use arroyo_rpc::ControlMessage;
use arroyo_types::UserError;
use arroyo_types::{SignalMessage, UserError, Watermark};
use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
Expand Down Expand Up @@ -175,8 +175,11 @@ impl SingleFileCustomSourceFunc {
}
}
None => {
collector.flush_buffer().await?;
info!("Finished reading {} records from {}", self.records_read, self.path);
// Emit a far-future watermark to flush all pending windows
let flush_ts = SystemTime::now() + Duration::from_secs(60 * 60 * 24 * 365);
collector.broadcast(SignalMessage::Watermark(Watermark::EventTime(flush_ts))).await;
collector.flush_buffer().await?;
return Ok(SourceFinishType::Final);
}
}
Expand All @@ -190,6 +193,69 @@ impl SingleFileCustomSourceFunc {
}
}

/// Scans the timestamp column at `ts_idx` and returns the `(min, max)` event
/// times present in `batch` as `SystemTime`s.
///
/// Returns `(None, None)` when the batch is empty, the column type is not a
/// supported timestamp/Int64 type, or (for `Int64`) `self.ts_format` is not a
/// recognized unix-time format.
fn get_min_max_timestamp(
    &self,
    batch: &RecordBatch,
    ts_idx: usize,
) -> (Option<SystemTime>, Option<SystemTime>) {
    use arrow::array::AsArray;
    use arrow::datatypes::{DataType, TimeUnit};

    // Single-pass (min, max) over nanosecond values — avoids materializing an
    // intermediate Vec and scanning it twice as the previous version did.
    fn min_max(iter: impl Iterator<Item = i64>) -> (Option<i64>, Option<i64>) {
        iter.fold((None::<i64>, None::<i64>), |(lo, hi), v| {
            (
                Some(lo.map_or(v, |l| l.min(v))),
                Some(hi.map_or(v, |h| h.max(v))),
            )
        })
    }

    let ts_array = batch.column(ts_idx);

    // NOTE(review): null slots are still read via `value()`, which returns the
    // raw buffer contents (typically 0) for nulls — confirm the source schema
    // guarantees a non-nullable timestamp column, or skip by validity bitmap.
    let (min_nanos, max_nanos) = match ts_array.data_type() {
        DataType::Timestamp(TimeUnit::Millisecond, _) => {
            let arr = ts_array.as_primitive::<arrow::datatypes::TimestampMillisecondType>();
            // saturating_mul: ms -> ns can overflow i64 for extreme values;
            // saturate rather than wrap (release) or panic (debug).
            min_max((0..batch.num_rows()).map(|i| arr.value(i).saturating_mul(1_000_000)))
        }
        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
            let arr = ts_array.as_primitive::<arrow::datatypes::TimestampNanosecondType>();
            min_max((0..batch.num_rows()).map(|i| arr.value(i)))
        }
        DataType::Timestamp(TimeUnit::Microsecond, _) => {
            let arr = ts_array.as_primitive::<arrow::datatypes::TimestampMicrosecondType>();
            min_max((0..batch.num_rows()).map(|i| arr.value(i).saturating_mul(1_000)))
        }
        DataType::Int64 => {
            let arr = ts_array.as_primitive::<arrow::datatypes::Int64Type>();
            // Scale raw i64 values to nanoseconds according to the configured
            // timestamp format; unrecognized formats produce no bounds.
            let scale = match self.ts_format {
                TsFormat::UnixMillis => Some(1_000_000_i64),
                TsFormat::UnixSeconds => Some(1_000_000_000_i64),
                _ => None,
            };
            match scale {
                Some(s) => {
                    min_max((0..batch.num_rows()).map(|i| arr.value(i).saturating_mul(s)))
                }
                None => (None, None),
            }
        }
        // Unsupported column type: no timestamps extractable.
        _ => (None, None),
    };

    // Convert signed nanos-since-epoch to SystemTime. The previous version did
    // `UNIX_EPOCH + Duration::from_nanos(nanos as u64)`, which wraps negative
    // (pre-epoch) values into the far future via the `as u64` cast; handle the
    // negative case explicitly by subtracting from the epoch instead.
    let to_systime = |nanos: i64| {
        if nanos >= 0 {
            UNIX_EPOCH + Duration::from_nanos(nanos as u64)
        } else {
            UNIX_EPOCH - Duration::from_nanos(nanos.unsigned_abs())
        }
    };
    (min_nanos.map(to_systime), max_nanos.map(to_systime))
}

async fn run_parquet(
&mut self,
ctx: &mut SourceContext,
Expand Down Expand Up @@ -256,9 +322,21 @@ impl SingleFileCustomSourceFunc {
batch = stream.next() => {
match batch {
Some(Ok(batch)) => {
// Broadcast min timestamp BEFORE collecting so windows aren't dropped as late
let (min_ts, max_ts) = self.get_min_max_timestamp(&batch, ts_idx);
if let Some(ts) = min_ts {
collector.broadcast(SignalMessage::Watermark(Watermark::EventTime(ts))).await;
}

// Extract timestamps from the batch and add _timestamp column
let out_batch = self.add_timestamp_to_batch(&batch, ts_idx)?;
collector.collect(out_batch).await;

// Advance watermark to max timestamp AFTER collecting
if let Some(ts) = max_ts {
collector.broadcast(SignalMessage::Watermark(Watermark::EventTime(ts))).await;
}

self.records_read += batch.num_rows() as u64;

if self.records_read % LOG_INTERVAL == 0 {
Expand Down
Loading