Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/vrl_decoder_inject_metadata.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `vrl` codec now supports an `inject_metadata` option. When set to `true`, sources can inject per-request metadata into the VRL program before it executes, making source-specific context readable via `%`-prefixed paths (e.g. `%exec.host`, `%exec.command`, `%vector.secrets.*`). The `exec` source is the first to support this. VRL-produced metadata always takes priority over injected values on collision.

authors: thomasqueirozb
16 changes: 15 additions & 1 deletion lib/codecs/src/decoding/decoder.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use bytes::{Bytes, BytesMut};
use smallvec::SmallVec;
use vector_common::internal_event::emit;
use vector_core::{config::LogNamespace, event::Event};
use vector_core::{
config::LogNamespace,
event::{Event, EventMetadata},
};

use crate::{
decoding::format::Deserializer as _,
Expand Down Expand Up @@ -53,6 +56,17 @@ impl Decoder {
self
}

/// Attaches a per-decode-call metadata template to the inner deserializer.
///
/// For deserializers that support it (currently only `VrlDeserializer`) the
/// template is pre-populated on the synthetic event before any user program
/// executes, making every `%`-prefixed path readable (e.g. `%splunk_hec.host`,
/// `%vector.secrets.*`). For all other deserializers this is a no-op.
pub fn with_metadata_template(mut self, metadata: EventMetadata) -> Self {
self.deserializer = self.deserializer.with_metadata_template(metadata);
self
}

/// Handles the framing result and parses it into a structured event, if
/// possible.
///
Expand Down
120 changes: 115 additions & 5 deletions lib/codecs/src/decoding/format/vrl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ use vrl::{
value::Kind,
};

use crate::{BytesDeserializerConfig, decoding::format::Deserializer};
use vector_core::event::EventMetadata;

use crate::decoding::format::Deserializer;

/// Config used to build a `VrlDeserializer`.
#[configurable_component]
Expand Down Expand Up @@ -45,6 +47,16 @@ pub struct VrlDeserializerOptions {
#[serde(default)]
#[configurable(metadata(docs::advanced))]
pub timezone: Option<TimeZone>,

/// When `true`, the source may inject per-request metadata into the VRL
/// runtime before the program executes. Injected metadata is accessible
/// via `%`-prefixed paths (e.g. `%exec.host`, `%vector.secrets.*`).
///
/// Each source controls which metadata it injects; see the source
/// documentation for details. If the source does not support metadata
/// injection, this option has no effect.
#[serde(default)]
pub inject_metadata: bool,
}

impl VrlDeserializerConfig {
Expand All @@ -64,6 +76,8 @@ impl VrlDeserializerConfig {
Ok(result) => Ok(VrlDeserializer {
program: result.program,
timezone: self.vrl.timezone.unwrap_or(TimeZone::Local),
inject_metadata_enabled: self.vrl.inject_metadata,
metadata_template: None,
}),
Err(diagnostics) => Err(Formatter::new(&self.vrl.source, diagnostics)
.to_string()
Expand Down Expand Up @@ -94,9 +108,34 @@ impl VrlDeserializerConfig {
pub struct VrlDeserializer {
/// Program taken from the build-time compilation result of the configured
/// VRL source.
program: Program,
/// Configured timezone; falls back to `TimeZone::Local` when the option is
/// unset.
timezone: TimeZone,
/// Whether this deserializer accepts a metadata template from its source.
/// Set from [`VrlDeserializerOptions::inject_metadata`] at build time.
inject_metadata_enabled: bool,
/// Metadata template that `parse` clones onto each synthetic event before
/// the VRL program runs. Starts as `None` and is only populated when
/// `inject_metadata_enabled` is true and the source calls
/// [`VrlDeserializer::with_metadata_template`].
metadata_template: Option<EventMetadata>,
}

impl VrlDeserializer {
    /// Stores `metadata` as the template that is copied onto each synthetic
    /// event before the VRL program runs.
    ///
    /// The template is only kept when the decoder was configured with
    /// `inject_metadata: true`; otherwise the deserializer is returned
    /// unchanged and `metadata` is discarded.
    ///
    /// Sources assemble the metadata themselves (e.g. envelope fields, auth
    /// tokens). VRL can then read those values through `%`-prefixed paths such
    /// as `%exec.host` or `%vector.secrets.*`.
    #[must_use]
    pub fn with_metadata_template(self, metadata: EventMetadata) -> Self {
        // Injection is opt-in: without the flag the template is ignored.
        if !self.inject_metadata_enabled {
            return self;
        }

        Self {
            metadata_template: Some(metadata),
            ..self
        }
    }
}

fn parse_bytes(bytes: Bytes, log_namespace: LogNamespace) -> Event {
use crate::BytesDeserializerConfig;
let bytes_deserializer = BytesDeserializerConfig::new().build();
let log_event = bytes_deserializer.parse_single(bytes, log_namespace);
Event::from(log_event)
Expand All @@ -108,11 +147,14 @@ impl Deserializer for VrlDeserializer {
bytes: Bytes,
log_namespace: LogNamespace,
) -> vector_common::Result<SmallVec<[Event; 1]>> {
let event = parse_bytes(bytes, log_namespace);
match self.run_vrl(event, log_namespace) {
Ok(events) => Ok(events),
Err(e) => Err(e),
let mut event = parse_bytes(bytes, log_namespace);
if let Some(template) = &self.metadata_template {
// Replace the synthetic event's metadata with a clone of the source-assembled
// template so its `%`-prefixed paths (e.g. `%exec.host`, `%exec.command`) are in
// scope when VRL executes. NOTE(review): this overwrites the whole metadata —
// confirm nothing set by `parse_bytes` needs to survive.
*event.metadata_mut() = template.clone();
}
self.run_vrl(event, log_namespace)
}
}

Expand Down Expand Up @@ -148,6 +190,19 @@ mod tests {
vrl: VrlDeserializerOptions {
source: source.to_string(),
timezone: None,
inject_metadata: false,
},
}
.build()
.expect("Failed to build VrlDeserializer")
}

fn make_decoder_with_inject_metadata(source: &str) -> VrlDeserializer {
VrlDeserializerConfig {
vrl: VrlDeserializerOptions {
source: source.to_string(),
timezone: None,
inject_metadata: true,
},
}
.build()
Expand Down Expand Up @@ -302,6 +357,7 @@ mod tests {
vrl: VrlDeserializerOptions {
source: ". ?".to_string(),
timezone: None,
inject_metadata: false,
},
}
.build()
Expand All @@ -320,4 +376,58 @@ mod tests {
.to_string();
assert!(error.contains("aborted"));
}

// Tests for `with_metadata_template` —————————————————————————————————————

/// Builds a default `EventMetadata` carrying exactly one secret, `key` ->
/// `value`, for use as a metadata template in the tests below.
fn metadata_with_secret(key: &str, value: &str) -> EventMetadata {
    let mut template = EventMetadata::default();
    template.secrets_mut().insert(key, value);
    template
}

/// A secret supplied through `with_metadata_template` is visible to the VRL
/// program via `get_secret!()`.
#[test]
fn test_with_metadata_template_vrl_can_read_secret() {
    // The program mirrors the injected secret into `.secret_value` so it can be
    // asserted on; the raw input bytes become `.message` (Legacy namespace).
    // NOTE(review): if `get_secret` is infallible in VRL, the `!` form may be
    // rejected at compile time — confirm this program builds.
    let template = metadata_with_secret("my_token", "super-secret");
    let program = r#".secret_value = get_secret!("my_token")"#;
    let decoder = make_decoder_with_inject_metadata(program).with_metadata_template(template);

    let events = decoder
        .parse(Bytes::from(r#"hello"#), LogNamespace::Legacy)
        .expect("parse should succeed");

    assert_eq!(events.len(), 1);
    let log = events[0].as_log();
    assert_eq!(
        *log.get("secret_value").unwrap(),
        Value::from("super-secret")
    );
}

/// When the VRL program and the template both define the same secret, the
/// program's value is the one left on the event: `set_secret!` executes after
/// the template has been applied.
#[test]
fn test_with_metadata_template_codec_wins_on_collision() {
    // NOTE(review): if `set_secret` is infallible in VRL, the `!` form may be
    // rejected at compile time — confirm this program builds.
    let template = metadata_with_secret("my_token", "template-loses");
    let program = r#"set_secret!("my_token", "codec-wins")"#;
    let decoder = make_decoder_with_inject_metadata(program).with_metadata_template(template);

    let events = decoder
        .parse(Bytes::from(r#"hello"#), LogNamespace::Legacy)
        .expect("parse should succeed");

    let secrets = events[0].metadata().secrets();
    assert_eq!(secrets.get("my_token").unwrap().as_ref(), "codec-wins");
}
}
20 changes: 19 additions & 1 deletion lib/codecs/src/decoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use smallvec::SmallVec;
use vector_config::configurable_component;
use vector_core::{
config::{DataType, LogNamespace},
event::Event,
event::{Event, EventMetadata},
schema,
};

Expand Down Expand Up @@ -426,6 +426,12 @@ impl DeserializerConfig {
}
}

/// Reports whether this config is the VRL variant with `inject_metadata: true`.
///
/// Sources consult this to decide whether to call
/// `Decoder::with_metadata_template`; every non-VRL variant yields `false`.
pub fn inject_metadata_enabled(&self) -> bool {
    match self {
        DeserializerConfig::Vrl(config) => config.vrl.inject_metadata,
        _ => false,
    }
}

/// Return the type of event build by this deserializer.
pub fn output_type(&self) -> DataType {
match self {
Expand Down Expand Up @@ -542,6 +548,18 @@ pub enum Deserializer {
Vrl(VrlDeserializer),
}

impl Deserializer {
    /// Forwards a metadata template to the wrapped deserializer when it can
    /// make use of one.
    ///
    /// Only the [`VrlDeserializer`] variant consumes the template; any other
    /// variant is handed back untouched and `metadata` is dropped.
    pub fn with_metadata_template(self, metadata: EventMetadata) -> Self {
        if let Deserializer::Vrl(vrl) = self {
            return Deserializer::Vrl(vrl.with_metadata_template(metadata));
        }
        self
    }
}

impl format::Deserializer for Deserializer {
fn parse(
&self,
Expand Down
2 changes: 1 addition & 1 deletion lib/vector-core/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub use finalization::{
Finalizable,
};
pub use log_event::LogEvent;
pub use metadata::{DatadogMetricOriginMetadata, EventMetadata, WithMetadata};
pub use metadata::{DatadogMetricOriginMetadata, EventMetadata, Secrets, WithMetadata};
pub use metric::{Metric, MetricKind, MetricTags, MetricValue, StatisticKind};
pub use r#ref::{EventMutRef, EventRef};
use serde::{Deserialize, Serialize};
Expand Down
65 changes: 49 additions & 16 deletions src/sources/exec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ use vector_lib::{
},
config::{LegacyKey, LogNamespace, log_schema},
configurable::configurable_component,
event::EventMetadata,
internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
lookup::{owned_value_path, path},
lookup::{metadata_path, owned_value_path, path},
};
use vrl::{path::OwnedValuePath, value::Kind};

Expand Down Expand Up @@ -254,7 +255,24 @@ impl SourceConfig for ExecConfig {
.framing
.clone()
.unwrap_or_else(|| self.decoding.default_stream_framing());
let decoder = DecodingConfig::new(framing, self.decoding.clone(), log_namespace).build()?;
let mut decoder =
DecodingConfig::new(framing, self.decoding.clone(), log_namespace).build()?;

// If the VRL decoder has `inject_metadata: true`, build a metadata
// template with per-source context (hostname, command) so VRL programs
// can read `%exec.host`, `%exec.command`, etc. during decoding.
// `with_metadata_template` is a no-op for non-VRL deserializers and for
// VRL deserializers with `inject_metadata: false`.
let mut source_metadata = EventMetadata::default();
if let Some(ref hostname) = hostname {
source_metadata
.value_mut()
.insert("exec.host", hostname.clone());
}
source_metadata
.value_mut()
.insert("exec.command", self.command.clone());
decoder = decoder.with_metadata_template(source_metadata);

match &self.mode {
Mode::Scheduled => {
Expand Down Expand Up @@ -519,8 +537,9 @@ async fn run_command(
byte_size: events.estimated_json_encoded_size_of(),
});

let vrl_inject_metadata = config.decoding.inject_metadata_enabled();
for event in &mut events {
handle_event(&config, &hostname, &Some(stream.to_string()), pid, event, log_namespace);
handle_event(&config, &hostname, &Some(stream.to_string()), pid, event, log_namespace, vrl_inject_metadata);
}
if (out.send_batch(events).await).is_err() {
emit!(StreamClosedError { count });
Expand Down Expand Up @@ -669,6 +688,7 @@ fn handle_event(
pid: Option<u32>,
event: &mut Event,
log_namespace: LogNamespace,
vrl_inject_metadata: bool,
) {
if let Event::Log(log) = event {
log_namespace.insert_standard_vector_source_metadata(log, ExecConfig::NAME, Utc::now());
Expand All @@ -695,25 +715,38 @@ fn handle_event(
);
}

// Add hostname (if needed)
// Add hostname (if needed). When the VRL decoder has inject_metadata enabled,
// use try_insert for the vector metadata path so any value the VRL program
// wrote to %exec.host is not overwritten here.
if let Some(hostname) = hostname {
if vrl_inject_metadata && matches!(log_namespace, LogNamespace::Vector) {
log.try_insert(metadata_path!(ExecConfig::NAME, "host"), hostname.clone());
} else {
log_namespace.insert_source_metadata(
ExecConfig::NAME,
log,
log_schema().host_key().map(LegacyKey::InsertIfEmpty),
path!("host"),
hostname.clone(),
);
}
}

// Add command. Same try_insert guard as hostname above.
if vrl_inject_metadata && matches!(log_namespace, LogNamespace::Vector) {
log.try_insert(
metadata_path!(ExecConfig::NAME, COMMAND_KEY),
config.command.clone(),
);
} else {
log_namespace.insert_source_metadata(
ExecConfig::NAME,
log,
log_schema().host_key().map(LegacyKey::InsertIfEmpty),
path!("host"),
hostname.clone(),
Some(LegacyKey::InsertIfEmpty(path!(COMMAND_KEY))),
path!(COMMAND_KEY),
config.command.clone(),
);
}

// Add command
log_namespace.insert_source_metadata(
ExecConfig::NAME,
log,
Some(LegacyKey::InsertIfEmpty(path!(COMMAND_KEY))),
path!(COMMAND_KEY),
config.command.clone(),
);
}
}

Expand Down
Loading
Loading