From c7107e21c8a1e6ae76e9dc2a095d8042e0cd1637 Mon Sep 17 00:00:00 2001 From: zz_y Date: Wed, 11 Mar 2026 16:17:21 -0500 Subject: [PATCH 1/2] Remove unused Flink code pieces from query engine - Delete commenting_out_flink_diff temp file - Remove StreamingEngine enum (Flink/Arroyo variants no longer needed) - Remove --streaming-engine CLI arg from main.rs - Remove streaming_engine field from KafkaConsumerConfig - Remove StreamingEngine::Flink match arm and all commented-out Flink message handling from kafka.rs - Remove deserialize_from_json_flink commented-out method from precomputed_output.rs - Simplify create_precompute_from_bytes: remove streaming_engine param and replace all Flink/Arroyo conditionals with direct arroyo calls - Clean up residual Flink references in accumulator files Co-Authored-By: Claude Sonnet 4.6 --- .../src/commenting_out_flink_diff | 1032 ----------------- asap-query-engine/src/data_model/enums.rs | 6 - .../src/data_model/precomputed_output.rs | 120 +- asap-query-engine/src/drivers/ingest/kafka.rs | 191 +-- asap-query-engine/src/main.rs | 7 +- .../datasketches_kll_accumulator.rs | 2 - .../hydra_kll_accumulator.rs | 3 +- .../set_aggregator_accumulator.rs | 2 +- 8 files changed, 71 insertions(+), 1292 deletions(-) delete mode 100644 asap-query-engine/src/commenting_out_flink_diff diff --git a/asap-query-engine/src/commenting_out_flink_diff b/asap-query-engine/src/commenting_out_flink_diff deleted file mode 100644 index 50c1763..0000000 --- a/asap-query-engine/src/commenting_out_flink_diff +++ /dev/null @@ -1,1032 +0,0 @@ -diff --git a/src/data_model/enums.rs b/src/data_model/enums.rs -index b04e3bf..c04e9c7 100644 ---- a/src/data_model/enums.rs -+++ b/src/data_model/enums.rs -@@ -6,6 +6,6 @@ pub enum InputFormat { - - #[derive(clap::ValueEnum, Clone, Debug)] - pub enum StreamingEngine { -- Flink, -+ // Flink, - Arroyo, - } -diff --git a/src/data_model/precomputed_output.rs b/src/data_model/precomputed_output.rs -index f4ca3dd..b0f749f 100644 ---- a/src/data_model/precomputed_output.rs -+++ b/src/data_model/precomputed_output.rs -@@ -254,61 +254,61 @@ impl PrecomputedOutput { - streaming_engine: &str, - ) -> Result> { - match streaming_engine { -- "flink" => Self::deserialize_from_json_flink(data, streaming_config), -+ // "flink" => Self::deserialize_from_json_flink(data, streaming_config), - "arroyo" => Self::deserialize_from_json_arroyo(data, streaming_config), - _ => Err(format!("Unknown streaming engine: {streaming_engine}").into()), - } - } - -- /// Deserialization for Flink streaming engine -- pub fn deserialize_from_json_flink( -- data: &serde_json::Value, -- streaming_config: &HashMap, -- ) -> Result> { -- let aggregation_id = data -- .get("aggregation_id") -- .and_then(|v| v.as_u64()) -- .ok_or("Missing or invalid 'aggregation_id' field")?; -- -- let start_timestamp = data -- .get("start_timestamp") -- .and_then(|v| v.as_u64()) -- .ok_or("Missing or invalid 'start_timestamp' field")?; -- -- let end_timestamp = data -- .get("end_timestamp") -- .and_then(|v| v.as_u64()) -- .ok_or("Missing or invalid 'end_timestamp' field")?; -- -- let key = if let Some(key_data) = data.get("key") { -- if key_data.is_null() { -- None -- } else { -- Some(KeyByLabelValues::deserialize_from_json(key_data).map_err( -- |e| -> Box { -- format!("Failed to deserialize key: {e}").into() -- }, -- )?) -- } -- } else { -- None -- }; -- -- // Get aggregation type from streaming config lookup -- let config = streaming_config -- .get(&aggregation_id) -- .ok_or_else(|| { -- format!("Aggregation ID {aggregation_id} not found in streaming config") -- })? -- .clone(); -- -- Ok(Self { -- start_timestamp, -- end_timestamp, -- key, -- config, -- }) -- } -+ // /// Deserialization for Flink streaming engine -+ // pub fn deserialize_from_json_flink( -+ // data: &serde_json::Value, -+ // streaming_config: &HashMap, -+ // ) -> Result> { -+ // let aggregation_id = data -+ // .get("aggregation_id") -+ // .and_then(|v| v.as_u64()) -+ // .ok_or("Missing or invalid 'aggregation_id' field")?; -+ -+ // let start_timestamp = data -+ // .get("start_timestamp") -+ // .and_then(|v| v.as_u64()) -+ // .ok_or("Missing or invalid 'start_timestamp' field")?; -+ -+ // let end_timestamp = data -+ // .get("end_timestamp") -+ // .and_then(|v| v.as_u64()) -+ // .ok_or("Missing or invalid 'end_timestamp' field")?; -+ -+ // let key = if let Some(key_data) = data.get("key") { -+ // if key_data.is_null() { -+ // None -+ // } else { -+ // Some(KeyByLabelValues::deserialize_from_json(key_data).map_err( -+ // |e| -> Box { -+ // format!("Failed to deserialize key: {e}").into() -+ // }, -+ // )?) -+ // } -+ // } else { -+ // None -+ // }; -+ -+ // // Get aggregation type from streaming config lookup -+ // let config = streaming_config -+ // .get(&aggregation_id) -+ // .ok_or_else(|| { -+ // format!("Aggregation ID {aggregation_id} not found in streaming config") -+ // })? -+ // .clone(); -+ -+ // Ok(Self { -+ // start_timestamp, -+ // end_timestamp, -+ // key, -+ // config, -+ // }) -+ // } - - /// Deserialization for Arroyo streaming engine - pub fn deserialize_from_json_arroyo( -diff --git a/src/drivers/kafka_consumer.rs b/src/drivers/kafka_consumer.rs -index c2d090f..06f1651 100644 ---- a/src/drivers/kafka_consumer.rs -+++ b/src/drivers/kafka_consumer.rs -@@ -224,105 +224,105 @@ impl KafkaConsumer { - InputFormat::Json => { - // Handle streaming engine specific logic - match self.config.streaming_engine { -- StreamingEngine::Flink => { -- debug!("Received message of length: {}", payload.len()); -+ // StreamingEngine::Flink => { -+ // debug!("Received message of length: {}", payload.len()); - -- let json_data = if self.config.decompress_json { -- // Decompress using gzip -- let mut decoder = GzDecoder::new(payload); -- let mut decompressed = Vec::new(); -- match decoder.read_to_end(&mut decompressed) { -- Ok(_) => { -- debug!( -- "Decompressed JSON message of length: {}", -- decompressed.len() -- ); -- decompressed -- } -- Err(e) => { -- error!("Error decompressing gzip data: {}", e); -- return Err(format!("Gzip decompression error: {e}").into()); -- } -- } -- } else { -- payload.to_vec() -- }; -+ // let json_data = if self.config.decompress_json { -+ // // Decompress using gzip -+ // let mut decoder = GzDecoder::new(payload); -+ // let mut decompressed = Vec::new(); -+ // match decoder.read_to_end(&mut decompressed) { -+ // Ok(_) => { -+ // debug!( -+ // "Decompressed JSON message of length: {}", -+ // decompressed.len() -+ // ); -+ // decompressed -+ // } -+ // Err(e) => { -+ // error!("Error decompressing gzip data: {}", e); -+ // return Err(format!("Gzip decompression error: {e}").into()); -+ // } -+ // } -+ // } else { -+ // payload.to_vec() -+ // }; - -- let json_str = match String::from_utf8(json_data) { -- Ok(s) => s, -- Err(e) => { -- error!("Error converting bytes to UTF-8: {}", e); -- return Err(format!("UTF-8 conversion error: {e}").into()); -- } -- }; -+ // let json_str = match String::from_utf8(json_data) { -+ // Ok(s) => s, -+ // Err(e) => { -+ // error!("Error converting bytes to UTF-8: {}", e); -+ // return Err(format!("UTF-8 conversion error: {e}").into()); -+ // } -+ // }; - -- let json_parse_start_time = Instant::now(); -+ // let json_parse_start_time = Instant::now(); - -- let json_dict: serde_json::Value = match serde_json::from_str(&json_str) { -- Ok(dict) => { -- let json_parse_duration = json_parse_start_time.elapsed(); -- debug!( -- "JSON parsing took: {:.2}ms", -- json_parse_duration.as_secs_f64() * 1000.0 -- ); -- dict -- } -- Err(e) => { -- error!("Error parsing JSON: {}", e); -- debug!("JSON content: {}", json_str); -- return Err(format!("JSON parsing error: {e}").into()); -- } -- }; -+ // let json_dict: serde_json::Value = match serde_json::from_str(&json_str) { -+ // Ok(dict) => { -+ // let json_parse_duration = json_parse_start_time.elapsed(); -+ // debug!( -+ // "JSON parsing took: {:.2}ms", -+ // json_parse_duration.as_secs_f64() * 1000.0 -+ // ); -+ // dict -+ // } -+ // Err(e) => { -+ // error!("Error parsing JSON: {}", e); -+ // debug!("JSON content: {}", json_str); -+ // return Err(format!("JSON parsing error: {e}").into()); -+ // } -+ // }; - -- debug!( -- "Deserializing JSON message: {}, {}, {}", -- json_dict -- .get("aggregation_id") -- .and_then(|v| v.as_u64()) -- .unwrap_or(0), -- json_dict -- .get("start_timestamp") -- .and_then(|v| v.as_u64()) -- .unwrap_or(0), -- json_dict -- .get("end_timestamp") -- .and_then(|v| v.as_u64()) -- .unwrap_or(0) -- ); -+ // debug!( -+ // "Deserializing JSON message: {}, {}, {}", -+ // json_dict -+ // .get("aggregation_id") -+ // .and_then(|v| v.as_u64()) -+ // .unwrap_or(0), -+ // json_dict -+ // .get("start_timestamp") -+ // .and_then(|v| v.as_u64()) -+ // .unwrap_or(0), -+ // json_dict -+ // .get("end_timestamp") -+ // .and_then(|v| v.as_u64()) -+ // .unwrap_or(0) -+ // ); - -- let deserialize_start_time = Instant::now(); -+ // let deserialize_start_time = Instant::now(); - -- match PrecomputedOutput::deserialize_from_json_with_precompute(&json_dict) { -- Ok((output, precompute)) => { -- let deserialize_duration = deserialize_start_time.elapsed(); -- debug!( -- "Deserialization took: {:.2}ms", -- deserialize_duration.as_secs_f64() * 1000.0 -- ); -- debug!( -- "Deserialized item: {}, {}, {}", -- output.config.aggregation_id, -- output.start_timestamp, -- output.end_timestamp -- ); -- debug!("Successfully deserialized Flink JSON message with precompute data"); -- let total_message_duration = message_start_time.elapsed(); -- debug!( -- "Total message processing took: {:.2}ms", -- total_message_duration.as_secs_f64() * 1000.0 -- ); -- Ok(Some((output, precompute))) -- } -- Err(e) => { -- error!( -- "Error deserializing Flink PrecomputedOutput from JSON with precompute: {}", -- e -- ); -- debug!("JSON content: {}", json_str); -- Err(e) -- } -- } -- } -+ // match PrecomputedOutput::deserialize_from_json_with_precompute(&json_dict) { -+ // Ok((output, precompute)) => { -+ // let deserialize_duration = deserialize_start_time.elapsed(); -+ // debug!( -+ // "Deserialization took: {:.2}ms", -+ // deserialize_duration.as_secs_f64() * 1000.0 -+ // ); -+ // debug!( -+ // "Deserialized item: {}, {}, {}", -+ // output.config.aggregation_id, -+ // output.start_timestamp, -+ // output.end_timestamp -+ // ); -+ // debug!("Successfully deserialized Flink JSON message with precompute data"); -+ // let total_message_duration = message_start_time.elapsed(); -+ // debug!( -+ // "Total message processing took: {:.2}ms", -+ // total_message_duration.as_secs_f64() * 1000.0 -+ // ); -+ // Ok(Some((output, precompute))) -+ // } -+ // Err(e) => { -+ // error!( -+ // "Error deserializing Flink PrecomputedOutput from JSON with precompute: {}", -+ // e -+ // ); -+ // debug!("JSON content: {}", json_str); -+ // Err(e) -+ // } -+ // } -+ // } - StreamingEngine::Arroyo => { - // Arroyo messages - gzip decompression is applied at precompute level, not message level - let json_str = match String::from_utf8(payload.to_vec()) { -diff --git a/src/precompute_operators/count_min_sketch_accumulator.rs b/src/precompute_operators/count_min_sketch_accumulator.rs -index bcd98e4..658999e 100644 ---- a/src/precompute_operators/count_min_sketch_accumulator.rs -+++ b/src/precompute_operators/count_min_sketch_accumulator.rs -@@ -14,12 +14,12 @@ use promql_utilities::query_logics::enums::Statistic; - pub struct CountMinSketchAccumulator { - pub row_num: usize, - pub col_num: usize, -- pub sketch: Vec>, -+ pub sketch: Vec>, - } - - impl CountMinSketchAccumulator { - pub fn new(row_num: usize, col_num: usize) -> Self { -- let sketch = vec![vec![0; col_num]; row_num]; -+ let sketch = vec![vec![0.0; col_num]; row_num]; - Self { - row_num, - col_num, -@@ -44,7 +44,7 @@ impl CountMinSketchAccumulator { - for i in 0..self.row_num { - let hash_value = xxh32(key_bytes, i as u32); - let col_index = (hash_value as usize) % self.col_num; -- self.sketch[i][col_index] += value as i32; -+ self.sketch[i][col_index] += value; - } - } - -@@ -61,7 +61,7 @@ impl CountMinSketchAccumulator { - let key_str = key_values.join(";"); - let key_bytes = key_str.as_bytes(); - -- let mut min_value = i32::MAX; -+ let mut min_value = f64::MAX; - - // Query each row and take the minimum - for i in 0..self.row_num { -@@ -70,44 +70,44 @@ impl CountMinSketchAccumulator { - min_value = min_value.min(self.sketch[i][col_index]); - } - -- min_value as f64 -+ min_value - } - -- pub fn deserialize_from_json(data: &Value) -> Result> { -- let row_num = data["row_num"] -- .as_u64() -- .ok_or("Missing or invalid 'row_num' field")? as usize; -- let col_num = data["col_num"] -- .as_u64() -- .ok_or("Missing or invalid 'col_num' field")? as usize; -- -- let sketch_data = data["sketch"] -- .as_array() -- .ok_or("Missing or invalid 'sketch' field")?; -- -- let mut sketch = Vec::new(); -- for row in sketch_data { -- let row_array = row.as_array().ok_or("Invalid row in sketch data")?; -- let mut sketch_row = Vec::new(); -- for cell in row_array { -- let value = cell.as_i64().ok_or("Invalid cell value in sketch data")? as i32; -- sketch_row.push(value); -- } -- sketch.push(sketch_row); -- } -- -- Ok(Self { -- row_num, -- col_num, -- sketch, -- }) -- } -+ // pub fn deserialize_from_json(data: &Value) -> Result> { -+ // let row_num = data["row_num"] -+ // .as_f64() -+ // .ok_or("Missing or invalid 'row_num' field")? as usize; -+ // let col_num = data["col_num"] -+ // .as_f64() -+ // .ok_or("Missing or invalid 'col_num' field")? as usize; -+ -+ // let sketch_data = data["sketch"] -+ // .as_array() -+ // .ok_or("Missing or invalid 'sketch' field")?; -+ -+ // let mut sketch = Vec::new(); -+ // for row in sketch_data { -+ // let row_array = row.as_array().ok_or("Invalid row in sketch data")?; -+ // let mut sketch_row = Vec::new(); -+ // for cell in row_array { -+ // let value = cell.as_f64().ok_or("Invalid cell value in sketch data")?; -+ // sketch_row.push(value); -+ // } -+ // sketch.push(sketch_row); -+ // } -+ -+ // Ok(Self { -+ // row_num, -+ // col_num, -+ // sketch, -+ // }) -+ // } - - pub fn deserialize_from_bytes_arroyo( - buffer: &[u8], - ) -> Result> { - // Arroyo uses MessagePack format: [sketch_counters, col_num, row_num] -- let precompute: (Vec>, usize, usize) = rmp_serde::from_slice(buffer) -+ let precompute: (Vec>, usize, usize) = rmp_serde::from_slice(buffer) - .map_err(|e| format!("Failed to deserialize CountMinSketch from MessagePack: {e}"))?; - - let (sketch_counters, col_num, row_num) = precompute; -@@ -118,43 +118,46 @@ impl CountMinSketchAccumulator { - }) - } - -- pub fn deserialize_from_bytes(buffer: &[u8]) -> Result> { -- if buffer.len() < 8 { -- return Err("Buffer too short for row_num and col_num".into()); -- } -- -- let row_num = u32::from_le_bytes([buffer[0], buffer[1], buffer[2], buffer[3]]) as usize; -- let col_num = u32::from_le_bytes([buffer[4], buffer[5], buffer[6], buffer[7]]) as usize; -- -- let expected_size = 8 + (row_num * col_num * 4); -- if buffer.len() < expected_size { -- return Err("Buffer too short for sketch data".into()); -- } -- -- let mut sketch = Vec::new(); -- let mut offset = 8; -- -- for _ in 0..row_num { -- let mut row = Vec::new(); -- for _ in 0..col_num { -- let value = i32::from_le_bytes([ -- buffer[offset], -- buffer[offset + 1], -- buffer[offset + 2], -- buffer[offset + 3], -- ]); -- row.push(value); -- offset += 4; -- } -- sketch.push(row); -- } -- -- Ok(Self { -- row_num, -- col_num, -- sketch, -- }) -- } -+ // pub fn deserialize_from_bytes(buffer: &[u8]) -> Result> { -+ // if buffer.len() < 8 { -+ // return Err("Buffer too short for row_num and col_num".into()); -+ // } -+ -+ // TODO: this logic will need to be checked for i32 -> f64 -+ // Github Issue #11 -+ -+ // let row_num = u32::from_le_bytes([buffer[0], buffer[1], buffer[2], buffer[3]]) as usize; -+ // let col_num = u32::from_le_bytes([buffer[4], buffer[5], buffer[6], buffer[7]]) as usize; -+ -+ // let expected_size = 8 + (row_num * col_num * 4); -+ // if buffer.len() < expected_size { -+ // return Err("Buffer too short for sketch data".into()); -+ // } -+ -+ // let mut sketch = Vec::new(); -+ // let mut offset = 8; -+ -+ // for _ in 0..row_num { -+ // let mut row = Vec::new(); -+ // for _ in 0..col_num { -+ // let value = i32::from_le_bytes([ -+ // buffer[offset], -+ // buffer[offset + 1], -+ // buffer[offset + 2], -+ // buffer[offset + 3], -+ // ]); -+ // row.push(value); -+ // offset += 4; -+ // } -+ // sketch.push(row); -+ // } -+ -+ // Ok(Self { -+ // row_num, -+ // col_num, -+ // sketch, -+ // }) -+ // } - } - - impl SerializableToSink for CountMinSketchAccumulator { -@@ -319,7 +322,7 @@ mod tests { - // Check all values are initialized to 0 - for row in &cms.sketch { - for &value in row { -- assert_eq!(value, 0); -+ assert_eq!(value, 0.0); - } - } - } -@@ -356,17 +359,17 @@ mod tests { - let mut cms2 = CountMinSketchAccumulator::new(2, 3); - - // Set some values -- cms1.sketch[0][0] = 5; -- cms1.sketch[1][2] = 10; -+ cms1.sketch[0][0] = 5.0; -+ cms1.sketch[1][2] = 10.0; - -- cms2.sketch[0][0] = 3; -- cms2.sketch[0][1] = 7; -+ cms2.sketch[0][0] = 3.0; -+ cms2.sketch[0][1] = 7.0; - - let merged = CountMinSketchAccumulator::merge_accumulators(vec![cms1, cms2]).unwrap(); - -- assert_eq!(merged.sketch[0][0], 8); // 5 + 3 -- assert_eq!(merged.sketch[0][1], 7); // 0 + 7 -- assert_eq!(merged.sketch[1][2], 10); // 10 + 0 -+ assert_eq!(merged.sketch[0][0], 8.0); // 5 + 3 -+ assert_eq!(merged.sketch[0][1], 7.0); // 0 + 7 -+ assert_eq!(merged.sketch[1][2], 10.0); // 10 + 0 - } - - #[test] -@@ -378,30 +381,30 @@ mod tests { - assert!(result.is_err()); - } - -- #[test] -- fn test_count_min_sketch_serialization() { -- let mut cms = CountMinSketchAccumulator::new(2, 3); -- cms.sketch[0][1] = 42; -- cms.sketch[1][2] = 100; -- -- // Test JSON serialization -- let json_value = cms.serialize_to_json(); -- let deserialized = CountMinSketchAccumulator::deserialize_from_json(&json_value).unwrap(); -- -- assert_eq!(deserialized.row_num, 2); -- assert_eq!(deserialized.col_num, 3); -- assert_eq!(deserialized.sketch[0][1], 42); -- assert_eq!(deserialized.sketch[1][2], 100); -- -- // Test binary serialization -- let bytes = cms.serialize_to_bytes(); -- let deserialized_bytes = CountMinSketchAccumulator::deserialize_from_bytes(&bytes).unwrap(); -- -- assert_eq!(deserialized_bytes.row_num, 2); -- assert_eq!(deserialized_bytes.col_num, 3); -- assert_eq!(deserialized_bytes.sketch[0][1], 42); -- assert_eq!(deserialized_bytes.sketch[1][2], 100); -- } -+ // #[test] -+ // fn test_count_min_sketch_serialization() { -+ // let mut cms = CountMinSketchAccumulator::new(2, 3); -+ // cms.sketch[0][1] = 42.0; -+ // cms.sketch[1][2] = 100.0; -+ -+ // // Test JSON serialization -+ // let json_value = cms.serialize_to_json(); -+ // let deserialized = CountMinSketchAccumulator::deserialize_from_json(&json_value).unwrap(); -+ -+ // assert_eq!(deserialized.row_num, 2); -+ // assert_eq!(deserialized.col_num, 3); -+ // assert_eq!(deserialized.sketch[0][1], 42.0); -+ // assert_eq!(deserialized.sketch[1][2], 100.0); -+ -+ // // Test binary serialization -+ // let bytes = cms.serialize_to_bytes(); -+ // let deserialized_bytes = CountMinSketchAccumulator::deserialize_from_bytes(&bytes).unwrap(); -+ -+ // assert_eq!(deserialized_bytes.row_num, 2); -+ // assert_eq!(deserialized_bytes.col_num, 3); -+ // assert_eq!(deserialized_bytes.sketch[0][1], 42.0); -+ // assert_eq!(deserialized_bytes.sketch[1][2], 100.0); -+ // } - - #[test] - fn test_count_min_sketch_as_aggregate_core() { -diff --git a/src/precompute_operators/datasketches_kll_accumulator.rs b/src/precompute_operators/datasketches_kll_accumulator.rs -index c72b700..3635f8e 100644 ---- a/src/precompute_operators/datasketches_kll_accumulator.rs -+++ b/src/precompute_operators/datasketches_kll_accumulator.rs -@@ -68,62 +68,62 @@ impl DatasketchesKLLAccumulator { - sorted_values[index] - } - -- pub fn deserialize_from_json(data: &Value) -> Result> { -- let max_capacity = data["max_capacity"].as_u64().unwrap_or(1000) as usize; -- -- let values = if let Some(values_array) = data["values"].as_array() { -- values_array -- .iter() -- .map(|v| v.as_f64().unwrap_or(0.0)) -- .collect() -- } else { -- Vec::new() -- }; -- -- Ok(Self { -- values, -- max_capacity, -- }) -- } -- -- pub fn deserialize_from_bytes(buffer: &[u8]) -> Result> { -- if buffer.len() < 8 { -- return Err("Buffer too short for max_capacity and values_count".into()); -- } -- -- let max_capacity = -- u32::from_le_bytes([buffer[0], buffer[1], buffer[2], buffer[3]]) as usize; -- let values_count = -- u32::from_le_bytes([buffer[4], buffer[5], buffer[6], buffer[7]]) as usize; -- -- let expected_size = 8 + (values_count * 8); -- if buffer.len() < expected_size { -- return Err("Buffer too short for values data".into()); -- } -- -- let mut values = Vec::new(); -- let mut offset = 8; -- -- for _ in 0..values_count { -- let value = f64::from_le_bytes([ -- buffer[offset], -- buffer[offset + 1], -- buffer[offset + 2], -- buffer[offset + 3], -- buffer[offset + 4], -- buffer[offset + 5], -- buffer[offset + 6], -- buffer[offset + 7], -- ]); -- values.push(value); -- offset += 8; -- } -- -- Ok(Self { -- values, -- max_capacity, -- }) -- } -+ // pub fn deserialize_from_json(data: &Value) -> Result> { -+ // let max_capacity = data["max_capacity"].as_u64().unwrap_or(1000) as usize; -+ -+ // let values = if let Some(values_array) = data["values"].as_array() { -+ // values_array -+ // .iter() -+ // .map(|v| v.as_f64().unwrap_or(0.0)) -+ // .collect() -+ // } else { -+ // Vec::new() -+ // }; -+ -+ // Ok(Self { -+ // values, -+ // max_capacity, -+ // }) -+ // } -+ -+ // pub fn deserialize_from_bytes(buffer: &[u8]) -> Result> { -+ // if buffer.len() < 8 { -+ // return Err("Buffer too short for max_capacity and values_count".into()); -+ // } -+ -+ // let max_capacity = -+ // u32::from_le_bytes([buffer[0], buffer[1], buffer[2], buffer[3]]) as usize; -+ // let values_count = -+ // u32::from_le_bytes([buffer[4], buffer[5], buffer[6], buffer[7]]) as usize; -+ -+ // let expected_size = 8 + (values_count * 8); -+ // if buffer.len() < expected_size { -+ // return Err("Buffer too short for values data".into()); -+ // } -+ -+ // let mut values = Vec::new(); -+ // let mut offset = 8; -+ -+ // for _ in 0..values_count { -+ // let value = f64::from_le_bytes([ -+ // buffer[offset], -+ // buffer[offset + 1], -+ // buffer[offset + 2], -+ // buffer[offset + 3], -+ // buffer[offset + 4], -+ // buffer[offset + 5], -+ // buffer[offset + 6], -+ // buffer[offset + 7], -+ // ]); -+ // values.push(value); -+ // offset += 8; -+ // } -+ -+ // Ok(Self { -+ // values, -+ // max_capacity, -+ // }) -+ // } - - pub fn deserialize_from_bytes_arroyo( - buffer: &[u8], -diff --git a/src/precompute_operators/delta_set_aggregator_accumulator.rs b/src/precompute_operators/delta_set_aggregator_accumulator.rs -index 2248748..46a853b 100644 ---- a/src/precompute_operators/delta_set_aggregator_accumulator.rs -+++ b/src/precompute_operators/delta_set_aggregator_accumulator.rs -@@ -43,114 +43,114 @@ impl DeltaSetAggregatorAccumulator { - self.removed.insert(key); - } - -- pub fn deserialize_from_json(data: &Value) -> Result> { -- let mut added = HashSet::new(); -- let mut removed = HashSet::new(); -- -- if let Some(added_array) = data["added"].as_array() { -- for item in added_array { -- // Handle nested structure with "values" key -- let key_data = if let Some(values) = item.get("values") { -- values -- } else { -- item -- }; -- let key = KeyByLabelValues::deserialize_from_json(key_data)?; -- added.insert(key); -- } -- } -- -- if let Some(removed_array) = data["removed"].as_array() { -- for item in removed_array { -- // Handle nested structure with "values" key -- let key_data = if let Some(values) = item.get("values") { -- values -- } else { -- item -- }; -- let key = KeyByLabelValues::deserialize_from_json(key_data)?; -- removed.insert(key); -- } -- } -- -- Ok(Self { added, removed }) -- } -- -- pub fn deserialize_from_bytes(buffer: &[u8]) -> Result> { -- let mut offset = 0; -- let mut added = HashSet::new(); -- let mut removed = HashSet::new(); -- -- // Read added set -- if offset + 4 > buffer.len() { -- return Err("Buffer too short for added set size".into()); -- } -- let added_size = u32::from_le_bytes([ -- buffer[offset], -- buffer[offset + 1], -- buffer[offset + 2], -- buffer[offset + 3], -- ]) as usize; -- offset += 4; -- -- for _ in 0..added_size { -- if offset + 4 > buffer.len() { -- return Err("Buffer too short for added item size".into()); -- } -- let item_size = u32::from_le_bytes([ -- buffer[offset], -- buffer[offset + 1], -- buffer[offset + 2], -- buffer[offset + 3], -- ]) as usize; -- offset += 4; -- -- if offset + item_size > buffer.len() { -- return Err("Buffer too short for added item data".into()); -- } -- let key = -- KeyByLabelValues::deserialize_from_bytes(&buffer[offset..offset + item_size])?; -- offset += item_size; -- -- added.insert(key); -- } -- -- // Read removed set -- if offset + 4 > buffer.len() { -- return Err("Buffer too short for removed set size".into()); -- } -- let removed_size = u32::from_le_bytes([ -- buffer[offset], -- buffer[offset + 1], -- buffer[offset + 2], -- buffer[offset + 3], -- ]) as usize; -- offset += 4; -- -- for _ in 0..removed_size { -- if offset + 4 > buffer.len() { -- return Err("Buffer too short for removed item size".into()); -- } -- let item_size = u32::from_le_bytes([ -- buffer[offset], -- buffer[offset + 1], -- buffer[offset + 2], -- buffer[offset + 3], -- ]) as usize; -- offset += 4; -- -- if offset + item_size > buffer.len() { -- return Err("Buffer too short for removed item data".into()); -- } -- let key = -- KeyByLabelValues::deserialize_from_bytes(&buffer[offset..offset + item_size])?; -- offset += item_size; -- -- removed.insert(key); -- } -- -- Ok(Self { added, removed }) -- } -+ // pub fn deserialize_from_json(data: &Value) -> Result> { -+ // let mut added = HashSet::new(); -+ // let mut removed = HashSet::new(); -+ -+ // if let Some(added_array) = data["added"].as_array() { -+ // for item in added_array { -+ // // Handle nested structure with "values" key -+ // let key_data = if let Some(values) = item.get("values") { -+ // values -+ // } else { -+ // item -+ // }; -+ // let key = KeyByLabelValues::deserialize_from_json(key_data)?; -+ // added.insert(key); -+ // } -+ // } -+ -+ // if let Some(removed_array) = data["removed"].as_array() { -+ // for item in removed_array { -+ // // Handle nested structure with "values" key -+ // let key_data = if let Some(values) = item.get("values") { -+ // values -+ // } else { -+ // item -+ // }; -+ // let key = KeyByLabelValues::deserialize_from_json(key_data)?; -+ // removed.insert(key); -+ // } -+ // } -+ -+ // Ok(Self { added, removed }) -+ // } -+ -+ // pub fn deserialize_from_bytes(buffer: &[u8]) -> Result> { -+ // let mut offset = 0; -+ // let mut added = HashSet::new(); -+ // let mut removed = HashSet::new(); -+ -+ // // Read added set -+ // if offset + 4 > buffer.len() { -+ // return Err("Buffer too short for added set size".into()); -+ // } -+ // let added_size = u32::from_le_bytes([ -+ // buffer[offset], -+ // buffer[offset + 1], -+ // buffer[offset + 2], -+ // buffer[offset + 3], -+ // ]) as usize; -+ // offset += 4; -+ -+ // for _ in 0..added_size { -+ // if offset + 4 > buffer.len() { -+ // return Err("Buffer too short for added item size".into()); -+ // } -+ // let item_size = u32::from_le_bytes([ -+ // buffer[offset], -+ // buffer[offset + 1], -+ // buffer[offset + 2], -+ // buffer[offset + 3], -+ // ]) as usize; -+ // offset += 4; -+ -+ // if offset + item_size > buffer.len() { -+ // return Err("Buffer too short for added item data".into()); -+ // } -+ // let key = -+ // KeyByLabelValues::deserialize_from_bytes(&buffer[offset..offset + item_size])?; -+ // offset += item_size; -+ -+ // added.insert(key); -+ // } -+ -+ // // Read removed set -+ // if offset + 4 > buffer.len() { -+ // return Err("Buffer too short for removed set size".into()); -+ // } -+ // let removed_size = u32::from_le_bytes([ -+ // buffer[offset], -+ // buffer[offset + 1], -+ // buffer[offset + 2], -+ // buffer[offset + 3], -+ // ]) as usize; -+ // offset += 4; -+ -+ // for _ in 0..removed_size { -+ // if offset + 4 > buffer.len() { -+ // return Err("Buffer too short for removed item size".into()); -+ // } -+ // let item_size = u32::from_le_bytes([ -+ // buffer[offset], -+ // buffer[offset + 1], -+ // buffer[offset + 2], -+ // buffer[offset + 3], -+ // ]) as usize; -+ // offset += 4; -+ -+ // if offset + item_size > buffer.len() { -+ // return Err("Buffer too short for removed item data".into()); -+ // } -+ // let key = -+ // KeyByLabelValues::deserialize_from_bytes(&buffer[offset..offset + item_size])?; -+ // offset += item_size; -+ -+ // removed.insert(key); -+ // } -+ -+ // Ok(Self { added, removed }) -+ // } - - pub fn deserialize_from_bytes_arroyo( - buffer: &[u8], -@@ -412,36 +412,35 @@ mod tests { - } - - #[test] -- fn test_delta_set_aggregator_serialization() { -- let mut acc = DeltaSetAggregatorAccumulator::new(); -- -- let key1 = create_test_key("web"); -- let key2 = create_test_key("api"); -- -- acc.add_key(key1.clone()); -- acc.remove_key(key2.clone()); -- -- // Test JSON serialization -- let json_value = acc.serialize_to_json(); -- let deserialized = -- DeltaSetAggregatorAccumulator::deserialize_from_json(&json_value).unwrap(); -- -- assert_eq!(deserialized.added.len(), 1); -- assert_eq!(deserialized.removed.len(), 1); -- assert!(deserialized.added.contains(&key1)); -- assert!(deserialized.removed.contains(&key2)); -- -- // Test binary serialization -- let bytes = acc.serialize_to_bytes(); -- let deserialized_bytes = -- DeltaSetAggregatorAccumulator::deserialize_from_bytes(&bytes).unwrap(); -- -- assert_eq!(deserialized_bytes.added.len(), 1); -- assert_eq!(deserialized_bytes.removed.len(), 1); -- assert!(deserialized_bytes.added.contains(&key1)); -- assert!(deserialized_bytes.removed.contains(&key2)); -- } -- -+ // fn test_delta_set_aggregator_serialization() { -+ // let mut acc = DeltaSetAggregatorAccumulator::new(); -+ -+ // let key1 = create_test_key("web"); -+ // let key2 = create_test_key("api"); -+ -+ // acc.add_key(key1.clone()); -+ // acc.remove_key(key2.clone()); -+ -+ // // Test JSON serialization -+ // let json_value = acc.serialize_to_json(); -+ // let deserialized = -+ // DeltaSetAggregatorAccumulator::deserialize_from_json(&json_value).unwrap(); -+ -+ // assert_eq!(deserialized.added.len(), 1); -+ // assert_eq!(deserialized.removed.len(), 1); -+ // assert!(deserialized.added.contains(&key1)); -+ // assert!(deserialized.removed.contains(&key2)); -+ -+ // // Test binary serialization -+ // let bytes = acc.serialize_to_bytes(); -+ // let deserialized_bytes = -+ // DeltaSetAggregatorAccumulator::deserialize_from_bytes(&bytes).unwrap(); -+ -+ // assert_eq!(deserialized_bytes.added.len(), 1); -+ // assert_eq!(deserialized_bytes.removed.len(), 1); -+ // assert!(deserialized_bytes.added.contains(&key1)); -+ // assert!(deserialized_bytes.removed.contains(&key2)); -+ // } - #[test] - fn test_delta_set_aggregator_query() { - let acc = DeltaSetAggregatorAccumulator::new(); diff --git a/asap-query-engine/src/data_model/enums.rs b/asap-query-engine/src/data_model/enums.rs index 92c0d13..5b0fc72 100644 --- a/asap-query-engine/src/data_model/enums.rs +++ b/asap-query-engine/src/data_model/enums.rs @@ -4,12 +4,6 @@ pub enum InputFormat { Byte, } -#[derive(clap::ValueEnum, Clone, Debug)] -pub enum StreamingEngine { - Flink, - Arroyo, -} - pub use sketch_db_common::enums::{CleanupPolicy, QueryLanguage}; #[derive(clap::ValueEnum, Clone, Debug, PartialEq)] diff --git a/asap-query-engine/src/data_model/precomputed_output.rs b/asap-query-engine/src/data_model/precomputed_output.rs index c5ebab5..910f357 100644 --- a/asap-query-engine/src/data_model/precomputed_output.rs +++ b/asap-query-engine/src/data_model/precomputed_output.rs @@ -138,56 +138,6 @@ impl PrecomputedOutput { // }) // } - // /// Deserialization for Flink streaming engine - // pub fn deserialize_from_json_flink( - // data: &serde_json::Value, - // streaming_config: &HashMap, - // ) -> Result> { - // let aggregation_id = data - // .get("aggregation_id") - // .and_then(|v| v.as_u64()) - // .ok_or("Missing or invalid 'aggregation_id' field")?; - - // let start_timestamp = data - // .get("start_timestamp") - // .and_then(|v| v.as_u64()) - // .ok_or("Missing or invalid 'start_timestamp' field")?; - - // let end_timestamp = data - // .get("end_timestamp") - // .and_then(|v| v.as_u64()) - // .ok_or("Missing or invalid 'end_timestamp' field")?; - - // let key = if let Some(key_data) = data.get("key") { - // if key_data.is_null() { - // None - // } else { - // Some(KeyByLabelValues::deserialize_from_json(key_data).map_err( - // |e| -> Box { - // format!("Failed to deserialize key: {e}").into() - // }, - // )?) - // } - // } else { - // None - // }; - - // // Get aggregation type from streaming config lookup - // let config = streaming_config - // .get(&aggregation_id) - // .ok_or_else(|| { - // format!("Aggregation ID {aggregation_id} not found in streaming config") - // })? - // .clone(); - - // Ok(Self { - // start_timestamp, - // end_timestamp, - // key, - // config, - // }) - // } - /// Deserialization for Arroyo streaming engine pub fn deserialize_from_json_arroyo( data: &serde_json::Value, @@ -283,7 +233,6 @@ impl PrecomputedOutput { let precompute = Self::create_precompute_from_bytes( &config.aggregation_type, Vec::as_slice(&precompute_bytes), - "arroyo", )?; Ok((precomputed_output, precompute)) @@ -415,22 +364,14 @@ impl PrecomputedOutput { fn create_precompute_from_bytes( precompute_type: &str, buffer: &[u8], - streaming_engine: &str, ) -> Result, Box> { use crate::precompute_operators::*; - // TODO: add arroyo methods in each operator - // TODO: remove flink methods - match precompute_type { "Sum" | "sum" => { - let accumulator = if streaming_engine == "flink" { - SumAccumulator::deserialize_from_bytes(buffer) - } else { - SumAccumulator::deserialize_from_bytes_arroyo(buffer) - } - .map_err(|e| format!("Failed to deserialize SumAccumulator: {e}"))?; + let accumulator = SumAccumulator::deserialize_from_bytes_arroyo(buffer) + .map_err(|e| format!("Failed to deserialize SumAccumulator: {e}"))?; Ok(Box::new(accumulator)) } "MinMax" => { @@ -457,59 +398,50 @@ impl PrecomputedOutput { Ok(Box::new(accumulator)) } "MultipleIncrease" => { - let accumulator = if streaming_engine == "flink" { - MultipleIncreaseAccumulator::deserialize_from_bytes(buffer) - } else { + let accumulator = MultipleIncreaseAccumulator::deserialize_from_bytes_arroyo(buffer) - } - .map_err(|e| format!("Failed to deserialize MultipleIncreaseAccumulator: {e}"))?; + .map_err(|e| { + format!("Failed to deserialize MultipleIncreaseAccumulator: {e}") + })?; Ok(Box::new(accumulator)) } "CountMinSketch" => { - let accumulator = if streaming_engine == "flink" { - CountMinSketchAccumulator::deserialize_from_bytes(buffer) - } else { - CountMinSketchAccumulator::deserialize_from_bytes_arroyo(buffer) - } - .map_err(|e| format!("Failed to deserialize CountMinSketchAccumulator: {e}"))?; + let accumulator = CountMinSketchAccumulator::deserialize_from_bytes_arroyo(buffer) + .map_err(|e| format!("Failed to deserialize CountMinSketchAccumulator: {e}"))?; Ok(Box::new(accumulator)) } "CountMinSketchWithHeap" => { - let accumulator = if streaming_engine == "flink" { - CountMinSketchWithHeapAccumulator::deserialize_from_bytes(buffer) - } else { + let accumulator = CountMinSketchWithHeapAccumulator::deserialize_from_bytes_arroyo(buffer) - } - .map_err(|e| { - format!("Failed to deserialize CountMinSketchWithHeapAccumulator: {e}") - })?; + .map_err(|e| { + format!( + "Failed to deserialize CountMinSketchWithHeapAccumulator: {e}" + ) + })?; Ok(Box::new(accumulator)) } "DatasketchesKLL" => { - let accumulator = if streaming_engine == "flink" { - DatasketchesKLLAccumulator::deserialize_from_bytes(buffer) - } else { + let accumulator = DatasketchesKLLAccumulator::deserialize_from_bytes_arroyo(buffer) - } - .map_err(|e| format!("Failed to deserialize DatasketchesKLLAccumulator: {e}"))?; + .map_err(|e| { + format!("Failed to deserialize DatasketchesKLLAccumulator: {e}") + })?; Ok(Box::new(accumulator)) } "HydraKLL" => { - let accumulator = if streaming_engine == "flink" { - return Err("HydraKLL not supported for Flink".into()); - } else { + let accumulator = HydraKllSketchAccumulator::deserialize_from_bytes_arroyo(buffer) - } - .map_err(|e| format!("Failed to deserialize HydraKllSketchAccumulator: {e}"))?; + .map_err(|e| { + format!("Failed to deserialize HydraKllSketchAccumulator: {e}") + })?; Ok(Box::new(accumulator)) } "DeltaSetAggregator" => { - let accumulator = if streaming_engine == "flink" { - DeltaSetAggregatorAccumulator::deserialize_from_bytes(buffer) - } else { + let accumulator = DeltaSetAggregatorAccumulator::deserialize_from_bytes_arroyo(buffer) - } - .map_err(|e| format!("Failed to deserialize DeltaSetAggregatorAccumulator: {e}"))?; + .map_err(|e| { + format!("Failed to deserialize DeltaSetAggregatorAccumulator: {e}") + })?; Ok(Box::new(accumulator)) } _ => Err(format!("Unknown precompute type: {precompute_type}").into()), diff --git a/asap-query-engine/src/drivers/ingest/kafka.rs b/asap-query-engine/src/drivers/ingest/kafka.rs index e2b9f03..a65a81f 100644 --- a/asap-query-engine/src/drivers/ingest/kafka.rs +++ b/asap-query-engine/src/drivers/ingest/kafka.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use tracing::{debug, error, info, warn}; -use crate::data_model::enums::{InputFormat, StreamingEngine}; +use crate::data_model::enums::InputFormat; use crate::data_model::traits::SerializableToSink; use crate::data_model::PrecomputedOutput; use crate::data_model::StreamingConfig; @@ -22,7 +22,6 @@ pub struct KafkaConsumerConfig { pub decompress_json: bool, pub batch_size: usize, pub poll_timeout_ms: u64, - pub streaming_engine: StreamingEngine, pub dump_precomputes: bool, pub dump_output_dir: Option, } @@ -273,155 +272,49 @@ impl KafkaConsumer { Err("Binary input format with precompute not implemented".into()) } InputFormat::Json => { - // Handle streaming engine specific logic - match self.config.streaming_engine { - StreamingEngine::Flink => { - // debug!("Received message of length: {}", payload.len()); - - // let json_data = if self.config.decompress_json { - // // Decompress using gzip - // let mut decoder = GzDecoder::new(payload); - // let mut decompressed = Vec::new(); - // match decoder.read_to_end(&mut decompressed) { - // Ok(_) => { - // debug!( - // "Decompressed JSON message of length: {}", - // decompressed.len() - // ); - // decompressed - // } - // Err(e) => { - // error!("Error decompressing gzip data: {}", e); - // return Err(format!("Gzip decompression error: {e}").into()); - // } - // } - // } else { - // payload.to_vec() - // }; - - // let json_str = match String::from_utf8(json_data) { - // Ok(s) => s, - // Err(e) => { - // error!("Error converting bytes to UTF-8: {}", e); - // return Err(format!("UTF-8 conversion error: {e}").into()); - // } - // }; - - // let json_parse_start_time = Instant::now(); - - // let json_dict: serde_json::Value = match serde_json::from_str(&json_str) { - // Ok(dict) => { - // let json_parse_duration = json_parse_start_time.elapsed(); - // debug!( - // "JSON parsing took: {:.2}ms", - // json_parse_duration.as_secs_f64() * 1000.0 - // ); - // dict - // } - // Err(e) => { - // error!("Error parsing JSON: {}", e); - // debug!("JSON content: {}", json_str); - // return Err(format!("JSON parsing error: {e}").into()); - // } - // }; - - // debug!( - // "Deserializing JSON message: {}, {}, {}", - // json_dict - // .get("aggregation_id") - // .and_then(|v| v.as_u64()) - // .unwrap_or(0), - // json_dict - // .get("start_timestamp") - // .and_then(|v| v.as_u64()) - // .unwrap_or(0), - // json_dict - // .get("end_timestamp") - // .and_then(|v| v.as_u64()) - // .unwrap_or(0) - // ); - - // let deserialize_start_time = Instant::now(); - - // match PrecomputedOutput::deserialize_from_json_with_precompute(&json_dict) { - // Ok((output, precompute)) => { - // let deserialize_duration = deserialize_start_time.elapsed(); - // debug!( - // "Deserialization took: {:.2}ms", - // deserialize_duration.as_secs_f64() * 1000.0 - // ); - // debug!( - // "Deserialized item: {}, {}, {}", - // output.config.aggregation_id, - // output.start_timestamp, - // output.end_timestamp - // ); - // debug!("Successfully deserialized Flink JSON message with precompute data"); - // let total_message_duration = message_start_time.elapsed(); - // debug!( - // "Total message processing took: {:.2}ms", - // total_message_duration.as_secs_f64() * 1000.0 - // ); - // Ok(Some((output, precompute))) - // } - // Err(e) => { - // error!( - // "Error deserializing Flink PrecomputedOutput from JSON with precompute: {}", - // e - // ); - // debug!("JSON content: {}", json_str); - // Err(e) - // } - // } - error!("Flink input format with precompute not implemented"); - Err("Flink input format with precompute not implemented".into()) + // Arroyo messages - gzip decompression is applied at precompute level, not message level + let json_str = match String::from_utf8(payload.to_vec()) { + Ok(s) => s, + Err(e) => { + error!("Error converting bytes to UTF-8: {}", e); + return Err(format!("UTF-8 conversion error: {e}").into()); } - StreamingEngine::Arroyo => { - // Arroyo messages - gzip decompression is applied at precompute level, not message level - let json_str = match String::from_utf8(payload.to_vec()) { - Ok(s) => s, - Err(e) => { - error!("Error converting bytes to UTF-8: {}", e); - return Err(format!("UTF-8 conversion error: {e}").into()); - } - }; + }; - let json_dict: serde_json::Value = match serde_json::from_str(&json_str) { - Ok(dict) => dict, - Err(e) => { - error!("Error parsing Arroyo JSON: {}", e); - debug!("JSON content: {}", json_str); - return Err(format!("JSON parsing error: {e}").into()); - } - }; - - let deserialize_start_time = Instant::now(); - match PrecomputedOutput::deserialize_from_json_arroyo( - &json_dict, - &self.streaming_config, - ) { - Ok((output, precompute)) => { - let deserialize_duration = deserialize_start_time.elapsed(); - debug!( - "Arroyo deserialization took: {:.2}ms", - deserialize_duration.as_secs_f64() * 1000.0 - ); - debug!("Successfully deserialized Arroyo JSON message with precompute data"); - let total_message_duration = message_start_time.elapsed(); - debug!( - "Total Arroyo message processing took: {:.2}ms", - total_message_duration.as_secs_f64() * 1000.0 - ); - Ok(Some((output, precompute))) - } - Err(e) => { - error!( - "Error deserializing Arroyo PrecomputedOutput from JSON with precompute: {e}" - ); - debug!("JSON content: {}", json_str); - Err(e) - } - } + let json_dict: serde_json::Value = match serde_json::from_str(&json_str) { + Ok(dict) => dict, + Err(e) => { + error!("Error parsing Arroyo JSON: {}", e); + debug!("JSON content: {}", json_str); + return Err(format!("JSON parsing error: {e}").into()); + } + }; + + let deserialize_start_time = Instant::now(); + match PrecomputedOutput::deserialize_from_json_arroyo( + &json_dict, + &self.streaming_config, + ) { + Ok((output, precompute)) => { + let deserialize_duration = deserialize_start_time.elapsed(); + debug!( + "Arroyo deserialization took: {:.2}ms", + deserialize_duration.as_secs_f64() * 1000.0 + ); + debug!("Successfully deserialized Arroyo JSON message with precompute data"); + let total_message_duration = message_start_time.elapsed(); + debug!( + "Total Arroyo message processing took: {:.2}ms", + total_message_duration.as_secs_f64() * 1000.0 + ); + Ok(Some((output, precompute))) + } + Err(e) => { + error!( + "Error deserializing Arroyo PrecomputedOutput from JSON with precompute: {e}" + ); + debug!("JSON content: {}", json_str); + Err(e) } } } diff --git a/asap-query-engine/src/main.rs b/asap-query-engine/src/main.rs index 0cc5d95..9af1779 100644 --- a/asap-query-engine/src/main.rs +++ b/asap-query-engine/src/main.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use tokio::signal; use tracing::{error, info}; -use query_engine_rust::data_model::enums::{InputFormat, LockStrategy, StreamingEngine}; +use query_engine_rust::data_model::enums::{InputFormat, LockStrategy}; use query_engine_rust::drivers::AdapterConfig; use query_engine_rust::utils::file_io::{read_inference_config, read_streaming_config}; use query_engine_rust::{ @@ -32,10 +32,6 @@ struct Args { #[arg(long)] streaming_config: String, - /// Streaming engine to use - #[arg(long, value_enum)] - streaming_engine: StreamingEngine, - /// Prometheus scrape interval in seconds #[arg(long)] prometheus_scrape_interval: u64, @@ -191,7 +187,6 @@ async fn main() -> Result<()> { decompress_json: args.decompress_json, batch_size: 1000, poll_timeout_ms: 1000, - streaming_engine: args.streaming_engine.clone(), dump_precomputes: args.dump_precomputes, dump_output_dir: if args.dump_precomputes { Some(args.output_dir.clone()) diff --git a/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs b/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs index 78e6ab0..b074fad 100644 --- a/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs @@ -42,7 +42,6 @@ impl DatasketchesKLLAccumulator { .decode(sketch_b64) .map_err(|e| format!("Failed to decode base64 sketch data: {e}"))?; - // TODO: remove this hardcoding once FlinkSketch serializes k in its output Ok(Self { inner: KllSketch::from_dsrs_bytes(&sketch_bytes, 200)?, }) @@ -50,7 +49,6 @@ impl DatasketchesKLLAccumulator { pub fn deserialize_from_bytes(buffer: &[u8]) -> Result> { // Mirror Python implementation: deserialize sketch directly from bytes - // TODO: remove this hardcoding once FlinkSketch serializes k in its output Ok(Self { inner: KllSketch::from_dsrs_bytes(buffer, 200)?, }) diff --git a/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs b/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs index d08cbdc..42a9489 100644 --- a/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs @@ -30,8 +30,7 @@ impl HydraKllSketchAccumulator { } pub fn deserialize_from_bytes(_buffer: &[u8]) -> Result> { - // HydraKLLSketch is only used with Arroyo, not Flink - Err("deserialize_from_bytes for HydraKllSketchAccumulator not implemented for Flink".into()) + Err("deserialize_from_bytes for HydraKllSketchAccumulator not implemented".into()) } pub fn deserialize_from_bytes_arroyo( diff --git a/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs b/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs index 80cc628..6f38007 100644 --- a/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs @@ -129,7 +129,7 @@ impl SerializableToSink for SetAggregatorAccumulator { } fn serialize_to_bytes(&self) -> Vec { - // Legacy binary format (Flink); matches deserialize_from_bytes(). + // Legacy binary format; matches deserialize_from_bytes(). let mut buffer = Vec::new(); buffer.extend_from_slice(&(self.added.len() as u32).to_le_bytes()); for key in &self.added { From 2a99777705a75e3d84f4a2401b5b9881a2c7bc5e Mon Sep 17 00:00:00 2001 From: zz_y Date: Wed, 11 Mar 2026 16:59:21 -0500 Subject: [PATCH 2/2] Restore StreamingEngine type with only Arroyo variant Keep StreamingEngine enum and --streaming-engine CLI arg but with only the Arroyo variant (Flink variant removed). Also fix cargo fmt issues. Co-Authored-By: Claude Sonnet 4.6 --- asap-query-engine/src/data_model/enums.rs | 5 +++ .../src/data_model/precomputed_output.rs | 38 ++++++++----------- asap-query-engine/src/drivers/ingest/kafka.rs | 7 +++- asap-query-engine/src/main.rs | 7 +++- 4 files changed, 31 insertions(+), 26 deletions(-) diff --git a/asap-query-engine/src/data_model/enums.rs b/asap-query-engine/src/data_model/enums.rs index 5b0fc72..bec8a23 100644 --- a/asap-query-engine/src/data_model/enums.rs +++ b/asap-query-engine/src/data_model/enums.rs @@ -4,6 +4,11 @@ pub enum InputFormat { Byte, } +#[derive(clap::ValueEnum, Clone, Debug)] +pub enum StreamingEngine { + Arroyo, +} + pub use sketch_db_common::enums::{CleanupPolicy, QueryLanguage}; #[derive(clap::ValueEnum, Clone, Debug, PartialEq)] diff --git a/asap-query-engine/src/data_model/precomputed_output.rs b/asap-query-engine/src/data_model/precomputed_output.rs index 910f357..9bd1e6c 100644 --- a/asap-query-engine/src/data_model/precomputed_output.rs +++ b/asap-query-engine/src/data_model/precomputed_output.rs @@ -398,11 +398,10 @@ impl PrecomputedOutput { Ok(Box::new(accumulator)) } "MultipleIncrease" => { - let accumulator = - MultipleIncreaseAccumulator::deserialize_from_bytes_arroyo(buffer) - .map_err(|e| { - format!("Failed to deserialize MultipleIncreaseAccumulator: {e}") - })?; + let accumulator = MultipleIncreaseAccumulator::deserialize_from_bytes_arroyo( + buffer, + ) + .map_err(|e| format!("Failed to deserialize MultipleIncreaseAccumulator: {e}"))?; Ok(Box::new(accumulator)) } "CountMinSketch" => { @@ -414,34 +413,27 @@ impl PrecomputedOutput { let accumulator = CountMinSketchWithHeapAccumulator::deserialize_from_bytes_arroyo(buffer) .map_err(|e| { - format!( - "Failed to deserialize CountMinSketchWithHeapAccumulator: {e}" - ) + format!("Failed to deserialize CountMinSketchWithHeapAccumulator: {e}") })?; Ok(Box::new(accumulator)) } "DatasketchesKLL" => { - let accumulator = - DatasketchesKLLAccumulator::deserialize_from_bytes_arroyo(buffer) - .map_err(|e| { - format!("Failed to deserialize DatasketchesKLLAccumulator: {e}") - })?; + let accumulator = DatasketchesKLLAccumulator::deserialize_from_bytes_arroyo(buffer) + .map_err(|e| { + format!("Failed to deserialize DatasketchesKLLAccumulator: {e}") + })?; Ok(Box::new(accumulator)) } "HydraKLL" => { - let accumulator = - HydraKllSketchAccumulator::deserialize_from_bytes_arroyo(buffer) - .map_err(|e| { - format!("Failed to deserialize HydraKllSketchAccumulator: {e}") - })?; + let accumulator = HydraKllSketchAccumulator::deserialize_from_bytes_arroyo(buffer) + .map_err(|e| format!("Failed to deserialize HydraKllSketchAccumulator: {e}"))?; Ok(Box::new(accumulator)) } "DeltaSetAggregator" => { - let accumulator = - DeltaSetAggregatorAccumulator::deserialize_from_bytes_arroyo(buffer) - .map_err(|e| { - format!("Failed to deserialize DeltaSetAggregatorAccumulator: {e}") - })?; + let accumulator = DeltaSetAggregatorAccumulator::deserialize_from_bytes_arroyo( + buffer, + ) + .map_err(|e| format!("Failed to deserialize DeltaSetAggregatorAccumulator: {e}"))?; Ok(Box::new(accumulator)) } _ => Err(format!("Unknown precompute type: {precompute_type}").into()), diff --git a/asap-query-engine/src/drivers/ingest/kafka.rs b/asap-query-engine/src/drivers/ingest/kafka.rs index a65a81f..d9c3bf8 100644 --- a/asap-query-engine/src/drivers/ingest/kafka.rs +++ b/asap-query-engine/src/drivers/ingest/kafka.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use tracing::{debug, error, info, warn}; -use crate::data_model::enums::InputFormat; +use crate::data_model::enums::{InputFormat, StreamingEngine}; use crate::data_model::traits::SerializableToSink; use crate::data_model::PrecomputedOutput; use crate::data_model::StreamingConfig; @@ -22,6 +22,7 @@ pub struct KafkaConsumerConfig { pub decompress_json: bool, pub batch_size: usize, pub poll_timeout_ms: u64, + pub streaming_engine: StreamingEngine, pub dump_precomputes: bool, pub dump_output_dir: Option, } @@ -301,7 +302,9 @@ impl KafkaConsumer { "Arroyo deserialization took: {:.2}ms", deserialize_duration.as_secs_f64() * 1000.0 ); - debug!("Successfully deserialized Arroyo JSON message with precompute data"); + debug!( + "Successfully deserialized Arroyo JSON message with precompute data" + ); let total_message_duration = message_start_time.elapsed(); debug!( "Total Arroyo message processing took: {:.2}ms", diff --git a/asap-query-engine/src/main.rs b/asap-query-engine/src/main.rs index 9af1779..0cc5d95 100644 --- a/asap-query-engine/src/main.rs +++ b/asap-query-engine/src/main.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use tokio::signal; use tracing::{error, info}; -use query_engine_rust::data_model::enums::{InputFormat, LockStrategy}; +use query_engine_rust::data_model::enums::{InputFormat, LockStrategy, StreamingEngine}; use query_engine_rust::drivers::AdapterConfig; use query_engine_rust::utils::file_io::{read_inference_config, read_streaming_config}; use query_engine_rust::{ @@ -32,6 +32,10 @@ struct Args { #[arg(long)] streaming_config: String, + /// Streaming engine to use + #[arg(long, value_enum)] + streaming_engine: StreamingEngine, + /// Prometheus scrape interval in seconds #[arg(long)] prometheus_scrape_interval: u64, @@ -187,6 +191,7 @@ async fn main() -> Result<()> { decompress_json: args.decompress_json, batch_size: 1000, poll_timeout_ms: 1000, + streaming_engine: args.streaming_engine.clone(), dump_precomputes: args.dump_precomputes, dump_output_dir: if args.dump_precomputes { Some(args.output_dir.clone())