Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,032 changes: 0 additions & 1,032 deletions asap-query-engine/src/commenting_out_flink_diff

This file was deleted.

1 change: 0 additions & 1 deletion asap-query-engine/src/data_model/enums.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ pub enum InputFormat {

#[derive(clap::ValueEnum, Clone, Debug)]
pub enum StreamingEngine {
Flink,
Arroyo,
}

Expand Down
116 changes: 20 additions & 96 deletions asap-query-engine/src/data_model/precomputed_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,56 +138,6 @@ impl PrecomputedOutput {
// })
// }

// /// Deserialization for Flink streaming engine
// pub fn deserialize_from_json_flink(
// data: &serde_json::Value,
// streaming_config: &HashMap<u64, AggregationConfig>,
// ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
// let aggregation_id = data
// .get("aggregation_id")
// .and_then(|v| v.as_u64())
// .ok_or("Missing or invalid 'aggregation_id' field")?;

// let start_timestamp = data
// .get("start_timestamp")
// .and_then(|v| v.as_u64())
// .ok_or("Missing or invalid 'start_timestamp' field")?;

// let end_timestamp = data
// .get("end_timestamp")
// .and_then(|v| v.as_u64())
// .ok_or("Missing or invalid 'end_timestamp' field")?;

// let key = if let Some(key_data) = data.get("key") {
// if key_data.is_null() {
// None
// } else {
// Some(KeyByLabelValues::deserialize_from_json(key_data).map_err(
// |e| -> Box<dyn std::error::Error + Send + Sync> {
// format!("Failed to deserialize key: {e}").into()
// },
// )?)
// }
// } else {
// None
// };

// // Get aggregation type from streaming config lookup
// let config = streaming_config
// .get(&aggregation_id)
// .ok_or_else(|| {
// format!("Aggregation ID {aggregation_id} not found in streaming config")
// })?
// .clone();

// Ok(Self {
// start_timestamp,
// end_timestamp,
// key,
// config,
// })
// }

/// Deserialization for Arroyo streaming engine
pub fn deserialize_from_json_arroyo(
data: &serde_json::Value,
Expand Down Expand Up @@ -283,7 +233,6 @@ impl PrecomputedOutput {
let precompute = Self::create_precompute_from_bytes(
&config.aggregation_type,
Vec::as_slice(&precompute_bytes),
"arroyo",
)?;

Ok((precomputed_output, precompute))
Expand Down Expand Up @@ -415,22 +364,14 @@ impl PrecomputedOutput {
fn create_precompute_from_bytes(
precompute_type: &str,
buffer: &[u8],
streaming_engine: &str,
) -> Result<Box<dyn crate::data_model::AggregateCore>, Box<dyn std::error::Error + Send + Sync>>
{
use crate::precompute_operators::*;

// TODO: add arroyo methods in each operator
// TODO: remove flink methods

match precompute_type {
"Sum" | "sum" => {
let accumulator = if streaming_engine == "flink" {
SumAccumulator::deserialize_from_bytes(buffer)
} else {
SumAccumulator::deserialize_from_bytes_arroyo(buffer)
}
.map_err(|e| format!("Failed to deserialize SumAccumulator: {e}"))?;
let accumulator = SumAccumulator::deserialize_from_bytes_arroyo(buffer)
.map_err(|e| format!("Failed to deserialize SumAccumulator: {e}"))?;
Ok(Box::new(accumulator))
}
"MinMax" => {
Expand All @@ -457,58 +398,41 @@ impl PrecomputedOutput {
Ok(Box::new(accumulator))
}
"MultipleIncrease" => {
let accumulator = if streaming_engine == "flink" {
MultipleIncreaseAccumulator::deserialize_from_bytes(buffer)
} else {
MultipleIncreaseAccumulator::deserialize_from_bytes_arroyo(buffer)
}
let accumulator = MultipleIncreaseAccumulator::deserialize_from_bytes_arroyo(
buffer,
)
.map_err(|e| format!("Failed to deserialize MultipleIncreaseAccumulator: {e}"))?;
Ok(Box::new(accumulator))
}
"CountMinSketch" => {
let accumulator = if streaming_engine == "flink" {
CountMinSketchAccumulator::deserialize_from_bytes(buffer)
} else {
CountMinSketchAccumulator::deserialize_from_bytes_arroyo(buffer)
}
.map_err(|e| format!("Failed to deserialize CountMinSketchAccumulator: {e}"))?;
let accumulator = CountMinSketchAccumulator::deserialize_from_bytes_arroyo(buffer)
.map_err(|e| format!("Failed to deserialize CountMinSketchAccumulator: {e}"))?;
Ok(Box::new(accumulator))
}
"CountMinSketchWithHeap" => {
let accumulator = if streaming_engine == "flink" {
CountMinSketchWithHeapAccumulator::deserialize_from_bytes(buffer)
} else {
let accumulator =
CountMinSketchWithHeapAccumulator::deserialize_from_bytes_arroyo(buffer)
}
.map_err(|e| {
format!("Failed to deserialize CountMinSketchWithHeapAccumulator: {e}")
})?;
.map_err(|e| {
format!("Failed to deserialize CountMinSketchWithHeapAccumulator: {e}")
})?;
Ok(Box::new(accumulator))
}
"DatasketchesKLL" => {
let accumulator = if streaming_engine == "flink" {
DatasketchesKLLAccumulator::deserialize_from_bytes(buffer)
} else {
DatasketchesKLLAccumulator::deserialize_from_bytes_arroyo(buffer)
}
.map_err(|e| format!("Failed to deserialize DatasketchesKLLAccumulator: {e}"))?;
let accumulator = DatasketchesKLLAccumulator::deserialize_from_bytes_arroyo(buffer)
.map_err(|e| {
format!("Failed to deserialize DatasketchesKLLAccumulator: {e}")
})?;
Ok(Box::new(accumulator))
}
"HydraKLL" => {
let accumulator = if streaming_engine == "flink" {
return Err("HydraKLL not supported for Flink".into());
} else {
HydraKllSketchAccumulator::deserialize_from_bytes_arroyo(buffer)
}
.map_err(|e| format!("Failed to deserialize HydraKllSketchAccumulator: {e}"))?;
let accumulator = HydraKllSketchAccumulator::deserialize_from_bytes_arroyo(buffer)
.map_err(|e| format!("Failed to deserialize HydraKllSketchAccumulator: {e}"))?;
Ok(Box::new(accumulator))
}
"DeltaSetAggregator" => {
let accumulator = if streaming_engine == "flink" {
DeltaSetAggregatorAccumulator::deserialize_from_bytes(buffer)
} else {
DeltaSetAggregatorAccumulator::deserialize_from_bytes_arroyo(buffer)
}
let accumulator = DeltaSetAggregatorAccumulator::deserialize_from_bytes_arroyo(
buffer,
)
.map_err(|e| format!("Failed to deserialize DeltaSetAggregatorAccumulator: {e}"))?;
Ok(Box::new(accumulator))
}
Expand Down
190 changes: 43 additions & 147 deletions asap-query-engine/src/drivers/ingest/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,155 +273,51 @@ impl<T: Store + Send + Sync + 'static> KafkaConsumer<T> {
Err("Binary input format with precompute not implemented".into())
}
InputFormat::Json => {
// Handle streaming engine specific logic
match self.config.streaming_engine {
StreamingEngine::Flink => {
// debug!("Received message of length: {}", payload.len());

// let json_data = if self.config.decompress_json {
// // Decompress using gzip
// let mut decoder = GzDecoder::new(payload);
// let mut decompressed = Vec::new();
// match decoder.read_to_end(&mut decompressed) {
// Ok(_) => {
// debug!(
// "Decompressed JSON message of length: {}",
// decompressed.len()
// );
// decompressed
// }
// Err(e) => {
// error!("Error decompressing gzip data: {}", e);
// return Err(format!("Gzip decompression error: {e}").into());
// }
// }
// } else {
// payload.to_vec()
// };

// let json_str = match String::from_utf8(json_data) {
// Ok(s) => s,
// Err(e) => {
// error!("Error converting bytes to UTF-8: {}", e);
// return Err(format!("UTF-8 conversion error: {e}").into());
// }
// };

// let json_parse_start_time = Instant::now();

// let json_dict: serde_json::Value = match serde_json::from_str(&json_str) {
// Ok(dict) => {
// let json_parse_duration = json_parse_start_time.elapsed();
// debug!(
// "JSON parsing took: {:.2}ms",
// json_parse_duration.as_secs_f64() * 1000.0
// );
// dict
// }
// Err(e) => {
// error!("Error parsing JSON: {}", e);
// debug!("JSON content: {}", json_str);
// return Err(format!("JSON parsing error: {e}").into());
// }
// };

// debug!(
// "Deserializing JSON message: {}, {}, {}",
// json_dict
// .get("aggregation_id")
// .and_then(|v| v.as_u64())
// .unwrap_or(0),
// json_dict
// .get("start_timestamp")
// .and_then(|v| v.as_u64())
// .unwrap_or(0),
// json_dict
// .get("end_timestamp")
// .and_then(|v| v.as_u64())
// .unwrap_or(0)
// );

// let deserialize_start_time = Instant::now();

// match PrecomputedOutput::deserialize_from_json_with_precompute(&json_dict) {
// Ok((output, precompute)) => {
// let deserialize_duration = deserialize_start_time.elapsed();
// debug!(
// "Deserialization took: {:.2}ms",
// deserialize_duration.as_secs_f64() * 1000.0
// );
// debug!(
// "Deserialized item: {}, {}, {}",
// output.config.aggregation_id,
// output.start_timestamp,
// output.end_timestamp
// );
// debug!("Successfully deserialized Flink JSON message with precompute data");
// let total_message_duration = message_start_time.elapsed();
// debug!(
// "Total message processing took: {:.2}ms",
// total_message_duration.as_secs_f64() * 1000.0
// );
// Ok(Some((output, precompute)))
// }
// Err(e) => {
// error!(
// "Error deserializing Flink PrecomputedOutput from JSON with precompute: {}",
// e
// );
// debug!("JSON content: {}", json_str);
// Err(e)
// }
// }
error!("Flink input format with precompute not implemented");
Err("Flink input format with precompute not implemented".into())
// Arroyo messages - gzip decompression is applied at precompute level, not message level
let json_str = match String::from_utf8(payload.to_vec()) {
Ok(s) => s,
Err(e) => {
error!("Error converting bytes to UTF-8: {}", e);
return Err(format!("UTF-8 conversion error: {e}").into());
}
StreamingEngine::Arroyo => {
// Arroyo messages - gzip decompression is applied at precompute level, not message level
let json_str = match String::from_utf8(payload.to_vec()) {
Ok(s) => s,
Err(e) => {
error!("Error converting bytes to UTF-8: {}", e);
return Err(format!("UTF-8 conversion error: {e}").into());
}
};

let json_dict: serde_json::Value = match serde_json::from_str(&json_str) {
Ok(dict) => dict,
Err(e) => {
error!("Error parsing Arroyo JSON: {}", e);
debug!("JSON content: {}", json_str);
return Err(format!("JSON parsing error: {e}").into());
}
};
};

let deserialize_start_time = Instant::now();
match PrecomputedOutput::deserialize_from_json_arroyo(
&json_dict,
&self.streaming_config,
) {
Ok((output, precompute)) => {
let deserialize_duration = deserialize_start_time.elapsed();
debug!(
"Arroyo deserialization took: {:.2}ms",
deserialize_duration.as_secs_f64() * 1000.0
);
debug!("Successfully deserialized Arroyo JSON message with precompute data");
let total_message_duration = message_start_time.elapsed();
debug!(
"Total Arroyo message processing took: {:.2}ms",
total_message_duration.as_secs_f64() * 1000.0
);
Ok(Some((output, precompute)))
}
Err(e) => {
error!(
"Error deserializing Arroyo PrecomputedOutput from JSON with precompute: {e}"
);
debug!("JSON content: {}", json_str);
Err(e)
}
}
let json_dict: serde_json::Value = match serde_json::from_str(&json_str) {
Ok(dict) => dict,
Err(e) => {
error!("Error parsing Arroyo JSON: {}", e);
debug!("JSON content: {}", json_str);
return Err(format!("JSON parsing error: {e}").into());
}
};

let deserialize_start_time = Instant::now();
match PrecomputedOutput::deserialize_from_json_arroyo(
&json_dict,
&self.streaming_config,
) {
Ok((output, precompute)) => {
let deserialize_duration = deserialize_start_time.elapsed();
debug!(
"Arroyo deserialization took: {:.2}ms",
deserialize_duration.as_secs_f64() * 1000.0
);
debug!(
"Successfully deserialized Arroyo JSON message with precompute data"
);
let total_message_duration = message_start_time.elapsed();
debug!(
"Total Arroyo message processing took: {:.2}ms",
total_message_duration.as_secs_f64() * 1000.0
);
Ok(Some((output, precompute)))
}
Err(e) => {
error!(
"Error deserializing Arroyo PrecomputedOutput from JSON with precompute: {e}"
);
debug!("JSON content: {}", json_str);
Err(e)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,13 @@ impl DatasketchesKLLAccumulator {
.decode(sketch_b64)
.map_err(|e| format!("Failed to decode base64 sketch data: {e}"))?;

// TODO: remove this hardcoding once FlinkSketch serializes k in its output
Ok(Self {
inner: KllSketch::from_dsrs_bytes(&sketch_bytes, 200)?,
})
}

pub fn deserialize_from_bytes(buffer: &[u8]) -> Result<Self, Box<dyn std::error::Error>> {
// Mirror Python implementation: deserialize sketch directly from bytes
// TODO: remove this hardcoding once FlinkSketch serializes k in its output
Ok(Self {
inner: KllSketch::from_dsrs_bytes(buffer, 200)?,
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ impl HydraKllSketchAccumulator {
}

pub fn deserialize_from_bytes(_buffer: &[u8]) -> Result<Self, Box<dyn std::error::Error>> {
// HydraKLLSketch is only used with Arroyo, not Flink
Err("deserialize_from_bytes for HydraKllSketchAccumulator not implemented for Flink".into())
Err("deserialize_from_bytes for HydraKllSketchAccumulator not implemented".into())
}

pub fn deserialize_from_bytes_arroyo(
Expand Down
Loading
Loading