Skip to content

Commit ced9241

Browse files
author
zz_y
committed
1 parent ad8faa1 commit ced9241

File tree

7 files changed

+500
-2
lines changed

7 files changed

+500
-2
lines changed

asap-query-engine/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ path = "src/bin/test_e2e_precompute.rs"
7171
name = "bench_precompute_sketch"
7272
path = "src/bin/bench_precompute_sketch.rs"
7373

74+
[[bin]]
75+
name = "e2e_quickstart_resource_test"
76+
path = "src/bin/e2e_quickstart_resource_test.rs"
77+
7478
[dev-dependencies]
7579
ctor = "0.2"
7680
tempfile = "3.20.0"
Lines changed: 380 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,380 @@
1+
//! E2E resource usage test for the precompute engine with quickstart-like data patterns.
2+
//!
3+
//! Simulates 7 fake exporters × 27,000 series each = 189,000 series of `sensor_reading`,
4+
//! scraped at 1s intervals via Prometheus remote write, matching the quickstart setup.
5+
//! After 10 seconds of ingestion, reports CPU and memory usage.
6+
//!
7+
//! Usage:
8+
//! cargo run --release --bin e2e_quickstart_resource_test
9+
10+
use prost::Message;
11+
use query_engine_rust::data_model::{CleanupPolicy, LockStrategy, StreamingConfig};
12+
use query_engine_rust::drivers::ingest::prometheus_remote_write::{
13+
Label, Sample, TimeSeries, WriteRequest,
14+
};
15+
use query_engine_rust::precompute_engine::config::{LateDataPolicy, PrecomputeEngineConfig};
16+
use query_engine_rust::precompute_engine::output_sink::StoreOutputSink;
17+
use query_engine_rust::precompute_engine::PrecomputeEngine;
18+
use query_engine_rust::stores::{SimpleMapStore, Store};
19+
use sketch_db_common::aggregation_config::AggregationConfig;
20+
use std::collections::HashMap;
21+
use std::sync::Arc;
22+
use std::time::{Duration, Instant};
23+
24+
// Port the precompute engine's Prometheus remote-write ingest endpoint listens on.
const INGEST_PORT: u16 = 19400;
// Number of precompute worker tasks the engine is configured with.
const NUM_WORKERS: usize = 4;
// How long the 1s-interval ingestion loop runs before the resource report.
const DURATION_SECS: u64 = 10;

