Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,10 @@ class StreamingAggregationConfig:
aggregationType: str
aggregationSubType: str

# NEW fields for sliding window support (Issue #236)
windowSize: int # Window size in seconds (e.g., 900s for 15m)
slideInterval: int # Slide/hop interval in seconds (e.g., 30s)
windowType: str # "tumbling" or "sliding"

# DEPRECATED but kept for backward compatibility
tumblingWindowSize: int # For reading old configs

spatialFilter: str
metric: str # PromQL mode: metric name
parameters: dict
Expand Down Expand Up @@ -64,18 +60,10 @@ def from_dict(aggregation_config: dict) -> "StreamingAggregationConfig":
aggregation.aggregationType = aggregation_config["aggregationType"]
aggregation.aggregationSubType = aggregation_config["aggregationSubType"]

# NEW: Handle new window fields with backward compatibility
aggregation.windowType = aggregation_config.get("windowType", "tumbling")
aggregation.windowSize = aggregation_config.get(
"windowSize", aggregation_config.get("tumblingWindowSize")
)
aggregation.windowSize = aggregation_config["windowSize"]
aggregation.slideInterval = aggregation_config.get(
"slideInterval", aggregation_config.get("tumblingWindowSize")
)

# Keep deprecated field for backward compatibility
aggregation.tumblingWindowSize = aggregation_config.get(
"tumblingWindowSize", aggregation.windowSize
"slideInterval", aggregation.windowSize
)

