Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
30151a8
Add standalone precompute engine to replace Arroyo streaming pipeline
zzylol Feb 21, 2026
19c0f84
Wire up DatasketchesKLL in accumulator factory and add E2E test
zzylol Feb 23, 2026
c822c8b
Add 1000-sample batch latency test to E2E precompute test
zzylol Feb 23, 2026
eedc6ce
Add 10M-sample throughput test to E2E precompute test
zzylol Feb 23, 2026
b749ac3
update
zzylol Feb 24, 2026
685655e
Fix sliding window aggregation: feed samples into all overlapping windows
zzylol Feb 24, 2026
8df4927
fix: remove duplicate [[bin]] entries in Cargo.toml from rebase merge
zzylol Mar 31, 2026
3ea0d58
style: apply cargo fmt
zzylol Mar 31, 2026
bf4c213
fix: use std::io::Error::other per clippy lint
zzylol Mar 31, 2026
b7e1b02
fix: remove unnecessary cast per clippy lint
zzylol Mar 31, 2026
7c76d04
Add standalone precompute engine to replace Arroyo streaming pipeline
zzylol Feb 21, 2026
e66e6e9
Wire up DatasketchesKLL in accumulator factory and add E2E test
zzylol Feb 23, 2026
ba79200
Add late data handling for closed windows and precompute engine design doc
zzylol Feb 26, 2026
fc34f14
Minor wording fix in precompute engine design doc
zzylol Feb 26, 2026
10e4161
Clarify single-machine multi-threaded architecture in design doc
zzylol Feb 26, 2026
24ec6ce
Add multi-connector ingest support (Prometheus + VictoriaMetrics)
zzylol Feb 26, 2026
e903783
Run cargo fmt
zzylol Feb 26, 2026
e5b6abc
Add pane-based incremental sliding window computation
zzylol Feb 26, 2026
da57c97
Merge branch 'main' into pr/C-multi-connector-pane-sliding-window
zzylol Apr 1, 2026
e83a8d8
style: apply cargo fmt
zzylol Apr 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion asap-query-engine/src/bin/precompute_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use query_engine_rust::data_model::{
};
use query_engine_rust::drivers::query::adapters::AdapterConfig;
use query_engine_rust::engines::SimpleEngine;
use query_engine_rust::precompute_engine::config::PrecomputeEngineConfig;
use query_engine_rust::precompute_engine::config::{LateDataPolicy, PrecomputeEngineConfig};
use query_engine_rust::precompute_engine::output_sink::{RawPassthroughSink, StoreOutputSink};
use query_engine_rust::precompute_engine::PrecomputeEngine;
use query_engine_rust::stores::SimpleMapStore;
Expand Down Expand Up @@ -61,6 +61,10 @@ struct Args {
/// Aggregation ID to stamp on each raw-mode output
#[arg(long, default_value_t = 0)]
raw_mode_aggregation_id: u64,

/// Policy for handling late samples that arrive after their window has closed
#[arg(long, value_enum, default_value_t = LateDataPolicy::Drop)]
late_data_policy: LateDataPolicy,
}

#[tokio::main]
Expand Down Expand Up @@ -131,6 +135,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
channel_buffer_size: args.channel_buffer_size,
pass_raw_samples: args.pass_raw_samples,
raw_mode_aggregation_id: args.raw_mode_aggregation_id,
late_data_policy: args.late_data_policy,
};

