Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions asap-query-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ path = "src/bin/precompute_engine.rs"
name = "test_e2e_precompute"
path = "src/bin/test_e2e_precompute.rs"


[dev-dependencies]
ctor = "0.2"
tempfile = "3.20.0"
Expand Down
69 changes: 43 additions & 26 deletions asap-query-engine/src/bin/test_e2e_precompute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,17 @@ struct BenchResult {
batch_latency_ms: f64,
}

/// Bundled parameters for one benchmark run, passed to `run_single_bench`.
/// Grouping them in a struct avoids a long positional argument list.
struct BenchRunConfig {
    /// Human-readable name identifying this run in the printed results.
    label: String,
    /// TCP port used for the run — presumably where the engine under test
    /// listens; confirm against `run_single_bench`'s body.
    port: u16,
    /// Shared streaming-engine configuration for this run.
    streaming_config: Arc<StreamingConfig>,
    /// Number of worker tasks to run.
    num_workers: usize,
    /// Number of concurrent senders; callers match this to `num_workers`
    /// (or use a fixed 4) to saturate the workers.
    num_concurrent_senders: usize,
    /// Total number of requests to send.
    num_requests: u64,
    /// Samples carried by each request (total samples = num_requests * samples_per_request).
    samples_per_request: u64,
    /// Number of distinct series the samples are spread across.
    num_series: u64,
}

