Skip to content

Otel Ingest Driver#179

Merged
zzylol merged 5 commits intomainfrom
otel-ingestion
Mar 18, 2026
Merged

Otel Ingest Driver#179
zzylol merged 5 commits intomainfrom
otel-ingestion

Conversation

@GnaneshGnani
Copy link
Copy Markdown
Contributor

@GnaneshGnani GnaneshGnani commented Mar 11, 2026

OTLP Ingest Driver - Design Document

Overview

This document describes the design and implementation of the OTLP (OpenTelemetry Protocol) receiver in ASAP Query Engine. The receiver accepts metrics from OTLP clients (SDKs, OpenTelemetry Collector, test generators) via standard gRPC and HTTP protocols.

Why OtlpReceiver

Based on industry research, OtlpReceiver was chosen as the component name.

Rationale

Industry Standard: The term "OTLP Receiver" is consistently used across the OpenTelemetry ecosystem:

  1. OpenTelemetry Collector: Official component is called "OTLP Receiver"

  2. Prometheus: Uses "receiver" terminology

  3. VictoriaMetrics: While using "opentelemetry" in package names, their handlers accept/receive OTLP data

  4. Technical Documentation: Industry guides consistently use "OTLP receiver"

Alternatives Considered

  • OtlpConsumer: Misleading - implies pull-based (like Kafka), but OTLP is push-based
  • OtlpIngestServer: Matches our internal patterns but deviates from ecosystem conventions

Architecture

Component Overview

OTLP Clients                     ASAP Query Engine
┌─────────────────┐             ┌──────────────────────┐
│ OTLP SDK        │────gRPC─────│                      │
│ OTEL Collector  │   :4317     │   OtlpReceiver       │──┐
│ otlp_exporter   │────HTTP─────│   - Parse OTLP       │  │
└─────────────────┘   :4318     │   - Count metrics    │  │
                      POST       │   - Log at DEBUG     │  │
                      /v1/metrics│                      │  │
                                 └──────────────────────┘  │
                                                           │
                                 ┌──────────────────────┐  │
                                 │ TODO: Forward to     │◄─┘
                                 │ precompute engine    │
                                 └──────────────────────┘

Protocol Support

gRPC (Port 4317)

  • Implements MetricsService::export from opentelemetry_proto
  • Accepts ExportMetricsServiceRequest
  • Returns ExportMetricsServiceResponse

HTTP (Port 4318)

  • Route: POST /v1/metrics
  • Content: Protobuf-encoded ExportMetricsServiceRequest
  • Response: JSON {"rejected": 0}

Note: The /v1/metrics path is the OTLP specification default and is auto-appended by the OpenTelemetry Collector. Reference: VictoriaMetrics PR #5871

Implementation Details

File Structure

asap-query-engine/src/drivers/ingest/
├── kafka.rs                  # Existing Kafka consumer
├── otel.rs                   # New OTLP receiver (this design)
├── prometheus_remote_write.rs
├── victoriametrics_remote_write.rs
└── mod.rs                    # Exports OtlpReceiver

Core Types

pub struct OtlpReceiverConfig {
    pub grpc_port: u16,    // Default: 4317
    pub http_port: u16,    // Default: 4318
}

pub struct OtlpReceiver {
    config: OtlpReceiverConfig,
}

impl OtlpReceiver {
    pub fn new(config: OtlpReceiverConfig) -> Self;
    pub async fn run(&self) -> Result<...>;
}

Request Processing

Both gRPC and HTTP handlers call process_otlp_request:

fn process_otlp_request(request: &ExportMetricsServiceRequest) {
    let resource_count = request.resource_metrics.len();
    let total_points = otlp_to_record_count(request);
    debug!(
        "OTLP ingest: received {} resource metrics, {} total data points",
        resource_count, total_points
    );
    // TODO: Pass metrics to precompute engine.
}

Metric Counting

