diff --git a/libdd-data-pipeline-ffi/src/trace_exporter.rs b/libdd-data-pipeline-ffi/src/trace_exporter.rs index 07c4801e7c..dc2c8dd621 100644 --- a/libdd-data-pipeline-ffi/src/trace_exporter.rs +++ b/libdd-data-pipeline-ffi/src/trace_exporter.rs @@ -68,6 +68,7 @@ pub struct TraceExporterConfig { process_tags: Option, test_session_token: Option, connection_timeout: Option, + otlp_endpoint: Option, } #[no_mangle] @@ -414,8 +415,37 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_set_connection_timeout( ) } +/// Enables OTLP HTTP/JSON export and sets the endpoint URL. +/// +/// When set, traces are sent to this URL in OTLP HTTP/JSON format instead of the Datadog +/// agent. The host language is responsible for resolving the endpoint from its configuration +/// (e.g. `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`) before calling this function. +#[no_mangle] +pub unsafe extern "C" fn ddog_trace_exporter_config_set_otlp_endpoint( + config: Option<&mut TraceExporterConfig>, + url: CharSlice, +) -> Option> { + catch_panic!( + if let Some(handle) = config { + handle.otlp_endpoint = match sanitize_string(url) { + Ok(s) => Some(s), + Err(e) => return Some(e), + }; + None + } else { + gen_error!(ErrorCode::InvalidArgument) + }, + gen_error!(ErrorCode::Panic) + ) +} + /// Create a new TraceExporter instance. /// +/// When `OTEL_TRACES_EXPORTER=otlp` is set in the environment, the exporter sends traces in +/// OTLP HTTP/JSON to the configured OTLP endpoint instead of the Datadog agent. The same +/// payload (e.g. MessagePack) is passed to `ddog_trace_exporter_send`; the library decodes +/// and converts to OTLP when OTLP is enabled. +/// /// # Arguments /// /// * `out_handle` - The handle to write the TraceExporter instance in. @@ -467,6 +497,10 @@ pub unsafe extern "C" fn ddog_trace_exporter_new( builder.enable_health_metrics(); } + if let Some(ref url) = config.otlp_endpoint { + builder.set_otlp_endpoint(url); + } + match builder.build() { Ok(exporter) => { out_handle.as_ptr().write(Box::new(exporter)); diff --git a/libdd-data-pipeline/src/lib.rs b/libdd-data-pipeline/src/lib.rs index 57572cd97a..d4b924d1c2 100644 --- a/libdd-data-pipeline/src/lib.rs +++ b/libdd-data-pipeline/src/lib.rs @@ -12,6 +12,7 @@ pub mod agent_info; mod health_metrics; +pub mod otlp; mod pausable_worker; #[allow(missing_docs)] pub mod stats_exporter; diff --git a/libdd-data-pipeline/src/otlp/config.rs b/libdd-data-pipeline/src/otlp/config.rs new file mode 100644 index 0000000000..eed20b34dc --- /dev/null +++ b/libdd-data-pipeline/src/otlp/config.rs @@ -0,0 +1,35 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! OTLP trace export configuration. + +use std::time::Duration; + +/// OTLP trace export protocol. HTTP/JSON is currently supported. +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +pub(crate) enum OtlpProtocol { + /// HTTP with JSON body (Content-Type: application/json). Default for HTTP. + #[default] + HttpJson, + /// HTTP with protobuf body. (Not supported yet) + HttpProtobuf, + /// gRPC. (Not supported yet) + Grpc, +} + +/// Default timeout for OTLP export requests. +pub const DEFAULT_OTLP_TIMEOUT: Duration = Duration::from_secs(10); + +/// Parsed OTLP trace exporter configuration. +#[derive(Clone, Debug)] +pub struct OtlpTraceConfig { + /// Full URL to POST traces to (e.g. `http://localhost:4318/v1/traces`). + pub endpoint_url: String, + /// Optional HTTP headers (key-value pairs). + pub headers: Vec<(String, String)>, + /// Request timeout. + pub timeout: Duration, + /// Protocol (for future use; currently only HttpJson is supported). + #[allow(dead_code)] + pub(crate) protocol: OtlpProtocol, +} diff --git a/libdd-data-pipeline/src/otlp/exporter.rs b/libdd-data-pipeline/src/otlp/exporter.rs new file mode 100644 index 0000000000..2996c2b411 --- /dev/null +++ b/libdd-data-pipeline/src/otlp/exporter.rs @@ -0,0 +1,78 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! OTLP HTTP/JSON trace exporter. + +use super::config::OtlpTraceConfig; +use crate::trace_exporter::error::{InternalErrorKind, RequestError, TraceExporterError}; +use libdd_common::{http_common, Endpoint, HttpClient}; +use libdd_trace_utils::send_with_retry::{ + send_with_retry, RetryBackoffType, RetryStrategy, SendWithRetryError, +}; +use std::collections::HashMap; + +/// Max total attempts for OTLP export (1 initial + up to 4 retries on transient failures). +const OTLP_MAX_ATTEMPTS: u32 = 5; +/// Initial backoff between retries (milliseconds). +const OTLP_RETRY_DELAY_MS: u64 = 100; + +/// Send OTLP trace payload (JSON bytes) to the configured endpoint with retries. +/// +/// Uses [`send_with_retry`] for consistent retry behaviour and observability across exporters. +/// +/// Note: dynamic OTLP headers from `OTEL_EXPORTER_OTLP_HEADERS` are not forwarded because +/// [`send_with_retry`] requires `&'static str` header keys. Support for arbitrary OTEL headers +/// would require the API to accept `HashMap`. +pub async fn send_otlp_traces_http( + client: &HttpClient, + config: &OtlpTraceConfig, + json_body: Vec, +) -> Result<(), TraceExporterError> { + let url = libdd_common::parse_uri(&config.endpoint_url).map_err(|e| { + TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(format!( + "Invalid OTLP endpoint URL: {}", + e + ))) + })?; + + let target = Endpoint { + url, + timeout_ms: config.timeout.as_millis() as u64, + ..Endpoint::default() + }; + + let headers: HashMap<&'static str, String> = + HashMap::from([("Content-Type", "application/json".to_string())]); + + let retry_strategy = RetryStrategy::new( + OTLP_MAX_ATTEMPTS, + OTLP_RETRY_DELAY_MS, + RetryBackoffType::Exponential, + None, + ); + + match send_with_retry(client, &target, json_body, &headers, &retry_strategy).await { + Ok(_) => Ok(()), + Err(e) => Err(map_send_error(e).await), + } +} + +async fn map_send_error(err: SendWithRetryError) -> TraceExporterError { + match err { + SendWithRetryError::Http(response, _) => { + let status = response.status(); + let body_bytes = http_common::collect_response_bytes(response) + .await + .unwrap_or_default(); + let body_str = String::from_utf8_lossy(&body_bytes); + TraceExporterError::Request(RequestError::new(status, &body_str)) + } + SendWithRetryError::Timeout(_) => { + TraceExporterError::Io(std::io::Error::from(std::io::ErrorKind::TimedOut)) + } + SendWithRetryError::Network(error, _) => TraceExporterError::from(error), + SendWithRetryError::Build(_) => TraceExporterError::Internal( + InternalErrorKind::InvalidWorkerState("Failed to build OTLP request".to_string()), + ), + } +} diff --git a/libdd-data-pipeline/src/otlp/mod.rs b/libdd-data-pipeline/src/otlp/mod.rs new file mode 100644 index 0000000000..542a754d40 --- /dev/null +++ b/libdd-data-pipeline/src/otlp/mod.rs @@ -0,0 +1,29 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! OTLP trace export for libdatadog. +//! +//! When an OTLP endpoint is configured via [`crate::trace_exporter::TraceExporterBuilder::set_otlp_endpoint`], +//! the trace exporter sends traces in OTLP HTTP/JSON format to that endpoint instead of the +//! Datadog agent. The host language is responsible for resolving the endpoint from its own +//! configuration (e.g. `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`). +//! +//! ## Sampling +//! +//! By default, the exporter does not apply its own sampling: it exports every trace it receives +//! from the tracer. The tracer (e.g. dd-trace-py) is responsible for inheriting the sampling +//! decision from the distributed trace context; when no decision is present, the tracer typically +//! uses 100% (always on). +//! +//! ## Partial flush +//! +//! For the POC, partial flush is disabled. The tracer should only invoke the exporter when all +//! spans from a local trace are closed (i.e. send complete trace chunks). This crate does not +//! buffer or flush partially—it exports whatever trace chunks it receives. + +pub mod config; +pub mod exporter; + +pub use config::OtlpTraceConfig; +pub use exporter::send_otlp_traces_http; +pub use libdd_trace_utils::otlp_encoder::{map_traces_to_otlp, OtlpResourceInfo}; diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index 4c96161bf8..d6783b70a7 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -2,6 +2,8 @@ // SPDX-License-Identifier: Apache-2.0 use crate::agent_info::AgentInfoFetcher; +use crate::otlp::config::{OtlpProtocol, DEFAULT_OTLP_TIMEOUT}; +use crate::otlp::OtlpTraceConfig; use crate::pausable_worker::PausableWorker; use crate::telemetry::TelemetryClientBuilder; use crate::trace_exporter::agent_response::AgentResponsePayloadVersion; @@ -51,6 +53,7 @@ pub struct TraceExporterBuilder { test_session_token: Option, agent_rates_payload_version_enabled: bool, connection_timeout: Option, + otlp_endpoint: Option, } impl TraceExporterBuilder { @@ -223,6 +226,18 @@ impl TraceExporterBuilder { self } + /// Enables OTLP HTTP/JSON export and sets the endpoint URL. + /// + /// When set, traces are sent to this endpoint in OTLP HTTP/JSON format instead of the + /// Datadog agent. The host language is responsible for resolving the endpoint from its + /// configuration (e.g. `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`) before calling this method. + /// + /// Example: `set_otlp_endpoint("http://localhost:4318/v1/traces")` + pub fn set_otlp_endpoint(&mut self, url: &str) -> &mut Self { + self.otlp_endpoint = Some(url.to_owned()); + self + } + #[allow(missing_docs)] pub fn build(self) -> Result { if !Self::is_inputs_outputs_formats_compatible(self.input_format, self.output_format) { @@ -345,6 +360,12 @@ impl TraceExporterBuilder { .agent_rates_payload_version_enabled .then(AgentResponsePayloadVersion::new), http_client: new_default_client(), + otlp_config: self.otlp_endpoint.map(|url| OtlpTraceConfig { + endpoint_url: url, + headers: vec![], + timeout: DEFAULT_OTLP_TIMEOUT, + protocol: OtlpProtocol::HttpJson, + }), }) } diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 4169a89323..39fc31f8fe 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -15,6 +15,7 @@ use self::metrics::MetricsEmitter; use self::stats::StatsComputationStatus; use self::trace_serializer::TraceSerializer; use crate::agent_info::{AgentInfoFetcher, ResponseObserver}; +use crate::otlp::{map_traces_to_otlp, send_otlp_traces_http, OtlpResourceInfo, OtlpTraceConfig}; use crate::pausable_worker::PausableWorker; use crate::stats_exporter::StatsExporter; use crate::telemetry::{SendPayloadTelemetry, TelemetryClient}; @@ -205,6 +206,8 @@ pub struct TraceExporter { workers: Arc>, agent_payload_response_version: Option, http_client: HttpClient, + /// When set, traces are exported via OTLP HTTP/JSON instead of the Datadog agent. + otlp_config: Option, } impl TraceExporter { @@ -494,13 +497,13 @@ impl TraceExporter { } } - /// Send a list of trace chunks to the agent + /// Send a list of trace chunks to the agent (or OTLP endpoint when configured). /// /// # Arguments /// * trace_chunks: A list of trace chunks. Each trace chunk is a list of spans. /// /// # Returns - /// * Ok(String): The response from the agent + /// * Ok(AgentResponse): The response from the agent (or Unchanged for OTLP) /// * Err(TraceExporterError): An error detailing what went wrong in the process pub fn send_trace_chunks( &self, @@ -511,13 +514,13 @@ impl TraceExporter { .block_on(async { self.send_trace_chunks_inner(trace_chunks).await }) } - /// Send a list of trace chunks to the agent, asynchronously + /// Send a list of trace chunks to the agent, asynchronously (or OTLP when configured). /// /// # Arguments /// * trace_chunks: A list of trace chunks. Each trace chunk is a list of spans. /// /// # Returns - /// * Ok(String): The response from the agent + /// * Ok(AgentResponse): The response from the agent (or Unchanged for OTLP) /// * Err(TraceExporterError): An error detailing what went wrong in the process pub async fn send_trace_chunks_async( &self, @@ -527,6 +530,29 @@ impl TraceExporter { self.send_trace_chunks_inner(trace_chunks).await } + /// Sends trace chunks via OTLP HTTP/JSON when OTLP config is enabled. + async fn send_otlp_traces_inner( + &self, + traces: Vec>>, + config: &OtlpTraceConfig, + ) -> Result { + let resource_info = OtlpResourceInfo { + service: self.metadata.service.clone(), + env: self.metadata.env.clone(), + app_version: self.metadata.app_version.clone(), + language: self.metadata.language.clone(), + tracer_version: self.metadata.tracer_version.clone(), + runtime_id: self.metadata.runtime_id.clone(), + }; + let request = map_traces_to_otlp(traces, &resource_info); + let json_body = serde_json::to_vec(&request).map_err(|e| { + error!("OTLP JSON serialization error: {e}"); + TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(e.to_string())) + })?; + send_otlp_traces_http(&self.http_client, config, json_body).await?; + Ok(AgentResponse::Unchanged) + } + /// Deserializes, processes and sends trace chunks to the agent fn send_deser( &self, @@ -595,7 +621,8 @@ impl TraceExporter { ) -> Result { let mut header_tags: TracerHeaderTags = self.metadata.borrow().into(); - // Process stats computation + // Process stats computation and drop non-sampled (p0) chunks. + // This must run before the OTLP path so that unsampled spans are not exported. let dropped_p0_stats = stats::process_traces_for_stats( &mut traces, &mut header_tags, @@ -603,6 +630,11 @@ impl TraceExporter { self.client_computed_top_level, ); + // OTLP path: send sampled traces via OTLP when OTEL_TRACES_EXPORTER=otlp. + if let Some(ref config) = self.otlp_config { + return self.send_otlp_traces_inner(traces, config).await; + } + let serializer = TraceSerializer::new( self.output_format, self.agent_payload_response_version.as_ref(), @@ -1866,6 +1898,55 @@ mod tests { assert_eq!(exporter.endpoint.timeout_ms, 42); } + #[test] + #[cfg_attr(miri, ignore)] + fn test_otlp_export_via_builder() { + let server = MockServer::start(); + let mock_otlp = server.mock(|when, then| { + when.method(POST) + .path("/v1/traces") + .header("Content-Type", "application/json"); + then.status(200).body(""); + }); + + let otlp_endpoint = format!("{}/v1/traces", server.url("/").trim_end_matches('/')); + let mut builder = TraceExporterBuilder::default(); + builder + .set_url("http://127.0.0.1:8126") + .set_service("svc") + .set_env("env") + .set_tracer_version("1.0") + .set_language("rust") + .set_language_version("1.0") + .set_language_interpreter("rustc") + .set_otlp_endpoint(&otlp_endpoint) + .set_input_format(TraceExporterInputFormat::V04) + .set_output_format(TraceExporterOutputFormat::V04); + let exporter = builder.build().unwrap(); + + let traces: Vec> = vec![vec![SpanBytes { + name: BytesString::from_slice(b"op").unwrap(), + service: BytesString::from_static("svc"), + resource: BytesString::from_static("res"), + trace_id: 1, + span_id: 2, + parent_id: 0, + start: 1000, + duration: 100, + error: 0, + ..Default::default() + }]]; + let data = msgpack_encoder::v04::to_vec(&traces); + let result = exporter.send(data.as_ref()); + + assert!( + result.is_ok(), + "OTLP send should succeed: {:?}", + result.err() + ); + mock_otlp.assert(); + } + #[test] #[cfg_attr(miri, ignore)] fn stop_and_start_runtime() { diff --git a/libdd-trace-utils/src/lib.rs b/libdd-trace-utils/src/lib.rs index 62b0b8a2aa..5218e30c1a 100644 --- a/libdd-trace-utils/src/lib.rs +++ b/libdd-trace-utils/src/lib.rs @@ -10,6 +10,7 @@ pub mod config_utils; pub mod msgpack_decoder; pub mod msgpack_encoder; +pub mod otlp_encoder; pub mod send_data; pub mod send_with_retry; pub mod stats_utils; diff --git a/libdd-trace-utils/src/otlp_encoder/json_types.rs b/libdd-trace-utils/src/otlp_encoder/json_types.rs new file mode 100644 index 0000000000..b681cc74fc --- /dev/null +++ b/libdd-trace-utils/src/otlp_encoder/json_types.rs @@ -0,0 +1,150 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Minimal serde types for OTLP HTTP/JSON export (ExportTraceServiceRequest). +//! +//! These types mirror the OTLP protobuf schema for the HTTP/JSON wire format. Field names use +//! lowerCamelCase per the Protocol Buffers JSON Mapping spec; trace/span IDs are hex-encoded +//! strings; enum values (SpanKind, StatusCode) are integers. +//! +//! The canonical definitions live in the opentelemetry-proto repository: +//! +//! +//! +//! Hand-rolled serde structs are intentional here: for HTTP/JSON export, duplicating the type +//! definitions is simpler than pulling in `prost`-generated types from the `opentelemetry-proto` +//! crate. When HTTP/protobuf export is added, `opentelemetry-proto` should be introduced as a +//! dependency for that purpose: +//! + +use serde::Serialize; + +/// Top-level OTLP trace export request (ExportTraceServiceRequest). +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ExportTraceServiceRequest { + pub resource_spans: Vec, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ResourceSpans { + pub resource: Option, + pub scope_spans: Vec, +} + +#[derive(Debug, Default, Serialize)] +pub struct Resource { + pub attributes: Vec, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ScopeSpans { + pub scope: Option, + pub spans: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub schema_url: Option, +} + +#[derive(Debug, Default, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct InstrumentationScope { + #[serde(skip_serializing_if = "Option::is_none")] + pub name: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub version: Option, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct OtlpSpan { + pub trace_id: String, + pub span_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub parent_span_id: Option, + pub name: String, + pub kind: i32, + pub start_time_unix_nano: String, + pub end_time_unix_nano: String, + #[serde(skip_serializing_if = "Vec::is_empty")] + pub attributes: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub status: Option, + #[serde(skip_serializing_if = "Vec::is_empty")] + pub links: Vec, + #[serde(skip_serializing_if = "Vec::is_empty")] + pub events: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub dropped_attributes_count: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub dropped_events_count: Option, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct OtlpSpanLink { + pub trace_id: String, + pub span_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub trace_state: Option, + #[serde(skip_serializing_if = "Vec::is_empty")] + pub attributes: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub dropped_attributes_count: Option, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct OtlpSpanEvent { + pub time_unix_nano: String, + pub name: String, + #[serde(skip_serializing_if = "Vec::is_empty")] + pub attributes: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub dropped_attributes_count: Option, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct KeyValue { + pub key: String, + pub value: AnyValue, +} + +/// A typed value in an OTLP attribute. Each variant serializes as a single-key JSON object +/// matching the OTLP HTTP/JSON wire format (e.g. `{"stringValue":"hello"}`). +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub enum AnyValue { + StringValue(String), + BoolValue(bool), + IntValue(i64), + DoubleValue(f64), + BytesValue(String), +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct Status { + #[serde(skip_serializing_if = "Option::is_none")] + pub message: Option, + pub code: i32, +} + +/// OTLP SpanKind enum values. +pub mod span_kind { + pub const UNSPECIFIED: i32 = 0; + pub const INTERNAL: i32 = 1; + pub const SERVER: i32 = 2; + pub const CLIENT: i32 = 3; + pub const PRODUCER: i32 = 4; + pub const CONSUMER: i32 = 5; +} + +/// OTLP StatusCode enum values. +pub mod status_code { + pub const UNSET: i32 = 0; + pub const OK: i32 = 1; + pub const ERROR: i32 = 2; +} diff --git a/libdd-trace-utils/src/otlp_encoder/mapper.rs b/libdd-trace-utils/src/otlp_encoder/mapper.rs new file mode 100644 index 0000000000..aecdcb27e2 --- /dev/null +++ b/libdd-trace-utils/src/otlp_encoder/mapper.rs @@ -0,0 +1,361 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Maps Datadog trace/spans to OTLP ExportTraceServiceRequest. + +use super::json_types::{ + self, AnyValue, ExportTraceServiceRequest, InstrumentationScope, KeyValue, OtlpSpan, + OtlpSpanEvent, OtlpSpanLink, Resource, ResourceSpans, ScopeSpans, Status, +}; +use super::OtlpResourceInfo; +use crate::span::v04::{Span, SpanEvent, SpanLink}; +use crate::span::TraceData; +use std::borrow::Borrow; + +/// Maximum number of attributes per span; excess are dropped and counted. +const MAX_ATTRIBUTES_PER_SPAN: usize = 128; + +/// Maps Datadog trace chunks and resource info to an OTLP ExportTraceServiceRequest. +/// +/// Resource: SDK-level attributes (service.name, deployment.environment, telemetry.sdk.*, +/// runtime-id). InstrumentationScope: "datadog" (DD SDKs don't have scope; all spans use this). +/// All analogous DD span fields are mapped; meta→attributes (string), metrics→attributes +/// (int/double), links and events mapped to OTLP links and events. Status from span.error and +/// meta["error.msg"]. +pub fn map_traces_to_otlp( + trace_chunks: Vec>>, + resource_info: &OtlpResourceInfo, +) -> ExportTraceServiceRequest { + let resource = build_resource(resource_info); + let mut all_spans: Vec = Vec::new(); + for chunk in &trace_chunks { + for span in chunk { + all_spans.push(map_span(span)); + } + } + let scope = InstrumentationScope { + name: Some("datadog".to_string()), + version: None, + }; + let scope_spans = ScopeSpans { + scope: Some(scope), + spans: all_spans, + schema_url: None, + }; + let resource_spans = ResourceSpans { + resource: Some(resource), + scope_spans: vec![scope_spans], + }; + ExportTraceServiceRequest { + resource_spans: vec![resource_spans], + } +} + +fn build_resource(resource_info: &OtlpResourceInfo) -> Resource { + let mut attributes: Vec = Vec::new(); + if !resource_info.service.is_empty() { + attributes.push(KeyValue { + key: "service.name".to_string(), + value: AnyValue::StringValue(resource_info.service.clone()), + }); + } + if !resource_info.env.is_empty() { + attributes.push(KeyValue { + key: "deployment.environment".to_string(), + value: AnyValue::StringValue(resource_info.env.clone()), + }); + attributes.push(KeyValue { + key: "deployment.environment.name".to_string(), + value: AnyValue::StringValue(resource_info.env.clone()), + }); + } + if !resource_info.app_version.is_empty() { + attributes.push(KeyValue { + key: "service.version".to_string(), + value: AnyValue::StringValue(resource_info.app_version.clone()), + }); + } + attributes.push(KeyValue { + key: "telemetry.sdk.name".to_string(), + value: AnyValue::StringValue("libdatadog".to_string()), + }); + if !resource_info.language.is_empty() { + attributes.push(KeyValue { + key: "telemetry.sdk.language".to_string(), + value: AnyValue::StringValue(resource_info.language.clone()), + }); + } + if !resource_info.tracer_version.is_empty() { + attributes.push(KeyValue { + key: "telemetry.sdk.version".to_string(), + value: AnyValue::StringValue(resource_info.tracer_version.clone()), + }); + } + if !resource_info.runtime_id.is_empty() { + attributes.push(KeyValue { + key: "runtime-id".to_string(), + value: AnyValue::StringValue(resource_info.runtime_id.clone()), + }); + } + Resource { attributes } +} + +fn map_span(span: &Span) -> OtlpSpan { + let trace_id_hex = format!("{:032x}", span.trace_id); + let span_id_hex = format!("{:016x}", span.span_id); + let parent_span_id = if span.parent_id != 0 { + Some(format!("{:016x}", span.parent_id)) + } else { + None + }; + let start_nano = span.start; + let end_nano = span.start + span.duration; + let start_time_unix_nano = start_nano.to_string(); + let end_time_unix_nano = end_nano.to_string(); + let kind = dd_type_to_otlp_kind(span.r#type.borrow()); + let (attributes, dropped_attributes_count) = map_attributes(span); + let error_msg = span.meta.get("error.msg").map(|v| v.borrow().to_string()); + let status = if span.error != 0 { + Some(Status { + message: error_msg, + code: json_types::status_code::ERROR, + }) + } else { + Some(Status { + message: None, + code: json_types::status_code::UNSET, + }) + }; + let links = span.span_links.iter().map(map_span_link).collect(); + let (events, dropped_events_count) = map_span_events(&span.span_events); + OtlpSpan { + trace_id: trace_id_hex, + span_id: span_id_hex, + parent_span_id, + name: span.name.borrow().to_string(), + kind, + start_time_unix_nano, + end_time_unix_nano, + attributes, + status, + links, + events, + dropped_attributes_count: if dropped_attributes_count > 0 { + Some(dropped_attributes_count as u32) + } else { + None + }, + dropped_events_count: if dropped_events_count > 0 { + Some(dropped_events_count as u32) + } else { + None + }, + } +} + +fn map_span_link(link: &SpanLink) -> OtlpSpanLink { + let trace_id_128 = (link.trace_id_high as u128) << 64 | (link.trace_id as u128); + let trace_id_hex = format!("{:032x}", trace_id_128); + let span_id_hex = format!("{:016x}", link.span_id); + let trace_state = if link.tracestate.borrow().is_empty() { + None + } else { + Some(link.tracestate.borrow().to_string()) + }; + let attributes: Vec = link + .attributes + .iter() + .map(|(k, v)| KeyValue { + key: k.borrow().to_string(), + value: AnyValue::StringValue(v.borrow().to_string()), + }) + .collect(); + OtlpSpanLink { + trace_id: trace_id_hex, + span_id: span_id_hex, + trace_state, + attributes, + dropped_attributes_count: None, + } +} + +fn map_span_events(events: &[SpanEvent]) -> (Vec, usize) { + const MAX_EVENTS_PER_SPAN: usize = 128; + let mut otlp_events = Vec::with_capacity(events.len().min(MAX_EVENTS_PER_SPAN)); + for ev in events.iter().take(MAX_EVENTS_PER_SPAN) { + let attributes: Vec = ev + .attributes + .iter() + .filter_map(|(k, v)| event_attr_to_key_value(k, v)) + .collect(); + otlp_events.push(OtlpSpanEvent { + time_unix_nano: ev.time_unix_nano.to_string(), + name: ev.name.borrow().to_string(), + attributes, + dropped_attributes_count: None, + }); + } + let dropped = events.len().saturating_sub(otlp_events.len()); + (otlp_events, dropped) +} + +fn event_attr_to_key_value( + k: &T::Text, + v: &crate::span::v04::AttributeAnyValue, +) -> Option { + use crate::span::v04::AttributeArrayValue; + let value = match v { + crate::span::v04::AttributeAnyValue::SingleValue(av) => match av { + AttributeArrayValue::String(s) => AnyValue::StringValue(s.borrow().to_string()), + AttributeArrayValue::Boolean(b) => AnyValue::BoolValue(*b), + AttributeArrayValue::Integer(i) => AnyValue::IntValue(*i), + AttributeArrayValue::Double(d) => AnyValue::DoubleValue(*d), + }, + crate::span::v04::AttributeAnyValue::Array(_) => return None, + }; + Some(KeyValue { + key: k.borrow().to_string(), + value, + }) +} + +fn dd_type_to_otlp_kind(t: &str) -> i32 { + match t.to_lowercase().as_str() { + "server" | "web" | "http" => json_types::span_kind::SERVER, + "client" => json_types::span_kind::CLIENT, + "producer" => json_types::span_kind::PRODUCER, + "consumer" => json_types::span_kind::CONSUMER, + _ => json_types::span_kind::INTERNAL, + } +} + +fn map_attributes(span: &Span) -> (Vec, usize) { + let mut attrs: Vec = Vec::new(); + for (k, v) in span.meta.iter() { + if attrs.len() >= MAX_ATTRIBUTES_PER_SPAN { + break; + } + attrs.push(KeyValue { + key: k.borrow().to_string(), + value: AnyValue::StringValue(v.borrow().to_string()), + }); + } + for (k, v) in span.metrics.iter() { + if attrs.len() >= MAX_ATTRIBUTES_PER_SPAN { + break; + } + let value = if v.fract() == 0.0 && (*v >= i64::MIN as f64 && *v <= i64::MAX as f64) { + AnyValue::IntValue(*v as i64) + } else { + AnyValue::DoubleValue(*v) + }; + attrs.push(KeyValue { + key: k.borrow().to_string(), + value, + }); + } + let total = span.meta.len() + span.metrics.len(); + let dropped = total.saturating_sub(attrs.len()); + (attrs, dropped) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::otlp_encoder::OtlpResourceInfo; + use crate::span::BytesData; + + #[test] + fn test_trace_id_span_id_format() { + let resource_info = OtlpResourceInfo::default(); + let span: Span = Span { + trace_id: 0x5B8EFFF798038103D269B633813FC60C_u128, + span_id: 0xEEE19B7EC3C1B174, + parent_id: 0xEEE19B7EC3C1B173, + name: libdd_tinybytes::BytesString::from_static("test"), + service: libdd_tinybytes::BytesString::from_static("svc"), + resource: libdd_tinybytes::BytesString::from_static("res"), + r#type: libdd_tinybytes::BytesString::from_static("web"), + start: 1544712660000000000, + duration: 1000000000, + error: 0, + ..Default::default() + }; + let req = map_traces_to_otlp(vec![vec![span]], &resource_info); + let rs = &req.resource_spans[0]; + let otlp_span = &rs.scope_spans[0].spans[0]; + assert_eq!(otlp_span.trace_id, "5b8efff798038103d269b633813fc60c"); + assert_eq!(otlp_span.span_id, "eee19b7ec3c1b174"); + assert_eq!( + otlp_span.parent_span_id.as_deref(), + Some("eee19b7ec3c1b173") + ); + assert_eq!(otlp_span.kind, json_types::span_kind::SERVER); + assert_eq!(otlp_span.start_time_unix_nano, "1544712660000000000"); + assert_eq!(otlp_span.end_time_unix_nano, "1544712661000000000"); + assert_eq!( + rs.scope_spans[0].scope.as_ref().unwrap().name.as_deref(), + Some("datadog") + ); + } + + #[test] + fn test_status_error_message_from_meta() { + let resource_info = OtlpResourceInfo::default(); + let mut span: Span = Span { + trace_id: 1, + span_id: 2, + name: libdd_tinybytes::BytesString::from_static("err_span"), + start: 0, + duration: 1, + error: 1, + ..Default::default() + }; + span.meta.insert( + libdd_tinybytes::BytesString::from_static("error.msg"), + libdd_tinybytes::BytesString::from_static("something broke"), + ); + let req = map_traces_to_otlp(vec![vec![span]], &resource_info); + let otlp_span = &req.resource_spans[0].scope_spans[0].spans[0]; + let status = otlp_span.status.as_ref().unwrap(); + assert_eq!(status.code, json_types::status_code::ERROR); + assert_eq!(status.message.as_deref(), Some("something broke")); + } + + #[test] + fn test_metrics_as_int_or_double() { + let resource_info = OtlpResourceInfo::default(); + let mut span: Span = Span { + trace_id: 1, + span_id: 2, + name: libdd_tinybytes::BytesString::from_static("m"), + start: 0, + duration: 1, + ..Default::default() + }; + span.metrics + .insert(libdd_tinybytes::BytesString::from_static("count"), 42.0); + span.metrics.insert( + libdd_tinybytes::BytesString::from_static("rate"), + std::f64::consts::PI, + ); + let req = map_traces_to_otlp(vec![vec![span]], &resource_info); + let json = serde_json::to_value(&req).unwrap(); + let attrs = &json["resourceSpans"][0]["scopeSpans"][0]["spans"][0]["attributes"]; + let count_kv = attrs + .as_array() + .unwrap() + .iter() + .find(|a| a["key"] == "count") + .unwrap(); + assert_eq!(count_kv["value"]["intValue"], 42); + let rate_kv = attrs + .as_array() + .unwrap() + .iter() + .find(|a| a["key"] == "rate") + .unwrap(); + let rate = rate_kv["value"]["doubleValue"].as_f64().unwrap(); + assert!((rate - std::f64::consts::PI).abs() < 1e-9); + } +} diff --git a/libdd-trace-utils/src/otlp_encoder/mod.rs b/libdd-trace-utils/src/otlp_encoder/mod.rs new file mode 100644 index 0000000000..ee2c7924c4 --- /dev/null +++ b/libdd-trace-utils/src/otlp_encoder/mod.rs @@ -0,0 +1,24 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! OTLP HTTP/JSON encoder: maps Datadog spans to ExportTraceServiceRequest. + +pub mod json_types; +pub mod mapper; + +pub use mapper::map_traces_to_otlp; + +/// Tracer-level attributes used to populate the OTLP Resource on export. +/// +/// These are the fields from the tracer's configuration that map to OTLP Resource attributes +/// (service.name, deployment.environment, service.version, telemetry.sdk.*, runtime-id). +/// Callers should build this from their own tracer metadata struct. +#[derive(Clone, Debug, Default)] +pub struct OtlpResourceInfo { + pub service: String, + pub env: String, + pub app_version: String, + pub language: String, + pub tracer_version: String, + pub runtime_id: String, +}