Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
264 changes: 231 additions & 33 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions asap-query-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ datafusion = "43"
arrow = "53.4.1"
futures = "0.3"
prost = "0.13"
opentelemetry-proto = { version = "0.28", features = ["gen-tonic", "gen-tonic-messages", "metrics"] }
tonic = "0.12"
tokio-stream = "0.1"
snap = "1"
regex = "1"
prometheus = "0.13"
Expand Down
2 changes: 2 additions & 0 deletions asap-query-engine/src/drivers/ingest/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
pub mod kafka;
pub mod otel;
pub mod prometheus_remote_write;
pub mod victoriametrics_remote_write;

pub use kafka::{KafkaConsumer, KafkaConsumerConfig};
pub use otel::{OtlpReceiver, OtlpReceiverConfig};
// pub use prometheus_remote_write::{PrometheusRemoteWriteConfig, PrometheusRemoteWriteServer};
// pub use victoriametrics_remote_write::{
// VictoriaMetricsRemoteWriteConfig, VictoriaMetricsRemoteWriteServer,
Expand Down
344 changes: 344 additions & 0 deletions asap-query-engine/src/drivers/ingest/otel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,344 @@
//! OTLP ingest driver.
//!
//! Accepts OTLP metrics via gRPC (4317) and HTTP (4318, POST /v1/metrics),
//! parses ExportMetricsServiceRequest, logs counts at DEBUG, and leaves
//! handoff to precompute engine as TODO.

use std::collections::HashMap;

use axum::{body::Bytes, extract::State, routing::post, Json, Router};
use opentelemetry_proto::tonic::collector::metrics::v1::{
metrics_service_server::MetricsService, ExportMetricsServiceRequest,
ExportMetricsServiceResponse,
};
use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value as NumberValue;
use prost::Message;
use tonic::{Request, Response, Status};
use tracing::{debug, error, info};

/// Configuration for the OTLP receiver.
#[derive(Debug, Clone)]
pub struct OtlpReceiverConfig {
    /// TCP port for the OTLP/gRPC endpoint (conventionally 4317).
    pub grpc_port: u16,
    /// TCP port for the OTLP/HTTP endpoint (conventionally 4318).
    pub http_port: u16,
}

/// OTLP receiver that accepts metrics via gRPC and HTTP.
pub struct OtlpReceiver {
    // Ports to bind; read once by `run`.
    config: OtlpReceiverConfig,
}

impl OtlpReceiver {
    /// Create a receiver from the given configuration.
    pub fn new(config: OtlpReceiverConfig) -> Self {
        Self { config }
    }

    /// Bind both listeners and serve until one server exits.
    ///
    /// Listens on `0.0.0.0:<grpc_port>` for OTLP/gRPC and on
    /// `0.0.0.0:<http_port>` for OTLP/HTTP (POST /v1/metrics). The two
    /// servers race inside `tokio::select!`; when either finishes, the
    /// other future is dropped (cancelled).
    ///
    /// # Errors
    ///
    /// Returns an error if either listener fails to bind, or if a
    /// server terminates with an error. (Server errors used to be
    /// logged and swallowed, returning `Ok(())` to the caller even
    /// though the receiver had died.)
    pub async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let grpc_addr = std::net::SocketAddr::from(([0, 0, 0, 0], self.config.grpc_port));
        let http_addr = std::net::SocketAddr::from(([0, 0, 0, 0], self.config.http_port));

        let grpc_svc =
            opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer::new(
                MetricsServiceImpl,
            );

        let app = Router::new()
            .route("/v1/metrics", post(handle_otlp_http))
            .with_state(());

        let grpc_listener = tokio::net::TcpListener::bind(grpc_addr).await?;
        let http_listener = tokio::net::TcpListener::bind(http_addr).await?;

        info!("OTLP gRPC listening on {}", grpc_addr);
        info!("OTLP HTTP listening on {} (POST /v1/metrics)", http_addr);

        tokio::select! {
            r = tonic::transport::Server::builder()
                .add_service(grpc_svc)
                .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(grpc_listener)) => {
                if let Err(e) = r {
                    error!("OTLP gRPC server error: {}", e);
                    // Propagate so the caller sees the receiver failed.
                    return Err(e.into());
                }
            }
            r = axum::serve(http_listener, app) => {
                if let Err(e) = r {
                    error!("OTLP HTTP server error: {}", e);
                    return Err(e.into());
                }
            }
        }

        Ok(())
    }
}

/// Stateless gRPC service type implementing the OTLP `MetricsService` trait.
struct MetricsServiceImpl;