The otlp_to_record_count function traverses the OTLP hierarchy:

  • resource_metrics[] - Metrics grouped by resource (host, service)
  • scope_metrics[] - Metrics grouped by instrumentation scope
  • metrics[] - Individual metric definitions
  • data_points[] - Time series samples

Supported OTLP metric types:

  • Gauge - Instantaneous measurements
  • Sum - Cumulative or delta counters
  • Histogram - Distribution with explicit buckets (emits _sum, _count, _bucket)
  • ExponentialHistogram - Distribution with exponential buckets (emits _sum, _count, _scale)
  • Summary - Pre-computed quantiles (emits _sum, _count, quantile values)

Configuration

CLI Flags

--enable-otel-ingest       # Enable OTLP receiver
--otel-grpc-port 4317      # gRPC listen port (default: 4317)
--otel-http-port 4318      # HTTP listen port (default: 4318)

Example Usage

./target/release/query_engine_rust \
  --kafka-topic flink_output \
  --input-format json \
  --config inference_config.yaml \
  --streaming-config streaming_config.yaml \
  --streaming-engine flink \
  --output-dir /tmp/query_engine \
  --query-language promql \
  --lock-strategy per-key \
  --enable-otel-ingest \
  --otel-grpc-port 4317 \
  --otel-http-port 4318

With debug logging:

RUST_LOG=query_engine_rust=debug ./target/release/query_engine_rust ...

OpenTelemetry Collector Integration

Collector Configuration

To send metrics from an OpenTelemetry Collector to ASAP Query Engine:

exporters:
  # HTTP exporter (recommended)
  otlphttp:
    endpoint: "http://asap-query-engine:4318"
    compression: gzip
    timeout: 30s
    retry_on_failure:
      enabled: true
      initial_interval: 5s
      max_interval: 30s

  # Alternative: gRPC exporter
  otlp:
    endpoint: "asap-query-engine:4317"
    tls:
      insecure: true  # Dev only; use proper TLS in production

service:
  pipelines:
    metrics:
      receivers: [otlp]
      processors: [batch]
      exporters: [otlphttp]  # or [otlp] for gRPC

Key Points:

Testing

With otlp_exporter

# Build test generator
cargo build --release -p otlp_exporter

# Send synthetic metrics to OTLP receiver
./target/release/otlp_exporter \
  --endpoint http://localhost:4317 \
  --num-services 2 \
  --num-methods 2 \
  --interval-ms 1000 \
  --iterations 10

Expected Debug Output

2026-03-11T01:25:12.373943Z DEBUG query_engine_rust::drivers::ingest::otel: OTLP ingest: received 1 resource metrics, 187 total data points
2026-03-11T01:25:12.873032Z DEBUG query_engine_rust::drivers::ingest::otel: OTLP ingest: received 1 resource metrics, 261 total data points

Dependencies

Added to asap-query-engine/Cargo.toml:

opentelemetry-proto = { version = "0.28", features = ["gen-tonic", "gen-tonic-messages", "metrics"] }
tonic = "0.12"
tokio-stream = "0.1"

What We Implemented

The OTLP receiver accepts metrics via gRPC (port 4317) and HTTP (port 4318, POST /v1/metrics). It parses ExportMetricsServiceRequest, counts resource metrics and data points, and logs them at DEBUG level.

TODO: Forward received metrics to the precompute engine (handoff point is in code).

@GnaneshGnani GnaneshGnani requested a review from zzylol March 11, 2026 02:14
@GnaneshGnani GnaneshGnani linked an issue Mar 11, 2026 that may be closed by this pull request
@GnaneshGnani GnaneshGnani self-assigned this Mar 11, 2026
@zzylol
Copy link
Copy Markdown
Contributor

zzylol commented Mar 15, 2026

@milindsrivastava1997 This PR is also a separate ingest driver from the current Kafka Arroyo path. So once ready, should be no harm to merge into main.

@zzylol zzylol merged commit 8ddb32f into main Mar 18, 2026
14 checks passed
@zzylol zzylol deleted the otel-ingestion branch March 18, 2026 02:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add OpenTelemetry ingestion connector

3 participants