From 1e86c8d14bb4f86185d0466b09458b2197cb0675 Mon Sep 17 00:00:00 2001 From: Rachel Yang Date: Mon, 2 Mar 2026 16:41:59 -0500 Subject: [PATCH 01/10] WIP: OTLP trace export --- libdd-data-pipeline-ffi/src/trace_exporter.rs | 72 ++++ libdd-data-pipeline/src/lib.rs | 1 + libdd-data-pipeline/src/otlp/config.rs | 243 +++++++++++ libdd-data-pipeline/src/otlp/exporter.rs | 129 ++++++ libdd-data-pipeline/src/otlp/json_types.rs | 182 +++++++++ libdd-data-pipeline/src/otlp/mapper.rs | 379 ++++++++++++++++++ libdd-data-pipeline/src/otlp/mod.rs | 30 ++ .../src/trace_exporter/builder.rs | 19 + libdd-data-pipeline/src/trace_exporter/mod.rs | 109 ++++- 9 files changed, 1154 insertions(+), 10 deletions(-) create mode 100644 libdd-data-pipeline/src/otlp/config.rs create mode 100644 libdd-data-pipeline/src/otlp/exporter.rs create mode 100644 libdd-data-pipeline/src/otlp/json_types.rs create mode 100644 libdd-data-pipeline/src/otlp/mapper.rs create mode 100644 libdd-data-pipeline/src/otlp/mod.rs diff --git a/libdd-data-pipeline-ffi/src/trace_exporter.rs b/libdd-data-pipeline-ffi/src/trace_exporter.rs index 20a3f380f0..162e7762fd 100644 --- a/libdd-data-pipeline-ffi/src/trace_exporter.rs +++ b/libdd-data-pipeline-ffi/src/trace_exporter.rs @@ -84,6 +84,7 @@ pub struct TelemetryClientConfig<'a> { #[derive(Debug, Default)] pub struct TraceExporterConfig { url: Option, + tracer_name: Option, tracer_version: Option, language: Option, language_version: Option, @@ -92,6 +93,8 @@ pub struct TraceExporterConfig { env: Option, version: Option, service: Option, + git_commit_sha: Option, + git_repository_url: Option, input_format: TraceExporterInputFormat, output_format: TraceExporterOutputFormat, compute_stats: bool, @@ -140,6 +143,27 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_set_url( ) } +/// Sets the tracer/SDK name for OTLP resource attribute `telemetry.sdk.name` +/// (e.g. "dd-trace-py"). When unset, OTLP uses "libdatadog". 
+#[no_mangle] +pub unsafe extern "C" fn ddog_trace_exporter_config_set_tracer_name( + config: Option<&mut TraceExporterConfig>, + tracer_name: CharSlice, +) -> Option> { + catch_panic!( + if let Option::Some(handle) = config { + handle.tracer_name = match sanitize_string(tracer_name) { + Ok(s) => Some(s), + Err(e) => return Some(e), + }; + None + } else { + gen_error!(ErrorCode::InvalidArgument) + }, + gen_error!(ErrorCode::Panic) + ) +} + /// Sets tracer's version to be included in the headers request. #[no_mangle] pub unsafe extern "C" fn ddog_trace_exporter_config_set_tracer_version( @@ -279,6 +303,46 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_set_version( ) } +/// Sets git commit SHA (e.g. for OTLP resource attribute `git.commit.sha`). +#[no_mangle] +pub unsafe extern "C" fn ddog_trace_exporter_config_set_git_commit_sha( + config: Option<&mut TraceExporterConfig>, + git_commit_sha: CharSlice, +) -> Option> { + catch_panic!( + if let Option::Some(handle) = config { + handle.git_commit_sha = match sanitize_string(git_commit_sha) { + Ok(s) => Some(s), + Err(e) => return Some(e), + }; + None + } else { + gen_error!(ErrorCode::InvalidArgument) + }, + gen_error!(ErrorCode::Panic) + ) +} + +/// Sets git repository URL (e.g. for OTLP resource attribute `git.repository_url`). +#[no_mangle] +pub unsafe extern "C" fn ddog_trace_exporter_config_set_git_repository_url( + config: Option<&mut TraceExporterConfig>, + url: CharSlice, +) -> Option> { + catch_panic!( + if let Option::Some(handle) = config { + handle.git_repository_url = match sanitize_string(url) { + Ok(s) => Some(s), + Err(e) => return Some(e), + }; + None + } else { + gen_error!(ErrorCode::InvalidArgument) + }, + gen_error!(ErrorCode::Panic) + ) +} + /// Sets service name to be included in the headers request. 
#[no_mangle] pub unsafe extern "C" fn ddog_trace_exporter_config_set_service( @@ -428,6 +492,11 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_set_connection_timeout( /// Create a new TraceExporter instance. /// +/// When `OTEL_TRACES_EXPORTER=otlp` is set in the environment, the exporter sends traces in +/// OTLP HTTP/JSON to the configured OTLP endpoint instead of the Datadog agent. The same +/// payload (e.g. MessagePack) is passed to `ddog_trace_exporter_send`; the library decodes +/// and converts to OTLP when OTLP is enabled. +/// /// # Arguments /// /// * `out_handle` - The handle to write the TraceExporter instance in. @@ -443,6 +512,7 @@ pub unsafe extern "C" fn ddog_trace_exporter_new( let mut builder = TraceExporter::builder(); builder .set_url(config.url.as_ref().unwrap_or(&"".to_string())) + .set_tracer_name(config.tracer_name.as_ref().unwrap_or(&"".to_string())) .set_tracer_version(config.tracer_version.as_ref().unwrap_or(&"".to_string())) .set_language(config.language.as_ref().unwrap_or(&"".to_string())) .set_language_version(config.language_version.as_ref().unwrap_or(&"".to_string())) @@ -456,6 +526,8 @@ pub unsafe extern "C" fn ddog_trace_exporter_new( .set_env(config.env.as_ref().unwrap_or(&"".to_string())) .set_app_version(config.version.as_ref().unwrap_or(&"".to_string())) .set_service(config.service.as_ref().unwrap_or(&"".to_string())) + .set_git_commit_sha(config.git_commit_sha.as_ref().unwrap_or(&"".to_string())) + .set_git_repository_url(config.git_repository_url.as_ref().unwrap_or(&"".to_string())) .set_input_format(config.input_format) .set_output_format(config.output_format) .set_connection_timeout(config.connection_timeout); diff --git a/libdd-data-pipeline/src/lib.rs b/libdd-data-pipeline/src/lib.rs index 57572cd97a..d4b924d1c2 100644 --- a/libdd-data-pipeline/src/lib.rs +++ b/libdd-data-pipeline/src/lib.rs @@ -12,6 +12,7 @@ pub mod agent_info; mod health_metrics; +pub mod otlp; mod pausable_worker; #[allow(missing_docs)] pub 
mod stats_exporter; diff --git a/libdd-data-pipeline/src/otlp/config.rs b/libdd-data-pipeline/src/otlp/config.rs new file mode 100644 index 0000000000..87c44e93f1 --- /dev/null +++ b/libdd-data-pipeline/src/otlp/config.rs @@ -0,0 +1,243 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! OTLP trace export configuration. +//! +//! OTLP trace export is enabled when `OTEL_TRACES_EXPORTER=otlp` is set. +//! When enabled, endpoint, headers, timeout, and protocol are read from the +//! `OTEL_EXPORTER_OTLP_TRACES_*` (and generic `OTEL_EXPORTER_OTLP_*`) environment variables. + +use std::env; +use std::time::Duration; + +/// OTLP trace export protocol. Support for HTTP/JSON for now. +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +pub enum OtlpProtocol { + /// HTTP with JSON body (Content-Type: application/json). Default for HTTP. + #[default] + HttpJson, + /// HTTP with protobuf body. (Not supported yet) + HttpProtobuf, + /// gRPC. (Not supported yet) + Grpc, +} + +impl OtlpProtocol { + fn from_str(s: &str) -> Self { + match s.trim().to_lowercase().as_str() { + "http/json" => OtlpProtocol::HttpJson, + "http/protobuf" => OtlpProtocol::HttpProtobuf, + "grpc" => OtlpProtocol::Grpc, + _ => OtlpProtocol::HttpJson, + } + } +} + +/// Default OTLP HTTP endpoint (no path; path /v1/traces is appended when building request URL). +pub const DEFAULT_OTLP_HTTP_ENDPOINT: &str = "http://localhost:4318"; +/// Default OTLP gRPC endpoint. +pub const DEFAULT_OTLP_GRPC_ENDPOINT: &str = "http://localhost:4317"; +/// OTLP traces path for HTTP. +pub const OTLP_TRACES_PATH: &str = "/v1/traces"; + +/// Parsed OTLP trace exporter configuration. +#[derive(Clone, Debug)] +pub struct OtlpTraceConfig { + /// Full URL to POST traces (e.g. http://localhost:4318/v1/traces). + pub endpoint_url: String, + /// Optional HTTP headers (key-value pairs). + pub headers: Vec<(String, String)>, + /// Request timeout. 
+    pub timeout: Duration,
+    /// Protocol (for future use; currently only HttpJson is supported).
+    pub protocol: OtlpProtocol,
+}
+
+/// Environment variable names (standard OTEL and traces-specific).
+pub mod env_keys {
+    /// Selects the trace exporter; OTLP export is enabled when the value is `otlp`.
+    pub const TRACES_EXPORTER: &str = "OTEL_TRACES_EXPORTER";
+    /// Traces-specific OTLP protocol (`http/json`, `http/protobuf`, `grpc`).
+    pub const TRACES_PROTOCOL: &str = "OTEL_EXPORTER_OTLP_TRACES_PROTOCOL";
+    /// Generic OTLP protocol, consulted when the traces-specific variable is unset.
+    pub const PROTOCOL: &str = "OTEL_EXPORTER_OTLP_PROTOCOL";
+    /// Traces-specific OTLP endpoint.
+    pub const TRACES_ENDPOINT: &str = "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT";
+    /// Generic OTLP endpoint, consulted when the traces-specific variable is unset.
+    pub const ENDPOINT: &str = "OTEL_EXPORTER_OTLP_ENDPOINT";
+    /// Traces-specific request headers (`key1=value1,key2=value2`).
+    pub const TRACES_HEADERS: &str = "OTEL_EXPORTER_OTLP_TRACES_HEADERS";
+    /// Generic request headers, consulted when the traces-specific variable is unset.
+    pub const HEADERS: &str = "OTEL_EXPORTER_OTLP_HEADERS";
+    /// Traces-specific request timeout.
+    pub const TRACES_TIMEOUT: &str = "OTEL_EXPORTER_OTLP_TRACES_TIMEOUT";
+    /// Generic request timeout, consulted when the traces-specific variable is unset.
+    pub const TIMEOUT: &str = "OTEL_EXPORTER_OTLP_TIMEOUT";
+}
+
+/// Default timeout for OTLP export (10 seconds).
+const DEFAULT_OTLP_TIMEOUT_MS: u64 = 10_000;
+
+/// Read `key` from the environment.
+///
+/// Returns `None` when the variable is unset, is not valid Unicode, or contains only
+/// whitespace. The returned value is NOT trimmed; callers trim as needed.
+fn get_env(key: &str) -> Option<String> {
+    env::var(key).ok().filter(|s| !s.trim().is_empty())
+}
+
+/// Parse OTEL headers string "key1=value1,key2=value2" into a list of (key, value).
+///
+/// Each comma-separated entry is split on its first `=`; key and value are trimmed.
+/// Entries without `=` or with an empty key are skipped. Values may be empty.
+fn parse_headers(s: &str) -> Vec<(String, String)> {
+    s.split(',')
+        .filter_map(|pair| {
+            let (key, value) = pair.split_once('=')?;
+            let key = key.trim();
+            if key.is_empty() {
+                return None;
+            }
+            Some((key.to_string(), value.trim().to_string()))
+        })
+        .collect()
+}
+
+/// Build the trace export URL from a base endpoint.
+///
+/// Surrounding whitespace and trailing `/` are removed from `base`. For the HTTP protocols
+/// (`HttpJson`, `HttpProtobuf`) the path `/v1/traces` is appended; for `Grpc` the cleaned
+/// base is returned without a path.
+fn fallback_traces_url(base: &str, protocol: OtlpProtocol) -> String {
+    let base = base.trim().trim_end_matches('/');
+    match protocol {
+        OtlpProtocol::HttpJson | OtlpProtocol::HttpProtobuf => {
+            format!("{}{}", base, OTLP_TRACES_PATH)
+        }
+        OtlpProtocol::Grpc => base.to_string(),
+    }
+}
+
+/// Resolve OTLP trace export configuration from environment.
+///
+/// **Enablement:** Returns `Some(config)` only when `OTEL_TRACES_EXPORTER=otlp` is set
+/// (compared case-insensitively). Returns `None` otherwise (use Datadog agent).
+///
+/// **Endpoint:**
+/// - `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`, when set, is used **as-is** (no path appended).
+/// - Otherwise the generic `OTEL_EXPORTER_OTLP_ENDPOINT`, when set, is treated as a base
+///   URL: for http/json or http/protobuf the path `/v1/traces` is appended.
+/// - Otherwise the default `http://localhost:4318` base is used, with the same path rule.
+///
+/// A value without a scheme is prefixed with `http://` (`:port` becomes
+/// `http://localhost:port`).
+///
+/// **Precedence:** Traces-specific env vars override generic OTEL vars for protocol,
+/// endpoint, headers, and timeout.
+pub fn otlp_trace_config_from_env() -> Option<OtlpTraceConfig> {
+    let exporter = get_env(env_keys::TRACES_EXPORTER)?;
+    if !exporter.trim().eq_ignore_ascii_case("otlp") {
+        return None;
+    }
+
+    let protocol = get_env(env_keys::TRACES_PROTOCOL)
+        .or_else(|| get_env(env_keys::PROTOCOL))
+        .as_deref()
+        .map(OtlpProtocol::from_str)
+        .unwrap_or_default();
+
+    // Bare `host:port` / `:port` values are accepted for convenience and given a scheme;
+    // anything that already carries a scheme is left untouched.
+    let with_scheme = |raw: &str| -> String {
+        let raw = raw.trim();
+        if raw.contains("://") {
+            raw.to_string()
+        } else if raw.starts_with(':') {
+            format!("http://localhost{}", raw)
+        } else {
+            format!("http://{}", raw)
+        }
+    };
+
+    // Traces-specific endpoint takes precedence over generic OTEL endpoint when both are set.
+    // Only the generic endpoint and the built-in default are base URLs that need the
+    // signal path; posting to a bare collector base would otherwise hit the wrong route.
+    let url = if let Some(endpoint) = get_env(env_keys::TRACES_ENDPOINT) {
+        with_scheme(&endpoint)
+    } else if let Some(base) = get_env(env_keys::ENDPOINT) {
+        fallback_traces_url(&with_scheme(&base), protocol)
+    } else {
+        fallback_traces_url(DEFAULT_OTLP_HTTP_ENDPOINT, protocol)
+    };
+
+    let headers = get_env(env_keys::TRACES_HEADERS)
+        .or_else(|| get_env(env_keys::HEADERS))
+        .as_deref()
+        .map(parse_headers)
+        .unwrap_or_default();
+
+    // An unparsable timeout falls back to the default rather than failing configuration.
+    let timeout_ms = get_env(env_keys::TRACES_TIMEOUT)
+        .or_else(|| get_env(env_keys::TIMEOUT))
+        .and_then(|s| parse_timeout(&s))
+        .unwrap_or(DEFAULT_OTLP_TIMEOUT_MS);
+
+    Some(OtlpTraceConfig {
+        endpoint_url: url,
+        headers,
+        timeout: Duration::from_millis(timeout_ms),
+        protocol,
+    })
+}
+
+/// Parse timeout string: digits with optional unit (ms, s, m). Default unit: milliseconds.
+fn parse_timeout(s: &str) -> Option<u64> {
+    let s = s.trim().to_ascii_lowercase();
+    // "ms" must be tested before "s": every "…ms" value also ends in 's'.
+    let (digits, ms_per_unit) = if let Some(d) = s.strip_suffix("ms") {
+        (d, 1)
+    } else if let Some(d) = s.strip_suffix('s') {
+        (d, 1_000)
+    } else if let Some(d) = s.strip_suffix('m') {
+        (d, 60_000)
+    } else {
+        (s.as_str(), 1)
+    };
+    // A value that does not fit in u64 milliseconds is rejected (caller falls back to the
+    // default) instead of overflowing.
+    digits.trim().parse::<u64>().ok()?.checked_mul(ms_per_unit)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_parse_headers() {
+        let h = parse_headers("key1=val1,key2=val2");
+        assert_eq!(h.len(), 2);
+        assert_eq!(h[0], ("key1".to_string(), "val1".to_string()));
+        assert_eq!(h[1], ("key2".to_string(), "val2".to_string()));
+    }
+
+    #[test]
+    fn test_parse_timeout() {
+        assert_eq!(parse_timeout("5000"), Some(5000));
+        assert_eq!(parse_timeout("5s"), Some(5000));
+        assert_eq!(parse_timeout("100ms"), Some(100));
+        assert_eq!(parse_timeout("2m"), Some(120_000));
+        assert_eq!(parse_timeout("abc"), None);
+        // Scaling past u64::MAX is rejected instead of overflowing.
+        assert_eq!(parse_timeout("18446744073709551615s"), None);
+    }
+
+    #[test]
+    fn test_fallback_traces_url() {
+        // Fallback: path /v1/traces is appended for http/json
+        assert_eq!(
+            fallback_traces_url("http://localhost:4318", OtlpProtocol::HttpJson),
+            "http://localhost:4318/v1/traces"
+        );
+        assert_eq!(
+            fallback_traces_url(DEFAULT_OTLP_HTTP_ENDPOINT, OtlpProtocol::HttpJson),
+            "http://localhost:4318/v1/traces"
+        );
+    }
+
+    #[test]
+    fn test_protocol_from_str() {
+        assert_eq!(OtlpProtocol::from_str("http/json"), OtlpProtocol::HttpJson);
+        assert_eq!(OtlpProtocol::from_str("grpc"), OtlpProtocol::Grpc);
+    }
+
+    /// Environment variables are process-global and tests run in parallel, so both
+    /// env-driven scenarios live in one test to keep them from racing each other.
+    #[test]
+    fn test_otlp_config_from_env() {
+        // Without OTEL_TRACES_EXPORTER=otlp, config should be None
+        std::env::remove_var(env_keys::TRACES_EXPORTER);
+        std::env::remove_var(env_keys::TRACES_ENDPOINT);
+        assert!(otlp_trace_config_from_env().is_none());
+
+        // Per spec: when OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is set, use as-is (no /v1/traces appended)
+        std::env::set_var(env_keys::TRACES_EXPORTER, "otlp");
+        std::env::set_var(env_keys::TRACES_ENDPOINT, "http://custom:9999");
+        let config = otlp_trace_config_from_env();
+        std::env::remove_var(env_keys::TRACES_EXPORTER);
+        std::env::remove_var(env_keys::TRACES_ENDPOINT);
+        let config = config.expect("config when TRACES_EXPORTER=otlp and endpoint set");
+        assert_eq!(config.endpoint_url, "http://custom:9999");
+    }
+}
diff --git a/libdd-data-pipeline/src/otlp/exporter.rs b/libdd-data-pipeline/src/otlp/exporter.rs
new file mode 100644
index 0000000000..4d81aa77ac
--- /dev/null
+++ b/libdd-data-pipeline/src/otlp/exporter.rs
@@ -0,0 +1,129 @@
+// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
+// SPDX-License-Identifier: Apache-2.0
+
+//! OTLP HTTP/JSON trace exporter. Sends ExportTraceServiceRequest with retries on 429, 502, 503, 504.
+
+use super::config::OtlpTraceConfig;
+use crate::trace_exporter::error::TraceExporterError;
+use http::Method;
+use libdd_common::http_common::{self, Body};
+use libdd_common::HttpClient;
+use std::time::Duration;
+use tokio::time::sleep;
+use tracing::{debug, error, warn};
+
+/// Maximum number of send attempts for one OTLP export (the first try plus retries).
+const OTLP_MAX_RETRIES: u32 = 5;
+/// Initial backoff between retries (milliseconds).
+const OTLP_RETRY_DELAY_MS: u64 = 100;
+
+/// Status codes that trigger a retry (transient).
+fn is_retryable_status(code: u16) -> bool {
+    matches!(code, 429 | 502 | 503 | 504)
+}
+
+/// Send OTLP trace payload (JSON bytes) to the configured endpoint.
+///
+/// Retries with exponential backoff on 429, 502, 503, 504 responses, on transport errors,
+/// and on per-attempt timeouts. Any other non-success status (e.g. 400) is returned as an
+/// error without retrying. Each attempt uses the timeout from config.
+pub async fn send_otlp_traces_http(
+    client: &HttpClient,
+    config: &OtlpTraceConfig,
+    json_body: Vec<u8>,
+) -> Result<(), TraceExporterError> {
+    let uri = libdd_common::parse_uri(&config.endpoint_url).map_err(|e| {
+        TraceExporterError::Internal(crate::trace_exporter::error::InternalErrorKind::InvalidWorkerState(
+            format!("Invalid OTLP endpoint URL: {}", e),
+        ))
+    })?;
+
+    // Convert the payload once: `Bytes` is reference-counted, so handing a clone to each
+    // attempt is cheap, whereas cloning the `Vec` would copy the whole payload per retry.
+    let payload = bytes::Bytes::from(json_body);
+    let timeout = config.timeout;
+
+    let mut attempt = 0u32;
+    loop {
+        attempt += 1;
+        let may_retry = attempt < OTLP_MAX_RETRIES;
+
+        debug!(
+            attempt,
+            url = %config.endpoint_url,
+            "OTLP trace export attempt"
+        );
+
+        // A request is consumed by sending it, so a fresh one is built for every attempt.
+        let req = build_request(&uri, config)?
+            .body(Body::from_bytes(payload.clone()))
+            .map_err(|e| {
+                TraceExporterError::Internal(
+                    crate::trace_exporter::error::InternalErrorKind::InvalidWorkerState(e.to_string()),
+                )
+            })?;
+
+        match tokio::time::timeout(timeout, client.request(req)).await {
+            Ok(Ok(response)) => {
+                let status = response.status();
+                if status.is_success() {
+                    debug!(status = %status, "OTLP trace export succeeded");
+                    return Ok(());
+                }
+                if is_retryable_status(status.as_u16()) && may_retry {
+                    let delay_ms = retry_delay_ms(attempt);
+                    warn!(
+                        status = %status,
+                        attempt,
+                        delay_ms,
+                        "OTLP export transient failure, retrying"
+                    );
+                    sleep(Duration::from_millis(delay_ms)).await;
+                    continue;
+                }
+                // Terminal failure: surface the response body (best effort) with the status.
+                let response = http_common::into_response(response);
+                let body_bytes = http_common::collect_response_bytes(response).await.unwrap_or_default();
+                let body_str = String::from_utf8_lossy(&body_bytes);
+                error!(
+                    status = %status,
+                    attempt,
+                    body = %body_str,
+                    "OTLP trace export failed"
+                );
+                return Err(TraceExporterError::Request(
+                    crate::trace_exporter::error::RequestError::new(status, &body_str),
+                ));
+            }
+            Ok(Err(e)) => {
+                if may_retry {
+                    let delay_ms = retry_delay_ms(attempt);
+                    warn!(error = ?e, attempt, "OTLP export network error, retrying");
+                    sleep(Duration::from_millis(delay_ms)).await;
+                    continue;
+                }
+                error!(error = ?e, attempt, "OTLP trace export failed after retries");
+                return Err(TraceExporterError::from(http_common::into_error(e)));
+            }
+            // The per-attempt timeout elapsed before the client produced a response.
+            Err(_) => {
+                if may_retry {
+                    let delay_ms = retry_delay_ms(attempt);
+                    warn!(attempt, "OTLP export timeout, retrying");
+                    sleep(Duration::from_millis(delay_ms)).await;
+                    continue;
+                }
+                error!(attempt, "OTLP trace export timed out after retries");
+                return Err(TraceExporterError::from(std::io::Error::from(
+                    std::io::ErrorKind::TimedOut,
+                )));
+            }
+        }
+    }
+}
+
+/// Backoff, in milliseconds, before the retry that follows the 1-based `attempt`.
+///
+/// Starts at `OTLP_RETRY_DELAY_MS` and doubles for every attempt already made
+/// (100, 200, 400, ...), saturating instead of overflowing.
+fn retry_delay_ms(attempt: u32) -> u64 {
+    let factor = 1u64
+        .checked_shl(attempt.saturating_sub(1))
+        .unwrap_or(u64::MAX);
+    OTLP_RETRY_DELAY_MS.saturating_mul(factor)
+}
+
+/// Start a `POST` request to `uri` with `Content-Type: application/json` plus every
+/// configured header. The body is attached by the caller.
+///
+/// Currently always returns `Ok`; an invalid header name or value is recorded by the
+/// builder and reported when the body is attached.
+fn build_request(
+    uri: &http::Uri,
+    config: &OtlpTraceConfig,
+) -> Result<http::request::Builder, TraceExporterError> {
+    let builder = config.headers.iter().fold(
+        http::Request::builder()
+            .method(Method::POST)
+            .uri(uri.clone())
+            .header("Content-Type", "application/json"),
+        |builder, (k, v)| builder.header(k.as_str(), v.as_str()),
+    );
+    Ok(builder)
+}
diff --git a/libdd-data-pipeline/src/otlp/json_types.rs b/libdd-data-pipeline/src/otlp/json_types.rs
new file mode 100644
index 0000000000..cd69b5a517
--- /dev/null
+++ b/libdd-data-pipeline/src/otlp/json_types.rs
@@ -0,0 +1,182 @@
+// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
+// SPDX-License-Identifier: Apache-2.0
+
+//! OTLP JSON encoding types for ExportTraceServiceRequest.
+//! Field names use lowerCamelCase per the OTLP HTTP/JSON spec (Protocol Buffers JSON Mapping).
+//! Trace/span IDs are hex-encoded strings; enum values are integers.
+
+use serde::Serialize;
+
+/// Top-level OTLP trace export request (ExportTraceServiceRequest).
+#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ExportTraceServiceRequest { + pub resource_spans: Vec, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ResourceSpans { + pub resource: Option, + pub scope_spans: Vec, +} + +#[derive(Debug, Default, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct Resource { + pub attributes: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub dropped_attributes_count: Option, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ScopeSpans { + pub scope: Option, + pub spans: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub schema_url: Option, +} + +#[derive(Debug, Default, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct InstrumentationScope { + #[serde(skip_serializing_if = "Option::is_none")] + pub name: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub version: Option, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct OtlpSpan { + pub trace_id: String, + pub span_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub parent_span_id: Option, + pub name: String, + pub kind: i32, + pub start_time_unix_nano: String, + pub end_time_unix_nano: String, + #[serde(skip_serializing_if = "Vec::is_empty")] + pub attributes: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub status: Option, + #[serde(skip_serializing_if = "Vec::is_empty")] + pub links: Vec, + #[serde(skip_serializing_if = "Vec::is_empty")] + pub events: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub dropped_attributes_count: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub dropped_events_count: Option, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct OtlpSpanLink { + pub trace_id: String, + pub span_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub trace_state: Option, + 
#[serde(skip_serializing_if = "Vec::is_empty")] + pub attributes: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub dropped_attributes_count: Option, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct OtlpSpanEvent { + pub time_unix_nano: String, + pub name: String, + #[serde(skip_serializing_if = "Vec::is_empty")] + pub attributes: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub dropped_attributes_count: Option, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct KeyValue { + pub key: String, + pub value: AnyValue, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AnyValue { + #[serde(skip_serializing_if = "Option::is_none")] + pub string_value: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub bool_value: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub int_value: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub double_value: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub bytes_value: Option, +} + +impl AnyValue { + pub fn string(s: String) -> Self { + AnyValue { + string_value: Some(s), + bool_value: None, + int_value: None, + double_value: None, + bytes_value: None, + } + } + pub fn int(i: i64) -> Self { + AnyValue { + string_value: None, + bool_value: None, + int_value: Some(i), + double_value: None, + bytes_value: None, + } + } + pub fn double(d: f64) -> Self { + AnyValue { + string_value: None, + bool_value: None, + int_value: None, + double_value: Some(d), + bytes_value: None, + } + } + pub fn bool(b: bool) -> Self { + AnyValue { + string_value: None, + bool_value: Some(b), + int_value: None, + double_value: None, + bytes_value: None, + } + } +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct Status { + pub message: Option, + pub code: i32, +} + +/// OTLP SpanKind enum values. 
+pub mod span_kind { + pub const UNSPECIFIED: i32 = 0; + pub const INTERNAL: i32 = 1; + pub const SERVER: i32 = 2; + pub const CLIENT: i32 = 3; + pub const PRODUCER: i32 = 4; + pub const CONSUMER: i32 = 5; +} + +/// OTLP StatusCode enum values. +pub mod status_code { + pub const UNSET: i32 = 0; + pub const OK: i32 = 1; + pub const ERROR: i32 = 2; +} diff --git a/libdd-data-pipeline/src/otlp/mapper.rs b/libdd-data-pipeline/src/otlp/mapper.rs new file mode 100644 index 0000000000..6bc71e8a84 --- /dev/null +++ b/libdd-data-pipeline/src/otlp/mapper.rs @@ -0,0 +1,379 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Maps Datadog trace/spans to OTLP ExportTraceServiceRequest. + +use super::json_types::{ + self, AnyValue, ExportTraceServiceRequest, InstrumentationScope, KeyValue, OtlpSpan, + OtlpSpanEvent, OtlpSpanLink, Resource, ResourceSpans, ScopeSpans, Status, +}; +use crate::trace_exporter::TracerMetadata; +use libdd_trace_utils::span::v04::{Span, SpanEvent, SpanLink}; +use libdd_trace_utils::span::TraceData; +use std::borrow::Borrow; + +/// Maximum number of attributes per span; excess are dropped and counted. +const MAX_ATTRIBUTES_PER_SPAN: usize = 128; + +/// Maps Datadog trace chunks and metadata to an OTLP ExportTraceServiceRequest. +/// +/// Resource: SDK-level attributes (service.name, deployment.environment, telemetry.sdk.*). +/// InstrumentationScope: "datadog" (DD SDKs don't have scope; all spans use this). +/// All analogous DD span fields are mapped; meta→attributes (string), metrics→attributes (int/double), +/// links and events mapped to OTLP links and events. Status from span.error and meta["error.msg"]. 
+pub fn map_traces_to_otlp( + trace_chunks: Vec>>, + metadata: &TracerMetadata, +) -> ExportTraceServiceRequest +where + T::Text: Borrow, +{ + let resource = build_resource(metadata); + let mut all_spans: Vec = Vec::new(); + for chunk in &trace_chunks { + for span in chunk { + all_spans.push(map_span(span)); + } + } + let scope = InstrumentationScope { + name: Some("datadog".to_string()), + version: None, + }; + let scope_spans = ScopeSpans { + scope: Some(scope), + spans: all_spans, + schema_url: None, + }; + let resource_spans = ResourceSpans { + resource: Some(resource), + scope_spans: vec![scope_spans], + }; + ExportTraceServiceRequest { + resource_spans: vec![resource_spans], + } +} + +fn build_resource(metadata: &TracerMetadata) -> Resource { + let mut attributes: Vec = Vec::new(); + if !metadata.service.is_empty() { + attributes.push(KeyValue { + key: "service.name".to_string(), + value: AnyValue::string(metadata.service.clone()), + }); + } + if !metadata.env.is_empty() { + attributes.push(KeyValue { + key: "deployment.environment".to_string(), + value: AnyValue::string(metadata.env.clone()), + }); + attributes.push(KeyValue { + key: "deployment.environment.name".to_string(), + value: AnyValue::string(metadata.env.clone()), + }); + } + if !metadata.app_version.is_empty() { + attributes.push(KeyValue { + key: "service.version".to_string(), + value: AnyValue::string(metadata.app_version.clone()), + }); + } + let sdk_name = if metadata.tracer_name.is_empty() { + "libdatadog".to_string() + } else { + metadata.tracer_name.clone() + }; + attributes.push(KeyValue { + key: "telemetry.sdk.name".to_string(), + value: AnyValue::string(sdk_name), + }); + if !metadata.language.is_empty() { + attributes.push(KeyValue { + key: "telemetry.sdk.language".to_string(), + value: AnyValue::string(metadata.language.clone()), + }); + } + if !metadata.tracer_version.is_empty() { + attributes.push(KeyValue { + key: "telemetry.sdk.version".to_string(), + value: 
AnyValue::string(metadata.tracer_version.clone()), + }); + } + if !metadata.git_commit_sha.is_empty() { + attributes.push(KeyValue { + key: "git.commit.sha".to_string(), + value: AnyValue::string(metadata.git_commit_sha.clone()), + }); + } + if !metadata.git_repository_url.is_empty() { + attributes.push(KeyValue { + key: "git.repository_url".to_string(), + value: AnyValue::string(metadata.git_repository_url.clone()), + }); + } + if !metadata.runtime_id.is_empty() { + attributes.push(KeyValue { + key: "runtime-id".to_string(), + value: AnyValue::string(metadata.runtime_id.clone()), + }); + } + Resource { + attributes, + dropped_attributes_count: None, + } +} + +fn map_span(span: &Span) -> OtlpSpan +where + T::Text: Borrow, +{ + let trace_id_hex = format!("{:032X}", span.trace_id); + let span_id_hex = format!("{:016X}", span.span_id); + let parent_span_id = if span.parent_id != 0 { + Some(format!("{:016X}", span.parent_id)) + } else { + None + }; + let start_nano = span.start; + let end_nano = span.start + span.duration; + let start_time_unix_nano = start_nano.to_string(); + let end_time_unix_nano = end_nano.to_string(); + let kind = dd_type_to_otlp_kind(span.r#type.borrow()); + let (attributes, dropped_attributes_count) = map_attributes(span); + let error_msg = span.meta.get("error.msg").map(|v| v.borrow().to_string()); + let status = if span.error != 0 { + Some(Status { + message: error_msg, + code: json_types::status_code::ERROR, + }) + } else { + Some(Status { + message: None, + code: json_types::status_code::UNSET, + }) + }; + let links = span.span_links.iter().map(map_span_link).collect(); + let (events, dropped_events_count) = map_span_events(&span.span_events); + OtlpSpan { + trace_id: trace_id_hex, + span_id: span_id_hex, + parent_span_id, + name: span.name.borrow().to_string(), + kind, + start_time_unix_nano, + end_time_unix_nano, + attributes, + status, + links, + events, + dropped_attributes_count: if dropped_attributes_count > 0 { + 
Some(dropped_attributes_count as u32) + } else { + None + }, + dropped_events_count: if dropped_events_count > 0 { + Some(dropped_events_count as u32) + } else { + None + }, + } +} + +fn map_span_link(link: &SpanLink) -> OtlpSpanLink +where + T::Text: Borrow, +{ + let trace_id_128 = (link.trace_id_high as u128) << 64 | (link.trace_id as u128); + let trace_id_hex = format!("{:032X}", trace_id_128); + let span_id_hex = format!("{:016X}", link.span_id); + let trace_state = if link.tracestate.borrow().is_empty() { + None + } else { + Some(link.tracestate.borrow().to_string()) + }; + let attributes: Vec = link + .attributes + .iter() + .map(|(k, v)| KeyValue { + key: k.borrow().to_string(), + value: AnyValue::string(v.borrow().to_string()), + }) + .collect(); + OtlpSpanLink { + trace_id: trace_id_hex, + span_id: span_id_hex, + trace_state, + attributes, + dropped_attributes_count: None, + } +} + +fn map_span_events( + events: &[SpanEvent], +) -> (Vec, usize) +where + T::Text: Borrow, +{ + const MAX_EVENTS_PER_SPAN: usize = 128; + let mut otlp_events = Vec::with_capacity(events.len().min(MAX_EVENTS_PER_SPAN)); + for ev in events.iter().take(MAX_EVENTS_PER_SPAN) { + let attributes: Vec = ev + .attributes + .iter() + .filter_map(|(k, v)| event_attr_to_key_value(k, v)) + .collect(); + otlp_events.push(OtlpSpanEvent { + time_unix_nano: ev.time_unix_nano.to_string(), + name: ev.name.borrow().to_string(), + attributes, + dropped_attributes_count: None, + }); + } + let dropped = events.len().saturating_sub(otlp_events.len()); + (otlp_events, dropped) +} + +fn event_attr_to_key_value( + k: &T::Text, + v: &libdd_trace_utils::span::v04::AttributeAnyValue, +) -> Option +where + T::Text: Borrow, +{ + use libdd_trace_utils::span::v04::AttributeArrayValue; + let value = match v { + libdd_trace_utils::span::v04::AttributeAnyValue::SingleValue(av) => match av { + AttributeArrayValue::String(s) => AnyValue::string(s.borrow().to_string()), + AttributeArrayValue::Boolean(b) => 
AnyValue::bool(*b), + AttributeArrayValue::Integer(i) => AnyValue::int(*i), + AttributeArrayValue::Double(d) => AnyValue::double(*d), + }, + libdd_trace_utils::span::v04::AttributeAnyValue::Array(_) => return None, + }; + Some(KeyValue { + key: k.borrow().to_string(), + value, + }) +} + +fn dd_type_to_otlp_kind(t: &str) -> i32 { + match t.to_lowercase().as_str() { + "server" | "web" | "http" => json_types::span_kind::SERVER, + "client" => json_types::span_kind::CLIENT, + "producer" => json_types::span_kind::PRODUCER, + "consumer" => json_types::span_kind::CONSUMER, + _ => json_types::span_kind::INTERNAL, + } +} + +fn map_attributes(span: &Span) -> (Vec, usize) +where + T::Text: Borrow, +{ + let mut attrs: Vec = Vec::new(); + for (k, v) in span.meta.iter() { + if attrs.len() >= MAX_ATTRIBUTES_PER_SPAN { + break; + } + attrs.push(KeyValue { + key: k.borrow().to_string(), + value: AnyValue::string(v.borrow().to_string()), + }); + } + for (k, v) in span.metrics.iter() { + if attrs.len() >= MAX_ATTRIBUTES_PER_SPAN { + break; + } + let value = if v.fract() == 0.0 && (*v >= i64::MIN as f64 && *v <= i64::MAX as f64) { + AnyValue::int(*v as i64) + } else { + AnyValue::double(*v) + }; + attrs.push(KeyValue { + key: k.borrow().to_string(), + value, + }); + } + let total = span.meta.len() + span.metrics.len(); + let dropped = total.saturating_sub(attrs.len()); + (attrs, dropped) +} + +#[cfg(test)] +mod tests { + use super::*; + use libdd_trace_utils::span::BytesData; + + #[test] + fn test_trace_id_span_id_format() { + let metadata = TracerMetadata::default(); + let mut span: Span = Span::default(); + span.trace_id = 0x5B8EFFF798038103D269B633813FC60C_u128; + span.span_id = 0xEEE19B7EC3C1B174; + span.parent_id = 0xEEE19B7EC3C1B173; + span.name = libdd_tinybytes::BytesString::from_static("test"); + span.service = libdd_tinybytes::BytesString::from_static("svc"); + span.resource = libdd_tinybytes::BytesString::from_static("res"); + span.r#type = 
libdd_tinybytes::BytesString::from_static("web"); + span.start = 1544712660000000000; + span.duration = 1000000000; + span.error = 0; + let req = map_traces_to_otlp(vec![vec![span]], &metadata); + let rs = &req.resource_spans[0]; + let otlp_span = &rs.scope_spans[0].spans[0]; + assert_eq!(otlp_span.trace_id, "5B8EFFF798038103D269B633813FC60C"); + assert_eq!(otlp_span.span_id, "EEE19B7EC3C1B174"); + assert_eq!(otlp_span.parent_span_id.as_deref(), Some("EEE19B7EC3C1B173")); + assert_eq!(otlp_span.kind, json_types::span_kind::SERVER); + assert_eq!(otlp_span.start_time_unix_nano, "1544712660000000000"); + assert_eq!(otlp_span.end_time_unix_nano, "1544712661000000000"); + assert_eq!(rs.scope_spans[0].scope.as_ref().unwrap().name.as_deref(), Some("datadog")); + } + + #[test] + fn test_status_error_message_from_meta() { + let metadata = TracerMetadata::default(); + let mut span: Span = Span::default(); + span.trace_id = 1; + span.span_id = 2; + span.name = libdd_tinybytes::BytesString::from_static("err_span"); + span.start = 0; + span.duration = 1; + span.error = 1; + span.meta.insert( + libdd_tinybytes::BytesString::from_static("error.msg"), + libdd_tinybytes::BytesString::from_static("something broke"), + ); + let req = map_traces_to_otlp(vec![vec![span]], &metadata); + let otlp_span = &req.resource_spans[0].scope_spans[0].spans[0]; + let status = otlp_span.status.as_ref().unwrap(); + assert_eq!(status.code, json_types::status_code::ERROR); + assert_eq!(status.message.as_deref(), Some("something broke")); + } + + #[test] + fn test_metrics_as_int_or_double() { + let metadata = TracerMetadata::default(); + let mut span: Span = Span::default(); + span.trace_id = 1; + span.span_id = 2; + span.name = libdd_tinybytes::BytesString::from_static("m"); + span.start = 0; + span.duration = 1; + span.metrics.insert( + libdd_tinybytes::BytesString::from_static("count"), + 42.0, + ); + span.metrics.insert( + libdd_tinybytes::BytesString::from_static("rate"), + 3.14, + ); + let req = 
map_traces_to_otlp(vec![vec![span]], &metadata); + let attrs = &req.resource_spans[0].scope_spans[0].spans[0].attributes; + let count_kv = attrs.iter().find(|a| a.key == "count").unwrap(); + assert!(count_kv.value.int_value.is_some()); + assert_eq!(count_kv.value.int_value, Some(42)); + let rate_kv = attrs.iter().find(|a| a.key == "rate").unwrap(); + assert!(rate_kv.value.double_value.is_some()); + assert!((rate_kv.value.double_value.unwrap() - 3.14).abs() < 1e-9); + } +} diff --git a/libdd-data-pipeline/src/otlp/mod.rs b/libdd-data-pipeline/src/otlp/mod.rs new file mode 100644 index 0000000000..8af55ec44d --- /dev/null +++ b/libdd-data-pipeline/src/otlp/mod.rs @@ -0,0 +1,30 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! OTLP trace export for libdatadog. +//! +//! When `OTEL_TRACES_EXPORTER=otlp` is set, the trace exporter sends traces in OTLP HTTP/JSON +//! format to the configured endpoint instead of the Datadog agent. See [`config::otlp_trace_config_from_env`] +//! and [`config::env_keys`] for environment variable names and precedence. +//! +//! ## Sampling +//! +//! By default, the exporter does not apply its own sampling: it exports every trace it receives +//! from the tracer. The tracer (e.g. dd-trace-py) is responsible for inheriting the sampling +//! decision from the distributed trace context; when no decision is present, the tracer typically +//! uses 100% (always on). +//! +//! ## Partial flush +//! +//! For the POC, partial flush is disabled. The tracer should only invoke the exporter when all +//! spans from a local trace are closed (i.e. send complete trace chunks). This crate does not +//! buffer or flush partially—it exports whatever trace chunks it receives. 
+ +pub mod config; +pub mod exporter; +pub mod json_types; +pub mod mapper; + +pub use config::{otlp_trace_config_from_env, OtlpProtocol, OtlpTraceConfig}; +pub use exporter::send_otlp_traces_http; +pub use mapper::map_traces_to_otlp; diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index f9833fa668..94f1258b2a 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::agent_info::AgentInfoFetcher; +use crate::otlp::otlp_trace_config_from_env; use crate::pausable_worker::PausableWorker; use crate::telemetry::TelemetryClientBuilder; use crate::trace_exporter::agent_response::AgentResponsePayloadVersion; @@ -28,12 +29,14 @@ pub struct TraceExporterBuilder { env: String, app_version: String, service: String, + tracer_name: String, tracer_version: String, language: String, language_version: String, language_interpreter: String, language_interpreter_vendor: String, git_commit_sha: String, + git_repository_url: String, input_format: TraceExporterInputFormat, output_format: TraceExporterOutputFormat, dogstatsd_url: Option, @@ -111,6 +114,19 @@ impl TraceExporterBuilder { self } + /// Set the git repository URL (e.g. for OTLP resource attribute `git.repository_url`) + pub fn set_git_repository_url(&mut self, url: &str) -> &mut Self { + url.clone_into(&mut self.git_repository_url); + self + } + + /// Set the tracer/SDK name used for OTLP resource attribute `telemetry.sdk.name` + /// (e.g. "dd-trace-py", "dd-trace-js"). When unset, OTLP export uses "libdatadog". 
+ pub fn set_tracer_name(&mut self, tracer_name: &str) -> &mut Self { + tracer_name.clone_into(&mut self.tracer_name); + self + } + /// Set the `Datadog-Meta-Tracer-Version` header pub fn set_tracer_version(&mut self, tracer_version: &str) -> &mut Self { tracer_version.clone_into(&mut self.tracer_version); @@ -303,12 +319,14 @@ impl TraceExporterBuilder { ..Default::default() }, metadata: TracerMetadata { + tracer_name: self.tracer_name.clone(), tracer_version: self.tracer_version, language_version: self.language_version, language_interpreter: self.language_interpreter, language_interpreter_vendor: self.language_interpreter_vendor, language: self.language, git_commit_sha: self.git_commit_sha, + git_repository_url: self.git_repository_url.clone(), client_computed_stats: self.client_computed_stats, client_computed_top_level: self.client_computed_top_level, hostname: self.hostname, @@ -338,6 +356,7 @@ impl TraceExporterBuilder { .agent_rates_payload_version_enabled .then(AgentResponsePayloadVersion::new), http_client: new_default_client(), + otlp_config: otlp_trace_config_from_env(), }) } diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 02320a684d..f8c45475c2 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -21,6 +21,7 @@ use crate::telemetry::{SendPayloadTelemetry, TelemetryClient}; use crate::trace_exporter::agent_response::{ AgentResponsePayloadVersion, DATADOG_RATES_PAYLOAD_VERSION_HEADER, }; +use crate::otlp::{map_traces_to_otlp, send_otlp_traces_http, OtlpTraceConfig}; use crate::trace_exporter::error::{InternalErrorKind, RequestError, TraceExporterError}; use crate::{ agent_info::{self, schema::AgentInfo}, @@ -122,12 +123,16 @@ pub struct TracerMetadata { pub app_version: String, pub runtime_id: String, pub service: String, + /// Tracer/SDK name for OTLP resource attribute `telemetry.sdk.name` (e.g. "dd-trace-py"). 
+ /// When empty, OTLP export uses "libdatadog". + pub tracer_name: String, pub tracer_version: String, pub language: String, pub language_version: String, pub language_interpreter: String, pub language_interpreter_vendor: String, pub git_commit_sha: String, + pub git_repository_url: String, pub client_computed_stats: bool, pub client_computed_top_level: bool, } @@ -204,6 +209,8 @@ pub struct TraceExporter { workers: Arc>, agent_payload_response_version: Option, http_client: HttpClient, + /// When set, traces are exported via OTLP HTTP/JSON instead of the Datadog agent. + otlp_config: Option, } impl TraceExporter { @@ -493,39 +500,60 @@ impl TraceExporter { } } - /// Send a list of trace chunks to the agent + /// Send a list of trace chunks to the agent (or OTLP endpoint when configured). /// /// # Arguments /// * trace_chunks: A list of trace chunks. Each trace chunk is a list of spans. /// /// # Returns - /// * Ok(String): The response from the agent + /// * Ok(AgentResponse): The response from the agent (or Unchanged for OTLP) /// * Err(TraceExporterError): An error detailing what went wrong in the process - pub fn send_trace_chunks( - &self, - trace_chunks: Vec>>, - ) -> Result { + pub fn send_trace_chunks(&self, trace_chunks: Vec>>) -> Result + where + T::Text: Borrow, + { self.check_agent_info(); self.runtime()? .block_on(async { self.send_trace_chunks_inner(trace_chunks).await }) } - /// Send a list of trace chunks to the agent, asynchronously + /// Send a list of trace chunks to the agent, asynchronously (or OTLP when configured). /// /// # Arguments /// * trace_chunks: A list of trace chunks. Each trace chunk is a list of spans. 
/// /// # Returns - /// * Ok(String): The response from the agent + /// * Ok(AgentResponse): The response from the agent (or Unchanged for OTLP) /// * Err(TraceExporterError): An error detailing what went wrong in the process pub async fn send_trace_chunks_async( &self, trace_chunks: Vec>>, - ) -> Result { + ) -> Result + where + T::Text: Borrow, + { self.check_agent_info(); self.send_trace_chunks_inner(trace_chunks).await } + /// Sends trace chunks via OTLP HTTP/JSON when OTLP config is enabled. + async fn send_otlp_traces_inner( + &self, + traces: Vec>>, + config: &OtlpTraceConfig, + ) -> Result + where + T::Text: Borrow, + { + let request = map_traces_to_otlp(traces, &self.metadata); + let json_body = serde_json::to_vec(&request).map_err(|e| { + error!("OTLP JSON serialization error: {e}"); + TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(e.to_string())) + })?; + send_otlp_traces_http(&self.http_client, config, json_body).await?; + Ok(AgentResponse::Unchanged) + } + /// Deserializes, processes and sends trace chunks to the agent fn send_deser( &self, @@ -591,7 +619,16 @@ impl TraceExporter { async fn send_trace_chunks_inner( &self, mut traces: Vec>>, - ) -> Result { + ) -> Result + where + T::Text: Borrow, + { + // OTLP path: when OTEL_TRACES_EXPORTER=otlp. No sampling/dropping—export all received + // (equivalent to OTEL_TRACES_SAMPLER=parentbased_always_on). 
+ if let Some(ref config) = self.otlp_config { + return self.send_otlp_traces_inner(traces, config).await; + } + let mut header_tags: TracerHeaderTags = self.metadata.borrow().into(); // Process stats computation @@ -1865,6 +1902,58 @@ mod tests { assert_eq!(exporter.endpoint.timeout_ms, 42); } + #[test] + #[cfg_attr(miri, ignore)] + fn test_otlp_export_when_env_set() { + let server = MockServer::start(); + let mock_otlp = server.mock(|when, then| { + when.method(POST) + .path("/v1/traces") + .header("Content-Type", "application/json"); + then.status(200).body(""); + }); + + std::env::set_var(crate::otlp::config::env_keys::TRACES_EXPORTER, "otlp"); + // Endpoint set explicitly is used as-is (no path appended); so set full path for mock + std::env::set_var( + crate::otlp::config::env_keys::TRACES_ENDPOINT, + format!("{}/v1/traces", server.url("/").trim_end_matches('/')), + ); + let mut builder = TraceExporterBuilder::default(); + builder + .set_url("http://127.0.0.1:8126") + .set_service("svc") + .set_env("env") + .set_tracer_version("1.0") + .set_language("rust") + .set_language_version("1.0") + .set_language_interpreter("rustc") + .set_input_format(TraceExporterInputFormat::V04) + .set_output_format(TraceExporterOutputFormat::V04); + let exporter = builder.build().unwrap(); + + let traces: Vec> = vec![vec![SpanBytes { + name: BytesString::from_slice(b"op").unwrap(), + service: BytesString::from_static("svc"), + resource: BytesString::from_static("res"), + trace_id: 1, + span_id: 2, + parent_id: 0, + start: 1000, + duration: 100, + error: 0, + ..Default::default() + }]]; + let data = msgpack_encoder::v04::to_vec(&traces); + let result = exporter.send(data.as_ref()); + + let _ = std::env::remove_var(crate::otlp::config::env_keys::TRACES_EXPORTER); + let _ = std::env::remove_var(crate::otlp::config::env_keys::TRACES_ENDPOINT); + + assert!(result.is_ok(), "OTLP send should succeed: {:?}", result.err()); + mock_otlp.assert(); + } + #[test] #[cfg_attr(miri, ignore)] fn 
stop_and_start_runtime() { From 01662c2758b2b76ecc339afa4f54abf5b8089d63 Mon Sep 17 00:00:00 2001 From: Rachel Yang Date: Mon, 2 Mar 2026 16:48:12 -0500 Subject: [PATCH 02/10] linting --- libdd-data-pipeline-ffi/src/trace_exporter.rs | 7 +- libdd-data-pipeline/src/otlp/config.rs | 30 +++++-- libdd-data-pipeline/src/otlp/exporter.rs | 27 ++++-- libdd-data-pipeline/src/otlp/mapper.rs | 82 ++++++++++--------- libdd-data-pipeline/src/trace_exporter/mod.rs | 17 ++-- 5 files changed, 101 insertions(+), 62 deletions(-) diff --git a/libdd-data-pipeline-ffi/src/trace_exporter.rs b/libdd-data-pipeline-ffi/src/trace_exporter.rs index 162e7762fd..2620c48bbe 100644 --- a/libdd-data-pipeline-ffi/src/trace_exporter.rs +++ b/libdd-data-pipeline-ffi/src/trace_exporter.rs @@ -527,7 +527,12 @@ pub unsafe extern "C" fn ddog_trace_exporter_new( .set_app_version(config.version.as_ref().unwrap_or(&"".to_string())) .set_service(config.service.as_ref().unwrap_or(&"".to_string())) .set_git_commit_sha(config.git_commit_sha.as_ref().unwrap_or(&"".to_string())) - .set_git_repository_url(config.git_repository_url.as_ref().unwrap_or(&"".to_string())) + .set_git_repository_url( + config + .git_repository_url + .as_ref() + .unwrap_or(&"".to_string()), + ) .set_input_format(config.input_format) .set_output_format(config.output_format) .set_connection_timeout(config.connection_timeout); diff --git a/libdd-data-pipeline/src/otlp/config.rs b/libdd-data-pipeline/src/otlp/config.rs index 87c44e93f1..cc40ee4596 100644 --- a/libdd-data-pipeline/src/otlp/config.rs +++ b/libdd-data-pipeline/src/otlp/config.rs @@ -131,7 +131,11 @@ pub fn otlp_trace_config_from_env() -> Option { let endpoint = s.trim().to_string(); if endpoint.is_empty() { fallback_traces_url(DEFAULT_OTLP_HTTP_ENDPOINT, protocol) - } else if endpoint.contains("://") && (endpoint.contains(':') || endpoint.starts_with("http://") || endpoint.starts_with("https://")) { + } else if endpoint.contains("://") + && (endpoint.contains(':') + || 
endpoint.starts_with("http://") + || endpoint.starts_with("https://")) + { // Explicitly set: use as-is endpoint } else { @@ -173,9 +177,17 @@ fn parse_timeout(s: &str) -> Option { if s.ends_with("ms") { s[..s.len() - 2].trim().parse::().ok() } else if s.ends_with('s') && !s.ends_with("ms") { - s[..s.len() - 1].trim().parse::().ok().map(|v| v * 1000) + s[..s.len() - 1] + .trim() + .parse::() + .ok() + .map(|v| v * 1000) } else if s.ends_with('m') { - s[..s.len() - 1].trim().parse::().ok().map(|v| v * 60 * 1000) + s[..s.len() - 1] + .trim() + .parse::() + .ok() + .map(|v| v * 60 * 1000) } else { s.parse::().ok() } @@ -222,21 +234,21 @@ mod tests { #[test] fn test_otlp_disabled_without_traces_exporter() { // Without OTEL_TRACES_EXPORTER=otlp, config should be None - let _ = std::env::remove_var(env_keys::TRACES_EXPORTER); - let _ = std::env::remove_var(env_keys::TRACES_ENDPOINT); + std::env::remove_var(env_keys::TRACES_EXPORTER); + std::env::remove_var(env_keys::TRACES_ENDPOINT); assert!(otlp_trace_config_from_env().is_none()); } #[test] fn test_explicit_endpoint_used_as_is() { // Per spec: when OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is set, use as-is (no /v1/traces appended) - let _ = std::env::remove_var(env_keys::TRACES_EXPORTER); - let _ = std::env::remove_var(env_keys::TRACES_ENDPOINT); + std::env::remove_var(env_keys::TRACES_EXPORTER); + std::env::remove_var(env_keys::TRACES_ENDPOINT); std::env::set_var(env_keys::TRACES_EXPORTER, "otlp"); std::env::set_var(env_keys::TRACES_ENDPOINT, "http://custom:9999"); let config = otlp_trace_config_from_env(); - let _ = std::env::remove_var(env_keys::TRACES_EXPORTER); - let _ = std::env::remove_var(env_keys::TRACES_ENDPOINT); + std::env::remove_var(env_keys::TRACES_EXPORTER); + std::env::remove_var(env_keys::TRACES_ENDPOINT); let config = config.expect("config when TRACES_EXPORTER=otlp and endpoint set"); assert_eq!(config.endpoint_url, "http://custom:9999"); } diff --git a/libdd-data-pipeline/src/otlp/exporter.rs 
b/libdd-data-pipeline/src/otlp/exporter.rs index 4d81aa77ac..ee3e5d721e 100644 --- a/libdd-data-pipeline/src/otlp/exporter.rs +++ b/libdd-data-pipeline/src/otlp/exporter.rs @@ -32,9 +32,12 @@ pub async fn send_otlp_traces_http( json_body: Vec, ) -> Result<(), TraceExporterError> { let uri = libdd_common::parse_uri(&config.endpoint_url).map_err(|e| { - TraceExporterError::Internal(crate::trace_exporter::error::InternalErrorKind::InvalidWorkerState( - format!("Invalid OTLP endpoint URL: {}", e), - )) + TraceExporterError::Internal( + crate::trace_exporter::error::InternalErrorKind::InvalidWorkerState(format!( + "Invalid OTLP endpoint URL: {}", + e + )), + ) })?; let mut attempt = 0u32; @@ -50,11 +53,15 @@ pub async fn send_otlp_traces_http( "OTLP trace export attempt" ); - let req = req_builder.body(Body::from_bytes(body_bytes)).map_err(|e| { - TraceExporterError::Internal(crate::trace_exporter::error::InternalErrorKind::InvalidWorkerState( - e.to_string(), - )) - })?; + let req = req_builder + .body(Body::from_bytes(body_bytes)) + .map_err(|e| { + TraceExporterError::Internal( + crate::trace_exporter::error::InternalErrorKind::InvalidWorkerState( + e.to_string(), + ), + ) + })?; match tokio::time::timeout(timeout, client.request(req)).await { Ok(Ok(response)) => { @@ -76,7 +83,9 @@ pub async fn send_otlp_traces_http( continue; } let response = http_common::into_response(response); - let body_bytes = http_common::collect_response_bytes(response).await.unwrap_or_default(); + let body_bytes = http_common::collect_response_bytes(response) + .await + .unwrap_or_default(); let body_str = String::from_utf8_lossy(&body_bytes); error!( status = %status, diff --git a/libdd-data-pipeline/src/otlp/mapper.rs b/libdd-data-pipeline/src/otlp/mapper.rs index 6bc71e8a84..218c4e2108 100644 --- a/libdd-data-pipeline/src/otlp/mapper.rs +++ b/libdd-data-pipeline/src/otlp/mapper.rs @@ -207,9 +207,7 @@ where } } -fn map_span_events( - events: &[SpanEvent], -) -> (Vec, usize) +fn 
map_span_events(events: &[SpanEvent]) -> (Vec, usize) where T::Text: Borrow, { @@ -306,39 +304,49 @@ mod tests { #[test] fn test_trace_id_span_id_format() { let metadata = TracerMetadata::default(); - let mut span: Span = Span::default(); - span.trace_id = 0x5B8EFFF798038103D269B633813FC60C_u128; - span.span_id = 0xEEE19B7EC3C1B174; - span.parent_id = 0xEEE19B7EC3C1B173; - span.name = libdd_tinybytes::BytesString::from_static("test"); - span.service = libdd_tinybytes::BytesString::from_static("svc"); - span.resource = libdd_tinybytes::BytesString::from_static("res"); - span.r#type = libdd_tinybytes::BytesString::from_static("web"); - span.start = 1544712660000000000; - span.duration = 1000000000; - span.error = 0; + let span: Span = Span { + trace_id: 0x5B8EFFF798038103D269B633813FC60C_u128, + span_id: 0xEEE19B7EC3C1B174, + parent_id: 0xEEE19B7EC3C1B173, + name: libdd_tinybytes::BytesString::from_static("test"), + service: libdd_tinybytes::BytesString::from_static("svc"), + resource: libdd_tinybytes::BytesString::from_static("res"), + r#type: libdd_tinybytes::BytesString::from_static("web"), + start: 1544712660000000000, + duration: 1000000000, + error: 0, + ..Default::default() + }; let req = map_traces_to_otlp(vec![vec![span]], &metadata); let rs = &req.resource_spans[0]; let otlp_span = &rs.scope_spans[0].spans[0]; assert_eq!(otlp_span.trace_id, "5B8EFFF798038103D269B633813FC60C"); assert_eq!(otlp_span.span_id, "EEE19B7EC3C1B174"); - assert_eq!(otlp_span.parent_span_id.as_deref(), Some("EEE19B7EC3C1B173")); + assert_eq!( + otlp_span.parent_span_id.as_deref(), + Some("EEE19B7EC3C1B173") + ); assert_eq!(otlp_span.kind, json_types::span_kind::SERVER); assert_eq!(otlp_span.start_time_unix_nano, "1544712660000000000"); assert_eq!(otlp_span.end_time_unix_nano, "1544712661000000000"); - assert_eq!(rs.scope_spans[0].scope.as_ref().unwrap().name.as_deref(), Some("datadog")); + assert_eq!( + rs.scope_spans[0].scope.as_ref().unwrap().name.as_deref(), + Some("datadog") + ); 
} #[test] fn test_status_error_message_from_meta() { let metadata = TracerMetadata::default(); - let mut span: Span = Span::default(); - span.trace_id = 1; - span.span_id = 2; - span.name = libdd_tinybytes::BytesString::from_static("err_span"); - span.start = 0; - span.duration = 1; - span.error = 1; + let mut span: Span = Span { + trace_id: 1, + span_id: 2, + name: libdd_tinybytes::BytesString::from_static("err_span"), + start: 0, + duration: 1, + error: 1, + ..Default::default() + }; span.meta.insert( libdd_tinybytes::BytesString::from_static("error.msg"), libdd_tinybytes::BytesString::from_static("something broke"), @@ -353,20 +361,18 @@ mod tests { #[test] fn test_metrics_as_int_or_double() { let metadata = TracerMetadata::default(); - let mut span: Span = Span::default(); - span.trace_id = 1; - span.span_id = 2; - span.name = libdd_tinybytes::BytesString::from_static("m"); - span.start = 0; - span.duration = 1; - span.metrics.insert( - libdd_tinybytes::BytesString::from_static("count"), - 42.0, - ); - span.metrics.insert( - libdd_tinybytes::BytesString::from_static("rate"), - 3.14, - ); + let mut span: Span = Span { + trace_id: 1, + span_id: 2, + name: libdd_tinybytes::BytesString::from_static("m"), + start: 0, + duration: 1, + ..Default::default() + }; + span.metrics + .insert(libdd_tinybytes::BytesString::from_static("count"), 42.0); + span.metrics + .insert(libdd_tinybytes::BytesString::from_static("rate"), std::f64::consts::PI); let req = map_traces_to_otlp(vec![vec![span]], &metadata); let attrs = &req.resource_spans[0].scope_spans[0].spans[0].attributes; let count_kv = attrs.iter().find(|a| a.key == "count").unwrap(); @@ -374,6 +380,6 @@ mod tests { assert_eq!(count_kv.value.int_value, Some(42)); let rate_kv = attrs.iter().find(|a| a.key == "rate").unwrap(); assert!(rate_kv.value.double_value.is_some()); - assert!((rate_kv.value.double_value.unwrap() - 3.14).abs() < 1e-9); + assert!((rate_kv.value.double_value.unwrap() - std::f64::consts::PI).abs() < 
1e-9); } } diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index f8c45475c2..b1f24bfcd3 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -15,13 +15,13 @@ use self::metrics::MetricsEmitter; use self::stats::StatsComputationStatus; use self::trace_serializer::TraceSerializer; use crate::agent_info::{AgentInfoFetcher, ResponseObserver}; +use crate::otlp::{map_traces_to_otlp, send_otlp_traces_http, OtlpTraceConfig}; use crate::pausable_worker::PausableWorker; use crate::stats_exporter::StatsExporter; use crate::telemetry::{SendPayloadTelemetry, TelemetryClient}; use crate::trace_exporter::agent_response::{ AgentResponsePayloadVersion, DATADOG_RATES_PAYLOAD_VERSION_HEADER, }; -use crate::otlp::{map_traces_to_otlp, send_otlp_traces_http, OtlpTraceConfig}; use crate::trace_exporter::error::{InternalErrorKind, RequestError, TraceExporterError}; use crate::{ agent_info::{self, schema::AgentInfo}, @@ -508,7 +508,10 @@ impl TraceExporter { /// # Returns /// * Ok(AgentResponse): The response from the agent (or Unchanged for OTLP) /// * Err(TraceExporterError): An error detailing what went wrong in the process - pub fn send_trace_chunks(&self, trace_chunks: Vec>>) -> Result + pub fn send_trace_chunks( + &self, + trace_chunks: Vec>>, + ) -> Result where T::Text: Borrow, { @@ -1947,10 +1950,14 @@ mod tests { let data = msgpack_encoder::v04::to_vec(&traces); let result = exporter.send(data.as_ref()); - let _ = std::env::remove_var(crate::otlp::config::env_keys::TRACES_EXPORTER); - let _ = std::env::remove_var(crate::otlp::config::env_keys::TRACES_ENDPOINT); + std::env::remove_var(crate::otlp::config::env_keys::TRACES_EXPORTER); + std::env::remove_var(crate::otlp::config::env_keys::TRACES_ENDPOINT); - assert!(result.is_ok(), "OTLP send should succeed: {:?}", result.err()); + assert!( + result.is_ok(), + "OTLP send should succeed: {:?}", + result.err() + ); 
mock_otlp.assert(); } From 91a655961dd614de68535de2474ba7581957d211 Mon Sep 17 00:00:00 2001 From: Rachel Yang Date: Mon, 2 Mar 2026 16:51:22 -0500 Subject: [PATCH 03/10] linting on comments --- libdd-data-pipeline/src/otlp/config.rs | 3 ++- libdd-data-pipeline/src/otlp/exporter.rs | 3 ++- libdd-data-pipeline/src/otlp/mapper.rs | 11 +++++++---- libdd-data-pipeline/src/otlp/mod.rs | 5 +++-- 4 files changed, 14 insertions(+), 8 deletions(-) diff --git a/libdd-data-pipeline/src/otlp/config.rs b/libdd-data-pipeline/src/otlp/config.rs index cc40ee4596..39c9bdbf04 100644 --- a/libdd-data-pipeline/src/otlp/config.rs +++ b/libdd-data-pipeline/src/otlp/config.rs @@ -241,7 +241,8 @@ mod tests { #[test] fn test_explicit_endpoint_used_as_is() { - // Per spec: when OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is set, use as-is (no /v1/traces appended) + // Per spec: when OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is set, use as-is (no /v1/traces + // appended) std::env::remove_var(env_keys::TRACES_EXPORTER); std::env::remove_var(env_keys::TRACES_ENDPOINT); std::env::set_var(env_keys::TRACES_EXPORTER, "otlp"); diff --git a/libdd-data-pipeline/src/otlp/exporter.rs b/libdd-data-pipeline/src/otlp/exporter.rs index ee3e5d721e..69bdce56b1 100644 --- a/libdd-data-pipeline/src/otlp/exporter.rs +++ b/libdd-data-pipeline/src/otlp/exporter.rs @@ -1,7 +1,8 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -//! OTLP HTTP/JSON trace exporter. Sends ExportTraceServiceRequest with retries on 429, 502, 503, 504. +//! OTLP HTTP/JSON trace exporter. Sends ExportTraceServiceRequest with retries on 429, 502, 503, +//! 504. 
use super::config::OtlpTraceConfig; use crate::trace_exporter::error::TraceExporterError; diff --git a/libdd-data-pipeline/src/otlp/mapper.rs b/libdd-data-pipeline/src/otlp/mapper.rs index 218c4e2108..e877449f82 100644 --- a/libdd-data-pipeline/src/otlp/mapper.rs +++ b/libdd-data-pipeline/src/otlp/mapper.rs @@ -19,8 +19,9 @@ const MAX_ATTRIBUTES_PER_SPAN: usize = 128; /// /// Resource: SDK-level attributes (service.name, deployment.environment, telemetry.sdk.*). /// InstrumentationScope: "datadog" (DD SDKs don't have scope; all spans use this). -/// All analogous DD span fields are mapped; meta→attributes (string), metrics→attributes (int/double), -/// links and events mapped to OTLP links and events. Status from span.error and meta["error.msg"]. +/// All analogous DD span fields are mapped; meta→attributes (string), metrics→attributes +/// (int/double), links and events mapped to OTLP links and events. Status from span.error and +/// meta["error.msg"]. pub fn map_traces_to_otlp( trace_chunks: Vec>>, metadata: &TracerMetadata, @@ -371,8 +372,10 @@ mod tests { }; span.metrics .insert(libdd_tinybytes::BytesString::from_static("count"), 42.0); - span.metrics - .insert(libdd_tinybytes::BytesString::from_static("rate"), std::f64::consts::PI); + span.metrics.insert( + libdd_tinybytes::BytesString::from_static("rate"), + std::f64::consts::PI, + ); let req = map_traces_to_otlp(vec![vec![span]], &metadata); let attrs = &req.resource_spans[0].scope_spans[0].spans[0].attributes; let count_kv = attrs.iter().find(|a| a.key == "count").unwrap(); diff --git a/libdd-data-pipeline/src/otlp/mod.rs b/libdd-data-pipeline/src/otlp/mod.rs index 8af55ec44d..f1d8107a5d 100644 --- a/libdd-data-pipeline/src/otlp/mod.rs +++ b/libdd-data-pipeline/src/otlp/mod.rs @@ -4,8 +4,9 @@ //! OTLP trace export for libdatadog. //! //! When `OTEL_TRACES_EXPORTER=otlp` is set, the trace exporter sends traces in OTLP HTTP/JSON -//! format to the configured endpoint instead of the Datadog agent. 
See [`config::otlp_trace_config_from_env`] -//! and [`config::env_keys`] for environment variable names and precedence. +//! format to the configured endpoint instead of the Datadog agent. See +//! [`config::otlp_trace_config_from_env`] and [`config::env_keys`] for environment variable names +//! and precedence. //! //! ## Sampling //! From 293ecd841e3fbba4ad030dc361c17ee74a19eafb Mon Sep 17 00:00:00 2001 From: Rachel Yang Date: Mon, 2 Mar 2026 20:34:35 -0500 Subject: [PATCH 04/10] fix endpoint and mapping --- libdd-data-pipeline/src/otlp/config.rs | 54 +++++++++++++++++----- libdd-data-pipeline/src/otlp/exporter.rs | 10 ++-- libdd-data-pipeline/src/otlp/json_types.rs | 1 + libdd-data-pipeline/src/otlp/mapper.rs | 16 +++---- 4 files changed, 57 insertions(+), 24 deletions(-) diff --git a/libdd-data-pipeline/src/otlp/config.rs b/libdd-data-pipeline/src/otlp/config.rs index 39c9bdbf04..2e7041e247 100644 --- a/libdd-data-pipeline/src/otlp/config.rs +++ b/libdd-data-pipeline/src/otlp/config.rs @@ -125,27 +125,34 @@ pub fn otlp_trace_config_from_env() -> Option { .unwrap_or_default(); // Traces-specific endpoint takes precedence over generic OTEL endpoint when both are set. - let endpoint_opt = get_env(env_keys::TRACES_ENDPOINT).or_else(|| get_env(env_keys::ENDPOINT)); + // Per spec: OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is used as-is; the generic + // OTEL_EXPORTER_OTLP_ENDPOINT gets /v1/traces appended for HTTP signals. 
+ let traces_endpoint = get_env(env_keys::TRACES_ENDPOINT); + let (endpoint_opt, is_signal_specific) = match traces_endpoint { + Some(ep) => (Some(ep), true), + None => (get_env(env_keys::ENDPOINT), false), + }; let url = match endpoint_opt { Some(s) => { let endpoint = s.trim().to_string(); if endpoint.is_empty() { fallback_traces_url(DEFAULT_OTLP_HTTP_ENDPOINT, protocol) - } else if endpoint.contains("://") - && (endpoint.contains(':') - || endpoint.starts_with("http://") - || endpoint.starts_with("https://")) - { - // Explicitly set: use as-is - endpoint } else { - // Bare host:port from env: still "set by user", use as-is (no path) - let base = if endpoint.starts_with(':') { + // Normalize bare host:port to a full URL. + let normalized = if endpoint.contains("://") { + endpoint + } else if endpoint.starts_with(':') { format!("http://localhost{}", endpoint) } else { format!("http://{}", endpoint) }; - base + // Spec: signal-specific TRACES_ENDPOINT is used as-is; generic ENDPOINT gets + // /v1/traces appended for HTTP. + if is_signal_specific { + normalized + } else { + fallback_traces_url(&normalized, protocol) + } } } None => fallback_traces_url(DEFAULT_OTLP_HTTP_ENDPOINT, protocol), @@ -196,6 +203,10 @@ fn parse_timeout(s: &str) -> Option { #[cfg(test)] mod tests { use super::*; + use std::sync::Mutex; + + // Env-var-dependent tests must be serialized: parallel mutation of global env is not safe. 
+ static ENV_LOCK: Mutex<()> = Mutex::new(()); #[test] fn test_parse_headers() { @@ -233,18 +244,22 @@ mod tests { #[test] fn test_otlp_disabled_without_traces_exporter() { + let _guard = ENV_LOCK.lock().unwrap(); // Without OTEL_TRACES_EXPORTER=otlp, config should be None std::env::remove_var(env_keys::TRACES_EXPORTER); std::env::remove_var(env_keys::TRACES_ENDPOINT); + std::env::remove_var(env_keys::ENDPOINT); assert!(otlp_trace_config_from_env().is_none()); } #[test] fn test_explicit_endpoint_used_as_is() { + let _guard = ENV_LOCK.lock().unwrap(); // Per spec: when OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is set, use as-is (no /v1/traces // appended) std::env::remove_var(env_keys::TRACES_EXPORTER); std::env::remove_var(env_keys::TRACES_ENDPOINT); + std::env::remove_var(env_keys::ENDPOINT); std::env::set_var(env_keys::TRACES_EXPORTER, "otlp"); std::env::set_var(env_keys::TRACES_ENDPOINT, "http://custom:9999"); let config = otlp_trace_config_from_env(); @@ -253,4 +268,21 @@ mod tests { let config = config.expect("config when TRACES_EXPORTER=otlp and endpoint set"); assert_eq!(config.endpoint_url, "http://custom:9999"); } + + #[test] + fn test_generic_endpoint_gets_path_appended() { + let _guard = ENV_LOCK.lock().unwrap(); + // Per spec: OTEL_EXPORTER_OTLP_ENDPOINT (generic) must have /v1/traces appended for HTTP. 
+ std::env::remove_var(env_keys::TRACES_EXPORTER); + std::env::remove_var(env_keys::TRACES_ENDPOINT); + std::env::remove_var(env_keys::ENDPOINT); + std::env::set_var(env_keys::TRACES_EXPORTER, "otlp"); + std::env::set_var(env_keys::ENDPOINT, "http://collector:4318"); + let config = otlp_trace_config_from_env(); + std::env::remove_var(env_keys::TRACES_EXPORTER); + std::env::remove_var(env_keys::TRACES_ENDPOINT); + std::env::remove_var(env_keys::ENDPOINT); + let config = config.expect("config when TRACES_EXPORTER=otlp and generic endpoint set"); + assert_eq!(config.endpoint_url, "http://collector:4318/v1/traces"); + } } diff --git a/libdd-data-pipeline/src/otlp/exporter.rs b/libdd-data-pipeline/src/otlp/exporter.rs index 69bdce56b1..fddc814851 100644 --- a/libdd-data-pipeline/src/otlp/exporter.rs +++ b/libdd-data-pipeline/src/otlp/exporter.rs @@ -13,8 +13,8 @@ use std::time::Duration; use tokio::time::sleep; use tracing::{debug, error, warn}; -/// Max retries for OTLP export (transient failures only). -const OTLP_MAX_RETRIES: u32 = 5; +/// Max total attempts for OTLP export (1 initial + up to 4 retries on transient failures). +const OTLP_MAX_ATTEMPTS: u32 = 5; /// Initial backoff between retries (milliseconds). 
const OTLP_RETRY_DELAY_MS: u64 = 100; @@ -72,7 +72,7 @@ pub async fn send_otlp_traces_http( return Ok(()); } let code = status.as_u16(); - if is_retryable_status(code) && attempt < OTLP_MAX_RETRIES { + if is_retryable_status(code) && attempt < OTLP_MAX_ATTEMPTS { let delay_ms = OTLP_RETRY_DELAY_MS * (1 << (attempt - 1)); warn!( status = %status, @@ -99,7 +99,7 @@ pub async fn send_otlp_traces_http( )); } Ok(Err(e)) => { - if attempt < OTLP_MAX_RETRIES { + if attempt < OTLP_MAX_ATTEMPTS { let delay_ms = OTLP_RETRY_DELAY_MS * (1 << (attempt - 1)); warn!(error = ?e, attempt, "OTLP export network error, retrying"); sleep(Duration::from_millis(delay_ms)).await; @@ -109,7 +109,7 @@ pub async fn send_otlp_traces_http( return Err(TraceExporterError::from(http_common::into_error(e))); } Err(_) => { - if attempt < OTLP_MAX_RETRIES { + if attempt < OTLP_MAX_ATTEMPTS { let delay_ms = OTLP_RETRY_DELAY_MS * (1 << (attempt - 1)); warn!(attempt, "OTLP export timeout, retrying"); sleep(Duration::from_millis(delay_ms)).await; diff --git a/libdd-data-pipeline/src/otlp/json_types.rs b/libdd-data-pipeline/src/otlp/json_types.rs index cd69b5a517..2c5062dc46 100644 --- a/libdd-data-pipeline/src/otlp/json_types.rs +++ b/libdd-data-pipeline/src/otlp/json_types.rs @@ -160,6 +160,7 @@ impl AnyValue { #[derive(Debug, Serialize)] #[serde(rename_all = "camelCase")] pub struct Status { + #[serde(skip_serializing_if = "Option::is_none")] pub message: Option, pub code: i32, } diff --git a/libdd-data-pipeline/src/otlp/mapper.rs b/libdd-data-pipeline/src/otlp/mapper.rs index e877449f82..ca280ab831 100644 --- a/libdd-data-pipeline/src/otlp/mapper.rs +++ b/libdd-data-pipeline/src/otlp/mapper.rs @@ -127,10 +127,10 @@ fn map_span(span: &Span) -> OtlpSpan where T::Text: Borrow, { - let trace_id_hex = format!("{:032X}", span.trace_id); - let span_id_hex = format!("{:016X}", span.span_id); + let trace_id_hex = format!("{:032x}", span.trace_id); + let span_id_hex = format!("{:016x}", span.span_id); let 
parent_span_id = if span.parent_id != 0 { - Some(format!("{:016X}", span.parent_id)) + Some(format!("{:016x}", span.parent_id)) } else { None }; @@ -184,8 +184,8 @@ where T::Text: Borrow, { let trace_id_128 = (link.trace_id_high as u128) << 64 | (link.trace_id as u128); - let trace_id_hex = format!("{:032X}", trace_id_128); - let span_id_hex = format!("{:016X}", link.span_id); + let trace_id_hex = format!("{:032x}", trace_id_128); + let span_id_hex = format!("{:016x}", link.span_id); let trace_state = if link.tracestate.borrow().is_empty() { None } else { @@ -321,11 +321,11 @@ mod tests { let req = map_traces_to_otlp(vec![vec![span]], &metadata); let rs = &req.resource_spans[0]; let otlp_span = &rs.scope_spans[0].spans[0]; - assert_eq!(otlp_span.trace_id, "5B8EFFF798038103D269B633813FC60C"); - assert_eq!(otlp_span.span_id, "EEE19B7EC3C1B174"); + assert_eq!(otlp_span.trace_id, "5b8efff798038103d269b633813fc60c"); + assert_eq!(otlp_span.span_id, "eee19b7ec3c1b174"); assert_eq!( otlp_span.parent_span_id.as_deref(), - Some("EEE19B7EC3C1B173") + Some("eee19b7ec3c1b173") ); assert_eq!(otlp_span.kind, json_types::span_kind::SERVER); assert_eq!(otlp_span.start_time_unix_nano, "1544712660000000000"); From 2dd4b574cdfe606be150f756309e7e96d3817bbf Mon Sep 17 00:00:00 2001 From: Rachel Yang Date: Fri, 6 Mar 2026 14:34:28 -0500 Subject: [PATCH 05/10] implement feedback --- libdd-data-pipeline-ffi/src/trace_exporter.rs | 72 ------------------- libdd-data-pipeline/src/otlp/config.rs | 4 +- libdd-data-pipeline/src/otlp/json_types.rs | 21 ++++-- libdd-data-pipeline/src/otlp/mapper.rs | 28 ++------ libdd-data-pipeline/src/otlp/mod.rs | 2 +- .../src/trace_exporter/builder.rs | 17 ----- libdd-data-pipeline/src/trace_exporter/mod.rs | 4 -- 7 files changed, 22 insertions(+), 126 deletions(-) diff --git a/libdd-data-pipeline-ffi/src/trace_exporter.rs b/libdd-data-pipeline-ffi/src/trace_exporter.rs index 2620c48bbe..7062da398e 100644 --- a/libdd-data-pipeline-ffi/src/trace_exporter.rs +++ 
b/libdd-data-pipeline-ffi/src/trace_exporter.rs @@ -84,7 +84,6 @@ pub struct TelemetryClientConfig<'a> { #[derive(Debug, Default)] pub struct TraceExporterConfig { url: Option, - tracer_name: Option, tracer_version: Option, language: Option, language_version: Option, @@ -93,8 +92,6 @@ pub struct TraceExporterConfig { env: Option, version: Option, service: Option, - git_commit_sha: Option, - git_repository_url: Option, input_format: TraceExporterInputFormat, output_format: TraceExporterOutputFormat, compute_stats: bool, @@ -143,27 +140,6 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_set_url( ) } -/// Sets the tracer/SDK name for OTLP resource attribute `telemetry.sdk.name` -/// (e.g. "dd-trace-py"). When unset, OTLP uses "libdatadog". -#[no_mangle] -pub unsafe extern "C" fn ddog_trace_exporter_config_set_tracer_name( - config: Option<&mut TraceExporterConfig>, - tracer_name: CharSlice, -) -> Option> { - catch_panic!( - if let Option::Some(handle) = config { - handle.tracer_name = match sanitize_string(tracer_name) { - Ok(s) => Some(s), - Err(e) => return Some(e), - }; - None - } else { - gen_error!(ErrorCode::InvalidArgument) - }, - gen_error!(ErrorCode::Panic) - ) -} - /// Sets tracer's version to be included in the headers request. #[no_mangle] pub unsafe extern "C" fn ddog_trace_exporter_config_set_tracer_version( @@ -303,46 +279,6 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_set_version( ) } -/// Sets git commit SHA (e.g. for OTLP resource attribute `git.commit.sha`). 
-#[no_mangle] -pub unsafe extern "C" fn ddog_trace_exporter_config_set_git_commit_sha( - config: Option<&mut TraceExporterConfig>, - git_commit_sha: CharSlice, -) -> Option> { - catch_panic!( - if let Option::Some(handle) = config { - handle.git_commit_sha = match sanitize_string(git_commit_sha) { - Ok(s) => Some(s), - Err(e) => return Some(e), - }; - None - } else { - gen_error!(ErrorCode::InvalidArgument) - }, - gen_error!(ErrorCode::Panic) - ) -} - -/// Sets git repository URL (e.g. for OTLP resource attribute `git.repository_url`). -#[no_mangle] -pub unsafe extern "C" fn ddog_trace_exporter_config_set_git_repository_url( - config: Option<&mut TraceExporterConfig>, - url: CharSlice, -) -> Option> { - catch_panic!( - if let Option::Some(handle) = config { - handle.git_repository_url = match sanitize_string(url) { - Ok(s) => Some(s), - Err(e) => return Some(e), - }; - None - } else { - gen_error!(ErrorCode::InvalidArgument) - }, - gen_error!(ErrorCode::Panic) - ) -} - /// Sets service name to be included in the headers request. 
#[no_mangle] pub unsafe extern "C" fn ddog_trace_exporter_config_set_service( @@ -512,7 +448,6 @@ pub unsafe extern "C" fn ddog_trace_exporter_new( let mut builder = TraceExporter::builder(); builder .set_url(config.url.as_ref().unwrap_or(&"".to_string())) - .set_tracer_name(config.tracer_name.as_ref().unwrap_or(&"".to_string())) .set_tracer_version(config.tracer_version.as_ref().unwrap_or(&"".to_string())) .set_language(config.language.as_ref().unwrap_or(&"".to_string())) .set_language_version(config.language_version.as_ref().unwrap_or(&"".to_string())) @@ -526,13 +461,6 @@ pub unsafe extern "C" fn ddog_trace_exporter_new( .set_env(config.env.as_ref().unwrap_or(&"".to_string())) .set_app_version(config.version.as_ref().unwrap_or(&"".to_string())) .set_service(config.service.as_ref().unwrap_or(&"".to_string())) - .set_git_commit_sha(config.git_commit_sha.as_ref().unwrap_or(&"".to_string())) - .set_git_repository_url( - config - .git_repository_url - .as_ref() - .unwrap_or(&"".to_string()), - ) .set_input_format(config.input_format) .set_output_format(config.output_format) .set_connection_timeout(config.connection_timeout); diff --git a/libdd-data-pipeline/src/otlp/config.rs b/libdd-data-pipeline/src/otlp/config.rs index 2e7041e247..0eb5557e3a 100644 --- a/libdd-data-pipeline/src/otlp/config.rs +++ b/libdd-data-pipeline/src/otlp/config.rs @@ -12,7 +12,7 @@ use std::time::Duration; /// OTLP trace export protocol. Support for HTTP/JSON for now. #[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] -pub enum OtlpProtocol { +pub(crate) enum OtlpProtocol { /// HTTP with JSON body (Content-Type: application/json). Default for HTTP. #[default] HttpJson, @@ -50,7 +50,7 @@ pub struct OtlpTraceConfig { /// Request timeout. pub timeout: Duration, /// Protocol (for future use; currently only HttpJson is supported). - pub protocol: OtlpProtocol, + pub(crate) protocol: OtlpProtocol, } /// Environment variable names (standard OTEL and traces-specific). 
diff --git a/libdd-data-pipeline/src/otlp/json_types.rs b/libdd-data-pipeline/src/otlp/json_types.rs index 2c5062dc46..45d6510c4f 100644 --- a/libdd-data-pipeline/src/otlp/json_types.rs +++ b/libdd-data-pipeline/src/otlp/json_types.rs @@ -1,9 +1,21 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -//! OTLP JSON encoding types for ExportTraceServiceRequest. -//! Field names use lowerCamelCase per the OTLP HTTP/JSON spec (Protocol Buffers JSON Mapping). -//! Trace/span IDs are hex-encoded strings; enum values are integers. +//! Minimal serde types for OTLP HTTP/JSON export (ExportTraceServiceRequest). +//! +//! These types mirror the OTLP protobuf schema for the HTTP/JSON wire format. Field names use +//! lowerCamelCase per the Protocol Buffers JSON Mapping spec; trace/span IDs are hex-encoded +//! strings; enum values (SpanKind, StatusCode) are integers. +//! +//! The canonical definitions live in the opentelemetry-proto repository: +//! +//! +//! +//! The Rust implementation in opentelemetry-rust uses `prost`-generated types with an optional +//! `with-serde` feature (`opentelemetry-proto` crate). We use hand-rolled serde structs here to +//! avoid the `prost` + `tonic` dependency tree in this early implementation. If/when protobuf +//! support is added, these types should be replaced with `opentelemetry-proto`: +//! 
use serde::Serialize; @@ -22,11 +34,8 @@ pub struct ResourceSpans { } #[derive(Debug, Default, Serialize)] -#[serde(rename_all = "camelCase")] pub struct Resource { pub attributes: Vec, - #[serde(skip_serializing_if = "Option::is_none")] - pub dropped_attributes_count: Option, } #[derive(Debug, Serialize)] diff --git a/libdd-data-pipeline/src/otlp/mapper.rs b/libdd-data-pipeline/src/otlp/mapper.rs index ca280ab831..e1c1d73197 100644 --- a/libdd-data-pipeline/src/otlp/mapper.rs +++ b/libdd-data-pipeline/src/otlp/mapper.rs @@ -17,8 +17,8 @@ const MAX_ATTRIBUTES_PER_SPAN: usize = 128; /// Maps Datadog trace chunks and metadata to an OTLP ExportTraceServiceRequest. /// -/// Resource: SDK-level attributes (service.name, deployment.environment, telemetry.sdk.*). -/// InstrumentationScope: "datadog" (DD SDKs don't have scope; all spans use this). +/// Resource: SDK-level attributes (service.name, deployment.environment, telemetry.sdk.*, +/// runtime-id). InstrumentationScope: "datadog" (DD SDKs don't have scope; all spans use this). /// All analogous DD span fields are mapped; meta→attributes (string), metrics→attributes /// (int/double), links and events mapped to OTLP links and events. Status from span.error and /// meta["error.msg"]. 
@@ -78,14 +78,9 @@ fn build_resource(metadata: &TracerMetadata) -> Resource { value: AnyValue::string(metadata.app_version.clone()), }); } - let sdk_name = if metadata.tracer_name.is_empty() { - "libdatadog".to_string() - } else { - metadata.tracer_name.clone() - }; attributes.push(KeyValue { key: "telemetry.sdk.name".to_string(), - value: AnyValue::string(sdk_name), + value: AnyValue::string("libdatadog".to_string()), }); if !metadata.language.is_empty() { attributes.push(KeyValue { @@ -99,28 +94,13 @@ fn build_resource(metadata: &TracerMetadata) -> Resource { value: AnyValue::string(metadata.tracer_version.clone()), }); } - if !metadata.git_commit_sha.is_empty() { - attributes.push(KeyValue { - key: "git.commit.sha".to_string(), - value: AnyValue::string(metadata.git_commit_sha.clone()), - }); - } - if !metadata.git_repository_url.is_empty() { - attributes.push(KeyValue { - key: "git.repository_url".to_string(), - value: AnyValue::string(metadata.git_repository_url.clone()), - }); - } if !metadata.runtime_id.is_empty() { attributes.push(KeyValue { key: "runtime-id".to_string(), value: AnyValue::string(metadata.runtime_id.clone()), }); } - Resource { - attributes, - dropped_attributes_count: None, - } + Resource { attributes } } fn map_span(span: &Span) -> OtlpSpan diff --git a/libdd-data-pipeline/src/otlp/mod.rs b/libdd-data-pipeline/src/otlp/mod.rs index f1d8107a5d..fe26957709 100644 --- a/libdd-data-pipeline/src/otlp/mod.rs +++ b/libdd-data-pipeline/src/otlp/mod.rs @@ -26,6 +26,6 @@ pub mod exporter; pub mod json_types; pub mod mapper; -pub use config::{otlp_trace_config_from_env, OtlpProtocol, OtlpTraceConfig}; +pub use config::{otlp_trace_config_from_env, OtlpTraceConfig}; pub use exporter::send_otlp_traces_http; pub use mapper::map_traces_to_otlp; diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index 94f1258b2a..89f7b40fb7 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ 
b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -29,14 +29,12 @@ pub struct TraceExporterBuilder { env: String, app_version: String, service: String, - tracer_name: String, tracer_version: String, language: String, language_version: String, language_interpreter: String, language_interpreter_vendor: String, git_commit_sha: String, - git_repository_url: String, input_format: TraceExporterInputFormat, output_format: TraceExporterOutputFormat, dogstatsd_url: Option, @@ -114,19 +112,6 @@ impl TraceExporterBuilder { self } - /// Set the git repository URL (e.g. for OTLP resource attribute `git.repository_url`) - pub fn set_git_repository_url(&mut self, url: &str) -> &mut Self { - url.clone_into(&mut self.git_repository_url); - self - } - - /// Set the tracer/SDK name used for OTLP resource attribute `telemetry.sdk.name` - /// (e.g. "dd-trace-py", "dd-trace-js"). When unset, OTLP export uses "libdatadog". - pub fn set_tracer_name(&mut self, tracer_name: &str) -> &mut Self { - tracer_name.clone_into(&mut self.tracer_name); - self - } - /// Set the `Datadog-Meta-Tracer-Version` header pub fn set_tracer_version(&mut self, tracer_version: &str) -> &mut Self { tracer_version.clone_into(&mut self.tracer_version); @@ -319,14 +304,12 @@ impl TraceExporterBuilder { ..Default::default() }, metadata: TracerMetadata { - tracer_name: self.tracer_name.clone(), tracer_version: self.tracer_version, language_version: self.language_version, language_interpreter: self.language_interpreter, language_interpreter_vendor: self.language_interpreter_vendor, language: self.language, git_commit_sha: self.git_commit_sha, - git_repository_url: self.git_repository_url.clone(), client_computed_stats: self.client_computed_stats, client_computed_top_level: self.client_computed_top_level, hostname: self.hostname, diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index b1f24bfcd3..f31783dd4c 100644 --- 
a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -123,16 +123,12 @@ pub struct TracerMetadata { pub app_version: String, pub runtime_id: String, pub service: String, - /// Tracer/SDK name for OTLP resource attribute `telemetry.sdk.name` (e.g. "dd-trace-py"). - /// When empty, OTLP export uses "libdatadog". - pub tracer_name: String, pub tracer_version: String, pub language: String, pub language_version: String, pub language_interpreter: String, pub language_interpreter_vendor: String, pub git_commit_sha: String, - pub git_repository_url: String, pub client_computed_stats: bool, pub client_computed_top_level: bool, } From 42de22c0fe116d682b0b37a0af4dec9377c3f059 Mon Sep 17 00:00:00 2001 From: Rachel Yang Date: Fri, 6 Mar 2026 14:48:25 -0500 Subject: [PATCH 06/10] lint and private fields --- libdd-data-pipeline/src/otlp/config.rs | 1 + libdd-data-pipeline/src/otlp/json_types.rs | 10 ++++----- libdd-data-pipeline/src/otlp/mapper.rs | 24 +++++++++++++++------- 3 files changed, 23 insertions(+), 12 deletions(-) diff --git a/libdd-data-pipeline/src/otlp/config.rs b/libdd-data-pipeline/src/otlp/config.rs index 0eb5557e3a..9804a6f94c 100644 --- a/libdd-data-pipeline/src/otlp/config.rs +++ b/libdd-data-pipeline/src/otlp/config.rs @@ -50,6 +50,7 @@ pub struct OtlpTraceConfig { /// Request timeout. pub timeout: Duration, /// Protocol (for future use; currently only HttpJson is supported). 
+ #[allow(dead_code)] pub(crate) protocol: OtlpProtocol, } diff --git a/libdd-data-pipeline/src/otlp/json_types.rs b/libdd-data-pipeline/src/otlp/json_types.rs index 45d6510c4f..c2fd5c8af3 100644 --- a/libdd-data-pipeline/src/otlp/json_types.rs +++ b/libdd-data-pipeline/src/otlp/json_types.rs @@ -116,15 +116,15 @@ pub struct KeyValue { #[serde(rename_all = "camelCase")] pub struct AnyValue { #[serde(skip_serializing_if = "Option::is_none")] - pub string_value: Option, + string_value: Option, #[serde(skip_serializing_if = "Option::is_none")] - pub bool_value: Option, + bool_value: Option, #[serde(skip_serializing_if = "Option::is_none")] - pub int_value: Option, + int_value: Option, #[serde(skip_serializing_if = "Option::is_none")] - pub double_value: Option, + double_value: Option, #[serde(skip_serializing_if = "Option::is_none")] - pub bytes_value: Option, + bytes_value: Option, } impl AnyValue { diff --git a/libdd-data-pipeline/src/otlp/mapper.rs b/libdd-data-pipeline/src/otlp/mapper.rs index e1c1d73197..c8aedc63f2 100644 --- a/libdd-data-pipeline/src/otlp/mapper.rs +++ b/libdd-data-pipeline/src/otlp/mapper.rs @@ -357,12 +357,22 @@ mod tests { std::f64::consts::PI, ); let req = map_traces_to_otlp(vec![vec![span]], &metadata); - let attrs = &req.resource_spans[0].scope_spans[0].spans[0].attributes; - let count_kv = attrs.iter().find(|a| a.key == "count").unwrap(); - assert!(count_kv.value.int_value.is_some()); - assert_eq!(count_kv.value.int_value, Some(42)); - let rate_kv = attrs.iter().find(|a| a.key == "rate").unwrap(); - assert!(rate_kv.value.double_value.is_some()); - assert!((rate_kv.value.double_value.unwrap() - std::f64::consts::PI).abs() < 1e-9); + let json = serde_json::to_value(&req).unwrap(); + let attrs = &json["resourceSpans"][0]["scopeSpans"][0]["spans"][0]["attributes"]; + let count_kv = attrs + .as_array() + .unwrap() + .iter() + .find(|a| a["key"] == "count") + .unwrap(); + assert_eq!(count_kv["value"]["intValue"], 42); + let rate_kv = attrs + 
.as_array() + .unwrap() + .iter() + .find(|a| a["key"] == "rate") + .unwrap(); + let rate = rate_kv["value"]["doubleValue"].as_f64().unwrap(); + assert!((rate - std::f64::consts::PI).abs() < 1e-9); } } From f359ff5394c0a1f91ce12fe3edfa0c45a6257bb2 Mon Sep 17 00:00:00 2001 From: Rachel Yang Date: Tue, 10 Mar 2026 10:08:29 -0400 Subject: [PATCH 07/10] send_with_retry and move to libdd_trace_utils --- libdd-data-pipeline/src/otlp/exporter.rs | 164 ++++++------------ libdd-data-pipeline/src/otlp/mod.rs | 4 +- libdd-data-pipeline/src/trace_exporter/mod.rs | 12 +- libdd-trace-utils/src/lib.rs | 1 + .../src/otlp_encoder}/json_types.rs | 0 .../src/otlp_encoder}/mapper.rs | 63 +++---- libdd-trace-utils/src/otlp_encoder/mod.rs | 24 +++ 7 files changed, 120 insertions(+), 148 deletions(-) rename {libdd-data-pipeline/src/otlp => libdd-trace-utils/src/otlp_encoder}/json_types.rs (100%) rename {libdd-data-pipeline/src/otlp => libdd-trace-utils/src/otlp_encoder}/mapper.rs (86%) create mode 100644 libdd-trace-utils/src/otlp_encoder/mod.rs diff --git a/libdd-data-pipeline/src/otlp/exporter.rs b/libdd-data-pipeline/src/otlp/exporter.rs index fddc814851..a4b558c835 100644 --- a/libdd-data-pipeline/src/otlp/exporter.rs +++ b/libdd-data-pipeline/src/otlp/exporter.rs @@ -1,139 +1,79 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -//! OTLP HTTP/JSON trace exporter. Sends ExportTraceServiceRequest with retries on 429, 502, 503, -//! 504. +//! OTLP HTTP/JSON trace exporter. 
use super::config::OtlpTraceConfig; -use crate::trace_exporter::error::TraceExporterError; -use http::Method; -use libdd_common::http_common::{self, Body}; -use libdd_common::HttpClient; -use std::time::Duration; -use tokio::time::sleep; -use tracing::{debug, error, warn}; +use crate::trace_exporter::error::{InternalErrorKind, RequestError, TraceExporterError}; +use libdd_common::{http_common, Endpoint, HttpClient}; +use libdd_trace_utils::send_with_retry::{ + RetryBackoffType, RetryStrategy, SendWithRetryError, + send_with_retry, +}; +use std::collections::HashMap; /// Max total attempts for OTLP export (1 initial + up to 4 retries on transient failures). const OTLP_MAX_ATTEMPTS: u32 = 5; /// Initial backoff between retries (milliseconds). const OTLP_RETRY_DELAY_MS: u64 = 100; -/// Status codes that trigger a retry (transient). -fn is_retryable_status(code: u16) -> bool { - matches!(code, 429 | 502 | 503 | 504) -} - -/// Send OTLP trace payload (JSON bytes) to the configured endpoint. +/// Send OTLP trace payload (JSON bytes) to the configured endpoint with retries. /// -/// Retries with exponential backoff only on 429, 502, 503, 504. Does not retry on 4xx (e.g. 400). -/// Uses the timeout from config. +/// Uses [`send_with_retry`] for consistent retry behaviour and observability across exporters. +/// +/// Note: dynamic OTLP headers from `OTEL_EXPORTER_OTLP_HEADERS` are not forwarded because +/// [`send_with_retry`] requires `&'static str` header keys. Support for arbitrary OTEL headers +/// would require the API to accept `HashMap`. 
pub async fn send_otlp_traces_http( client: &HttpClient, config: &OtlpTraceConfig, json_body: Vec, ) -> Result<(), TraceExporterError> { - let uri = libdd_common::parse_uri(&config.endpoint_url).map_err(|e| { - TraceExporterError::Internal( - crate::trace_exporter::error::InternalErrorKind::InvalidWorkerState(format!( - "Invalid OTLP endpoint URL: {}", - e - )), - ) + let url = libdd_common::parse_uri(&config.endpoint_url).map_err(|e| { + TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(format!( + "Invalid OTLP endpoint URL: {}", + e + ))) })?; - let mut attempt = 0u32; - loop { - attempt += 1; - let req_builder = build_request(&uri, config)?; - let timeout = config.timeout; - let body_bytes = bytes::Bytes::from(json_body.clone()); + let target = Endpoint { + url, + timeout_ms: config.timeout.as_millis() as u64, + ..Endpoint::default() + }; - debug!( - attempt, - url = %config.endpoint_url, - "OTLP trace export attempt" - ); + let headers: HashMap<&'static str, String> = + HashMap::from([("Content-Type", "application/json".to_string())]); - let req = req_builder - .body(Body::from_bytes(body_bytes)) - .map_err(|e| { - TraceExporterError::Internal( - crate::trace_exporter::error::InternalErrorKind::InvalidWorkerState( - e.to_string(), - ), - ) - })?; + let retry_strategy = RetryStrategy::new( + OTLP_MAX_ATTEMPTS, + OTLP_RETRY_DELAY_MS, + RetryBackoffType::Exponential, + None, + ); - match tokio::time::timeout(timeout, client.request(req)).await { - Ok(Ok(response)) => { - let status = response.status(); - if status.is_success() { - debug!(status = %status, "OTLP trace export succeeded"); - return Ok(()); - } - let code = status.as_u16(); - if is_retryable_status(code) && attempt < OTLP_MAX_ATTEMPTS { - let delay_ms = OTLP_RETRY_DELAY_MS * (1 << (attempt - 1)); - warn!( - status = %status, - attempt, - delay_ms, - "OTLP export transient failure, retrying" - ); - sleep(Duration::from_millis(delay_ms)).await; - continue; - } - let response = 
http_common::into_response(response); - let body_bytes = http_common::collect_response_bytes(response) - .await - .unwrap_or_default(); - let body_str = String::from_utf8_lossy(&body_bytes); - error!( - status = %status, - attempt, - body = %body_str, - "OTLP trace export failed" - ); - return Err(TraceExporterError::Request( - crate::trace_exporter::error::RequestError::new(status, &body_str), - )); - } - Ok(Err(e)) => { - if attempt < OTLP_MAX_ATTEMPTS { - let delay_ms = OTLP_RETRY_DELAY_MS * (1 << (attempt - 1)); - warn!(error = ?e, attempt, "OTLP export network error, retrying"); - sleep(Duration::from_millis(delay_ms)).await; - continue; - } - error!(error = ?e, attempt, "OTLP trace export failed after retries"); - return Err(TraceExporterError::from(http_common::into_error(e))); - } - Err(_) => { - if attempt < OTLP_MAX_ATTEMPTS { - let delay_ms = OTLP_RETRY_DELAY_MS * (1 << (attempt - 1)); - warn!(attempt, "OTLP export timeout, retrying"); - sleep(Duration::from_millis(delay_ms)).await; - continue; - } - error!(attempt, "OTLP trace export timed out after retries"); - return Err(TraceExporterError::from(std::io::Error::from( - std::io::ErrorKind::TimedOut, - ))); - } - } + match send_with_retry(client, &target, json_body, &headers, &retry_strategy).await { + Ok(_) => Ok(()), + Err(e) => Err(map_send_error(e).await), } } -fn build_request( - uri: &http::Uri, - config: &OtlpTraceConfig, -) -> Result { - let mut builder = http::Request::builder() - .method(Method::POST) - .uri(uri.clone()) - .header("Content-Type", "application/json"); - for (k, v) in &config.headers { - builder = builder.header(k.as_str(), v.as_str()); +async fn map_send_error(err: SendWithRetryError) -> TraceExporterError { + match err { + SendWithRetryError::Http(response, _) => { + let status = response.status(); + let body_bytes = http_common::collect_response_bytes(response) + .await + .unwrap_or_default(); + let body_str = String::from_utf8_lossy(&body_bytes); + 
TraceExporterError::Request(RequestError::new(status, &body_str)) + } + SendWithRetryError::Timeout(_) => { + TraceExporterError::Io(std::io::Error::from(std::io::ErrorKind::TimedOut)) + } + SendWithRetryError::Network(error, _) => TraceExporterError::from(error), + SendWithRetryError::Build(_) => TraceExporterError::Internal( + InternalErrorKind::InvalidWorkerState("Failed to build OTLP request".to_string()), + ), } - Ok(builder) } diff --git a/libdd-data-pipeline/src/otlp/mod.rs b/libdd-data-pipeline/src/otlp/mod.rs index fe26957709..14b4c6f638 100644 --- a/libdd-data-pipeline/src/otlp/mod.rs +++ b/libdd-data-pipeline/src/otlp/mod.rs @@ -23,9 +23,7 @@ pub mod config; pub mod exporter; -pub mod json_types; -pub mod mapper; pub use config::{otlp_trace_config_from_env, OtlpTraceConfig}; pub use exporter::send_otlp_traces_http; -pub use mapper::map_traces_to_otlp; +pub use libdd_trace_utils::otlp_encoder::{map_traces_to_otlp, OtlpResourceInfo}; diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index f31783dd4c..bc3791a2fe 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -15,7 +15,7 @@ use self::metrics::MetricsEmitter; use self::stats::StatsComputationStatus; use self::trace_serializer::TraceSerializer; use crate::agent_info::{AgentInfoFetcher, ResponseObserver}; -use crate::otlp::{map_traces_to_otlp, send_otlp_traces_http, OtlpTraceConfig}; +use crate::otlp::{map_traces_to_otlp, send_otlp_traces_http, OtlpResourceInfo, OtlpTraceConfig}; use crate::pausable_worker::PausableWorker; use crate::stats_exporter::StatsExporter; use crate::telemetry::{SendPayloadTelemetry, TelemetryClient}; @@ -544,7 +544,15 @@ impl TraceExporter { where T::Text: Borrow, { - let request = map_traces_to_otlp(traces, &self.metadata); + let resource_info = OtlpResourceInfo { + service: self.metadata.service.clone(), + env: self.metadata.env.clone(), + app_version: 
self.metadata.app_version.clone(), + language: self.metadata.language.clone(), + tracer_version: self.metadata.tracer_version.clone(), + runtime_id: self.metadata.runtime_id.clone(), + }; + let request = map_traces_to_otlp(traces, &resource_info); let json_body = serde_json::to_vec(&request).map_err(|e| { error!("OTLP JSON serialization error: {e}"); TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(e.to_string())) diff --git a/libdd-trace-utils/src/lib.rs b/libdd-trace-utils/src/lib.rs index 62b0b8a2aa..5218e30c1a 100644 --- a/libdd-trace-utils/src/lib.rs +++ b/libdd-trace-utils/src/lib.rs @@ -10,6 +10,7 @@ pub mod config_utils; pub mod msgpack_decoder; pub mod msgpack_encoder; +pub mod otlp_encoder; pub mod send_data; pub mod send_with_retry; pub mod stats_utils; diff --git a/libdd-data-pipeline/src/otlp/json_types.rs b/libdd-trace-utils/src/otlp_encoder/json_types.rs similarity index 100% rename from libdd-data-pipeline/src/otlp/json_types.rs rename to libdd-trace-utils/src/otlp_encoder/json_types.rs diff --git a/libdd-data-pipeline/src/otlp/mapper.rs b/libdd-trace-utils/src/otlp_encoder/mapper.rs similarity index 86% rename from libdd-data-pipeline/src/otlp/mapper.rs rename to libdd-trace-utils/src/otlp_encoder/mapper.rs index c8aedc63f2..bb6bd1c858 100644 --- a/libdd-data-pipeline/src/otlp/mapper.rs +++ b/libdd-trace-utils/src/otlp_encoder/mapper.rs @@ -7,15 +7,15 @@ use super::json_types::{ self, AnyValue, ExportTraceServiceRequest, InstrumentationScope, KeyValue, OtlpSpan, OtlpSpanEvent, OtlpSpanLink, Resource, ResourceSpans, ScopeSpans, Status, }; -use crate::trace_exporter::TracerMetadata; -use libdd_trace_utils::span::v04::{Span, SpanEvent, SpanLink}; -use libdd_trace_utils::span::TraceData; +use super::OtlpResourceInfo; +use crate::span::v04::{Span, SpanEvent, SpanLink}; +use crate::span::TraceData; use std::borrow::Borrow; /// Maximum number of attributes per span; excess are dropped and counted. 
const MAX_ATTRIBUTES_PER_SPAN: usize = 128; -/// Maps Datadog trace chunks and metadata to an OTLP ExportTraceServiceRequest. +/// Maps Datadog trace chunks and resource info to an OTLP ExportTraceServiceRequest. /// /// Resource: SDK-level attributes (service.name, deployment.environment, telemetry.sdk.*, /// runtime-id). InstrumentationScope: "datadog" (DD SDKs don't have scope; all spans use this). @@ -24,12 +24,12 @@ const MAX_ATTRIBUTES_PER_SPAN: usize = 128; /// meta["error.msg"]. pub fn map_traces_to_otlp( trace_chunks: Vec>>, - metadata: &TracerMetadata, + resource_info: &OtlpResourceInfo, ) -> ExportTraceServiceRequest where T::Text: Borrow, { - let resource = build_resource(metadata); + let resource = build_resource(resource_info); let mut all_spans: Vec = Vec::new(); for chunk in &trace_chunks { for span in chunk { @@ -54,50 +54,50 @@ where } } -fn build_resource(metadata: &TracerMetadata) -> Resource { +fn build_resource(resource_info: &OtlpResourceInfo) -> Resource { let mut attributes: Vec = Vec::new(); - if !metadata.service.is_empty() { + if !resource_info.service.is_empty() { attributes.push(KeyValue { key: "service.name".to_string(), - value: AnyValue::string(metadata.service.clone()), + value: AnyValue::string(resource_info.service.clone()), }); } - if !metadata.env.is_empty() { + if !resource_info.env.is_empty() { attributes.push(KeyValue { key: "deployment.environment".to_string(), - value: AnyValue::string(metadata.env.clone()), + value: AnyValue::string(resource_info.env.clone()), }); attributes.push(KeyValue { key: "deployment.environment.name".to_string(), - value: AnyValue::string(metadata.env.clone()), + value: AnyValue::string(resource_info.env.clone()), }); } - if !metadata.app_version.is_empty() { + if !resource_info.app_version.is_empty() { attributes.push(KeyValue { key: "service.version".to_string(), - value: AnyValue::string(metadata.app_version.clone()), + value: AnyValue::string(resource_info.app_version.clone()), }); } 
attributes.push(KeyValue { key: "telemetry.sdk.name".to_string(), value: AnyValue::string("libdatadog".to_string()), }); - if !metadata.language.is_empty() { + if !resource_info.language.is_empty() { attributes.push(KeyValue { key: "telemetry.sdk.language".to_string(), - value: AnyValue::string(metadata.language.clone()), + value: AnyValue::string(resource_info.language.clone()), }); } - if !metadata.tracer_version.is_empty() { + if !resource_info.tracer_version.is_empty() { attributes.push(KeyValue { key: "telemetry.sdk.version".to_string(), - value: AnyValue::string(metadata.tracer_version.clone()), + value: AnyValue::string(resource_info.tracer_version.clone()), }); } - if !metadata.runtime_id.is_empty() { + if !resource_info.runtime_id.is_empty() { attributes.push(KeyValue { key: "runtime-id".to_string(), - value: AnyValue::string(metadata.runtime_id.clone()), + value: AnyValue::string(resource_info.runtime_id.clone()), }); } Resource { attributes } @@ -213,20 +213,20 @@ where fn event_attr_to_key_value( k: &T::Text, - v: &libdd_trace_utils::span::v04::AttributeAnyValue, + v: &crate::span::v04::AttributeAnyValue, ) -> Option where T::Text: Borrow, { - use libdd_trace_utils::span::v04::AttributeArrayValue; + use crate::span::v04::AttributeArrayValue; let value = match v { - libdd_trace_utils::span::v04::AttributeAnyValue::SingleValue(av) => match av { + crate::span::v04::AttributeAnyValue::SingleValue(av) => match av { AttributeArrayValue::String(s) => AnyValue::string(s.borrow().to_string()), AttributeArrayValue::Boolean(b) => AnyValue::bool(*b), AttributeArrayValue::Integer(i) => AnyValue::int(*i), AttributeArrayValue::Double(d) => AnyValue::double(*d), }, - libdd_trace_utils::span::v04::AttributeAnyValue::Array(_) => return None, + crate::span::v04::AttributeAnyValue::Array(_) => return None, }; Some(KeyValue { key: k.borrow().to_string(), @@ -280,11 +280,12 @@ where #[cfg(test)] mod tests { use super::*; - use libdd_trace_utils::span::BytesData; + use 
crate::otlp_encoder::OtlpResourceInfo; + use crate::span::BytesData; #[test] fn test_trace_id_span_id_format() { - let metadata = TracerMetadata::default(); + let resource_info = OtlpResourceInfo::default(); let span: Span = Span { trace_id: 0x5B8EFFF798038103D269B633813FC60C_u128, span_id: 0xEEE19B7EC3C1B174, @@ -298,7 +299,7 @@ mod tests { error: 0, ..Default::default() }; - let req = map_traces_to_otlp(vec![vec![span]], &metadata); + let req = map_traces_to_otlp(vec![vec![span]], &resource_info); let rs = &req.resource_spans[0]; let otlp_span = &rs.scope_spans[0].spans[0]; assert_eq!(otlp_span.trace_id, "5b8efff798038103d269b633813fc60c"); @@ -318,7 +319,7 @@ mod tests { #[test] fn test_status_error_message_from_meta() { - let metadata = TracerMetadata::default(); + let resource_info = OtlpResourceInfo::default(); let mut span: Span = Span { trace_id: 1, span_id: 2, @@ -332,7 +333,7 @@ mod tests { libdd_tinybytes::BytesString::from_static("error.msg"), libdd_tinybytes::BytesString::from_static("something broke"), ); - let req = map_traces_to_otlp(vec![vec![span]], &metadata); + let req = map_traces_to_otlp(vec![vec![span]], &resource_info); let otlp_span = &req.resource_spans[0].scope_spans[0].spans[0]; let status = otlp_span.status.as_ref().unwrap(); assert_eq!(status.code, json_types::status_code::ERROR); @@ -341,7 +342,7 @@ mod tests { #[test] fn test_metrics_as_int_or_double() { - let metadata = TracerMetadata::default(); + let resource_info = OtlpResourceInfo::default(); let mut span: Span = Span { trace_id: 1, span_id: 2, @@ -356,7 +357,7 @@ mod tests { libdd_tinybytes::BytesString::from_static("rate"), std::f64::consts::PI, ); - let req = map_traces_to_otlp(vec![vec![span]], &metadata); + let req = map_traces_to_otlp(vec![vec![span]], &resource_info); let json = serde_json::to_value(&req).unwrap(); let attrs = &json["resourceSpans"][0]["scopeSpans"][0]["spans"][0]["attributes"]; let count_kv = attrs diff --git a/libdd-trace-utils/src/otlp_encoder/mod.rs 
b/libdd-trace-utils/src/otlp_encoder/mod.rs new file mode 100644 index 0000000000..ee2c7924c4 --- /dev/null +++ b/libdd-trace-utils/src/otlp_encoder/mod.rs @@ -0,0 +1,24 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! OTLP HTTP/JSON encoder: maps Datadog spans to ExportTraceServiceRequest. + +pub mod json_types; +pub mod mapper; + +pub use mapper::map_traces_to_otlp; + +/// Tracer-level attributes used to populate the OTLP Resource on export. +/// +/// These are the fields from the tracer's configuration that map to OTLP Resource attributes +/// (service.name, deployment.environment, service.version, telemetry.sdk.*, runtime-id). +/// Callers should build this from their own tracer metadata struct. +#[derive(Clone, Debug, Default)] +pub struct OtlpResourceInfo { + pub service: String, + pub env: String, + pub app_version: String, + pub language: String, + pub tracer_version: String, + pub runtime_id: String, +} From 9bbc48e92b227cd8b111ca363d39eea8f2c40bbf Mon Sep 17 00:00:00 2001 From: Rachel Yang Date: Tue, 10 Mar 2026 10:51:44 -0400 Subject: [PATCH 08/10] lint + nits --- libdd-data-pipeline/src/trace_exporter/mod.rs | 12 ++-- .../src/otlp_encoder/json_types.rs | 66 ++++--------------- libdd-trace-utils/src/otlp_encoder/mapper.rs | 50 +++++++------- 3 files changed, 38 insertions(+), 90 deletions(-) diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index bc3791a2fe..47539a6bf6 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -508,8 +508,7 @@ impl TraceExporter { &self, trace_chunks: Vec>>, ) -> Result - where - T::Text: Borrow, + { self.check_agent_info(); self.runtime()? 
@@ -528,8 +527,7 @@ impl TraceExporter { &self, trace_chunks: Vec>>, ) -> Result - where - T::Text: Borrow, + { self.check_agent_info(); self.send_trace_chunks_inner(trace_chunks).await @@ -541,8 +539,7 @@ impl TraceExporter { traces: Vec>>, config: &OtlpTraceConfig, ) -> Result - where - T::Text: Borrow, + { let resource_info = OtlpResourceInfo { service: self.metadata.service.clone(), @@ -627,8 +624,7 @@ impl TraceExporter { &self, mut traces: Vec>>, ) -> Result - where - T::Text: Borrow, + { // OTLP path: when OTEL_TRACES_EXPORTER=otlp. No sampling/dropping—export all received // (equivalent to OTEL_TRACES_SAMPLER=parentbased_always_on). diff --git a/libdd-trace-utils/src/otlp_encoder/json_types.rs b/libdd-trace-utils/src/otlp_encoder/json_types.rs index c2fd5c8af3..b681cc74fc 100644 --- a/libdd-trace-utils/src/otlp_encoder/json_types.rs +++ b/libdd-trace-utils/src/otlp_encoder/json_types.rs @@ -11,10 +11,10 @@ //! //! //! -//! The Rust implementation in opentelemetry-rust uses `prost`-generated types with an optional -//! `with-serde` feature (`opentelemetry-proto` crate). We use hand-rolled serde structs here to -//! avoid the `prost` + `tonic` dependency tree in this early implementation. If/when protobuf -//! support is added, these types should be replaced with `opentelemetry-proto`: +//! Hand-rolled serde structs are intentional here: for HTTP/JSON export, duplicating the type +//! definitions is simpler than pulling in `prost`-generated types from the `opentelemetry-proto` +//! crate. When HTTP/protobuf export is added, `opentelemetry-proto` should be introduced as a +//! dependency for that purpose: //! use serde::Serialize; @@ -112,58 +112,16 @@ pub struct KeyValue { pub value: AnyValue, } +/// A typed value in an OTLP attribute. Each variant serializes as a single-key JSON object +/// matching the OTLP HTTP/JSON wire format (e.g. `{"stringValue":"hello"}`). 
#[derive(Debug, Serialize)] #[serde(rename_all = "camelCase")] -pub struct AnyValue { - #[serde(skip_serializing_if = "Option::is_none")] - string_value: Option, - #[serde(skip_serializing_if = "Option::is_none")] - bool_value: Option, - #[serde(skip_serializing_if = "Option::is_none")] - int_value: Option, - #[serde(skip_serializing_if = "Option::is_none")] - double_value: Option, - #[serde(skip_serializing_if = "Option::is_none")] - bytes_value: Option, -} - -impl AnyValue { - pub fn string(s: String) -> Self { - AnyValue { - string_value: Some(s), - bool_value: None, - int_value: None, - double_value: None, - bytes_value: None, - } - } - pub fn int(i: i64) -> Self { - AnyValue { - string_value: None, - bool_value: None, - int_value: Some(i), - double_value: None, - bytes_value: None, - } - } - pub fn double(d: f64) -> Self { - AnyValue { - string_value: None, - bool_value: None, - int_value: None, - double_value: Some(d), - bytes_value: None, - } - } - pub fn bool(b: bool) -> Self { - AnyValue { - string_value: None, - bool_value: Some(b), - int_value: None, - double_value: None, - bytes_value: None, - } - } +pub enum AnyValue { + StringValue(String), + BoolValue(bool), + IntValue(i64), + DoubleValue(f64), + BytesValue(String), } #[derive(Debug, Serialize)] diff --git a/libdd-trace-utils/src/otlp_encoder/mapper.rs b/libdd-trace-utils/src/otlp_encoder/mapper.rs index bb6bd1c858..1634b75858 100644 --- a/libdd-trace-utils/src/otlp_encoder/mapper.rs +++ b/libdd-trace-utils/src/otlp_encoder/mapper.rs @@ -26,8 +26,7 @@ pub fn map_traces_to_otlp( trace_chunks: Vec>>, resource_info: &OtlpResourceInfo, ) -> ExportTraceServiceRequest -where - T::Text: Borrow, + { let resource = build_resource(resource_info); let mut all_spans: Vec = Vec::new(); @@ -59,53 +58,52 @@ fn build_resource(resource_info: &OtlpResourceInfo) -> Resource { if !resource_info.service.is_empty() { attributes.push(KeyValue { key: "service.name".to_string(), - value: 
AnyValue::string(resource_info.service.clone()), + value: AnyValue::StringValue(resource_info.service.clone()), }); } if !resource_info.env.is_empty() { attributes.push(KeyValue { key: "deployment.environment".to_string(), - value: AnyValue::string(resource_info.env.clone()), + value: AnyValue::StringValue(resource_info.env.clone()), }); attributes.push(KeyValue { key: "deployment.environment.name".to_string(), - value: AnyValue::string(resource_info.env.clone()), + value: AnyValue::StringValue(resource_info.env.clone()), }); } if !resource_info.app_version.is_empty() { attributes.push(KeyValue { key: "service.version".to_string(), - value: AnyValue::string(resource_info.app_version.clone()), + value: AnyValue::StringValue(resource_info.app_version.clone()), }); } attributes.push(KeyValue { key: "telemetry.sdk.name".to_string(), - value: AnyValue::string("libdatadog".to_string()), + value: AnyValue::StringValue("libdatadog".to_string()), }); if !resource_info.language.is_empty() { attributes.push(KeyValue { key: "telemetry.sdk.language".to_string(), - value: AnyValue::string(resource_info.language.clone()), + value: AnyValue::StringValue(resource_info.language.clone()), }); } if !resource_info.tracer_version.is_empty() { attributes.push(KeyValue { key: "telemetry.sdk.version".to_string(), - value: AnyValue::string(resource_info.tracer_version.clone()), + value: AnyValue::StringValue(resource_info.tracer_version.clone()), }); } if !resource_info.runtime_id.is_empty() { attributes.push(KeyValue { key: "runtime-id".to_string(), - value: AnyValue::string(resource_info.runtime_id.clone()), + value: AnyValue::StringValue(resource_info.runtime_id.clone()), }); } Resource { attributes } } fn map_span(span: &Span) -> OtlpSpan -where - T::Text: Borrow, + { let trace_id_hex = format!("{:032x}", span.trace_id); let span_id_hex = format!("{:016x}", span.span_id); @@ -160,8 +158,7 @@ where } fn map_span_link(link: &SpanLink) -> OtlpSpanLink -where - T::Text: Borrow, + { let 
trace_id_128 = (link.trace_id_high as u128) << 64 | (link.trace_id as u128); let trace_id_hex = format!("{:032x}", trace_id_128); @@ -176,7 +173,7 @@ where .iter() .map(|(k, v)| KeyValue { key: k.borrow().to_string(), - value: AnyValue::string(v.borrow().to_string()), + value: AnyValue::StringValue(v.borrow().to_string()), }) .collect(); OtlpSpanLink { @@ -189,8 +186,7 @@ where } fn map_span_events(events: &[SpanEvent]) -> (Vec, usize) -where - T::Text: Borrow, + { const MAX_EVENTS_PER_SPAN: usize = 128; let mut otlp_events = Vec::with_capacity(events.len().min(MAX_EVENTS_PER_SPAN)); @@ -215,16 +211,15 @@ fn event_attr_to_key_value( k: &T::Text, v: &crate::span::v04::AttributeAnyValue, ) -> Option -where - T::Text: Borrow, + { use crate::span::v04::AttributeArrayValue; let value = match v { crate::span::v04::AttributeAnyValue::SingleValue(av) => match av { - AttributeArrayValue::String(s) => AnyValue::string(s.borrow().to_string()), - AttributeArrayValue::Boolean(b) => AnyValue::bool(*b), - AttributeArrayValue::Integer(i) => AnyValue::int(*i), - AttributeArrayValue::Double(d) => AnyValue::double(*d), + AttributeArrayValue::String(s) => AnyValue::StringValue(s.borrow().to_string()), + AttributeArrayValue::Boolean(b) => AnyValue::BoolValue(*b), + AttributeArrayValue::Integer(i) => AnyValue::IntValue(*i), + AttributeArrayValue::Double(d) => AnyValue::DoubleValue(*d), }, crate::span::v04::AttributeAnyValue::Array(_) => return None, }; @@ -245,8 +240,7 @@ fn dd_type_to_otlp_kind(t: &str) -> i32 { } fn map_attributes(span: &Span) -> (Vec, usize) -where - T::Text: Borrow, + { let mut attrs: Vec = Vec::new(); for (k, v) in span.meta.iter() { @@ -255,7 +249,7 @@ where } attrs.push(KeyValue { key: k.borrow().to_string(), - value: AnyValue::string(v.borrow().to_string()), + value: AnyValue::StringValue(v.borrow().to_string()), }); } for (k, v) in span.metrics.iter() { @@ -263,9 +257,9 @@ where break; } let value = if v.fract() == 0.0 && (*v >= i64::MIN as f64 && *v <= 
i64::MAX as f64) { - AnyValue::int(*v as i64) + AnyValue::IntValue(*v as i64) } else { - AnyValue::double(*v) + AnyValue::DoubleValue(*v) }; attrs.push(KeyValue { key: k.borrow().to_string(), From db08608401a0ecf7ae34034aae619b635300eebd Mon Sep 17 00:00:00 2001 From: Rachel Yang Date: Tue, 10 Mar 2026 10:56:41 -0400 Subject: [PATCH 09/10] lint --- libdd-data-pipeline/src/otlp/exporter.rs | 3 +-- libdd-data-pipeline/src/trace_exporter/mod.rs | 16 ++++--------- libdd-trace-utils/src/otlp_encoder/mapper.rs | 24 +++++-------------- 3 files changed, 11 insertions(+), 32 deletions(-) diff --git a/libdd-data-pipeline/src/otlp/exporter.rs b/libdd-data-pipeline/src/otlp/exporter.rs index a4b558c835..2996c2b411 100644 --- a/libdd-data-pipeline/src/otlp/exporter.rs +++ b/libdd-data-pipeline/src/otlp/exporter.rs @@ -7,8 +7,7 @@ use super::config::OtlpTraceConfig; use crate::trace_exporter::error::{InternalErrorKind, RequestError, TraceExporterError}; use libdd_common::{http_common, Endpoint, HttpClient}; use libdd_trace_utils::send_with_retry::{ - RetryBackoffType, RetryStrategy, SendWithRetryError, - send_with_retry, + send_with_retry, RetryBackoffType, RetryStrategy, SendWithRetryError, }; use std::collections::HashMap; diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 47539a6bf6..1a1bf04978 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -507,9 +507,7 @@ impl TraceExporter { pub fn send_trace_chunks( &self, trace_chunks: Vec>>, - ) -> Result - - { + ) -> Result { self.check_agent_info(); self.runtime()? 
.block_on(async { self.send_trace_chunks_inner(trace_chunks).await }) @@ -526,9 +524,7 @@ impl TraceExporter { pub async fn send_trace_chunks_async( &self, trace_chunks: Vec>>, - ) -> Result - - { + ) -> Result { self.check_agent_info(); self.send_trace_chunks_inner(trace_chunks).await } @@ -538,9 +534,7 @@ impl TraceExporter { &self, traces: Vec>>, config: &OtlpTraceConfig, - ) -> Result - - { + ) -> Result { let resource_info = OtlpResourceInfo { service: self.metadata.service.clone(), env: self.metadata.env.clone(), @@ -623,9 +617,7 @@ impl TraceExporter { async fn send_trace_chunks_inner( &self, mut traces: Vec>>, - ) -> Result - - { + ) -> Result { // OTLP path: when OTEL_TRACES_EXPORTER=otlp. No sampling/dropping—export all received // (equivalent to OTEL_TRACES_SAMPLER=parentbased_always_on). if let Some(ref config) = self.otlp_config { diff --git a/libdd-trace-utils/src/otlp_encoder/mapper.rs b/libdd-trace-utils/src/otlp_encoder/mapper.rs index 1634b75858..aecdcb27e2 100644 --- a/libdd-trace-utils/src/otlp_encoder/mapper.rs +++ b/libdd-trace-utils/src/otlp_encoder/mapper.rs @@ -25,9 +25,7 @@ const MAX_ATTRIBUTES_PER_SPAN: usize = 128; pub fn map_traces_to_otlp( trace_chunks: Vec>>, resource_info: &OtlpResourceInfo, -) -> ExportTraceServiceRequest - -{ +) -> ExportTraceServiceRequest { let resource = build_resource(resource_info); let mut all_spans: Vec = Vec::new(); for chunk in &trace_chunks { @@ -102,9 +100,7 @@ fn build_resource(resource_info: &OtlpResourceInfo) -> Resource { Resource { attributes } } -fn map_span(span: &Span) -> OtlpSpan - -{ +fn map_span(span: &Span) -> OtlpSpan { let trace_id_hex = format!("{:032x}", span.trace_id); let span_id_hex = format!("{:016x}", span.span_id); let parent_span_id = if span.parent_id != 0 { @@ -157,9 +153,7 @@ fn map_span(span: &Span) -> OtlpSpan } } -fn map_span_link(link: &SpanLink) -> OtlpSpanLink - -{ +fn map_span_link(link: &SpanLink) -> OtlpSpanLink { let trace_id_128 = (link.trace_id_high as u128) << 64 | 
(link.trace_id as u128); let trace_id_hex = format!("{:032x}", trace_id_128); let span_id_hex = format!("{:016x}", link.span_id); @@ -185,9 +179,7 @@ fn map_span_link(link: &SpanLink) -> OtlpSpanLink } } -fn map_span_events(events: &[SpanEvent]) -> (Vec, usize) - -{ +fn map_span_events(events: &[SpanEvent]) -> (Vec, usize) { const MAX_EVENTS_PER_SPAN: usize = 128; let mut otlp_events = Vec::with_capacity(events.len().min(MAX_EVENTS_PER_SPAN)); for ev in events.iter().take(MAX_EVENTS_PER_SPAN) { @@ -210,9 +202,7 @@ fn map_span_events(events: &[SpanEvent]) -> (Vec fn event_attr_to_key_value( k: &T::Text, v: &crate::span::v04::AttributeAnyValue, -) -> Option - -{ +) -> Option { use crate::span::v04::AttributeArrayValue; let value = match v { crate::span::v04::AttributeAnyValue::SingleValue(av) => match av { @@ -239,9 +229,7 @@ fn dd_type_to_otlp_kind(t: &str) -> i32 { } } -fn map_attributes(span: &Span) -> (Vec, usize) - -{ +fn map_attributes(span: &Span) -> (Vec, usize) { let mut attrs: Vec = Vec::new(); for (k, v) in span.meta.iter() { if attrs.len() >= MAX_ATTRIBUTES_PER_SPAN { From f1f792175404f2767cf8de856272b527c765f8d0 Mon Sep 17 00:00:00 2001 From: Rachel Yang Date: Wed, 11 Mar 2026 19:27:59 -0400 Subject: [PATCH 10/10] config change and sampling update --- libdd-data-pipeline-ffi/src/trace_exporter.rs | 29 ++ libdd-data-pipeline/src/otlp/config.rs | 262 +----------------- libdd-data-pipeline/src/otlp/mod.rs | 10 +- .../src/trace_exporter/builder.rs | 23 +- libdd-data-pipeline/src/trace_exporter/mod.rs | 27 +- 5 files changed, 69 insertions(+), 282 deletions(-) diff --git a/libdd-data-pipeline-ffi/src/trace_exporter.rs b/libdd-data-pipeline-ffi/src/trace_exporter.rs index 7062da398e..cf33494aa1 100644 --- a/libdd-data-pipeline-ffi/src/trace_exporter.rs +++ b/libdd-data-pipeline-ffi/src/trace_exporter.rs @@ -100,6 +100,7 @@ pub struct TraceExporterConfig { health_metrics_enabled: bool, test_session_token: Option, connection_timeout: Option, + otlp_endpoint: 
Option, } #[no_mangle] @@ -426,6 +427,30 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_set_connection_timeout( ) } +/// Enables OTLP HTTP/JSON export and sets the endpoint URL. +/// +/// When set, traces are sent to this URL in OTLP HTTP/JSON format instead of the Datadog +/// agent. The host language is responsible for resolving the endpoint from its configuration +/// (e.g. `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`) before calling this function. +#[no_mangle] +pub unsafe extern "C" fn ddog_trace_exporter_config_set_otlp_endpoint( + config: Option<&mut TraceExporterConfig>, + url: CharSlice, +) -> Option> { + catch_panic!( + if let Some(handle) = config { + handle.otlp_endpoint = match sanitize_string(url) { + Ok(s) => Some(s), + Err(e) => return Some(e), + }; + None + } else { + gen_error!(ErrorCode::InvalidArgument) + }, + gen_error!(ErrorCode::Panic) + ) +} + /// Create a new TraceExporter instance. /// /// When `OTEL_TRACES_EXPORTER=otlp` is set in the environment, the exporter sends traces in @@ -483,6 +508,10 @@ pub unsafe extern "C" fn ddog_trace_exporter_new( builder.enable_health_metrics(); } + if let Some(ref url) = config.otlp_endpoint { + builder.set_otlp_endpoint(url); + } + match builder.build() { Ok(exporter) => { out_handle.as_ptr().write(Box::new(exporter)); diff --git a/libdd-data-pipeline/src/otlp/config.rs b/libdd-data-pipeline/src/otlp/config.rs index 9804a6f94c..eed20b34dc 100644 --- a/libdd-data-pipeline/src/otlp/config.rs +++ b/libdd-data-pipeline/src/otlp/config.rs @@ -2,15 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 //! OTLP trace export configuration. -//! -//! OTLP trace export is enabled when `OTEL_TRACES_EXPORTER=otlp` is set. -//! When enabled, endpoint, headers, timeout, and protocol are read from the -//! `OTEL_EXPORTER_OTLP_TRACES_*` (and generic `OTEL_EXPORTER_OTLP_*`) environment variables. -use std::env; use std::time::Duration; -/// OTLP trace export protocol. Support for HTTP/JSON for now. 
+/// OTLP trace export protocol. HTTP/JSON is currently supported. #[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] pub(crate) enum OtlpProtocol { /// HTTP with JSON body (Content-Type: application/json). Default for HTTP. @@ -22,28 +17,13 @@ pub(crate) enum OtlpProtocol { Grpc, } -impl OtlpProtocol { - fn from_str(s: &str) -> Self { - match s.trim().to_lowercase().as_str() { - "http/json" => OtlpProtocol::HttpJson, - "http/protobuf" => OtlpProtocol::HttpProtobuf, - "grpc" => OtlpProtocol::Grpc, - _ => OtlpProtocol::HttpJson, - } - } -} - -/// Default OTLP HTTP endpoint (no path; path /v1/traces is appended when building request URL). -pub const DEFAULT_OTLP_HTTP_ENDPOINT: &str = "http://localhost:4318"; -/// Default OTLP gRPC endpoint. -pub const DEFAULT_OTLP_GRPC_ENDPOINT: &str = "http://localhost:4317"; -/// OTLP traces path for HTTP. -pub const OTLP_TRACES_PATH: &str = "/v1/traces"; +/// Default timeout for OTLP export requests. +pub const DEFAULT_OTLP_TIMEOUT: Duration = Duration::from_secs(10); /// Parsed OTLP trace exporter configuration. #[derive(Clone, Debug)] pub struct OtlpTraceConfig { - /// Full URL to POST traces (e.g. http://localhost:4318/v1/traces). + /// Full URL to POST traces to (e.g. `http://localhost:4318/v1/traces`). pub endpoint_url: String, /// Optional HTTP headers (key-value pairs). pub headers: Vec<(String, String)>, @@ -53,237 +33,3 @@ pub struct OtlpTraceConfig { #[allow(dead_code)] pub(crate) protocol: OtlpProtocol, } - -/// Environment variable names (standard OTEL and traces-specific). 
-pub mod env_keys { - pub const TRACES_EXPORTER: &str = "OTEL_TRACES_EXPORTER"; - pub const TRACES_PROTOCOL: &str = "OTEL_EXPORTER_OTLP_TRACES_PROTOCOL"; - pub const PROTOCOL: &str = "OTEL_EXPORTER_OTLP_PROTOCOL"; - pub const TRACES_ENDPOINT: &str = "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"; - pub const ENDPOINT: &str = "OTEL_EXPORTER_OTLP_ENDPOINT"; - pub const TRACES_HEADERS: &str = "OTEL_EXPORTER_OTLP_TRACES_HEADERS"; - pub const HEADERS: &str = "OTEL_EXPORTER_OTLP_HEADERS"; - pub const TRACES_TIMEOUT: &str = "OTEL_EXPORTER_OTLP_TRACES_TIMEOUT"; - pub const TIMEOUT: &str = "OTEL_EXPORTER_OTLP_TIMEOUT"; -} - -/// Default timeout for OTLP export (10 seconds). -const DEFAULT_OTLP_TIMEOUT_MS: u64 = 10_000; - -fn get_env(key: &str) -> Option { - env::var(key).ok().filter(|s| !s.trim().is_empty()) -} - -/// Parse OTEL headers string "key1=value1,key2=value2" into a list of (key, value). -fn parse_headers(s: &str) -> Vec<(String, String)> { - s.split(',') - .filter_map(|pair| { - let pair = pair.trim(); - let eq = pair.find('=')?; - let key = pair[..eq].trim(); - let value = pair[eq + 1..].trim(); - if key.is_empty() { - return None; - } - Some((key.to_string(), value.to_string())) - }) - .collect() -} - -/// Append /v1/traces to a base URL for HTTP trace export. Used only when the endpoint -/// is the fallback default (path is added only for fallback, not when user sets endpoint). -fn fallback_traces_url(base: &str, protocol: OtlpProtocol) -> String { - let base = base.trim().trim_end_matches('/'); - match protocol { - OtlpProtocol::HttpJson | OtlpProtocol::HttpProtobuf => { - format!("{}{}", base, OTLP_TRACES_PATH) - } - OtlpProtocol::Grpc => base.to_string(), - } -} - -/// Resolve OTLP trace export configuration from environment. -/// -/// **Enablement:** Returns `Some(config)` only when `OTEL_TRACES_EXPORTER=otlp` is set. -/// Returns `None` otherwise (use Datadog agent). 
-/// -/// **Endpoint:** If `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` (or generic -/// `OTEL_EXPORTER_OTLP_ENDPOINT`) is set, that value is used **as-is**. Otherwise the -/// fallback default is used and, for http/json or http/protobuf, the path `/v1/traces` -/// is appended. -/// **Precedence:** Traces-specific env vars override generic OTEL vars for protocol, -/// endpoint, headers, and timeout. -pub fn otlp_trace_config_from_env() -> Option { - let exporter = get_env(env_keys::TRACES_EXPORTER)?; - if exporter.trim().to_lowercase() != "otlp" { - return None; - } - - let protocol_str = get_env(env_keys::TRACES_PROTOCOL).or_else(|| get_env(env_keys::PROTOCOL)); - let protocol = protocol_str - .as_deref() - .map(OtlpProtocol::from_str) - .unwrap_or_default(); - - // Traces-specific endpoint takes precedence over generic OTEL endpoint when both are set. - // Per spec: OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is used as-is; the generic - // OTEL_EXPORTER_OTLP_ENDPOINT gets /v1/traces appended for HTTP signals. - let traces_endpoint = get_env(env_keys::TRACES_ENDPOINT); - let (endpoint_opt, is_signal_specific) = match traces_endpoint { - Some(ep) => (Some(ep), true), - None => (get_env(env_keys::ENDPOINT), false), - }; - let url = match endpoint_opt { - Some(s) => { - let endpoint = s.trim().to_string(); - if endpoint.is_empty() { - fallback_traces_url(DEFAULT_OTLP_HTTP_ENDPOINT, protocol) - } else { - // Normalize bare host:port to a full URL. - let normalized = if endpoint.contains("://") { - endpoint - } else if endpoint.starts_with(':') { - format!("http://localhost{}", endpoint) - } else { - format!("http://{}", endpoint) - }; - // Spec: signal-specific TRACES_ENDPOINT is used as-is; generic ENDPOINT gets - // /v1/traces appended for HTTP. 
- if is_signal_specific { - normalized - } else { - fallback_traces_url(&normalized, protocol) - } - } - } - None => fallback_traces_url(DEFAULT_OTLP_HTTP_ENDPOINT, protocol), - }; - - let headers_str = get_env(env_keys::TRACES_HEADERS).or_else(|| get_env(env_keys::HEADERS)); - let headers = headers_str - .as_deref() - .map(parse_headers) - .unwrap_or_default(); - - let timeout_ms = get_env(env_keys::TRACES_TIMEOUT) - .or_else(|| get_env(env_keys::TIMEOUT)) - .and_then(|s| parse_timeout(&s)) - .unwrap_or(DEFAULT_OTLP_TIMEOUT_MS); - - Some(OtlpTraceConfig { - endpoint_url: url, - headers, - timeout: Duration::from_millis(timeout_ms), - protocol, - }) -} - -/// Parse timeout string: digits with optional unit (ms, s, m). Default unit: milliseconds. -fn parse_timeout(s: &str) -> Option { - let s = s.trim(); - let s = s.to_lowercase(); - if s.ends_with("ms") { - s[..s.len() - 2].trim().parse::().ok() - } else if s.ends_with('s') && !s.ends_with("ms") { - s[..s.len() - 1] - .trim() - .parse::() - .ok() - .map(|v| v * 1000) - } else if s.ends_with('m') { - s[..s.len() - 1] - .trim() - .parse::() - .ok() - .map(|v| v * 60 * 1000) - } else { - s.parse::().ok() - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::sync::Mutex; - - // Env-var-dependent tests must be serialized: parallel mutation of global env is not safe. 
- static ENV_LOCK: Mutex<()> = Mutex::new(()); - - #[test] - fn test_parse_headers() { - let h = parse_headers("key1=val1,key2=val2"); - assert_eq!(h.len(), 2); - assert_eq!(h[0], ("key1".to_string(), "val1".to_string())); - assert_eq!(h[1], ("key2".to_string(), "val2".to_string())); - } - - #[test] - fn test_parse_timeout() { - assert_eq!(parse_timeout("5000"), Some(5000)); - assert_eq!(parse_timeout("5s"), Some(5000)); - assert_eq!(parse_timeout("100ms"), Some(100)); - } - - #[test] - fn test_fallback_traces_url() { - // Fallback: path /v1/traces is appended for http/json - assert_eq!( - fallback_traces_url("http://localhost:4318", OtlpProtocol::HttpJson), - "http://localhost:4318/v1/traces" - ); - assert_eq!( - fallback_traces_url(DEFAULT_OTLP_HTTP_ENDPOINT, OtlpProtocol::HttpJson), - "http://localhost:4318/v1/traces" - ); - } - - #[test] - fn test_protocol_from_str() { - assert_eq!(OtlpProtocol::from_str("http/json"), OtlpProtocol::HttpJson); - assert_eq!(OtlpProtocol::from_str("grpc"), OtlpProtocol::Grpc); - } - - #[test] - fn test_otlp_disabled_without_traces_exporter() { - let _guard = ENV_LOCK.lock().unwrap(); - // Without OTEL_TRACES_EXPORTER=otlp, config should be None - std::env::remove_var(env_keys::TRACES_EXPORTER); - std::env::remove_var(env_keys::TRACES_ENDPOINT); - std::env::remove_var(env_keys::ENDPOINT); - assert!(otlp_trace_config_from_env().is_none()); - } - - #[test] - fn test_explicit_endpoint_used_as_is() { - let _guard = ENV_LOCK.lock().unwrap(); - // Per spec: when OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is set, use as-is (no /v1/traces - // appended) - std::env::remove_var(env_keys::TRACES_EXPORTER); - std::env::remove_var(env_keys::TRACES_ENDPOINT); - std::env::remove_var(env_keys::ENDPOINT); - std::env::set_var(env_keys::TRACES_EXPORTER, "otlp"); - std::env::set_var(env_keys::TRACES_ENDPOINT, "http://custom:9999"); - let config = otlp_trace_config_from_env(); - std::env::remove_var(env_keys::TRACES_EXPORTER); - 
std::env::remove_var(env_keys::TRACES_ENDPOINT); - let config = config.expect("config when TRACES_EXPORTER=otlp and endpoint set"); - assert_eq!(config.endpoint_url, "http://custom:9999"); - } - - #[test] - fn test_generic_endpoint_gets_path_appended() { - let _guard = ENV_LOCK.lock().unwrap(); - // Per spec: OTEL_EXPORTER_OTLP_ENDPOINT (generic) must have /v1/traces appended for HTTP. - std::env::remove_var(env_keys::TRACES_EXPORTER); - std::env::remove_var(env_keys::TRACES_ENDPOINT); - std::env::remove_var(env_keys::ENDPOINT); - std::env::set_var(env_keys::TRACES_EXPORTER, "otlp"); - std::env::set_var(env_keys::ENDPOINT, "http://collector:4318"); - let config = otlp_trace_config_from_env(); - std::env::remove_var(env_keys::TRACES_EXPORTER); - std::env::remove_var(env_keys::TRACES_ENDPOINT); - std::env::remove_var(env_keys::ENDPOINT); - let config = config.expect("config when TRACES_EXPORTER=otlp and generic endpoint set"); - assert_eq!(config.endpoint_url, "http://collector:4318/v1/traces"); - } -} diff --git a/libdd-data-pipeline/src/otlp/mod.rs b/libdd-data-pipeline/src/otlp/mod.rs index 14b4c6f638..542a754d40 100644 --- a/libdd-data-pipeline/src/otlp/mod.rs +++ b/libdd-data-pipeline/src/otlp/mod.rs @@ -3,10 +3,10 @@ //! OTLP trace export for libdatadog. //! -//! When `OTEL_TRACES_EXPORTER=otlp` is set, the trace exporter sends traces in OTLP HTTP/JSON -//! format to the configured endpoint instead of the Datadog agent. See -//! [`config::otlp_trace_config_from_env`] and [`config::env_keys`] for environment variable names -//! and precedence. +//! When an OTLP endpoint is configured via [`crate::trace_exporter::TraceExporterBuilder::set_otlp_endpoint`], +//! the trace exporter sends traces in OTLP HTTP/JSON format to that endpoint instead of the +//! Datadog agent. The host language is responsible for resolving the endpoint from its own +//! configuration (e.g. `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`). //! //! ## Sampling //! 
@@ -24,6 +24,6 @@ pub mod config; pub mod exporter; -pub use config::{otlp_trace_config_from_env, OtlpTraceConfig}; +pub use config::OtlpTraceConfig; pub use exporter::send_otlp_traces_http; pub use libdd_trace_utils::otlp_encoder::{map_traces_to_otlp, OtlpResourceInfo}; diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index 89f7b40fb7..e3bef7cd94 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -2,7 +2,8 @@ // SPDX-License-Identifier: Apache-2.0 use crate::agent_info::AgentInfoFetcher; -use crate::otlp::otlp_trace_config_from_env; +use crate::otlp::config::{OtlpProtocol, DEFAULT_OTLP_TIMEOUT}; +use crate::otlp::OtlpTraceConfig; use crate::pausable_worker::PausableWorker; use crate::telemetry::TelemetryClientBuilder; use crate::trace_exporter::agent_response::AgentResponsePayloadVersion; @@ -51,6 +52,7 @@ pub struct TraceExporterBuilder { test_session_token: Option, agent_rates_payload_version_enabled: bool, connection_timeout: Option, + otlp_endpoint: Option, } impl TraceExporterBuilder { @@ -218,6 +220,18 @@ impl TraceExporterBuilder { self } + /// Enables OTLP HTTP/JSON export and sets the endpoint URL. + /// + /// When set, traces are sent to this endpoint in OTLP HTTP/JSON format instead of the + /// Datadog agent. The host language is responsible for resolving the endpoint from its + /// configuration (e.g. `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`) before calling this method. 
+ /// + /// Example: `set_otlp_endpoint("http://localhost:4318/v1/traces")` + pub fn set_otlp_endpoint(&mut self, url: &str) -> &mut Self { + self.otlp_endpoint = Some(url.to_owned()); + self + } + #[allow(missing_docs)] pub fn build(self) -> Result { if !Self::is_inputs_outputs_formats_compatible(self.input_format, self.output_format) { @@ -339,7 +353,12 @@ impl TraceExporterBuilder { .agent_rates_payload_version_enabled .then(AgentResponsePayloadVersion::new), http_client: new_default_client(), - otlp_config: otlp_trace_config_from_env(), + otlp_config: self.otlp_endpoint.map(|url| OtlpTraceConfig { + endpoint_url: url, + headers: vec![], + timeout: DEFAULT_OTLP_TIMEOUT, + protocol: OtlpProtocol::HttpJson, + }), }) } diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 1a1bf04978..db05b42a95 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -618,15 +618,10 @@ impl TraceExporter { &self, mut traces: Vec>>, ) -> Result { - // OTLP path: when OTEL_TRACES_EXPORTER=otlp. No sampling/dropping—export all received - // (equivalent to OTEL_TRACES_SAMPLER=parentbased_always_on). - if let Some(ref config) = self.otlp_config { - return self.send_otlp_traces_inner(traces, config).await; - } - let mut header_tags: TracerHeaderTags = self.metadata.borrow().into(); - // Process stats computation + // Process stats computation and drop non-sampled (p0) chunks. + // This must run before the OTLP path so that unsampled spans are not exported. let dropped_p0_stats = stats::process_traces_for_stats( &mut traces, &mut header_tags, @@ -634,6 +629,11 @@ impl TraceExporter { self.client_computed_top_level, ); + // OTLP path: send sampled traces via OTLP when an OTLP endpoint is configured. 
+ if let Some(ref config) = self.otlp_config { + return self.send_otlp_traces_inner(traces, config).await; + } + let serializer = TraceSerializer::new( self.output_format, self.agent_payload_response_version.as_ref(), @@ -1899,7 +1899,7 @@ mod tests { #[test] #[cfg_attr(miri, ignore)] - fn test_otlp_export_when_env_set() { + fn test_otlp_export_via_builder() { let server = MockServer::start(); let mock_otlp = server.mock(|when, then| { when.method(POST) @@ -1908,12 +1908,7 @@ mod tests { then.status(200).body(""); }); - std::env::set_var(crate::otlp::config::env_keys::TRACES_EXPORTER, "otlp"); - // Endpoint set explicitly is used as-is (no path appended); so set full path for mock - std::env::set_var( - crate::otlp::config::env_keys::TRACES_ENDPOINT, - format!("{}/v1/traces", server.url("/").trim_end_matches('/')), - ); + let otlp_endpoint = format!("{}/v1/traces", server.url("/").trim_end_matches('/')); let mut builder = TraceExporterBuilder::default(); builder .set_url("http://127.0.0.1:8126") @@ -1923,6 +1918,7 @@ mod tests { .set_language("rust") .set_language_version("1.0") .set_language_interpreter("rustc") + .set_otlp_endpoint(&otlp_endpoint) .set_input_format(TraceExporterInputFormat::V04) .set_output_format(TraceExporterOutputFormat::V04); let exporter = builder.build().unwrap(); @@ -1942,9 +1938,6 @@ mod tests { let data = msgpack_encoder::v04::to_vec(&traces); let result = exporter.send(data.as_ref()); - std::env::remove_var(crate::otlp::config::env_keys::TRACES_EXPORTER); - std::env::remove_var(crate::otlp::config::env_keys::TRACES_ENDPOINT); - assert!( result.is_ok(), "OTLP send should succeed: {:?}",