#[tonic::async_trait]
impl MetricsService for MetricsServiceImpl {
    /// gRPC entry point for OTLP metrics export.
    ///
    /// Delegates parsing/logging to `process_otlp_request` and always
    /// acknowledges with a fully-successful (empty) response.
    async fn export(
        &self,
        request: Request<ExportMetricsServiceRequest>,
    ) -> Result<Response<ExportMetricsServiceResponse>, Status> {
        process_otlp_request(request.get_ref());
        let reply = ExportMetricsServiceResponse {
            partial_success: None,
        };
        Ok(Response::new(reply))
    }
}

/// Axum handler for OTLP/HTTP metrics export (POST /v1/metrics).
///
/// Decodes the raw request body as a protobuf
/// `ExportMetricsServiceRequest`, hands it to `process_otlp_request`,
/// and replies with a small JSON acknowledgement. A body that fails to
/// decode yields 400 with the decode error text.
///
/// NOTE(review): Content-Type and Content-Encoding are not inspected,
/// so JSON-encoded or gzip-compressed OTLP/HTTP payloads will fail the
/// protobuf decode — confirm whether clients need those variants.
async fn handle_otlp_http(
    State(_state): State<()>,
    body: Bytes,
) -> Result<Json<serde_json::Value>, (axum::http::StatusCode, String)> {
    match ExportMetricsServiceRequest::decode(body.as_ref()) {
        Ok(req) => {
            process_otlp_request(&req);
            Ok(Json(serde_json::json!({"rejected": 0})))
        }
        Err(e) => Err((
            axum::http::StatusCode::BAD_REQUEST,
            format!("Protobuf decode error: {}", e),
        )),
    }
}

/// A parsed metric data point: name, labels, timestamp (nanos), and numeric value.
#[derive(Debug)]
pub struct MetricPoint {
    /// Metric name (histogram/summary components carry `_sum`/`_count` suffixes).
    pub name: String,
    /// Merged resource, scope, and per-point attributes.
    pub labels: HashMap<String, String>,
    /// Sample timestamp in Unix nanoseconds (OTLP `time_unix_nano`).
    pub timestamp_nanos: u64,
    /// Sample value; integer OTLP values are cast to f64.
    pub value: f64,
}

/// Handle one decoded OTLP export request (shared by the gRPC and HTTP
/// entry points).
///
/// Logs resource/data-point counts at DEBUG, converts the request into
/// flat `MetricPoint`s, and logs the first point as a parse example.
/// The converted points are currently discarded; wiring them into the
/// precompute engine is still TODO.
fn process_otlp_request(request: &ExportMetricsServiceRequest) {
    debug!(
        "OTLP ingest: received {} resource metrics, {} total data points",
        request.resource_metrics.len(),
        otlp_to_record_count(request)
    );

    let points = otlp_to_metric_points(request);
    if let Some(first) = points.first() {
        debug!(
            "OTLP parse example: {} {:?} @{}ns = {}",
            first.name, first.labels, first.timestamp_nanos, first.value
        );
    }
    // TODO: Pass metrics to precompute engine.
}

/// Count the metric samples an export request represents, traversing
/// resource_metrics -> scope_metrics -> metrics.
///
/// Mirrors the conversion traversal used by asap-otel-ingest:
/// Gauge/Sum count one per data point; Histogram counts an optional
/// `_sum`, a `_count`, and one `_bucket` per le bound;
/// ExponentialHistogram counts an optional `_sum`, a `_count`, and a
/// `_scale`; Summary counts `_sum`, `_count`, and one sample per
/// quantile. Metrics with an empty name are skipped.
fn otlp_to_record_count(request: &ExportMetricsServiceRequest) -> usize {
    use opentelemetry_proto::tonic::metrics::v1::metric::Data;

    let mut total = 0usize;
    for rm in &request.resource_metrics {
        for scope in &rm.scope_metrics {
            for metric in scope.metrics.iter().filter(|m| !m.name.is_empty()) {
                total += match &metric.data {
                    Some(Data::Gauge(g)) => g.data_points.len(),
                    Some(Data::Sum(s)) => s.data_points.len(),
                    Some(Data::Histogram(hist)) => hist
                        .data_points
                        .iter()
                        // optional _sum + mandatory _count + one _bucket per le
                        .map(|dp| usize::from(dp.sum.is_some()) + 1 + dp.bucket_counts.len())
                        .sum(),
                    Some(Data::ExponentialHistogram(eh)) => eh
                        .data_points
                        .iter()
                        // optional _sum + _count + _scale
                        .map(|dp| usize::from(dp.sum.is_some()) + 2)
                        .sum(),
                    Some(Data::Summary(summary)) => summary
                        .data_points
                        .iter()
                        // _sum + _count + one sample per quantile
                        .map(|dp| 2 + dp.quantile_values.len())
                        .sum(),
                    None => 0,
                };
            }
        }
    }
    total
}

