diff --git a/CHANGELOG.md b/CHANGELOG.md
index 97b59e2..39f8dcb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,7 +5,82 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
-## [1.2.1] - unreleased
+## [2.0.0] - unreleased
+
+### Security advisory
+
+- **Be cautious when piping raw `_msg` into `email_body_html`.** The example
+ config switched `body: "{{ _msg }}"` in v1.2.0 (#26 fix), and operators may
+ reasonably mirror that in `email_body_html`. The email notifier marks `body`
+ as `safe` (pre-escaped HTML) before injection into the email envelope, so a
+ log line containing raw HTML or `"});
- let result = engine.render("email_alert", &fields, "test_rule").unwrap();
+ let result = engine
+ .render("email_alert", &fields, "test_rule", "vlprod")
+ .unwrap();
let email_body_html = result.email_body_html.unwrap();
// HTML should be escaped
@@ -756,7 +795,9 @@ mod tests {
let engine = TemplateEngine::new(templates);
let fields = json!({"nginx.http.request_id": "abc"});
- let result = engine.render("alert", &fields, "test_rule").unwrap();
+ let result = engine
+ .render("alert", &fields, "test_rule", "vlprod")
+ .unwrap();
assert_eq!(result.title, "abc");
assert_eq!(result.body, "id=abc");
}
@@ -784,7 +825,9 @@ mod tests {
"nginx.http.status_code": "400"
});
- let result = engine.render("my_template", &fields, "test_rule").unwrap();
+ let result = engine
+ .render("my_template", &fields, "test_rule", "vlprod")
+ .unwrap();
assert_eq!(result.title, "T");
assert_eq!(result.body, "B");
let email_body_html = result.email_body_html.unwrap();
@@ -814,7 +857,7 @@ mod tests {
let engine = TemplateEngine::new(templates);
let fields = json!({"host": "server-01"});
- let result = engine.render("alert", &fields, "VM_OFF").unwrap();
+ let result = engine.render("alert", &fields, "VM_OFF", "vlprod").unwrap();
assert_eq!(result.title, "Alert VM_OFF");
}
@@ -832,7 +875,7 @@ mod tests {
let engine = TemplateEngine::new(templates);
let fields = json!({"host": "server-01"});
- let result = engine.render("alert", &fields, "VM_OFF").unwrap();
+ let result = engine.render("alert", &fields, "VM_OFF", "vlprod").unwrap();
assert_eq!(
result.body,
"rule=VM_OFF\nhost=server-01\nrule_again=VM_OFF"
@@ -854,13 +897,93 @@ mod tests {
let engine = TemplateEngine::new(templates);
let fields = json!({"host": "server-01"});
- let result = engine.render("email_alert", &fields, "VM_OFF").unwrap();
+ let result = engine
+ .render("email_alert", &fields, "VM_OFF", "vlprod")
+ .unwrap();
assert_eq!(
result.email_body_html.unwrap(),
"
Rule: VM_OFF on server-01
"
);
}
+ // ===================================================================
+ // v2.0.0: vl_source available in layer 1 templates (title, body,
+ // email_body_html), matching the layer 2 notifier-level contexts and
+ // the throttle key render context.
+ // ===================================================================
+
+ #[test]
+ fn render_injects_vl_source_in_title() {
+ let mut templates = HashMap::new();
+ templates.insert(
+ "alert".to_string(),
+ make_template("[{{ vl_source }}] {{ rule_name }}", "body"),
+ );
+
+ let engine = TemplateEngine::new(templates);
+ let fields = json!({"host": "server-01"});
+
+ let result = engine.render("alert", &fields, "VM_OFF", "vlprod").unwrap();
+ assert_eq!(result.title, "[vlprod] VM_OFF");
+ }
+
+ #[test]
+ fn render_injects_vl_source_in_body() {
+ let mut templates = HashMap::new();
+ templates.insert(
+ "alert".to_string(),
+ make_template("title", "source={{ vl_source }}\nhost={{ host }}"),
+ );
+
+ let engine = TemplateEngine::new(templates);
+ let fields = json!({"host": "server-01"});
+
+ let result = engine.render("alert", &fields, "VM_OFF", "vldev").unwrap();
+ assert_eq!(result.body, "source=vldev\nhost=server-01");
+ }
+
+ #[test]
+ fn render_injects_vl_source_in_email_body_html() {
+ let mut templates = HashMap::new();
+ templates.insert(
+ "email_alert".to_string(),
+ make_template_with_email_body_html(
+ "t",
+ "b",
+ "Source: {{ vl_source }}, host: {{ host }}
",
+ ),
+ );
+
+ let engine = TemplateEngine::new(templates);
+ let fields = json!({"host": "server-01"});
+
+ let result = engine
+ .render("email_alert", &fields, "VM_OFF", "vlprod")
+ .unwrap();
+ assert_eq!(
+ result.email_body_html.unwrap(),
+ "Source: vlprod, host: server-01
"
+ );
+ }
+
+ #[test]
+ fn render_vl_source_synthetic_overrides_event_field() {
+ // Collision policy: synthetic vl_source wins over any event field
+ // literally named "vl_source" (matches rule_name collision policy).
+ let mut templates = HashMap::new();
+ templates.insert(
+ "alert".to_string(),
+ make_template("{{ vl_source }}", "{{ vl_source }}"),
+ );
+
+ let engine = TemplateEngine::new(templates);
+ let fields = json!({"vl_source": "evil", "host": "server-01"});
+
+ let result = engine.render("alert", &fields, "VM_OFF", "vlprod").unwrap();
+ assert_eq!(result.title, "vlprod");
+ assert_eq!(result.body, "vlprod");
+ }
+
#[test]
fn render_rule_name_synthetic_overrides_event_field() {
// Collision policy: synthetic rule_name wins over any event field
@@ -874,7 +997,7 @@ mod tests {
let engine = TemplateEngine::new(templates);
let fields = json!({"rule_name": "event-value", "host": "server-01"});
- let result = engine.render("alert", &fields, "VM_OFF").unwrap();
+ let result = engine.render("alert", &fields, "VM_OFF", "vlprod").unwrap();
assert_eq!(result.title, "VM_OFF");
assert_eq!(result.body, "VM_OFF");
}
@@ -891,7 +1014,7 @@ mod tests {
let fields = json!({"host": "server-01"});
let result = engine
- .render("mattermost_alert", &fields, "test_rule")
+ .render("mattermost_alert", &fields, "test_rule", "vlprod")
.unwrap();
assert!(
diff --git a/src/throttle.rs b/src/throttle.rs
index c9260e5..21bbd05 100644
--- a/src/throttle.rs
+++ b/src/throttle.rs
@@ -65,6 +65,9 @@ pub struct Throttler {
max_count: u32,
/// Rule name for logging and metrics (Arc to avoid cloning).
    rule_name: Arc<str>,
+ /// VL source name bound to this throttler (per-task). Threaded into every
+ /// rendered key so the `(rule, source)` bucket is isolated by default.
+    vl_source: Arc<str>,
/// Pre-created Jinja environment for template rendering (H1 fix).
jinja_env: Environment<'static>,
}
@@ -76,14 +79,11 @@ impl Throttler {
///
/// * `config` - Optional throttle configuration. If None, creates a pass-through throttler.
/// * `rule_name` - Name of the rule for logging and metrics.
- ///
- /// # Example
- ///
- /// ```ignore
- /// let throttler = Throttler::new(Some(&compiled_throttle), "my_rule");
- /// ```
- pub fn new(config: Option<&CompiledThrottle>, rule_name: &str) -> Self {
- Self::with_capacity(config, rule_name, DEFAULT_MAX_CAPACITY)
+ /// * `vl_source` - VL source name bound to the task (injected into render
+ /// context so `{{ vl_source }}` works in `throttle.key` and the default
+ /// key is per-source by construction).
+ pub fn new(config: Option<&CompiledThrottle>, rule_name: &str, vl_source: &str) -> Self {
+ Self::with_capacity(config, rule_name, vl_source, DEFAULT_MAX_CAPACITY)
}
/// Create a new Throttler with custom max capacity (for testing).
@@ -92,10 +92,12 @@ impl Throttler {
///
/// * `config` - Optional throttle configuration.
/// * `rule_name` - Name of the rule for logging and metrics.
+ /// * `vl_source` - VL source name bound to this task's throttler.
/// * `max_capacity` - Maximum number of keys in the cache (FR25).
pub fn with_capacity(
config: Option<&CompiledThrottle>,
rule_name: &str,
+ vl_source: &str,
max_capacity: u64,
) -> Self {
let (key_template, max_count, window) = match config {
@@ -108,12 +110,14 @@ impl Throttler {
if t.count == 0 {
tracing::warn!(
rule_name = %rule_name,
+ vl_source = %vl_source,
"Throttle count is 0, all alerts after first will be throttled"
);
}
if t.window.is_zero() {
tracing::warn!(
rule_name = %rule_name,
+ vl_source = %vl_source,
"Throttle window is 0, entries will expire immediately"
);
}
@@ -133,6 +137,7 @@ impl Throttler {
key_template,
max_count,
rule_name: Arc::from(rule_name),
+ vl_source: Arc::from(vl_source),
jinja_env,
}
}
@@ -161,14 +166,18 @@ impl Throttler {
"Throttle count updated"
);
- // L1: Pre-convert rule_name to String once for metrics (required for 'static)
+ // L1: Pre-convert rule_name and vl_source to String once for metrics
+ // (required for 'static label storage).
let rule_name_str = self.rule_name.to_string();
+ let vl_source_str = self.vl_source.to_string();
if count <= self.max_count {
- // M3: Increment metric for passed alerts
+ // M3: Increment metric for passed alerts (gains `vl_source` for
+ // multi-source observability — v2.0.0 part 2).
metrics::counter!(
"valerter_alerts_passed_total",
- "rule_name" => rule_name_str
+ "rule_name" => rule_name_str,
+ "vl_source" => vl_source_str,
)
.increment(1);
@@ -177,16 +186,19 @@ impl Throttler {
// Log at DEBUG level (throttling is normal behavior)
tracing::debug!(
rule_name = %self.rule_name,
+ vl_source = %self.vl_source,
throttle_key = %key,
count = count,
max_count = self.max_count,
"Alert throttled"
);
- // Increment metric (FR23, FR24)
+ // Increment metric (FR23, FR24); gains `vl_source` to disambiguate
+ // throttle hot-spots per source.
metrics::counter!(
"valerter_alerts_throttled_total",
- "rule_name" => rule_name_str
+ "rule_name" => rule_name_str,
+ "vl_source" => vl_source_str,
)
.increment(1);
@@ -196,26 +208,27 @@ impl Throttler {
/// Render the throttle key from template and fields.
///
- /// If no template is configured, returns a global key for the rule.
- /// If template rendering fails, logs a warning and returns a fallback key.
+ /// If no template is configured, returns the per-source default key
+ /// `"{rule}-{source}:global"` so multi-source deployments see isolated
+ /// throttle buckets without any config. If rendering fails, logs a
+ /// warning and returns a fallback key.
fn render_key(&self, fields: &Value) -> String {
match &self.key_template {
Some(template) => {
- // Inject synthetic `rule_name` (issue #31) so users can write
- // `{{ rule_name }}` in throttle.key. Synthetic wins over any
- // event field with the same name, matching layer 1/2 template
+ // Inject synthetic `rule_name` (issue #31) and `vl_source`
+ // (multi-source v2.0.0). Synthetic values win over any event
+ // field with the same name, matching layer 1/2 template
// behavior.
- let enriched_ctx = enrich_with_rule_name(fields, &self.rule_name);
- // H1 fix: Use pre-created jinja_env instead of creating new one
+ let enriched_ctx = enrich_with_context(fields, &self.rule_name, &self.vl_source);
match self.jinja_env.render_str(template, &enriched_ctx) {
Ok(key) => {
tracing::trace!(rendered_key = %key, "Throttle key rendered");
key
}
Err(e) => {
- // Template error - log and use fallback
tracing::warn!(
rule_name = %self.rule_name,
+ vl_source = %self.vl_source,
template = %template,
error = %e,
"Failed to render throttle key, using fallback"
@@ -225,8 +238,11 @@ impl Throttler {
}
}
None => {
- // No template = global throttle for the rule
- format!("{}:global", self.rule_name)
+ // Default key is per-(rule, source) so buckets are isolated
+ // per-source by construction. Equivalent to rendering
+ // `"{{ rule_name }}-{{ vl_source }}:global"` via Jinja, but
+ // inlined to avoid the render round-trip on every call.
+ format!("{}-{}:global", self.rule_name, self.vl_source)
}
}
}
@@ -242,20 +258,25 @@ impl Throttler {
}
/// Unflatten dotted event keys (issue #25) then inject the synthetic
-/// `rule_name` key (issue #31). Matches the layer 1 template rendering path
-/// so users can reference both dotted event fields (`{{ nginx.http.status }}`)
-/// and `{{ rule_name }}` inside a `throttle.key` template consistently.
+/// `rule_name` (issue #31) and `vl_source` (v2.0.0 multi-source) keys.
+/// Matches the layer 1 template rendering path so users can reference both
+/// dotted event fields (`{{ nginx.http.status }}`) and the synthetic keys
+/// inside a `throttle.key` template consistently.
///
/// Returns the original value unchanged if it is not a JSON object (should
-/// not happen in practice, VL events are always objects). The synthetic
-/// value wins over any event field literally named `rule_name`.
-fn enrich_with_rule_name(fields: &Value, rule_name: &str) -> Value {
+/// not happen in practice, VL events are always objects). Synthetic values
+/// win over any event field literally named `rule_name` or `vl_source`.
+fn enrich_with_context(fields: &Value, rule_name: &str, vl_source: &str) -> Value {
let mut ctx = crate::parser::unflatten_dotted_keys(fields);
if let Some(obj) = ctx.as_object_mut() {
obj.insert(
"rule_name".to_string(),
Value::String(rule_name.to_string()),
);
+ obj.insert(
+ "vl_source".to_string(),
+ Value::String(vl_source.to_string()),
+ );
}
ctx
}
@@ -291,7 +312,7 @@ mod tests {
#[test]
fn render_key_with_simple_template() {
let config = make_config(Some("{{ host }}"), 3, 60);
- let throttler = Throttler::new(Some(&config), "test_rule");
+ let throttler = Throttler::new(Some(&config), "test_rule", "vlprod");
let fields = json!({"host": "SW-01", "port": "Gi0/1"});
let key = throttler.render_key(&fields);
@@ -306,7 +327,7 @@ mod tests {
#[test]
fn render_key_with_composite_template() {
let config = make_config(Some("{{ host }}-{{ port }}"), 3, 60);
- let throttler = Throttler::new(Some(&config), "test_rule");
+ let throttler = Throttler::new(Some(&config), "test_rule", "vlprod");
let fields = json!({"host": "SW-01", "port": "Gi0/1"});
let key = throttler.render_key(&fields);
@@ -321,7 +342,7 @@ mod tests {
#[test]
fn render_key_with_missing_field_returns_empty_value() {
let config = make_config(Some("{{ host }}-{{ missing }}"), 3, 60);
- let throttler = Throttler::new(Some(&config), "test_rule");
+ let throttler = Throttler::new(Some(&config), "test_rule", "vlprod");
let fields = json!({"host": "SW-01"});
let key = throttler.render_key(&fields);
@@ -337,7 +358,7 @@ mod tests {
#[test]
fn first_alert_passes() {
let config = make_config(Some("{{ host }}"), 3, 60);
- let throttler = Throttler::new(Some(&config), "test_rule");
+ let throttler = Throttler::new(Some(&config), "test_rule", "vlprod");
let fields = json!({"host": "SW-01"});
let result = throttler.check(&fields);
@@ -352,7 +373,7 @@ mod tests {
#[test]
fn alerts_up_to_count_pass() {
let config = make_config(Some("{{ host }}"), 3, 60);
- let throttler = Throttler::new(Some(&config), "test_rule");
+ let throttler = Throttler::new(Some(&config), "test_rule", "vlprod");
let fields = json!({"host": "SW-01"});
@@ -369,7 +390,7 @@ mod tests {
#[test]
fn alert_after_count_is_throttled() {
let config = make_config(Some("{{ host }}"), 3, 60);
- let throttler = Throttler::new(Some(&config), "test_rule");
+ let throttler = Throttler::new(Some(&config), "test_rule", "vlprod");
let fields = json!({"host": "SW-01"});
@@ -395,7 +416,7 @@ mod tests {
count: 2,
window: Duration::from_millis(100), // Very short for testing
};
- let throttler = Throttler::new(Some(&config), "test_rule");
+ let throttler = Throttler::new(Some(&config), "test_rule", "vlprod");
let fields = json!({"host": "SW-01"});
@@ -424,7 +445,7 @@ mod tests {
let config = make_config(Some("{{ key }}"), 2, 3600);
// Use with_capacity to set a small max (5 keys)
- let throttler = Throttler::with_capacity(Some(&config), "test_rule", 5);
+ let throttler = Throttler::with_capacity(Some(&config), "test_rule", "vlprod", 5);
// Fill cache with 5 different keys, each gets 2 alerts (at max)
for i in 0..5 {
@@ -469,27 +490,44 @@ mod tests {
#[test]
fn no_key_template_uses_global_key() {
let config = make_config(None, 2, 60);
- let throttler = Throttler::new(Some(&config), "my_rule");
+ let throttler = Throttler::new(Some(&config), "my_rule", "vlprod");
let fields1 = json!({"host": "SW-01"});
let fields2 = json!({"host": "SW-02"});
- // Both should use the same global key "my_rule:global"
+ // Both should use the same per-source default key
+ // "my_rule-vlprod:global" - cross-host but not cross-source.
assert_eq!(throttler.check(&fields1), ThrottleResult::Pass);
assert_eq!(throttler.check(&fields2), ThrottleResult::Pass);
- // Third from either should be throttled (same global key)
+ // Third from either should be throttled (same default key)
assert_eq!(throttler.check(&fields1), ThrottleResult::Throttled);
}
#[test]
- fn global_key_format() {
+ fn default_key_format_is_rule_dash_source_global() {
+ // Spec: default throttle key is `{rule}-{source}:global`.
+ // Used to be `{rule}:global` (pre-v2.0.0); breaking change for
+ // multi-source deployments and locks per-source bucket isolation.
let config = make_config(None, 2, 60);
- let throttler = Throttler::new(Some(&config), "my_rule");
+ let throttler = Throttler::new(Some(&config), "my_rule", "vlprod");
let fields = json!({});
let key = throttler.render_key(&fields);
- assert_eq!(key, "my_rule:global");
+ assert_eq!(key, "my_rule-vlprod:global");
+ }
+
+ #[test]
+ fn default_key_isolates_buckets_per_source() {
+ // Two throttlers with the same rule but different sources must
+ // produce different default keys, so buckets are isolated per-source.
+ let config = make_config(None, 2, 60);
+ let throttler_a = Throttler::new(Some(&config), "VM_OFF", "vlprod");
+ let throttler_b = Throttler::new(Some(&config), "VM_OFF", "vldev");
+
+ let fields = json!({});
+ assert_eq!(throttler_a.render_key(&fields), "VM_OFF-vlprod:global");
+ assert_eq!(throttler_b.render_key(&fields), "VM_OFF-vldev:global");
}
// ===================================================================
@@ -499,7 +537,7 @@ mod tests {
#[test]
fn render_key_with_nested_fields() {
let config = make_config(Some("{{ data.server.name }}"), 3, 60);
- let throttler = Throttler::new(Some(&config), "test_rule");
+ let throttler = Throttler::new(Some(&config), "test_rule", "vlprod");
let fields = json!({
"data": {
@@ -520,7 +558,7 @@ mod tests {
#[test]
fn different_keys_are_throttled_independently() {
let config = make_config(Some("{{ host }}"), 2, 60);
- let throttler = Throttler::new(Some(&config), "test_rule");
+ let throttler = Throttler::new(Some(&config), "test_rule", "vlprod");
let sw01 = json!({"host": "SW-01"});
let sw02 = json!({"host": "SW-02"});
@@ -538,7 +576,7 @@ mod tests {
#[test]
fn no_config_passes_all() {
- let throttler = Throttler::new(None, "test_rule");
+ let throttler = Throttler::new(None, "test_rule", "vlprod");
let fields = json!({"host": "SW-01"});
@@ -551,7 +589,7 @@ mod tests {
#[test]
fn reset_clears_all_entries() {
let config = make_config(Some("{{ host }}"), 2, 60);
- let throttler = Throttler::new(Some(&config), "test_rule");
+ let throttler = Throttler::new(Some(&config), "test_rule", "vlprod");
let fields = json!({"host": "SW-01"});
@@ -570,7 +608,7 @@ mod tests {
#[test]
fn debug_format_shows_useful_info() {
let config = make_config(Some("{{ host }}"), 3, 60);
- let throttler = Throttler::new(Some(&config), "test_rule");
+ let throttler = Throttler::new(Some(&config), "test_rule", "vlprod");
let debug = format!("{:?}", throttler);
@@ -590,7 +628,7 @@ mod tests {
#[test]
fn render_key_includes_rule_name() {
let config = make_config(Some("{{ rule_name }}-{{ host }}"), 3, 60);
- let throttler = Throttler::new(Some(&config), "VM_OFF");
+ let throttler = Throttler::new(Some(&config), "VM_OFF", "vlprod");
let fields = json!({"host": "SW-01"});
let key = throttler.render_key(&fields);
@@ -604,7 +642,7 @@ mod tests {
// tened the same way template rendering does, so `{{ nginx.http.status }}`
// works here too (not just in `title`/`body`).
let config = make_config(Some("{{ rule_name }}-{{ nginx.http.status_code }}"), 3, 60);
- let throttler = Throttler::new(Some(&config), "VM_OFF");
+ let throttler = Throttler::new(Some(&config), "VM_OFF", "vlprod");
let fields = json!({"nginx.http.status_code": "404"});
let key = throttler.render_key(&fields);
@@ -617,7 +655,7 @@ mod tests {
// Collision policy: synthetic rule_name wins over any event field
// literally named "rule_name".
let config = make_config(Some("{{ rule_name }}"), 3, 60);
- let throttler = Throttler::new(Some(&config), "VM_OFF");
+ let throttler = Throttler::new(Some(&config), "VM_OFF", "vlprod");
let fields = json!({"rule_name": "event-value", "host": "SW-01"});
let key = throttler.render_key(&fields);
@@ -625,11 +663,50 @@ mod tests {
assert_eq!(key, "VM_OFF");
}
+ // ===================================================================
+ // v2.0.0: vl_source injected into throttle key render context
+ // ===================================================================
+
+ #[test]
+ fn render_key_includes_vl_source() {
+ let config = make_config(Some("{{ rule_name }}-{{ vl_source }}"), 3, 60);
+ let throttler = Throttler::new(Some(&config), "VM_OFF", "vlprod");
+
+ let fields = json!({"host": "SW-01"});
+ let key = throttler.render_key(&fields);
+
+ assert_eq!(key, "VM_OFF-vlprod");
+ }
+
+ #[test]
+ fn render_key_vl_source_synthetic_overrides_event_field() {
+ // Collision policy: synthetic vl_source wins over any event field
+ // literally named "vl_source" (matches rule_name collision policy).
+ let config = make_config(Some("{{ vl_source }}"), 3, 60);
+ let throttler = Throttler::new(Some(&config), "VM_OFF", "vlprod");
+
+ let fields = json!({"vl_source": "evil", "host": "SW-01"});
+ let key = throttler.render_key(&fields);
+
+ assert_eq!(key, "vlprod");
+ }
+
+ #[test]
+ fn render_key_custom_template_with_both_synthetics() {
+ let config = make_config(Some("{{ rule_name }}-{{ vl_source }}-{{ host }}"), 3, 60);
+ let throttler = Throttler::new(Some(&config), "VM_OFF", "vlprod");
+
+ let fields = json!({"host": "SW-01"});
+ let key = throttler.render_key(&fields);
+
+ assert_eq!(key, "VM_OFF-vlprod-SW-01");
+ }
+
#[test]
fn template_error_uses_fallback_key() {
// Invalid template syntax that minijinja can't render
let config = make_config(Some("{{ nonexistent_filter | bad_filter }}"), 3, 60);
- let throttler = Throttler::new(Some(&config), "test_rule");
+ let throttler = Throttler::new(Some(&config), "test_rule", "vlprod");
let fields = json!({"host": "SW-01"});
let key = throttler.render_key(&fields);
diff --git a/tests/common/mod.rs b/tests/common/mod.rs
new file mode 100644
index 0000000..38be906
--- /dev/null
+++ b/tests/common/mod.rs
@@ -0,0 +1,12 @@
+//! Shared helpers for integration tests.
+//!
+//! Each file under `tests/` compiles as its own crate, so shared helpers live
+//! here and are pulled in via `mod common;` from the consuming test file.
+//!
+//! The `#[allow(dead_code)]` below is intentional: not every consumer uses
+//! every sub-module, and Cargo's per-test-crate compilation would otherwise
+//! flag unused items.
+
+#![allow(dead_code)]
+
+pub mod vl_events;
diff --git a/tests/common/vl_events.rs b/tests/common/vl_events.rs
new file mode 100644
index 0000000..1186837
--- /dev/null
+++ b/tests/common/vl_events.rs
@@ -0,0 +1,182 @@
+//! Loader for the versioned VictoriaLogs event fixture corpus.
+//!
+//! Fixtures live under `tests/fixtures/vl_events/*.json`, one anonymised
+//! one-line JSON event per file. `index.yaml` in the same directory is the
+//! authoritative manifest: it documents each fixture (description,
+//! `source_system`, `tags`, `prevents_regression_for`).
+//!
+//! The manifest is parsed once per test crate via `OnceLock`.
+//!
+//! # Panics
+//!
+//! Helpers panic aggressively on misuse (missing fixture file, fixture not
+//! listed in the index, malformed manifest). The assumption is that these
+//! are programmer errors in tests, not runtime failures to recover from.
+//! The consistency test `vl_fixtures_consistency.rs` catches them in CI.
+//!
+//! # Examples
+//!
+//! ```ignore
+//! mod common;
+//! use common::vl_events::{load_fixture, load_fixtures_by_tag};
+//!
+//! let event = load_fixture("nginx_http_400.json");
+//! assert_eq!(event["_stream"].as_str().is_some(), true);
+//!
+//! let dotted = load_fixtures_by_tag("dotted_keys");
+//! assert!(!dotted.is_empty());
+//! ```
+
+use serde::Deserialize;
+use serde_json::Value;
+use std::collections::BTreeMap;
+use std::path::{Path, PathBuf};
+use std::sync::OnceLock;
+
+/// Relative path (from crate root) to the fixture directory.
+const FIXTURES_DIR: &str = "tests/fixtures/vl_events";
+
+/// Filename of the manifest inside `FIXTURES_DIR`.
+const INDEX_FILENAME: &str = "index.yaml";
+
+/// One manifest entry as declared in `index.yaml`.
+#[derive(Debug, Clone, Deserialize)]
+pub struct FixtureEntry {
+ pub description: String,
+ pub source_system: String,
+ #[serde(default)]
+    pub tags: Vec<String>,
+ #[serde(default)]
+    pub prevents_regression_for: Vec<String>,
+}
+
+/// Root of the manifest document.
+#[derive(Debug, Clone, Deserialize)]
+struct Manifest {
+    fixtures: BTreeMap<String, FixtureEntry>,
+}
+
+/// Cache: manifest parsed once per test crate.
+static MANIFEST: OnceLock<BTreeMap<String, FixtureEntry>> = OnceLock::new();
+
+/// Absolute path to the fixture directory.
+fn fixtures_dir() -> PathBuf {
+ let manifest_dir = env!("CARGO_MANIFEST_DIR");
+ Path::new(manifest_dir).join(FIXTURES_DIR)
+}
+
+/// Load and cache the manifest.
+fn manifest() -> &'static BTreeMap<String, FixtureEntry> {
+ MANIFEST.get_or_init(|| {
+ let path = fixtures_dir().join(INDEX_FILENAME);
+ let raw = std::fs::read_to_string(&path).unwrap_or_else(|e| {
+ panic!("failed to read fixture manifest {}: {}", path.display(), e)
+ });
+ let parsed: Manifest = serde_yaml::from_str(&raw).unwrap_or_else(|e| {
+ panic!("failed to parse fixture manifest {}: {}", path.display(), e)
+ });
+ parsed.fixtures
+ })
+}
+
+/// Return the full manifest (for consistency tests).
+pub fn manifest_entries() -> &'static BTreeMap<String, FixtureEntry> {
+ manifest()
+}
+
+/// Read and parse a fixture by stem (e.g. `"nginx_http_400"`) or by full
+/// filename (e.g. `"nginx_http_400.json"`). The `.json` suffix is optional.
+///
+/// # Panics
+///
+/// Panics if the fixture is not listed in `index.yaml`, if the file is
+/// missing, or if it is not valid JSON.
+pub fn load_fixture(name: &str) -> Value {
+ let filename = if name.ends_with(".json") {
+ name.to_string()
+ } else {
+ format!("{}.json", name)
+ };
+ if !manifest().contains_key(&filename) {
+ panic!(
+ "fixture '{}' is not declared in {}/{}. Add it to the manifest \
+ or pick an existing name.",
+ name, FIXTURES_DIR, INDEX_FILENAME
+ );
+ }
+ load_fixture_from_disk(&filename)
+}
+
+/// Read and parse a fixture file from disk without checking the manifest.
+///
+/// Used internally to catch the "listed in index but missing on disk" case
+/// with a clearer message than a plain IO error.
+fn load_fixture_from_disk(name: &str) -> Value {
+ let path = fixtures_dir().join(name);
+ let raw = std::fs::read_to_string(&path).unwrap_or_else(|e| {
+ panic!(
+ "fixture '{}' declared in index but missing on disk ({}): {}",
+ name,
+ path.display(),
+ e
+ )
+ });
+ serde_json::from_str(&raw)
+ .unwrap_or_else(|e| panic!("fixture '{}' is not valid JSON: {}", name, e))
+}
+
+/// Return every fixture listed in the manifest as `(name, value)`.
+///
+/// Ordering is stable (manifest keys are a `BTreeMap`).
+pub fn all_fixtures() -> Vec<(String, Value)> {
+ manifest()
+ .keys()
+ .map(|name| (name.clone(), load_fixture_from_disk(name)))
+ .collect()
+}
+
+/// Return all fixtures whose manifest entry has `tag` in its `tags` list.
+///
+/// Returns an empty `Vec` for an unknown tag; callers that expect a
+/// non-empty result should assert it themselves.
+pub fn load_fixtures_by_tag(tag: &str) -> Vec<(String, Value)> {
+ manifest()
+ .iter()
+ .filter(|(_, entry)| entry.tags.iter().any(|t| t == tag))
+ .map(|(name, _)| (name.clone(), load_fixture_from_disk(name)))
+ .collect()
+}
+
+/// Return all fixtures whose manifest entry has `source_system == system`.
+///
+/// Empty `Vec` for unknown systems; callers assert the expected size.
+pub fn load_fixtures_by_source_system(system: &str) -> Vec<(String, Value)> {
+ manifest()
+ .iter()
+ .filter(|(_, entry)| entry.source_system == system)
+ .map(|(name, _)| (name.clone(), load_fixture_from_disk(name)))
+ .collect()
+}
+
+/// Return the set of filenames physically present in `FIXTURES_DIR`,
+/// excluding the manifest itself. Used by the consistency test.
+pub fn filesystem_fixtures() -> Vec<String> {
+ let dir = fixtures_dir();
+ let entries = std::fs::read_dir(&dir)
+ .unwrap_or_else(|e| panic!("failed to read fixture directory {}: {}", dir.display(), e));
+    let mut names: Vec<String> = entries
+ .filter_map(|e| e.ok())
+ .filter_map(|entry| {
+ let file_name = entry.file_name().to_string_lossy().to_string();
+ if file_name == INDEX_FILENAME {
+ None
+ } else if entry.path().extension().and_then(|s| s.to_str()) == Some("json") {
+ Some(file_name)
+ } else {
+ None
+ }
+ })
+ .collect();
+ names.sort();
+ names
+}
diff --git a/tests/fixtures/config_disabled_invalid.yaml b/tests/fixtures/config_disabled_invalid.yaml
index b0f146b..7f23240 100644
--- a/tests/fixtures/config_disabled_invalid.yaml
+++ b/tests/fixtures/config_disabled_invalid.yaml
@@ -1,7 +1,8 @@
# Configuration fixture with disabled rule containing invalid regex
# Validation should STILL fail even for disabled rules (AD-11)
victorialogs:
- url: "http://victorialogs:9428"
+ default:
+ url: "http://victorialogs:9428"
notifiers:
test:
diff --git a/tests/fixtures/config_email_missing_email_body_html.yaml b/tests/fixtures/config_email_missing_email_body_html.yaml
index 3a73135..2accd97 100644
--- a/tests/fixtures/config_email_missing_email_body_html.yaml
+++ b/tests/fixtures/config_email_missing_email_body_html.yaml
@@ -2,7 +2,8 @@
# This should fail validation at startup (AC7)
victorialogs:
- url: "http://victorialogs:9428"
+ default:
+ url: "http://victorialogs:9428"
defaults:
throttle:
diff --git a/tests/fixtures/config_invalid_basic_auth.yaml b/tests/fixtures/config_invalid_basic_auth.yaml
index 29c74fe..79de6a1 100644
--- a/tests/fixtures/config_invalid_basic_auth.yaml
+++ b/tests/fixtures/config_invalid_basic_auth.yaml
@@ -1,9 +1,10 @@
# Configuration fixture with incomplete Basic Auth (missing password)
victorialogs:
- url: "https://victorialogs.secure.local:9428"
- basic_auth:
- username: "testuser"
- # password is missing - this should fail to parse
+ default:
+ url: "https://victorialogs.secure.local:9428"
+ basic_auth:
+ username: "testuser"
+ # password is missing - this should fail to parse
defaults:
throttle:
diff --git a/tests/fixtures/config_invalid_notifier_type.yaml b/tests/fixtures/config_invalid_notifier_type.yaml
index 67ebce2..08f6946 100644
--- a/tests/fixtures/config_invalid_notifier_type.yaml
+++ b/tests/fixtures/config_invalid_notifier_type.yaml
@@ -1,6 +1,7 @@
# Configuration fixture with invalid notifier type (Story 6.2 - AC4)
victorialogs:
- url: "http://victorialogs:9428"
+ default:
+ url: "http://victorialogs:9428"
defaults:
throttle:
diff --git a/tests/fixtures/config_invalid_regex.yaml b/tests/fixtures/config_invalid_regex.yaml
index 161d11b..22c4b24 100644
--- a/tests/fixtures/config_invalid_regex.yaml
+++ b/tests/fixtures/config_invalid_regex.yaml
@@ -1,6 +1,7 @@
# Configuration fixture with invalid regex pattern for testing fail-fast validation
victorialogs:
- url: "http://victorialogs:9428"
+ default:
+ url: "http://victorialogs:9428"
notifiers:
test:
diff --git a/tests/fixtures/config_invalid_template.yaml b/tests/fixtures/config_invalid_template.yaml
index 9728f1c..f511146 100644
--- a/tests/fixtures/config_invalid_template.yaml
+++ b/tests/fixtures/config_invalid_template.yaml
@@ -1,6 +1,7 @@
# Configuration fixture with invalid Jinja template for testing fail-fast validation
victorialogs:
- url: "http://victorialogs:9428"
+ default:
+ url: "http://victorialogs:9428"
notifiers:
test:
diff --git a/tests/fixtures/config_minimal.yaml b/tests/fixtures/config_minimal.yaml
index b0e714f..2be185e 100644
--- a/tests/fixtures/config_minimal.yaml
+++ b/tests/fixtures/config_minimal.yaml
@@ -1,6 +1,7 @@
# Minimal valid config - matches README example
victorialogs:
- url: "http://localhost:9428"
+ default:
+ url: "http://localhost:9428"
notifiers:
mattermost-ops:
diff --git a/tests/fixtures/config_no_notifier.yaml b/tests/fixtures/config_no_notifier.yaml
index 40cb944..bcc1b72 100644
--- a/tests/fixtures/config_no_notifier.yaml
+++ b/tests/fixtures/config_no_notifier.yaml
@@ -1,7 +1,8 @@
# Invalid config - no notifiers configured
# This should fail validation with "no notifiers configured" error
victorialogs:
- url: "http://localhost:9428"
+ default:
+ url: "http://localhost:9428"
defaults:
throttle:
diff --git a/tests/fixtures/config_no_template.yaml b/tests/fixtures/config_no_template.yaml
index 7c1a841..da952dc 100644
--- a/tests/fixtures/config_no_template.yaml
+++ b/tests/fixtures/config_no_template.yaml
@@ -1,7 +1,8 @@
# Invalid config - no templates defined
# This should fail validation with "no templates defined" error
victorialogs:
- url: "http://localhost:9428"
+ default:
+ url: "http://localhost:9428"
notifiers:
test:
diff --git a/tests/fixtures/config_valid.yaml b/tests/fixtures/config_valid.yaml
index f9e50da..c24c573 100644
--- a/tests/fixtures/config_valid.yaml
+++ b/tests/fixtures/config_valid.yaml
@@ -1,6 +1,7 @@
# Valid configuration fixture for tests
victorialogs:
- url: "http://victorialogs:9428"
+ default:
+ url: "http://victorialogs:9428"
notifiers:
default-mattermost:
diff --git a/tests/fixtures/config_with_auth.yaml b/tests/fixtures/config_with_auth.yaml
index 0176fac..b124158 100644
--- a/tests/fixtures/config_with_auth.yaml
+++ b/tests/fixtures/config_with_auth.yaml
@@ -2,15 +2,16 @@
# NOTE: This is a TEST FIXTURE only - credentials are intentionally hardcoded.
# In production, always use environment variables: ${VL_USER}, ${VL_PASS}, etc.
victorialogs:
- url: "https://victorialogs.secure.local:9428"
- basic_auth:
- username: "testuser"
- password: "testpassword"
- headers:
- X-API-Key: "secret-api-key-12345"
- X-Custom-Header: "custom-value"
- tls:
- verify: false
+ default:
+ url: "https://victorialogs.secure.local:9428"
+ basic_auth:
+ username: "testuser"
+ password: "testpassword"
+ headers:
+ X-API-Key: "secret-api-key-12345"
+ X-Custom-Header: "custom-value"
+ tls:
+ verify: false
notifiers:
test-notifier:
diff --git a/tests/fixtures/config_with_notifiers.yaml b/tests/fixtures/config_with_notifiers.yaml
index e12825f..c248d15 100644
--- a/tests/fixtures/config_with_notifiers.yaml
+++ b/tests/fixtures/config_with_notifiers.yaml
@@ -1,6 +1,7 @@
# Configuration fixture with notifiers section (Story 6.2)
victorialogs:
- url: "http://victorialogs:9428"
+ default:
+ url: "http://victorialogs:9428"
defaults:
throttle:
diff --git a/tests/fixtures/multi-file-collision/config.yaml b/tests/fixtures/multi-file-collision/config.yaml
index 88f7a65..cc73fe4 100644
--- a/tests/fixtures/multi-file-collision/config.yaml
+++ b/tests/fixtures/multi-file-collision/config.yaml
@@ -1,7 +1,8 @@
# Config with a rule that will collide with one in rules.d/
victorialogs:
- url: http://victorialogs:9428
+ default:
+ url: http://victorialogs:9428
notifiers:
test:
diff --git a/tests/fixtures/multi-file-cross-ref/config.yaml b/tests/fixtures/multi-file-cross-ref/config.yaml
index ce78aa1..cca4373 100644
--- a/tests/fixtures/multi-file-cross-ref/config.yaml
+++ b/tests/fixtures/multi-file-cross-ref/config.yaml
@@ -1,7 +1,8 @@
# Config for testing cross-file references
victorialogs:
- url: http://victorialogs:9428
+ default:
+ url: http://victorialogs:9428
notifiers:
test:
diff --git a/tests/fixtures/multi-file-empty/config.yaml b/tests/fixtures/multi-file-empty/config.yaml
index ebd599a..952fb46 100644
--- a/tests/fixtures/multi-file-empty/config.yaml
+++ b/tests/fixtures/multi-file-empty/config.yaml
@@ -1,7 +1,8 @@
# Config with empty .d/ directories (should work fine)
victorialogs:
- url: http://victorialogs:9428
+ default:
+ url: http://victorialogs:9428
notifiers:
test:
diff --git a/tests/fixtures/multi-file-intra-collision/config.yaml b/tests/fixtures/multi-file-intra-collision/config.yaml
index ecdde8a..7ad2bd6 100644
--- a/tests/fixtures/multi-file-intra-collision/config.yaml
+++ b/tests/fixtures/multi-file-intra-collision/config.yaml
@@ -1,7 +1,8 @@
# Config for testing collision within .d/ directory
victorialogs:
- url: http://victorialogs:9428
+ default:
+ url: http://victorialogs:9428
notifiers:
test:
diff --git a/tests/fixtures/multi-file-invalid/config.yaml b/tests/fixtures/multi-file-invalid/config.yaml
index efe4f00..4df2cd1 100644
--- a/tests/fixtures/multi-file-invalid/config.yaml
+++ b/tests/fixtures/multi-file-invalid/config.yaml
@@ -1,7 +1,8 @@
# Valid config, but rules.d/ contains invalid YAML
victorialogs:
- url: http://victorialogs:9428
+ default:
+ url: http://victorialogs:9428
defaults:
throttle:
diff --git a/tests/fixtures/multi-file-notifier-collision/config.yaml b/tests/fixtures/multi-file-notifier-collision/config.yaml
index c75f71b..64a7f98 100644
--- a/tests/fixtures/multi-file-notifier-collision/config.yaml
+++ b/tests/fixtures/multi-file-notifier-collision/config.yaml
@@ -1,5 +1,6 @@
victorialogs:
- url: http://victorialogs:9428
+ default:
+ url: http://victorialogs:9428
defaults:
throttle:
diff --git a/tests/fixtures/multi-file-only-d/config.yaml b/tests/fixtures/multi-file-only-d/config.yaml
index 4304f87..5182365 100644
--- a/tests/fixtures/multi-file-only-d/config.yaml
+++ b/tests/fixtures/multi-file-only-d/config.yaml
@@ -2,7 +2,8 @@
# Everything loaded from .d/ directories
victorialogs:
- url: http://victorialogs:9428
+ default:
+ url: http://victorialogs:9428
defaults:
throttle:
diff --git a/tests/fixtures/multi-file-template-collision/config.yaml b/tests/fixtures/multi-file-template-collision/config.yaml
index 97cb76c..971177d 100644
--- a/tests/fixtures/multi-file-template-collision/config.yaml
+++ b/tests/fixtures/multi-file-template-collision/config.yaml
@@ -1,5 +1,6 @@
victorialogs:
- url: http://victorialogs:9428
+ default:
+ url: http://victorialogs:9428
notifiers:
test:
diff --git a/tests/fixtures/multi-file/config.yaml b/tests/fixtures/multi-file/config.yaml
index 277187a..34c64c4 100644
--- a/tests/fixtures/multi-file/config.yaml
+++ b/tests/fixtures/multi-file/config.yaml
@@ -1,7 +1,8 @@
# Base config with inline rules/templates/notifiers for multi-file merge testing
victorialogs:
- url: http://victorialogs:9428
+ default:
+ url: http://victorialogs:9428
defaults:
throttle:
diff --git a/tests/fixtures/vl_events/audit_event.json b/tests/fixtures/vl_events/audit_event.json
new file mode 100644
index 0000000..f472c70
--- /dev/null
+++ b/tests/fixtures/vl_events/audit_event.json
@@ -0,0 +1 @@
+{"_time":"2026-04-15T11:10:08.770Z","_stream":"{group=\"audit\",host=\"app-host-07\"}","_stream_id":"00000000000000005566778899aabbcc","_msg":"USER_LOGIN pid=4422 uid=0 auid=1001 ses=3 msg='op=login id=1001 exe=\"/usr/sbin/sshd\" hostname=? addr=198.51.100.77 terminal=ssh res=success'","type":"USER_LOGIN","auid":"1001","uid":"0","res":"success","host":"app-host-07"}
diff --git a/tests/fixtures/vl_events/cisco_sw_critical.json b/tests/fixtures/vl_events/cisco_sw_critical.json
new file mode 100644
index 0000000..e6f1ccf
--- /dev/null
+++ b/tests/fixtures/vl_events/cisco_sw_critical.json
@@ -0,0 +1 @@
+{"_time":"2026-04-15T10:20:17.000Z","_stream":"{group=\"network\",source=\"syslog\"}","_stream_id":"0000000000000000b2c3d4e5f6071122","_msg":"<187>Apr 15 10:20:17 switch-sw-01 %LINK-3-UPDOWN: Interface GigabitEthernet0/24, changed state to down","host":"switch-sw-01","facility":"local7","severity":"critical","appname":"cisco-ios"}
diff --git a/tests/fixtures/vl_events/docker_container_exit.json b/tests/fixtures/vl_events/docker_container_exit.json
new file mode 100644
index 0000000..0f24592
--- /dev/null
+++ b/tests/fixtures/vl_events/docker_container_exit.json
@@ -0,0 +1 @@
+{"_time":"2026-04-15T11:15:44.229Z","_stream":"{group=\"docker\",host=\"app-host-08\"}","_stream_id":"000000000000000066778899aabbccdd","_msg":"container sidecar-logger exited with code 1","container_id":"9f8e7d6c5b4a3e2f1d0c9b8a7654321fedcba","container_name":"sidecar-logger","image":"ghcr.io/example/sidecar-logger:1.2.3","exit_code":"1","host":"app-host-08"}
diff --git a/tests/fixtures/vl_events/edge_dotted_keys_literal.json b/tests/fixtures/vl_events/edge_dotted_keys_literal.json
new file mode 100644
index 0000000..8a5bc76
--- /dev/null
+++ b/tests/fixtures/vl_events/edge_dotted_keys_literal.json
@@ -0,0 +1 @@
+{"_time":"2026-04-15T11:05:12.333Z","_stream":"{group=\"app\",host=\"app-host-06\"}","_stream_id":"0000000000000000445566778899aabb","_msg":"GET /healthz -> 200","nginx.http.method":"GET","nginx.http.path":"/healthz","nginx.http.status":"200","nginx.http.request_id":"req-0f1e2d3c4b5a","nginx.http.remote_addr":"203.0.113.9","host":"app-host-06"}
diff --git a/tests/fixtures/vl_events/edge_empty_fields.json b/tests/fixtures/vl_events/edge_empty_fields.json
new file mode 100644
index 0000000..84ee27e
--- /dev/null
+++ b/tests/fixtures/vl_events/edge_empty_fields.json
@@ -0,0 +1 @@
+{"_time":"2026-04-15T11:00:00.000Z","_stream":"{group=\"app\",host=\"app-host-05\"}","_stream_id":"00000000000000003344556677889900","_msg":"request completed","host":"app-host-05","request_id":"","user_id":"","session":"","error":""}
diff --git a/tests/fixtures/vl_events/edge_unicode_msg.json b/tests/fixtures/vl_events/edge_unicode_msg.json
new file mode 100644
index 0000000..34d1e0d
--- /dev/null
+++ b/tests/fixtures/vl_events/edge_unicode_msg.json
@@ -0,0 +1 @@
+{"_time":"2026-04-15T10:55:41.671Z","_stream":"{group=\"app\",host=\"app-host-04\"}","_stream_id":"000000000000000022334455667788aa","_msg":"\u2705 \u652f\u4ed8\u5931\u8d25 payment failed for user \u5c71\u7530\u592a\u90ce","host":"app-host-04","app":"checkout","locale":"ja_JP.UTF-8"}
diff --git a/tests/fixtures/vl_events/index.yaml b/tests/fixtures/vl_events/index.yaml
new file mode 100644
index 0000000..c9c7a2d
--- /dev/null
+++ b/tests/fixtures/vl_events/index.yaml
@@ -0,0 +1,107 @@
+# Manifest for VictoriaLogs event fixtures.
+#
+# Each entry documents one file under this directory. The entry key MUST match
+# the filename on disk exactly; the consistency test (`tests/vl_fixtures_consistency.rs`)
+# enforces the invariant that index.yaml and the filesystem are in sync.
+#
+# Schema per entry:
+# description: free-form sentence describing what this event represents.
+# source_system: one of nginx | cisco | k8s | windows | journald | otel | raw | docker | audit | syslog | edge_case
+# tags: list of free-form strings. Consistent vocabulary:
+# dotted_keys, flat_keys, regex_extract, pascal_case, multiline_msg,
+# unicode, http, syslog, empty_fields, embedded_json, minimal, labels.
+# prevents_regression_for: list of issue references (e.g. "#25"). Empty list is valid.
+#
+# All data is anonymised. Hostnames are fake (app-host-NN, switch-sw-01, k8s-node-01).
+# IPs use RFC 5737 ranges (192.0.2.0/24, 198.51.100.0/24, 203.0.113.0/24).
+# No real tokens, trace ids, or tenants.
+fixtures:
+ nginx_http_400.json:
+ description: "nginx reverse proxy 400 response with vector-forwarded flat-dotted keys"
+ source_system: nginx
+ tags: [dotted_keys, flat_keys, http]
+ prevents_regression_for: ["#25"]
+
+ nginx_http_500.json:
+ description: "nginx 500 upstream failure, same flat-dotted shape as 400 case"
+ source_system: nginx
+ tags: [dotted_keys, flat_keys, http]
+ prevents_regression_for: ["#25"]
+
+ cisco_sw_critical.json:
+ description: "Cisco IOS syslog link-down event with regex-extractable switch name in _msg"
+ source_system: cisco
+ tags: [regex_extract, syslog]
+ prevents_regression_for: []
+
+ k8s_pod_oom.json:
+ description: "Kubernetes container OOMKilled event with kubernetes.* labels as flat-dotted keys"
+ source_system: k8s
+ tags: [dotted_keys, flat_keys, labels]
+ prevents_regression_for: []
+
+ windows_security_logoff.json:
+ description: "Windows Security Event 4634 (account logoff) with PascalCase keys and escaped multiline _msg"
+ source_system: windows
+ tags: [pascal_case, multiline_msg]
+ prevents_regression_for: []
+
+ journald_cron.json:
+ description: "journald CRON execution with _SYSTEMD_* / _TRANSPORT fields"
+ source_system: journald
+ tags: [flat_keys, syslog]
+ prevents_regression_for: []
+
+ otel_span.json:
+ description: "OpenTelemetry server span with trace_id, span_id and dotted service.* / http.* attributes"
+ source_system: otel
+ tags: [dotted_keys, flat_keys]
+ prevents_regression_for: []
+
+ raw_message_only.json:
+ description: "Minimal event: only _time, _stream, _stream_id and _msg, no structured fields"
+ source_system: raw
+ tags: [minimal]
+ prevents_regression_for: []
+
+ json_embedded_msg.json:
+ description: "_msg contains an embedded JSON payload, verbatim (not pre-parsed by the ingester)"
+ source_system: raw
+ tags: [embedded_json]
+ prevents_regression_for: []
+
+ edge_unicode_msg.json:
+ description: "_msg containing CJK characters and an emoji to exercise UTF-8 handling"
+ source_system: edge_case
+ tags: [unicode]
+ prevents_regression_for: []
+
+ edge_empty_fields.json:
+ description: "Several structured fields present but empty string, to exercise empty-guard paths"
+ source_system: edge_case
+ tags: [empty_fields]
+ prevents_regression_for: ["#26"]
+
+ edge_dotted_keys_literal.json:
+ description: "Exact shape from #25 with literal dotted keys nginx.http.* preserved top-level"
+ source_system: edge_case
+ tags: [dotted_keys, flat_keys, http]
+ prevents_regression_for: ["#25"]
+
+ audit_event.json:
+ description: "Linux auditd USER_LOGIN record with embedded key=value payload inside _msg"
+ source_system: audit
+ tags: [regex_extract, flat_keys]
+ prevents_regression_for: []
+
+ docker_container_exit.json:
+ description: "Docker engine event for a sidecar container exiting with non-zero status"
+ source_system: docker
+ tags: [flat_keys]
+ prevents_regression_for: []
+
+ syslog_raw_daemon.json:
+ description: "Raw RFC3164 syslog line from chronyd forwarded via a syslog collector"
+ source_system: syslog
+ tags: [syslog, regex_extract]
+ prevents_regression_for: []
diff --git a/tests/fixtures/vl_events/journald_cron.json b/tests/fixtures/vl_events/journald_cron.json
new file mode 100644
index 0000000..c1380eb
--- /dev/null
+++ b/tests/fixtures/vl_events/journald_cron.json
@@ -0,0 +1 @@
+{"_time":"2026-04-15T10:35:00.112Z","_stream":"{group=\"systemd\",host=\"app-host-01\"}","_stream_id":"0000000000000000e5f6071122334455","_msg":"(root) CMD (test -x /usr/sbin/anacron || ( cd / && run-parts --report /etc/cron.hourly ))","_SYSTEMD_UNIT":"cron.service","_SYSTEMD_SLICE":"system.slice","_TRANSPORT":"journald","_HOSTNAME":"app-host-01","_PID":"2481","_UID":"0","SYSLOG_IDENTIFIER":"CRON","PRIORITY":"6"}
diff --git a/tests/fixtures/vl_events/json_embedded_msg.json b/tests/fixtures/vl_events/json_embedded_msg.json
new file mode 100644
index 0000000..43ec67b
--- /dev/null
+++ b/tests/fixtures/vl_events/json_embedded_msg.json
@@ -0,0 +1 @@
+{"_time":"2026-04-15T10:50:55.818Z","_stream":"{group=\"app\",host=\"app-host-03\"}","_stream_id":"0000000000000000112233445566778a","_msg":"{\"level\":\"error\",\"event\":\"payment_declined\",\"user_id\":\"u-00012\",\"amount\":42.50,\"reason\":\"insufficient_funds\"}","host":"app-host-03","app":"billing"}
diff --git a/tests/fixtures/vl_events/k8s_pod_oom.json b/tests/fixtures/vl_events/k8s_pod_oom.json
new file mode 100644
index 0000000..023c1b9
--- /dev/null
+++ b/tests/fixtures/vl_events/k8s_pod_oom.json
@@ -0,0 +1 @@
+{"_time":"2026-04-15T10:25:44.877Z","_stream":"{group=\"k8s\",namespace=\"payments\",pod=\"worker-7c9b\"}","_stream_id":"0000000000000000c3d4e5f607112233","_msg":"Container worker terminated (OOMKilled), exit code 137","kubernetes.namespace_name":"payments","kubernetes.pod_name":"worker-7c9b4d5f-abc12","kubernetes.container_name":"worker","kubernetes.node_name":"k8s-node-01","kubernetes.labels.app":"worker","kubernetes.labels.version":"1.8.3","reason":"OOMKilled","exit_code":"137"}
diff --git a/tests/fixtures/vl_events/nginx_http_400.json b/tests/fixtures/vl_events/nginx_http_400.json
new file mode 100644
index 0000000..809c763
--- /dev/null
+++ b/tests/fixtures/vl_events/nginx_http_400.json
@@ -0,0 +1 @@
+{"_time":"2026-04-15T10:12:33.421Z","_stream":"{group=\"web\",host=\"app-host-01\"}","_stream_id":"0000000000000000a1b2c3d4e5f60011","_msg":"400 Bad Request from 192.0.2.17","nginx.http.method":"POST","nginx.http.status":"400","nginx.http.request_id":"req-7f9c2a1b8d4e","nginx.http.path":"/api/v1/events","nginx.http.remote_addr":"192.0.2.17","nginx.http.user_agent":"curl/8.6.0","host":"app-host-01"}
diff --git a/tests/fixtures/vl_events/nginx_http_500.json b/tests/fixtures/vl_events/nginx_http_500.json
new file mode 100644
index 0000000..b7c8de2
--- /dev/null
+++ b/tests/fixtures/vl_events/nginx_http_500.json
@@ -0,0 +1 @@
+{"_time":"2026-04-15T10:14:02.108Z","_stream":"{group=\"web\",host=\"app-host-02\"}","_stream_id":"0000000000000000a1b2c3d4e5f60022","_msg":"500 Internal Server Error while proxying to upstream","nginx.http.method":"GET","nginx.http.status":"500","nginx.http.request_id":"req-1122aabbccdd","nginx.http.path":"/api/v1/render","nginx.http.remote_addr":"198.51.100.42","nginx.http.upstream":"backend-api:8080","host":"app-host-02"}
diff --git a/tests/fixtures/vl_events/otel_span.json b/tests/fixtures/vl_events/otel_span.json
new file mode 100644
index 0000000..63cb75a
--- /dev/null
+++ b/tests/fixtures/vl_events/otel_span.json
@@ -0,0 +1 @@
+{"_time":"2026-04-15T10:40:22.340Z","_stream":"{group=\"otel\",service=\"checkout\"}","_stream_id":"0000000000000000f607112233445566","_msg":"span completed: POST /api/checkout","trace_id":"5e8f3c1d92a94fb8b7d6a1e2c3f40517","span_id":"9e3a7b4d15f82c60","parent_span_id":"7a3f8d2e1c4b5a6f","service.name":"checkout","service.version":"2.4.1","http.method":"POST","http.status_code":"200","span.kind":"server","duration_ns":"48210000"}
diff --git a/tests/fixtures/vl_events/raw_message_only.json b/tests/fixtures/vl_events/raw_message_only.json
new file mode 100644
index 0000000..be09415
--- /dev/null
+++ b/tests/fixtures/vl_events/raw_message_only.json
@@ -0,0 +1 @@
+{"_time":"2026-04-15T10:45:11.002Z","_stream":"{group=\"misc\"}","_stream_id":"00000000000000000711223344556677","_msg":"disk space usage at 87% on /var"}
diff --git a/tests/fixtures/vl_events/syslog_raw_daemon.json b/tests/fixtures/vl_events/syslog_raw_daemon.json
new file mode 100644
index 0000000..1a5970a
--- /dev/null
+++ b/tests/fixtures/vl_events/syslog_raw_daemon.json
@@ -0,0 +1 @@
+{"_time":"2026-04-15T11:20:30.444Z","_stream":"{group=\"syslog\",host=\"app-host-09\"}","_stream_id":"00000000000000007788899aabbccdde","_msg":"<30>Apr 15 11:20:30 app-host-09 chronyd[812]: Selected source 203.0.113.123 (pool.example.test)","facility":"daemon","severity":"info","appname":"chronyd","host":"app-host-09"}
diff --git a/tests/fixtures/vl_events/windows_security_logoff.json b/tests/fixtures/vl_events/windows_security_logoff.json
new file mode 100644
index 0000000..9350067
--- /dev/null
+++ b/tests/fixtures/vl_events/windows_security_logoff.json
@@ -0,0 +1 @@
+{"_time":"2026-04-15T10:31:09.550Z","_stream":"{group=\"windows\",host=\"WIN-APP-01\"}","_stream_id":"0000000000000000d4e5f60711223344","_msg":"An account was logged off.\r\n\r\nSubject:\r\n\tSecurity ID:\t\tS-1-5-21-0-0-0-1001\r\n\tAccount Name:\t\ttestuser\r\n\tAccount Domain:\t\tEXAMPLE\r\n\tLogon ID:\t\t0x3E7","EventID":"4634","Channel":"Security","Computer":"WIN-APP-01.example.test","Provider":"Microsoft-Windows-Security-Auditing","LogonType":"3","TargetUserName":"testuser","TargetDomainName":"EXAMPLE"}
diff --git a/tests/integration_notify.rs b/tests/integration_notify.rs
index d28900d..2aefb26 100644
--- a/tests/integration_notify.rs
+++ b/tests/integration_notify.rs
@@ -27,6 +27,7 @@ fn make_payload_with_destinations(rule_name: &str, destinations: Vec) ->
accent_color: Some("#ff0000".to_string()),
},
rule_name: rule_name.to_string(),
+ vl_source: "vlprod".to_string(),
destinations,
log_timestamp: "2026-01-15T10:49:35.799Z".to_string(),
log_timestamp_formatted: "15/01/2026 10:49:35 UTC".to_string(),
@@ -207,7 +208,7 @@ async fn test_mattermost_payload_format() {
"fallback": "Alert from format_rule",
"title": "Alert from format_rule",
"text": "Test body content",
- "footer": "valerter | format_rule | 15/01/2026 10:49:35 UTC"
+ "footer": "valerter | format_rule | vlprod | 15/01/2026 10:49:35 UTC"
}]
})))
.respond_with(ResponseTemplate::new(200))
diff --git a/tests/integration_streaming.rs b/tests/integration_streaming.rs
index 5da5fb3..4ae8d27 100644
--- a/tests/integration_streaming.rs
+++ b/tests/integration_streaming.rs
@@ -53,7 +53,10 @@ async fn test_streaming_basic_single_line() {
let config = create_config(&mock_server, "_stream:test");
let mut client = TailClient::new(config).unwrap();
- let lines = client.connect_and_receive("test_rule").await.unwrap();
+ let lines = client
+ .connect_and_receive("test_rule", "default")
+ .await
+ .unwrap();
assert_eq!(lines.len(), 1);
assert!(lines[0].contains("test log"));
@@ -80,7 +83,10 @@ async fn test_streaming_multiple_lines() {
let config = create_config(&mock_server, "_stream:multi");
let mut client = TailClient::new(config).unwrap();
- let lines = client.connect_and_receive("test_rule").await.unwrap();
+ let lines = client
+ .connect_and_receive("test_rule", "default")
+ .await
+ .unwrap();
assert_eq!(lines.len(), 3);
assert!(lines[0].contains("log 1"));
@@ -101,7 +107,10 @@ async fn test_streaming_empty_response() {
let config = create_config(&mock_server, "_stream:empty");
let mut client = TailClient::new(config).unwrap();
- let lines = client.connect_and_receive("test_rule").await.unwrap();
+ let lines = client
+ .connect_and_receive("test_rule", "default")
+ .await
+ .unwrap();
assert!(lines.is_empty());
}
@@ -123,7 +132,7 @@ async fn test_connection_error_http_500() {
let config = create_config(&mock_server, "_stream:error");
let mut client = TailClient::new(config).unwrap();
- let result = client.connect_and_receive("test_rule").await;
+ let result = client.connect_and_receive("test_rule", "default").await;
assert!(result.is_err());
match result {
@@ -147,7 +156,7 @@ async fn test_connection_error_http_404() {
let config = create_config(&mock_server, "_stream:notfound");
let mut client = TailClient::new(config).unwrap();
- let result = client.connect_and_receive("test_rule").await;
+ let result = client.connect_and_receive("test_rule", "default").await;
assert!(result.is_err());
match result {
@@ -171,7 +180,7 @@ async fn test_connection_error_http_503() {
let config = create_config(&mock_server, "_stream:unavailable");
let mut client = TailClient::new(config).unwrap();
- let result = client.connect_and_receive("test_rule").await;
+ let result = client.connect_and_receive("test_rule", "default").await;
assert!(result.is_err());
match result {
@@ -196,7 +205,7 @@ async fn test_connection_error_server_down() {
let mut client = TailClient::new(config).unwrap();
- let result = client.connect_and_receive("test_rule").await;
+ let result = client.connect_and_receive("test_rule", "default").await;
assert!(result.is_err());
match result {
@@ -231,7 +240,7 @@ async fn test_timeout_detection() {
let mut client = TailClient::new(config).unwrap();
// This won't actually timeout since delay is short, but tests the path
- let result = client.connect_and_receive("test_rule").await;
+ let result = client.connect_and_receive("test_rule", "default").await;
// Should succeed (data received before timeout)
// The buffer will hold "incomplete" without newline
@@ -287,7 +296,7 @@ async fn test_url_construction_is_correct() {
};
let mut client = TailClient::new(config).unwrap();
- let _ = client.connect_and_receive("test_rule").await;
+ let _ = client.connect_and_receive("test_rule", "default").await;
// If we get here without panic, the URL matched
}
@@ -315,7 +324,7 @@ async fn test_url_with_start_param() {
};
let mut client = TailClient::new(config).unwrap();
- let _ = client.connect_and_receive("test_rule").await;
+ let _ = client.connect_and_receive("test_rule", "default").await;
}
// =============================================================================
@@ -338,7 +347,7 @@ async fn test_headers_are_set_correctly() {
let config = create_config(&mock_server, "_stream:headers");
let mut client = TailClient::new(config).unwrap();
- let _ = client.connect_and_receive("test_rule").await;
+ let _ = client.connect_and_receive("test_rule", "default").await;
}
// =============================================================================
@@ -365,7 +374,10 @@ async fn test_streaming_with_utf8_content() {
let config = create_config(&mock_server, "_stream:utf8");
let mut client = TailClient::new(config).unwrap();
- let lines = client.connect_and_receive("test_rule").await.unwrap();
+ let lines = client
+ .connect_and_receive("test_rule", "default")
+ .await
+ .unwrap();
assert_eq!(lines.len(), 2);
assert!(lines[0].contains("Café"));
@@ -395,7 +407,7 @@ impl TestReconnectCallback {
}
impl ReconnectCallback for TestReconnectCallback {
- fn on_reconnect(&self, _rule_name: &str) {
+ fn on_reconnect(&self, _rule_name: &str, _vl_source: &str) {
self.count.fetch_add(1, Ordering::SeqCst);
}
}
@@ -423,7 +435,7 @@ async fn test_stream_with_reconnect_receives_lines() {
// Use tokio::time::timeout to prevent infinite loop
let result = tokio::time::timeout(Duration::from_millis(500), async {
client
- .stream_with_reconnect("test_rule", None, |line| {
+ .stream_with_reconnect("test_rule", "default", None, |line| {
let lines = Arc::clone(&lines_clone);
async move {
lines.lock().unwrap().push(line);
@@ -474,7 +486,7 @@ async fn test_stream_with_reconnect_retries_on_error() {
// Use short timeout - should get at least one retry and one success
let _ = tokio::time::timeout(Duration::from_secs(3), async {
client
- .stream_with_reconnect("test_rule", Some(&callback), |line| {
+ .stream_with_reconnect("test_rule", "default", Some(&callback), |line| {
let lines = Arc::clone(&lines_clone);
async move {
lines.lock().unwrap().push(line);
@@ -505,15 +517,15 @@ async fn test_stream_with_reconnect_retries_on_error() {
#[test]
fn test_log_reconnection_attempt_does_not_panic() {
// Just verify the function can be called without panic
- log_reconnection_attempt("test_rule", 0, Duration::from_secs(1));
- log_reconnection_attempt("test_rule", 5, Duration::from_secs(32));
- log_reconnection_attempt("test_rule", 10, Duration::from_secs(60));
+ log_reconnection_attempt("test_rule", "default", 0, Duration::from_secs(1));
+ log_reconnection_attempt("test_rule", "default", 5, Duration::from_secs(32));
+ log_reconnection_attempt("test_rule", "default", 10, Duration::from_secs(60));
}
#[test]
fn test_log_reconnection_success_does_not_panic() {
// Just verify the function can be called without panic
- log_reconnection_success("test_rule");
+ log_reconnection_success("test_rule", "default");
}
#[test]
@@ -521,10 +533,10 @@ fn test_reconnect_callback_trait() {
let callback = TestReconnectCallback::new();
assert_eq!(callback.reconnect_count(), 0);
- callback.on_reconnect("rule1");
+ callback.on_reconnect("rule1", "vlprod");
assert_eq!(callback.reconnect_count(), 1);
- callback.on_reconnect("rule2");
+ callback.on_reconnect("rule2", "vldev");
assert_eq!(callback.reconnect_count(), 2);
}
@@ -598,7 +610,10 @@ async fn test_basic_auth_header_is_sent() {
let config = create_config_with_basic_auth(&mock_server, "testuser", "testpass");
let mut client = TailClient::new(config).unwrap();
- let lines = client.connect_and_receive("test_rule").await.unwrap();
+ let lines = client
+ .connect_and_receive("test_rule", "default")
+ .await
+ .unwrap();
assert_eq!(lines.len(), 1);
assert!(lines[0].contains("authenticated"));
@@ -633,7 +648,10 @@ async fn test_custom_headers_are_sent() {
let config = create_config_with_headers(&mock_server, headers);
let mut client = TailClient::new(config).unwrap();
- let lines = client.connect_and_receive("test_rule").await.unwrap();
+ let lines = client
+ .connect_and_receive("test_rule", "default")
+ .await
+ .unwrap();
assert_eq!(lines.len(), 1);
assert!(lines[0].contains("headers received"));
@@ -663,7 +681,10 @@ async fn test_bearer_token_in_header() {
let config = create_config_with_headers(&mock_server, headers);
let mut client = TailClient::new(config).unwrap();
- let lines = client.connect_and_receive("test_rule").await.unwrap();
+ let lines = client
+ .connect_and_receive("test_rule", "default")
+ .await
+ .unwrap();
assert_eq!(lines.len(), 1);
assert!(lines[0].contains("bearer auth ok"));
@@ -706,7 +727,10 @@ async fn test_basic_auth_with_custom_headers_combined() {
let mut client = TailClient::new(config).unwrap();
- let lines = client.connect_and_receive("test_rule").await.unwrap();
+ let lines = client
+ .connect_and_receive("test_rule", "default")
+ .await
+ .unwrap();
assert_eq!(lines.len(), 1);
assert!(lines[0].contains("combined auth ok"));
@@ -734,7 +758,7 @@ async fn test_basic_auth_401_on_wrong_credentials() {
let config = create_config_with_basic_auth(&mock_server, "wrong_user", "wrong_pass");
let mut client = TailClient::new(config).unwrap();
- let result = client.connect_and_receive("test_rule").await;
+ let result = client.connect_and_receive("test_rule", "default").await;
assert!(result.is_err());
match result {
@@ -763,7 +787,10 @@ async fn test_without_auth_no_authorization_header() {
let config = create_config(&mock_server, "_stream:noauth");
let mut client = TailClient::new(config).unwrap();
- let lines = client.connect_and_receive("test_rule").await.unwrap();
+ let lines = client
+ .connect_and_receive("test_rule", "default")
+ .await
+ .unwrap();
assert_eq!(lines.len(), 1);
assert!(lines[0].contains("no auth"));
diff --git a/tests/integration_validate.rs b/tests/integration_validate.rs
index 70dab3f..2f994b2 100644
--- a/tests/integration_validate.rs
+++ b/tests/integration_validate.rs
@@ -57,8 +57,8 @@ fn validate_valid_config_exits_success() {
stdout
);
assert!(
- stdout.contains("VictoriaLogs URL"),
- "Output should show VictoriaLogs URL: {}",
+ stdout.contains("VictoriaLogs sources"),
+ "Output should show VictoriaLogs sources summary: {}",
stdout
);
assert!(
@@ -277,6 +277,87 @@ fn validate_mattermost_env_var_ignored() {
);
}
+// Test: shipped config/config.example.yaml passes --validate
+// Locks the canonical example against future schema drift.
+#[test]
+fn shipped_config_example_validates() {
+ ensure_binary_built();
+
+ let config_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
+ .join("config")
+ .join("config.example.yaml");
+
+ let output = Command::new(valerter_binary())
+ .args(["--validate", "-c"])
+ .arg(&config_path)
+ .output()
+ .expect("Failed to run valerter");
+
+ assert!(
+ output.status.success(),
+ "shipped config '{}' must pass --validate (exit 0)\nstdout: {}\nstderr: {}",
+ config_path.display(),
+ String::from_utf8_lossy(&output.stdout),
+ String::from_utf8_lossy(&output.stderr)
+ );
+}
+
+// Test: every shipped examples/<name>/config.yaml passes --validate
+// Iterates so new example folders are picked up automatically.
+// Sets placeholder env vars so examples that reference ${...} secrets validate
+// without hitting the network (--validate does not actually call any backend).
+#[test]
+fn shipped_examples_pass_validate() {
+ ensure_binary_built();
+
+ let examples_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("examples");
+
+ let entries = std::fs::read_dir(&examples_dir).expect("Failed to read examples/ directory");
+
+ let mut checked = 0usize;
+ for entry in entries {
+ let entry = entry.expect("Failed to read examples/ entry");
+ let path = entry.path();
+ if !path.is_dir() {
+ continue;
+ }
+ let config_path = path.join("config.yaml");
+ if !config_path.exists() {
+ continue;
+ }
+
+ let output = Command::new(valerter_binary())
+ .args(["--validate", "-c"])
+ .arg(&config_path)
+ // Placeholders for examples that reference ${VAR} secrets.
+ // --validate does not perform any network call, so values can be dummies.
+ .env("WEBHOOK_URL", "https://mattermost.example.com/hooks/dummy")
+ .env("VL_PROD_USER", "dummy_user")
+ .env("VL_PROD_PASS", "dummy_pass")
+ .env("VL_PROD_TOKEN", "dummy_token")
+ .env("SMTP_USER", "dummy_smtp_user")
+ .env("SMTP_PASSWORD", "dummy_smtp_pass")
+ .env("TELEGRAM_BOT_TOKEN", "dummy_bot_token")
+ .output()
+ .expect("Failed to run valerter");
+
+ assert!(
+ output.status.success(),
+ "shipped example '{}' must pass --validate (exit 0)\nstdout: {}\nstderr: {}",
+ config_path.display(),
+ String::from_utf8_lossy(&output.stdout),
+ String::from_utf8_lossy(&output.stderr)
+ );
+ checked += 1;
+ }
+
+ assert!(
+ checked >= 1,
+ "expected at least one examples/<name>/config.yaml to validate, found 0 in {}",
+ examples_dir.display()
+ );
+}
+
// Test: --validate with minimal config (README example) passes
#[test]
fn validate_minimal_config_exits_success() {
diff --git a/tests/metrics_snapshot.rs b/tests/metrics_snapshot.rs
new file mode 100644
index 0000000..b8b5003
--- /dev/null
+++ b/tests/metrics_snapshot.rs
@@ -0,0 +1,330 @@
+//! `/metrics` snapshot test for the multi-source observability work
+//! (v2.0.0 part 2).
+//!
+//! Spins up a 2-source 1-rule engine plus the real Prometheus metrics
+//! exporter on an ephemeral port, exercises every per-rule metric path
+//! once (alert sent, alert throttled, parse error, log matched), then
+//! scrapes `/metrics` and asserts the **set of metric names + label keys**
+//! against an inline expected list. Values and timestamps are intentionally
+//! ignored — the test catches accidental metric rename/relabel in future PRs
+//! without coupling to runtime numbers.
+//!
+//! ## Why a separate integration test binary
+//!
+//! `metrics-exporter-prometheus` installs a global recorder via
+//! `PrometheusBuilder::install()`, which can only run once per process. Each
+//! integration test binary gets its own process, so this file owns the
+//! recorder for its run and does not race with `src/metrics.rs` unit tests
+//! or other integration suites.
+
+use std::collections::BTreeMap;
+use std::collections::BTreeSet;
+use std::sync::Arc;
+use std::time::Duration;
+
+use serde_json::Value;
+use tokio::sync::broadcast;
+use tokio_util::sync::CancellationToken;
+use valerter::config::{
+ CompiledParser, CompiledRule, CompiledTemplate, DEFAULT_MAX_STREAMS, DefaultsConfig,
+ JsonParserConfig, MetricsConfig, NotifyConfig, RuntimeConfig, ThrottleConfig, VlSourceConfig,
+};
+use valerter::notify::{AlertPayload, NotificationQueue};
+use valerter::{MetricsServer, RuleEngine};
+use wiremock::matchers::{method, path};
+use wiremock::{Mock, MockServer, ResponseTemplate};
+
+/// Re-serialize a JSON value into NDJSON (one event + trailing newline).
+fn ndjson_body(events: &[&Value]) -> Vec<u8> {
+ let mut out = Vec::new();
+ for ev in events {
+ out.extend_from_slice(
+ serde_json::to_vec(ev)
+ .expect("fixture is valid JSON")
+ .as_slice(),
+ );
+ out.push(b'\n');
+ }
+ out
+}
+
+async fn mount_ndjson(server: &MockServer, events: &[&Value]) {
+ Mock::given(method("GET"))
+ .and(path("/select/logsql/tail"))
+ .respond_with(
+ ResponseTemplate::new(200).set_body_raw(ndjson_body(events), "application/x-ndjson"),
+ )
+ .mount(server)
+ .await;
+}
+
+fn rule(name: &str, vl_sources: Vec<String>, throttle_count: u32) -> CompiledRule {
+ CompiledRule {
+ name: name.to_string(),
+ enabled: true,
+ query: "_stream:test".to_string(),
+ parser: CompiledParser {
+ regex: None,
+ json: Some(JsonParserConfig {
+ fields: vec!["_msg".to_string()],
+ }),
+ },
+ throttle: Some(valerter::config::CompiledThrottle {
+ key_template: None,
+ count: throttle_count,
+ window: Duration::from_secs(60),
+ }),
+ notify: NotifyConfig {
+ template: "tpl".to_string(),
+ mattermost_channel: None,
+ destinations: vec!["dest".to_string()],
+ },
+ vl_sources,
+ }
+}
+
+fn vl_source(uri: &str) -> VlSourceConfig {
+ VlSourceConfig {
+ url: uri.to_string(),
+ basic_auth: None,
+ headers: None,
+ tls: None,
+ }
+}
+
+fn runtime(sources: BTreeMap<String, VlSourceConfig>, rules: Vec<CompiledRule>) -> RuntimeConfig {
+ let mut templates = std::collections::HashMap::new();
+ templates.insert(
+ "tpl".to_string(),
+ CompiledTemplate {
+ title: "{{ rule_name }}@{{ vl_source }}".to_string(),
+ body: "{{ _msg }}".to_string(),
+ email_body_html: None,
+ accent_color: None,
+ },
+ );
+
+ RuntimeConfig {
+ victorialogs: sources,
+ defaults: DefaultsConfig {
+ throttle: ThrottleConfig {
+ key: None,
+ count: 5,
+ window: Duration::from_secs(60),
+ },
+ timestamp_timezone: "UTC".to_string(),
+ max_streams: DEFAULT_MAX_STREAMS,
+ },
+ templates,
+ rules,
+ metrics: MetricsConfig::default(),
+ notifiers: None,
+ config_dir: std::path::PathBuf::from("."),
+ }
+}
+
+async fn drain(rx: &mut broadcast::Receiver<AlertPayload>, max: usize, deadline: Duration) {
+ let mut got = 0;
+ let _ = tokio::time::timeout(deadline, async {
+ while got < max {
+ match rx.recv().await {
+ Ok(_) => got += 1,
+ Err(_) => break,
+ }
+ }
+ })
+ .await;
+}
+
+/// Parse a Prometheus exposition body and return the set of `name{labelkeys}`
+/// strings. Label *values* and metric *values* are stripped; only the metric
+/// identifier and the **sorted set of label keys** are kept. This is exactly
+/// what we want to catch accidental rename/relabel without coupling to
+/// counter values, timestamps, or how many label-value combinations exist.
+fn extract_name_label_keys(body: &str) -> BTreeSet<String> {
+ let mut out = BTreeSet::new();
+ for line in body.lines() {
+ let line = line.trim();
+ if line.is_empty() || line.starts_with('#') {
+ continue;
+ }
+ // Extract name and (optional) labels block. Format: `name{k="v",...} value`
+ // or `name value`.
+ let (head, _) = match line.split_once(' ') {
+ Some(parts) => parts,
+ None => continue,
+ };
+ let (name, label_keys) = if let Some(brace) = head.find('{') {
+ let name = &head[..brace];
+ let labels_str = &head[brace + 1..head.len() - 1];
+ let mut keys: Vec<&str> = labels_str
+ .split(',')
+ .filter_map(|kv| kv.split_once('=').map(|(k, _)| k))
+ .collect();
+ keys.sort();
+ keys.dedup();
+ (name.to_string(), keys.join(","))
+ } else {
+ (head.to_string(), String::new())
+ };
+ if label_keys.is_empty() {
+ out.insert(name);
+ } else {
+ out.insert(format!("{}{{{}}}", name, label_keys));
+ }
+ }
+ out
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn metrics_snapshot_two_sources_one_rule() {
+ // 1) Mock 2 VL sources. Source `vlprod` serves a parseable event so the
+ // engine drives the throttle + alert path. Source `vldev` serves a
+ // line that fails JSON parsing, exercising the parse-error path.
+ let vlprod = MockServer::start().await;
+ let vldev = MockServer::start().await;
+
+ let good_event: Value = serde_json::json!({
+ "_time": "2026-04-15T10:00:00Z",
+ "_stream": "{}",
+ "_msg": "ok",
+ });
+ mount_ndjson(&vlprod, &[&good_event, &good_event, &good_event]).await;
+
+ // For vldev, serve raw garbage so the parser increments
+ // `valerter_parse_errors_total{rule_name, vl_source, error_type}`.
+ Mock::given(method("GET"))
+ .and(path("/select/logsql/tail"))
+ .respond_with(
+ ResponseTemplate::new(200)
+ .set_body_raw(b"this is not json\n".to_vec(), "application/x-ndjson"),
+ )
+ .mount(&vldev)
+ .await;
+
+ let mut sources = BTreeMap::new();
+ sources.insert("vlprod".to_string(), vl_source(&vlprod.uri()));
+ sources.insert("vldev".to_string(), vl_source(&vldev.uri()));
+
+ // Throttle count=1 on the only rule so the second event on `vlprod`
+ // also exercises the throttled path.
+ let rules = vec![rule("snapshot_rule", Vec::new(), 1)];
+ let cfg = runtime(sources, rules);
+
+ // 2) Boot the metrics server on an ephemeral port. The recorder install
+ // is a one-shot global; subsequent tests in this same binary cannot
+ // install it again, which is why this file is a dedicated integration
+ // test.
+ let port = portpicker::pick_unused_port().expect("free port");
+ let cancel = CancellationToken::new();
+ let (ready_tx, ready_rx) = tokio::sync::oneshot::channel();
+ let metrics_cancel = cancel.clone();
+ let metrics_handle = tokio::spawn(async move {
+ let server = MetricsServer::with_ready_signal(port, ready_tx);
+ let _ = server.run(metrics_cancel).await;
+ });
+ ready_rx.await.expect("metrics server should signal ready");
+
+ // 3) Initialize all known metric series so the snapshot is deterministic
+ // even before counters tick. Mirrors the call valerter's main does.
+ let rule_source_pairs: Vec<(&str, &str)> =
+ vec![("snapshot_rule", "vlprod"), ("snapshot_rule", "vldev")];
+ let source_names: Vec<&str> = vec!["vlprod", "vldev"];
+ // Pass at least one notifier so the per-notifier sentinel counters
+ // (`alerts_failed_total{notifier}` / `notify_errors_total{notifier}`)
+ // are seeded and the snapshot can assert their presence.
+ let notifier_names: Vec<&str> = vec!["sentinel"];
+    valerter::initialize_metrics(&rule_source_pairs, &source_names, &notifier_names);
+
+ // 4) Run the engine briefly so each metric path fires at least once.
+ let queue = NotificationQueue::new(64);
+ let mut rx = queue.subscribe();
+ let engine = RuleEngine::new(cfg, reqwest::Client::new(), queue.clone());
+ let cancel_for_engine = cancel.clone();
+ let engine_handle = tokio::spawn(async move { engine.run(cancel_for_engine).await });
+
+ // Drain a few alerts to make sure the throttle/passed/sent paths run.
+ drain(&mut rx, 5, Duration::from_secs(2)).await;
+
+ // 5) Scrape /metrics.
+ let url = format!("http://127.0.0.1:{}/metrics", port);
+ let body = reqwest::Client::new()
+ .get(&url)
+ .send()
+ .await
+ .expect("scrape should succeed")
+ .text()
+ .await
+ .expect("body should decode");
+
+ // 6) Tear down. The engine task runs forever until cancelled.
+ cancel.cancel();
+ let _ = tokio::time::timeout(Duration::from_secs(2), engine_handle).await;
+ let _ = tokio::time::timeout(Duration::from_secs(1), metrics_handle).await;
+
+ // 7) Assert the snapshot. We check that *every expected* metric series
+ // (name + label-key set) is present. The actual output may carry
+ // additional series from per-(rule, source) initialization that we
+ // explicitly seeded, so we tolerate supersets.
+ let actual = extract_name_label_keys(&body);
+
+ // Inline expected snapshot. Sorted alphabetically for stable diffs.
+ // Each entry is `metric_name{label_keys_csv_sorted}` or just
+ // `metric_name` when unlabeled.
+ let expected: Arc<[&'static str]> = Arc::from([
+ // Per-(rule, source) counters seeded by initialize_metrics.
+ "valerter_alerts_passed_total{rule_name,vl_source}",
+ "valerter_alerts_sent_total{rule_name,vl_source}",
+ "valerter_alerts_throttled_total{rule_name,vl_source}",
+ "valerter_logs_matched_total{rule_name,vl_source}",
+ "valerter_parse_errors_total{rule_name,vl_source}",
+ "valerter_reconnections_total{rule_name,vl_source}",
+ "valerter_rule_errors_total{rule_name,vl_source}",
+ "valerter_rule_panics_total{rule_name,vl_source}",
+ // Per-(rule, source) discarded counter (3-label, reason="oversized").
+ "valerter_lines_discarded_total{reason,rule_name,vl_source}",
+ // Per-(rule, source) gauge for last query timestamp.
+ "valerter_last_query_timestamp{rule_name,vl_source}",
+ // Per-(rule, source) histogram exported as a Prometheus summary by
+ // metrics-exporter-prometheus: emits the metric with `quantile` label
+ // plus `_sum` and `_count` companion series.
+ "valerter_query_duration_seconds{quantile,rule_name,vl_source}",
+ "valerter_query_duration_seconds_count{rule_name,vl_source}",
+ "valerter_query_duration_seconds_sum{rule_name,vl_source}",
+ // Per-notifier sentinel counters.
+ "valerter_alerts_failed_total{notifier}",
+ "valerter_notify_errors_total{notifier}",
+ // Global / shared counters & gauges.
+ "valerter_alerts_dropped_total",
+ "valerter_queue_size",
+ "valerter_uptime_seconds",
+ // Per-source reachability gauge (replaces the old per-rule
+ // valerter_victorialogs_up).
+ "valerter_vl_source_up{vl_source}",
+ // Build info carries only the version label.
+ "valerter_build_info{version}",
+ ]);
+
+    let mut missing: Vec<String> = Vec::new();
+ for want in expected.iter() {
+ if !actual.contains(*want) {
+ missing.push((*want).to_string());
+ }
+ }
+ assert!(
+ missing.is_empty(),
+ "metrics snapshot missing expected series:\n missing = {:#?}\n\n actual = {:#?}\n\n raw body =\n{}",
+ missing,
+ actual,
+ body
+ );
+
+ // Hard regression: the v1.x per-rule gauge MUST be gone in v2.0.0.
+ assert!(
+ !actual
+ .iter()
+ .any(|s| s.starts_with("valerter_victorialogs_up")),
+ "valerter_victorialogs_up must be removed in v2.0.0 (replaced by valerter_vl_source_up). Found in /metrics:\n{}",
+ body
+ );
+}
diff --git a/tests/multi_source_integration.rs b/tests/multi_source_integration.rs
new file mode 100644
index 0000000..e964532
--- /dev/null
+++ b/tests/multi_source_integration.rs
@@ -0,0 +1,393 @@
+//! End-to-end integration test for the v2.0.0 multi-source VL core.
+//!
+//! Spins up **two** wiremock `MockServer` instances to stand in for two
+//! VictoriaLogs sources (`vlprod`, `vldev`), serves distinct fixture events
+//! on each, runs `RuleEngine` against the pair, and inspects the alert
+//! payloads arriving on the notification queue to prove:
+//!
+//! 1. A rule with `vl_sources: [vlprod]` spawns exactly one task and its
+//! payloads carry `vl_source == "vlprod"`.
+//! 2. A rule with empty `vl_sources` fans out across every source and
+//! payloads arrive tagged with each source name.
+//! 3. The synthetic `vl_source` field is rendered in template output
+//! (layer 1) and survives in `AlertPayload` for layer 2 notifiers.
+//! 4. Per-source default throttle buckets are isolated: two sources sending
+//! identical events both pass through on first delivery rather than the
+//! second being dropped as a duplicate.
+//!
+//! The fixture corpus from `tests/fixtures/vl_events/` (chore/vl-fixtures-corpus)
+//! is consumed via `common::vl_events::load_fixture`.
+
+mod common;
+
+use std::collections::BTreeMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use serde_json::Value;
+use tokio::sync::broadcast;
+use tokio_util::sync::CancellationToken;
+use valerter::config::{
+ CompiledParser, CompiledRule, CompiledTemplate, DefaultsConfig, JsonParserConfig,
+ MetricsConfig, NotifyConfig, RuntimeConfig, ThrottleConfig, VlSourceConfig,
+};
+use valerter::notify::{AlertPayload, NotificationQueue};
+use valerter::{RuleEngine, TemplateEngine};
+use wiremock::matchers::{method, path};
+use wiremock::{Mock, MockServer, ResponseTemplate};
+
+use common::vl_events::load_fixture;
+
+/// Re-serialize a JSON fixture (which in the corpus is a single object) into
+/// an NDJSON response body — one JSON object followed by a trailing newline.
+fn ndjson_body(events: &[&Value]) -> Vec<u8> {
+ let mut out = Vec::new();
+ for ev in events {
+ out.extend_from_slice(
+ serde_json::to_vec(ev)
+ .expect("fixture is valid JSON")
+ .as_slice(),
+ );
+ out.push(b'\n');
+ }
+ out
+}
+
+/// Build an NDJSON mock that replies once with the given events.
+async fn mount_ndjson(server: &MockServer, events: &[&Value]) {
+ Mock::given(method("GET"))
+ .and(path("/select/logsql/tail"))
+ .respond_with(
+ ResponseTemplate::new(200).set_body_raw(ndjson_body(events), "application/x-ndjson"),
+ )
+ .mount(server)
+ .await;
+}
+
+fn rule(name: &str, vl_sources: Vec<String>) -> CompiledRule {
+ CompiledRule {
+ name: name.to_string(),
+ enabled: true,
+ query: "_stream:test".to_string(),
+ parser: CompiledParser {
+ regex: None,
+ json: Some(JsonParserConfig {
+ fields: vec!["_msg".to_string()],
+ }),
+ },
+ throttle: None,
+ notify: NotifyConfig {
+ template: "tpl".to_string(),
+ mattermost_channel: None,
+ destinations: vec!["dest".to_string()],
+ },
+ vl_sources,
+ }
+}
+
+fn vl_source(uri: &str) -> VlSourceConfig {
+ VlSourceConfig {
+ url: uri.to_string(),
+ basic_auth: None,
+ headers: None,
+ tls: None,
+ }
+}
+
+fn runtime(sources: BTreeMap<String, VlSourceConfig>, rules: Vec<CompiledRule>) -> RuntimeConfig {
+ let mut templates = std::collections::HashMap::new();
+ templates.insert(
+ "tpl".to_string(),
+ CompiledTemplate {
+ title: "[{{ vl_source }}] {{ rule_name }}".to_string(),
+ body: "source={{ vl_source }} msg={{ _msg }}".to_string(),
+ email_body_html: None,
+ accent_color: None,
+ },
+ );
+
+ RuntimeConfig {
+ victorialogs: sources,
+ defaults: DefaultsConfig {
+ throttle: ThrottleConfig {
+ key: None,
+ count: 5,
+ window: Duration::from_secs(60),
+ },
+ timestamp_timezone: "UTC".to_string(),
+ max_streams: valerter::config::DEFAULT_MAX_STREAMS,
+ },
+ templates,
+ rules,
+ metrics: MetricsConfig::default(),
+ notifiers: None,
+ config_dir: std::path::PathBuf::from("."),
+ }
+}
+
+/// Collect up to `max` alerts from a pre-created receiver within `deadline`,
+/// returning whatever arrived. Using a receiver created BEFORE the engine
+/// spawns avoids the broadcast channel's "messages before subscribe are lost"
+/// behaviour (see tokio::sync::broadcast docs).
+async fn drain_from(
+    rx: &mut broadcast::Receiver<AlertPayload>,
+ max: usize,
+ deadline: Duration,
+) -> Vec<AlertPayload> {
+ let mut out = Vec::new();
+ let _ = tokio::time::timeout(deadline, async {
+ while out.len() < max {
+ match rx.recv().await {
+ Ok(p) => out.push(p),
+ Err(_) => break,
+ }
+ }
+ })
+ .await;
+ out
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn multi_source_rule_with_vl_sources_list_targets_single_source() {
+ let vlprod = MockServer::start().await;
+ let vldev = MockServer::start().await;
+
+ let ev_prod = load_fixture("nginx_http_400.json");
+ let ev_dev = load_fixture("nginx_http_500.json");
+
+ mount_ndjson(&vlprod, &[&ev_prod]).await;
+ mount_ndjson(&vldev, &[&ev_dev]).await;
+
+ let mut sources = BTreeMap::new();
+ sources.insert("vlprod".to_string(), vl_source(&vlprod.uri()));
+ sources.insert("vldev".to_string(), vl_source(&vldev.uri()));
+
+ // Rule pinned to vlprod only. vldev's stream must never produce an alert
+ // via this rule.
+ let rules = vec![rule("prod_only", vec!["vlprod".to_string()])];
+
+ let queue = NotificationQueue::new(64);
+ // Subscribe BEFORE spawning the engine so no payloads are lost.
+ let mut rx = queue.subscribe();
+
+ let cfg = runtime(sources, rules);
+ let engine = RuleEngine::new(cfg, reqwest::Client::new(), queue.clone());
+
+ let cancel = CancellationToken::new();
+ let cancel_clone = cancel.clone();
+ let handle = tokio::spawn(async move { engine.run(cancel_clone).await });
+
+ let alerts = drain_from(&mut rx, 1, Duration::from_secs(3)).await;
+ // Negative-evidence proof: vldev MUST NOT have served any tail request,
+ // since the only rule is pinned to vlprod. Catches regressions where the
+ // resolve_sources filter is bypassed and the rule fans out anyway.
+ let vldev_hits = vldev.received_requests().await.unwrap_or_default();
+ cancel.cancel();
+ let _ = tokio::time::timeout(Duration::from_secs(1), handle).await;
+
+ assert!(
+ !alerts.is_empty(),
+ "rule pinned to vlprod should have produced at least one alert"
+ );
+ for a in &alerts {
+ assert_eq!(
+ a.vl_source, "vlprod",
+ "payload carried wrong vl_source: {}",
+ a.vl_source
+ );
+ assert!(
+ a.message.title.contains("vlprod"),
+ "layer 1 title must contain rendered vl_source, got: {}",
+ a.message.title
+ );
+ }
+ assert!(
+ vldev_hits.is_empty(),
+ "vldev served {} request(s) for a rule pinned to vlprod (negative-evidence assertion)",
+ vldev_hits.len()
+ );
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn multi_source_rule_without_vl_sources_fans_out_across_all() {
+ let vlprod = MockServer::start().await;
+ let vldev = MockServer::start().await;
+
+ let ev = load_fixture("k8s_pod_oom.json");
+ mount_ndjson(&vlprod, &[&ev]).await;
+ mount_ndjson(&vldev, &[&ev]).await;
+
+ let mut sources = BTreeMap::new();
+ sources.insert("vlprod".to_string(), vl_source(&vlprod.uri()));
+ sources.insert("vldev".to_string(), vl_source(&vldev.uri()));
+
+ // vl_sources empty = fan out across all sources.
+ let rules = vec![rule("fan_out", Vec::new())];
+
+ let queue = NotificationQueue::new(64);
+ let mut rx = queue.subscribe();
+ let cfg = runtime(sources, rules);
+ let engine = RuleEngine::new(cfg, reqwest::Client::new(), queue.clone());
+
+ let cancel = CancellationToken::new();
+ let cancel_clone = cancel.clone();
+ let handle = tokio::spawn(async move { engine.run(cancel_clone).await });
+
+ // Drain a fixed time window with a high `max` so we don't exit before
+ // the slower of the two parallel source tasks delivers its first alert.
+ let alerts = drain_from(&mut rx, 200, Duration::from_secs(2)).await;
+ cancel.cancel();
+ let _ = tokio::time::timeout(Duration::from_secs(1), handle).await;
+
+    let mut sources_seen: std::collections::HashSet<String> = Default::default();
+ for a in &alerts {
+ sources_seen.insert(a.vl_source.clone());
+ }
+
+ assert!(
+ sources_seen.contains("vlprod"),
+ "expected vlprod alert, got sources: {:?}",
+ sources_seen
+ );
+ assert!(
+ sources_seen.contains("vldev"),
+ "expected vldev alert, got sources: {:?}",
+ sources_seen
+ );
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn multi_source_default_throttle_buckets_are_isolated_per_source() {
+ // The same event delivered twice on two different sources must not be
+ // deduped as a single bucket when the rule uses the default throttle
+ // (no custom key). The default key is `{rule}-{source}:global`, so
+ // each source has its own bucket and both alerts should land.
+ let vlprod = MockServer::start().await;
+ let vldev = MockServer::start().await;
+
+ // Low count (=1) would cause a cross-source collision under v1 default.
+ let ev = load_fixture("nginx_http_500.json");
+ mount_ndjson(&vlprod, &[&ev]).await;
+ mount_ndjson(&vldev, &[&ev]).await;
+
+ let mut sources = BTreeMap::new();
+ sources.insert("vlprod".to_string(), vl_source(&vlprod.uri()));
+ sources.insert("vldev".to_string(), vl_source(&vldev.uri()));
+
+ // Tight throttle count=1: if buckets were shared the second source
+ // would be blocked.
+ let mut cfg = runtime(sources, vec![rule("isolate", Vec::new())]);
+ cfg.defaults.throttle.count = 1;
+
+ let queue = NotificationQueue::new(64);
+ let mut rx = queue.subscribe();
+ let engine = RuleEngine::new(cfg, reqwest::Client::new(), queue.clone());
+
+ let cancel = CancellationToken::new();
+ let cancel_clone = cancel.clone();
+ let handle = tokio::spawn(async move { engine.run(cancel_clone).await });
+
+ // Drain for a fixed window long enough for both sources' first stream
+ // to land. With throttle count=1 each (rule, source) bucket allows only
+ // one alert through, but both buckets are independent so both deliver.
+ // Use a high `max` so we don't exit early before both sources land.
+ let alerts = drain_from(&mut rx, 100, Duration::from_secs(2)).await;
+ cancel.cancel();
+ let _ = tokio::time::timeout(Duration::from_secs(1), handle).await;
+
+    let sources_seen: std::collections::HashSet<String> =
+ alerts.iter().map(|a| a.vl_source.clone()).collect();
+
+ assert!(
+ sources_seen.contains("vlprod") && sources_seen.contains("vldev"),
+ "per-source default throttle buckets must be isolated; saw: {:?} (alerts: {})",
+ sources_seen,
+ alerts.len()
+ );
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn multi_source_event_field_named_vl_source_is_masked_by_synthetic() {
+ // If an event literally carries a `vl_source` field, the synthetic
+ // value wins in layer 1 and in the AlertPayload (collision policy
+ // matches rule_name, v1.2.1).
+ let server = MockServer::start().await;
+
+ // Build an event with a hostile literal vl_source value. Use a minimal
+ // VL shape: _time, _stream, _msg, plus the collision field.
+ let hostile: Value = serde_json::json!({
+ "_time": "2026-04-15T10:00:00Z",
+ "_stream": "{}",
+ "_msg": "hostile",
+ "vl_source": "evil"
+ });
+ mount_ndjson(&server, &[&hostile]).await;
+
+ let mut sources = BTreeMap::new();
+ sources.insert("real_source".to_string(), vl_source(&server.uri()));
+
+ let rules = vec![rule("collision", Vec::new())];
+
+ let queue = NotificationQueue::new(64);
+ let mut rx = queue.subscribe();
+ let engine = RuleEngine::new(
+ runtime(sources, rules),
+ reqwest::Client::new(),
+ queue.clone(),
+ );
+
+ let cancel = CancellationToken::new();
+ let cancel_clone = cancel.clone();
+ let handle = tokio::spawn(async move { engine.run(cancel_clone).await });
+
+ let alerts = drain_from(&mut rx, 1, Duration::from_secs(3)).await;
+ cancel.cancel();
+ let _ = tokio::time::timeout(Duration::from_secs(1), handle).await;
+
+ assert!(!alerts.is_empty(), "expected at least one alert");
+ for a in &alerts {
+ assert_eq!(
+ a.vl_source, "real_source",
+ "AlertPayload.vl_source must be synthetic, not event-literal 'evil'"
+ );
+ assert!(
+ a.message.title.contains("real_source"),
+ "layer 1 title must show synthetic vl_source, got: {}",
+ a.message.title
+ );
+ assert!(
+ !a.message.title.contains("evil"),
+ "title must not leak event-literal 'evil' into rendered output: {}",
+ a.message.title
+ );
+ }
+}
+
+#[tokio::test]
+async fn template_engine_renders_vl_source_directly_without_http() {
+ // Smoke test the template-level contract independently of the engine
+ // so that if the integration tests above time out in CI under load we
+ // still have a fast unit-level guarantee that vl_source threads through
+ // layer 1. This also doubles as an assertion that the fixture corpus is
+ // consumable from template rendering (guards against future shape drift).
+ let mut templates = std::collections::HashMap::new();
+ templates.insert(
+ "tpl".to_string(),
+ CompiledTemplate {
+ title: "[{{ vl_source }}] {{ rule_name }} {{ _msg }}".to_string(),
+ body: "b".to_string(),
+ email_body_html: None,
+ accent_color: None,
+ },
+ );
+ let engine = Arc::new(TemplateEngine::new(templates));
+
+ let ev = load_fixture("nginx_http_400.json");
+ let rendered = engine.render_with_fallback("tpl", &ev, "nginx_rule", "vlprod");
+
+ assert!(
+ rendered.title.starts_with("[vlprod] nginx_rule "),
+ "expected title to include synthetic vl_source, got: {}",
+ rendered.title
+ );
+}
diff --git a/tests/smtp_integration.rs b/tests/smtp_integration.rs
index fd25937..d35c7c3 100644
--- a/tests/smtp_integration.rs
+++ b/tests/smtp_integration.rs
@@ -217,6 +217,7 @@ fn make_alert_payload(rule_name: &str, title: &str, body: &str) -> AlertPayload
accent_color: Some("#ff0000".to_string()),
},
rule_name: rule_name.to_string(),
+ vl_source: "vlprod".to_string(),
destinations: vec![],
log_timestamp: "2026-01-15T10:49:35.799Z".to_string(),
log_timestamp_formatted: "15/01/2026 10:49:35 UTC".to_string(),
@@ -404,6 +405,7 @@ async fn test_send_email_html_format() {
accent_color: Some("#ff0000".to_string()),
},
rule_name: "html_rule".to_string(),
+ vl_source: "vlprod".to_string(),
destinations: vec![],
log_timestamp: "2026-01-15T10:49:35.799Z".to_string(),
log_timestamp_formatted: "15/01/2026 10:49:35 UTC".to_string(),
diff --git a/tests/template_against_vl_fixtures.rs b/tests/template_against_vl_fixtures.rs
new file mode 100644
index 0000000..a6a846a
--- /dev/null
+++ b/tests/template_against_vl_fixtures.rs
@@ -0,0 +1,160 @@
+//! Parameterised template × fixture integration tests.
+//!
+//! Demonstrates how the corpus plugs into the `TemplateEngine` and catches
+//! regressions that motivated the chore:
+//! - #25: `{{ nginx.http.request_id }}` against flat-dotted keys.
+//! - empty-field rendering against `edge_empty_fields.json`.
+//!
+//! The existing inline-event tests (e.g. `integration_notify.rs`) are left
+//! untouched; this file is purely additive.
+
+use std::collections::HashMap;
+
+use valerter::config::CompiledTemplate;
+use valerter::template::TemplateEngine;
+
+mod common;
+
+use common::vl_events::{
+ all_fixtures, load_fixture, load_fixtures_by_source_system, load_fixtures_by_tag,
+};
+
+/// Build a single-template engine wired to `{{ title_tpl }}` / `{{ body_tpl }}`.
+fn engine_with(title_tpl: &str, body_tpl: &str) -> TemplateEngine {
+ let mut templates = HashMap::new();
+ templates.insert(
+ "t".to_string(),
+ CompiledTemplate {
+ title: title_tpl.to_string(),
+ body: body_tpl.to_string(),
+ email_body_html: None,
+ accent_color: None,
+ },
+ );
+ TemplateEngine::new(templates)
+}
+
+#[test]
+fn smoke_every_fixture_renders_msg_and_time_matching_event() {
+ // For every fixture, render `{{ _msg }} @ {{ _time }}` and assert the
+ // output matches what the event literally carries. Stronger than a bare
+ // `!is_empty()` assertion: catches silent drift if templating or unflat-
+ // tening starts mangling top-level fields.
+ let engine = engine_with("{{ _msg }}", "{{ _msg }} @ {{ _time }}");
+ for (name, value) in all_fixtures() {
+ let msg = value
+ .as_object()
+ .and_then(|o| o.get("_msg"))
+ .and_then(|v| v.as_str())
+ .unwrap_or_else(|| panic!("fixture {} missing string _msg", name));
+ let time = value
+ .as_object()
+ .and_then(|o| o.get("_time"))
+ .and_then(|v| v.as_str())
+ .unwrap_or_else(|| panic!("fixture {} missing string _time", name));
+ let expected = format!("{} @ {}", msg, time);
+ let rendered = engine
+ .render("t", &value, "smoke", "vlprod")
+ .unwrap_or_else(|e| panic!("fixture {} failed to render: {}", name, e));
+ assert_eq!(
+ rendered.body, expected,
+ "fixture {} rendered body did not match the event's _msg/_time",
+ name
+ );
+ assert_eq!(
+ rendered.title, msg,
+ "fixture {} rendered title drifted",
+ name
+ );
+ }
+}
+
+#[test]
+fn regression_gh25_dotted_keys_render_their_value() {
+ // Regression guard for #25: `{{ nginx.http.request_id }}` must render
+ // the flat-dotted value from the event. Before the fix, minijinja
+ // treated `nginx.http.request_id` as nested attribute lookup and the
+ // expression resolved to empty under Lenient undefined behaviour.
+ let engine = engine_with("req", "{{ nginx.http.request_id }}");
+ let hits = load_fixtures_by_tag("dotted_keys");
+ assert!(
+ !hits.is_empty(),
+ "expected at least one `dotted_keys` fixture for #25 regression"
+ );
+ let mut checked_any = false;
+ for (name, value) in hits {
+ // Skip fixtures that don't carry the specific dotted key we test.
+ let expected = value
+ .as_object()
+ .and_then(|o| o.get("nginx.http.request_id"))
+ .and_then(|v| v.as_str());
+ let Some(expected) = expected else {
+ continue;
+ };
+ let rendered = engine
+ .render("t", &value, "gh25", "vlprod")
+ .unwrap_or_else(|e| panic!("fixture {} failed to render for #25: {}", name, e));
+ assert_eq!(
+ rendered.body, expected,
+ "fixture {} did not surface nginx.http.request_id via template",
+ name
+ );
+ checked_any = true;
+ }
+ assert!(
+ checked_any,
+ "no dotted_keys fixture carried `nginx.http.request_id`; add one or \
+ retag the existing nginx fixtures"
+ );
+}
+
+#[test]
+fn regression_empty_fields_render_as_empty_string() {
+ // `edge_empty_fields.json` carries `request_id: ""`. Templates that
+ // reference these must render to empty string (not fail, not produce
+ // `"None"`, etc.). This is the contract the empty-guard in #26 relies on.
+ let engine = engine_with("t", "[{{ request_id }}][{{ user_id }}][{{ error }}]");
+ let event = load_fixture("edge_empty_fields.json");
+ let rendered = engine
+ .render("t", &event, "empty_fields", "vlprod")
+ .expect("render should succeed with lenient undefined");
+ assert_eq!(rendered.body, "[][][]");
+}
+
+#[test]
+fn raw_source_fixtures_render_missing_as_empty_under_lenient() {
+ // Fixtures whose manifest source_system is `raw` carry no structured
+ // fields beyond the VL envelope. Rendering `{{ _msg }}` must still
+ // work, and rendering `{{ missing_field }}` must yield empty under
+ // Lenient (not error). This protects against accidental Strict flips.
+ let engine = engine_with("{{ _msg }}", "{{ unknown_field_that_does_not_exist }}");
+ let fixtures = load_fixtures_by_source_system("raw");
+ assert!(
+ !fixtures.is_empty(),
+ "expected at least one fixture with source_system: raw"
+ );
+ for (name, value) in fixtures {
+ let rendered = engine
+ .render("t", &value, "raw", "vlprod")
+ .unwrap_or_else(|e| panic!("fixture {} failed lenient render: {}", name, e));
+ assert!(!rendered.title.is_empty(), "title empty for {}", name);
+ assert!(
+ rendered.body.is_empty(),
+ "body should be empty string for missing field (fixture {})",
+ name
+ );
+ }
+}
+
+#[test]
+fn unicode_fixture_preserves_cjk_and_emoji() {
+ // Render the unicode fixture via `{{ _msg }}` and check the raw bytes
+ // survive. This catches encoding bugs in the template pipeline.
+ let engine = engine_with("{{ _msg }}", "{{ _msg }}");
+ let event = load_fixture("edge_unicode_msg.json");
+ let rendered = engine
+ .render("t", &event, "unicode", "vlprod")
+ .expect("render should succeed on unicode event");
+ assert!(rendered.body.contains("支付失败"), "lost CJK codepoints");
+ assert!(rendered.body.contains('\u{2705}'), "lost emoji codepoint");
+}
diff --git a/tests/vl_fixtures_consistency.rs b/tests/vl_fixtures_consistency.rs
new file mode 100644
index 0000000..02f9674
--- /dev/null
+++ b/tests/vl_fixtures_consistency.rs
@@ -0,0 +1,164 @@
+//! Consistency checks between `tests/fixtures/vl_events/index.yaml` and the
+//! filesystem. This is the gate that keeps the corpus honest: no orphaned
+//! files, no phantom entries, every fixture parses, every fixture carries the
+//! VL-mandatory fields.
+
+use std::collections::BTreeSet;
+
+mod common;
+
+use common::vl_events::{
+ all_fixtures, filesystem_fixtures, load_fixtures_by_tag, manifest_entries,
+};
+
+/// Required top-level fields on every `/select/logsql/tail` event.
+/// Enforced per-fixture by `every_fixture_has_required_vl_fields`;
+/// `_msg` must additionally be a non-empty string (asserted there).
+const REQUIRED_FIELDS: &[&str] = &["_msg", "_time", "_stream"];
+
+#[test]
+fn corpus_contains_at_least_fifteen_fixtures() {
+    // The spec sets a floor on corpus size; shrinking below it is a bug.
+    let entries = manifest_entries();
+    let count = entries.len();
+    assert!(
+        count >= 15,
+        "spec requires at least 15 fixtures, found {}",
+        count
+    );
+}
+
+#[test]
+fn index_and_filesystem_agree() {
+    // Two-way diff between the manifest and the directory listing: every
+    // file on disk must be declared, and every declared fixture must exist.
+    // NOTE: `BTreeSet` requires explicit generic arguments here — the bare
+    // annotation does not compile (`BTreeSet<String>` restored; the angle
+    // brackets were likely stripped when the patch was generated).
+    let index_names: BTreeSet<String> = manifest_entries().keys().cloned().collect();
+    let disk_names: BTreeSet<String> = filesystem_fixtures().into_iter().collect();
+
+    let orphans: Vec<&String> = disk_names.difference(&index_names).collect();
+    let phantoms: Vec<&String> = index_names.difference(&disk_names).collect();
+
+    assert!(
+        orphans.is_empty(),
+        "orphan fixtures on disk without an index.yaml entry: {:?}. \
+         Either add them to tests/fixtures/vl_events/index.yaml or delete them.",
+        orphans
+    );
+    assert!(
+        phantoms.is_empty(),
+        "phantom fixtures declared in index.yaml but missing on disk: {:?}. \
+         Either create the file or remove the entry from index.yaml.",
+        phantoms
+    );
+}
+
+#[test]
+fn every_fixture_parses_as_json() {
+    // `all_fixtures()` already panics on parse error; walking every file
+    // here surfaces that panic in CI with a clear path.
+
+    /// Human-readable name of a JSON value's kind, for the failure message.
+    fn kind_of(value: &serde_json::Value) -> &'static str {
+        match value {
+            serde_json::Value::Null => "null",
+            serde_json::Value::Bool(_) => "bool",
+            serde_json::Value::Number(_) => "number",
+            serde_json::Value::String(_) => "string",
+            serde_json::Value::Array(_) => "array",
+            serde_json::Value::Object(_) => "object",
+        }
+    }
+
+    let loaded = all_fixtures();
+    assert!(!loaded.is_empty(), "no fixtures discovered");
+    for (name, value) in &loaded {
+        assert!(
+            value.is_object(),
+            "fixture {} parsed but is not a JSON object (got {})",
+            name,
+            kind_of(value)
+        );
+    }
+}
+
+#[test]
+fn every_fixture_is_single_line_json() {
+    // VL's /select/logsql/tail emits one event per line; the corpus must
+    // mirror that shape so consumers can copy-paste fixtures into mocks.
+    let dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("tests/fixtures/vl_events");
+    for name in filesystem_fixtures() {
+        let path = dir.join(&name);
+        let raw = match std::fs::read_to_string(&path) {
+            Ok(contents) => contents,
+            Err(e) => panic!("failed to read {}: {}", path.display(), e),
+        };
+        // A single trailing newline (POSIX convention) is tolerated; any
+        // interior newline means the JSON payload spans multiple lines.
+        let payload = raw.strip_suffix('\n').unwrap_or(&raw);
+        assert!(
+            !payload.contains('\n'),
+            "fixture {} spans multiple lines. VL tail output is one JSON per \
+             line; fixtures must match.",
+            name
+        );
+    }
+}
+
+#[test]
+fn every_fixture_has_required_vl_fields() {
+    // Self-contained: does not rely on parses_as_json having run first,
+    // since `cargo test` runs tests in parallel by default and cross-test
+    // ordering is unreliable.
+    for (name, value) in all_fixtures() {
+        let obj = match value.as_object() {
+            Some(map) => map,
+            None => panic!(
+                "fixture {} is not a top-level JSON object (see also parses_as_json test)",
+                name
+            ),
+        };
+        for field in REQUIRED_FIELDS {
+            assert!(
+                obj.contains_key(*field),
+                "fixture {} is missing required VL field `{}`",
+                name,
+                field
+            );
+        }
+        // _msg must be a non-empty string so the smoke test can render it.
+        let msg = match obj.get("_msg").and_then(|v| v.as_str()) {
+            Some(s) => s,
+            None => panic!("fixture {} has non-string _msg", name),
+        };
+        assert!(
+            !msg.is_empty(),
+            "fixture {} has empty _msg (not useful for template smoke tests)",
+            name
+        );
+    }
+}
+
+#[test]
+fn every_entry_has_description_source_and_tags() {
+    // Every manifest entry must be fully annotated: a non-blank
+    // description, a non-blank source_system, and at least one tag.
+    for (name, entry) in manifest_entries() {
+        let has_description = !entry.description.trim().is_empty();
+        assert!(has_description, "fixture {} has an empty description", name);
+
+        let has_source = !entry.source_system.trim().is_empty();
+        assert!(has_source, "fixture {} has an empty source_system", name);
+
+        assert!(
+            !entry.tags.is_empty(),
+            "fixture {} has no tags (require at least one for discoverability)",
+            name
+        );
+    }
+}
+
+#[test]
+fn tag_lookup_returns_empty_for_unknown() {
+    // Spec contract: an unknown tag yields an empty Vec; the caller is
+    // responsible for asserting non-emptiness where it matters.
+    let hits = load_fixtures_by_tag("this_tag_does_not_exist_anywhere_xyz");
+    let none_found = hits.is_empty();
+    assert!(none_found, "expected empty result for unknown tag");
+}
+
+#[test]
+fn dotted_keys_tag_is_populated() {
+    // The corpus exists primarily to catch regressions like #25; if this
+    // tag were ever empty, the downstream smoke test would silently
+    // become a no-op.
+    let hits = load_fixtures_by_tag("dotted_keys");
+    assert!(
+        !hits.is_empty(),
+        "no fixtures tagged `dotted_keys` — the regression harness for #25 \
+         would silently skip"
+    );
+}