// Create the output sink (writes directly to the store)
Expand Down
4 changes: 3 additions & 1 deletion asap-query-engine/src/bin/test_e2e_precompute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use query_engine_rust::drivers::ingest::prometheus_remote_write::{
};
use query_engine_rust::drivers::query::adapters::AdapterConfig;
use query_engine_rust::engines::SimpleEngine;
use query_engine_rust::precompute_engine::config::PrecomputeEngineConfig;
use query_engine_rust::precompute_engine::config::{LateDataPolicy, PrecomputeEngineConfig};
use query_engine_rust::precompute_engine::output_sink::{RawPassthroughSink, StoreOutputSink};
use query_engine_rust::precompute_engine::PrecomputeEngine;
use query_engine_rust::stores::SimpleMapStore;
Expand Down Expand Up @@ -144,6 +144,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
channel_buffer_size: 10000,
pass_raw_samples: false,
raw_mode_aggregation_id: 0,
late_data_policy: LateDataPolicy::Drop,
};
let output_sink = Arc::new(StoreOutputSink::new(store.clone()));
let engine = PrecomputeEngine::new(engine_config, streaming_config.clone(), output_sink);
Expand Down Expand Up @@ -283,6 +284,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
channel_buffer_size: 10000,
pass_raw_samples: true,
raw_mode_aggregation_id: raw_agg_id,
late_data_policy: LateDataPolicy::Drop,
};
let raw_sink = Arc::new(RawPassthroughSink::new(store.clone()));
let raw_engine = PrecomputeEngine::new(raw_engine_config, streaming_config.clone(), raw_sink);
Expand Down
293 changes: 93 additions & 200 deletions asap-query-engine/src/drivers/ingest/victoriametrics_remote_write.rs
Original file line number Diff line number Diff line change
@@ -1,200 +1,93 @@
// use axum::{body::Bytes, extract::State, http::StatusCode, routing::post, Router};
// use prost::Message;
// use std::sync::Arc;
// use tokio::net::TcpListener;
// use tracing::{debug, info, warn};

// use super::prometheus_remote_write::{labels_to_string, DecodedSample, WriteRequest};

// // ---------------------------------------------------------------------------
// // Config
// // ---------------------------------------------------------------------------

// /// Configuration for the VictoriaMetrics remote write ingest endpoint.
// ///
// /// This is a thin wrapper around the shared remote write decoder that always
// /// uses zstd compression (VictoriaMetrics remote write protocol).
// #[derive(Debug, Clone)]
// pub struct VictoriaMetricsRemoteWriteConfig {
// pub port: u16,
// }

// impl Default for VictoriaMetricsRemoteWriteConfig {
// fn default() -> Self {
// // VictoriaMetrics commonly uses 8428, but the caller can override this.
// Self { port: 8428 }
// }
// }

// // ---------------------------------------------------------------------------
// // Server
// // ---------------------------------------------------------------------------

// /// Shared state accessible by axum handlers.
// struct ServerState {
// /// Running counter of ingested samples (for logging).
// samples_ingested: std::sync::atomic::AtomicU64,
// }

// /// A standalone HTTP server that accepts VictoriaMetrics remote write requests.
// ///
// /// This server listens on `VictoriaMetricsRemoteWriteConfig::port` and exposes
// /// a `POST /api/v1/write` endpoint that expects zstd-compressed protobuf
// /// `WriteRequest` bodies, matching the VictoriaMetrics remote write protocol.
// ///
// /// The decoded samples are logged at debug level. To integrate with a
// /// downstream store or precompute engine, extend the handler or wrap this
// /// server with a callback.
// pub struct VictoriaMetricsRemoteWriteServer {
// config: VictoriaMetricsRemoteWriteConfig,
// }

// impl VictoriaMetricsRemoteWriteServer {
// pub fn new(config: VictoriaMetricsRemoteWriteConfig) -> Self {
// Self { config }
// }

// /// Start the server. Blocks until the listener is dropped or an error occurs.
// pub async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// let state = Arc::new(ServerState {
// samples_ingested: std::sync::atomic::AtomicU64::new(0),
// });

// let app = Router::new()
// .route("/api/v1/write", post(handle_victoriametrics_remote_write))
// .with_state(state);

// let addr = format!("0.0.0.0:{}", self.config.port);
// info!(
// "VictoriaMetrics remote write server listening on {} (zstd compression only)",
// addr
// );

// let listener = TcpListener::bind(&addr).await?;
// axum::serve(listener, app).await?;
// Ok(())
// }
// }

// async fn handle_victoriametrics_remote_write(
// State(state): State<Arc<ServerState>>,
// body: Bytes,
// ) -> StatusCode {
// let samples: Vec<DecodedSample> = match decode_victoriametrics_remote_write(&body) {
// Ok(s) => s,
// Err(VictoriaMetricsRemoteWriteError::ZstdDecompress(e)) => {
// warn!("Failed to zstd-decompress VictoriaMetrics remote write request: {e}");
// return StatusCode::BAD_REQUEST;
// }
// Err(VictoriaMetricsRemoteWriteError::ProtobufDecode(e)) => {
// warn!("Failed to decode VictoriaMetrics remote write protobuf: {e}");
// return StatusCode::BAD_REQUEST;
// }
// };