/// Parse an OTLP export request into flat metric data points
/// (name, labels, timestamp, value).
///
/// Labels merge resource attributes, scope attributes, and per-point
/// attributes; on key clashes, point attributes win over resource
/// attributes, which win over scope attributes. Metrics with an empty
/// name are skipped.
///
/// Expansion per metric type:
/// - Gauge / Sum: one point per data point, named after the metric.
/// - Histogram: `<name>_sum` (when present), `<name>_count`, and one
///   cumulative `<name>_bucket` point per bucket carrying an `le`
///   label (Prometheus convention; the overflow bucket is
///   `le="+Inf"`). This keeps the emitted points in line with the
///   totals reported by `otlp_to_record_count`.
/// - ExponentialHistogram: `<name>_sum` (when present) and
///   `<name>_count`; per-bucket expansion is still TODO.
/// - Summary: `<name>_sum`, `<name>_count`, and one `<name>` point per
///   pre-computed quantile carrying a `quantile` label.
fn otlp_to_metric_points(request: &ExportMetricsServiceRequest) -> Vec<MetricPoint> {
    use opentelemetry_proto::tonic::metrics::v1::metric::Data;

    let mut points = Vec::new();
    for resource_metrics in &request.resource_metrics {
        let resource_attrs = resource_metrics
            .resource
            .as_ref()
            .map(|r| attributes_to_map(&r.attributes))
            .unwrap_or_default();

        for scope_metrics in &resource_metrics.scope_metrics {
            let scope_attrs = scope_metrics
                .scope
                .as_ref()
                .map(|s| attributes_to_map(&s.attributes))
                .unwrap_or_default();

            for metric in &scope_metrics.metrics {
                if metric.name.is_empty() {
                    continue;
                }

                // Scope entries first, resource second: later entries
                // win in a HashMap collect, so resource attributes
                // override scope attributes on clash.
                let base_labels: HashMap<String, String> = scope_attrs
                    .iter()
                    .chain(resource_attrs.iter())
                    .map(|(k, v)| (k.clone(), v.clone()))
                    .collect();

                match &metric.data {
                    Some(Data::Gauge(g)) => {
                        for dp in &g.data_points {
                            let labels = merge_point_attributes(&base_labels, &dp.attributes);
                            points.push(MetricPoint {
                                name: metric.name.clone(),
                                labels,
                                timestamp_nanos: dp.time_unix_nano,
                                value: number_value_to_f64(&dp.value),
                            });
                        }
                    }
                    Some(Data::Sum(s)) => {
                        for dp in &s.data_points {
                            let labels = merge_point_attributes(&base_labels, &dp.attributes);
                            points.push(MetricPoint {
                                name: metric.name.clone(),
                                labels,
                                timestamp_nanos: dp.time_unix_nano,
                                value: number_value_to_f64(&dp.value),
                            });
                        }
                    }
                    Some(Data::Histogram(hist)) => {
                        for dp in &hist.data_points {
                            let labels = merge_point_attributes(&base_labels, &dp.attributes);
                            if let Some(sum) = dp.sum {
                                points.push(MetricPoint {
                                    name: format!("{}_sum", metric.name),
                                    labels: labels.clone(),
                                    timestamp_nanos: dp.time_unix_nano,
                                    value: sum,
                                });
                            }
                            points.push(MetricPoint {
                                name: format!("{}_count", metric.name),
                                labels: labels.clone(),
                                timestamp_nanos: dp.time_unix_nano,
                                value: dp.count as f64,
                            });
                            // OTLP bucket_counts are per-bucket while
                            // Prometheus `_bucket` series are cumulative,
                            // so keep a running total. explicit_bounds has
                            // one fewer entry than bucket_counts; the
                            // trailing overflow bucket gets le="+Inf".
                            let mut cumulative: u64 = 0;
                            for (i, bucket_count) in dp.bucket_counts.iter().enumerate() {
                                cumulative += bucket_count;
                                let le = match dp.explicit_bounds.get(i) {
                                    Some(bound) => bound.to_string(),
                                    None => "+Inf".to_string(),
                                };
                                let mut bucket_labels = labels.clone();
                                bucket_labels.insert("le".to_string(), le);
                                points.push(MetricPoint {
                                    name: format!("{}_bucket", metric.name),
                                    labels: bucket_labels,
                                    timestamp_nanos: dp.time_unix_nano,
                                    value: cumulative as f64,
                                });
                            }
                        }
                    }
                    Some(Data::ExponentialHistogram(eh)) => {
                        for dp in &eh.data_points {
                            let labels = merge_point_attributes(&base_labels, &dp.attributes);
                            if let Some(sum) = dp.sum {
                                points.push(MetricPoint {
                                    name: format!("{}_sum", metric.name),
                                    labels: labels.clone(),
                                    timestamp_nanos: dp.time_unix_nano,
                                    value: sum,
                                });
                            }
                            points.push(MetricPoint {
                                name: format!("{}_count", metric.name),
                                labels: labels.clone(),
                                timestamp_nanos: dp.time_unix_nano,
                                value: dp.count as f64,
                            });
                            // TODO: expand exponential buckets (and the
                            // `_scale` sample that otlp_to_record_count
                            // accounts for).
                        }
                    }
                    Some(Data::Summary(sm)) => {
                        for dp in &sm.data_points {
                            let labels = merge_point_attributes(&base_labels, &dp.attributes);
                            points.push(MetricPoint {
                                name: format!("{}_sum", metric.name),
                                labels: labels.clone(),
                                timestamp_nanos: dp.time_unix_nano,
                                value: dp.sum,
                            });
                            points.push(MetricPoint {
                                name: format!("{}_count", metric.name),
                                labels: labels.clone(),
                                timestamp_nanos: dp.time_unix_nano,
                                value: dp.count as f64,
                            });
                            // One sample per pre-computed quantile, with
                            // the quantile carried as a label
                            // (Prometheus summary convention).
                            for q in &dp.quantile_values {
                                let mut quantile_labels = labels.clone();
                                quantile_labels
                                    .insert("quantile".to_string(), q.quantile.to_string());
                                points.push(MetricPoint {
                                    name: metric.name.clone(),
                                    labels: quantile_labels,
                                    timestamp_nanos: dp.time_unix_nano,
                                    value: q.value,
                                });
                            }
                        }
                    }
                    None => {}
                }
            }
        }
    }
    points
}