/// Build an AggregationConfig for Sum with specified window parameters.
fn make_sum_agg_config(
agg_id: u64,
Expand Down Expand Up @@ -599,18 +610,20 @@ fn make_sum_agg_config(
}

/// Run a single windowed benchmark and return the results.
#[allow(clippy::too_many_arguments)]
async fn run_single_bench(
client: &reqwest::Client,
label: &str,
port: u16,
streaming_config: Arc<StreamingConfig>,
num_workers: usize,
num_concurrent_senders: usize,
num_requests: u64,
samples_per_request: u64,
num_series: u64,
config: BenchRunConfig,
) -> Result<BenchResult, Box<dyn std::error::Error + Send + Sync>> {
let BenchRunConfig {
label,
port,
streaming_config,
num_workers,
num_concurrent_senders,
num_requests,
samples_per_request,
num_series,
} = config;
let total_samples = num_requests * samples_per_request;

let noop_sink = Arc::new(NoopOutputSink::new());
Expand Down Expand Up @@ -728,7 +741,7 @@ async fn run_single_bench(
println!(" Batch latency: {batch_latency_ms:.1}ms");

Ok(BenchResult {
label: label.to_string(),
label,
send_throughput,
e2e_throughput,
batch_latency_ms,
Expand Down Expand Up @@ -760,14 +773,16 @@ async fn run_windowed_benchmarks(

let r = run_single_bench(
client,
label,
port,
sc,
4,
4, // concurrent senders to saturate workers
num_requests,
samples_per_request,
num_series,
BenchRunConfig {
label: label.to_string(),
port,
streaming_config: sc,
num_workers: 4,
num_concurrent_senders: 4, // concurrent senders to saturate workers
num_requests,
samples_per_request,
num_series,
},
)
.await?;
results.push(r);
Expand Down Expand Up @@ -802,14 +817,16 @@ async fn run_scalability_benchmark(

let r = run_single_bench(
client,
&label,
port,
sc,
num_workers,
num_workers, // concurrent senders match worker count
num_requests,
samples_per_request,
num_series,
BenchRunConfig {
label,
port,
streaming_config: sc,
num_workers,
num_concurrent_senders: num_workers, // concurrent senders match worker count
num_requests,
samples_per_request,
num_series,
},
)
.await?;
results.push(r);
Expand Down
2 changes: 1 addition & 1 deletion asap-query-engine/src/data_model/precomputed_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ impl PrecomputedOutput {
Ok(Box::new(accumulator))
}
"MultipleSum" => {
let accumulator = MultipleSumAccumulator::deserialize_from_bytes(buffer)
let accumulator = MultipleSumAccumulator::deserialize_from_bytes_arroyo(buffer)
.map_err(|e| format!("Failed to deserialize MultipleSumAccumulator: {e}"))?;
Ok(Box::new(accumulator))
}
Expand Down
3 changes: 2 additions & 1 deletion asap-query-engine/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use sketch_core::config::{self, ImplMode};

use query_engine_rust::data_model::enums::{InputFormat, LockStrategy, StreamingEngine};
use query_engine_rust::drivers::AdapterConfig;
use query_engine_rust::precompute_engine::config::LateDataPolicy;
use query_engine_rust::utils::file_io::{read_inference_config, read_streaming_config};
use query_engine_rust::{
HttpServer, HttpServerConfig, KafkaConsumer, KafkaConsumerConfig, OtlpReceiver,
Expand Down Expand Up @@ -301,7 +302,7 @@ async fn main() -> Result<()> {
channel_buffer_size: args.precompute_channel_buffer_size,
pass_raw_samples: false,
raw_mode_aggregation_id: 0,
late_data_policy: query_engine_rust::precompute_engine::config::LateDataPolicy::Drop,
late_data_policy: LateDataPolicy::Drop,
};
let output_sink = Arc::new(StoreOutputSink::new(store.clone()));
let engine =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,15 @@ pub fn create_accumulator_updater(config: &AggregationConfig) -> Box<dyn Accumul
.unwrap_or(200) as u16;
Box::new(KllAccumulatorUpdater::new(k))
}
"MultipleSum" | "multiple_sum" => Box::new(MultipleSumUpdater::new()),
"MultipleIncrease" | "multiple_increase" => Box::new(MultipleIncreaseUpdater::new()),
"MultipleMinMax" | "multiple_min_max" => Box::new(MultipleMinMaxUpdater::new(
if sub_type.eq_ignore_ascii_case("max") {
"max".to_string()
} else {
"min".to_string()
},
)),
"Sum" | "sum" => Box::new(SumAccumulatorUpdater::new()),
"Min" | "min" => Box::new(MinMaxAccumulatorUpdater::new("min".to_string())),
"Max" | "max" => Box::new(MinMaxAccumulatorUpdater::new("max".to_string())),
Expand Down
43 changes: 42 additions & 1 deletion asap-query-engine/src/precompute_engine/output_sink.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::data_model::{AggregateCore, PrecomputedOutput};
use crate::stores::Store;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use tracing::debug_span;

/// Trait for emitting completed window outputs.
Expand Down Expand Up @@ -61,6 +61,47 @@ impl OutputSink for RawPassthroughSink {
}
}

/// A capturing sink for testing that stores all emitted outputs.
pub struct CapturingOutputSink {
    /// All outputs emitted so far, guarded for use from multiple threads.
    pub captured: Mutex<Vec<(PrecomputedOutput, Box<dyn AggregateCore>)>>,
}

impl CapturingOutputSink {
    /// Construct a sink holding no captured outputs.
    pub fn new() -> Self {
        CapturingOutputSink {
            captured: Mutex::new(Vec::new()),
        }
    }

    /// Take every captured output out of the sink, leaving it empty.
    pub fn drain(&self) -> Vec<(PrecomputedOutput, Box<dyn AggregateCore>)> {
        let mut entries = self.captured.lock().unwrap();
        // Swap the stored Vec for a fresh empty one and hand back the contents.
        std::mem::take(&mut *entries)
    }

    /// Number of outputs currently held.
    pub fn len(&self) -> usize {
        let entries = self.captured.lock().unwrap();
        entries.len()
    }

    /// Whether the sink currently holds no outputs.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

impl Default for CapturingOutputSink {
    fn default() -> Self {
        CapturingOutputSink::new()
    }
}

impl OutputSink for CapturingOutputSink {
    /// Append the whole batch to the captured list; this sink never fails.
    fn emit_batch(
        &self,
        outputs: Vec<(PrecomputedOutput, Box<dyn AggregateCore>)>,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let mut entries = self.captured.lock().unwrap();
        entries.extend(outputs);
        Ok(())
    }
}

/// A no-op sink for testing that just counts emitted batches.
pub struct NoopOutputSink {
pub emit_count: std::sync::atomic::AtomicU64,
Expand Down
Loading
Loading