// let count = samples.len() as u64;
// let total = state
// .samples_ingested
// .fetch_add(count, std::sync::atomic::Ordering::Relaxed)
// + count;

// debug!(
// "Received {} VictoriaMetrics samples ({} total ingested)",
// count, total
// );
// for s in &samples {
// debug!(" {} t={} v={}", s.labels, s.timestamp_ms, s.value);
// }

// StatusCode::NO_CONTENT
// }

// // ---------------------------------------------------------------------------
// // Decode helpers
// // ---------------------------------------------------------------------------

// #[derive(Debug, thiserror::Error)]
// pub enum VictoriaMetricsRemoteWriteError {
// #[error("zstd decompression failed: {0}")]
// ZstdDecompress(String),
// #[error("protobuf decode failed: {0}")]
// ProtobufDecode(String),
// }

// /// Zstd-decompress and protobuf-decode a raw VictoriaMetrics remote write body
// /// into a flat list of [`DecodedSample`]s.
// pub fn decode_victoriametrics_remote_write(
// body: &[u8],
// ) -> Result<Vec<DecodedSample>, VictoriaMetricsRemoteWriteError> {
// let decompressed = zstd::decode_all(body)
// .map_err(|e| VictoriaMetricsRemoteWriteError::ZstdDecompress(e.to_string()))?;

// let write_req = WriteRequest::decode(decompressed.as_slice())
// .map_err(|e| VictoriaMetricsRemoteWriteError::ProtobufDecode(e.to_string()))?;

// let mut samples = Vec::new();
// for ts in &write_req.timeseries {
// let labels_str = labels_to_string(&ts.labels);
// for s in &ts.samples {
// samples.push(DecodedSample {
// labels: labels_str.clone(),
// timestamp_ms: s.timestamp,
// value: s.value,
// });
// }
// }
// Ok(samples)
// }

// // ---------------------------------------------------------------------------
// // Tests
// // ---------------------------------------------------------------------------

// #[cfg(test)]
// mod tests {
// use super::*;
// use crate::drivers::ingest::prometheus_remote_write::{
// Label, Sample, TimeSeries, WriteRequest,
// };

// #[test]
// fn test_zstd_decode_single_sample() {
// let write_req = WriteRequest {
// timeseries: vec![TimeSeries {
// labels: vec![
// Label {
// name: "__name__".into(),
// value: "vm_metric".into(),
// },
// Label {
// name: "region".into(),
// value: "us-east-1".into(),
// },
// ],
// samples: vec![Sample {
// value: 99.9,
// timestamp: 1700000000000,
// }],
// }],
// };

// let proto_bytes = write_req.encode_to_vec();
// let compressed = zstd::encode_all(proto_bytes.as_slice(), 0).unwrap();

// let samples = decode_victoriametrics_remote_write(&compressed).unwrap();
// assert_eq!(samples.len(), 1);
// assert_eq!(samples[0].labels, "vm_metric{region=\"us-east-1\"}");
// assert_eq!(samples[0].timestamp_ms, 1700000000000);
// assert!((samples[0].value - 99.9).abs() < f64::EPSILON);
// }

// #[test]
// fn test_decode_invalid_zstd() {
// let result = decode_victoriametrics_remote_write(b"not-zstd-data");
// assert!(result.is_err());
// assert!(matches!(
// result.unwrap_err(),
// VictoriaMetricsRemoteWriteError::ZstdDecompress(_)
// ));
// }
// }
use prost::Message;

use super::prometheus_remote_write::{labels_to_string, DecodedSample, WriteRequest};

// ---------------------------------------------------------------------------
// Decode helpers
// ---------------------------------------------------------------------------

/// Errors produced while decoding a VictoriaMetrics remote write request body.
///
/// Each variant carries the stringified underlying error so the caller can
/// log it without depending on the `zstd` / `prost` error types directly.
#[derive(Debug, thiserror::Error)]
pub enum VictoriaMetricsRemoteWriteError {
    /// The request body was not valid zstd-compressed data.
    #[error("zstd decompression failed: {0}")]
    ZstdDecompress(String),
    /// The decompressed payload was not a valid protobuf `WriteRequest`.
    #[error("protobuf decode failed: {0}")]
    ProtobufDecode(String),
}