/// Clone `base` and overlay the data-point-level attributes on top.
///
/// Keys from `attrs` win over keys already present in `base`.
fn merge_point_attributes(
    base: &HashMap<String, String>,
    attrs: &[opentelemetry_proto::tonic::common::v1::KeyValue],
) -> HashMap<String, String> {
    let mut merged = base.clone();
    merged.extend(attributes_to_map(attrs));
    merged
}

/// Convert an OTLP numeric data-point value to `f64`.
///
/// Integer values are cast (values above 2^53 may lose precision);
/// a missing value maps to 0.0.
fn number_value_to_f64(v: &Option<NumberValue>) -> f64 {
    v.as_ref().map_or(0.0, |n| match n {
        NumberValue::AsDouble(x) => *x,
        NumberValue::AsInt(x) => *x as f64,
    })
}

/// Render an OTLP `AnyValue` as a flat label string.
///
/// Scalars are stringified, byte arrays use their `Debug` form, and
/// any other variant (arrays, kvlists) or an absent value collapses
/// to the empty string.
fn any_value_to_string(v: &opentelemetry_proto::tonic::common::v1::AnyValue) -> String {
    use opentelemetry_proto::tonic::common::v1::any_value::Value as AnyValueVariant;
    let Some(inner) = &v.value else {
        return String::new();
    };
    match inner {
        AnyValueVariant::StringValue(s) => s.clone(),
        AnyValueVariant::IntValue(i) => i.to_string(),
        AnyValueVariant::DoubleValue(d) => d.to_string(),
        AnyValueVariant::BoolValue(b) => b.to_string(),
        AnyValueVariant::BytesValue(bytes) => format!("{:?}", bytes),
        _ => String::new(),
    }
}

/// Flatten OTLP key/value attributes into a `String -> String` map.
///
/// Attributes with an empty key are dropped; a missing value becomes
/// the empty string. Duplicate keys keep the last occurrence.
fn attributes_to_map(
    attrs: &[opentelemetry_proto::tonic::common::v1::KeyValue],
) -> HashMap<String, String> {
    attrs
        .iter()
        .filter(|kv| !kv.key.is_empty())
        .map(|kv| {
            let value = kv
                .value
                .as_ref()
                .map(any_value_to_string)
                .unwrap_or_default();
            (kv.key.clone(), value)
        })
        .collect()
}
2 changes: 1 addition & 1 deletion asap-query-engine/src/drivers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ pub mod ingest;
pub mod query;

// Re-export commonly used types for convenience
pub use ingest::{KafkaConsumer, KafkaConsumerConfig};
pub use ingest::{KafkaConsumer, KafkaConsumerConfig, OtlpReceiver, OtlpReceiverConfig};
pub use query::{AdapterConfig, HttpServer, HttpServerConfig};
5 changes: 4 additions & 1 deletion asap-query-engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ pub use stores::{SimpleMapStore, Store, StoreResult};

pub use engines::{InstantVector, QueryResult, SimpleEngine};

pub use drivers::{HttpServer, HttpServerConfig, KafkaConsumer, KafkaConsumerConfig};
pub use drivers::{
HttpServer, HttpServerConfig, KafkaConsumer, KafkaConsumerConfig, OtlpReceiver,
OtlpReceiverConfig,
};

pub use utils::{normalize_spatial_filter, read_inference_config, read_streaming_config};

Expand Down
Loading
Loading