// Quickstart pattern: 7 exporters × 30×30×30 = 189,000 series
const PATTERNS: &[&str] = &[
    "constant",
    "linear-up",
    "linear-down",
    "sine",
    "sine-noise",
    "step",
    "exp-up",
];
// Per-pattern label cardinality: region × service × host = 27,000 series each.
const NUM_REGIONS: usize = 30;
const NUM_SERVICES: usize = 30;
const NUM_HOSTS: usize = 30;
41+
42+
fn build_remote_write_body(timeseries: Vec<TimeSeries>) -> Vec<u8> {
43+
let write_req = WriteRequest { timeseries };
44+
let proto_bytes = write_req.encode_to_vec();
45+
snap::raw::Encoder::new()
46+
.compress_vec(&proto_bytes)
47+
.expect("snappy compress failed")
48+
}
49+
50+
fn make_sensor_reading(
51+
pattern: &str,
52+
region: &str,
53+
service: &str,
54+
host: &str,
55+
instance: &str,
56+
timestamp_ms: i64,
57+
value: f64,
58+
) -> TimeSeries {
59+
TimeSeries {
60+
labels: vec![
61+
Label {
62+
name: "__name__".into(),
63+
value: "sensor_reading".into(),
64+
},
65+
Label {
66+
name: "host".into(),
67+
value: host.into(),
68+
},
69+
Label {
70+
name: "instance".into(),
71+
value: instance.into(),
72+
},
73+
Label {
74+
name: "job".into(),
75+
value: "pattern-exporters".into(),
76+
},
77+
Label {
78+
name: "pattern".into(),
79+
value: pattern.into(),
80+
},
81+
Label {
82+
name: "region".into(),
83+
value: region.into(),
84+
},
85+
Label {
86+
name: "service".into(),
87+
value: service.into(),
88+
},
89+
],
90+
samples: vec![Sample {
91+
value,
92+
timestamp: timestamp_ms,
93+
}],
94+
}
95+
}
96+
97+
/// Generate a value based on pattern type and timestamp.
///
/// Every pattern is an offset around a per-series baseline of `base * 1000`;
/// an unrecognized pattern (and "constant") just returns the baseline.
fn pattern_value(pattern: &str, t_secs: f64, base: f64) -> f64 {
    let baseline = base * 1000.0;
    // 60s-period sine component shared by the "sine" and "sine-noise" patterns.
    let sine_wave = || 500.0 * (t_secs * std::f64::consts::PI / 30.0).sin();
    match pattern {
        "linear-up" => baseline + t_secs * 10.0,
        "linear-down" => baseline - t_secs * 10.0,
        "sine" => baseline + sine_wave(),
        "sine-noise" => baseline + sine_wave() + 50.0 * (t_secs * 7.3).sin(),
        "step" => {
            // Square wave: 10 seconds low, 10 seconds high.
            let high_phase = (t_secs as i64 / 10) % 2 != 0;
            if high_phase {
                baseline + 500.0
            } else {
                baseline
            }
        }
        "exp-up" => baseline * (1.0 + t_secs * 0.01).powf(2.0),
        // "constant" and anything unknown hold the baseline.
        _ => baseline,
    }
}
120+
121+
fn make_kll_streaming_config() -> Arc<StreamingConfig> {
122+
// Match quickstart: DatasketchesKLL, K=200, quantile by (pattern), window=10s tumbling
123+
let mut params = HashMap::new();
124+
params.insert("K".to_string(), serde_json::Value::from(200u64));
125+
126+
// Grouping by pattern (spatial key), rolling up region/service/host/instance/job
127+
let grouping =
128+
promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![
129+
"pattern".to_string(),
130+
]);
131+
let rollup =
132+
promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![
133+
"instance".to_string(),
134+
"job".to_string(),
135+
"region".to_string(),
136+
"service".to_string(),
137+
"host".to_string(),
138+
]);
139+
let aggregated =
140+
promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]);
141+
142+
let agg_config = AggregationConfig::new(
143+
1,
144+
"DatasketchesKLL".to_string(),
145+
String::new(),
146+
params,
147+
grouping,
148+
rollup,
149+
aggregated,
150+
String::new(),
151+
10, // window size = 10s (matching quickstart range-duration/step)
152+
10, // tumbling
153+
"tumbling".to_string(),
154+
"sensor_reading".to_string(),
155+
"sensor_reading".to_string(),
156+
None,
157+
None,
158+
None,
159+
None,
160+
);
161+
162+
let mut agg_map = HashMap::new();
163+
agg_map.insert(1u64, agg_config);
164+
Arc::new(StreamingConfig::new(agg_map))
165+
}
166+
167+
/// Read memory stats for this process from `/proc/self/status`.
///
/// Returns `(VmRSS, VmPeak, VmSize)`, all in KB. Any field that cannot be
/// found or parsed (e.g. on non-Linux platforms) reports as 0.
fn read_proc_status() -> (u64, u64, u64) {
    let status = std::fs::read_to_string("/proc/self/status").unwrap_or_default();
    // Extract the second whitespace-separated token of a "Key:   <value> kB" line.
    let field_kb = |prefix: &str| -> u64 {
        status
            .lines()
            .find(|line| line.starts_with(prefix))
            .and_then(|line| line.split_whitespace().nth(1))
            .and_then(|v| v.parse().ok())
            .unwrap_or(0)
    };
    (field_kb("VmRSS:"), field_kb("VmPeak:"), field_kb("VmSize:"))
}
184+
185+
/// Read cumulative CPU time for this process from `/proc/self/stat`.
///
/// Returns `(user_time_secs, system_time_secs)`; `(0.0, 0.0)` if the file is
/// unavailable (non-Linux) or malformed.
fn read_proc_cpu_time() -> (f64, f64) {
    let stat = std::fs::read_to_string("/proc/self/stat").unwrap_or_default();
    // The second field (comm) is parenthesized and may itself contain spaces,
    // so a raw whitespace split mis-indexes the later fields. Per proc(5),
    // split after the LAST ')' and index relative to that instead.
    let after_comm = match stat.rsplit_once(')') {
        Some((_, rest)) => rest,
        None => return (0.0, 0.0),
    };
    // Fields after comm: state ppid pgrp session tty_nr tpgid flags minflt
    // cminflt majflt cmajflt utime stime ... → utime is index 11, stime is 12.
    let fields: Vec<&str> = after_comm.split_whitespace().collect();
    if fields.len() <= 12 {
        return (0.0, 0.0);
    }
    // NOTE(review): CLK_TCK is assumed to be 100 (the usual Linux value), as in
    // the original; reading it portably would require libc::sysconf(_SC_CLK_TCK).
    let ticks_per_sec = 100.0;
    let utime = fields[11].parse::<f64>().unwrap_or(0.0) / ticks_per_sec;
    let stime = fields[12].parse::<f64>().unwrap_or(0.0) / ticks_per_sec;
    (utime, stime)
}
198+
199+
// Entry point: stands up the precompute engine in-process, then replays a
// quickstart-shaped workload (189,000 series, one sample per series per
// second) against its remote-write endpoint for DURATION_SECS, and finally
// prints a CPU/memory usage report sourced from /proc.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Default to "warn" logging unless RUST_LOG overrides it, so engine noise
    // doesn't drown out the report.
    tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("warn")),
        )
        .init();

    let streaming_config = make_kll_streaming_config();
    let store: Arc<dyn Store> = Arc::new(SimpleMapStore::new_with_strategy(
        streaming_config.clone(),
        CleanupPolicy::CircularBuffer,
        LockStrategy::PerKey,
    ));

    let engine_config = PrecomputeEngineConfig {
        num_workers: NUM_WORKERS,
        ingest_port: INGEST_PORT,
        allowed_lateness_ms: 5_000,
        max_buffer_per_series: 10_000,
        flush_interval_ms: 1_000,
        channel_buffer_size: 50_000,
        pass_raw_samples: false,
        raw_mode_aggregation_id: 0,
        late_data_policy: LateDataPolicy::Drop,
    };
    let output_sink = Arc::new(StoreOutputSink::new(store.clone()));
    let engine = PrecomputeEngine::new(engine_config, streaming_config, output_sink);
    // Run the engine on a background task; errors are only logged since the
    // test keeps driving load regardless.
    tokio::spawn(async move {
        if let Err(e) = engine.run().await {
            eprintln!("Precompute engine error: {e}");
        }
    });

    // Wait for server to bind
    tokio::time::sleep(Duration::from_secs(1)).await;

    let series_per_pattern = NUM_REGIONS * NUM_SERVICES * NUM_HOSTS; // 27,000
    let total_series = PATTERNS.len() * series_per_pattern; // 189,000

    println!("=== Precompute Engine E2E Resource Test ===");
    println!(" Patterns: {} ({:?})", PATTERNS.len(), PATTERNS);
    println!(" Series per pattern: {} ({}×{}×{})", series_per_pattern, NUM_REGIONS, NUM_SERVICES, NUM_HOSTS);
    println!(" Total series: {}", total_series);
    println!(" Workers: {}", NUM_WORKERS);
    println!(" Duration: {}s", DURATION_SECS);
    println!(" Aggregation: DatasketchesKLL K=200, tumbling 10s, group by pattern");
    println!();

    // Snapshot resource usage before load so the report can show deltas.
    // NOTE(review): taken after the engine task is spawned, so baseline
    // includes engine startup — deltas attribute that to "before".
    let (rss_before, _, _) = read_proc_status();
    let (cpu_user_before, cpu_sys_before) = read_proc_cpu_time();
    println!("Before ingestion: VmRSS = {} KB ({:.1} MB)", rss_before, rss_before as f64 / 1024.0);

    let client = reqwest::Client::builder()
        .pool_max_idle_per_host(8)
        .build()?;

    let start = Instant::now();
    let mut total_samples_sent = 0u64;
    let mut tick = 0u64;

    println!("\n--- Sending data (simulating Prometheus scrape at 1s intervals) ---");

    // One iteration per simulated scrape: build every series' sample for this
    // second, ship it in parallel HTTP chunks, then sleep out the remainder of
    // the 1s tick.
    while start.elapsed() < Duration::from_secs(DURATION_SECS) {
        let tick_start = Instant::now();
        let timestamp_ms = (tick * 1000 + 500) as i64; // mid-second
        let t_secs = tick as f64;

        // Build all timeseries for this tick.
        // In the quickstart, Prometheus batches all scraped series into remote write.
        // We send in chunks to avoid building a single massive request.
        let chunk_size = 10_000; // series per HTTP request
        let mut all_timeseries = Vec::with_capacity(total_series);

        for (p_idx, pattern) in PATTERNS.iter().enumerate() {
            let instance = format!("fake-exporter-{}:5000{}", pattern, p_idx);
            for r in 0..NUM_REGIONS {
                let region = format!("region{}", r);
                for s in 0..NUM_SERVICES {
                    let service = format!("svc{}", s);
                    for h in 0..NUM_HOSTS {
                        let host = format!("host{}", h);
                        // Per-series base in [0, 1): distinct value per
                        // (region, service, host) within a pattern.
                        let base = (r * NUM_SERVICES * NUM_HOSTS + s * NUM_HOSTS + h) as f64
                            / (series_per_pattern as f64);
                        let value = pattern_value(pattern, t_secs, base);
                        all_timeseries.push(make_sensor_reading(
                            pattern, &region, &service, &host, &instance, timestamp_ms, value,
                        ));
                    }
                }
            }
        }

        // Send in parallel chunks
        let mut handles = Vec::new();
        for chunk in all_timeseries.chunks(chunk_size) {
            let body = build_remote_write_body(chunk.to_vec());
            let client = client.clone();
            // Each chunk posts independently; the task resolves to a success flag.
            handles.push(tokio::spawn(async move {
                let resp = client
                    .post(format!("http://localhost:{INGEST_PORT}/api/v1/write"))
                    .header("Content-Type", "application/x-protobuf")
                    .header("Content-Encoding", "snappy")
                    .body(body)
                    .send()
                    .await;
                matches!(resp, Ok(r) if r.status().is_success() || r.status() == reqwest::StatusCode::NO_CONTENT)
            }));
        }

        // Join all chunk sends; a panicked/cancelled task counts as a failure.
        let mut all_ok = true;
        for handle in handles {
            if !handle.await.unwrap_or(false) {
                all_ok = false;
            }
        }

        // One sample per series per tick.
        total_samples_sent += total_series as u64;
        let send_time = tick_start.elapsed();

        // Progress line every other tick, or immediately on any send failure.
        if tick % 2 == 0 || !all_ok {
            println!(
                " tick={} t={}ms samples={} send_time={:.0}ms ok={}",
                tick, timestamp_ms, total_series, send_time.as_secs_f64() * 1000.0, all_ok
            );
        }

        tick += 1;

        // Sleep until next 1-second tick
        let elapsed_in_tick = tick_start.elapsed();
        if elapsed_in_tick < Duration::from_secs(1) {
            tokio::time::sleep(Duration::from_secs(1) - elapsed_in_tick).await;
        }
    }

    let wall_time = start.elapsed();

    // Wait a bit for processing to finish
    tokio::time::sleep(Duration::from_secs(2)).await;

    let (rss_after, vm_peak, vm_size) = read_proc_status();
    let (cpu_user_after, cpu_sys_after) = read_proc_cpu_time();

    // CPU time consumed during the test (process totals minus the baseline).
    let cpu_user = cpu_user_after - cpu_user_before;
    let cpu_sys = cpu_sys_after - cpu_sys_before;
    let cpu_total = cpu_user + cpu_sys;

    println!("\n=== Resource Usage Report (after {}s) ===", DURATION_SECS);
    println!(" Wall time: {:.1}s", wall_time.as_secs_f64());
    println!(" Ticks completed: {}", tick);
    println!(" Total samples sent: {}", total_samples_sent);
    println!(
        " Avg throughput: {:.0} samples/sec",
        total_samples_sent as f64 / wall_time.as_secs_f64()
    );
    println!();
    println!(" --- Memory ---");
    println!(" VmRSS (current): {} KB ({:.1} MB)", rss_after, rss_after as f64 / 1024.0);
    println!(" VmPeak: {} KB ({:.1} MB)", vm_peak, vm_peak as f64 / 1024.0);
    println!(" VmSize: {} KB ({:.1} MB)", vm_size, vm_size as f64 / 1024.0);
    println!(
        " RSS delta: {} KB ({:.1} MB)",
        rss_after.saturating_sub(rss_before),
        rss_after.saturating_sub(rss_before) as f64 / 1024.0
    );
    println!();
    println!(" --- CPU ---");
    println!(" User time: {:.2}s", cpu_user);
    println!(" System time: {:.2}s", cpu_sys);
    println!(" Total CPU time: {:.2}s", cpu_total);
    println!(
        " CPU utilization: {:.1}% (of {:.1}s wall time)",
        cpu_total / wall_time.as_secs_f64() * 100.0,
        wall_time.as_secs_f64()
    );

    println!("\n=== Test complete ===");

    Ok(())
}

asap-query-engine/src/precompute_engine/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@ pub mod series_router;
88
pub mod window_manager;
99
pub mod worker;
1010

11-
pub use engine::PrecomputeEngine;
11+
pub use engine::{PrecomputeEngine, PrecomputeWorkerDiagnostics};

0 commit comments

Comments
 (0)