aggregation.spatialFilter = aggregation_config["spatialFilter"]
Expand Down Expand Up @@ -150,10 +138,9 @@ def get_identifying_key(self) -> Tuple:
keys = [
self.aggregationType,
self.aggregationSubType,
self.windowType, # NEW: Include window type
self.windowSize, # NEW: Include window size
self.slideInterval, # NEW: Include slide interval
self.tumblingWindowSize, # Keep for backward compatibility
self.windowType,
self.windowSize,
self.slideInterval,
self.spatialFilter,
self.metric,
self.table_name, # SQL mode: table name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,10 @@ pub struct AggregationConfig {
pub rollup_labels: KeyByLabelNames,
pub original_yaml: String,

// NEW fields for sliding window support (Issue #236)
pub window_size: u64, // Window size in seconds (e.g., 900s for 15m)
pub slide_interval: u64, // Slide/hop interval in seconds (e.g., 30s)
pub window_type: String, // "tumbling" or "sliding"

// DEPRECATED but kept for backward compatibility
pub tumbling_window_size: u64,

pub spatial_filter: String,
pub spatial_filter_normalized: String,
pub metric: String, // PromQL mode: metric name; SQL mode: derived from table_name.value_column
Expand All @@ -51,27 +47,20 @@ impl AggregationConfig {
aggregated_labels: KeyByLabelNames,
rollup_labels: KeyByLabelNames,
original_yaml: String,
tumbling_window_size: u64,
window_size: u64,
slide_interval: u64,
window_type: String,
spatial_filter: String,
metric: String,
num_aggregates_to_retain: Option<u64>,
read_count_threshold: Option<u64>,
// NEW parameters for sliding window support
window_size: Option<u64>,
slide_interval: Option<u64>,
window_type: Option<String>,
// SQL-specific fields
table_name: Option<String>,
value_column: Option<String>,
) -> Self {
// Generate normalized spatial filter (placeholder implementation)
let spatial_filter_normalized = normalize_spatial_filter(&spatial_filter);

// Handle backward compatibility: if new fields not provided, use tumbling_window_size
let window_size = window_size.unwrap_or(tumbling_window_size);
let slide_interval = slide_interval.unwrap_or(tumbling_window_size);
let window_type = window_type.unwrap_or_else(|| "tumbling".to_string());

Self {
aggregation_id,
aggregation_type,
Expand All @@ -84,7 +73,6 @@ impl AggregationConfig {
window_size,
slide_interval,
window_type,
tumbling_window_size,
spatial_filter,
spatial_filter_normalized,
metric,
Expand Down Expand Up @@ -143,19 +131,18 @@ impl AggregationConfig {
let aggregated_labels = KeyByLabelNames::deserialize_from_json(&data["aggregatedLabels"])?;
let rollup_labels = KeyByLabelNames::deserialize_from_json(&data["rollupLabels"])?;

let tumbling_window_size = data["tumblingWindowSize"]
.as_u64()
.ok_or("Missing tumblingWindowSize")?;
let window_size = data["windowSize"].as_u64().ok_or("Missing windowSize")?;

// NEW: Handle new window fields with backward compatibility
let window_type = data
.get("windowType")
.and_then(|v| v.as_str())
.map(|s| s.to_string());

let window_size = data.get("windowSize").and_then(|v| v.as_u64());
.unwrap_or("tumbling")
.to_string();

let slide_interval = data.get("slideInterval").and_then(|v| v.as_u64());
let slide_interval = data
.get("slideInterval")
.and_then(|v| v.as_u64())
.unwrap_or(window_size);

let spatial_filter = data["spatialFilter"].as_str().unwrap_or("").to_string();

Expand Down Expand Up @@ -183,14 +170,13 @@ impl AggregationConfig {
aggregated_labels,
rollup_labels,
original_yaml,
tumbling_window_size,
window_size,
slide_interval,
window_type,
spatial_filter,
metric,
num_aggregates_to_retain,
read_count_threshold,
window_size,
slide_interval,
window_type,
table_name,
value_column,
))
Expand Down Expand Up @@ -265,21 +251,20 @@ impl AggregationConfig {
})
.collect();

let tumbling_window_size = aggregation_data["tumblingWindowSize"]
let window_size = aggregation_data["windowSize"]
.as_u64()
.ok_or_else(|| anyhow::anyhow!("Missing tumblingWindowSize"))?;
.ok_or_else(|| anyhow::anyhow!("Missing windowSize"))?;

// NEW: Handle new window fields with backward compatibility
let window_type = aggregation_data
.get("windowType")
.and_then(|v| v.as_str())
.map(|s| s.to_string());

let window_size = aggregation_data.get("windowSize").and_then(|v| v.as_u64());
.unwrap_or("tumbling")
.to_string();

let slide_interval = aggregation_data
.get("slideInterval")
.and_then(|v| v.as_u64());
.and_then(|v| v.as_u64())
.unwrap_or(window_size);

let spatial_filter = aggregation_data["spatialFilter"]
.as_str()
Expand Down Expand Up @@ -329,14 +314,13 @@ impl AggregationConfig {
aggregated_labels,
rollup_labels,
String::new(), // original_yaml - empty as in Python
tumbling_window_size,
window_size,
slide_interval,
window_type,
spatial_filter,
metric,
num_aggregates_to_retain,
read_count_threshold,
window_size,
slide_interval,
window_type,
table_name,
value_column,
))
Expand All @@ -351,8 +335,6 @@ impl SerializableToSink for AggregationConfig {
"aggregationSubType": self.aggregation_sub_type,
"parameters": self.parameters,
"originalYaml": self.original_yaml,
"tumblingWindowSize": self.tumbling_window_size,
// NEW: Include new window fields
"windowSize": self.window_size,
"slideInterval": self.slide_interval,
"windowType": self.window_type,
Expand Down
2 changes: 1 addition & 1 deletion asap-planner-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl PlannerOutput {
if let Some(YamlValue::Sequence(aggs)) = root.get("aggregations") {
return aggs.iter().all(|agg| {
if let YamlValue::Mapping(m) = agg {
if let Some(val) = m.get("tumblingWindowSize") {
if let Some(val) = m.get("windowSize") {
let size = match val {
YamlValue::Number(n) => n.as_u64().unwrap_or(0),
_ => 0,
Expand Down
4 changes: 0 additions & 4 deletions asap-planner-rs/src/output/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,6 @@ fn build_streaming_yaml(
None => YamlValue::Null,
},
);
map.insert(
YamlValue::String("tumblingWindowSize".to_string()),
YamlValue::Number(cfg.tumbling_window_size.into()),
);
map.insert(
YamlValue::String("value_column".to_string()),
match &cfg.value_column {
Expand Down
3 changes: 0 additions & 3 deletions asap-planner-rs/src/planner/logics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,11 @@ fn set_tumbling_window_parameters(
config.window_size = effective_repeat;
config.slide_interval = effective_repeat;
config.window_type = "tumbling".to_string();
config.tumbling_window_size = effective_repeat;
}
QueryPatternType::OnlySpatial => {
config.window_size = prometheus_scrape_interval;
config.slide_interval = prometheus_scrape_interval;
config.window_type = "tumbling".to_string();
config.tumbling_window_size = prometheus_scrape_interval;
}
}
}
Expand All @@ -78,7 +76,6 @@ pub struct IntermediateWindowConfig {
pub window_size: u64,
pub slide_interval: u64,
pub window_type: String,
pub tumbling_window_size: u64,
}

pub fn get_precompute_operator_parameters(
Expand Down
7 changes: 1 addition & 6 deletions asap-planner-rs/src/planner/single_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ pub struct IntermediateAggConfig {
pub window_type: String,
pub window_size: u64,
pub slide_interval: u64,
pub tumbling_window_size: u64,
pub spatial_filter: String,
pub metric: String,
pub table_name: Option<String>,
Expand Down Expand Up @@ -66,13 +65,12 @@ impl IntermediateAggConfig {
}

format!(
"{}|{}|{}|{}|{}|{}|{}|{}|{:?}|{:?}|{:?}|{}",
"{}|{}|{}|{}|{}|{}|{}|{:?}|{:?}|{:?}|{}",
self.aggregation_type,
self.aggregation_sub_type,
self.window_type,
self.window_size,
self.slide_interval,
self.tumbling_window_size,
self.spatial_filter,
self.metric,
self.table_name,
Expand Down Expand Up @@ -281,7 +279,6 @@ impl SingleQueryProcessor {
window_type: window_cfg.window_type.clone(),
window_size: window_cfg.window_size,
slide_interval: window_cfg.slide_interval,
tumbling_window_size: window_cfg.tumbling_window_size,
spatial_filter: spatial_filter.clone(),
metric: metric.clone(),
table_name: None,
Expand All @@ -299,7 +296,6 @@ impl SingleQueryProcessor {
window_type: window_cfg.window_type.clone(),
window_size: window_cfg.window_size,
slide_interval: window_cfg.slide_interval,
tumbling_window_size: window_cfg.tumbling_window_size,
spatial_filter: spatial_filter.clone(),
metric: metric.clone(),
table_name: None,
Expand Down Expand Up @@ -427,7 +423,6 @@ mod tests {
window_type: "tumbling".to_string(),
window_size: 300,
slide_interval: 300,
tumbling_window_size: 300,
spatial_filter: String::new(),
metric: "http_requests_total".to_string(),
table_name: None,
Expand Down
4 changes: 2 additions & 2 deletions asap-planner-rs/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,12 +305,12 @@ fn malformed_yaml_returns_parse_error() {

// --- Overlapping window tests ---
// Queries where range vector > t_repeat: e.g. [5m] repeated every 60s.
// Windows are always tumbling (sliding disabled); the planner emits tumblingWindowSize=t_repeat
// Windows are always tumbling (sliding disabled); the planner emits windowSize=t_repeat
// and the cleanup param tells the query engine how many windows to retain to cover the range.

#[test]
fn temporal_overlapping_window_size_equals_t_repeat() {
// [5m] range repeated every 60s → tumblingWindowSize = 60, not 300
// [5m] range repeated every 60s → windowSize = 60, not 300
let c = Controller::from_file(
Path::new("tests/comparison/test_data/configs/temporal_overlapping.yaml"),
arroyo_opts(),
Expand Down
13 changes: 6 additions & 7 deletions asap-query-engine/benches/simple_store_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,16 +134,15 @@ fn make_agg_config(
KeyByLabelNames::empty(),
KeyByLabelNames::empty(),
"".to_string(),
60,
"".to_string(),
60, // window_size (seconds)
60, // slide_interval (seconds)
"tumbling".to_string(), // window_type
"".to_string(), // spatial_filter
metric.to_string(),
num_aggregates_to_retain,
read_count_threshold,
None,
None,
None,
None,
None,
None, // table_name
None, // value_column
)
}

Expand Down
1 change: 0 additions & 1 deletion asap-query-engine/examples/promql/streaming_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ aggregations:
metric: fake_metric
parameters:
K: 20
tumblingWindowSize: 1
windowSize: 1
windowType: tumbling
spatialFilter: ''
Expand Down
1 change: 0 additions & 1 deletion asap-query-engine/examples/sql/streaming_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ aggregations:
value_column: cpu_usage
parameters:
K: 20
tumblingWindowSize: 1
windowSize: 1
windowType: tumbling
spatialFilter: ''
2 changes: 1 addition & 1 deletion asap-query-engine/src/data_model/precomputed_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ impl SerializableToSink for PrecomputedOutput {
// assert_eq!(config.aggregation_id, 1);
// assert_eq!(config.metric, "cpu_usage");
// assert_eq!(config.aggregation_type, "sum");
// assert_eq!(config.tumbling_window_size, 10);
// assert_eq!(config.window_size, 10);
// }

// #[test]
Expand Down
12 changes: 6 additions & 6 deletions asap-query-engine/src/engines/simple_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,17 +552,17 @@ impl SimpleEngine {
}
"SetAggregator" => {
// Latest window only
let tumbling_window_size = self
let window_size = self
.streaming_config
.get_aggregation_config(agg_info.aggregation_id_for_key)
.map(|config| config.tumbling_window_size * 1000)
.map(|config| config.window_size * 1000)
.ok_or_else(|| {
format!(
"Failed to get tumbling window size for aggregation {}",
"Failed to get window size for aggregation {}",
agg_info.aggregation_id_for_key
)
})?;
(end_timestamp - tumbling_window_size, end_timestamp)
(end_timestamp - window_size, end_timestamp)
}
other => {
return Err(format!("Unsupported key aggregation type: {}", other));
Expand Down Expand Up @@ -2507,11 +2507,11 @@ impl SimpleEngine {
let end_ms = Self::convert_query_time_to_data_time(end);
let step_ms = (step * 1000.0) as u64;

// Get tumbling window size
// Get window size
let tumbling_window_ms = self
.streaming_config
.get_aggregation_config(base_context.agg_info.aggregation_id_for_value)
.map(|config| config.tumbling_window_size * 1000)?;
.map(|config| config.window_size * 1000)?;

// Validate parameters
self.validate_range_query_params(start_ms, end_ms, step_ms, tumbling_window_ms)
Expand Down
1 change: 0 additions & 1 deletion asap-query-engine/src/tests/sql_pattern_matching_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ mod tests {
window_size: window_secs,
slide_interval: window_secs,
window_type: "tumbling".to_string(),
tumbling_window_size: window_secs,
spatial_filter: String::new(),
spatial_filter_normalized: String::new(),
metric: "cpu_usage".to_string(),
Expand Down
Loading
Loading