From 513f0ca91f47e66d007e338f1c2e0464dc1305a7 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Thu, 5 Mar 2026 15:56:52 -0500 Subject: [PATCH 1/5] handle MSK events --- .../invocation/triggers/msk_event.rs | 47 ++++++++++++++++++- .../payloads/msk_event_with_headers.json | 23 +++++++++ 2 files changed, 69 insertions(+), 1 deletion(-) create mode 100644 bottlecap/tests/payloads/msk_event_with_headers.json diff --git a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs index 91bd85d17..d8e9dba36 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs @@ -21,6 +21,8 @@ pub struct MSKRecord { pub topic: String, pub partition: i32, pub timestamp: f64, + #[serde(default)] + pub headers: Vec>>, } impl Trigger for MSKEvent { @@ -105,7 +107,17 @@ impl Trigger for MSKEvent { } fn get_carrier(&self) -> HashMap { - HashMap::new() + let mut carrier = HashMap::new(); + if let Some(record) = self.records.values().find_map(|arr| arr.first()) { + for header_map in &record.headers { + for (key, value_bytes) in header_map { + if let Ok(value_str) = String::from_utf8(value_bytes.clone()) { + carrier.insert(key.to_lowercase(), value_str); + } + } + } + } + carrier } fn is_async(&self) -> bool { @@ -142,6 +154,7 @@ mod tests { topic: String::from("topic1"), partition: 0, timestamp: 1_745_846_213_022f64, + headers: vec![], }; let mut expected_records = HashMap::new(); expected_records.insert(String::from("topic1"), vec![record]); @@ -335,4 +348,36 @@ mod tests { "msk" // fallback value ); } + + #[test] + fn test_new_with_headers() { + let json = read_json_file("msk_event_with_headers.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let result = MSKEvent::new(payload).expect("Failed to deserialize into MSKEvent"); + + let record = result.records.values().find_map(|arr| arr.first()).unwrap(); + assert_eq!(record.topic, "topic1"); + assert_eq!(record.headers.len(), 3); + } + + #[test] + fn test_get_carrier_with_headers() { + let json = read_json_file("msk_event_with_headers.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = MSKEvent::new(payload).expect("Failed to deserialize MSKEvent"); + let carrier = event.get_carrier(); + + assert_eq!( + carrier.get("x-datadog-trace-id").map(String::as_str), + Some("36979754430890456950") + ); + assert_eq!( + carrier.get("x-datadog-parent-id").map(String::as_str), + Some("7431398482019833808") + ); + assert_eq!( + carrier.get("x-datadog-sampling-priority").map(String::as_str), + Some("1") + ); + } } diff --git a/bottlecap/tests/payloads/msk_event_with_headers.json b/bottlecap/tests/payloads/msk_event_with_headers.json new file mode 100644 index 000000000..b5a6e6238 --- /dev/null +++ b/bottlecap/tests/payloads/msk_event_with_headers.json @@ -0,0 +1,23 @@ +{ + "eventSource": "aws:kafka", + "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/demo-cluster/751d2973-a626-431c-9d4e-d7975eb44dd7-2", + "bootstrapServers": "b-1.demo-cluster.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-2.demo-cluster.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", + "records": { + "topic1": [ + { + "topic": "topic1", + "partition": 0, + "offset": 101, + "timestamp": 1745846213022, + "timestampType":"CREATE_TIME", + "key": "b3JkZXJJZA==", + "value": "eyJvcmRlcklkIjoiMTIzNCIsImFtb3VudCI6MTAwLjAxfQ==", + "headers": [ + {"x-datadog-trace-id": [51, 54, 57, 55, 57, 55, 53, 52, 52, 51, 48, 56, 57, 48, 52, 53, 54, 57, 53, 48]}, + {"x-datadog-parent-id": [55, 52, 51, 49, 51, 57, 56, 52, 56, 50, 48, 49, 57, 56, 51, 51, 56, 48, 56]}, + {"x-datadog-sampling-priority": [49]} + ] + } + ] + } +} From fe2c84b99a7ed832441d060bd83fa976ef9f0eef Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Mon, 9 Mar 2026 15:20:19 -0400 Subject: [PATCH 2/5] cargo fmt and merge main --- bottlecap/src/lifecycle/invocation/triggers/msk_event.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs index d8e9dba36..cb5cf8796 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs @@ -376,7 +376,9 @@ mod tests { Some("7431398482019833808") ); assert_eq!( - carrier.get("x-datadog-sampling-priority").map(String::as_str), + carrier + .get("x-datadog-sampling-priority") + .map(String::as_str), Some("1") ); } From 1dbb7d6c8dc9939466f8797de8f1b9b627290b8e Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Mon, 9 Mar 2026 15:32:35 -0400 Subject: [PATCH 3/5] fix --- bottlecap/src/lifecycle/invocation/triggers/msk_event.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs index cb5cf8796..58ae9da97 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs @@ -355,7 +355,7 @@ mod tests { let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); let result = MSKEvent::new(payload).expect("Failed to deserialize into MSKEvent"); - let record = result.records.values().find_map(|arr| arr.first()).unwrap(); + let record = result.records.values().find_map(|arr| arr.first()).expect("Expected at least one record"); assert_eq!(record.topic, "topic1"); assert_eq!(record.headers.len(), 3); } From 4683724ea818605ad5e472fadcbbc18035733676 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Mon, 9 Mar 2026 15:33:38 -0400 Subject: [PATCH 4/5] cargo fmt --- bottlecap/src/lifecycle/invocation/triggers/msk_event.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs index 58ae9da97..723a60bbb 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs @@ -355,7 +355,11 @@ mod tests { let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); let result = MSKEvent::new(payload).expect("Failed to deserialize into MSKEvent"); - let record = result.records.values().find_map(|arr| arr.first()).expect("Expected at least one record"); + let record = result + .records + .values() + .find_map(|arr| arr.first()) + .expect("Expected at least one record"); assert_eq!(record.topic, "topic1"); assert_eq!(record.headers.len(), 3); } From 22b24a23b5e5c2979147192b277122757ccbcbff Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Wed, 11 Mar 2026 13:44:58 -0400 Subject: [PATCH 5/5] fix critical vulnerability --- bottlecap/Cargo.lock | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock index 37517a6ae..cb3b02ab8 100644 --- a/bottlecap/Cargo.lock +++ b/bottlecap/Cargo.lock @@ -383,7 +383,7 @@ dependencies = [ "bitflags 2.10.0", "cexpr", "clang-sys", - "itertools 0.13.0", + "itertools 0.11.0", "log", "prettyplease", "proc-macro2", @@ -1640,15 +1640,6 @@ dependencies = [ "either", ] -[[package]] -name = "itertools" -version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" -dependencies = [ - "either", -] - [[package]] name = "itertools" version = "0.14.0" @@ -2585,9 +2576,9 @@ dependencies = [ [[package]] name = "quinn-proto" -version = "0.11.13" +version = "0.11.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31" +checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098" dependencies = [ "bytes", "getrandom 0.3.4",