diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 827d6168be..98d972d6cd 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -46,6 +46,7 @@ libdd-dogstatsd-client @DataDog/apm-common-components-core libdd-library-config*/ @DataDog/apm-sdk-capabilities libdd-log*/ @DataDog/apm-common-components-core libdd-profiling*/ @DataDog/libdatadog-profiling +libdd-proto-codec @DataDog/apm-common-components-core libdd-telemetry*/ @DataDog/apm-common-components-core libdd-tinybytes @DataDog/apm-common-components-core libdd-trace-normalization @DataDog/serverless @DataDog/libdatadog-apm diff --git a/Cargo.lock b/Cargo.lock index f5e51d432b..c0762e461d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1055,7 +1055,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8599749b6667e2f0c910c1d0dff6901163ff698a52d5a39720f61b5be4b20d3" dependencies = [ "futures-core", - "prost", + "prost 0.14.3", "prost-types", "tonic", "tonic-prost", @@ -1075,7 +1075,7 @@ dependencies = [ "hdrhistogram", "humantime", "hyper-util", - "prost", + "prost 0.14.3", "prost-types", "serde", "serde_json", @@ -1514,7 +1514,7 @@ dependencies = [ "clap", "libdd-profiling", "libdd-profiling-protobuf", - "prost", + "prost 0.14.3", "sysinfo", ] @@ -3131,7 +3131,7 @@ dependencies = [ name = "libdd-ddsketch" version = "1.0.1" dependencies = [ - "prost", + "prost 0.14.3", "prost-build", "protoc-bin-vendored", ] @@ -3235,7 +3235,7 @@ dependencies = [ "mime", "parking_lot", "proptest", - "prost", + "prost 0.14.3", "rand 0.8.5", "reqwest", "rustc-hash 1.1.0", @@ -3286,7 +3286,18 @@ version = "1.0.0" dependencies = [ "bolero", "libdd-profiling-protobuf", - "prost", + "prost 0.14.3", +] + +[[package]] +name = "libdd-proto-codec" +version = "28.0.2" +dependencies = [ + "arbitrary", + "bytes", + "criterion", + "prost 0.13.5", + "rand 0.9.0", ] [[package]] @@ -3375,7 +3386,7 @@ dependencies = [ name = "libdd-trace-protobuf" version = "1.1.0" dependencies = [ - "prost", + "prost 0.14.3", "prost-build", "protoc-bin-vendored", "serde", @@ -3415,11 +3426,12 @@ dependencies = [ "hyper", "indexmap 2.12.1", "libdd-common", + "libdd-proto-codec", "libdd-tinybytes", "libdd-trace-normalization", "libdd-trace-protobuf", "libdd-trace-utils", - "prost", + "prost 0.14.3", "rand 0.8.5", "rmp", "rmp-serde", @@ -4415,6 +4427,16 @@ dependencies = [ "unarray", ] +[[package]] +name = "prost" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +dependencies = [ + "bytes", + "prost-derive 0.13.5", +] + [[package]] name = "prost" version = "0.14.3" @@ -4422,7 +4444,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2ea70524a2f82d518bce41317d0fae74151505651af45faf1ffbd6fd33f0568" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.14.3", ] [[package]] @@ -4437,13 +4459,26 @@ dependencies = [ "multimap", "petgraph", "prettyplease", - "prost", + "prost 0.14.3", "prost-types", "regex", "syn 2.0.87", "tempfile", ] +[[package]] +name = "prost-derive" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +dependencies = [ + "anyhow", + "itertools 0.11.0", + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "prost-derive" version = "0.14.3" @@ -4463,7 +4498,7 @@ version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8991c4cbdb8bc5b11f0b074ffe286c30e523de90fee5ba8132f1399f23cb3dd7" dependencies = [ - "prost", + "prost 0.14.3", ] [[package]] @@ -6209,7 +6244,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "66bd50ad6ce1252d87ef024b3d64fe4c3cf54a86fb9ef4c631fdd0ded7aeaa67" dependencies = [ "bytes", - "prost", + "prost 0.14.3", "tonic", ] diff --git a/Cargo.toml b/Cargo.toml index 23cd1b87fc..f28f51bc8b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,7 +46,7 @@ members = [ "libdd-tinybytes", "libdd-dogstatsd-client", "libdd-log", - "libdd-log-ffi", + "libdd-log-ffi", "libdd-proto-codec", ] # https://doc.rust-lang.org/cargo/reference/resolver.html diff --git a/libdd-proto-codec/Cargo.toml b/libdd-proto-codec/Cargo.toml new file mode 100644 index 0000000000..ec6d4a855c --- /dev/null +++ b/libdd-proto-codec/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "libdd-proto-codec" +rust-version.workspace = true +edition.workspace = true +version.workspace = true +license.workspace = true +authors.workspace = true + +[dependencies] +bytes = { version = "1.4" } + +[dev-dependencies] +prost = { version = "0.13", features = ["derive"] } +arbitrary = { version = "1", features = ["derive"] } +rand = "0.9" +criterion = { version = "0.5", features = ["html_reports"] } diff --git a/libdd-proto-codec/src/constants.rs b/libdd-proto-codec/src/constants.rs new file mode 100644 index 0000000000..b6510be460 --- /dev/null +++ b/libdd-proto-codec/src/constants.rs @@ -0,0 +1,33 @@ +use core::hash; + +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, hash::Hash, PartialOrd, Ord)] +pub enum WireType { + Varint = 0, + Fixed64 = 1, + LengthDelimited = 2, + StartGroup = 3, // Deprecated in proto3, but still used in proto2. + EndGroup = 4, // Deprecated in proto3, but still used in proto2. + Fixed32 = 5, +} + +impl WireType { + #[inline] + #[allow(unused)] + pub(crate) const fn from_u32(value: u32) -> Option { + match value { + 0 => Some(WireType::Varint), + 1 => Some(WireType::Fixed64), + 2 => Some(WireType::LengthDelimited), + 3 => Some(WireType::StartGroup), + 4 => Some(WireType::EndGroup), + 5 => Some(WireType::Fixed32), + _ => None, + } + } + + #[inline] + pub(crate) const fn to_u32(self) -> u32 { + self as u32 + } +} diff --git a/libdd-proto-codec/src/encoder.rs b/libdd-proto-codec/src/encoder.rs new file mode 100644 index 0000000000..133669f0ea --- /dev/null +++ b/libdd-proto-codec/src/encoder.rs @@ -0,0 +1,540 @@ +use crate::constants::WireType; +use alloc::vec::Vec; +use core::ops::DerefMut; + +pub const MAP_KEY_FIELD_NUM: u32 = 1; +pub const MAP_VALUE_FIELD_NUM: u32 = 2; + +pub trait BufMut: DerefMut { + fn put_u8(&mut self, v: u8); + fn put_slice(&mut self, slice: &[u8]); + fn truncate(&mut self, new_len: usize); + + fn with_capacity(cap: usize) -> Self; +} + +impl BufMut for Vec { + fn put_u8(&mut self, v: u8) { + self.push(v); + } + + fn put_slice(&mut self, slice: &[u8]) { + self.extend_from_slice(slice); + } + + fn truncate(&mut self, new_len: usize) { + self.truncate(new_len); + } + + fn with_capacity(cap: usize) -> Self { + Vec::with_capacity(cap) + } +} + +impl BufMut for bytes::BytesMut { + fn put_u8(&mut self, v: u8) { + ::put_u8(self, v); + } + + fn put_slice(&mut self, slice: &[u8]) { + ::put_slice(self, slice); + } + + fn truncate(&mut self, new_len: usize) { + self.truncate(new_len); + } + + fn with_capacity(cap: usize) -> Self { + bytes::BytesMut::with_capacity(cap) + } +} + +#[derive(Default)] +pub struct TopLevelEncoder { + data: B, +} + +impl TopLevelEncoder { + pub fn with_capacity(cap: usize) -> Self { + Self { + data: B::with_capacity(cap), + } + } + pub fn encoder(&mut self) -> Encoder<'_, B> { + Encoder { + data: &mut self.data, + } + } + + pub fn finish(self) -> B { + self.data + } +} + +pub struct NestedEncoder<'a, B: BufMut> { + tag_position: usize, + size_position: usize, + write_empty: bool, + encoder: Encoder<'a, B>, +} + +impl NestedEncoder<'_, B> { + pub fn encoder(&mut self) -> Encoder<'_, B> { + Encoder { + data: self.encoder.data, + } + } +} + +impl Drop for NestedEncoder<'_, B> { + fn drop(&mut self) { + let size = self.encoder.data.len() - self.size_position - 5; + if !self.write_empty && size == 0 { + // If the message is empty, we need to remove the tag and size + self.encoder.data.truncate(self.tag_position); + return; + } + + let size_placeholder: &mut [u8; 5] = (&mut self.encoder.data + [self.size_position..self.size_position + 5]) + .try_into() + .unwrap(); + write_varint_max(size as u64, size_placeholder); + } +} + +trait ScalarEncoder { + type Input; + const WIRE_TYPE: WireType; + + fn encode(input: Self::Input, data: &mut B); +} + +#[inline(always)] +const fn append_varint u64>(f: F) -> impl FnOnce(T, &mut B) { + move |input: T, data: &mut B| { + let v = f(input); + encode_varint(v, data) + } +} + +macro_rules! impl_scalar_encode { + ($ty:ident, $input_ty:ty, $write_fn:expr, $wire_ty:expr) => { + struct $ty; + impl ScalarEncoder for $ty { + type Input = $input_ty; + const WIRE_TYPE: WireType = $wire_ty; + + #[inline(always)] + fn encode(input: Self::Input, data: &mut B) { + $write_fn(input, data); + } + } + }; +} + +macro_rules! impl_scalar_encode_varint { + ($ty:ident, $input_ty:ty, $to_varint_fn:expr) => { + impl_scalar_encode!( + $ty, + $input_ty, + append_varint($to_varint_fn), + WireType::Varint + ); + }; +} + +impl_scalar_encode_varint!(UInt64Encoder, u64, |v| v); +impl_scalar_encode_varint!(UInt32Encoder, u32, |v| v as u64); +impl_scalar_encode_varint!(Int64Encoder, i64, |v| v as u64); +impl_scalar_encode_varint!(Int32Encoder, i32, |v| v as u64); +impl_scalar_encode_varint!(SInt64Encoder, i64, |v| ((v << 1) ^ (v >> 63)) as u64); +impl_scalar_encode_varint!(SInt32Encoder, i32, |v| ((v << 1) ^ (v >> 31)) as u32 as u64); +impl_scalar_encode_varint!(BoolEncoder, bool, |v| v as u64); +impl_scalar_encode!( + Fixed64Encoder, + u64, + |v: u64, data: &mut B| { + data.put_slice(&v.to_le_bytes()); + }, + WireType::Fixed64 +); +impl_scalar_encode!( + Fixed32Encoder, + u32, + |v: u32, data: &mut B| { + data.put_slice(&v.to_le_bytes()); + }, + WireType::Fixed32 +); +impl_scalar_encode!( + SFixed64Encoder, + i64, + |v: i64, data: &mut B| { + data.put_slice(&v.to_le_bytes()); + }, + WireType::Fixed64 +); +impl_scalar_encode!( + SFixed32Encoder, + i32, + |v: i32, data: &mut B| { + data.put_slice(&v.to_le_bytes()); + }, + WireType::Fixed32 +); +impl_scalar_encode!( + F64Encoder, + f64, + |v: f64, data: &mut B| { + let bits = v.to_bits(); + data.put_slice(&bits.to_le_bytes()); + }, + WireType::Fixed64 +); +impl_scalar_encode!( + F32Encoder, + f32, + |v: f32, data: &mut B| { + let bits = v.to_bits(); + data.put_slice(&bits.to_le_bytes()); + }, + WireType::Fixed32 +); + +struct StringEncoder<'a>(core::marker::PhantomData<&'a ()>); + +impl<'a> ScalarEncoder for StringEncoder<'a> { + type Input = &'a str; + const WIRE_TYPE: WireType = WireType::LengthDelimited; + + #[inline(always)] + fn encode(input: Self::Input, data: &mut B) { + BytesEncoder::encode(input.as_bytes(), data); + } +} + +struct BytesEncoder<'a>(core::marker::PhantomData<&'a ()>); + +impl<'a> ScalarEncoder for BytesEncoder<'a> { + type Input = &'a [u8]; + const WIRE_TYPE: WireType = WireType::LengthDelimited; + + #[inline(always)] + fn encode(input: Self::Input, data: &mut B) { + let size = input.len(); + encode_varint(size as u64, data); + data.put_slice(input); + } +} + +fn encode_packed, B: BufMut>( + values: I, + data: &mut B, +) { + let size_position = data.len(); + data.put_slice(&[0; 5]); // Placeholder for size + for value in values { + E::encode(value, data); + } + let size = data.len() - size_position - 5; + let size_placeholder: &mut [u8; 5] = (&mut data[size_position..size_position + 5]) + .try_into() + .unwrap(); + write_varint_max(size as u64, size_placeholder); +} + +#[derive(Debug)] +pub struct Encoder<'a, B: BufMut> { + data: &'a mut B, +} + +impl Encoder<'_, B> { + /// returns an Encoder for a nested message. + /// + /// ```rust + /// use libdd_proto_codec::encoder::{TopLevelEncoder, Encoder, BufMut}; + /// + /// struct Bar { + /// baz: i32, + /// } + /// + /// fn encode_bar(e: &mut Encoder<'_, B>, bar: &Bar) { + /// e.write_sint32(1, bar.baz); + /// } + /// + /// struct Foo { + /// bar: Bar, + /// } + /// + /// fn encode_foo(e: &mut Encoder<'_, B>, foo: &Foo) { + /// encode_bar(&mut e.write_message(1).encoder(), &foo.bar); + /// } + /// + /// let mut e = TopLevelEncoder::>::default(); + /// encode_foo(&mut e.encoder(), &Foo { bar: Bar { baz: -1 } } ); + /// dbg!(e.finish()); + /// ``` + pub fn write_message(&mut self, field_number: u32) -> NestedEncoder<'_, B> { + let tag_position = self.data.len(); + encode_tagged(field_number, WireType::LengthDelimited, self.data); + let size_position = self.data.len(); + self.data.put_slice(&[0; 5]); // Placeholder for size + NestedEncoder { + tag_position, + write_empty: false, + size_position, + encoder: Encoder { data: self.data }, + } + } + + /// returns an Encoder for a nested message. + /// + /// If the nested message has a zero value (all fields are default or missing) + /// it will still be encoded into the buffer + pub fn write_message_opt(&mut self, field_number: u32) -> NestedEncoder<'_, B> { + encode_tagged(field_number, WireType::LengthDelimited, self.data); + let size_position = self.data.len(); + self.data.put_slice(&[0; 5]); // Placeholder for size + NestedEncoder { + // not needed + tag_position: 0, + write_empty: true, + size_position, + encoder: Encoder { data: self.data }, + } + } + + /// returns an Encoder for a nested message with repeated annotation + /// + /// ```rust + /// use libdd_proto_codec::encoder::{TopLevelEncoder, Encoder, BufMut}; + /// + /// struct Bar { + /// baz: i32, + /// } + /// + /// fn encode_bar(e: &mut Encoder<'_, B>, bar: &Bar) { + /// e.write_sint32(1, bar.baz); + /// } + /// + /// struct Foo { + /// bars: Vec, + /// } + /// + /// fn encode_foo(e: &mut Encoder<'_, B>, foo: &Foo) { + /// for bar in &foo.bars { + /// encode_bar(&mut e.write_message(1).encoder(), &bar); + /// } + /// } + /// + /// let mut e = TopLevelEncoder::>::default(); + /// encode_foo(&mut e.encoder(), &Foo { bars: vec![Bar { baz: -1 }, Bar { baz: 0 }] } ); + /// dbg!(e.finish()); + /// ``` + pub fn write_message_repeated(&mut self, field_number: u32) -> NestedEncoder<'_, B> { + self.write_message_opt(field_number) + } + + pub fn write_strings_repeated<'b, I: IntoIterator>( + &mut self, + field_number: u32, + v: I, + ) { + for value in v { + self.write_string_repeated(field_number, value); + } + } + + pub fn write_bytess_repeated<'b, I: IntoIterator>( + &mut self, + field_number: u32, + v: I, + ) { + for value in v { + self.write_bytes_repeated(field_number, value); + } + } + + /// returns a helper to encode protobufs maps + /// + /// ``` + /// use libdd_proto_codec::encoder::{Encoder, MapEncoder, BufMut, MAP_KEY_FIELD_NUM, MAP_VALUE_FIELD_NUM}; + /// + /// // message Example { + /// // map field = 3; + /// //} + /// + /// struct Example { + /// field: Vec<(String, i64)>, + /// } + /// fn encode_example(e: &mut Encoder<'_, Vec>, example: &Example) { + /// let map_encoder = e.write_map(3); + /// } + /// + /// fn encode_string_i64_map<'a, I: IntoIterator>(mut e: MapEncoder<'_, Vec>, map: I) { + /// for (k, v) in map { + /// encode_string_i64_map_entry(&mut e.write_map_entry() + /// .encoder(), k, *v); + /// } + /// } + /// + /// fn encode_string_i64_map_entry(e: &mut Encoder<'_, B>, key: &str, value: i64) { + /// e.write_string(MAP_KEY_FIELD_NUM, key); + /// e.write_int64(MAP_VALUE_FIELD_NUM, value); + /// } + /// ``` + pub fn write_map(&mut self, field_number: u32) -> MapEncoder<'_, B> { + MapEncoder { + data: self.data, + field_number, + } + } +} + +pub struct MapEncoder<'a, B: BufMut> { + data: &'a mut B, + field_number: u32, +} + +impl MapEncoder<'_, B> { + pub fn write_map_entry(&mut self) -> NestedEncoder<'_, B> { + let tag_position = self.data.len(); + encode_tagged(self.field_number, WireType::LengthDelimited, self.data); + let size_position = self.data.len(); + self.data.put_slice(&[0; 5]); // Placeholder for size + NestedEncoder { + tag_position, + write_empty: true, + size_position, + encoder: Encoder { data: self.data }, + } + } +} + +macro_rules! impl_scalar { + ($($fn_name:ident, $opt_fn_name:ident, $repeated_fn_name:ident, $($repeated_iter_fn_name:ident)?, $input_ty:ty, $encoder:ty,)*) => { + impl Encoder<'_, B> { + $( + pub fn $fn_name(&mut self, field_number: u32, v: $input_ty) { + if v == <$input_ty> :: default() { + return; + } + encode_tagged(field_number, <$encoder>::WIRE_TYPE, self.data); + <$encoder>::encode(v, self.data); + } + + pub fn $opt_fn_name(&mut self, field_number: u32, v: $input_ty) { + encode_tagged(field_number, <$encoder>::WIRE_TYPE, self.data); + <$encoder>::encode(v, self.data); + } + + pub fn $repeated_fn_name(&mut self, field_number: u32, v: $input_ty) { + self.$opt_fn_name(field_number, v); + } + + $( + pub fn $repeated_iter_fn_name>(&mut self, field_number: u32, v: I) { + for value in v { + self.$repeated_fn_name(field_number, value); + } + } + )? + )* + } + }; +} + +macro_rules! impl_packed { + ($($fn_name:ident, $input_ty:ty, $encoder:ty,)*) => { + impl Encoder<'_, B> { + $( + pub fn $fn_name>(&mut self, field_number: u32, v: I) { + let mut v = v.into_iter().peekable(); + if v.peek().is_none() { + return; + } + encode_tagged(field_number, WireType::LengthDelimited, self.data); + encode_packed::<$encoder, _, B>(v, self.data); + } + )* + } + }; +} + +impl_scalar! { + write_uint64, write_uint64_opt, write_uint64_repeated, write_uint64s_repeated, u64, UInt64Encoder, + write_uint32, write_uint32_opt, write_uint32_repeated, write_uint32s_repeated, u32, UInt32Encoder, + write_int64, write_int64_opt, write_int64_repeated, write_int64s_repeated, i64, Int64Encoder, + write_int32, write_int32_opt, write_int32_repeated, write_int32s_repeated, i32, Int32Encoder, + write_sint64, write_sint64_opt, write_sint64_repeated, write_sint64s_repeated, i64, SInt64Encoder, + write_sint32, write_sint32_opt, write_sint32_repeated, write_sint32s_repeated, i32, SInt32Encoder, + write_fixed64, write_fixed64_opt, write_fixed64_repeated, write_fixed64s_repeated, u64, Fixed64Encoder, + write_fixed32, write_fixed32_opt, write_fixed32_repeated, write_fixed32s_repeated, u32, Fixed32Encoder, + write_sfixed64, write_sfixed64_opt, write_sfixed64_repeated, write_sfixed64s_repeated, i64, SFixed64Encoder, + write_sfixed32, write_sfixed32_opt, write_sfixed32_repeated, write_sfixed32s_repeated, i32, SFixed32Encoder, + write_bool, write_bool_opt, write_bool_repeated, write_bools_repeated, bool, BoolEncoder, + write_f64, write_f64_opt, write_f64_repeated, write_f64s_repeated, f64, F64Encoder, + write_f32, write_f32_opt, write_f32_repeated, write_f32s_repeated, f32, F32Encoder, + write_string, write_string_opt, write_string_repeated, , &str, StringEncoder<'_>, + write_bytes, write_bytes_opt, write_bytes_repeated, , &[u8], BytesEncoder<'_>, +} + +impl_packed! { + write_uint64_packed, u64, UInt64Encoder, + write_uint32_packed, u32, UInt32Encoder, + write_int64_packed, i64, Int64Encoder, + write_int32_packed, i32, Int32Encoder, + write_sint64_packed, i64, SInt64Encoder, + write_sint32_packed, i32, SInt32Encoder, + write_fixed64_packed, u64, Fixed64Encoder, + write_fixed32_packed, u32, Fixed32Encoder, + write_sfixed64_packed, i64, SFixed64Encoder, + write_sfixed32_packed, i32, SFixed32Encoder, + write_bool_packed, bool, BoolEncoder, + write_f64_packed, f64, F64Encoder, + write_f32_packed, f32, F32Encoder, +} + +#[test] +fn test_encoding() { + let mut data = vec![]; + let mut encoder = Encoder { data: &mut data }; + + encoder.write_message(1).encoder().write_uint32(1, 2); + encoder.write_uint32(2, 3); + assert_eq!(data, &[10, 130, 128, 128, 128, 0, 8, 2, 16, 3]) +} + +#[inline] +fn encode_tagged(field_number: u32, wire_type: WireType, data: &mut B) { + let tag = (field_number << 3) | wire_type.to_u32(); + encode_varint(tag as u64, data); +} + +#[inline] +fn write_varint_max(mut v: u64, buf: &mut [u8; 5]) { + for (i, item) in buf.iter_mut().enumerate() { + *item = (v & 0x7F) as u8; + v >>= 7; + if i != 4 { + *item |= 0x80; + } + } +} + +/// Encodes an integer value into LEB128 variable length format, and writes it to the buffer. +/// The buffer must have enough remaining space (maximum 10 bytes). +#[inline] +fn encode_varint(mut value: u64, buf: &mut B) { + // Varints are never more than 10 bytes + for _ in 0..10 { + if value < 0x80 { + buf.put_u8(value as u8); + break; + } else { + buf.put_u8(((value & 0x7F) | 0x80) as u8); + value >>= 7; + } + } +} diff --git a/libdd-proto-codec/src/lib.rs b/libdd-proto-codec/src/lib.rs new file mode 100644 index 0000000000..fec0bf8dd6 --- /dev/null +++ b/libdd-proto-codec/src/lib.rs @@ -0,0 +1,7 @@ +extern crate alloc; + +pub mod constants; +pub mod encoder; + +#[cfg(test)] +mod tests; diff --git a/libdd-proto-codec/src/tests.rs b/libdd-proto-codec/src/tests.rs new file mode 100644 index 0000000000..3c9ae2e11c --- /dev/null +++ b/libdd-proto-codec/src/tests.rs @@ -0,0 +1,144 @@ +use arbitrary::Arbitrary; +use prost::Message; + +use crate::encoder::{self, MAP_KEY_FIELD_NUM, MAP_VALUE_FIELD_NUM}; + +#[derive(PartialEq, prost::Message, arbitrary::Arbitrary)] +struct Bar { + #[prost(message, tag = "1", required)] + foo: Foo, + #[prost(string, repeated, tag = "2")] + string_repeated_field: Vec, + #[prost(sfixed32, repeated, tag = "3")] + i32_repeated_field: Vec, + #[prost(string, tag = "4")] + string_field: String, + #[prost(map = "string, sint64", tag = "5")] + map_field: std::collections::HashMap, +} + +#[derive(prost::Message, arbitrary::Arbitrary)] +struct Foo { + #[prost(uint64, tag = "1")] + u64_field: u64, + #[prost(uint32, tag = "2")] + u32_field: u32, + #[prost(int64, tag = "3")] + i64_field: i64, + #[prost(int32, tag = "4")] + i32_field: i32, + #[prost(sint64, tag = "5")] + si64_field: i64, + #[prost(sint32, tag = "6")] + si32_field: i32, + #[prost(bool, tag = "7")] + bool_field: bool, + #[prost(double, tag = "8")] + f64_field: f64, + #[prost(float, tag = "9")] + f32_field: f32, + #[prost(sint64, repeated, packed, tag = "10")] + packed_si64_packed_field: Vec, + #[prost(string, tag = "11")] + string_field: String, +} + +impl PartialEq for Foo { + fn eq(&self, other: &Self) -> bool { + self.u64_field == other.u64_field + && self.u32_field == other.u32_field + && self.i64_field == other.i64_field + && self.i32_field == other.i32_field + && self.si64_field == other.si64_field + && self.si32_field == other.si32_field + && self.bool_field == other.bool_field + && self.f64_field.total_cmp(&other.f64_field).is_eq() + && self.f32_field.total_cmp(&other.f32_field).is_eq() + && self.packed_si64_packed_field == other.packed_si64_packed_field + && self.string_field == other.string_field + } +} + +fn manual_encode_bar(e: &mut encoder::Encoder<'_, B>, bar: &Bar) { + manual_encode_foo(&mut e.write_message(1).encoder(), &bar.foo); + e.write_strings_repeated(2, bar.string_repeated_field.iter().map(|s| s.as_str())); + e.write_sfixed32_packed(3, bar.i32_repeated_field.iter().copied()); + e.write_string(4, &bar.string_field); + let mut map_enc = e.write_map(5); + for (k, v) in &bar.map_field { + let mut entry = map_enc.write_map_entry(); + let mut entry_enc = entry.encoder(); + entry_enc.write_string(MAP_KEY_FIELD_NUM, k); + entry_enc.write_sint64(MAP_VALUE_FIELD_NUM, *v); + } +} + +fn manual_bar_top_level_encoder(bar: &Bar) -> Vec { + let mut encoder = encoder::TopLevelEncoder::default(); + manual_encode_bar(&mut encoder.encoder(), bar); + encoder.finish() +} + +fn manual_encode_foo(e: &mut encoder::Encoder<'_, B>, foo: &Foo) { + e.write_uint64(1, foo.u64_field); + e.write_uint32(2, foo.u32_field); + e.write_int64(3, foo.i64_field); + e.write_int32(4, foo.i32_field); + e.write_sint64(5, foo.si64_field); + e.write_sint32(6, foo.si32_field); + e.write_bool(7, foo.bool_field); + e.write_f64(8, foo.f64_field); + e.write_f32(9, foo.f32_field); + e.write_sint64_packed(10, foo.packed_si64_packed_field.iter().copied()); + e.write_string(11, &foo.string_field); +} + +fn manual_foo_top_level_encoder(foo: &Foo) -> Vec { + let mut encoder = encoder::TopLevelEncoder::default(); + manual_encode_foo(&mut encoder.encoder(), foo); + encoder.finish() +} + +#[test] +fn test_roundtrip_bar() { + for _ in 0..100 { + let l = rand::random_range(0..255_usize); + let input: Vec = (0..l).map(|_| rand::random()).collect(); + let input_bar: Bar = Bar::arbitrary(&mut arbitrary::Unstructured::new(&input)).unwrap(); + + test_roundtrip_bar_inner(&input_bar); + } +} + +fn test_roundtrip_bar_inner(input_bar: &Bar) { + let manual_encoded = manual_bar_top_level_encoder(input_bar); + let prost_encoded = input_bar.encode_to_vec(); + + let prost_decoded_prost_encoded = Bar::decode(&*prost_encoded).unwrap(); + let prost_decoded_manual_encoded = Bar::decode(&*manual_encoded).unwrap(); + + assert_eq!(&prost_decoded_prost_encoded, input_bar); + assert_eq!(&prost_decoded_manual_encoded, input_bar); +} + +#[test] +fn test_roundtrip_foo() { + for _ in 0..100 { + let l = rand::random_range(0..255_usize); + let input: Vec = (0..l).map(|_| rand::random()).collect(); + let input_foo: Foo = Foo::arbitrary(&mut arbitrary::Unstructured::new(&input)).unwrap(); + test_roundtrip_foo_inner(&input_foo); + } +} + +fn test_roundtrip_foo_inner(input_foo: &Foo) { + let manual_encoded = manual_foo_top_level_encoder(input_foo); + let prost_encoded = input_foo.encode_to_vec(); + + let prost_decoded_prost_encoded = Foo::decode(&*prost_encoded).unwrap(); + + let prost_decoded_manual_encoded = Foo::decode(&*manual_encoded).unwrap(); + + assert_eq!(&prost_decoded_prost_encoded, input_foo); + assert_eq!(&prost_decoded_manual_encoded, input_foo); +} diff --git a/libdd-trace-utils/Cargo.toml b/libdd-trace-utils/Cargo.toml index 734ff431db..c211bf6b07 100644 --- a/libdd-trace-utils/Cargo.toml +++ b/libdd-trace-utils/Cargo.toml @@ -37,6 +37,7 @@ rmp = { version = "0.8.14", default-features = false } libdd-common = { version = "2.0.0", path = "../libdd-common", default-features = false } libdd-trace-protobuf = { version = "1.1.0", path = "../libdd-trace-protobuf" } +libdd-proto-codec = { path = "../libdd-proto-codec", default-features = false } libdd-trace-normalization = { version = "1.0.1", path = "../libdd-trace-normalization" } libdd-tinybytes = { version = "1.1.0", path = "../libdd-tinybytes", features = [ "bytes_string", diff --git a/libdd-trace-utils/src/msgpack_encoder/mod.rs b/libdd-trace-utils/src/msgpack_encoder/mod.rs index 876d09627d..addb235008 100644 --- a/libdd-trace-utils/src/msgpack_encoder/mod.rs +++ b/libdd-trace-utils/src/msgpack_encoder/mod.rs @@ -2,3 +2,4 @@ // SPDX-License-Identifier: Apache-2.0 pub mod v04; +pub mod otlp; \ No newline at end of file diff --git a/libdd-trace-utils/src/msgpack_encoder/otlp/common.proto b/libdd-trace-utils/src/msgpack_encoder/otlp/common.proto new file mode 100644 index 0000000000..0155b0b0eb --- /dev/null +++ b/libdd-trace-utils/src/msgpack_encoder/otlp/common.proto @@ -0,0 +1,154 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package opentelemetry.proto.common.v1; + +option csharp_namespace = "OpenTelemetry.Proto.Common.V1"; +option java_multiple_files = true; +option java_package = "io.opentelemetry.proto.common.v1"; +option java_outer_classname = "CommonProto"; +option go_package = "go.opentelemetry.io/proto/otlp/common/v1"; + +// Represents any type of attribute value. AnyValue may contain a +// primitive value such as a string or integer or it may contain an arbitrary nested +// object containing arrays, key-value lists and primitives. +message AnyValue { + // The value is one of the listed fields. It is valid for all values to be unspecified + // in which case this AnyValue is considered to be "empty". + oneof value { + string string_value = 1; + bool bool_value = 2; + int64 int_value = 3; + double double_value = 4; + ArrayValue array_value = 5; + KeyValueList kvlist_value = 6; + bytes bytes_value = 7; + // Reference to the string value in ProfilesDictionary.string_table. + // + // Note: This is currently used exclusively in the Profiling signal. + // Implementers of OTLP receivers for signals other than Profiling should + // treat the presence of this value as a non-fatal issue. + // Log an error or warning indicating an unexpected field intended for the + // Profiling signal and process the data as if this value were absent or + // empty, ignoring its semantic content for the non-Profiling signal. + // + // Status: [Development] + int32 string_value_strindex = 8; + } +} + +// ArrayValue is a list of AnyValue messages. We need ArrayValue as a message +// since oneof in AnyValue does not allow repeated fields. +message ArrayValue { + // Array of values. The array may be empty (contain 0 elements). + repeated AnyValue values = 1; +} + +// KeyValueList is a list of KeyValue messages. We need KeyValueList as a message +// since `oneof` in AnyValue does not allow repeated fields. Everywhere else where we need +// a list of KeyValue messages (e.g. in Span) we use `repeated KeyValue` directly to +// avoid unnecessary extra wrapping (which slows down the protocol). The 2 approaches +// are semantically equivalent. +message KeyValueList { + // A collection of key/value pairs of key-value pairs. The list may be empty (may + // contain 0 elements). + // + // The keys MUST be unique (it is not allowed to have more than one + // value with the same key). + // The behavior of software that receives duplicated keys can be unpredictable. + repeated KeyValue values = 1; +} + +// Represents a key-value pair that is used to store Span attributes, Link +// attributes, etc. +message KeyValue { + // The key name of the pair. + // key_ref MUST NOT be set if key is used. + string key = 1; + + // The value of the pair. + AnyValue value = 2; + + // Reference to the string key in ProfilesDictionary.string_table. + // key MUST NOT be set if key_strindex is used. + // + // Note: This is currently used exclusively in the Profiling signal. + // Implementers of OTLP receivers for signals other than Profiling should + // treat the presence of this key as a non-fatal issue. + // Log an error or warning indicating an unexpected field intended for the + // Profiling signal and process the data as if this value were absent or + // empty, ignoring its semantic content for the non-Profiling signal. + // + // Status: [Development] + int32 key_strindex = 3; +} + +// InstrumentationScope is a message representing the instrumentation scope information +// such as the fully qualified name and version. +message InstrumentationScope { + // A name denoting the Instrumentation scope. + // An empty instrumentation scope name means the name is unknown. + string name = 1; + + // Defines the version of the instrumentation scope. + // An empty instrumentation scope version means the version is unknown. + string version = 2; + + // Additional attributes that describe the scope. [Optional]. + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). + // The behavior of software that receives duplicated keys can be unpredictable. + repeated KeyValue attributes = 3; + + // The number of attributes that were discarded. Attributes + // can be discarded because their keys are too long or because there are too many + // attributes. If this value is 0, then no attributes were dropped. + uint32 dropped_attributes_count = 4; +} + +// A reference to an Entity. +// Entity represents an object of interest associated with produced telemetry: e.g spans, metrics, profiles, or logs. +// +// Status: [Development] +message EntityRef { + // The Schema URL, if known. This is the identifier of the Schema that the entity data + // is recorded in. To learn more about Schema URL see + // https://opentelemetry.io/docs/specs/otel/schemas/#schema-url + // + // This schema_url applies to the data in this message and to the Resource attributes + // referenced by id_keys and description_keys. + // TODO: discuss if we are happy with this somewhat complicated definition of what + // the schema_url applies to. + // + // This field obsoletes the schema_url field in ResourceMetrics/ResourceSpans/ResourceLogs. + string schema_url = 1; + + // Defines the type of the entity. MUST not change during the lifetime of the entity. + // For example: "service" or "host". This field is required and MUST not be empty + // for valid entities. + string type = 2; + + // Attribute Keys that identify the entity. + // MUST not change during the lifetime of the entity. The Id must contain at least one attribute. + // These keys MUST exist in the containing {message}.attributes. + repeated string id_keys = 3; + + // Descriptive (non-identifying) attribute keys of the entity. + // MAY change over the lifetime of the entity. MAY be empty. + // These attribute keys are not part of entity's identity. + // These keys MUST exist in the containing {message}.attributes. + repeated string description_keys = 4; +} diff --git a/libdd-trace-utils/src/msgpack_encoder/otlp/mod.rs b/libdd-trace-utils/src/msgpack_encoder/otlp/mod.rs new file mode 100644 index 0000000000..7ec58b3674 --- /dev/null +++ b/libdd-trace-utils/src/msgpack_encoder/otlp/mod.rs @@ -0,0 +1,4 @@ +// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +pub mod trace; diff --git a/libdd-trace-utils/src/msgpack_encoder/otlp/resource.proto b/libdd-trace-utils/src/msgpack_encoder/otlp/resource.proto new file mode 100644 index 0000000000..dadba7867d --- /dev/null +++ b/libdd-trace-utils/src/msgpack_encoder/otlp/resource.proto @@ -0,0 +1,46 @@ + +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package opentelemetry.proto.resource.v1; + +import "libdd-trace-utils/src/msgpack_encoder/otlp/common.proto"; + +option csharp_namespace = "OpenTelemetry.Proto.Resource.V1"; +option java_multiple_files = true; +option java_package = "io.opentelemetry.proto.resource.v1"; +option java_outer_classname = "ResourceProto"; +option go_package = "go.opentelemetry.io/proto/otlp/resource/v1"; + +// Resource information. +message Resource { + // Set of attributes that describe the resource. + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). + // The behavior of software that receives duplicated keys can be unpredictable. + repeated opentelemetry.proto.common.v1.KeyValue attributes = 1; + + // The number of dropped attributes. If the value is 0, then + // no attributes were dropped. + uint32 dropped_attributes_count = 2; + + // Set of entities that participate in this Resource. + // + // Note: keys in the references MUST exist in attributes of this message. + // + // Status: [Development] + repeated opentelemetry.proto.common.v1.EntityRef entity_refs = 3; +} diff --git a/libdd-trace-utils/src/msgpack_encoder/otlp/trace.proto b/libdd-trace-utils/src/msgpack_encoder/otlp/trace.proto new file mode 100644 index 0000000000..40a3a034da --- /dev/null +++ b/libdd-trace-utils/src/msgpack_encoder/otlp/trace.proto @@ -0,0 +1,359 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package opentelemetry.proto.trace.v1; + +import "libdd-trace-utils/src/msgpack_encoder/otlp/common.proto"; +import "libdd-trace-utils/src/msgpack_encoder/otlp/resource.proto"; + +option csharp_namespace = "OpenTelemetry.Proto.Trace.V1"; +option java_multiple_files = true; +option java_package = "io.opentelemetry.proto.trace.v1"; +option java_outer_classname = "TraceProto"; +option go_package = "go.opentelemetry.io/proto/otlp/trace/v1"; + +// TracesData represents the traces data that can be stored in a persistent storage, +// OR can be embedded by other protocols that transfer OTLP traces data but do +// not implement the OTLP protocol. +// +// The main difference between this message and collector protocol is that +// in this message there will not be any "control" or "metadata" specific to +// OTLP protocol. +// +// When new fields are added into this message, the OTLP request MUST be updated +// as well. +message TracesData { + // An array of ResourceSpans. + // For data coming from a single resource this array will typically contain + // one element. Intermediary nodes that receive data from multiple origins + // typically batch the data before forwarding further and in that case this + // array will contain multiple elements. + repeated ResourceSpans resource_spans = 1; +} + +// A collection of ScopeSpans from a Resource. +message ResourceSpans { + reserved 1000; + + // The resource for the spans in this message. + // If this field is not set then no resource info is known. + opentelemetry.proto.resource.v1.Resource resource = 1; + + // A list of ScopeSpans that originate from a resource. + repeated ScopeSpans scope_spans = 2; + + // The Schema URL, if known. This is the identifier of the Schema that the resource data + // is recorded in. Notably, the last part of the URL path is the version number of the + // schema: http[s]://server[:port]/path/. To learn more about Schema URL see + // https://opentelemetry.io/docs/specs/otel/schemas/#schema-url + // This schema_url applies to the data in the "resource" field. It does not apply + // to the data in the "scope_spans" field which have their own schema_url field. + string schema_url = 3; +} + +// A collection of Spans produced by an InstrumentationScope. +message ScopeSpans { + // The instrumentation scope information for the spans in this message. + // Semantically when InstrumentationScope isn't set, it is equivalent with + // an empty instrumentation scope name (unknown). + opentelemetry.proto.common.v1.InstrumentationScope scope = 1; + + // A list of Spans that originate from an instrumentation scope. + repeated Span spans = 2; + + // The Schema URL, if known. This is the identifier of the Schema that the span data + // is recorded in. Notably, the last part of the URL path is the version number of the + // schema: http[s]://server[:port]/path/. To learn more about Schema URL see + // https://opentelemetry.io/docs/specs/otel/schemas/#schema-url + // This schema_url applies to the data in the "scope" field and all spans and span + // events in the "spans" field. + string schema_url = 3; +} + +// A Span represents a single operation performed by a single component of the system. +// +// The next available field id is 17. +message Span { + // A unique identifier for a trace. All spans from the same trace share + // the same `trace_id`. The ID is a 16-byte array. An ID with all zeroes OR + // of length other than 16 bytes is considered invalid (empty string in OTLP/JSON + // is zero-length and thus is also invalid). + // + // This field is required. + bytes trace_id = 1; + + // A unique identifier for a span within a trace, assigned when the span + // is created. The ID is an 8-byte array. An ID with all zeroes OR of length + // other than 8 bytes is considered invalid (empty string in OTLP/JSON + // is zero-length and thus is also invalid). + // + // This field is required. + bytes span_id = 2; + + // trace_state conveys information about request position in multiple distributed tracing graphs. + // It is a trace_state in w3c-trace-context format: https://www.w3.org/TR/trace-context/#tracestate-header + // See also https://github.com/w3c/distributed-tracing for more details about this field. + string trace_state = 3; + + // The `span_id` of this span's parent span. If this is a root span, then this + // field must be empty. The ID is an 8-byte array. + bytes parent_span_id = 4; + + // Flags, a bit field. + // + // Bits 0-7 (8 least significant bits) are the trace flags as defined in W3C Trace + // Context specification. To read the 8-bit W3C trace flag, use + // `flags & SPAN_FLAGS_TRACE_FLAGS_MASK`. + // + // See https://www.w3.org/TR/trace-context-2/#trace-flags for the flag definitions. + // + // Bits 8 and 9 represent the 3 states of whether a span's parent + // is remote. The states are (unknown, is not remote, is remote). + // To read whether the value is known, use `(flags & SPAN_FLAGS_CONTEXT_HAS_IS_REMOTE_MASK) != 0`. + // To read whether the span is remote, use `(flags & SPAN_FLAGS_CONTEXT_IS_REMOTE_MASK) != 0`. + // + // When creating span messages, if the message is logically forwarded from another source + // with an equivalent flags fields (i.e., usually another OTLP span message), the field SHOULD + // be copied as-is. If creating from a source that does not have an equivalent flags field + // (such as a runtime representation of an OpenTelemetry span), the high 22 bits MUST + // be set to zero. + // Readers MUST NOT assume that bits 10-31 (22 most significant bits) will be zero. + // + // [Optional]. + fixed32 flags = 16; + + // A description of the span's operation. + // + // For example, the name can be a qualified method name or a file name + // and a line number where the operation is called. A best practice is to use + // the same display name at the same call point in an application. + // This makes it easier to correlate spans in different traces. + // + // This field is semantically required to be set to non-empty string. + // Empty value is equivalent to an unknown span name. + // + // This field is required. + string name = 5; + + // SpanKind is the type of span. Can be used to specify additional relationships between spans + // in addition to a parent/child relationship. + enum SpanKind { + // Unspecified. Do NOT use as default. + // Implementations MAY assume SpanKind to be INTERNAL when receiving UNSPECIFIED. + SPAN_KIND_UNSPECIFIED = 0; + + // Indicates that the span represents an internal operation within an application, + // as opposed to an operation happening at the boundaries. Default value. + SPAN_KIND_INTERNAL = 1; + + // Indicates that the span covers server-side handling of an RPC or other + // remote network request. + SPAN_KIND_SERVER = 2; + + // Indicates that the span describes a request to some remote service. + SPAN_KIND_CLIENT = 3; + + // Indicates that the span describes a producer sending a message to a broker. + // Unlike CLIENT and SERVER, there is often no direct critical path latency relationship + // between producer and consumer spans. A PRODUCER span ends when the message was accepted + // by the broker while the logical processing of the message might span a much longer time. + SPAN_KIND_PRODUCER = 4; + + // Indicates that the span describes consumer receiving a message from a broker. + // Like the PRODUCER kind, there is often no direct critical path latency relationship + // between producer and consumer spans. + SPAN_KIND_CONSUMER = 5; + } + + // Distinguishes between spans generated in a particular context. For example, + // two spans with the same name may be distinguished using `CLIENT` (caller) + // and `SERVER` (callee) to identify queueing latency associated with the span. + SpanKind kind = 6; + + // The start time of the span. On the client side, this is the time + // kept by the local machine where the span execution starts. On the server side, this + // is the time when the server's application handler starts running. + // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970. + // + // This field is semantically required and it is expected that end_time >= start_time. + fixed64 start_time_unix_nano = 7; + + // The end time of the span. On the client side, this is the time + // kept by the local machine where the span execution ends. On the server side, this + // is the time when the server application handler stops running. + // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970. + // + // This field is semantically required and it is expected that end_time >= start_time. + fixed64 end_time_unix_nano = 8; + + // A collection of key/value pairs. Note, global attributes + // like server name can be set using the resource API. Examples of attributes: + // + // "/http/user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36" + // "/http/server_latency": 300 + // "example.com/myattribute": true + // "example.com/score": 10.239 + // + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). + // The behavior of software that receives duplicated keys can be unpredictable. + repeated opentelemetry.proto.common.v1.KeyValue attributes = 9; + + // The number of attributes that were discarded. Attributes + // can be discarded because their keys are too long or because there are too many + // attributes. If this value is 0, then no attributes were dropped. + uint32 dropped_attributes_count = 10; + + // Event is a time-stamped annotation of the span, consisting of user-supplied + // text description and key-value pairs. + message Event { + // The time the event occurred. + fixed64 time_unix_nano = 1; + + // The name of the event. + // This field is semantically required to be set to non-empty string. + string name = 2; + + // A collection of attribute key/value pairs on the event. + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). + // The behavior of software that receives duplicated keys can be unpredictable. + repeated opentelemetry.proto.common.v1.KeyValue attributes = 3; + + // The number of dropped attributes. If the value is 0, + // then no attributes were dropped. + uint32 dropped_attributes_count = 4; + } + + // A collection of Event items. + repeated Event events = 11; + + // The number of dropped events. If the value is 0, then no + // events were dropped. + uint32 dropped_events_count = 12; + + // A pointer from the current span to another span in the same trace or in a + // different trace. For example, this can be used in batching operations, + // where a single batch handler processes multiple requests from different + // traces or when the handler receives a request from a different project. + message Link { + // A unique identifier of a trace that this linked span is part of. The ID is a + // 16-byte array. + bytes trace_id = 1; + + // A unique identifier for the linked span. The ID is an 8-byte array. + bytes span_id = 2; + + // The trace_state associated with the link. + string trace_state = 3; + + // A collection of attribute key/value pairs on the link. + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). + // The behavior of software that receives duplicated keys can be unpredictable. + repeated opentelemetry.proto.common.v1.KeyValue attributes = 4; + + // The number of dropped attributes. If the value is 0, + // then no attributes were dropped. + uint32 dropped_attributes_count = 5; + + // Flags, a bit field. + // + // Bits 0-7 (8 least significant bits) are the trace flags as defined in W3C Trace + // Context specification. To read the 8-bit W3C trace flag, use + // `flags & SPAN_FLAGS_TRACE_FLAGS_MASK`. + // + // See https://www.w3.org/TR/trace-context-2/#trace-flags for the flag definitions. + // + // Bits 8 and 9 represent the 3 states of whether the link is remote. + // The states are (unknown, is not remote, is remote). + // To read whether the value is known, use `(flags & SPAN_FLAGS_CONTEXT_HAS_IS_REMOTE_MASK) != 0`. + // To read whether the link is remote, use `(flags & SPAN_FLAGS_CONTEXT_IS_REMOTE_MASK) != 0`. + // + // Readers MUST NOT assume that bits 10-31 (22 most significant bits) will be zero. + // When creating new spans, bits 10-31 (most-significant 22-bits) MUST be zero. + // + // [Optional]. + fixed32 flags = 6; + } + + // A collection of Links, which are references from this span to a span + // in the same or different trace. + repeated Link links = 13; + + // The number of dropped links after the maximum size was + // enforced. If this value is 0, then no links were dropped. + uint32 dropped_links_count = 14; + + // An optional final status for this span. Semantically when Status isn't set, it means + // span's status code is unset, i.e. assume STATUS_CODE_UNSET (code = 0). + Status status = 15; +} + +// The Status type defines a logical error model that is suitable for different +// programming environments, including REST APIs and RPC APIs. +message Status { + reserved 1; + + // A developer-facing human readable error message. + string message = 2; + + // For the semantics of status codes see + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/api.md#set-status + enum StatusCode { + // The default status. + STATUS_CODE_UNSET = 0; + // The Span has been validated by an Application developer or Operator to + // have completed successfully. + STATUS_CODE_OK = 1; + // The Span contains an error. + STATUS_CODE_ERROR = 2; + }; + + // The status code. + StatusCode code = 3; +} + +// SpanFlags represents constants used to interpret the +// Span.flags field, which is protobuf 'fixed32' type and is to +// be used as bit-fields. Each non-zero value defined in this enum is +// a bit-mask. To extract the bit-field, for example, use an +// expression like: +// +// (span.flags & SPAN_FLAGS_TRACE_FLAGS_MASK) +// +// See https://www.w3.org/TR/trace-context-2/#trace-flags for the flag definitions. +// +// Note that Span flags were introduced in version 1.1 of the +// OpenTelemetry protocol. Older Span producers do not set this +// field, consequently consumers should not rely on the absence of a +// particular flag bit to indicate the presence of a particular feature. +enum SpanFlags { + // The zero value for the enum. Should not be used for comparisons. + // Instead use bitwise "and" with the appropriate mask as shown above. + SPAN_FLAGS_DO_NOT_USE = 0; + + // Bits 0-7 are used for trace flags. + SPAN_FLAGS_TRACE_FLAGS_MASK = 0x000000FF; + + // Bits 8 and 9 are used to indicate that the parent span or link span is remote. + // Bit 8 (`HAS_IS_REMOTE`) indicates whether the value is known. + // Bit 9 (`IS_REMOTE`) indicates whether the span or link is remote. + SPAN_FLAGS_CONTEXT_HAS_IS_REMOTE_MASK = 0x00000100; + SPAN_FLAGS_CONTEXT_IS_REMOTE_MASK = 0x00000200; + + // Bits 10-31 are reserved for future use. +} \ No newline at end of file diff --git a/libdd-trace-utils/src/msgpack_encoder/otlp/trace.rs b/libdd-trace-utils/src/msgpack_encoder/otlp/trace.rs new file mode 100644 index 0000000000..785bd63b61 --- /dev/null +++ b/libdd-trace-utils/src/msgpack_encoder/otlp/trace.rs @@ -0,0 +1,789 @@ +// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Manual protobuf encoding of v04 spans into OTLP TracesData messages. +//! +//! Field number references: +//! - common.proto: KeyValue(1=key, 2=value), AnyValue(1=string, 2=bool, 3=int64, 4=double, 5=array, 7=bytes), ArrayValue(1=values) +//! - resource.proto: Resource(1=attributes) +//! - trace.proto: +//! - TracesData(1=resource_spans) +//! - ResourceSpans(1=resource, 2=scope_spans) +//! - ScopeSpans(2=spans) +//! - Span(1=trace_id, 2=span_id, 4=parent_span_id, 5=name, 7=start_time_unix_nano, +//! 8=end_time_unix_nano, 9=attributes, 11=events, 13=links, 15=status) +//! - Status(3=code) +//! - Span.Event(1=time_unix_nano, 2=name, 3=attributes) +//! - Span.Link(1=trace_id, 2=span_id, 3=trace_state, 4=attributes, 6=flags) + +use libdd_proto_codec::encoder::{BufMut, Encoder, TopLevelEncoder}; +use std::borrow::Borrow; + +use crate::span::trace_utils::has_top_level; +use crate::span::v04::{AttributeAnyValue, AttributeArrayValue, Span, SpanEvent, SpanLink}; +use crate::span::TraceData; + +// common.proto: KeyValue +const KV_KEY: u32 = 1; +const KV_VALUE: u32 = 2; + +// common.proto: AnyValue oneof field numbers +const ANY_VALUE_STRING: u32 = 1; +const ANY_VALUE_BOOL: u32 = 2; +const ANY_VALUE_INT: u32 = 3; +const ANY_VALUE_DOUBLE: u32 = 4; +const ANY_VALUE_ARRAY: u32 = 5; +const ANY_VALUE_BYTES: u32 = 7; + +// common.proto: ArrayValue +const ARRAY_VALUES: u32 = 1; + +// resource.proto: Resource +const RESOURCE_ATTRIBUTES: u32 = 1; + +// trace.proto: TracesData +const TRACES_DATA_RESOURCE_SPANS: u32 = 1; + +// trace.proto: ResourceSpans +const RESOURCE_SPANS_RESOURCE: u32 = 1; +const RESOURCE_SPANS_SCOPE_SPANS: u32 = 2; + +// trace.proto: ScopeSpans +const SCOPE_SPANS_SPANS: u32 = 2; + +// trace.proto: Span +const SPAN_TRACE_ID: u32 = 1; +const SPAN_SPAN_ID: u32 = 2; +const SPAN_PARENT_SPAN_ID: u32 = 4; +const SPAN_NAME: u32 = 5; +const SPAN_START_TIME_UNIX_NANO: u32 = 7; +const SPAN_END_TIME_UNIX_NANO: u32 = 8; +const SPAN_ATTRIBUTES: u32 = 9; +const SPAN_EVENTS: u32 = 11; +const SPAN_LINKS: u32 = 13; +const SPAN_STATUS: u32 = 15; + +// trace.proto: Status — field 1 is reserved, code is field 3 +const STATUS_CODE: u32 = 3; + +// trace.proto: Span.Event +const EVENT_TIME_UNIX_NANO: u32 = 1; +const EVENT_NAME: u32 = 2; +const EVENT_ATTRIBUTES: u32 = 3; + +// trace.proto: Span.Link +const LINK_TRACE_ID: u32 = 1; +const LINK_SPAN_ID: u32 = 2; +const LINK_TRACE_STATE: u32 = 3; +const LINK_ATTRIBUTES: u32 = 4; +const LINK_FLAGS: u32 = 6; + +fn encode_kv_string(e: &mut Encoder<'_, B>, kv_field: u32, key: &str, value: &str) { + let mut kv = e.write_message_repeated(kv_field); + let mut kv_e = kv.encoder(); + kv_e.write_string_opt(KV_KEY, key); + kv_e.write_message_repeated(KV_VALUE) + .encoder() + .write_string_opt(ANY_VALUE_STRING, value); +} + +fn encode_kv_double(e: &mut Encoder<'_, B>, kv_field: u32, key: &str, value: f64) { + let mut kv = e.write_message_repeated(kv_field); + let mut kv_e = kv.encoder(); + kv_e.write_string_opt(KV_KEY, key); + kv_e.write_message_repeated(KV_VALUE) + .encoder() + .write_f64_opt(ANY_VALUE_DOUBLE, value); +} + +fn encode_kv_bytes(e: &mut Encoder<'_, B>, kv_field: u32, key: &str, value: &[u8]) { + let mut kv = e.write_message_repeated(kv_field); + let mut kv_e = kv.encoder(); + kv_e.write_string(KV_KEY, key); + kv_e.write_message(KV_VALUE) + .encoder() + .write_bytes_opt(ANY_VALUE_BYTES, value); +} + +fn encode_attribute_array_value_fields( + e: &mut Encoder<'_, B>, + value: &AttributeArrayValue, +) { + match value { + AttributeArrayValue::String(s) => e.write_string_opt(ANY_VALUE_STRING, s.borrow()), + AttributeArrayValue::Boolean(b) => e.write_bool_opt(ANY_VALUE_BOOL, *b), + AttributeArrayValue::Integer(i) => e.write_int64_opt(ANY_VALUE_INT, *i), + AttributeArrayValue::Double(d) => e.write_f64_opt(ANY_VALUE_DOUBLE, *d), + } +} + +fn encode_kv_attribute_any_value( + e: &mut Encoder<'_, B>, + kv_field: u32, + key: &str, + value: &AttributeAnyValue, +) { + let mut kv = e.write_message_repeated(kv_field); + let mut kv_e = kv.encoder(); + kv_e.write_string_opt(KV_KEY, key); + { + let mut av = kv_e.write_message_repeated(KV_VALUE); + let mut av_e = av.encoder(); + match value { + AttributeAnyValue::SingleValue(v) => { + encode_attribute_array_value_fields(&mut av_e, v); + } + AttributeAnyValue::Array(arr) => { + let mut arr_msg = av_e.write_message_repeated(ANY_VALUE_ARRAY); + let mut arr_e = arr_msg.encoder(); + for item in arr { + let mut item_msg = arr_e.write_message_repeated(ARRAY_VALUES); + encode_attribute_array_value_fields(&mut item_msg.encoder(), item); + } + } + } + } +} + +fn encode_span_event(e: &mut Encoder<'_, B>, event: &SpanEvent) { + let mut ev = e.write_message_repeated(SPAN_EVENTS); + let mut ev_e = ev.encoder(); + ev_e.write_fixed64_opt(EVENT_TIME_UNIX_NANO, event.time_unix_nano); + ev_e.write_string_opt(EVENT_NAME, event.name.borrow()); + for (k, v) in &event.attributes { + encode_kv_attribute_any_value(&mut ev_e, EVENT_ATTRIBUTES, k.borrow(), v); + } +} + +fn encode_span_link(e: &mut Encoder<'_, B>, link: &SpanLink) { + let mut lk = e.write_message_repeated(SPAN_LINKS); + let mut lk_e = lk.encoder(); + + // trace_id: 16 bytes big-endian — high 64 bits followed by low 64 bits + let mut trace_id_bytes = [0u8; 16]; + trace_id_bytes[0..8].copy_from_slice(&link.trace_id_high.to_be_bytes()); + trace_id_bytes[8..16].copy_from_slice(&link.trace_id.to_be_bytes()); + lk_e.write_bytes_opt(LINK_TRACE_ID, &trace_id_bytes); + + lk_e.write_bytes_opt(LINK_SPAN_ID, &link.span_id.to_be_bytes()); + + if !link.tracestate.borrow().is_empty() { + lk_e.write_string_opt(LINK_TRACE_STATE, link.tracestate.borrow()); + } + + for (k, v) in &link.attributes { + encode_kv_string(&mut lk_e, LINK_ATTRIBUTES, k.borrow(), v.borrow()); + } + + if link.flags != 0 { + lk_e.write_fixed32_opt(LINK_FLAGS, link.flags); + } +} + +/// Encodes a v04 `Span` as an OTLP Span protobuf message into the provided encoder. +/// +/// Field mapping: +/// - `trace_id` → `Span.trace_id` (16 bytes big-endian; lower 64 bits of the u128) +/// - `span_id` → `Span.span_id` (8 bytes big-endian) +/// - `parent_id` → `Span.parent_span_id` (8 bytes big-endian; omitted if zero) +/// - `name` → `Span.name` +/// - `start` → `Span.start_time_unix_nano` +/// - `start + duration` → `Span.end_time_unix_nano` +/// - `meta` → `Span.attributes` (string AnyValue) +/// - `metrics` → `Span.attributes` (double AnyValue) +/// - `meta_struct` → `Span.attributes` (bytes AnyValue) +/// - `type` → `Span.attributes["dd.span.type"]` (if non-empty) +/// - `resource` → `Span.attributes["dd.span.resource"]` (if non-empty) +/// - `error != 0` → `Span.status.code = STATUS_CODE_ERROR (2)` +/// - `span_events` → `Span.events` +/// - `span_links` → `Span.links` +/// +/// The `service` field is not written here; see [`encode_traces_data`] for the full +/// TracesData wrapper which places `service` in `Resource.attributes["service.name"]`. +fn encode_span(e: &mut Encoder<'_, B>, span: &Span) { + e.write_bytes_opt(SPAN_TRACE_ID, &span.trace_id.to_be_bytes()); + e.write_bytes_opt(SPAN_SPAN_ID, &span.span_id.to_be_bytes()); + + if span.parent_id != 0 { + e.write_bytes_opt(SPAN_PARENT_SPAN_ID, &span.parent_id.to_be_bytes()); + } + + e.write_string_opt(SPAN_NAME, span.name.borrow()); + e.write_fixed64_opt(SPAN_START_TIME_UNIX_NANO, span.start as u64); + e.write_fixed64_opt(SPAN_END_TIME_UNIX_NANO, (span.start + span.duration) as u64); + + encode_kv_string(e, SPAN_ATTRIBUTES, "service.name", span.name.borrow()); + + for (k, v) in &span.meta { + encode_kv_string(e, SPAN_ATTRIBUTES, k.borrow(), v.borrow()); + } + for (k, v) in &span.metrics { + encode_kv_double(e, SPAN_ATTRIBUTES, k.borrow(), *v); + } + for (k, v) in &span.meta_struct { + encode_kv_bytes(e, SPAN_ATTRIBUTES, k.borrow(), v.borrow()); + } + if !span.r#type.borrow().is_empty() { + encode_kv_string(e, SPAN_ATTRIBUTES, "dd.span.type", span.r#type.borrow()); + } + if !span.resource.borrow().is_empty() { + encode_kv_string( + e, + SPAN_ATTRIBUTES, + "dd.span.resource", + span.resource.borrow(), + ); + } + + for event in &span.span_events { + encode_span_event(e, event); + } + for link in &span.span_links { + encode_span_link(e, link); + } + + if span.error != 0 { + // STATUS_CODE_ERROR = 2 + let mut status = e.write_message(SPAN_STATUS); + status.encoder().write_int32_opt(STATUS_CODE, 2); + } +} + +/// Encodes a single trace chunk as an OTLP `ResourceSpans` message into the provided encoder. +/// +/// The service name is taken from the first span in the chunk where [`has_top_level`] is true. +/// If no top-level span is found the `Resource` message (and its `service.name` attribute) +/// is omitted. All spans in the chunk are written into a single `ScopeSpans`. +fn encode_resource_spans(e: &mut Encoder<'_, B>, chunk: &[Span]) { + let mut resource_spans = e.write_message_repeated(TRACES_DATA_RESOURCE_SPANS); + let mut rs_e = resource_spans.encoder(); + + { + // Resource message is omitted when no service can be determined + // (write_message skips the field when nothing is written inside it). + let service = chunk + .iter() + .find(|s| has_top_level(s)) + .map(|s| s.service.borrow()); + let mut resource = rs_e.write_message(RESOURCE_SPANS_RESOURCE); + if let Some(svc) = service { + if !svc.is_empty() { + encode_kv_string( + &mut resource.encoder(), + RESOURCE_ATTRIBUTES, + "service.name", + svc, + ); + } + } + } + + { + let mut scope_spans = rs_e.write_message_repeated(RESOURCE_SPANS_SCOPE_SPANS); + let mut ss_e = scope_spans.encoder(); + for span in chunk { + let mut span_msg = ss_e.write_message_repeated(SCOPE_SPANS_SPANS); + encode_span(&mut span_msg.encoder(), span); + } + } +} + +/// Encodes a list of trace chunks into the OTLP `TracesData` protobuf structure. +/// +/// Each chunk becomes one `ResourceSpans`. The service name for each `ResourceSpans` is +/// determined by finding the first span in the chunk where [`has_top_level`] is true. +/// +/// ```text +/// TracesData +/// ResourceSpans ← one per chunk +/// Resource +/// attributes: [{ key: "service.name", value: }] +/// ScopeSpans +/// Span … ← one per span in the chunk +/// ``` +/// +/// See [`encode_span`] for how individual span fields are mapped. +fn encode_traces_data( + e: &mut Encoder<'_, B>, + trace_chunks: &[Vec>], +) { + for chunk in trace_chunks { + encode_resource_spans(e, chunk); + } +} + +/// Encodes a list of trace chunks into the OTLP `TracesData` protobuf format and returns +/// the bytes. +/// +/// See [`encode_traces_data`] for the structure and field mapping. +pub fn to_buf(trace_chunks: &[Vec>]) -> B { + let mut enc = TopLevelEncoder::with_capacity(256); + encode_traces_data(&mut enc.encoder(), trace_chunks); + enc.finish() +} + +/// Encodes a list of trace chunks into the OTLP `TracesData` protobuf format and returns +/// the bytes as a Vec. +/// +/// See [`encode_traces_data`] for the structure and field mapping. +pub fn to_vec(trace_chunks: &[Vec>]) -> Vec { + to_buf(trace_chunks) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::span::v04::{ + AttributeAnyValue, AttributeArrayValue, SpanEvent, SpanLink, SpanSlice, + }; + use prost::Message; + use std::collections::HashMap; + + // Minimal inline prost structs for round-trip verification. + // Defined here to avoid a prost build-script dependency on the OTLP .proto files. + + #[derive(prost::Message)] + struct TracesData { + #[prost(message, repeated, tag = "1")] + resource_spans: Vec, + } + + #[derive(prost::Message)] + struct ResourceSpans { + #[prost(message, optional, tag = "1")] + resource: Option, + #[prost(message, repeated, tag = "2")] + scope_spans: Vec, + } + + #[derive(prost::Message)] + struct Resource { + #[prost(message, repeated, tag = "1")] + attributes: Vec, + } + + #[derive(prost::Message)] + struct ScopeSpans { + #[prost(message, repeated, tag = "2")] + spans: Vec, + } + + #[derive(prost::Message, PartialEq)] + struct OtlpSpan { + #[prost(bytes = "vec", tag = "1")] + trace_id: Vec, + #[prost(bytes = "vec", tag = "2")] + span_id: Vec, + #[prost(bytes = "vec", tag = "4")] + parent_span_id: Vec, + #[prost(string, tag = "5")] + name: String, + #[prost(fixed64, tag = "7")] + start_time_unix_nano: u64, + #[prost(fixed64, tag = "8")] + end_time_unix_nano: u64, + #[prost(message, repeated, tag = "9")] + attributes: Vec, + #[prost(message, repeated, tag = "11")] + events: Vec, + #[prost(message, repeated, tag = "13")] + links: Vec, + #[prost(message, optional, tag = "15")] + status: Option, + } + + // AnyValue without the oneof wrapper — protobuf wire format is identical, + // prost will populate whichever optional field matches the tag. + #[derive(prost::Message, PartialEq, Clone)] + struct AnyValue { + #[prost(string, optional, tag = "1")] + string_value: Option, + #[prost(bool, optional, tag = "2")] + bool_value: Option, + #[prost(int64, optional, tag = "3")] + int_value: Option, + #[prost(double, optional, tag = "4")] + double_value: Option, + #[prost(message, optional, tag = "5")] + array_value: Option, + #[prost(bytes = "vec", optional, tag = "7")] + bytes_value: Option>, + } + + #[derive(prost::Message, PartialEq, Clone)] + struct ArrayValue { + #[prost(message, repeated, tag = "1")] + values: Vec, + } + + #[derive(prost::Message, PartialEq)] + struct KeyValue { + #[prost(string, tag = "1")] + key: String, + #[prost(message, optional, tag = "2")] + value: Option, + } + + #[derive(prost::Message, PartialEq)] + struct OtlpEvent { + #[prost(fixed64, tag = "1")] + time_unix_nano: u64, + #[prost(string, tag = "2")] + name: String, + #[prost(message, repeated, tag = "3")] + attributes: Vec, + } + + #[derive(prost::Message, PartialEq)] + struct OtlpLink { + #[prost(bytes = "vec", tag = "1")] + trace_id: Vec, + #[prost(bytes = "vec", tag = "2")] + span_id: Vec, + #[prost(string, tag = "3")] + trace_state: String, + #[prost(message, repeated, tag = "4")] + attributes: Vec, + #[prost(fixed32, tag = "6")] + flags: u32, + } + + #[derive(prost::Message, PartialEq)] + struct OtlpStatus { + #[prost(int32, tag = "3")] + code: i32, + } + + fn decode(bytes: &[u8]) -> TracesData { + TracesData::decode(bytes).expect("valid protobuf") + } + + fn find_kv<'a>(attrs: &'a [KeyValue], key: &str) -> Option<&'a AnyValue> { + attrs + .iter() + .find(|kv| kv.key == key) + .and_then(|kv| kv.value.as_ref()) + } + + // Returns a metrics map with `_top_level` set, making `has_top_level` return true. + fn top_level_metrics() -> HashMap<&'static str, f64> { + HashMap::from([("_top_level", 1.0)]) + } + + #[test] + fn test_basic_span_fields() { + let span = SpanSlice { + trace_id: 0x0102030405060708090a0b0c0d0e0f10_u128, + span_id: 0xaabbccdd11223344_u64, + parent_id: 0x1111222233334444_u64, + name: "my.operation", + start: 1_000_000_000, + duration: 500_000_000, + ..Default::default() + }; + + let decoded = decode(&to_vec(&[vec![span]])); + + assert_eq!(decoded.resource_spans.len(), 1); + let rs = &decoded.resource_spans[0]; + // No top-level span with service → resource is omitted + assert!(rs.resource.is_none()); + + assert_eq!(rs.scope_spans.len(), 1); + let spans = &rs.scope_spans[0].spans; + assert_eq!(spans.len(), 1); + + let s = &spans[0]; + assert_eq!( + s.trace_id, + 0x0102030405060708090a0b0c0d0e0f10_u128 + .to_be_bytes() + .to_vec() + ); + assert_eq!(s.span_id, 0xaabbccdd11223344_u64.to_be_bytes().to_vec()); + assert_eq!( + s.parent_span_id, + 0x1111222233334444_u64.to_be_bytes().to_vec() + ); + assert_eq!(s.name, "my.operation"); + assert_eq!(s.start_time_unix_nano, 1_000_000_000_u64); + assert_eq!(s.end_time_unix_nano, 1_500_000_000_u64); + assert!(s.status.is_none()); + } + + #[test] + fn test_service_taken_from_top_level_span() { + // Non-top-level span first, then top-level span — service comes from the top-level one. + let chunk = vec![ + SpanSlice { + service: "wrong_service", + name: "child", + ..Default::default() + }, + SpanSlice { + service: "correct_service", + name: "root", + metrics: top_level_metrics(), + ..Default::default() + }, + ]; + + let decoded = decode(&to_vec(&[chunk])); + let resource = decoded.resource_spans[0] + .resource + .as_ref() + .expect("resource present"); + let svc = find_kv(&resource.attributes, "service.name") + .expect("service.name attribute") + .string_value + .as_deref() + .unwrap(); + assert_eq!(svc, "correct_service"); + } + + #[test] + fn test_no_top_level_span_omits_resource() { + // None of the spans has has_top_level → Resource message must be omitted. + let chunk = vec![SpanSlice { + service: "some_service", + name: "op", + ..Default::default() + }]; + + let decoded = decode(&to_vec(&[chunk])); + assert!(decoded.resource_spans[0].resource.is_none()); + } + + #[test] + fn test_multiple_chunks_produce_multiple_resource_spans() { + let chunk_a = vec![ + SpanSlice { + service: "svc_a", + name: "root_a", + span_id: 1, + metrics: top_level_metrics(), + ..Default::default() + }, + SpanSlice { + service: "svc_a", + name: "child_a", + span_id: 2, + parent_id: 1, + ..Default::default() + }, + ]; + let chunk_b = vec![SpanSlice { + service: "svc_b", + name: "root_b", + span_id: 3, + metrics: top_level_metrics(), + ..Default::default() + }]; + + let decoded = decode(&to_vec(&[chunk_a, chunk_b])); + + assert_eq!(decoded.resource_spans.len(), 2); + + // chunk_a → first ResourceSpans with 2 spans + let rs_a = &decoded.resource_spans[0]; + let svc_a = find_kv(&rs_a.resource.as_ref().unwrap().attributes, "service.name") + .unwrap() + .string_value + .as_deref() + .unwrap(); + assert_eq!(svc_a, "svc_a"); + assert_eq!(rs_a.scope_spans[0].spans.len(), 2); + + // chunk_b → second ResourceSpans with 1 span + let rs_b = &decoded.resource_spans[1]; + let svc_b = find_kv(&rs_b.resource.as_ref().unwrap().attributes, "service.name") + .unwrap() + .string_value + .as_deref() + .unwrap(); + assert_eq!(svc_b, "svc_b"); + assert_eq!(rs_b.scope_spans[0].spans.len(), 1); + } + + #[test] + fn test_span_attributes_from_meta_and_metrics() { + let span = SpanSlice { + name: "op", + r#type: "web", + resource: "GET /users", + meta: HashMap::from([("http.method", "GET"), ("env", "prod")]), + metrics: HashMap::from([("duration_ms", 42.5_f64)]), + ..Default::default() + }; + + let decoded = decode(&to_vec(&[vec![span]])); + let attrs = &decoded.resource_spans[0].scope_spans[0].spans[0].attributes; + + assert_eq!( + find_kv(attrs, "http.method") + .unwrap() + .string_value + .as_deref(), + Some("GET") + ); + assert_eq!( + find_kv(attrs, "env").unwrap().string_value.as_deref(), + Some("prod") + ); + assert_eq!( + find_kv(attrs, "duration_ms").unwrap().double_value, + Some(42.5) + ); + assert_eq!( + find_kv(attrs, "dd.span.type") + .unwrap() + .string_value + .as_deref(), + Some("web") + ); + assert_eq!( + find_kv(attrs, "dd.span.resource") + .unwrap() + .string_value + .as_deref(), + Some("GET /users") + ); + } + + #[test] + fn test_error_maps_to_status_code_error() { + let span = SpanSlice { + name: "op", + error: 1, + ..Default::default() + }; + + let decoded = decode(&to_vec(&[vec![span]])); + let status = decoded.resource_spans[0].scope_spans[0].spans[0] + .status + .as_ref() + .expect("status present"); + // STATUS_CODE_ERROR = 2 + assert_eq!(status.code, 2); + } + + #[test] + fn test_span_events_encoding() { + let span = SpanSlice { + name: "op", + span_events: vec![SpanEvent { + time_unix_nano: 9_999_999_999, + name: "exception", + attributes: HashMap::from([ + ( + "exception.message", + AttributeAnyValue::SingleValue(AttributeArrayValue::String( + "divide by zero", + )), + ), + ( + "exception.escaped", + AttributeAnyValue::SingleValue(AttributeArrayValue::Boolean(true)), + ), + ( + "exception.count", + AttributeAnyValue::SingleValue(AttributeArrayValue::Integer(3)), + ), + ( + "exception.duration", + AttributeAnyValue::SingleValue(AttributeArrayValue::Double(0.5)), + ), + ( + "exception.lines", + AttributeAnyValue::Array(vec![ + AttributeArrayValue::String("line 1"), + AttributeArrayValue::String("line 2"), + ]), + ), + ]), + }], + ..Default::default() + }; + + let decoded = decode(&to_vec(&[vec![span]])); + let events = &decoded.resource_spans[0].scope_spans[0].spans[0].events; + assert_eq!(events.len(), 1); + + let ev = &events[0]; + assert_eq!(ev.time_unix_nano, 9_999_999_999); + assert_eq!(ev.name, "exception"); + + assert_eq!( + find_kv(&ev.attributes, "exception.message") + .unwrap() + .string_value + .as_deref(), + Some("divide by zero") + ); + assert_eq!( + find_kv(&ev.attributes, "exception.escaped") + .unwrap() + .bool_value, + Some(true) + ); + assert_eq!( + find_kv(&ev.attributes, "exception.count") + .unwrap() + .int_value, + Some(3) + ); + assert_eq!( + find_kv(&ev.attributes, "exception.duration") + .unwrap() + .double_value, + Some(0.5) + ); + + let array_val = find_kv(&ev.attributes, "exception.lines") + .unwrap() + .array_value + .as_ref() + .expect("array value"); + assert_eq!(array_val.values.len(), 2); + assert_eq!(array_val.values[0].string_value.as_deref(), Some("line 1")); + assert_eq!(array_val.values[1].string_value.as_deref(), Some("line 2")); + } + + #[test] + fn test_span_links_encoding() { + let span = SpanSlice { + name: "op", + span_links: vec![SpanLink { + trace_id: 0x0102030405060708, + trace_id_high: 0x090a0b0c0d0e0f10, + span_id: 0xaabbccdd11223344, + tracestate: "vendor=value", + flags: 0x00000300, + attributes: HashMap::from([("link.attr", "value")]), + }], + ..Default::default() + }; + + let decoded = decode(&to_vec(&[vec![span]])); + let links = &decoded.resource_spans[0].scope_spans[0].spans[0].links; + assert_eq!(links.len(), 1); + + let lk = &links[0]; + + let mut expected_trace_id = [0u8; 16]; + expected_trace_id[0..8].copy_from_slice(&0x090a0b0c0d0e0f10_u64.to_be_bytes()); + expected_trace_id[8..16].copy_from_slice(&0x0102030405060708_u64.to_be_bytes()); + assert_eq!(lk.trace_id, expected_trace_id.to_vec()); + + assert_eq!(lk.span_id, 0xaabbccdd11223344_u64.to_be_bytes().to_vec()); + assert_eq!(lk.trace_state, "vendor=value"); + assert_eq!(lk.flags, 0x00000300); + assert_eq!( + find_kv(&lk.attributes, "link.attr") + .unwrap() + .string_value + .as_deref(), + Some("value") + ); + } +}