/// Zstd-decompress and protobuf-decode a raw VictoriaMetrics remote write body
/// into a flat list of [`DecodedSample`]s.
///
/// One `DecodedSample` is emitted per `(timeseries, sample)` pair; the series
/// labels are rendered once per timeseries via `labels_to_string` and cloned
/// onto each of its samples.
///
/// # Errors
///
/// Returns [`VictoriaMetricsRemoteWriteError::ZstdDecompress`] when `body` is
/// not valid zstd data, or [`VictoriaMetricsRemoteWriteError::ProtobufDecode`]
/// when the decompressed bytes are not a valid `WriteRequest`.
pub fn decode_victoriametrics_remote_write(
    body: &[u8],
) -> Result<Vec<DecodedSample>, VictoriaMetricsRemoteWriteError> {
    let decompressed = zstd::decode_all(body)
        .map_err(|e| VictoriaMetricsRemoteWriteError::ZstdDecompress(e.to_string()))?;

    let write_req = WriteRequest::decode(decompressed.as_slice())
        .map_err(|e| VictoriaMetricsRemoteWriteError::ProtobufDecode(e.to_string()))?;

    // Pre-size the output so repeated pushes never reallocate: the final
    // length is exactly the total sample count across all timeseries.
    let total_samples: usize = write_req.timeseries.iter().map(|ts| ts.samples.len()).sum();
    let mut samples = Vec::with_capacity(total_samples);

    for ts in &write_req.timeseries {
        // Render the label set once per series; clone the string per sample.
        let labels_str = labels_to_string(&ts.labels);
        for s in &ts.samples {
            samples.push(DecodedSample {
                labels: labels_str.clone(),
                timestamp_ms: s.timestamp,
                value: s.value,
            });
        }
    }
    Ok(samples)
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use crate::drivers::ingest::prometheus_remote_write::{
        Label, Sample, TimeSeries, WriteRequest,
    };

    /// Round-trip: encode a one-sample `WriteRequest`, zstd-compress it, and
    /// verify the decoder recovers the labels, timestamp, and value.
    #[test]
    fn test_zstd_decode_single_sample() {
        let labels = vec![
            Label {
                name: "__name__".into(),
                value: "vm_metric".into(),
            },
            Label {
                name: "region".into(),
                value: "us-east-1".into(),
            },
        ];
        let series = TimeSeries {
            labels,
            samples: vec![Sample {
                value: 99.9,
                timestamp: 1700000000000,
            }],
        };
        let request = WriteRequest {
            timeseries: vec![series],
        };

        let encoded = request.encode_to_vec();
        let body = zstd::encode_all(encoded.as_slice(), 0).unwrap();

        let decoded = decode_victoriametrics_remote_write(&body).unwrap();
        assert_eq!(decoded.len(), 1);

        let first = &decoded[0];
        assert_eq!(first.labels, "vm_metric{region=\"us-east-1\"}");
        assert_eq!(first.timestamp_ms, 1700000000000);
        assert!((first.value - 99.9).abs() < f64::EPSILON);
    }

    /// A body that is not valid zstd must surface as `ZstdDecompress`.
    #[test]
    fn test_decode_invalid_zstd() {
        let outcome = decode_victoriametrics_remote_write(b"not-zstd-data");
        assert!(matches!(
            outcome,
            Err(VictoriaMetricsRemoteWriteError::ZstdDecompress(_))
        ));
    }
}
2 changes: 1 addition & 1 deletion asap-query-engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub use drivers::{
OtlpReceiverConfig,
};

pub use precompute_engine::config::PrecomputeEngineConfig;
pub use precompute_engine::config::{LateDataPolicy, PrecomputeEngineConfig};
pub use precompute_engine::output_sink::StoreOutputSink;
pub use precompute_engine::PrecomputeEngine;

Expand Down
1 change: 1 addition & 0 deletions asap-query-engine/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ async fn main() -> Result<()> {
channel_buffer_size: args.precompute_channel_buffer_size,
pass_raw_samples: false,
raw_mode_aggregation_id: 0,
late_data_policy: query_engine_rust::precompute_engine::config::LateDataPolicy::Drop,
};
let output_sink = Arc::new(StoreOutputSink::new(store.clone()));
let engine =
Expand Down
Loading
Loading