From cdaea3cdc0d05fc5d55f63dea628195875f7b711 Mon Sep 17 00:00:00 2001 From: Gnanesh Date: Tue, 10 Mar 2026 21:59:03 -0400 Subject: [PATCH 1/5] Otel Ingest Driver --- Cargo.lock | 376 ++++++++++++++++-- Cargo.toml | 2 + asap-query-engine/Cargo.toml | 3 + asap-query-engine/src/drivers/ingest/mod.rs | 2 + asap-query-engine/src/drivers/ingest/otel.rs | 202 ++++++++++ asap-query-engine/src/drivers/mod.rs | 2 +- asap-query-engine/src/lib.rs | 5 +- asap-query-engine/src/main.rs | 42 +- .../data-sources/otlp-exporters/Cargo.toml | 21 + .../data-sources/otlp-exporters/src/main.rs | 165 ++++++++ 10 files changed, 783 insertions(+), 37 deletions(-) create mode 100644 asap-query-engine/src/drivers/ingest/otel.rs create mode 100644 asap-tools/data-sources/otlp-exporters/Cargo.toml create mode 100644 asap-tools/data-sources/otlp-exporters/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index f8793a9..fa47d85 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -297,7 +297,7 @@ dependencies = [ "arrow-schema", "chrono", "half", - "indexmap", + "indexmap 2.13.0", "lexical-core", "num", "serde", @@ -370,6 +370,26 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "asap-otel-ingest" +version = "0.1.0" +dependencies = [ + "anyhow", + "axum", + "chrono", + "clap 4.5.60", + "opentelemetry-proto 0.28.0", + "prost", + "rdkafka", + "serde", + "serde_json", + "tokio", + "tokio-stream", + "tonic", + "tracing", + "tracing-subscriber", +] + [[package]] name = "async-compression" version = "0.4.19" @@ -388,6 +408,28 @@ dependencies = [ "zstd-safe", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -459,7 +501,7 @@ dependencies = [ "serde_urlencoded", "sync_wrapper 1.0.2", "tokio", - "tower", + "tower 0.5.3", "tower-layer", "tower-service", "tracing", @@ -666,7 +708,7 @@ version = "0.13.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fe45e18904af7af10e4312df7c97251e98af98c70f42f1f2587aecfcbee56bf" dependencies = [ - "indexmap", + "indexmap 2.13.0", "lazy_static", "num-traits", "regex", @@ -926,7 +968,7 @@ checksum = "b0f4697d190a142477b16aef7da8a99bfdc41e7e8b1687583c0d23a79c7afc1e" dependencies = [ "cc", "codespan-reporting", - "indexmap", + "indexmap 2.13.0", "proc-macro2", "quote", "scratch", @@ -941,7 +983,7 @@ checksum = "d0956799fa8678d4c50eed028f2de1c0552ae183c76e976cf7ca8c4e36a7c328" dependencies = [ "clap 4.5.60", "codespan-reporting", - "indexmap", + "indexmap 2.13.0", "proc-macro2", "quote", "syn 2.0.117", @@ -959,7 +1001,7 @@ version = "1.0.194" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6acc6b5822b9526adfb4fc377b67128fdd60aac757cc4a741a6278603f763cf" dependencies = [ - "indexmap", + "indexmap 2.13.0", "proc-macro2", "quote", "syn 2.0.117", @@ -1029,7 +1071,7 @@ dependencies = [ "glob", "half", "hashbrown 0.14.5", - "indexmap", + "indexmap 2.13.0", "itertools 0.13.0", "log", "num_cpus", @@ -1078,7 +1120,7 @@ dependencies = [ "chrono", "half", "hashbrown 0.14.5", - "indexmap", + "indexmap 2.13.0", "instant", "libc", "num_cpus", @@ -1136,7 +1178,7 @@ dependencies = [ "datafusion-functions-aggregate-common", "datafusion-functions-window-common", "datafusion-physical-expr-common", - "indexmap", + "indexmap 2.13.0", "paste", "serde_json", "sqlparser 0.51.0", @@ -1199,7 +1241,7 @@ dependencies = [ "datafusion-physical-expr", "datafusion-physical-expr-common", "half", - "indexmap", + "indexmap 2.13.0", "log", "paste", ] @@ -1279,7 +1321,7 @@ dependencies = [ "datafusion-expr", "datafusion-physical-expr", "hashbrown 0.14.5", - "indexmap", + "indexmap 2.13.0", "itertools 0.13.0", "log", "paste", @@ -1307,7 +1349,7 @@ dependencies = [ "datafusion-physical-expr-common", "half", "hashbrown 0.14.5", - "indexmap", + "indexmap 2.13.0", "itertools 0.13.0", "log", "paste", @@ -1369,7 +1411,7 @@ dependencies = [ "futures", "half", "hashbrown 0.14.5", - "indexmap", + "indexmap 2.13.0", "itertools 0.13.0", "log", "once_cell", @@ -1390,7 +1432,7 @@ dependencies = [ "arrow-schema", "datafusion-common", "datafusion-expr", - "indexmap", + "indexmap 2.13.0", "log", "regex", "sqlparser 0.51.0", @@ -1771,7 +1813,26 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.12", - "indexmap", + "indexmap 2.13.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "h2" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f44da3a8150a6703ed5d34e164b875fd14c2cdab9af1252a9a1020bde2bdc54" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http 1.4.0", + "indexmap 2.13.0", "slab", "tokio", "tokio-util", @@ -1790,6 +1851,12 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + [[package]] name = "hashbrown" version = "0.14.5" @@ -1943,7 +2010,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", + "h2 0.3.27", "http 0.2.12", "http-body 0.4.6", "httparse", @@ -1967,6 +2034,7 @@ dependencies = [ "bytes", "futures-channel", "futures-core", + "h2 0.4.13", "http 1.4.0", "http-body 1.0.1", "httparse", @@ -1976,6 +2044,20 @@ dependencies = [ "pin-utils", "smallvec", "tokio", + "want", +] + +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper 1.8.1", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", ] [[package]] @@ -1998,12 +2080,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" dependencies = [ "bytes", + "futures-channel", + "futures-util", "http 1.4.0", "http-body 1.0.1", "hyper 1.8.1", + "libc", "pin-project-lite", + "socket2 0.6.3", "tokio", "tower-service", + "tracing", ] [[package]] @@ -2147,6 +2234,16 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", +] + [[package]] name = "indexmap" version = "2.13.0" @@ -2345,9 +2442,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.182" +version = "0.2.183" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6800badb6cb2082ffd7b6a67e6125bb39f18782f793520caee8cb8846be06112" +checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d" [[package]] name = "libloading" @@ -2464,7 +2561,7 @@ dependencies = [ "cactus", "cfgrammar", "filetime", - "indexmap", + "indexmap 2.13.0", "lazy_static", "lrtable", "num-traits", @@ -2801,6 +2898,120 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab70038c28ed37b97d8ed414b6429d343a8bbf44c9f79ec854f3a643029ba6d7" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 1.0.69", + "tracing", +] + +[[package]] +name = "opentelemetry" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "236e667b670a5cdf90c258f5a55794ec5ac5027e960c224bff8367a59e1e6426" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 2.0.18", + "tracing", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91cf61a1868dacc576bf2b2a1c3e9ab150af7272909e80085c3173384fe11f76" +dependencies = [ + "async-trait", + "futures-core", + "http 1.4.0", + "opentelemetry 0.27.1", + "opentelemetry-proto 0.27.0", + "opentelemetry_sdk 0.27.1", + "prost", + "thiserror 1.0.69", + "tokio", + "tonic", + "tracing", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6e05acbfada5ec79023c85368af14abd0b307c015e9064d249b2a950ef459a6" +dependencies = [ + "opentelemetry 0.27.1", + "opentelemetry_sdk 0.27.1", + "prost", + "tonic", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56f8870d3024727e99212eb3bb1762ec16e255e3e6f58eeb3dc8db1aa226746d" +dependencies = [ + "base64 0.22.1", + "hex", + "opentelemetry 0.28.0", + "opentelemetry_sdk 0.28.0", + "prost", + "serde", + "tonic", + "tracing", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "231e9d6ceef9b0b2546ddf52335785ce41252bc7474ee8ba05bfad277be13ab8" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "opentelemetry 0.27.1", + "percent-encoding", + "rand 0.8.5", + "serde_json", + "thiserror 1.0.69", + "tokio", + "tokio-stream", + "tracing", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84dfad6042089c7fc1f6118b7040dc2eb4ab520abbf410b79dc481032af39570" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "opentelemetry 0.28.0", + "percent-encoding", + "rand 0.8.5", + "serde_json", + "thiserror 2.0.18", +] + [[package]] name = "ordered-float" version = "2.10.1" @@ -2810,6 +3021,22 @@ dependencies = [ "num-traits", ] +[[package]] +name = "otlp_exporter" +version = "0.1.0" +dependencies = [ + "clap 4.5.60", + "opentelemetry 0.27.1", + "opentelemetry-otlp", + "opentelemetry_sdk 0.27.1", + "rand 0.8.5", + "rand_distr", + "tokio", + "tonic", + "tracing", + "tracing-subscriber", +] + [[package]] name = "packedvec" version = "1.2.5" @@ -2924,7 +3151,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" dependencies = [ "fixedbitset", - "indexmap", + "indexmap 2.13.0", ] [[package]] @@ -2945,6 +3172,26 @@ dependencies = [ "siphasher", ] +[[package]] +name = "pin-project" +version = "1.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1749c7ed4bcaf4c3d0a3efc28538844fb29bcdd7d2b67b2be7e20ba861ff517" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b20ed30f105399776b9c883e68e536ef602a16ae6f596d2c473591d6ad64c6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "pin-project-lite" version = "0.2.17" @@ -3162,6 +3409,7 @@ dependencies = [ "futures", "hex", "lazy_static", + "opentelemetry-proto 0.28.0", "prometheus", "promql-parser", "promql_utilities", @@ -3184,6 +3432,8 @@ dependencies = [ "tempfile", "thiserror 1.0.69", "tokio", + "tokio-stream", + "tonic", "tracing", "tracing-appender", "tracing-subscriber", @@ -3273,6 +3523,16 @@ dependencies = [ "getrandom 0.3.4", ] +[[package]] +name = "rand_distr" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32cb0b9bc82b0a0876c2dd994a7e7a2683d3e7390ca40e6886785ef0c7e3ee31" +dependencies = [ + "num-traits", + "rand 0.8.5", +] + [[package]] name = "rdkafka" version = "0.34.0" @@ -3387,7 +3647,7 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", + "h2 0.3.27", "http 0.2.12", "http-body 0.4.6", "hyper 0.14.32", @@ -3638,7 +3898,7 @@ version = "0.9.34+deprecated" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" dependencies = [ - "indexmap", + "indexmap 2.13.0", "itoa", "ryu", "serde", @@ -3719,7 +3979,7 @@ dependencies = [ [[package]] name = "sketchlib-rust" version = "0.1.0" -source = "git+https://github.com/ProjectASAP/sketchlib-rust#348db8415f97246c42de68b407b47fa038cf8b1f" +source = "git+https://github.com/ProjectASAP/sketchlib-rust#a729288270cc8f74a4ac9451e5c63cd9c693668c" dependencies = [ "ahash", "clap 4.5.60", @@ -4236,7 +4496,7 @@ version = "0.25.4+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7193cbd0ce53dc966037f54351dbbcf0d5a642c7f0038c382ef9e677ce8c13f2" dependencies = [ - "indexmap", + "indexmap 2.13.0", "toml_datetime", "toml_parser", "winnow", @@ -4251,6 +4511,56 @@ dependencies = [ "winnow", ] +[[package]] +name = "tonic" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.22.1", + "bytes", + "h2 0.4.13", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.8.1", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "prost", + "socket2 0.5.10", + "tokio", + "tokio-stream", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap 1.9.3", + "pin-project", + "pin-project-lite", + "rand 0.8.5", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.5.3" @@ -4625,7 +4935,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" dependencies = [ "anyhow", - "indexmap", + "indexmap 2.13.0", "wasm-encoder", "wasmparser", ] @@ -4638,7 +4948,7 @@ checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" dependencies = [ "bitflags 2.11.0", "hashbrown 0.15.5", - "indexmap", + "indexmap 2.13.0", "semver", ] @@ -4989,7 +5299,7 @@ checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" dependencies = [ "anyhow", "heck 0.5.0", - "indexmap", + "indexmap 2.13.0", "prettyplease", "syn 2.0.117", "wasm-metadata", @@ -5020,7 +5330,7 @@ checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" dependencies = [ "anyhow", "bitflags 2.11.0", - "indexmap", + "indexmap 2.13.0", "log", "serde", "serde_derive", @@ -5039,7 +5349,7 @@ checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" dependencies = [ "anyhow", "id-arena", - "indexmap", + "indexmap 2.13.0", "log", "semver", "serde", @@ -5095,18 +5405,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.40" +version = "0.8.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a789c6e490b576db9f7e6b6d661bcc9799f7c0ac8352f56ea20193b2681532e5" +checksum = "f2578b716f8a7a858b7f02d5bd870c14bf4ddbbcf3a4c05414ba6503640505e3" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.40" +version = "0.8.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f65c489a7071a749c849713807783f70672b28094011623e200cb86dcb835953" +checksum = "7e6cc098ea4d3bd6246687de65af3f920c430e236bee1e3bf2e441463f08a02f" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 5101fb6..03c1cc7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,8 @@ members = [ "asap-common/dependencies/rs/sketch_db_common", "asap-common/dependencies/rs/datafusion_summary_library", "asap-query-engine", + "asap-otel-ingest", + "asap-tools/data-sources/otlp-exporters" ] [workspace.package] diff --git a/asap-query-engine/Cargo.toml b/asap-query-engine/Cargo.toml index 11484c6..2e2ccf4 100644 --- a/asap-query-engine/Cargo.toml +++ b/asap-query-engine/Cargo.toml @@ -47,6 +47,9 @@ datafusion = "43" arrow = "53.4.1" futures = "0.3" prost = "0.13" +opentelemetry-proto = { version = "0.28", features = ["gen-tonic", "gen-tonic-messages", "metrics"] } +tonic = "0.12" +tokio-stream = "0.1" snap = "1" regex = "1" prometheus = "0.13" diff --git a/asap-query-engine/src/drivers/ingest/mod.rs b/asap-query-engine/src/drivers/ingest/mod.rs index f300788..1c6f3d1 100644 --- a/asap-query-engine/src/drivers/ingest/mod.rs +++ b/asap-query-engine/src/drivers/ingest/mod.rs @@ -1,8 +1,10 @@ pub mod kafka; +pub mod otel; pub mod prometheus_remote_write; pub mod victoriametrics_remote_write; pub use kafka::{KafkaConsumer, KafkaConsumerConfig}; +pub use otel::{OtelIngestConfig, OtelIngestServer}; // pub use prometheus_remote_write::{PrometheusRemoteWriteConfig, PrometheusRemoteWriteServer}; // pub use victoriametrics_remote_write::{ // VictoriaMetricsRemoteWriteConfig, VictoriaMetricsRemoteWriteServer, diff --git a/asap-query-engine/src/drivers/ingest/otel.rs b/asap-query-engine/src/drivers/ingest/otel.rs new file mode 100644 index 0000000..d9fbafc --- /dev/null +++ b/asap-query-engine/src/drivers/ingest/otel.rs @@ -0,0 +1,202 @@ +//! OTLP ingest driver (phase 1). +//! +//! Accepts OTLP metrics via gRPC (4317) and HTTP (4318, POST /v1/metrics), +//! parses ExportMetricsServiceRequest, logs counts at DEBUG, and leaves +//! handoff to precompute engine as TODO. + +use std::collections::HashMap; + +use axum::{ + body::Bytes, + extract::State, + routing::post, + Json, Router, +}; +use opentelemetry_proto::tonic::collector::metrics::v1::{ + metrics_service_server::MetricsService, ExportMetricsServiceRequest, ExportMetricsServiceResponse, +}; +use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value as NumberValue; +use prost::Message; +use tonic::{Request, Response, Status}; +use tracing::{debug, error, info}; + +/// Configuration for the OTLP ingest server. +#[derive(Debug, Clone)] +pub struct OtelIngestConfig { + pub grpc_port: u16, + pub http_port: u16, +} + +/// OTLP ingest server that accepts metrics via gRPC and HTTP. +pub struct OtelIngestServer { + config: OtelIngestConfig, +} + +impl OtelIngestServer { + pub fn new(config: OtelIngestConfig) -> Self { + Self { config } + } + + pub async fn run(&self) -> Result<(), Box> { + let grpc_addr = std::net::SocketAddr::from(([0, 0, 0, 0], self.config.grpc_port)); + let http_addr = std::net::SocketAddr::from(([0, 0, 0, 0], self.config.http_port)); + + let grpc_svc = MetricsServiceImpl; + let grpc_svc = + opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer::new( + grpc_svc, + ); + + let app = Router::new() + .route("/v1/metrics", post(handle_otlp_http)) + .with_state(()); + + let grpc_listener = tokio::net::TcpListener::bind(grpc_addr).await?; + let http_listener = tokio::net::TcpListener::bind(http_addr).await?; + + info!("OTLP gRPC listening on {}", grpc_addr); + info!("OTLP HTTP listening on {} (POST /v1/metrics)", http_addr); + + tokio::select! { + r = tonic::transport::Server::builder() + .add_service(grpc_svc) + .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(grpc_listener)) => { + if let Err(e) = r { + error!("OTLP gRPC server error: {}", e); + } + } + r = axum::serve(http_listener, app) => { + if let Err(e) = r { + error!("OTLP HTTP server error: {}", e); + } + } + } + + Ok(()) + } +} + +struct MetricsServiceImpl; + +#[tonic::async_trait] +impl MetricsService for MetricsServiceImpl { + async fn export( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + process_otlp_request(&req); + Ok(Response::new(ExportMetricsServiceResponse { + partial_success: None, + })) + } +} + +async fn handle_otlp_http( + State(_state): State<()>, + body: Bytes, +) -> Result, (axum::http::StatusCode, String)> { + let req = ExportMetricsServiceRequest::decode(body.as_ref()) + .map_err(|e| (axum::http::StatusCode::BAD_REQUEST, format!("Protobuf decode error: {}", e)))?; + process_otlp_request(&req); + Ok(Json(serde_json::json!({"rejected": 0}))) +} + +fn process_otlp_request(request: &ExportMetricsServiceRequest) { + let resource_count = request.resource_metrics.len(); + let total_points = otlp_to_record_count(request); + debug!( + "OTLP ingest: received {} resource metrics, {} total data points", + resource_count, total_points + ); + // TODO(otel-phase1): Pass metrics to precompute engine. For Arroyo pipeline: produce to Kafka topic (e.g. otel_metrics) in JSON format { name, labels, timestamp, value }. See asap-otel-ingest for conversion and produce logic. +} + +/// Count total data points by traversing resource_metrics -> scope_metrics -> metrics. +/// Reuses the same conversion traversal as asap-otel-ingest (Gauge, Sum, Histogram, etc.). +fn otlp_to_record_count(request: &ExportMetricsServiceRequest) -> usize { + let mut count = 0; + for resource_metrics in &request.resource_metrics { + for scope_metrics in &resource_metrics.scope_metrics { + for metric in &scope_metrics.metrics { + if metric.name.is_empty() { + continue; + } + + use opentelemetry_proto::tonic::metrics::v1::metric::Data; + match &metric.data { + Some(Data::Gauge(g)) => count += g.data_points.len(), + Some(Data::Sum(s)) => count += s.data_points.len(), + Some(Data::Histogram(hist)) => { + for dp in &hist.data_points { + if dp.sum.is_some() { + count += 1; + } + count += 1; // _count + count += dp.bucket_counts.len(); // _bucket per le + } + } + Some(Data::ExponentialHistogram(eh)) => { + for dp in &eh.data_points { + if dp.sum.is_some() { + count += 1; + } + count += 1; // _count + count += 1; // _scale + } + } + Some(Data::Summary(summary)) => { + for dp in &summary.data_points { + count += 1; // _sum + count += 1; // _count + count += dp.quantile_values.len(); + } + } + None => {} + } + } + } + } + count +} + +#[allow(dead_code)] +fn number_value_to_f64(v: &Option) -> f64 { + match v { + Some(NumberValue::AsDouble(x)) => *x, + Some(NumberValue::AsInt(x)) => *x as f64, + None => 0.0, + } +} + +#[allow(dead_code)] +fn any_value_to_string(v: &opentelemetry_proto::tonic::common::v1::AnyValue) -> String { + use opentelemetry_proto::tonic::common::v1::any_value::Value as AnyValueVariant; + match &v.value { + Some(AnyValueVariant::StringValue(s)) => s.clone(), + Some(AnyValueVariant::IntValue(i)) => i.to_string(), + Some(AnyValueVariant::DoubleValue(d)) => d.to_string(), + Some(AnyValueVariant::BoolValue(b)) => b.to_string(), + Some(AnyValueVariant::BytesValue(bytes)) => format!("{:?}", bytes), + _ => String::new(), + } +} + +#[allow(dead_code)] +fn attributes_to_map( + attrs: &[opentelemetry_proto::tonic::common::v1::KeyValue], +) -> HashMap { + let mut m = HashMap::new(); + for kv in attrs { + let key = kv.key.clone(); + let value = kv + .value + .as_ref() + .map(any_value_to_string) + .unwrap_or_default(); + if !key.is_empty() { + m.insert(key, value); + } + } + m +} diff --git a/asap-query-engine/src/drivers/mod.rs b/asap-query-engine/src/drivers/mod.rs index 544d648..80f79a4 100644 --- a/asap-query-engine/src/drivers/mod.rs +++ b/asap-query-engine/src/drivers/mod.rs @@ -2,5 +2,5 @@ pub mod ingest; pub mod query; // Re-export commonly used types for convenience -pub use ingest::{KafkaConsumer, KafkaConsumerConfig}; +pub use ingest::{KafkaConsumer, KafkaConsumerConfig, OtelIngestConfig, OtelIngestServer}; pub use query::{AdapterConfig, HttpServer, HttpServerConfig}; diff --git a/asap-query-engine/src/lib.rs b/asap-query-engine/src/lib.rs index 47893c1..85272f4 100644 --- a/asap-query-engine/src/lib.rs +++ b/asap-query-engine/src/lib.rs @@ -24,7 +24,10 @@ pub use stores::{SimpleMapStore, Store, StoreResult}; pub use engines::{InstantVector, QueryResult, SimpleEngine}; -pub use drivers::{HttpServer, HttpServerConfig, KafkaConsumer, KafkaConsumerConfig}; +pub use drivers::{ + HttpServer, HttpServerConfig, KafkaConsumer, KafkaConsumerConfig, OtelIngestConfig, + OtelIngestServer, +}; pub use utils::{normalize_spatial_filter, read_inference_config, read_streaming_config}; diff --git a/asap-query-engine/src/main.rs b/asap-query-engine/src/main.rs index 0cc5d95..e0c7a3e 100644 --- a/asap-query-engine/src/main.rs +++ b/asap-query-engine/src/main.rs @@ -9,8 +9,8 @@ use query_engine_rust::data_model::enums::{InputFormat, LockStrategy, StreamingE use query_engine_rust::drivers::AdapterConfig; use query_engine_rust::utils::file_io::{read_inference_config, read_streaming_config}; use query_engine_rust::{ - HttpServer, HttpServerConfig, KafkaConsumer, KafkaConsumerConfig, Result, SimpleEngine, - SimpleMapStore, + HttpServer, HttpServerConfig, KafkaConsumer, KafkaConsumerConfig, OtelIngestConfig, + OtelIngestServer, Result, SimpleEngine, SimpleMapStore, }; #[derive(Parser, Debug)] @@ -107,6 +107,18 @@ struct Args { /// Path to promsketch configuration YAML file (optional; uses defaults if omitted) #[arg(long)] promsketch_config: Option, + + /// Enable OTLP metrics ingest (gRPC + HTTP) + #[arg(long)] + enable_otel_ingest: bool, + + /// OTLP gRPC listen port + #[arg(long, default_value = "4317")] + otel_grpc_port: u16, + + /// OTLP HTTP listen port + #[arg(long, default_value = "4318")] + otel_http_port: u16, } #[tokio::main] @@ -219,6 +231,26 @@ async fn main() -> Result<()> { } }; + // Setup OTLP ingest server + let otel_handle = if args.enable_otel_ingest { + let otel_config = OtelIngestConfig { + grpc_port: args.otel_grpc_port, + http_port: args.otel_http_port, + }; + let server = OtelIngestServer::new(otel_config); + info!( + "Starting OTLP ingest server (gRPC port {}, HTTP port {})", + args.otel_grpc_port, args.otel_http_port + ); + Some(tokio::spawn(async move { + if let Err(e) = server.run().await { + error!("OTLP ingest server error: {}", e); + } + })) + } else { + None + }; + // Setup Prometheus remote write server // let prometheus_remote_write_handle = if args.enable_prometheus_remote_write { // let prw_config = PrometheusRemoteWriteConfig { @@ -283,6 +315,12 @@ async fn main() -> Result<()> { let _ = handle.await; } + if let Some(handle) = otel_handle { + info!("Shutting down OTLP ingest server..."); + handle.abort(); + let _ = handle.await; + } + // if let Some(handle) = prometheus_remote_write_handle { // info!("Shutting down Prometheus remote write server..."); // handle.abort(); diff --git a/asap-tools/data-sources/otlp-exporters/Cargo.toml b/asap-tools/data-sources/otlp-exporters/Cargo.toml new file mode 100644 index 0000000..495cda5 --- /dev/null +++ b/asap-tools/data-sources/otlp-exporters/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "otlp_exporter" +version = "0.1.0" +edition = "2021" +description = "OTLP test data generator that sends synthetic metrics to an OTLP endpoint" + +[[bin]] +name = "otlp_exporter" +path = "src/main.rs" + +[dependencies] +opentelemetry = { version = "0.27", features = ["metrics"] } +opentelemetry_sdk = { version = "0.27", features = ["rt-tokio", "metrics"] } +opentelemetry-otlp = { version = "0.27", features = ["grpc-tonic", "metrics"] } +tonic = "0.12" +tokio = { version = "1.0", features = ["full"] } +clap = { version = "4.0", features = ["derive"] } +rand = "0.8" +rand_distr = "0.4" +tracing = "0.1" +tracing-subscriber = "0.3" diff --git a/asap-tools/data-sources/otlp-exporters/src/main.rs b/asap-tools/data-sources/otlp-exporters/src/main.rs new file mode 100644 index 0000000..94a0689 --- /dev/null +++ b/asap-tools/data-sources/otlp-exporters/src/main.rs @@ -0,0 +1,165 @@ +//! OTLP test data generator for ASAP pipeline testing. +//! +//! Sends synthetic metrics (Counter, Histogram, Gauge) to an OTLP/gRPC endpoint +//! at a configurable rate. Designed to exercise `asap-otel-ingest` end-to-end. + +use clap::Parser; +use opentelemetry::metrics::MeterProvider as _; +use opentelemetry::KeyValue; +use opentelemetry_otlp::WithExportConfig; +use opentelemetry_sdk::metrics::SdkMeterProvider; +use opentelemetry_sdk::Resource; +use rand::rngs::SmallRng; +use rand::{Rng, SeedableRng}; +use rand_distr::{Distribution, Normal, Uniform}; +use std::time::Duration; +use tracing::info; + +#[derive(Parser, Debug)] +#[command( + name = "otlp_exporter", + about = "Synthetic OTLP metrics generator for testing asap-otel-ingest" +)] +struct Args { + /// OTLP/gRPC endpoint to send metrics to + #[arg(long, default_value = "http://localhost:4317")] + endpoint: String, + + /// Number of services to simulate (max 5) + #[arg(long, default_value_t = 3)] + num_services: usize, + + /// Number of HTTP methods per service (max 5) + #[arg(long, default_value_t = 3)] + num_methods: usize, + + /// Interval between metric collection flushes in milliseconds + #[arg(long, default_value_t = 1000)] + interval_ms: u64, + + /// Number of iterations (0 = run forever) + #[arg(long, default_value_t = 0)] + iterations: u64, +} + +static SERVICES: &[&str] = &[ + "auth-service", + "api-gateway", + "user-service", + "billing-service", + "search-service", +]; +static METHODS: &[&str] = &["GET", "POST", "PUT", "DELETE", "PATCH"]; +static STATUS_CODES: &[&str] = &["200", "201", "400", "404", "500", "503"]; + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + let args = Args::parse(); + + info!( + "Starting OTLP exporter → {} ({} services, {} methods, {}ms interval)", + args.endpoint, args.num_services, args.num_methods, args.interval_ms + ); + + let exporter = opentelemetry_otlp::MetricExporter::builder() + .with_tonic() + .with_endpoint(&args.endpoint) + .with_timeout(Duration::from_secs(5)) + .build()?; + + let reader = + opentelemetry_sdk::metrics::PeriodicReader::builder(exporter, opentelemetry_sdk::runtime::Tokio) + .with_interval(Duration::from_millis(args.interval_ms)) + .build(); + + let provider = SdkMeterProvider::builder() + .with_reader(reader) + .with_resource(Resource::new(vec![KeyValue::new( + "service.name", + "asap.test.otlp_exporter", + )])) + .build(); + + let meter = provider.meter("asap.test.otlp_exporter"); + + let request_counter = meter + .u64_counter("http_requests_total") + .with_description("Total HTTP requests") + .with_unit("1") + .build(); + + let request_duration = meter + .f64_histogram("http_request_duration_seconds") + .with_description("HTTP request duration in seconds") + .with_unit("s") + .build(); + + let active_connections = meter + .i64_up_down_counter("http_active_connections") + .with_description("Number of active HTTP connections") + .with_unit("1") + .build(); + + let response_size = meter + .f64_histogram("http_response_size_bytes") + .with_description("HTTP response size in bytes") + .with_unit("By") + .build(); + + let mut rng = SmallRng::from_entropy(); + let normal = Normal::new(0.15_f64, 0.05_f64).unwrap(); + let uniform = Uniform::new(100.0_f64, 50_000.0_f64); + + let num_services = args.num_services.min(SERVICES.len()); + let num_methods = args.num_methods.min(METHODS.len()); + let services = &SERVICES[..num_services]; + let methods = &METHODS[..num_methods]; + + let mut iteration: u64 = 0; + loop { + iteration += 1; + + for service in services { + for method in methods { + let status = STATUS_CODES[rng.gen_range(0..STATUS_CODES.len())]; + let labels = [ + KeyValue::new("service.name", service.to_string()), + KeyValue::new("http.method", method.to_string()), + KeyValue::new("http.status_code", status.to_string()), + ]; + + let count = rng.gen_range(1u64..=10); + request_counter.add(count, &labels); + + let latency = normal.sample(&mut rng).abs().max(0.001); + request_duration.record(latency, &labels); + + let size = uniform.sample(&mut rng); + response_size.record(size, &labels); + } + + let delta: i64 = rng.gen_range(-5..=5); + active_connections.add( + delta, + &[KeyValue::new("service.name", service.to_string())], + ); + } + + info!( + "Iteration {}: recorded metrics for {} services", + iteration, + services.len() + ); + + if args.iterations > 0 && iteration >= args.iterations { + info!("Reached {} iterations, stopping", args.iterations); + break; + } + + tokio::time::sleep(Duration::from_millis(args.interval_ms)).await; + } + + provider.shutdown()?; + Ok(()) +} From 1a99feeeb573006350f2ec071b6398b5c51597e3 Mon Sep 17 00:00:00 2001 From: Gnanesh Date: Tue, 10 Mar 2026 22:09:05 -0400 Subject: [PATCH 2/5] Minor Changes --- Cargo.lock | 120 +------------------ Cargo.toml | 2 - asap-query-engine/src/drivers/ingest/mod.rs | 2 +- asap-query-engine/src/drivers/ingest/otel.rs | 18 +-- asap-query-engine/src/drivers/mod.rs | 2 +- asap-query-engine/src/lib.rs | 4 +- asap-query-engine/src/main.rs | 16 +-- 7 files changed, 25 insertions(+), 139 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fa47d85..4f36b9c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -370,26 +370,6 @@ dependencies = [ "regex-syntax", ] -[[package]] -name = "asap-otel-ingest" -version = "0.1.0" -dependencies = [ - "anyhow", - "axum", - "chrono", - "clap 4.5.60", - "opentelemetry-proto 0.28.0", - "prost", - "rdkafka", - "serde", - "serde_json", - "tokio", - "tokio-stream", - "tonic", - "tracing", - "tracing-subscriber", -] - [[package]] name = "async-compression" version = "0.4.19" @@ -2898,20 +2878,6 @@ dependencies = [ "vcpkg", ] -[[package]] -name = "opentelemetry" -version = "0.27.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab70038c28ed37b97d8ed414b6429d343a8bbf44c9f79ec854f3a643029ba6d7" -dependencies = [ - "futures-core", - "futures-sink", - "js-sys", - "pin-project-lite", - "thiserror 1.0.69", - "tracing", -] - [[package]] name = "opentelemetry" version = "0.28.0" @@ -2926,37 +2892,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "opentelemetry-otlp" -version = "0.27.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91cf61a1868dacc576bf2b2a1c3e9ab150af7272909e80085c3173384fe11f76" -dependencies = [ - "async-trait", - "futures-core", - "http 1.4.0", - "opentelemetry 0.27.1", - "opentelemetry-proto 0.27.0", - "opentelemetry_sdk 0.27.1", - "prost", - "thiserror 1.0.69", - "tokio", - "tonic", - "tracing", -] - -[[package]] -name = "opentelemetry-proto" -version = "0.27.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6e05acbfada5ec79023c85368af14abd0b307c015e9064d249b2a950ef459a6" -dependencies = [ - "opentelemetry 0.27.1", - "opentelemetry_sdk 0.27.1", - "prost", - "tonic", -] - [[package]] name = "opentelemetry-proto" version = "0.28.0" @@ -2965,35 +2900,14 @@ checksum = "56f8870d3024727e99212eb3bb1762ec16e255e3e6f58eeb3dc8db1aa226746d" dependencies = [ "base64 0.22.1", "hex", - "opentelemetry 0.28.0", - "opentelemetry_sdk 0.28.0", + "opentelemetry", + "opentelemetry_sdk", "prost", "serde", "tonic", "tracing", ] -[[package]] -name = "opentelemetry_sdk" -version = "0.27.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "231e9d6ceef9b0b2546ddf52335785ce41252bc7474ee8ba05bfad277be13ab8" -dependencies = [ - "async-trait", - "futures-channel", - "futures-executor", - "futures-util", - "glob", - "opentelemetry 0.27.1", - "percent-encoding", - "rand 0.8.5", - "serde_json", - "thiserror 1.0.69", - "tokio", - "tokio-stream", - "tracing", -] - [[package]] name = "opentelemetry_sdk" version = "0.28.0" @@ -3005,7 +2919,7 @@ dependencies = [ "futures-executor", "futures-util", "glob", - "opentelemetry 0.28.0", + "opentelemetry", "percent-encoding", "rand 0.8.5", "serde_json", @@ -3021,22 +2935,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "otlp_exporter" -version = "0.1.0" -dependencies = [ - "clap 4.5.60", - "opentelemetry 0.27.1", - "opentelemetry-otlp", - "opentelemetry_sdk 0.27.1", - "rand 0.8.5", - "rand_distr", - "tokio", - "tonic", - "tracing", - "tracing-subscriber", -] - [[package]] name = "packedvec" version = "1.2.5" @@ -3409,7 +3307,7 @@ dependencies = [ "futures", "hex", "lazy_static", - "opentelemetry-proto 0.28.0", + "opentelemetry-proto", "prometheus", "promql-parser", "promql_utilities", @@ -3523,16 +3421,6 @@ dependencies = [ "getrandom 0.3.4", ] -[[package]] -name = "rand_distr" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32cb0b9bc82b0a0876c2dd994a7e7a2683d3e7390ca40e6886785ef0c7e3ee31" -dependencies = [ - "num-traits", - "rand 0.8.5", -] - [[package]] name = "rdkafka" version = "0.34.0" diff --git a/Cargo.toml b/Cargo.toml index 03c1cc7..5101fb6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,8 +7,6 @@ members = [ "asap-common/dependencies/rs/sketch_db_common", "asap-common/dependencies/rs/datafusion_summary_library", "asap-query-engine", - "asap-otel-ingest", - "asap-tools/data-sources/otlp-exporters" ] [workspace.package] diff --git a/asap-query-engine/src/drivers/ingest/mod.rs b/asap-query-engine/src/drivers/ingest/mod.rs index 1c6f3d1..a9d3f65 100644 --- a/asap-query-engine/src/drivers/ingest/mod.rs +++ b/asap-query-engine/src/drivers/ingest/mod.rs @@ -4,7 +4,7 @@ pub mod prometheus_remote_write; pub mod victoriametrics_remote_write; pub use kafka::{KafkaConsumer, KafkaConsumerConfig}; -pub use otel::{OtelIngestConfig, OtelIngestServer}; +pub use otel::{OtlpConsumer, OtlpConsumerConfig}; // pub use prometheus_remote_write::{PrometheusRemoteWriteConfig, PrometheusRemoteWriteServer}; // pub use victoriametrics_remote_write::{ // VictoriaMetricsRemoteWriteConfig, VictoriaMetricsRemoteWriteServer, diff --git a/asap-query-engine/src/drivers/ingest/otel.rs b/asap-query-engine/src/drivers/ingest/otel.rs index d9fbafc..391776d 100644 --- a/asap-query-engine/src/drivers/ingest/otel.rs +++ b/asap-query-engine/src/drivers/ingest/otel.rs @@ -1,4 +1,4 @@ -//! OTLP ingest driver (phase 1). +//! OTLP ingest driver. //! //! Accepts OTLP metrics via gRPC (4317) and HTTP (4318, POST /v1/metrics), //! parses ExportMetricsServiceRequest, logs counts at DEBUG, and leaves @@ -20,20 +20,20 @@ use prost::Message; use tonic::{Request, Response, Status}; use tracing::{debug, error, info}; -/// Configuration for the OTLP ingest server. +/// Configuration for the OTLP ingest consumer. #[derive(Debug, Clone)] -pub struct OtelIngestConfig { +pub struct OtlpConsumerConfig { pub grpc_port: u16, pub http_port: u16, } -/// OTLP ingest server that accepts metrics via gRPC and HTTP. -pub struct OtelIngestServer { - config: OtelIngestConfig, +/// OTLP consumer that accepts metrics via gRPC and HTTP. +pub struct OtlpConsumer { + config: OtlpConsumerConfig, } -impl OtelIngestServer { - pub fn new(config: OtelIngestConfig) -> Self { +impl OtlpConsumer { + pub fn new(config: OtlpConsumerConfig) -> Self { Self { config } } @@ -109,7 +109,7 @@ fn process_otlp_request(request: &ExportMetricsServiceRequest) { "OTLP ingest: received {} resource metrics, {} total data points", resource_count, total_points ); - // TODO(otel-phase1): Pass metrics to precompute engine. For Arroyo pipeline: produce to Kafka topic (e.g. otel_metrics) in JSON format { name, labels, timestamp, value }. See asap-otel-ingest for conversion and produce logic. + // TODO: Pass metrics to precompute engine. } /// Count total data points by traversing resource_metrics -> scope_metrics -> metrics. diff --git a/asap-query-engine/src/drivers/mod.rs b/asap-query-engine/src/drivers/mod.rs index 80f79a4..4f48ad1 100644 --- a/asap-query-engine/src/drivers/mod.rs +++ b/asap-query-engine/src/drivers/mod.rs @@ -2,5 +2,5 @@ pub mod ingest; pub mod query; // Re-export commonly used types for convenience -pub use ingest::{KafkaConsumer, KafkaConsumerConfig, OtelIngestConfig, OtelIngestServer}; +pub use ingest::{KafkaConsumer, KafkaConsumerConfig, OtlpConsumer, OtlpConsumerConfig}; pub use query::{AdapterConfig, HttpServer, HttpServerConfig}; diff --git a/asap-query-engine/src/lib.rs b/asap-query-engine/src/lib.rs index 85272f4..c1d894e 100644 --- a/asap-query-engine/src/lib.rs +++ b/asap-query-engine/src/lib.rs @@ -25,8 +25,8 @@ pub use stores::{SimpleMapStore, Store, StoreResult}; pub use engines::{InstantVector, QueryResult, SimpleEngine}; pub use drivers::{ - HttpServer, HttpServerConfig, KafkaConsumer, KafkaConsumerConfig, OtelIngestConfig, - OtelIngestServer, + HttpServer, HttpServerConfig, KafkaConsumer, KafkaConsumerConfig, OtlpConsumer, + OtlpConsumerConfig, }; pub use utils::{normalize_spatial_filter, read_inference_config, read_streaming_config}; diff --git a/asap-query-engine/src/main.rs b/asap-query-engine/src/main.rs index e0c7a3e..c30dc5e 100644 --- a/asap-query-engine/src/main.rs +++ b/asap-query-engine/src/main.rs @@ -9,8 +9,8 @@ use query_engine_rust::data_model::enums::{InputFormat, LockStrategy, StreamingE use query_engine_rust::drivers::AdapterConfig; use query_engine_rust::utils::file_io::{read_inference_config, read_streaming_config}; use query_engine_rust::{ - HttpServer, HttpServerConfig, KafkaConsumer, KafkaConsumerConfig, OtelIngestConfig, - OtelIngestServer, Result, SimpleEngine, SimpleMapStore, + HttpServer, HttpServerConfig, KafkaConsumer, KafkaConsumerConfig, OtlpConsumer, + OtlpConsumerConfig, Result, SimpleEngine, SimpleMapStore, }; #[derive(Parser, Debug)] @@ -233,18 +233,18 @@ async fn main() -> Result<()> { // Setup OTLP ingest server let otel_handle = if args.enable_otel_ingest { - let otel_config = OtelIngestConfig { + let otel_config = OtlpConsumerConfig { grpc_port: args.otel_grpc_port, http_port: args.otel_http_port, }; - let server = OtelIngestServer::new(otel_config); + let consumer = OtlpConsumer::new(otel_config); info!( - "Starting OTLP ingest server (gRPC port {}, HTTP port {})", + "Starting OTLP consumer (gRPC port {}, HTTP port {})", args.otel_grpc_port, args.otel_http_port ); Some(tokio::spawn(async move { - if let Err(e) = server.run().await { - error!("OTLP ingest server error: {}", e); + if let Err(e) = consumer.run().await { + error!("OTLP consumer error: {}", e); } })) } else { @@ -316,7 +316,7 @@ async fn main() -> Result<()> { } if let Some(handle) = otel_handle { - info!("Shutting down OTLP ingest server..."); + info!("Shutting down OTLP consumer..."); handle.abort(); let _ = handle.await; } From 735f6be56c494cf0a6de9f8dd774aaf9fcaa7a47 Mon Sep 17 00:00:00 2001 From: Gnanesh Date: Tue, 10 Mar 2026 22:18:24 -0400 Subject: [PATCH 3/5] Fix Formatting Errors --- asap-query-engine/src/drivers/ingest/otel.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/asap-query-engine/src/drivers/ingest/otel.rs b/asap-query-engine/src/drivers/ingest/otel.rs index 391776d..f65ea10 100644 --- a/asap-query-engine/src/drivers/ingest/otel.rs +++ b/asap-query-engine/src/drivers/ingest/otel.rs @@ -6,14 +6,10 @@ use std::collections::HashMap; -use axum::{ - body::Bytes, - extract::State, - routing::post, - Json, Router, -}; +use axum::{body::Bytes, extract::State, routing::post, Json, Router}; use opentelemetry_proto::tonic::collector::metrics::v1::{ - metrics_service_server::MetricsService, ExportMetricsServiceRequest, ExportMetricsServiceResponse, + metrics_service_server::MetricsService, ExportMetricsServiceRequest, + ExportMetricsServiceResponse, }; use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value as NumberValue; use prost::Message; @@ -96,8 +92,12 @@ async fn handle_otlp_http( State(_state): State<()>, body: Bytes, ) -> Result, (axum::http::StatusCode, String)> { - let req = ExportMetricsServiceRequest::decode(body.as_ref()) - .map_err(|e| (axum::http::StatusCode::BAD_REQUEST, format!("Protobuf decode error: {}", e)))?; + let req = ExportMetricsServiceRequest::decode(body.as_ref()).map_err(|e| { + ( + axum::http::StatusCode::BAD_REQUEST, + format!("Protobuf decode error: {}", e), + ) + })?; process_otlp_request(&req); Ok(Json(serde_json::json!({"rejected": 0}))) } @@ -109,7 +109,7 @@ fn process_otlp_request(request: &ExportMetricsServiceRequest) { "OTLP ingest: received {} resource metrics, {} total data points", resource_count, total_points ); - // TODO: Pass metrics to precompute engine. + // TODO: Pass metrics to precompute engine. } /// Count total data points by traversing resource_metrics -> scope_metrics -> metrics. From 11bf3cd519444b869a2721991135f330e3a40014 Mon Sep 17 00:00:00 2001 From: Gnanesh Date: Wed, 11 Mar 2026 13:50:57 -0400 Subject: [PATCH 4/5] Renaming --- asap-query-engine/src/drivers/ingest/mod.rs | 2 +- asap-query-engine/src/drivers/ingest/otel.rs | 14 +++++++------- asap-query-engine/src/drivers/mod.rs | 2 +- asap-query-engine/src/lib.rs | 4 ++-- asap-query-engine/src/main.rs | 18 +++++++++--------- 5 files changed, 20 insertions(+), 20 deletions(-) diff --git a/asap-query-engine/src/drivers/ingest/mod.rs b/asap-query-engine/src/drivers/ingest/mod.rs index a9d3f65..3d8dd9d 100644 --- a/asap-query-engine/src/drivers/ingest/mod.rs +++ b/asap-query-engine/src/drivers/ingest/mod.rs @@ -4,7 +4,7 @@ pub mod prometheus_remote_write; pub mod victoriametrics_remote_write; pub use kafka::{KafkaConsumer, KafkaConsumerConfig}; -pub use otel::{OtlpConsumer, OtlpConsumerConfig}; +pub use otel::{OtlpReceiver, OtlpReceiverConfig}; // pub use prometheus_remote_write::{PrometheusRemoteWriteConfig, PrometheusRemoteWriteServer}; // pub use victoriametrics_remote_write::{ // VictoriaMetricsRemoteWriteConfig, VictoriaMetricsRemoteWriteServer, diff --git a/asap-query-engine/src/drivers/ingest/otel.rs b/asap-query-engine/src/drivers/ingest/otel.rs index f65ea10..408fb9d 100644 --- a/asap-query-engine/src/drivers/ingest/otel.rs +++ b/asap-query-engine/src/drivers/ingest/otel.rs @@ -16,20 +16,20 @@ use prost::Message; use tonic::{Request, Response, Status}; use tracing::{debug, error, info}; -/// Configuration for the OTLP ingest consumer. +/// Configuration for the OTLP receiver. #[derive(Debug, Clone)] -pub struct OtlpConsumerConfig { +pub struct OtlpReceiverConfig { pub grpc_port: u16, pub http_port: u16, } -/// OTLP consumer that accepts metrics via gRPC and HTTP. -pub struct OtlpConsumer { - config: OtlpConsumerConfig, +/// OTLP receiver that accepts metrics via gRPC and HTTP. +pub struct OtlpReceiver { + config: OtlpReceiverConfig, } -impl OtlpConsumer { - pub fn new(config: OtlpConsumerConfig) -> Self { +impl OtlpReceiver { + pub fn new(config: OtlpReceiverConfig) -> Self { Self { config } } diff --git a/asap-query-engine/src/drivers/mod.rs b/asap-query-engine/src/drivers/mod.rs index 4f48ad1..a5f3e00 100644 --- a/asap-query-engine/src/drivers/mod.rs +++ b/asap-query-engine/src/drivers/mod.rs @@ -2,5 +2,5 @@ pub mod ingest; pub mod query; // Re-export commonly used types for convenience -pub use ingest::{KafkaConsumer, KafkaConsumerConfig, OtlpConsumer, OtlpConsumerConfig}; +pub use ingest::{KafkaConsumer, KafkaConsumerConfig, OtlpReceiver, OtlpReceiverConfig}; pub use query::{AdapterConfig, HttpServer, HttpServerConfig}; diff --git a/asap-query-engine/src/lib.rs b/asap-query-engine/src/lib.rs index c1d894e..22295ed 100644 --- a/asap-query-engine/src/lib.rs +++ b/asap-query-engine/src/lib.rs @@ -25,8 +25,8 @@ pub use stores::{SimpleMapStore, Store, StoreResult}; pub use engines::{InstantVector, QueryResult, SimpleEngine}; pub use drivers::{ - HttpServer, HttpServerConfig, KafkaConsumer, KafkaConsumerConfig, OtlpConsumer, - OtlpConsumerConfig, + HttpServer, HttpServerConfig, KafkaConsumer, KafkaConsumerConfig, OtlpReceiver, + OtlpReceiverConfig, }; pub use utils::{normalize_spatial_filter, read_inference_config, read_streaming_config}; diff --git a/asap-query-engine/src/main.rs b/asap-query-engine/src/main.rs index c30dc5e..a950fba 100644 --- a/asap-query-engine/src/main.rs +++ b/asap-query-engine/src/main.rs @@ -9,8 +9,8 @@ use query_engine_rust::data_model::enums::{InputFormat, LockStrategy, StreamingE use query_engine_rust::drivers::AdapterConfig; use query_engine_rust::utils::file_io::{read_inference_config, read_streaming_config}; use query_engine_rust::{ - HttpServer, HttpServerConfig, KafkaConsumer, KafkaConsumerConfig, OtlpConsumer, - OtlpConsumerConfig, Result, SimpleEngine, SimpleMapStore, + HttpServer, HttpServerConfig, KafkaConsumer, KafkaConsumerConfig, OtlpReceiver, + OtlpReceiverConfig, Result, SimpleEngine, SimpleMapStore, }; #[derive(Parser, Debug)] @@ -231,20 +231,20 @@ async fn main() -> Result<()> { } }; - // Setup OTLP ingest server + // Setup OTLP receiver let otel_handle = if args.enable_otel_ingest { - let otel_config = OtlpConsumerConfig { + let otel_config = OtlpReceiverConfig { grpc_port: args.otel_grpc_port, http_port: args.otel_http_port, }; - let consumer = OtlpConsumer::new(otel_config); + let receiver = OtlpReceiver::new(otel_config); info!( - "Starting OTLP consumer (gRPC port {}, HTTP port {})", + "Starting OTLP receiver (gRPC port {}, HTTP port {})", args.otel_grpc_port, args.otel_http_port ); Some(tokio::spawn(async move { - if let Err(e) = consumer.run().await { - error!("OTLP consumer error: {}", e); + if let Err(e) = receiver.run().await { + error!("OTLP receiver error: {}", e); } })) } else { @@ -316,7 +316,7 @@ async fn main() -> Result<()> { } if let Some(handle) = otel_handle { - info!("Shutting down OTLP consumer..."); + info!("Shutting down OTLP receiver..."); handle.abort(); let _ = handle.await; } From d4d036ad73e158010d9ef28a2e224cf93f4922e4 Mon Sep 17 00:00:00 2001 From: Gnanesh Date: Wed, 11 Mar 2026 19:38:53 -0400 Subject: [PATCH 5/5] Convert the recieved data --- asap-query-engine/src/drivers/ingest/otel.rs | 148 ++++++++++++++++++- 1 file changed, 145 insertions(+), 3 deletions(-) diff --git a/asap-query-engine/src/drivers/ingest/otel.rs b/asap-query-engine/src/drivers/ingest/otel.rs index 408fb9d..6fc8772 100644 --- a/asap-query-engine/src/drivers/ingest/otel.rs +++ b/asap-query-engine/src/drivers/ingest/otel.rs @@ -102,6 +102,15 @@ async fn handle_otlp_http( Ok(Json(serde_json::json!({"rejected": 0}))) } +/// A parsed metric data point: name, labels, timestamp (nanos), and numeric value. +#[derive(Debug)] +pub struct MetricPoint { + pub name: String, + pub labels: HashMap, + pub timestamp_nanos: u64, + pub value: f64, +} + fn process_otlp_request(request: &ExportMetricsServiceRequest) { let resource_count = request.resource_metrics.len(); let total_points = otlp_to_record_count(request); @@ -109,6 +118,14 @@ fn process_otlp_request(request: &ExportMetricsServiceRequest) { "OTLP ingest: received {} resource metrics, {} total data points", resource_count, total_points ); + + let points = otlp_to_metric_points(request); + if let Some(first) = points.first() { + debug!( + "OTLP parse example: {} {:?} @{}ns = {}", + first.name, first.labels, first.timestamp_nanos, first.value + ); + } // TODO: Pass metrics to precompute engine. } @@ -160,7 +177,134 @@ fn otlp_to_record_count(request: &ExportMetricsServiceRequest) -> usize { count } -#[allow(dead_code)] +/// Parse OTLP request and convert to metric data points (name, labels, timestamp, value). +fn otlp_to_metric_points(request: &ExportMetricsServiceRequest) -> Vec { + let mut points = Vec::new(); + for resource_metrics in &request.resource_metrics { + let resource_attrs = resource_metrics + .resource + .as_ref() + .map(|r| attributes_to_map(&r.attributes)) + .unwrap_or_default(); + + for scope_metrics in &resource_metrics.scope_metrics { + let scope_attrs = scope_metrics + .scope + .as_ref() + .map(|s| attributes_to_map(&s.attributes)) + .unwrap_or_default(); + + for metric in &scope_metrics.metrics { + if metric.name.is_empty() { + continue; + } + + let base_labels: HashMap = scope_attrs + .iter() + .chain(resource_attrs.iter()) + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + + use opentelemetry_proto::tonic::metrics::v1::metric::Data; + match &metric.data { + Some(Data::Gauge(g)) => { + for dp in &g.data_points { + let labels = merge_point_attributes(&base_labels, &dp.attributes); + let value = number_value_to_f64(&dp.value); + points.push(MetricPoint { + name: metric.name.clone(), + labels, + timestamp_nanos: dp.time_unix_nano, + value, + }); + } + } + Some(Data::Sum(s)) => { + for dp in &s.data_points { + let labels = merge_point_attributes(&base_labels, &dp.attributes); + let value = number_value_to_f64(&dp.value); + points.push(MetricPoint { + name: metric.name.clone(), + labels, + timestamp_nanos: dp.time_unix_nano, + value, + }); + } + } + Some(Data::Histogram(hist)) => { + for dp in &hist.data_points { + let labels = merge_point_attributes(&base_labels, &dp.attributes); + if let Some(sum) = dp.sum { + points.push(MetricPoint { + name: format!("{}_sum", metric.name), + labels: labels.clone(), + timestamp_nanos: dp.time_unix_nano, + value: sum, + }); + } + points.push(MetricPoint { + name: format!("{}_count", metric.name), + labels: labels.clone(), + timestamp_nanos: dp.time_unix_nano, + value: dp.count as f64, + }); + } + } + Some(Data::ExponentialHistogram(eh)) => { + for dp in &eh.data_points { + let labels = merge_point_attributes(&base_labels, &dp.attributes); + if let Some(sum) = dp.sum { + points.push(MetricPoint { + name: format!("{}_sum", metric.name), + labels: labels.clone(), + timestamp_nanos: dp.time_unix_nano, + value: sum, + }); + } + points.push(MetricPoint { + name: format!("{}_count", metric.name), + labels: labels.clone(), + timestamp_nanos: dp.time_unix_nano, + value: dp.count as f64, + }); + } + } + Some(Data::Summary(sm)) => { + for dp in &sm.data_points { + let labels = merge_point_attributes(&base_labels, &dp.attributes); + points.push(MetricPoint { + name: format!("{}_sum", metric.name), + labels: labels.clone(), + timestamp_nanos: dp.time_unix_nano, + value: dp.sum, + }); + points.push(MetricPoint { + name: format!("{}_count", metric.name), + labels: labels.clone(), + timestamp_nanos: dp.time_unix_nano, + value: dp.count as f64, + }); + } + } + None => {} + } + } + } + } + points +} + +fn merge_point_attributes( + base: &HashMap, + attrs: &[opentelemetry_proto::tonic::common::v1::KeyValue], +) -> HashMap { + let mut m = base.clone(); + for (k, v) in attributes_to_map(attrs) { + m.insert(k, v); + } + m +} + fn number_value_to_f64(v: &Option) -> f64 { match v { Some(NumberValue::AsDouble(x)) => *x, @@ -169,7 +313,6 @@ fn number_value_to_f64(v: &Option) -> f64 { } } -#[allow(dead_code)] fn any_value_to_string(v: &opentelemetry_proto::tonic::common::v1::AnyValue) -> String { use opentelemetry_proto::tonic::common::v1::any_value::Value as AnyValueVariant; match &v.value { @@ -182,7 +325,6 @@ fn any_value_to_string(v: &opentelemetry_proto::tonic::common::v1::AnyValue) -> } } -#[allow(dead_code)] fn attributes_to_map( attrs: &[opentelemetry_proto::tonic::common::v1::KeyValue], ) -> HashMap {