diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index e626968..848441b 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -38,6 +38,11 @@ jobs: toolchain: stable components: rustfmt, clippy + - name: Install protoc + run: | + sudo apt-get update + sudo apt-get install -y protobuf-compiler + - name: Run sccache-cache uses: mozilla-actions/sccache-action@v0.0.4 @@ -68,6 +73,11 @@ jobs: with: toolchain: stable + - name: Install protoc + run: | + sudo apt-get update + sudo apt-get install -y protobuf-compiler + - name: Run sccache-cache uses: mozilla-actions/sccache-action@v0.0.4 diff --git a/Cargo.lock b/Cargo.lock index 4de8ab3..06ce7b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1184,7 +1184,7 @@ dependencies = [ "parquet", "paste", "pin-project-lite", - "rand", + "rand 0.8.5", "sqlparser 0.51.0", "tempfile", "tokio", @@ -1261,7 +1261,7 @@ dependencies = [ "log", "object_store", "parking_lot", - "rand", + "rand 0.8.5", "tempfile", "url", ] @@ -1322,7 +1322,7 @@ dependencies = [ "itertools 0.13.0", "log", "md-5", - "rand", + "rand 0.8.5", "regex", "sha2", "unicode-segmentation", @@ -1361,7 +1361,7 @@ dependencies = [ "datafusion-common", "datafusion-expr-common", "datafusion-physical-expr-common", - "rand", + "rand 0.8.5", ] [[package]] @@ -1384,7 +1384,7 @@ dependencies = [ "itertools 0.13.0", "log", "paste", - "rand", + "rand 0.8.5", ] [[package]] @@ -1471,7 +1471,7 @@ dependencies = [ "datafusion-common", "datafusion-expr-common", "hashbrown 0.14.5", - "rand", + "rand 0.8.5", ] [[package]] @@ -1521,7 +1521,7 @@ dependencies = [ "once_cell", "parking_lot", "pin-project-lite", - "rand", + "rand 0.8.5", "tokio", ] @@ -1628,6 +1628,17 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" +[[package]] +name = "errno" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f639046355ee4f37944e44f60642c6f3a7efa3cf6b78c78a0d989a8ce6c396a1" +dependencies = [ + "errno-dragonfly", + "libc", + "winapi", +] + [[package]] name = "errno" version = "0.3.14" @@ -1638,6 +1649,16 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "errno-dragonfly" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "fallible-iterator" version = "0.3.0" @@ -2555,6 +2576,16 @@ version = "0.2.183" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d" +[[package]] +name = "libloading" +version = "0.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "351a32417a12d5f7e82c368a66781e307834dae04c6ce0cd4456d52989229883" +dependencies = [ + "cfg-if", + "winapi", +] + [[package]] name = "libm" version = "0.2.16" @@ -2764,6 +2795,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "multimap" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" + [[package]] name = "native-tls" version = "0.2.18" @@ -3046,7 +3083,7 @@ dependencies = [ "glob", "opentelemetry", "percent-encoding", - "rand", + "rand 0.8.5", "serde_json", "thiserror 2.0.18", ] @@ -3146,6 +3183,21 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" +[[package]] +name = "pcap" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99e935fc73d54a89fff576526c2ccd42bbf8247aae05b358693475b14fd4ff79" +dependencies = [ + "bitflags 1.3.2", + "errno 0.2.8", + "libc", + "libloading", + "pkg-config", + "regex", + "windows-sys 0.36.1", +] + [[package]] name = "percent-encoding" version = "2.3.2" @@ -3405,6 +3457,26 @@ dependencies = [ "prost-derive", ] +[[package]] +name = "prost-build" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" +dependencies = [ + "heck 0.5.0", + "itertools 0.10.5", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn 2.0.117", + "tempfile", +] + [[package]] name = "prost-derive" version = "0.13.5" @@ -3418,6 +3490,15 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "prost-types" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" +dependencies = [ + "prost", +] + [[package]] name = "protobuf" version = "2.28.0" @@ -3446,8 +3527,8 @@ dependencies = [ "bincode", "chrono", "clap 4.5.60", - "ctor", "criterion", + "ctor", "dashmap 5.5.3", "datafusion", "datafusion_summary_library", @@ -3472,6 +3553,7 @@ dependencies = [ "serde_yaml", "sketch-core", "sketch_db_common", + "sketchlib-rust", "snap", "sql_utilities", "sqlparser 0.59.0", @@ -3518,8 +3600,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha", - "rand_core", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + +[[package]] +name = "rand" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" +dependencies = [ + "rand_chacha 0.9.0", + "rand_core 0.9.5", ] [[package]] @@ -3529,7 +3621,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core 0.9.5", ] [[package]] @@ -3762,7 +3864,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" dependencies = [ "bitflags 2.11.0", - "errno", + "errno 0.3.14", "libc", "linux-raw-sys", "windows-sys 0.61.2", @@ -3864,6 +3966,15 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde-big-array" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11fc7cc2c76d73e0f27ee52abbd64eec84d46f370c88371120433196934e4b7f" +dependencies = [ + "serde", +] + [[package]] name = "serde_core" version = "1.0.228" @@ -3965,7 +4076,7 @@ version = "1.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b" dependencies = [ - "errno", + "errno 0.3.14", "libc", ] @@ -4006,6 +4117,24 @@ dependencies = [ "sql_utilities", ] +[[package]] +name = "sketchlib-rust" +version = "0.1.0" +source = "git+https://github.com/ProjectASAP/sketchlib-rust#440427438fdaf3ac2298b53ee148f9e12a64ffcc" +dependencies = [ + "bytes", + "clap 4.5.60", + "pcap", + "prost", + "prost-build", + "rand 0.9.2", + "rmp-serde", + "serde", + "serde-big-array", + "smallvec", + "twox-hash 2.1.2", +] + [[package]] name = "slab" version = "0.4.12" @@ -4576,7 +4705,7 @@ dependencies = [ "indexmap 1.9.3", "pin-project", "pin-project-lite", - "rand", + "rand 0.8.5", "slab", "tokio", "tokio-util", @@ -4708,6 +4837,9 @@ name = "twox-hash" version = "2.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ea3136b675547379c4bd395ca6b938e5ad3c3d20fad76e7fe85f9e0d011419c" +dependencies = [ + "rand 0.9.2", +] [[package]] name = "typenum" @@ -5073,6 +5205,19 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-sys" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2" +dependencies = [ + "windows_aarch64_msvc 0.36.1", + "windows_i686_gnu 0.36.1", + "windows_i686_msvc 0.36.1", + "windows_x86_64_gnu 0.36.1", + "windows_x86_64_msvc 0.36.1", +] + [[package]] name = "windows-sys" version = "0.48.0" @@ -5152,6 +5297,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" +[[package]] +name = "windows_aarch64_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" + [[package]] name = "windows_aarch64_msvc" version = "0.48.5" @@ -5164,6 +5315,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" +[[package]] +name = "windows_i686_gnu" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" + [[package]] name = "windows_i686_gnu" version = "0.48.5" @@ -5182,6 +5339,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" +[[package]] +name = "windows_i686_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" + [[package]] name = "windows_i686_msvc" version = "0.48.5" @@ -5194,6 +5357,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" +[[package]] +name = "windows_x86_64_gnu" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" + [[package]] name = "windows_x86_64_gnu" version = "0.48.5" @@ -5218,6 +5387,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" +[[package]] +name = "windows_x86_64_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" + [[package]] name = "windows_x86_64_msvc" version = "0.48.5" diff --git a/asap-planner-rs/Dockerfile b/asap-planner-rs/Dockerfile index f2c6c6f..ca7b183 100644 --- a/asap-planner-rs/Dockerfile +++ b/asap-planner-rs/Dockerfile @@ -5,6 +5,11 @@ LABEL description="ASAP Planner (Rust) for SketchDB" WORKDIR /code +# Required by prost-build dependencies during cargo build +RUN apt-get update && apt-get install -y --no-install-recommends \ + protobuf-compiler \ + && rm -rf /var/lib/apt/lists/* + COPY asap-common ./asap-common COPY Cargo.toml ./ COPY Cargo.lock ./ diff --git a/asap-query-engine/Cargo.toml b/asap-query-engine/Cargo.toml index d10eb5e..c27fe5c 100644 --- a/asap-query-engine/Cargo.toml +++ b/asap-query-engine/Cargo.toml @@ -56,6 +56,7 @@ lazy_static = "1.4" zstd = "0.13" reqwest = { version = "0.11", features = ["json"] } tracing-appender = "0.2" +sketchlib-rust = { git = "https://github.com/ProjectASAP/sketchlib-rust" } [dev-dependencies] ctor = "0.2" diff --git a/asap-query-engine/Dockerfile b/asap-query-engine/Dockerfile index 75301df..d4c2f5a 100644 --- a/asap-query-engine/Dockerfile +++ b/asap-query-engine/Dockerfile @@ -8,6 +8,11 @@ LABEL description="QueryEngine Rust service for SketchDB" WORKDIR /code +# Required by prost-build dependencies (e.g., sketchlib-rust) during cargo build +RUN apt-get update && apt-get install -y --no-install-recommends \ + protobuf-compiler \ + && rm -rf /var/lib/apt/lists/* + # Copy the asap-common directory COPY asap-common ./asap-common diff --git a/asap-query-engine/src/drivers/ingest/otel.rs b/asap-query-engine/src/drivers/ingest/otel.rs index 6fc8772..6b5032a 100644 --- a/asap-query-engine/src/drivers/ingest/otel.rs +++ b/asap-query-engine/src/drivers/ingest/otel.rs @@ -5,14 +5,18 @@ //! handoff to precompute engine as TODO. use std::collections::HashMap; +use std::io::Read; use axum::{body::Bytes, extract::State, routing::post, Json, Router}; +use flate2::read::GzDecoder; use opentelemetry_proto::tonic::collector::metrics::v1::{ metrics_service_server::MetricsService, ExportMetricsServiceRequest, ExportMetricsServiceResponse, }; +use opentelemetry_proto::tonic::common::v1::any_value::Value as AnyValueVariant; use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value as NumberValue; use prost::Message; +use sketchlib_rust::proto::sketchlib::{sketch_envelope, SketchEnvelope}; use tonic::{Request, Response, Status}; use tracing::{debug, error, info}; @@ -80,8 +84,10 @@ impl MetricsService for MetricsServiceImpl { &self, request: Request, ) -> Result, Status> { + debug!("OTLP received request via gRPC"); let req = request.into_inner(); - process_otlp_request(&req); + process_otlp_request(&req, "gRPC"); + debug!("OTLP sending response via gRPC"); Ok(Response::new(ExportMetricsServiceResponse { partial_success: None, })) @@ -89,16 +95,38 @@ impl MetricsService for MetricsServiceImpl { } async fn handle_otlp_http( + headers: axum::http::HeaderMap, State(_state): State<()>, body: Bytes, ) -> Result, (axum::http::StatusCode, String)> { + debug!("OTLP received request via HTTP, body_bytes={}", body.len()); + let body = if let Some(enc) = headers.get(axum::http::header::CONTENT_ENCODING) { + let enc = enc.to_str().unwrap_or("").trim().to_ascii_lowercase(); + if enc == "gzip" { + let mut decoder = GzDecoder::new(body.as_ref()); + let mut out = Vec::new(); + decoder.read_to_end(&mut out).map_err(|e| { + ( + axum::http::StatusCode::BAD_REQUEST, + format!("Gzip decode error: {}", e), + ) + })?; + Bytes::from(out) + } else { + body + } + } else { + body + }; + let req = ExportMetricsServiceRequest::decode(body.as_ref()).map_err(|e| { ( axum::http::StatusCode::BAD_REQUEST, format!("Protobuf decode error: {}", e), ) })?; - process_otlp_request(&req); + process_otlp_request(&req, "HTTP"); + debug!("OTLP sending response via HTTP"); Ok(Json(serde_json::json!({"rejected": 0}))) } @@ -111,15 +139,104 @@ pub struct MetricPoint { pub value: f64, } -fn process_otlp_request(request: &ExportMetricsServiceRequest) { +type SketchPayload = (String, String, Vec); +type OtlpParseResult = (Vec, Vec); + +fn format_series_key(name: &str, labels: &HashMap) -> String { + let mut pairs: Vec<_> = labels.iter().collect(); + pairs.sort_by_key(|(k, _)| *k); + let labels_str = pairs + .iter() + .map(|(k, v)| format!("{}={}", k, v)) + .collect::>() + .join(","); + format!("{}{{{}}}", name, labels_str) +} + +fn get_sketch_payload_from_attrs( + attrs: &[opentelemetry_proto::tonic::common::v1::KeyValue], +) -> Option<(String, Vec)> { + for kv in attrs { + match kv.key.as_str() { + "kll.sketch_payload" | "cms.sketch_payload" | "countsketch.sketch_payload" => { + if let Some(value) = &kv.value { + if let Some(AnyValueVariant::BytesValue(bytes)) = &value.value { + return Some((kv.key.clone(), bytes.clone())); + } + } + } + _ => {} + } + } + None +} + +fn log_sketch_envelope_type(attr_name: &str, payload: &[u8], metric_name: &str) { + match SketchEnvelope::decode(payload) { + Ok(env) => { + let sketch_type = match env.sketch_state { + Some(sketch_envelope::SketchState::Kll(_)) => "KLL", + Some(sketch_envelope::SketchState::CountMin(_)) => "CountMin", + Some(sketch_envelope::SketchState::CountSketch(_)) => "CountSketch", + Some(_) => "Other", + None => "Unknown", + }; + debug!( + "OTLP Sketches: metric='{}' attr='{}' payload_bytes={} sketch_type={}", + metric_name, + attr_name, + payload.len(), + sketch_type + ); + } + Err(e) => { + debug!( + "OTLP Sketches: metric='{}' attr='{}' payload_bytes={} decode_error='{}'", + metric_name, + attr_name, + payload.len(), + e + ); + } + } +} + +fn process_otlp_request(request: &ExportMetricsServiceRequest, transport: &str) { let resource_count = request.resource_metrics.len(); let total_points = otlp_to_record_count(request); - debug!( - "OTLP ingest: received {} resource metrics, {} total data points", - resource_count, total_points - ); + if resource_count > 0 || total_points > 0 { + debug!( + "OTLP ingest: received {} resource metrics, {} total data points (transport={})", + resource_count, total_points, transport + ); + } + + let (points, sketch_payloads) = otlp_to_metric_points_and_sketches(request); + + for (metric_name, attr_name, payload) in &sketch_payloads { + log_sketch_envelope_type(attr_name, payload, metric_name); + } + if !sketch_payloads.is_empty() { + debug!( + "OTLP Sketch Payload Flow: received {} sketch payload(s), decoded successfully", + sketch_payloads.len() + ); + } - let points = otlp_to_metric_points(request); + let mut by_series: HashMap = HashMap::new(); + for point in &points { + let key = format_series_key(&point.name, &point.labels); + *by_series.entry(key).or_insert(0) += 1; + } + if !by_series.is_empty() { + debug!( + "OTLP Raw Metrics Flow: received {} raw metric series", + by_series.len() + ); + for (series, count) in by_series { + debug!("OTLP Raw Metrics Flow: series {} count={}", series, count); + } + } if let Some(first) = points.first() { debug!( "OTLP parse example: {} {:?} @{}ns = {}", @@ -178,8 +295,11 @@ fn otlp_to_record_count(request: &ExportMetricsServiceRequest) -> usize { } /// Parse OTLP request and convert to metric data points (name, labels, timestamp, value). -fn otlp_to_metric_points(request: &ExportMetricsServiceRequest) -> Vec { +/// Data points with sketch payloads in attributes are excluded from points and returned +/// separately for Sketch Payload Flow processing. +fn otlp_to_metric_points_and_sketches(request: &ExportMetricsServiceRequest) -> OtlpParseResult { let mut points = Vec::new(); + let mut sketch_payloads = Vec::new(); for resource_metrics in &request.resource_metrics { let resource_attrs = resource_metrics .resource @@ -209,6 +329,12 @@ fn otlp_to_metric_points(request: &ExportMetricsServiceRequest) -> Vec { for dp in &g.data_points { + if let Some((attr_name, payload)) = + get_sketch_payload_from_attrs(&dp.attributes) + { + sketch_payloads.push((metric.name.clone(), attr_name, payload)); + continue; + } let labels = merge_point_attributes(&base_labels, &dp.attributes); let value = number_value_to_f64(&dp.value); points.push(MetricPoint { @@ -221,6 +347,12 @@ fn otlp_to_metric_points(request: &ExportMetricsServiceRequest) -> Vec { for dp in &s.data_points { + if let Some((attr_name, payload)) = + get_sketch_payload_from_attrs(&dp.attributes) + { + sketch_payloads.push((metric.name.clone(), attr_name, payload)); + continue; + } let labels = merge_point_attributes(&base_labels, &dp.attributes); let value = number_value_to_f64(&dp.value); points.push(MetricPoint { @@ -233,6 +365,12 @@ fn otlp_to_metric_points(request: &ExportMetricsServiceRequest) -> Vec { for dp in &hist.data_points { + if let Some((attr_name, payload)) = + get_sketch_payload_from_attrs(&dp.attributes) + { + sketch_payloads.push((metric.name.clone(), attr_name, payload)); + continue; + } let labels = merge_point_attributes(&base_labels, &dp.attributes); if let Some(sum) = dp.sum { points.push(MetricPoint { @@ -252,6 +390,12 @@ fn otlp_to_metric_points(request: &ExportMetricsServiceRequest) -> Vec { for dp in &eh.data_points { + if let Some((attr_name, payload)) = + get_sketch_payload_from_attrs(&dp.attributes) + { + sketch_payloads.push((metric.name.clone(), attr_name, payload)); + continue; + } let labels = merge_point_attributes(&base_labels, &dp.attributes); if let Some(sum) = dp.sum { points.push(MetricPoint { @@ -271,6 +415,12 @@ fn otlp_to_metric_points(request: &ExportMetricsServiceRequest) -> Vec { for dp in &sm.data_points { + if let Some((attr_name, payload)) = + get_sketch_payload_from_attrs(&dp.attributes) + { + sketch_payloads.push((metric.name.clone(), attr_name, payload)); + continue; + } let labels = merge_point_attributes(&base_labels, &dp.attributes); points.push(MetricPoint { name: format!("{}_sum", metric.name), @@ -291,7 +441,7 @@ fn otlp_to_metric_points(request: &ExportMetricsServiceRequest) -> Vec